yeskery

SpringBoot 整合 RabbitMQ 实现延时队列

什么是延时队列,延时队列应用于什么场景?

延时队列顾名思义,即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费。
那么,为什么需要延迟消费呢?我们来看以下的场景

  • 网上商城下订单后30分钟后没有完成支付,取消订单(如:淘宝、去哪儿网)
  • 系统创建了预约之后,需要在预约时间到达前一小时提醒被预约的双方参会
  • 系统中的业务失败之后,需要重试

这些场景都非常常见,我们可以思考,比如第二个需求,系统创建了预约之后,需要在预约时间到达前一小时提醒被预约的双方参会。那么一天之中肯定是会有很多个预约的,时间也是不一定的,假设现在有1点 2点 3点 三个预约,如何让系统知道在当前时间等于0点 1点 2点给用户发送信息呢,是不是需要一个轮询,一直去查看所有的预约,比对当前的系统时间和预约提前一小时的时间是否相等呢?这样做非常浪费资源而且轮询的时间间隔不好控制。如果我们使用延时消息队列呢,我们在创建时把需要通知的预约放入消息中间件中,并且设置该消息的过期时间,等过期时间到达时再取出消费即可。

很多时候我们想定时去做某件事情的时候我们会首先想到定时任务,quartz是个不错的选择,但是也有缺点,假如配置在项目中,集群部署会有重复执行的问题,如果持久化在mysql中,解决了集群的问题,但是过于依赖mysql,耦合严重,当然还有日志量庞大、执行时间精度、过于耗费系统资源等等问题。所以这时候使用消息队列中间件的的延时队列就是一个很好得解决方案,我们设置要触发消费的时间和必要的参数入队mq,到时监听queue的消费者自然拿到消息然后去走业务流程,这里介绍的是基于rabbitmq中间件实现的TTL版的延时队列。

Rabbitmq 实现延时队列一般而言有两种形式:

  • 第一种方式:利用两个特性: Time To Live(TTL)、Dead Letter Exchanges(DLX)
  • 第二种方式:利用rabbitmq中的插件x-delay-message

什么是TTL?

先简单介绍下rabbitmq执行的流程,它和我之前写到的spring boot整合activeMQ不太一样,除了队列(queue)之外还引入了交换机(exchange)的概念。
rabbitmq的交换机有4种模式,我不详细介绍,简单说下大体执行流程:

spring_boot_rabbit_mq_exchange

  1. 生产者将消息(msg)和路由键(routekey)发送指定的交换机(exchange)上
  2. 交换机(exchange)根据路由键(routekey)找到绑定自己的队列(queue)并把消息给它
  3. 队列(queue)再把消息发送给监听它的消费者(customer)

那么延时队列TTL又是什么呢?这里引入了一个死信(死亡信息)的概念,有死信必定有死亡时间,也就是我们希望延时多久的时间:

spring_boot_rabbit_mq_exchange_ttl

  1. 生产者将消息(msg)和路由键(routekey)发送指定的死信交换机(delayexchange)上
  2. 死信交换机(delayexchange)根据路由键(routekey1)找到绑定自己的死信队列(delayqueue)并把消息给它
  3. 消息(msg)到期死亡变成死信转发给死信接收交换机(delayexchange)
  4. 死信接收交换机(receiveexchange)根据路由键(routekey2)找到绑定自己的死信接收队列(receivequeue)并把消息给它
  5. 死信接收队列(receivequeue)再把消息发送给监听它的消费者(customer)

ps:延时队列也叫死信队列。基于TTL模式的延时队列会涉及到2个交换机、2个路由键、2个队列…emmmmm比较麻烦

流程介绍完了,看下具体代码吧!

