RabbitMQ
同步调用
- 时效性强 , 需要等到结果之后才返回。
- 拓展性差,性能下降 , 连级失败麻烦。
异步调用
消息发送者发给消息代理 ,消息代理发送给消息接收者
- 耦合低 , 无需等待性能好
- 故障隔离
- 时效性差 ,依赖于Broker的可靠性 , 不确定下游是否成功。
安装
注意我的网络是 heima ,大家需要根据自己的网络填写network
docker run \
-e RABBITMQ_DEFAULT_USER=itheima \
-e RABBITMQ_DEFAULT_PASS=123321 \
-v mq-plugins:/plugins \
--name mq \
--hostname mq \
-p 15672:15672 \
-p 5672:5672 \
--network heima\
-d \
rabbitmq:3.8-management

访问虚拟机的15672端口就可以访问到 , 账号密码在docker启动时已经配置

核心概念
- Publisher 消息发送者
- consumer 消息消费者
- queue 队列
- exchange 交换机 ,负责路由
- virtual-host 虚拟主机, 数据隔离
exchange
交换机只负责转发消息(路由),不做消息存储 , 需要绑定队列,不然消息会丢失
bindings 绑定队列
虚拟主机
- 创建不同用户
- 为用户创建virtual-host
java客户端
Spring AMQP
- AMQP协议
依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置
spring:
rabbitmq:
host: 192.168.150.101 # 你的虚拟机IP
port: 5672 # 端口
virtual-host: /hmall # 虚拟主机
username: hmall # 用户名
password: 123 # 密码
提供了 RabbitTemplate 工具类,需要注入
发送消息
@SpringBootTest
class PublisherAMQPApplicationTest {
@Autowired
public RabbitTemplate rabbitTemplate;
@Test
public void SpringAmqpTest() {
// 1. 队列名
String queueName = "simple.queue";
// 2. 消息
String message = "Hello, Spring Amqp";
// 3. 发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
}
接收消息

Work Queues 模型
多个消费者绑定到一个队列
- 同一个消息只能被一个消费者处理
- 轮询 ,消费者分配平均
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
配置每个消费者每次只能获取一条消息 ,让速度快的微服务多处理
Fanout交换机
广播 (每条消息都会发给多个队列)
- Fanout交换机会将消息路由到每个queue
向交换机发送消息需要三个参数
@Test
public void FanoutSpringAmqpTest() {
// 1. 交换机名
String exchangeName = "fanout.exchange";
// 2. 消息
String message = "Hello, Spring Amqp";
// 3. 发送消息
rabbitTemplate.convertAndSend(exchangeName, "" , message); // 空为其他交换机所用 ,传递三个参数辨认exchange为交换机
}
每个队列都会收到消息


Direct交换机
定向路由(根据规则发到指定队列)
需要绑定key

@Test
public void directSpringAmqpTest() {
// 1. 交换机名
String exchangeName = "hmall.direct";
// 2. 消息
String message = "Hello, red !";
// 3. 发送消息
rabbitTemplate.convertAndSend(exchangeName, "red" , message);
}
发送给有这个key的队列。

Topic交换机
队列绑定bindingKey
- 拓展性更强
- 和Direct相似
#代指0个或多个单词
*代指一个单词
声明队列交换机
用代码生成队列和交换机
- 一般在消费者里声明队列和交换机
package com.itheima.consumer.confing;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutConfiguration {
// 创建交换机
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("hmall.fanout");
}
// 创建队列
@Bean
public Queue queue1(){
return new Queue("fanout.queue1");
}
// 创建队列
@Bean
public Queue queue2(){
return new Queue("fanout.queue2");
}
// 绑定队列
@Bean
public Binding binding1(FanoutExchange fanoutExchange, Queue queue1){
return BindingBuilder.bind(queue1).to(fanoutExchange);
}
@Bean
public Binding binding2(FanoutExchange fanoutExchange, Queue queue2){
return BindingBuilder.bind(queue2).to(fanoutExchange);
}
}
基于注解声明队列交换机
- 使用代码声明交换机和队列太臃肿 。
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue3" , durable = "true"),
exchange = @Exchange(name = "hmall.direct" , type = ExchangeTypes.DIRECT), //type默认就是Direct需要别的可以从ExchangeTypes类中拿
key = {"red" , "green"}
))
public void direct3Listener(String message) {
log.info("direct.queue2 接收到消息:【{}】", message);
}
消息转换器
默认使用jdk的ObjectOutputSteam进行序列化
- jdk 序列化有风险
- 可读性差
- 消息太大
我们使用json的消息转化器
- 配置依赖 (发送方和接收方都需要配置)
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
- 启动类注册Bean
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
MQ高级
消息可靠性
发送者重连
在发送者链接MQ失败 , 之后尝试重新链接
发送者配置
spring:
rabbitmq:
connection-timeout: 1s # 设置MQ的连接超时时间
template:
retry:
enabled: true # 开启超时重试机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
max-attempts: 3 # 最大重试次数
- 这种重连机制为阻塞式 ,线程会等待重连 , 性能会下降 , 对业务有要求建议禁用
发送者确认
- ACK 投递成功
- NACK 投递失败
一般无需开启,对性能影响比较大。
MQ的可靠性
- MQ宕机,消息会丢失。
- 内存有限,消费者出现问题,引发MQ阻塞。
- 内存满时会写入磁盘
数据持久化
默认就是持久化交换机。
默认就是持久化队列。
消息需要手动设置持久化 (每条消息都存入磁盘)

