RabbitMQ 作为一个消息队列提供一个通用的消息发送和接收平台,并且保证消息在传输过程中的安全可靠。
消息 (Message) 由 Client 发送,RabbitMQ 接收到消息之后通过交换机转发到对应的队列上面。Worker 会从队列中获取未被读取的数据处理。
交换机
有 4 种不同的交换机类型:
- 直连交换机:Direct Exchange
- 扇形交换机:Fanout Exchange
- 主题交换机:Topic Exchange
- 首部交换机:Headers Exchange
扇形交换机
扇形交换机是最基本的交换机类型,它所能做的事情非常简单———广播消息。扇形交换机会把能接收到的消息全部发送给绑定在自己身上的队列。因为广播不需要“思考”,所以扇形交换机处理消息的速度也是所有的交换机类型里面最快的。
直连交换机
直连交换机是一种带路由功能的交换机,一个队列会和一个交换机绑定,除此之外再绑定一个 routing_key
,当消息被发送的时候,需要指定一个 binding_key
,这个消息被送达交换机的时候,就会被这个交换机送到指定的队列里面去。同样的一个 binding_key
也是支持应用到多个队列中的。
这样当一个交换机绑定多个队列,就会被送到对应的队列去处理。
适用场景:有优先级的任务,根据任务的优先级把消息发送到对应的队列,这样可以指派更多的资源去处理高优先级的队列。
主题交换机
直连交换机的 routing_key
方案非常简单,如果我们希望一条消息发送给多个队列,那么这个交换机需要绑定上非常多的 routing_key
,假设每个交换机上都绑定一堆的 routing_key
连接到各个队列上。那么消息的管理就会异常地困难。
所以 RabbitMQ
提供了一种主题交换机,发送到主题交换机上的消息需要携带指定规则的 routing_key
,主题交换机会根据这个规则将数据发送到对应的(多个)队列上。
主题交换机的 routing_key
需要有一定的规则,交换机和队列的 binding_key
需要采用 *.#.*.....
的格式,每个部分用 .
分开,其中:
*
表示一个单词#
表示任意数量(零个或多个)单词。
假设有一条消息的 routing_key
为 fast.rabbit.white
,那么带有这样 binding_key
的几个队列都会接收这条消息:
- fast..
- ..white
- fast.#
- ……
这个图是网上找的,感觉对主题交换机的描述比较到位:
当一个队列的绑定键为 #
的时候,这个队列将会无视消息的路由键,接收所有的消息。
首部交换机
首部交换机是忽略 routing_key
的一种路由方式。路由器和交换机路由的规则是通过 Headers
信息来交换的,这个有点像 HTTP
的 Headers
。将一个交换机声明成首部交换机,绑定一个队列的时候,定义一个 Hash
的数据结构,消息发送的时候,会携带一组 hash
数据结构的信息,当 Hash
的内容匹配上的时候,消息就会被写入队列。
绑定交换机和队列的时候,Hash
结构中要求携带一个键“x-match”,这个键的 Value
可以是 any
或者 all
,这代表消息携带的Hash是需要全部匹配(all),还是仅匹配一个键(any)就可以了。相比直连交换机,首部交换机的优势是匹配的规则不被限定为字符串(string)。
Spring Boot 中的实例
推荐在 Docker 中安装运行 RabbitMQ,非常方便。
配置
pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application.properties
#rabbitmq
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
#消费者
spring.rabbitmq.listener.simple.concurrency=10
spring.rabbitmq.listener.simple.max-concurrency=10
#从队列中每次取x个
spring.rabbitmq.listener.simple.prefetch=1
#消费者自动启动
spring.rabbitmq.listener.simple.auto-startup=true
# 重试
spring.rabbitmq.listener.simple.default-requeue-rejected=true
#发送者
spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=1000
spring.rabbitmq.template.retry.max-attempts=3
spring.rabbitmq.template.retry.max-interval=10000
spring.rabbitmq.template.retry.multiplier=1.0
直连交换机
配置类
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.core.Queue;
/**
* 最新版本的RabbitMQ有四种交换机类型,分别是Direct exchange、Fanout exchange、Topic exchange、Headers exchange。
*/
@Configuration
public class MQConfig {
public static final String QUEUE = "queue";
@Bean
public Queue queue() {
return new Queue(QUEUE, true);
}
}
默认的交换机为直连交换机,也可以显式指定直连交换机
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.core.Queue;
@Configuration
public class MQConfig {
public static final String QUEUE = "queue";
public static final String DIRECT_EXCHANGE = "directExchage";
@Bean
public Queue queue() {
return new Queue(QUEUE, true);
}
/**
* direct模式
* 一个队列会和一个交换机绑定,除此之外再绑定一个routing_key。
*/
@Bean
public DirectExchange directExchange(){
return new DirectExchange(DIRECT_EXCHANGE);
}
@Bean
public Binding topicBinding() {
return BindingBuilder.bind(queue()).to(directExchange()).with("key");
}
}
发送者
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class MQSender {
@Autowired
private AmqpTemplate amqpTemplate ;
public void send(Object message) {
System.out.println("send message:" + message.toString());
amqpTemplate.convertAndSend(MQConfig.QUEUE, message.toString());
}
}
如果采用了显式指定直连交换机则需要按照下面的方法来发送消息
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class MQSender {
@Autowired
private AmqpTemplate amqpTemplate ;
public void sendDirect(Object message) {
System.out.println("send direct message:" + message.toString());
amqpTemplate.convertAndSend(MQConfig.DIRECT_EXCHANGE, "key", message.toString());
}
}
接收者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@Service
public class MQReceiver {
@RabbitListener(queues=MQConfig.QUEUE)
public void receive(String message) {
System.out.println("receive direct message:" + message);
}
}
主题交换机
配置类
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.core.Queue;
@Configuration
public class MQConfig {
public static final String QUEUE1 = "queue1";
public static final String QUEUE2 = "queue2";
public static final String TOPIC_EXCHANGE = "topicExchage";
@Bean
public Queue queue1() {
return new Queue(QUEUE1, true);
}
@Bean
public Queue queue2() {
return new Queue(QUEUE2, true);
}
/**
* Topic模式 交换机Exchange
* 先放入交换机中,通过with里参数筛选后再放入queue中。
* 直连交换机的routing_key方案非常简单,如果我们希望一条消息发送给多个队列,
* 那么这个交换机需要绑定上非常多的routing_key,
* 假设每个交换机上都绑定一堆的routing_key连接到各个队列上。那么消息的管理就会异常地困难。
* 所以RabbitMQ提供了一种主题交换机,发送到主题交换机上的消息需要携带指定规则的routing_key,
* 主题交换机会根据这个规则将数据发送到对应的(多个)队列上。
* 主题交换机支持通配符,* 代表一个字符,# 代表0个或多个字符
* */
@Bean
public TopicExchange topicExchange(){
return new TopicExchange(TOPIC_EXCHANGE);
}
@Bean
public Binding topicBinding1() {
return BindingBuilder.bind(queue1()).to(topicExchange()).with("topic.key1");
}
@Bean
public Binding topicBinding2() {
return BindingBuilder.bind(queue2()).to(topicExchange()).with("topic.#");
}
}
发送者
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class MQSender {
@Autowired
private AmqpTemplate amqpTemplate ;
public void sendTopic(Object message) {
System.out.println("send topic message:" + message.toString());
amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, "topic.key1", "第1条消息:" + message.toString());
amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, "topic.key2", "第2条消息:" + message.toString());
}
}
接收者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@Service
public class MQReceiver {
@RabbitListener(queues=MQConfig.QUEUE1)
public void receiveQueue1(String message) {
System.out.println("receive queue1 message:" + message);
}
@RabbitListener(queues=MQConfig.QUEUE2)
public void receiveQueue2(String message) {
System.out.println("receive queue2 message:" + message);
}
}
扇形交换机
配置类
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.core.Queue;
@Configuration
public class MQConfig {
public static final String QUEUE1 = "queue1";
public static final String QUEUE2 = "queue2";
public static final String TOPIC_EXCHANGE = "topicExchage";
@Bean
public Queue queue1() {
return new Queue(QUEUE1, true);
}
@Bean
public Queue queue2() {
return new Queue(QUEUE2, true);
}
/**
* Fanout模式 交换机Exchange
* 先放入交换机中,不需要通过筛选,所有绑定的queue都能接收到。
* */
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange(FANOUT_EXCHANGE);
}
@Bean
public Binding FanoutBinding1() {
return BindingBuilder.bind(queue1()).to(fanoutExchange());
}
@Bean
public Binding FanoutBinding2() {
return BindingBuilder.bind(queue2()).to(fanoutExchange());
}
}
发送者
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class MQSender {
@Autowired
private AmqpTemplate amqpTemplate ;
public void sendFanout(Object message) {
System.out.println("send fanout message:" + message.toString());
amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE, "", message.toString());
}
}
接收者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@Service
public class MQReceiver {
@RabbitListener(queues=MQConfig.QUEUE1)
public void receiveQueue1(String message) {
System.out.println("receive queue1 message:" + message);
}
@RabbitListener(queues=MQConfig.QUEUE2)
public void receiveQueue2(String message) {
System.out.println("receive queue2 message:" + message);
}
}
首部交换机
配置类
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.amqp.core.Queue;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class MQConfig {
public static final String HEADERS_QUEUE = "headers.queue";
public static final String HEADERS_EXCHANGE = "headersExchage";
@Bean
public Queue headersQueue() {
return new Queue(HEADERS_QUEUE, true);
}
/**
* Header模式 交换机Exchange
* 首部交换机是忽略routing_key的一种路由方式。
* 路由器和交换机路由的规则是通过Headers信息来交换的,这个有点像HTTP的Headers。
* 将一个交换机声明成首部交换机,绑定一个队列的时候,定义一个Hash的数据结构,
* 消息发送的时候,会携带一组hash数据结构的信息,当Hash的内容匹配上的时候,消息就会被写入队列。
* 绑定交换机和队列的时候,Hash结构中要求携带一个键“x-match”,这个键的Value可以是any或者all,
* 这代表消息携带的Hash是需要全部匹配(all),还是仅匹配一个键(any)就可以了。
* 相比直连交换机,首部交换机的优势是匹配的规则不被限定为字符串(string)。
* */
@Bean
public HeadersExchange headersExchange(){
return new HeadersExchange(HEADERS_EXCHANGE);
}
@Bean
public Binding headersBinding() {
Map<String, Object> map = new HashMap<String, Object>();
map.put("header1", "value1");
map.put("header2", "value2");
//whereAll,whereAny,whereOne等等,有许多条件,含义参看文档
return BindingBuilder.bind(headersQueue1()).to(headersExchange()).whereAll(map).match();
}
}
发送者
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class MQSender {
@Autowired
private AmqpTemplate amqpTemplate ;
public void sendHeaders(Object message) {
System.out.println("send headers message:" + message.toString());
MessageProperties properties = new MessageProperties();
properties.setHeader("header1", "value1");
properties.setHeader("header2", "value2");
Message obj = new Message(msg.getBytes(), properties);
amqpTemplate.convertAndSend(MQConfig.HEADERS_EXCHANGE, "", obj);
}
}
接收者
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
@Service
public class MQReceiver {
@RabbitListener(queues=MQConfig.HEADERS_QUEUE)
public void receiveHeaderQueue(byte[] message) {
System.out.println("receive headers queue message:" + new String(message));
}
}