# 安装

# rpm 包安装

  1. 官网地址:https://www.rabbitmq.com/download.html

  2. 下载 rpm 文件:这里有两个需要下载,分别是 rabbitmq-server 和 erlang

  3. 安装文件:

    • erlang-23.3.4.11-1.el7.x86_64.rpm

    • rabbitmq-server-3.10.5-1.el8.noarch.rpm

    # 需要的环境
    rpm -ivh erlang-23.3.4.11-1.el7.x86_64.rpm
    # 依赖包,要联网
    yum install socat -y
    # 本身
    rpm -ivh rabbitmq-server-3.10.5-1.el8.noarch.rpm
  4. 常用命令

    # 开机启动 RabbitMQ 服务
    chkconfig rabbitmq-server on
    # 启动服务
    /sbin/service rabbitmq-server start 
    # 查看服务状态
    /sbin/service rabbitmq-server status
    # 停止服务状态
    /sbin/service rabbitmq-server stop
    # 开启 web 管理插件
    rabbitmq-plugins enable rabbitmq_management
    # 开启

    image-20220627173744214

  5. 添加新用户

    # 创建账号
    rabbitmqctl add_user admin 123
    # 设置用户角色
    rabbitmqctl set_user_tags admin administrator
    # 设置用户权限 set_permissions [-p <vhostpath>] <user> <conf> <write> <read>
    rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
    # 用户 user_admin 具有 /vhost1 这个 virtual host 中所有资源的配置、写、读权限
    # 当前用户和角色
    rabbitmqctl list_users
  6. 用户登录

  7. 重置命令

    # 关闭应用命令
    rabbitmqctl stop_app
    # 清除命令
    rabbitmqctl reset
    # 重新启动
    rabbitmqctl start_app

# Docker 安装

# 获取 rabbit 镜像

docker pull rabbitmq:management

# 创建并运行容器

docker run -di --name=myrabbit -p 15672:15672 rabbitmq:management

—hostname:指定容器主机名称
—name:指定容器名称
- p:将 mq 端口号映射到本地
或者运行时设置用户和密码

docker run -di --name myrabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management

# 查看日志

docker logs -f myrabbit

# 容器运行正常

使用 http:// 你的 IP 地址:15672 访问 rabbit 控制台

# 额外 Linux 相关排查命令

> more xxx.log  查看日记信息> netstat -naop | grep 5672 查看端口是否被占用> ps -ef | grep 5672  查看进程> systemctl stop 服务

# 简单实践

采用简单模式进行代码实践

image-20220630083249319

生产者 Producer

package com.windlinxy.pqc;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;
/**
 * @author Windlinxy
 * @description 生产者
 * @date 2022-06-28 15:45
 **/
public class Producer {
    // 队列名称
    public static final String QUEUE_NAME = "hello";
    /**
     * 发消息
     *
     * @param args 参数
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // 创建工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 工厂 ip 连接 RabbitMQ 的队列
        factory.setHost("服务器ip");
        factory.setUsername("admin");
        factory.setPassword("123");
        // 创建连接
        Connection connection = factory.newConnection();
        // 连接中创建信道(具体看原理图)
        Channel channel = connection.createChannel();
        // 简单连接,忽略交换机,连接队列
        /*
         * 生成一个队列
         * 1. 队列名称
         * 2. 队列里的消息是否持久化(磁盘) 默认情况是存到内存(false)、
         * 3. 该队列是否只供一个消费者进行消费,是否消息共享,默认是 false
         * 4. 最后一个消费者断开了连接之后,是否自动删除
         * */
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 准备发消息
        String message = "hello world!";
/*       * 发送
         * 1. 发送到哪个交换机
         * 2. 路由的 Key 值是哪个,本次是队列的名称
         * 3. 其他参数信息
         * 4。 发送消息体 */
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
        System.out.println("Success!");
        channel.close();
        connection.close();
    }
}

消费者 Consumer

package com.windlinxy.pqc;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Arrays;
/**
 * @author Windlinxy
 * @description 消费者
 * @date 2022-06-28 15:45
 **/
public class Consumer {
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("服务器ip");
        factory.setUsername("admin");
        factory.setPassword("123");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        System.out.println("等待接收消息.........");
        // 推送的消息如何进行消费的接口回调
        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            /*
            回调方法
            1. consumerTag 标识
            2. envelope 获取一些信息,交换机,路由 key
            3. properties 配置信息
            4. body 数据
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag :" + consumerTag);
                System.out.println("Exchange : "+envelope.getExchange());
                System.out.println("RoutingKey : "+envelope.getRoutingKey());
//                System.out.println("properties : " + properties);
                System.out.println("body : " + new String(body));
            }
        };
        /*
        高级用法
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String (delivery.getBody ());
            System.out.println (message);
        };
        // 取消消费的一个回调接口 如在消费的时候队列被删除掉了
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println ("消息消费被中断");
        };
        */
        
        /*
         * 消费者消费消息
         * 1. 消费哪个队列
         * 2. 消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
         * 3. 消费者未成功消费的回调
         */
        channel.basicConsume(QUEUE_NAME, true, consumer);
        // 消费者是监听消息,不要去关闭资源
    }
}

结果:

image-20220630091648283

image-20220630091704270

更新于 阅读次数 本文阅读量:

请我喝[茶]~( ̄▽ ̄)~*

Windlinxy 微信支付

微信支付

Windlinxy 支付宝

支付宝