使用RabbitMQ实现延迟消息一般有两种方式:
- 通过死信队列的方式实现延迟消息
- 通过延迟插件的方式实现延迟消息,延迟插件可以让我们直接定义一个延迟交换机,让交换机拥有延迟发送消息的能力,从而实现延迟消息
本篇博文重点介绍一下使用springboot整合RabbitMQ安装延迟插件实现延迟消息
搭建RabbitMQ
我这里使用docker搭建,使用的版本是3.13.7,脚本如下:
docker run -d --hostname ubuntu-cp \
-p 5672:5672 -p 15672:15672 --name rabbit \
-e RABBITMQ_DEFAULT_USER=root \
-e RABBITMQ_DEFAULT_PASS=123456 \
rabbitmq:3-management
运行起来后可以进入管理页面:http://localhost:15672/
输入用户名密码root/123456
安装延迟插件
RabbitMQ默认是不支持延迟交换机的,需要安装插件,插件在github上有:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
去releases页面找到3.13.x版本的插件,下载后缀为.ez
的插件文件。
下载后放入到rabbitmq的plugins目录下,我这里是docker安装的,plugins目录在容器的/opt/rabbitmq/plugins/
放入插件后还需要启用插件,插件放好后容器内运行以下命令即可:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
插件启用成功,去管理页面创建交换机时,可以看到Type选项多了一个x-delayed-message
,代表延迟交换机的类型,这里我就不手动创建了,准备通过程序配置自动创建。
代码演示
引入rabbitmq的依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置交换机、队列以及绑定关系:
@Bean
public Exchange delayExchange() {
return ExchangeBuilder.directExchange(DELAY_EXCHANGE)
.delayed() // 实测设置delayed也可以创建延迟交换机替代以上写法
.durable(true)
.build();
}
@Bean
public Queue delayQueue() {
return new Queue(DELAY_QUEUE, true);
}
@Bean
public Binding delayBinding(Queue delayQueue, Exchange delayExchange) {
return BindingBuilder.bind(delayQueue)
.to(delayExchange)
.with(DELAY_ROUTING_KEY)
.noargs();
}
这里创建交换机的写法实测是可以的,最后创建一个接口注入RabbitTemplate
发送消息,创建一个监听函数监听消息测试一下
发送时注意设置延迟时间:
rabbitTemplate.convertAndSend(RabbitConfig.DELAY_EXCHANGE, RabbitConfig.DELAY_ROUTING_KEY, message, msg -> {
msg.getMessageProperties().setDelayLong(delay);
return msg;
});
具体演示代码提交到了github:https://github.com/chengpei/spring-ai-demo,代码在rabbitmq-delayed-message-demo模块
评论区