RabbitMQ

同步调用

  • 时效性强 , 需要等到结果之后才返回。
  • 拓展性差,性能下降 , 连级失败麻烦。

异步调用

消息发送者发给消息代理 ,消息代理发送给消息接收者

  • 耦合低 , 无需等待性能好
  • 故障隔离
  • 时效性差 ,依赖于Broker的可靠性 , 不确定下游是否成功。

安装

注意我的网络是 heima ,大家需要根据自己的网络填写network

docker run \
 -e RABBITMQ_DEFAULT_USER=itheima \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 -v mq-plugins:/plugins \
 --name mq \
 --hostname mq \
 -p 15672:15672 \
 -p 5672:5672 \
 --network heima\
 -d \
 rabbitmq:3.8-management

Screenshot 2025-11-19 220735.png

访问虚拟机的15672端口就可以访问到 , 账号密码在docker启动时已经配置

Screenshot 2025-11-19 220850.png

核心概念

  • Publisher 消息发送者
  • consumer 消息消费者
  • queue 队列
  • exchange 交换机 ,负责路由
  • virtual-host 虚拟主机, 数据隔离

exchange

交换机只负责转发消息(路由),不做消息存储 , 需要绑定队列,不然消息会丢失

bindings 绑定队列

虚拟主机

  • 创建不同用户
  • 为用户创建virtual-host

java客户端

Spring AMQP

  • AMQP协议

依赖

        <!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

配置

spring:
  rabbitmq:
    host: 192.168.150.101 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: /hmall # 虚拟主机
    username: hmall # 用户名
    password: 123 # 密码

提供了 RabbitTemplate 工具类,需要注入

发送消息

@SpringBootTest
class PublisherAMQPApplicationTest {
    @Autowired
    public RabbitTemplate rabbitTemplate;
    @Test
    public void SpringAmqpTest() {
        // 1. 队列名
        String queueName = "simple.queue";
        // 2. 消息
        String message = "Hello, Spring Amqp";
        // 3. 发送消息
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

接收消息

Screenshot 2025-11-19 225947.png

Work Queues 模型

多个消费者绑定到一个队列

  • 同一个消息只能被一个消费者处理
  • 轮询 ,消费者分配平均
spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

配置每个消费者每次只能获取一条消息 ,让速度快的微服务多处理

Fanout交换机

广播 (每条消息都会发给多个队列)

  • Fanout交换机会将消息路由到每个queue

向交换机发送消息需要三个参数

    @Test
    public void FanoutSpringAmqpTest() {
        // 1. 交换机名
        String exchangeName = "fanout.exchange";
        // 2. 消息
        String message = "Hello, Spring Amqp";
        // 3. 发送消息
        rabbitTemplate.convertAndSend(exchangeName, "" , message);  // 空为其他交换机所用 ,传递三个参数辨认exchange为交换机
    }

每个队列都会收到消息

Screenshot 2025-11-19 233159.png

Screenshot 2025-11-20 194928.png

Direct交换机

定向路由(根据规则发到指定队列)

需要绑定key

Screenshot 2025-11-19 222058.png

    @Test
    public void directSpringAmqpTest() {
        // 1. 交换机名
        String exchangeName = "hmall.direct";
        // 2. 消息
        String message = "Hello, red !";
        // 3. 发送消息
        rabbitTemplate.convertAndSend(exchangeName, "red" , message);
    }

发送给有这个key的队列。

Screenshot 2025-11-20 194928.png

Topic交换机

队列绑定bindingKey

  • 拓展性更强
  • 和Direct相似

#代指0个或多个单词

*代指一个单词

声明队列交换机

用代码生成队列和交换机

  • 一般在消费者里声明队列和交换机
package com.itheima.consumer.confing;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutConfiguration {

    // 创建交换机
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("hmall.fanout");
    }

    // 创建队列
    @Bean
    public Queue queue1(){
        return new Queue("fanout.queue1");
    }
    // 创建队列
    @Bean
    public Queue queue2(){
        return new Queue("fanout.queue2");
    }
    // 绑定队列
    @Bean
    public Binding binding1(FanoutExchange fanoutExchange, Queue queue1){
        return BindingBuilder.bind(queue1).to(fanoutExchange);
    }
    @Bean
    public Binding binding2(FanoutExchange fanoutExchange, Queue queue2){
        return BindingBuilder.bind(queue2).to(fanoutExchange);
    }
}

基于注解声明队列交换机

  • 使用代码声明交换机和队列太臃肿 。
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue3" , durable = "true"),
            exchange = @Exchange(name = "hmall.direct" , type = ExchangeTypes.DIRECT), //type默认就是Direct需要别的可以从ExchangeTypes类中拿
            key = {"red" , "green"}
    ))
    public void direct3Listener(String message) {
        log.info("direct.queue2 接收到消息:【{}】", message);
    }