SpringBoot 集成 RabbitMQ 实现第一种方式

  • 首先pom因为依赖
  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>
  • 配置文件配置 rabbitmq 的信息
  1. # rabbitmq
  2. spring.rabbitmq.host=localhost
  3. spring.rabbitmq.port=5672
  4. spring.rabbitmq.username=guest
  5. spring.rabbitmq.password=guest
  6. spring.rabbitmq.virtual-host=/
  7. # 手动ACK 不开启自动ACK模式,目的是防止报错后未正确处理消息丢失 默认 为 none
  8. spring.rabbitmq.listener.simple.acknowledge-mode=manual
  • 编写 RabbitMQ 配置类,声明几个 bean
  1. @Configuration
  2. public class RabbitUserConfig {
  3. /**
  4. * 死信交换机
  5. * @return
  6. */
  7. @Bean
  8. public DirectExchange delayExchange(){
  9. return new DirectExchange("delay_exchange");
  10. }
  11. /**
  12. * 死信队列
  13. * @return
  14. */
  15. @Bean
  16. public Queue delayQueue(){
  17. Map<String,Object> map = new HashMap<>(16);
  18. map.put("x-dead-letter-exchange","receive_exchange");
  19. map.put("x-dead-letter-routing-key", "receive_key");
  20. return new Queue("delay_queue",true,false,false,map);
  21. }
  22. /**
  23. * 给死信队列绑定交换机
  24. * @return
  25. */
  26. @Bean
  27. public Binding delayBinding(Queue delayQueue,DirectExchange delayExchange){
  28. return BindingBuilder.bind(delayQueue).to(delayExchange).with("delay_key");
  29. }
  30. /**
  31. * 死信接收交换机
  32. * @return
  33. */
  34. @Bean
  35. public DirectExchange receiveExchange(){
  36. return new DirectExchange("receive_exchange");
  37. }
  38. /**
  39. * 死信接收队列
  40. * @return
  41. */
  42. @Bean
  43. public Queue receiveQueue(){
  44. return new Queue("receive_queue");
  45. }
  46. /**
  47. * 死信交换机绑定消费队列
  48. * @return
  49. */
  50. @Bean
  51. public Binding receiveBinding(Queue receiveQueue,DirectExchange receiveExchange){
  52. return BindingBuilder.bind(receiveQueue).to(receiveExchange).with("receive_key");
  53. }
  54. }
  • 编写 RabbitMQ 生产者:
  1. /**
  2. * rabbitMq生产者类
  3. * @author zhanghang
  4. * @date 2018/12/13
  5. */
  6. @Component
  7. @Slf4j
  8. public class RabbitProduct{
  9. @Autowired
  10. private RabbitTemplate rabbitTemplate;
  11. public void sendDelayMessage(List<Integer> list) {
  12. //这里的消息可以是任意对象,无需额外配置,直接传即可
  13. log.info("===============延时队列生产消息====================");
  14. log.info("发送时间:{},发送内容:{}", LocalDateTime.now(), list.toString());
  15. this.rabbitTemplate.convertAndSend(
  16. "delay_exchange",
  17. "delay_key",
  18. list,
  19. message -> {
  20. //注意这里时间要是字符串形式
  21. message.getMessageProperties().setExpiration("60000");
  22. return message;
  23. }
  24. );
  25. log.info("{}ms后执行", 60000);
  26. }
  • 编写 RabbitMQ 消费者
  1. /**
  2. * activeMq消费者类
  3. * @author zhanghang
  4. * @date 2017/12/19
  5. */
  6. @Component
  7. @Slf4j
  8. public class RabbitConsumer {
  9. @Autowired
  10. private CcqCustomerCfgService ccqCustomerCfgService;
  11. /**
  12. * 默认情况下,如果没有配置手动ACK, 那么Spring Data AMQP 会在消息消费完毕后自动帮我们去ACK
  13. * 存在问题:如果报错了,消息不会丢失,但是会无限循环消费,一直报错,如果开启了错误日志很容易就吧磁盘空间耗完
  14. * 解决方案:手动ACK,或者try-catch 然后在 catch 里面将错误的消息转移到其它的系列中去
  15. * spring.rabbitmq.listener.simple.acknowledge-mode = manual
  16. * @param list 监听的内容
  17. */
  18. @RabbitListener(queues = "receive_queue")
  19. public void cfgUserReceiveDealy(List<Integer> list, Message message, Channel channel) throws IOException {
  20. log.info("===============接收队列接收消息====================");
  21. log.info("接收时间:{},接受内容:{}", LocalDateTime.now(), list.toString());
  22. //通知 MQ 消息已被接收,可以ACK(从队列中删除)了
  23. channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
  24. try {
  25. dosomething.....
  26. } catch (Exception e) {
  27. log.error("============消费失败,尝试消息补发再次消费!==============");
  28. log.error(e.getMessage());
  29. /**
  30. * basicRecover方法是进行补发操作,
  31. * 其中的参数如果为true是把消息退回到queue但是有可能被其它的consumer(集群)接收到,
  32. * 设置为false是只补发给当前的consumer
  33. */
  34. channel.basicRecover(false);
  35. }
  36. }
  37. }
  • 编写测试类
  1. /**
  2. * @author zhanghang
  3. * @date 2019/1/3 17:57
  4. */
  5. @RestController
  6. @RequestMapping("/test")
  7. public class TestController {
  8. @Autowired
  9. private RabbitProduct rabbitProduct;
  10. @GetMapping("/sendMessage")
  11. public void sendMessage(){
  12. List<Integer> list = new ArrayList<>();
  13. list.add(1);
  14. list.add(2);
  15. rabbitProduct.sendDelayMessage(list);
  16. }
  17. }

至此就完成了,但是基于TTL的延时队列存在一个问题,就是同一个队列里的消息延时时间最好一致,比如说队列里的延时时间都是1小时,千万不能队列里的消息延时时间乱七八糟多久的都有,这样的话先入队的消息如果延时时间过长会堵着后入队延时时间小的消息,导致后面的消息到时也无法变成死信转发出去,很坑!!!
举个栗子:延时队列里先后进入A,B,C三条消息,存活时间是3h,2h,1h,结果到了1小时C不会死,到了2hB不会死,到了3小时A死了,同时B,C也死了,意味着3h后A,B,C才能消费,很坑!!!
我本来使用时候以为会像redis的存活时间一样,内部维护一个定时器去扫描死亡时间然后变成死信转发,结果不是。。。

利用 RabbitMQ 的插件 x-delay-message 实现

为了解决上面的问题,Rabbitmq实现了一个插件x-delay-message来实现延时队列。

x-delay-message 安装

介绍Ubuntu系统下插件安装方式:
选择 rabbitmq_delayed_message_exchange 插件,选择3.6版本,进行下载
将安装包进行解压

uzip rabbitmq_delayed_message_exchange-20171215-3.6.x.zip

将插件移到rabbitmq安装的路径

sudo cp -r rabbitmq_delayed_message_exchange-20171215-3.6.x.ez /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.15/plugins

Enable插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

windows同理

SpringBoot 集成 RabbitMQ 实现第二种方式

  1. @Configuration
  2. public class XdelayConfig {
  3. // 创建一个立即消费队列
  4. @Bean
  5. public Queue immediateQueue() {
  6. // 第一个参数是创建的queue的名字,第二个参数是是否支持持久化
  7. return new Queue(Constants.IMMEDIATE_QUEUE_XDELAY, true);
  8. }
  9. @Bean
  10. public CustomExchange delayExchange() {
  11. Map<String, Object> args = new HashMap<String, Object>();
  12. args.put("x-delayed-type", "direct");
  13. return new CustomExchange(Constants.DELAYED_EXCHANGE_XDELAY, "x-delayed-message", true, false, args);
  14. }
  15. @Bean
  16. public Binding bindingNotify() {
  17. return BindingBuilder.bind(immediateQueue()).to(delayExchange()).with(Constants.DELAY_ROUTING_KEY_XDELAY).noargs();
  18. }
  19. }
  1. @Service
  2. public class XdelaySender {
  3. @Autowired
  4. private RabbitTemplate rabbitTemplate;
  5. public void send(Booking booking, int delayTime) {
  6. System.out.println("delayTime" + delayTime);
  7. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  8. this.rabbitTemplate.convertAndSend(Constants.DELAYED_EXCHANGE_XDELAY, Constants.DELAY_ROUTING_KEY_XDELAY, booking, new MessagePostProcessor() {
  9. @Override
  10. public Message postProcessMessage(Message message) throws AmqpException {
  11. message.getMessageProperties().setDelay(delayTime);
  12. System.out.println(sdf.format(new Date()) + " Delay sent.");
  13. return message;
  14. }
  15. });
  16. }
  17. }
  1. @Component
  2. @EnableRabbit
  3. @Configuration
  4. public class XdelayReceiver {
  5. @RabbitListener(queues = Constants.IMMEDIATE_QUEUE_XDELAY)
  6. public void get(Booking booking) {
  7. System.out.println("Receive" + booking);
  8. }
  9. }
  1. @RunWith(SpringRunner.class)
  2. @SpringBootTest
  3. public class RabbitMqTestApplicationTests {
  4. @Autowired
  5. XdelaySender xdelaySender;
  6. @Test
  7. public void test11() {
  8. Booking booking = new Booking();
  9. booking.setBookingContent("hhaha");
  10. booking.setBookingName("预定房子");
  11. booking.setBookingTime(new Date());
  12. booking.setOperatorName("hellen");
  13. xdelaySender.send(booking, 2000);
  14. }
  15. }

本文内容来自:

  1. https://blog.csdn.net/eumenides_/article/details/86025773
  2. https://blog.csdn.net/zhangyuxuan2/article/details/82986802

评论

发表评论 点击刷新验证码

提示

该功能暂未开放