登录后台

页面导航

本文编写于 2259 天前,最后修改于 1464 天前,其中某些信息可能已经过时。

RabbitMQ 作为一个消息队列提供一个通用的消息发送和接收平台,并且保证消息在传输过程中的安全可靠。

消息 (Message) 由 Client 发送,RabbitMQ 接收到消息之后通过交换机转发到对应的队列上面。Worker 会从队列中获取未被读取的数据处理。

title=

交换机

有 4 种不同的交换机类型:

  • 直连交换机Direct Exchange
  • 扇形交换机Fanout Exchange
  • 主题交换机Topic Exchange
  • 首部交换机Headers Exchange

扇形交换机

扇形交换机是最基本的交换机类型,它所能做的事情非常简单———广播消息。扇形交换机会把能接收到的消息全部发送给绑定在自己身上的队列。因为广播不需要“思考”,所以扇形交换机处理消息的速度也是所有的交换机类型里面最快的。

title=

直连交换机

直连交换机是一种带路由功能的交换机,一个队列会和一个交换机绑定,除此之外再绑定一个 routing_key,当消息被发送的时候,需要指定一个 binding_key,这个消息被送达交换机的时候,就会被这个交换机送到指定的队列里面去。同样的一个 binding_key 也是支持应用到多个队列中的。

这样当一个交换机绑定多个队列,就会被送到对应的队列去处理。

title=

适用场景:有优先级的任务,根据任务的优先级把消息发送到对应的队列,这样可以指派更多的资源去处理高优先级的队列。

主题交换机

直连交换机的 routing_key 方案非常简单,如果我们希望一条消息发送给多个队列,那么这个交换机需要绑定上非常多的 routing_key,假设每个交换机上都绑定一堆的 routing_key 连接到各个队列上。那么消息的管理就会异常地困难。

所以 RabbitMQ 提供了一种主题交换机,发送到主题交换机上的消息需要携带指定规则的 routing_key,主题交换机会根据这个规则将数据发送到对应的(多个)队列上。

主题交换机的 routing_key 需要有一定的规则,交换机和队列的 binding_key 需要采用 *.#.*..... 的格式,每个部分用 . 分开,其中:

  • * 表示一个单词
  • # 表示任意数量(零个或多个)单词。

假设有一条消息的 routing_keyfast.rabbit.white,那么带有这样 binding_key 的几个队列都会接收这条消息:

  1. fast..
  2. ..white
  3. fast.#
  4. ……

这个图是网上找的,感觉对主题交换机的描述比较到位:

title=

当一个队列的绑定键为 # 的时候,这个队列将会无视消息的路由键,接收所有的消息。

首部交换机

首部交换机是忽略 routing_key 的一种路由方式。路由器和交换机路由的规则是通过 Headers 信息来交换的,这个有点像 HTTPHeaders。将一个交换机声明成首部交换机,绑定一个队列的时候,定义一个 Hash 的数据结构,消息发送的时候,会携带一组 hash 数据结构的信息,当 Hash 的内容匹配上的时候,消息就会被写入队列。

绑定交换机和队列的时候,Hash 结构中要求携带一个键“x-match”,这个键的 Value 可以是 any 或者 all,这代表消息携带的Hash是需要全部匹配(all),还是仅匹配一个键(any)就可以了。相比直连交换机,首部交换机的优势是匹配的规则不被限定为字符串(string)。

Spring Boot 中的实例

推荐在 Docker 中安装运行 RabbitMQ,非常方便。

配置

pom.xml

    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

application.properties

#rabbitmq
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
#消费者
spring.rabbitmq.listener.simple.concurrency=10
spring.rabbitmq.listener.simple.max-concurrency=10
#从队列中每次取x个
spring.rabbitmq.listener.simple.prefetch=1
#消费者自动启动
spring.rabbitmq.listener.simple.auto-startup=true
# 重试
spring.rabbitmq.listener.simple.default-requeue-rejected=true
#发送者
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=1000
spring.rabbitmq.template.retry.max-attempts=3
spring.rabbitmq.template.retry.max-interval=10000
spring.rabbitmq.template.retry.multiplier=1.0

直连交换机

