# 安装
# rpm 包安装
官网地址:https://www.rabbitmq.com/download.html
下载 rpm 文件:这里有两个需要下载,分别是 rabbitmq-server 和 erlang
安装文件:
erlang-23.3.4.11-1.el7.x86_64.rpm
rabbitmq-server-3.10.5-1.el8.noarch.rpm
# 需要的环境
rpm -ivh erlang-23.3.4.11-1.el7.x86_64.rpm
# 依赖包,要联网
yum install socat -y
# 本身
rpm -ivh rabbitmq-server-3.10.5-1.el8.noarch.rpm
常用命令
# 开机启动 RabbitMQ 服务
chkconfig rabbitmq-server on
# 启动服务
/sbin/service rabbitmq-server start
# 查看服务状态
/sbin/service rabbitmq-server status
# 停止服务状态
/sbin/service rabbitmq-server stop
# 开启 web 管理插件
rabbitmq-plugins enable rabbitmq_management
# 开启
添加新用户
# 创建账号
rabbitmqctl add_user admin 123
# 设置用户角色
rabbitmqctl set_user_tags admin administrator
# 设置用户权限 set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
# 用户 user_admin 具有 /vhost1 这个 virtual host 中所有资源的配置、写、读权限
# 当前用户和角色
rabbitmqctl list_users
用户登录
重置命令
# 关闭应用命令
rabbitmqctl stop_app
# 清除命令
rabbitmqctl reset
# 重新启动
rabbitmqctl start_app
# Docker 安装
# 获取 rabbit 镜像
docker pull rabbitmq:management
# 创建并运行容器
docker run -di --name=myrabbit -p 15672:15672 rabbitmq:management
—hostname:指定容器主机名称
—name:指定容器名称
- p:将 mq 端口号映射到本地
或者运行时设置用户和密码
docker run -di --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management
# 查看日志
docker logs -f myrabbit
# 容器运行正常
使用 http:// 你的 IP 地址:15672 访问 rabbit 控制台
# 额外 Linux 相关排查命令
> more xxx.log 查看日记信息> netstat -naop | grep 5672 查看端口是否被占用> ps -ef | grep 5672 查看进程> systemctl stop 服务
# 简单实践
采用简单模式进行代码实践
生产者 Producer
package com.windlinxy.pqc; | |
import com.rabbitmq.client.Channel; | |
import com.rabbitmq.client.Connection; | |
import com.rabbitmq.client.ConnectionFactory; | |
import java.io.IOException; | |
import java.nio.charset.StandardCharsets; | |
import java.util.concurrent.TimeoutException; | |
/** | |
* @author Windlinxy | |
* @description 生产者 | |
* @date 2022-06-28 15:45 | |
**/ | |
public class Producer { | |
// 队列名称 | |
public static final String QUEUE_NAME = "hello"; | |
/** | |
* 发消息 | |
* | |
* @param args 参数 | |
*/ | |
public static void main(String[] args) throws IOException, TimeoutException { | |
// 创建工厂 | |
ConnectionFactory factory = new ConnectionFactory(); | |
// 工厂 ip 连接 RabbitMQ 的队列 | |
factory.setHost("服务器ip"); | |
factory.setUsername("admin"); | |
factory.setPassword("123"); | |
// 创建连接 | |
Connection connection = factory.newConnection(); | |
// 连接中创建信道(具体看原理图) | |
Channel channel = connection.createChannel(); | |
// 简单连接,忽略交换机,连接队列 | |
/* | |
* 生成一个队列 | |
* 1. 队列名称 | |
* 2. 队列里的消息是否持久化(磁盘) 默认情况是存到内存(false)、 | |
* 3. 该队列是否只供一个消费者进行消费,是否消息共享,默认是 false | |
* 4. 最后一个消费者断开了连接之后,是否自动删除 | |
* */ | |
channel.queueDeclare(QUEUE_NAME, false, false, false, null); | |
// 准备发消息 | |
String message = "hello world!"; | |
/* * 发送 | |
* 1. 发送到哪个交换机 | |
* 2. 路由的 Key 值是哪个,本次是队列的名称 | |
* 3. 其他参数信息 | |
* 4。 发送消息体 */ | |
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); | |
System.out.println("Success!"); | |
channel.close(); | |
connection.close(); | |
} | |
} |
消费者 Consumer
package com.windlinxy.pqc; | |
import com.rabbitmq.client.*; | |
import java.io.IOException; | |
import java.util.Arrays; | |
/** | |
* @author Windlinxy | |
* @description 消费者 | |
* @date 2022-06-28 15:45 | |
**/ | |
public class Consumer { | |
private final static String QUEUE_NAME = "hello"; | |
public static void main(String[] args) throws Exception { | |
ConnectionFactory factory = new ConnectionFactory(); | |
factory.setHost("服务器ip"); | |
factory.setUsername("admin"); | |
factory.setPassword("123"); | |
Connection connection = factory.newConnection(); | |
Channel channel = connection.createChannel(); | |
System.out.println("等待接收消息........."); | |
// 推送的消息如何进行消费的接口回调 | |
com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) { | |
/* | |
回调方法 | |
1. consumerTag 标识 | |
2. envelope 获取一些信息,交换机,路由 key | |
3. properties 配置信息 | |
4. body 数据 | |
*/ | |
@Override | |
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { | |
System.out.println("consumerTag :" + consumerTag); | |
System.out.println("Exchange : "+envelope.getExchange()); | |
System.out.println("RoutingKey : "+envelope.getRoutingKey()); | |
// System.out.println("properties : " + properties); | |
System.out.println("body : " + new String(body)); | |
} | |
}; | |
/* | |
高级用法 | |
DeliverCallback deliverCallback = (consumerTag, delivery) -> { | |
String message = new String (delivery.getBody ()); | |
System.out.println (message); | |
}; | |
// 取消消费的一个回调接口 如在消费的时候队列被删除掉了 | |
CancelCallback cancelCallback = (consumerTag) -> { | |
System.out.println ("消息消费被中断"); | |
}; | |
*/ | |
/* | |
* 消费者消费消息 | |
* 1. 消费哪个队列 | |
* 2. 消费成功之后是否要自动应答 true 代表自动应答 false 手动应答 | |
* 3. 消费者未成功消费的回调 | |
*/ | |
channel.basicConsume(QUEUE_NAME, true, consumer); | |
// 消费者是监听消息,不要去关闭资源 | |
} | |
} |
结果: