您现在的位置是:首页 > 文章 > RabbitMQ 的工作流程和几种交换机的工作模式 网站文章

RabbitMQ 的工作流程和几种交换机的工作模式

孙玉超 2021-03-24 15:00:35 0 评论 1425 浏览 0 收藏 0


由于博主公司使用的是 RabbitMQ ,所以学习就以 RabbitMQ 为例,主要是学习思想,其实所有消息队列理念上都大同小异,只是实现方式或者性能指标略有差异。不过我们公司是使用的 SpringCloud-Stream 整合的 RabbitMQ ,使用非常简单,但是这样只会复制粘贴,根本不懂 RabbitMQ ,以后就算有业务难点也没法去解决,所以还是要把根基打牢!


RabbitMQ 架构图



先了解一下 RabbitMQ 发送和消费消息的过程,生产者先和 RabbitMQ 服务建立连接 Connection ,Connection 中有许多 channel (我觉得这有点像 IO 多路复用的意思),通过 channel 传递消息到 RabbitMQ 服务器,服务器内部通过交换机来转发消息,消息最终落入哪个队列由 Binding 来决定,Binding 是通过路由键将交换机和队列绑定在一起,这样不同的交换器就能匹配到对应的队列。而交换机和队列又被一个个的虚拟机 Virtual Host 分隔开,这个 Virtual Host 和 RabbitMQ Server 可以理解为 MySQL 和 MySQL 中的不同数据库。消费者和生产者一样需要先和 RabbitMQ 服务器建立连接 Connection ,内部也是通过 channel 来通信,去订阅不同的队列来消费消息。


RabbitMQ 的几种交换机类型


广播 fanout


广播类型的交换机用于把消息发送到所有与之绑定的 Queue 中



广播类型的交换机不受路由键的限制,虽然使用 SpringBoot 的官方 Stater 的 API 会有 RoutingKey 这个参数,但其实不受影响。


广播类型的交换器适用于哪些业务场景呢?比如秒杀系统下单成功,此时可以发一个广播消息,库存系统,优惠券系统等都要去订阅这个消息。但是生产环境一个微服务一般都是多台实例集群。我们要保证一个微服务的两个实例只能消费一次消息,如下图:假设 C1 是库存系统,那么两个库存系统的实例只能消费一次消息。很简单,只要两个微服务监听的是同一个队列,它们就只能有一个微服务能消费成功。RabbitMQ 已经帮我们做好了,不然就麻烦了。



direct 直接匹配路由


要理解路由的规则,先看一段代码

    @Bean("binding1")
    public Binding binding1(){
        return BindingBuilder.bind(queue1()).to(exchange()).with("routingKey").noargs();
    }

交换机和队列通过路由键建立关系,形成一个 Binding 对象绑定在一起。direct 类型的交换机把消息发送到 与之 RoutingKey 相匹配的所有队列。有些地方可能会把 BingdingKey 和 Routingkey 分开,但是据官方提供的客户端依赖包以及 SpirngBoot 提供的依赖包都把这两个都当成 RoutingKey 来理解。如果一定要分开,可以这么理解。BingdingKey 是将交换机和队列绑定时需要的,RoutingKey 是交换机转发消息的时候需要的,当 RoutingKey 和 BindingKey 匹配上,消息就能正确发送到队列。



上图交换机通过路由键 key1 绑定了两个队列,那么当交换机发送消息时指定 key1 路由键的话,两个队列都能收到消息。如果指定 key2 或者 key3 为路由键只有下面的队列能收到消息。实际业务场景中很常见,有时我们发送一个消息希望多个系统去消费,但有时发送一个消息只希望其中一个系统消费,就可以使用这种路由匹配模式。


topics 通配符路由匹配


这种类型的交换机工作模式和 direct 几乎一样,不同的是 topics 可以使用通配符 “*” 和 “#” 来模糊匹配路由键。“*” 可以恰好代替一个单词;“#” 可以恰好代替零个或多个单词。



代码示例


原生 rabbit-client 代码比较多,这里只介绍 SpringBoot 官方提供封装好的 Starter 的代码首先引入 maven 依赖

  <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
  </dependency>

构建交换机、队列并且绑定

    #构建交换机,可以根据业务需求创建不同类型的交换机
    @Bean
    public Exchange exchange(){
        return ExchangeBuilder.directExchange("direct").build();
    }

    #创建队列
    @Bean
    public Queue queue(){
        return QueueBuilder.durable("queue").build();
    }

    #将队列和交换机通过路由键绑定
    @Bean("binding")
    public Binding binding(){
        return BindingBuilder.bind(queue()).to(exchange()).with("routingKey").noargs();
    }

发送和监听消息

   #发送消息
   rabbitTemplate.convertAndSend("direct","routingKey","消息");

   #监听消息
   @RabbitListener(queues = "queue")
    public void listen1(String str){
        System.out.println(str);
    }

以上就是基础的消息发送和监听代码,使用起来很简单。如果使用 SpringCloud-Stream 会更简单......但是据网上说 SpringCloud-Stream 无法保证 消息可靠性投递和消费签收 。这个我还没尝试,待研究之后再说




转载请注明出处:转载请注明出处

上一篇 : 为什么使用消息队列? 下一篇 : Redis 的持久化机制

留言评论

所有回复

暮色妖娆丶

96年草根站长,2019年7月接触互联网踏入Java开发岗位,喜欢前后端技术。对技术有强烈的渴望,2019年11月正式上线自己的个人博客