消息转换器

默认使用jdk的ObjectOutputSteam进行序列化

  • jdk 序列化有风险
  • 可读性差
  • 消息太大

我们使用json的消息转化器

  • 配置依赖 (发送方和接收方都需要配置)

    <dependency>
        <groupId>com.fasterxml.jackson.dataformat</groupId>
        <artifactId>jackson-dataformat-xml</artifactId>
        <version>2.9.10</version>
    </dependency>
  • 启动类注册Bean
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }

MQ高级

消息可靠性

发送者重连

在发送者链接MQ失败 , 之后尝试重新链接

发送者配置

spring:
  rabbitmq:
    connection-timeout: 1s # 设置MQ的连接超时时间
    template:
      retry:
        enabled: true # 开启超时重试机制
        initial-interval: 1000ms # 失败后的初始等待时间
        multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
        max-attempts: 3 # 最大重试次数
  • 这种重连机制为阻塞式 ,线程会等待重连 , 性能会下降 , 对业务有要求建议禁用

发送者确认

  • ACK 投递成功
  • NACK 投递失败

一般无需开启,对性能影响比较大。

MQ的可靠性

  • MQ宕机,消息会丢失。
  • 内存有限,消费者出现问题,引发MQ阻塞。
  • 内存满时会写入磁盘

数据持久化

默认就是持久化交换机。

默认就是持久化队列。

消息需要手动设置持久化 (每条消息都存入磁盘)

Screenshot 2025-11-24 211225.png

  • 设置持久化性能会好 ,并发能力下降。

Lazy Queue

  • 不存入内存,直接存入磁盘。
  • 消费者需要从磁盘读取(如果消费者读取非常快,会提前将部分消息加入内存)

Screenshot 2025-11-24 212107.png

配置arguments 开启 lazy queue

  • 性能比较好 , 并发性能好

消费者可靠性

消费者确认机制

  • 消费者返回给MQ状态。

处理成功 ack , 消息从队列移除。

处理失败 nack , 消息重新发送给消费者。

处理失败 reject , 不会重新发送给消费者。 (发现消息有问题时, 没有重试的必要时)

  • 都是在消费者处理完之后返回结果

AMQP处理机制

  • none 不做处理 (默认) ,接收到消息未处理直接删除MQ中的消息
  • manual 手动模式
  • auto 如果是业务异常 返回nack , 如果是消息处理或校验异常返回 reject

失败重试机制

消费者出现异常时 , 消息存入本地,实行本地重试。

默认重试3次 ,可靠性下降。

RepublishMessageRecoverer 重试耗尽之后失败的消息发送到指定的交换机。

业务幂等性

防止异常重复扣减库存等 , 处理非幂等业务 ,保证幂等性。

设置唯一消息ID

如果是重复消息,放弃处理。

  • 在json消息转换器中 , 调用 setCreateMessageIds (true) 为消息生成单独id
@Bean
public MessageConverter messageConverter(){
    Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
    jjmc.setCreateMessageIds(true);
    return jjmc;
}
  • 接收者接收 Message 类的消息 , 获取properties , 里有消息id。
@RabbitListener(queues = "simple.queue")
    public void Listener(Message message) {
        log.info("接收消息id : {}" , message.getMessageProperties().getMessageId());
        log.info("SpringRabbitListener 接收到消息:【{}】", new String(message.getBody()));
    }
  • 业务侵入 ,需要将id写入数据库 , 本来不需要的,为了幂等性降低性能。
业务判断
  • 在 Impl 中判断是否已经执行过。

延迟消息

兜底方案 ,防止因为网络问题等 ,就是无法接收消息。

死信交换机

消息过期成为死信

延迟消息插件(Start)

GitHub - rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ 插件地址

docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  • docker 挂载插件并启动

Screenshot 2025-11-24 222356.png

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue3" , durable = "true"),
            exchange = @Exchange(name = "hmall.direct" , delayed = "true"), // delayed 为true
            key = {"red" , "green"}
    ))
  • 使用delayed 声明交换机 , 可成为延迟交换机。
  • 发送消息 ,向properties中加入setDelay , 添加时间单位为毫秒
    @Test
    public void testSendMap() {
        rabbitTemplate.convertAndSend("object.queue", "hello" , message -> {
            message.getMessageProperties().setDelay(5000);
            return message;
        } );
    }
  • 避免同一时刻设置太多延时消息 ,cpu压力大 ,尽量降低延迟消息的延时时长。

MQ 完结!

迷茫java练习生