# 安装

# rpm 包安装

  1. 官网地址:https://www.rabbitmq.com/download.html

  2. 下载 rpm 文件:这里有两个需要下载,分别是 rabbitmq-server 和 erlang

  3. 安装文件:

    • erlang-23.3.4.11-1.el7.x86_64.rpm

    • rabbitmq-server-3.10.5-1.el8.noarch.rpm

    1
    2
    3
    4
    5
    6
    # 需要的环境
    rpm -ivh erlang-23.3.4.11-1.el7.x86_64.rpm
    # 依赖包,要联网
    yum install socat -y
    # 本身
    rpm -ivh rabbitmq-server-3.10.5-1.el8.noarch.rpm

  4. 常用命令

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    # 开机启动RabbitMQ服务
    chkconfig rabbitmq-server on
    # 启动服务
    /sbin/service rabbitmq-server start
    # 查看服务状态
    /sbin/service rabbitmq-server status
    # 停止服务状态
    /sbin/service rabbitmq-server stop
    # 开启web管理插件
    rabbitmq-plugins enable rabbitmq_management

    # 开启

    image-20220627173744214

  5. 添加新用户

    1
    2
    3
    4
    5
    6
    7
    8
    9
    # 创建账号
    rabbitmqctl add_user admin 123
    # 设置用户角色
    rabbitmqctl set_user_tags admin administrator
    # 设置用户权限 set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
    rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
    # 用户 user_admin 具有/vhost1 这个 virtual host 中所有资源的配置、写、读权限
    # 当前用户和角色
    rabbitmqctl list_users

  6. 用户登录

  7. 重置命令

    1
    2
    3
    4
    5
    6
    7
    # 关闭应用命令
    rabbitmqctl stop_app
    # 清除命令
    rabbitmqctl reset
    # 重新启动
    rabbitmqctl start_app

# Docker 安装

# 获取 rabbit 镜像

1
docker pull rabbitmq:management

# 创建并运行容器

1
docker run -di --name=myrabbit -p 15672:15672 rabbitmq:management

—hostname:指定容器主机名称
—name:指定容器名称
- p:将 mq 端口号映射到本地
或者运行时设置用户和密码

1
docker run -di --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management

# 查看日志

1
docker logs -f myrabbit

# 容器运行正常

使用 http:// 你的 IP 地址:15672 访问 rabbit 控制台

# 额外 Linux 相关排查命令

1
> more xxx.log  查看日记信息> netstat -naop | grep 5672 查看端口是否被占用> ps -ef | grep 5672  查看进程> systemctl stop 服务

# 简单实践

采用简单模式进行代码实践

image-20220630083249319

生产者 Producer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package com.windlinxy.pqc;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
* @author Windlinxy
* @description 生产者
* @date 2022-06-28 15:45
**/
public class Producer {
// 队列名称
public static final String QUEUE_NAME = "hello";


/**
* 发消息
*
* @param args 参数
*/
public static void main(String[] args) throws IOException, TimeoutException {
// 创建工厂
ConnectionFactory factory = new ConnectionFactory();
// 工厂ip连接RabbitMQ的队列
factory.setHost("服务器ip");
factory.setUsername("admin");
factory.setPassword("123");

// 创建连接
Connection connection = factory.newConnection();
// 连接中创建信道(具体看原理图)
Channel channel = connection.createChannel();
// 简单连接,忽略交换机,连接队列
/*
* 生成一个队列
* 1. 队列名称
* 2. 队列里的消息是否持久化(磁盘) 默认情况是存到内存(false)、
* 3. 该队列是否只供一个消费者进行消费,是否消息共享,默认是false
* 4. 最后一个消费者断开了连接之后,是否自动删除
* */
channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// 准备发消息
String message = "hello world!";
/* * 发送
* 1. 发送到哪个交换机
* 2. 路由的Key值是哪个,本次是队列的名称
* 3. 其他参数信息
* 4。 发送消息体 */
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));

System.out.println("Success!");

channel.close();
connection.close();
}
}

消费者 Consumer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.windlinxy.pqc;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.Arrays;

/**
* @author Windlinxy
* @description 消费者
* @date 2022-06-28 15:45
**/
public class Consumer {
private final static String QUEUE_NAME = "hello";

public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("服务器ip");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
System.out.println("等待接收消息.........");
//推送的消息如何进行消费的接口回调
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
/*
回调方法
1. consumerTag 标识
2. envelope 获取一些信息,交换机,路由key
3. properties 配置信息
4. body 数据
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag :" + consumerTag);
System.out.println("Exchange : "+envelope.getExchange());
System.out.println("RoutingKey : "+envelope.getRoutingKey());
// System.out.println("properties : " + properties);
System.out.println("body : " + new String(body));
}
};

/*
高级用法
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println(message);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback = (consumerTag) -> {
System.out.println("消息消费被中断");
};
*/

/*
* 消费者消费消息
* 1.消费哪个队列
* 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
* 3.消费者未成功消费的回调
*/
channel.basicConsume(QUEUE_NAME, true, consumer);

//消费者是监听消息,不要去关闭资源
}
}

结果:

image-20220630091648283

image-20220630091704270

更新于 阅读次数 本文阅读量:

请我喝[茶]~( ̄▽ ̄)~*

Windlinxy 微信支付

微信支付

Windlinxy 支付宝

支付宝