您现在的位置是:首页 > 文章 > RabbitMQ 用死信队列和消息过期实现延迟消息 网站文章

RabbitMQ 用死信队列和消息过期实现延迟消息

孙玉超 2021-03-31 11:15:11 0 评论 1260 浏览 0 收藏 0


RabbitMQ 本身并未直接提供延迟队列,但是可以通过其提供的死信队列结合消息的过期来实现延迟队列效果。在实现延迟队列之前我们需要先了解下 RabbitMQ 的 TTL 特性和死信队列。


RabbitMQ 的 TTL 特性


TTL 全称 Time To Live —— 存活时间。RabbitMQ 中既可以对队列中整个消息设置存活时间,也可以对单条消息设置存活时间。如果两者都设置,那么将以时间小的为准。

给整个队列设置消息存活时间,构建队列的时候设置 ttl 即可,单位毫秒。所有发送到该队列的消息 10 秒钟之后将会过期。

   @Bean
    public Queue queue(){
        return QueueBuilder.durable("queue").ttl(10000).build();
    }

给单条消息设置过期时间,前面三个参数依然为交换机、路由键、消息内容,最后一个参数为实现 MessagePostProcessor 的一个匿名类。

 rabbitTemplate.convertAndSend("direct", "", "ces", message -> {
                message.getMessageProperties().setExpiration("10000");
                return message;
            });

可以通过 RabbitMQ 客户端去观察队列中消息的变化以及观察队列的属性。


值得注意的是,当对某一条消息设置过期时间之后,这条消息并不一定在设置的时间之后就会消失,而是惰性删除。当消息处于队列顶端,也就是即将被消费时才会判断是否过期,类似于 Redis 的 key 过期策略之一 惰性删除。否则要开启一个调度每隔一定时间循环队列中的消息判断是否过期,效率很差。


死信队列


死信队列在 RabbitMQ 中叫做 Dead Letter Exchange 简称 DLX。如果我们把普通队列绑定一个死信交换机,那么当消息成为死信之后不会被丢弃,而是发送给这个死信交换机,进而路由到死信交换机绑定的队列。



正如上图所示,如果队列绑定了死信交换机,那么当消息成为死信之后不会被丢弃,而是转到死信交换机路由到对应的队列。为了搞清楚死信队列,我们需要先弄清楚两个问题:

1. 消息什么情况下会变为死信。

2. 如何把普通队列绑定到死信交换机。


消息成为死信的三种情况


1. 队列消息到达长度限制。RabbitMQ 是可以设置队列长度的,比如设置队列长度为 10,当队列中积满 10 条消息,再过来一条到达最大长度限制,那么这条消息就会成为死信。

2. 消费者拒绝签收消息,并且不重回队列,requeue = false 。

3. 消息超过存活时间没有被消费。


为普通队列绑定交换机来实现延迟队列


首先声明死信交换机、队列、绑定。再把普通队列、交换机、绑定,并且给普通队列绑定上死信交换机

    @Bean("dlx")
    public Exchange exchangeDlx(){
        return ExchangeBuilder.directExchange("dlx").build();//死信交换机
    }
    
    @Bean("queueDlx")
    public Queue queueDlx(){
        return QueueBuilder.durable("queueDlx").build();//死信交换机对应的队列
    }
    
    @Bean("bindingDlx")
    public Binding bindingDlx(){
        return BindingBuilder.bind(queueDlx()).to(exchangeDlx()).with("dlxKey").noargs(); //绑定死信交换机和队列
    }


    //普通队列,设置 10 秒消息过期时间,并且根据路由键绑定到死信交换机
    @Bean
    public Queue queue(){
        return QueueBuilder.durable("queue").ttl(10000).deadLetterExchange("dlx").deadLetterRoutingKey("dlxKey").build();
    }
    
    @Bean
    public Exchange exchange(){
        return ExchangeBuilder.directExchange("direct").build(); //普通交换机
    }
    
    @Bean
    public Binding binding(){
        return BindingBuilder.bind(queue()).to(exchange()).with("").noargs();//绑定
    }

这样就为普通队列绑定好了死信交换机,发送消息到队列 queue 。

        //发送 10 条消息
        for(int i=0;i<10;i++){
            rabbitTemplate.convertAndSend("direct", "", "ces" + i);
        }

在 RabbitMQ 管理界面可以看到一开始 queue 中有 10 条消息,10 秒钟之后消息,出现在死信交换机所绑定的队列。



这样就使用 RabbitMQ 的 TTL 特性和死信队列结合来实现了延迟消息的效果。延迟消息在业务中使用非常广泛,比如用户下单 30 分钟内未支付,订单关闭。发货之后用户 15 天未收货自动确认收货等业务场景。



转载请注明出处:转载请注明出处

上一篇 : RabbitMQ 消息可靠投递和成功消费 下一篇 : 我的生活

留言评论

所有回复

暮色妖娆丶

96年草根站长,2019年7月接触互联网踏入Java开发岗位,喜欢前后端技术。对技术有强烈的渴望,2019年11月正式上线自己的个人博客