RabbitMQ 的工作流程和几种交换机的工作模式
孙玉超
2021-03-24 15:00:35
0 评论
1425 浏览
0 收藏
0 赞
由于博主公司使用的是 RabbitMQ ,所以学习就以 RabbitMQ 为例,主要是学习思想,其实所有消息队列理念上都大同小异,只是实现方式或者性能指标略有差异。不过我们公司是使用的 SpringCloud-Stream 整合的 RabbitMQ ,使用非常简单,但是这样只会复制粘贴,根本不懂 RabbitMQ ,以后就算有业务难点也没法去解决,所以还是要把根基打牢!
RabbitMQ 架构图
先了解一下 RabbitMQ 发送和消费消息的过程,生产者先和 RabbitMQ 服务建立连接 Connection ,Connection 中有许多 channel (我觉得这有点像 IO 多路复用的意思),通过 channel 传递消息到 RabbitMQ 服务器,服务器内部通过交换机来转发消息,消息最终落入哪个队列由 Binding 来决定,Binding 是通过路由键将交换机和队列绑定在一起,这样不同的交换器就能匹配到对应的队列。而交换机和队列又被一个个的虚拟机 Virtual Host 分隔开,这个 Virtual Host 和 RabbitMQ Server 可以理解为 MySQL 和 MySQL 中的不同数据库。消费者和生产者一样需要先和 RabbitMQ 服务器建立连接 Connection ,内部也是通过 channel 来通信,去订阅不同的队列来消费消息。
RabbitMQ 的几种交换机类型
广播 fanout
广播类型的交换机用于把消息发送到所有与之绑定的 Queue 中
广播类型的交换机不受路由键的限制,虽然使用 SpringBoot 的官方 Stater 的 API 会有 RoutingKey 这个参数,但其实不受影响。
广播类型的交换器适用于哪些业务场景呢?比如秒杀系统下单成功,此时可以发一个广播消息,库存系统,优惠券系统等都要去订阅这个消息。但是生产环境一个微服务一般都是多台实例集群。我们要保证一个微服务的两个实例只能消费一次消息,如下图:假设 C1 是库存系统,那么两个库存系统的实例只能消费一次消息。很简单,只要两个微服务监听的是同一个队列,它们就只能有一个微服务能消费成功。RabbitMQ 已经帮我们做好了,不然就麻烦了。
direct 直接匹配路由
要理解路由的规则,先看一段代码
@Bean("binding1") public Binding binding1(){ return BindingBuilder.bind(queue1()).to(exchange()).with("routingKey").noargs(); }
交换机和队列通过路由键建立关系,形成一个 Binding 对象绑定在一起。direct 类型的交换机把消息发送到 与之 RoutingKey 相匹配的所有队列。有些地方可能会把 BingdingKey 和 Routingkey 分开,但是据官方提供的客户端依赖包以及 SpirngBoot 提供的依赖包都把这两个都当成 RoutingKey 来理解。如果一定要分开,可以这么理解。BingdingKey 是将交换机和队列绑定时需要的,RoutingKey 是交换机转发消息的时候需要的,当 RoutingKey 和 BindingKey 匹配上,消息就能正确发送到队列。
上图交换机通过路由键 key1 绑定了两个队列,那么当交换机发送消息时指定 key1 路由键的话,两个队列都能收到消息。如果指定 key2 或者 key3 为路由键只有下面的队列能收到消息。实际业务场景中很常见,有时我们发送一个消息希望多个系统去消费,但有时发送一个消息只希望其中一个系统消费,就可以使用这种路由匹配模式。
topics 通配符路由匹配
这种类型的交换机工作模式和 direct 几乎一样,不同的是 topics 可以使用通配符 “*” 和 “#” 来模糊匹配路由键。“*” 可以恰好代替一个单词;“#” 可以恰好代替零个或多个单词。
代码示例
原生 rabbit-client 代码比较多,这里只介绍 SpringBoot 官方提供封装好的 Starter 的代码首先引入 maven 依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
构建交换机、队列并且绑定
#构建交换机,可以根据业务需求创建不同类型的交换机 @Bean public Exchange exchange(){ return ExchangeBuilder.directExchange("direct").build(); } #创建队列 @Bean public Queue queue(){ return QueueBuilder.durable("queue").build(); } #将队列和交换机通过路由键绑定 @Bean("binding") public Binding binding(){ return BindingBuilder.bind(queue()).to(exchange()).with("routingKey").noargs(); }
发送和监听消息
#发送消息 rabbitTemplate.convertAndSend("direct","routingKey","消息"); #监听消息 @RabbitListener(queues = "queue") public void listen1(String str){ System.out.println(str); }
以上就是基础的消息发送和监听代码,使用起来很简单。如果使用 SpringCloud-Stream 会更简单......但是据网上说 SpringCloud-Stream 无法保证 消息可靠性投递和消费签收 。这个我还没尝试,待研究之后再说