RabbitMQ 消息可靠投递和成功消费
孙玉超
2021-03-30 11:11:14
0 评论
1891 浏览
0 收藏
0 赞
作为消息队列,要有一种机制来确保消息的发送和接收,RabbitMQ 当然也不例外。那么 RabbitMQ 是用什么机制确保消息可靠性的呢?也就是说怎样保障消息可靠性?
先思考一下,消息队列丢失消息的几种情况。
1. 生产者弄丢。发送消息的时候还没到 RabbitMQ 服务器就丢了,或者到了 RabbitMQ 服务器但是没有匹配的交换机,或者没有正确路由到队列。
2. RabbitMQ 弄丢。消息发送到了 RabbitMQ ,还没来得及消费,RabbitMQ 宕机了,那这个消息就丢了。
3. 消费者弄丢。消费者消费消息过程中,业务代码报错,那 RabbitMQ 认为你都从队列取走消息了,认为你已经消费过了。但其实业务代码报错了相当于没有消费,这条消息就等于丢了。
生产者弄丢
这个问题,RabbitMQ 提供了确认和回退机制,就是说每次发送消息有一个监听机制,如果成功发送到交换机可以触发一个监听,从交换机路由到队列失败也会有一个监听。只需要开启这两个监听机制即可。
yml 文件添加配置:
rabbitmq: port: 5672 username: guest password: guest addresses: xxx.xxx.xxx.xx publisher-returns: true publisher-confirm-type: correlated #新版本 publisher-confirms: true 已过时
初始化交换机和队列:
@Bean public Queue queue(){ return QueueBuilder.durable("queue").build(); } @Bean public Exchange exchange(){ return ExchangeBuilder.directExchange("direct").build(); } @Bean("binding") public Binding binding(){ return BindingBuilder.bind(queue()).to(exchange()).with("").noargs(); }
代码实现:
@Configuration public class RabbitMQConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this); //指定 ConfirmCallback rabbitTemplate.setReturnCallback(this); //指定 ReturnCallback // true 代表,路由到队列失败会返回给生产者这个消息,否则丢弃。2.2.5-RELEASE 版本默认就是 true // 如果低版本要手动设置为 true ,false 代表不会回调 returnMessage 方法。 //rabbitTemplate.setMandatory(true); } //消息发送到 rabbitMQ 认证回调 @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if(ack){ System.out.println("消息发送成功:"+correlationData); } else { System.out.println("消息发送失败:"+cause); } } //Exchange 路由到 Queue 失败 ,回退回调 @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("消息:"+message); System.out.println("返回码:"+replyCode); System.out.println("响应文本:"+replyText); System.out.println("交换机:"+exchange); System.out.println("路由键:"+routingKey); } }
接下来很简单,只要写个方法去发送消息,测试 confirm 方法可以尝试将交换机名字写错,会回调 confirm 方法。将路由键写错来测试 returnMessage 方法。实际业务中,可以在这两个回调方法中做对应的处理,记录、重发消息等。
RabbitMQ 弄丢
这个只要开启持久化就行了。一般来说交换机和队列的持久化是必开的,如果消息到达交换机或者队列但是还没消费,RabbitMQ 服务器挂了。那重启 RabbitMQ 之后消息也不存在了,所以可以设置消息持久化,只要发送消息的时候设置 deliveryMode 为 2 就行了。
注意必须队列和消息都要设置持久化,单独设置队列持久化,消息不持久化,重启之后队列还在,但是消息就没了。单独设置消息持久化,不设置队列持久化,重启之后队列都没了,消息自然也没了。另外,不要所有的消息的设置为持久化的,因为设置持久化的消息要写入磁盘,这对 RabbitMQ 的性能是很大的消耗。只要将非常非常重要的消息设置为持久化即可。
消费者弄丢
所谓消费者弄丢就是消息发出去了,消费者也成功从队列中订阅到了消息。只是消费者执行业务代码时出现了未知异常导致消费者业务没有做完,那 RabbitMQ 认为你都消费了,自然不会保留这条消息,那这条消息就相当于是丢了。对于这个问题,RabbitMQ 给我们提供了一个机制,就是消费者消费完之后可以发送一个回执来确认我这条消息正常处理完毕,叫做 RabbitMQ 的 ack 机制。也可以进行拒绝签收来做一些其他的操作。
默认情况下 RabbitMQ 的 ack 机制是关闭的,就是说只要消息从队列成功到达消费者,RabbitMQ 就认为这条消息消费成功了。所以我们要开启手动 ack 。
rabbitmq: port: 5672 username: guest password: guest addresses: xxx.xxx.xxx.xx publisher-returns: true publisher-confirm-type: correlated listener: simple: acknowledge-mode: manual #手动签收 一共三个值 none:默认不ack;manual:手动ack;auto:根据异常抛出类型ack,一般不用。 prefetch: 1 #每次从队列拉取的消息数量,生产环境为了缓解消费端的压力可以设置此值,比如估算5000消息,可以设置1000,每次拉取1000消息消费。
注意如果开启了手动确认,那么必须要在消费端对消息的消费情况作出应答,要么签收要么拒签。否则队列将不会继续发送消息。消费端代码:
@RabbitListener(queues = "queue") public void listenQueue(String object,Message message,Channel channel) { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { System.out.println("消费成功:"+deliveryTag); //deliveryTag 代表当前签收的消息标签,可以用message获取,multiple 代表将一次性ack所有小于deliveryTag的消息,因为消息的deliveryTag是递增的 channel.basicAck(deliveryTag,true); }catch (Exception e){ try { //requeue 代表是否重回队列,如果配置为true,拒签之后将会重新回队列再次被消费 channel.basicNack(deliveryTag,true,true); } catch (IOException ioe) { System.out.println("拒签失败"); } } }
注意如果生产环境设置了 requeue = true,那么一旦拒签该消息将会回到队列顶端即将被消费,如果业务再次消费失败,这样就会造成死循环。服务器 CPU 可能会被打满……
所以消息消费手动签收 (ack),不是特殊情况下尽量不要使用,可能会有很多坑...... 对于消费端业务代码异常可以采用 SpringBoot 提供的消息重试机制。
SpringBoot 整合 RabbitMQ 消息重试
在配置文件中添加配置:
#... listener: simple: # acknowledge-mode: manual #手动签收 # prefetch: 9 #表示消费端每次从mq拉取1条消息消费,知道手动确认消费完毕后才会继续拉取下一条 retry: enabled: true #开启重试 max-attempts: 3 #最大尝试次数
开启消息重试之后,如果消费端有未处理的异常(try catch 掉的异常不会触发),会触发这个机制,让消费端再次尝试消费。值得注意的是,这个机制并不是 RabbitMQ 提供的,而是 SpringBoot 提供的,也就是说重试机制跟 RabbitMQ 其实没有关系,是发生在消费端的内部重试。更不是 RabbitMQ 重新发的消息!
业务体系不是很大的情况下其实没有必要用 ack 机制,用了反而不好,用 SpringBoot 提供的重试机制就很 NICE。
以上就是 RabbitMQ 提供的保障消息可靠性的解决方案,SpringBoot 提供的 Starter 做了相对比较完整的封装,简单易用。