yeskery

RabbitMQ 的四种交换机

RabbitMQ 作为一个消息队列提供一个通用的消息发送和接收平台,并且保证消息在传输过程中的安全可靠。

消息 (Message) 由 Client 发送,RabbitMQ 接收到消息之后通过交换机转发到对应的队列上面。Worker 会从队列中获取未被读取的数据处理。

RabbitMQ_Exchange

交换机

有 4 种不同的交换机类型:

  • 直连交换机Direct Exchange

  • 扇形交换机Fanout Exchange

  • 主题交换机Topic Exchange

  • 首部交换机Headers Exchange

扇形交换机

扇形交换机是最基本的交换机类型,它所能做的事情非常简单———广播消息。扇形交换机会把能接收到的消息全部发送给绑定在自己身上的队列。因为广播不需要“思考”,所以扇形交换机处理消息的速度也是所有的交换机类型里面最快的。

RabbitMQ_Fanout_Exchange

直连交换机

直连交换机是一种带路由功能的交换机,一个队列会和一个交换机绑定,除此之外再绑定一个 routing_key,当消息被发送的时候,需要指定一个 binding_key,这个消息被送达交换机的时候,就会被这个交换机送到指定的队列里面去。同样的一个 binding_key 也是支持应用到多个队列中的。

这样当一个交换机绑定多个队列,就会被送到对应的队列去处理。

RabbitMQ_Direct_Exchange

适用场景:有优先级的任务,根据任务的优先级把消息发送到对应的队列,这样可以指派更多的资源去处理高优先级的队列。

主题交换机

直连交换机的 routing_key 方案非常简单,如果我们希望一条消息发送给多个队列,那么这个交换机需要绑定上非常多的 routing_key,假设每个交换机上都绑定一堆的 routing_key 连接到各个队列上。那么消息的管理就会异常地困难。

所以 RabbitMQ 提供了一种主题交换机,发送到主题交换机上的消息需要携带指定规则的 routing_key,主题交换机会根据这个规则将数据发送到对应的(多个)队列上。

主题交换机的 routing_key 需要有一定的规则,交换机和队列的 binding_key 需要采用 *.#.*..... 的格式,每个部分用 . 分开,其中:

  • * 表示一个单词

  • # 表示任意数量(零个或多个)单词。

假设有一条消息的 routing_keyfast.rabbit.white,那么带有这样 binding_key 的几个队列都会接收这条消息:

  1. fast..
  2. ..white
  3. fast.#
  4. ……

这个图是网上找的,感觉对主题交换机的描述比较到位:

RabbitMQ_Topic_Exchange

当一个队列的绑定键为 # 的时候,这个队列将会无视消息的路由键,接收所有的消息。

首部交换机

首部交换机是忽略 routing_key 的一种路由方式。路由器和交换机路由的规则是通过 Headers 信息来交换的,这个有点像 HTTPHeaders。将一个交换机声明成首部交换机,绑定一个队列的时候,定义一个 Hash 的数据结构,消息发送的时候,会携带一组 hash 数据结构的信息,当 Hash 的内容匹配上的时候,消息就会被写入队列。

绑定交换机和队列的时候,Hash 结构中要求携带一个键“x-match”,这个键的 Value 可以是 any 或者 all,这代表消息携带的Hash是需要全部匹配(all),还是仅匹配一个键(any)就可以了。相比直连交换机,首部交换机的优势是匹配的规则不被限定为字符串(string)。

Spring Boot 中的实例

推荐在 Docker 中安装运行 RabbitMQ,非常方便。

配置

pom.xml

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