- 设置持久化性能会好 ,并发能力下降。
Lazy Queue
- 不存入内存,直接存入磁盘。
- 消费者需要从磁盘读取(如果消费者读取非常快,会提前将部分消息加入内存)

配置arguments 开启 lazy queue
- 性能比较好 , 并发性能好
消费者可靠性
消费者确认机制
- 消费者返回给MQ状态。
处理成功 ack , 消息从队列移除。
处理失败 nack , 消息重新发送给消费者。
处理失败 reject , 不会重新发送给消费者。 (发现消息有问题时, 没有重试的必要时)
- 都是在消费者处理完之后返回结果
AMQP处理机制
- none 不做处理 (默认) ,接收到消息未处理直接删除MQ中的消息
- manual 手动模式
- auto 如果是业务异常 返回nack , 如果是消息处理或校验异常返回 reject
失败重试机制
消费者出现异常时 , 消息存入本地,实行本地重试。
默认重试3次 ,可靠性下降。
RepublishMessageRecoverer 重试耗尽之后失败的消息发送到指定的交换机。
业务幂等性
防止异常重复扣减库存等 , 处理非幂等业务 ,保证幂等性。
设置唯一消息ID
如果是重复消息,放弃处理。
- 在json消息转换器中 , 调用 setCreateMessageIds (true) 为消息生成单独id
@Bean
public MessageConverter messageConverter(){
Jackson2JsonMessageConverter jjmc = new Jackson2JsonMessageConverter();
jjmc.setCreateMessageIds(true);
return jjmc;
}
- 接收者接收 Message 类的消息 , 获取properties , 里有消息id。
@RabbitListener(queues = "simple.queue")
public void Listener(Message message) {
log.info("接收消息id : {}" , message.getMessageProperties().getMessageId());
log.info("SpringRabbitListener 接收到消息:【{}】", new String(message.getBody()));
}
- 业务侵入 ,需要将id写入数据库 , 本来不需要的,为了幂等性降低性能。
业务判断
- 在 Impl 中判断是否已经执行过。
延迟消息
兜底方案 ,防止因为网络问题等 ,就是无法接收消息。
死信交换机
消息过期成为死信
延迟消息插件(Start)
GitHub - rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ 插件地址
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
- docker 挂载插件并启动

@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue3" , durable = "true"),
exchange = @Exchange(name = "hmall.direct" , delayed = "true"), // delayed 为true
key = {"red" , "green"}
))
- 使用delayed 声明交换机 , 可成为延迟交换机。
- 发送消息 ,向properties中加入setDelay , 添加时间单位为毫秒
@Test
public void testSendMap() {
rabbitTemplate.convertAndSend("object.queue", "hello" , message -> {
message.getMessageProperties().setDelay(5000);
return message;
} );
}
- 避免同一时刻设置太多延时消息 ,cpu压力大 ,尽量降低延迟消息的延时时长。