配置类

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.core.Queue;

/**
 * 最新版本的RabbitMQ有四种交换机类型,分别是Direct exchange、Fanout exchange、Topic exchange、Headers exchange。
 */
@Configuration
public class MQConfig {
    public static final String QUEUE = "queue";

    @Bean
    public Queue queue() {
        return new Queue(QUEUE, true);
    }
}

默认的交换机为直连交换机,也可以显式指定直连交换机

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import org.springframework.amqp.core.Queue;

@Configuration
public class MQConfig {
    public static final String QUEUE = "queue";
    public static final String DIRECT_EXCHANGE = "directExchage";

    @Bean
    public Queue queue() {
        return new Queue(QUEUE, true);
    }

    /**
     * direct模式
     * 一个队列会和一个交换机绑定,除此之外再绑定一个routing_key。
     */
    @Bean
    public DirectExchange directExchange(){
        return new DirectExchange(DIRECT_EXCHANGE);
    }

    @Bean
    public Binding topicBinding() {
        return BindingBuilder.bind(queue()).to(directExchange()).with("key");
    }
}

发送者

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;


@Service
public class MQSender {

    @Autowired
    private AmqpTemplate amqpTemplate ;

    public void send(Object message) {
        System.out.println("send message:" + message.toString());
        amqpTemplate.convertAndSend(MQConfig.QUEUE, message.toString());
    }
}

如果采用了显式指定直连交换机则需要按照下面的方法来发送消息

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;


@Service
public class MQSender {

    @Autowired
    private AmqpTemplate amqpTemplate ;

    public void sendDirect(Object message) {
        System.out.println("send direct message:" + message.toString());
        amqpTemplate.convertAndSend(MQConfig.DIRECT_EXCHANGE, "key", message.toString());
    }
}

接收者

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class MQReceiver {

    @RabbitListener(queues=MQConfig.QUEUE)
    public void receive(String message) {
        System.out.println("receive direct message:" + message);
    }
}

主题交换机

配置类

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import org.springframework.amqp.core.Queue;

@Configuration
public class MQConfig {
    public static final String QUEUE1 = "queue1";
    public static final String QUEUE2 = "queue2";
    public static final String TOPIC_EXCHANGE = "topicExchage";

    @Bean
    public Queue queue1() {
        return new Queue(QUEUE1, true);
    }

    @Bean
    public Queue queue2() {
        return new Queue(QUEUE2, true);
    }
    /**
     * Topic模式 交换机Exchange
     * 先放入交换机中,通过with里参数筛选后再放入queue中。
     * 直连交换机的routing_key方案非常简单,如果我们希望一条消息发送给多个队列,
     * 那么这个交换机需要绑定上非常多的routing_key,
     * 假设每个交换机上都绑定一堆的routing_key连接到各个队列上。那么消息的管理就会异常地困难。
     * 所以RabbitMQ提供了一种主题交换机,发送到主题交换机上的消息需要携带指定规则的routing_key,
     * 主题交换机会根据这个规则将数据发送到对应的(多个)队列上。
     * 主题交换机支持通配符,* 代表一个字符,# 代表0个或多个字符
     * */
    @Bean
    public TopicExchange topicExchange(){
        return new TopicExchange(TOPIC_EXCHANGE);
    }

    @Bean
    public Binding topicBinding1() {
        return BindingBuilder.bind(queue1()).to(topicExchange()).with("topic.key1");
    }

    @Bean
    public Binding topicBinding2() {
        return BindingBuilder.bind(queue2()).to(topicExchange()).with("topic.#");
    }
}

发送者

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;


@Service
public class MQSender {

    @Autowired
    private AmqpTemplate amqpTemplate ;

    public void sendTopic(Object message) {
        System.out.println("send topic message:" + message.toString());
        amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, "topic.key1", "第1条消息:" + message.toString());
        amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, "topic.key2", "第2条消息:" + message.toString());
    }
}

接收者

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class MQReceiver {

    @RabbitListener(queues=MQConfig.QUEUE1)
    public void receiveQueue1(String message) {
        System.out.println("receive queue1 message:" + message);
    }

    @RabbitListener(queues=MQConfig.QUEUE2)
    public void receiveQueue2(String message) {
        System.out.println("receive queue2 message:" + message);
    }
}