application.properties

  1. #rabbitmq
  2. spring.rabbitmq.host=localhost
  3. spring.rabbitmq.port=5672
  4. spring.rabbitmq.username=guest
  5. spring.rabbitmq.password=guest
  6. spring.rabbitmq.virtual-host=/
  7. #消费者
  8. spring.rabbitmq.listener.simple.concurrency=10
  9. spring.rabbitmq.listener.simple.max-concurrency=10
  10. #从队列中每次取x个
  11. spring.rabbitmq.listener.simple.prefetch=1
  12. #消费者自动启动
  13. spring.rabbitmq.listener.simple.auto-startup=true
  14. # 重试
  15. spring.rabbitmq.listener.simple.default-requeue-rejected=true
  16. #发送者
  17. spring.rabbitmq.template.retry.enabled=true
  18. spring.rabbitmq.template.retry.initial-interval=1000
  19. spring.rabbitmq.template.retry.max-attempts=3
  20. spring.rabbitmq.template.retry.max-interval=10000
  21. spring.rabbitmq.template.retry.multiplier=1.0

直连交换机

配置类

  1. import org.springframework.context.annotation.Bean;
  2. import org.springframework.context.annotation.Configuration;
  3. import org.springframework.amqp.core.Queue;
  4. /**
  5. * 最新版本的RabbitMQ有四种交换机类型,分别是Direct exchange、Fanout exchange、Topic exchange、Headers exchange。
  6. */
  7. @Configuration
  8. public class MQConfig {
  9. public static final String QUEUE = "queue";
  10. @Bean
  11. public Queue queue() {
  12. return new Queue(QUEUE, true);
  13. }
  14. }

默认的交换机为直连交换机,也可以显式指定直连交换机

  1. import org.springframework.amqp.core.Binding;
  2. import org.springframework.amqp.core.BindingBuilder;
  3. import org.springframework.amqp.core.DirectExchange;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. import org.springframework.amqp.core.Queue;
  7. @Configuration
  8. public class MQConfig {
  9. public static final String QUEUE = "queue";
  10. public static final String DIRECT_EXCHANGE = "directExchage";
  11. @Bean
  12. public Queue queue() {
  13. return new Queue(QUEUE, true);
  14. }
  15. /**
  16. * direct模式
  17. * 一个队列会和一个交换机绑定,除此之外再绑定一个routing_key。
  18. */
  19. @Bean
  20. public DirectExchange directExchange(){
  21. return new DirectExchange(DIRECT_EXCHANGE);
  22. }
  23. @Bean
  24. public Binding topicBinding() {
  25. return BindingBuilder.bind(queue()).to(directExchange()).with("key");
  26. }
  27. }

发送者

  1. import org.springframework.amqp.core.AmqpTemplate;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.stereotype.Service;
  4. @Service
  5. public class MQSender {
  6. @Autowired
  7. private AmqpTemplate amqpTemplate ;
  8. public void send(Object message) {
  9. System.out.println("send message:" + message.toString());
  10. amqpTemplate.convertAndSend(MQConfig.QUEUE, message.toString());
  11. }
  12. }

如果采用了显式指定直连交换机则需要按照下面的方法来发送消息

  1. import org.springframework.amqp.core.AmqpTemplate;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.stereotype.Service;
  4. @Service
  5. public class MQSender {
  6. @Autowired
  7. private AmqpTemplate amqpTemplate ;
  8. public void sendDirect(Object message) {
  9. System.out.println("send direct message:" + message.toString());
  10. amqpTemplate.convertAndSend(MQConfig.DIRECT_EXCHANGE, "key", message.toString());
  11. }
  12. }

接收者

  1. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  2. import org.springframework.stereotype.Service;
  3. @Service
  4. public class MQReceiver {
  5. @RabbitListener(queues=MQConfig.QUEUE)
  6. public void receive(String message) {
  7. System.out.println("receive direct message:" + message);
  8. }
  9. }

主题交换机

