RabbitMQ详解

初识RabbitMQ

RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ是使用Erlang语言编写的,并且RabbitMQ是基于AMQP协议的。

  • 目前很多互联网大厂都在使用RabbitMQ
  • RabbitMQ底层使用Erlang语言编写
  • 开源、性能优秀、稳定性保障
  • 与SpringAMQP完美整合、API丰富
  • 集群模式丰富,表达式配置,HA模式,镜像队列模式
  • 保证数据不丢失的前提做到高可用性,高可靠性
  • AMQP全称:Advanced Message Queuing Protocol(高级消息队列协议)

AMQP协议模型

AMQP核心概论

Virtual host: 虚拟地址,用于进行逻辑隔离,最上层的消息路由。一个Virtual Host里面可以有若干个Exchange和Queue,同时一个Virtual Host里面不能有相同名称的Exchange和Queue

Exchange: 交换机,接收消息,根据路由键转发消息到绑定的队列

Binding: Exchange和Queue之间的虚拟连接,binding中可以包含routing key

Routing key: 一个路由规则,虚拟机可用它来确定如何路由一个特定消息

Queue: 也称为Message Queue,消息队列,保存消息并将它们转发给消费者

RabbitMQ消息流转

RabbitMQ整合SpringBoot2.x

SpringBoot与RabbitMQ集成非常简单,不需要任何额外设置只需要两步即可:

引入相关依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置application.properties

1
2
3
4
5
6
7
8
9
# rabbitmq服务地址
spring.rabbitmq.addresses=192.168.8.193:5672
# rabbitmq用户名密码
spring.rabbitmq.username=admin
spring.rabbitmq.password=admin
# 虚拟地址
spring.rabbitmq.virtual-host=/
# 超时设置
spring.rabbitmq.connection-timeout=15000

代码配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package com.marspie.config;

import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* Copyright (c) 2019, The Marspie Open Source Project
* 配置方法一: application.properties
* 配置方法二: 代码配置ConnectionFactory
* @author alex
* @date 2019/1/11 11 32
* @email yaonew@126.com
* @blog http://nsoft.vip
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
@Configuration
public class RabbitMqConfig {

/* @Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses("192.168.48.51:5672");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
connectionFactory.setPublisherConfirms(true);//消息确认
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}*/

//topic
public static final String TOPIC_QUEUE1 = "topic.queue1";
public static final String TOPIC_QUEUE2 = "topic.queue2";
public static final String TOPIC_EXCHANGE = "topic.exchange";
// routingKey
public static final String TOPIC_ROUTING_KEY1 = "order.message";
public static final String TOPIC_ROUTING_KEY2 = "order.#";

//fanout
public static final String FANOUT_QUEUE1 = "fanout.queue1";
public static final String FANOUT_QUEUE2 = "fanout.queue2";
public static final String FANOUT_EXCHANGE = "fanout.exchange";

//redirect模式
public static final String DIRECT_QUEUE1 = "direct.queue1";
public static final String DIRECT_EXCHANGE = "direct.exchange";
public static final String DIRECT_QUEUE2 ="direct.queue2" ;
// routingKey
public static final String DIRECT_ROUTING_KEY = "direct.order";

/**
* Topic模式
* 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。
* 因此“order.#”能够匹配到“order.irs.corporate”,但是“order.*” 只会匹配到“order.irs”。
* @return
*/
@Bean
public Queue topicQueue1() {
return new Queue(TOPIC_QUEUE1);
}

@Bean
public Queue topicQueue2() {
return new Queue(TOPIC_QUEUE2);
}

@Bean
public TopicExchange topicExchange() {
return new TopicExchange(TOPIC_EXCHANGE);
}

@Bean
public Binding topicBinding1() {
return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with(TOPIC_ROUTING_KEY1);
}

@Bean
public Binding topicBinding2() {
return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with(TOPIC_ROUTING_KEY2);
}


/**
* Fanout模式
* Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。
* @return
*/
@Bean
public Queue fanoutQueue1() {
return new Queue(FANOUT_QUEUE1);
}

@Bean
public Queue fanoutQueue2() {
return new Queue(FANOUT_QUEUE2);
}

@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUT_EXCHANGE);
}