扇形交换机

配置类

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import org.springframework.amqp.core.Queue;

@Configuration
public class MQConfig {
    public static final String QUEUE1 = "queue1";
    public static final String QUEUE2 = "queue2";
    public static final String TOPIC_EXCHANGE = "topicExchage";

    @Bean
    public Queue queue1() {
        return new Queue(QUEUE1, true);
    }

    @Bean
    public Queue queue2() {
        return new Queue(QUEUE2, true);
    }

    /**
     * Fanout模式 交换机Exchange
     * 先放入交换机中,不需要通过筛选,所有绑定的queue都能接收到。
     * */
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange(FANOUT_EXCHANGE);
    }
    @Bean
    public Binding FanoutBinding1() {
        return BindingBuilder.bind(queue1()).to(fanoutExchange());
    }
    @Bean
    public Binding FanoutBinding2() {
        return BindingBuilder.bind(queue2()).to(fanoutExchange());
    }
}

发送者

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MQSender {

    @Autowired
    private AmqpTemplate amqpTemplate ;

    public void sendFanout(Object message) {
        System.out.println("send fanout message:" + message.toString());
        amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE, "", message.toString());
    }
}

接收者

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class MQReceiver {

    @RabbitListener(queues=MQConfig.QUEUE1)
    public void receiveQueue1(String message) {
        System.out.println("receive queue1 message:" + message);
    }

    @RabbitListener(queues=MQConfig.QUEUE2)
    public void receiveQueue2(String message) {
        System.out.println("receive queue2 message:" + message);
    }
}

首部交换机

配置类

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import org.springframework.amqp.core.Queue;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class MQConfig {
    public static final String HEADERS_QUEUE = "headers.queue";
    public static final String HEADERS_EXCHANGE = "headersExchage";

    @Bean
    public Queue headersQueue() {
        return new Queue(HEADERS_QUEUE, true);
    }

    /**
     * Header模式 交换机Exchange
     * 首部交换机是忽略routing_key的一种路由方式。
     * 路由器和交换机路由的规则是通过Headers信息来交换的,这个有点像HTTP的Headers。
     * 将一个交换机声明成首部交换机,绑定一个队列的时候,定义一个Hash的数据结构,
     * 消息发送的时候,会携带一组hash数据结构的信息,当Hash的内容匹配上的时候,消息就会被写入队列。
     * 绑定交换机和队列的时候,Hash结构中要求携带一个键“x-match”,这个键的Value可以是any或者all,
     * 这代表消息携带的Hash是需要全部匹配(all),还是仅匹配一个键(any)就可以了。
     * 相比直连交换机,首部交换机的优势是匹配的规则不被限定为字符串(string)。
     * */
    @Bean
    public HeadersExchange headersExchange(){
        return new HeadersExchange(HEADERS_EXCHANGE);
    }

    @Bean
    public Binding headersBinding() {
        Map<String, Object> map = new HashMap<String, Object>();
        map.put("header1", "value1");
        map.put("header2", "value2");
        //whereAll,whereAny,whereOne等等,有许多条件,含义参看文档
        return BindingBuilder.bind(headersQueue1()).to(headersExchange()).whereAll(map).match();
    }
}

发送者

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Service
public class MQSender {

    @Autowired
    private AmqpTemplate amqpTemplate ;

    public void sendHeaders(Object message) {
        System.out.println("send headers message:" + message.toString());

        MessageProperties properties = new MessageProperties();
        properties.setHeader("header1", "value1");
        properties.setHeader("header2", "value2");
        Message obj = new Message(msg.getBytes(), properties);

        amqpTemplate.convertAndSend(MQConfig.HEADERS_EXCHANGE, "", obj);
    }
}

接收者

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;

@Service
public class MQReceiver {

    @RabbitListener(queues=MQConfig.HEADERS_QUEUE)
    public void receiveHeaderQueue(byte[] message) {
        System.out.println("receive headers queue message:" + new String(message));
    }
}

本文部分内容来自:
https://www.jianshu.com/p/469f4608ce5d