配置类

  1. import org.springframework.amqp.core.Binding;
  2. import org.springframework.amqp.core.BindingBuilder;
  3. import org.springframework.amqp.core.TopicExchange;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. import org.springframework.amqp.core.Queue;
  7. @Configuration
  8. public class MQConfig {
  9. public static final String QUEUE1 = "queue1";
  10. public static final String QUEUE2 = "queue2";
  11. public static final String TOPIC_EXCHANGE = "topicExchage";
  12. @Bean
  13. public Queue queue1() {
  14. return new Queue(QUEUE1, true);
  15. }
  16. @Bean
  17. public Queue queue2() {
  18. return new Queue(QUEUE2, true);
  19. }
  20. /**
  21. * Topic模式 交换机Exchange
  22. * 先放入交换机中,通过with里参数筛选后再放入queue中。
  23. * 直连交换机的routing_key方案非常简单,如果我们希望一条消息发送给多个队列,
  24. * 那么这个交换机需要绑定上非常多的routing_key,
  25. * 假设每个交换机上都绑定一堆的routing_key连接到各个队列上。那么消息的管理就会异常地困难。
  26. * 所以RabbitMQ提供了一种主题交换机,发送到主题交换机上的消息需要携带指定规则的routing_key,
  27. * 主题交换机会根据这个规则将数据发送到对应的(多个)队列上。
  28. * 主题交换机支持通配符,* 代表一个字符,# 代表0个或多个字符
  29. * */
  30. @Bean
  31. public TopicExchange topicExchange(){
  32. return new TopicExchange(TOPIC_EXCHANGE);
  33. }
  34. @Bean
  35. public Binding topicBinding1() {
  36. return BindingBuilder.bind(queue1()).to(topicExchange()).with("topic.key1");
  37. }
  38. @Bean
  39. public Binding topicBinding2() {
  40. return BindingBuilder.bind(queue2()).to(topicExchange()).with("topic.#");
  41. }
  42. }

发送者

  1. import org.springframework.amqp.core.AmqpTemplate;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.stereotype.Service;
  4. @Service
  5. public class MQSender {
  6. @Autowired
  7. private AmqpTemplate amqpTemplate ;
  8. public void sendTopic(Object message) {
  9. System.out.println("send topic message:" + message.toString());
  10. amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, "topic.key1", "第1条消息:" + message.toString());
  11. amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, "topic.key2", "第2条消息:" + message.toString());
  12. }
  13. }

接收者

  1. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  2. import org.springframework.stereotype.Service;
  3. @Service
  4. public class MQReceiver {
  5. @RabbitListener(queues=MQConfig.QUEUE1)
  6. public void receiveQueue1(String message) {
  7. System.out.println("receive queue1 message:" + message);
  8. }
  9. @RabbitListener(queues=MQConfig.QUEUE2)
  10. public void receiveQueue2(String message) {
  11. System.out.println("receive queue2 message:" + message);
  12. }
  13. }

扇形交换机

配置类

  1. import org.springframework.amqp.core.Binding;
  2. import org.springframework.amqp.core.BindingBuilder;
  3. import org.springframework.amqp.core.FanoutExchange;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. import org.springframework.amqp.core.Queue;
  7. @Configuration
  8. public class MQConfig {
  9. public static final String QUEUE1 = "queue1";
  10. public static final String QUEUE2 = "queue2";
  11. public static final String TOPIC_EXCHANGE = "topicExchage";
  12. @Bean
  13. public Queue queue1() {
  14. return new Queue(QUEUE1, true);
  15. }
  16. @Bean
  17. public Queue queue2() {
  18. return new Queue(QUEUE2, true);
  19. }
  20. /**
  21. * Fanout模式 交换机Exchange
  22. * 先放入交换机中,不需要通过筛选,所有绑定的queue都能接收到。
  23. * */
  24. @Bean
  25. public FanoutExchange fanoutExchange(){
  26. return new FanoutExchange(FANOUT_EXCHANGE);
  27. }
  28. @Bean
  29. public Binding FanoutBinding1() {
  30. return BindingBuilder.bind(queue1()).to(fanoutExchange());
  31. }
  32. @Bean
  33. public Binding FanoutBinding2() {
  34. return BindingBuilder.bind(queue2()).to(fanoutExchange());
  35. }
  36. }

发送者

  1. import org.springframework.amqp.core.AmqpTemplate;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.stereotype.Service;
  4. @Service
  5. public class MQSender {
  6. @Autowired
  7. private AmqpTemplate amqpTemplate ;
  8. public void sendFanout(Object message) {
  9. System.out.println("send fanout message:" + message.toString());
  10. amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE, "", message.toString());
  11. }
  12. }