@Bean
public Binding fanoutBinding1() {
return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}

@Bean
public Binding fanoutBinding2() {
return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());
}

/**
* direct模式
* 消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配.
* 这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “test”,则只有被标记为“test”的消息才被转发,不会转发test.aaa,也不会转发dog.bbb,只会转发test
* @return
*/
@Bean
public Queue directQueue1() {
return new Queue(DIRECT_QUEUE1);
}

@Bean
public DirectExchange directExchange() {
return new DirectExchange(DIRECT_EXCHANGE);
}

@Bean
public Binding directBinding1() {
return BindingBuilder.bind(directQueue1()).to(directExchange()).with(DIRECT_ROUTING_KEY);
}
}

Topic模式

将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。 因此“order.#”能够匹配到“order.irs.corporate”,但是“order.*” 只会匹配到“order.irs”。

-生产者

1
2
3
4
5
6
7
8
9
10
@Component
public class TopicSender {
@Autowired
private AmqpTemplate rabbitTemplate;

public void send(Order order) {
this.rabbitTemplate.convertAndSend(RabbitMqConfig.TOPIC_EXCHANGE,"order.message", order);
this.rabbitTemplate.convertAndSend(RabbitMqConfig.TOPIC_EXCHANGE,"order.ha", order);
}
}

-消费者

1
2
3
4
5
6
7
8
9
10
11
12
@Component
public class TopicReceiver {
// queues是指要监听的队列的名字
@RabbitListener(queues = RabbitMqConfig.TOPIC_QUEUE1)
public void receiveTopic1(Order order) {
System.out.println("【receiveTopic1监听到消息】" + order);
}
@RabbitListener(queues = RabbitMqConfig.TOPIC_QUEUE2)
public void receiveTopic2(Order order) {
System.out.println("【receiveTopic2监听到消息】" + order);
}
}

Fanout模式

Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。
-生产者

1
2
3
4
5
6
7
8
9
10
@Component
public class FanoutSender {

@Autowired
private AmqpTemplate rabbitTemplate;

public void send(Order order) {
this.rabbitTemplate.convertAndSend(RabbitMqConfig.FANOUT_EXCHANGE, "", order);
}
}

-消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Component
public class FanoutReceiver {

// queues是指要监听的队列的名字
@RabbitListener(queues = RabbitMqConfig.FANOUT_QUEUE1)
public void receiveTopic1(Order order) {
System.out.println("【receiveFanout1监听到消息】" + order);
}

@RabbitListener(queues = RabbitMqConfig.FANOUT_QUEUE2)
public void receiveTopic2(Order order) {
System.out.println("【receiveFanout2监听到消息】" + order);
}
}

direct模式

消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配.这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “test”,则只有被标记为“test”的消息才被转发,不会转发test.aaa,也不会转发dog.bbb,只会转发test。
-生产者

1
2
3
4
5
6
7
8
9
10
@Component
public class DirectSender {

@Autowired
private RabbitTemplate rabbitTemplate;

public void send(Order order) {
this.rabbitTemplate.convertAndSend(RabbitMqConfig.DIRECT_EXCHANGE, "direct.order", order);
}
}

-消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Component
public class DirectReceiver {
// queues是指要监听的队列的名字
@RabbitListener(queues = RabbitMqConfig.DIRECT_QUEUE1)
public void receiveDirect1(Order order) {
System.out.println("【receiveDirect1监听到消息】" + order);
}

@RabbitListener(queues = RabbitMqConfig.DIRECT_QUEUE1)
public void receiveDirect2(Order order) {
System.out.println("【receiveDirect2监听到消息】" + order);
}

}

GitHub源码
码云源码

-------------本文结束感谢您的阅读-------------
Alex.Yao wechat
欢迎您扫一扫上面的微信公众号,订阅我的博客!
坚持原创技术分享,您的支持将鼓励我继续创作!