接收者

  1. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  2. import org.springframework.stereotype.Service;
  3. @Service
  4. public class MQReceiver {
  5. @RabbitListener(queues=MQConfig.QUEUE1)
  6. public void receiveQueue1(String message) {
  7. System.out.println("receive queue1 message:" + message);
  8. }
  9. @RabbitListener(queues=MQConfig.QUEUE2)
  10. public void receiveQueue2(String message) {
  11. System.out.println("receive queue2 message:" + message);
  12. }
  13. }

首部交换机

配置类

  1. import org.springframework.amqp.core.Binding;
  2. import org.springframework.amqp.core.BindingBuilder;
  3. import org.springframework.amqp.core.HeadersExchange;
  4. import org.springframework.context.annotation.Bean;
  5. import org.springframework.context.annotation.Configuration;
  6. import org.springframework.amqp.core.Queue;
  7. import java.util.HashMap;
  8. import java.util.Map;
  9. @Configuration
  10. public class MQConfig {
  11. public static final String HEADERS_QUEUE = "headers.queue";
  12. public static final String HEADERS_EXCHANGE = "headersExchage";
  13. @Bean
  14. public Queue headersQueue() {
  15. return new Queue(HEADERS_QUEUE, true);
  16. }
  17. /**
  18. * Header模式 交换机Exchange
  19. * 首部交换机是忽略routing_key的一种路由方式。
  20. * 路由器和交换机路由的规则是通过Headers信息来交换的,这个有点像HTTP的Headers。
  21. * 将一个交换机声明成首部交换机,绑定一个队列的时候,定义一个Hash的数据结构,
  22. * 消息发送的时候,会携带一组hash数据结构的信息,当Hash的内容匹配上的时候,消息就会被写入队列。
  23. * 绑定交换机和队列的时候,Hash结构中要求携带一个键“x-match”,这个键的Value可以是any或者all,
  24. * 这代表消息携带的Hash是需要全部匹配(all),还是仅匹配一个键(any)就可以了。
  25. * 相比直连交换机,首部交换机的优势是匹配的规则不被限定为字符串(string)。
  26. * */
  27. @Bean
  28. public HeadersExchange headersExchange(){
  29. return new HeadersExchange(HEADERS_EXCHANGE);
  30. }
  31. @Bean
  32. public Binding headersBinding() {
  33. Map<String, Object> map = new HashMap<String, Object>();
  34. map.put("header1", "value1");
  35. map.put("header2", "value2");
  36. //whereAll,whereAny,whereOne等等,有许多条件,含义参看文档
  37. return BindingBuilder.bind(headersQueue1()).to(headersExchange()).whereAll(map).match();
  38. }
  39. }

发送者

  1. import org.springframework.amqp.core.AmqpTemplate;
  2. import org.springframework.amqp.core.Message;
  3. import org.springframework.amqp.core.MessageProperties;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.stereotype.Service;
  6. @Service
  7. public class MQSender {
  8. @Autowired
  9. private AmqpTemplate amqpTemplate ;
  10. public void sendHeaders(Object message) {
  11. System.out.println("send headers message:" + message.toString());
  12. MessageProperties properties = new MessageProperties();
  13. properties.setHeader("header1", "value1");
  14. properties.setHeader("header2", "value2");
  15. Message obj = new Message(msg.getBytes(), properties);
  16. amqpTemplate.convertAndSend(MQConfig.HEADERS_EXCHANGE, "", obj);
  17. }
  18. }

接收者

  1. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  2. import org.springframework.stereotype.Service;
  3. @Service
  4. public class MQReceiver {
  5. @RabbitListener(queues=MQConfig.HEADERS_QUEUE)
  6. public void receiveHeaderQueue(byte[] message) {
  7. System.out.println("receive headers queue message:" + new String(message));
  8. }
  9. }

本文部分内容来自:
https://www.jianshu.com/p/469f4608ce5d

评论

发表评论 点击刷新验证码

提示

该功能暂未开放