RabbitMQ笔记(一)
RabbitMQ 是目前非常热门的一款消息中间件,不管是互联网大厂还是中小企业都在大量使用
简介
使用场景
摘自 博客
以熟悉的电商场景为例,如果商品服务和订单服务是两个不同的微服务,在下单的过程中订单服务需要调用商品服务进行扣库存操作。
按照传统的方式,下单过程要等到调用完毕之后才能返回下单成功,如果网络产生波动等原因使得商品服务扣库存延迟或者失败,会带来较差的用户体验,如果在高并发的场景下,这样的处理显然是不合适的,那怎么进行优化呢?
这就需要消息队列登场了。消息队列提供一个异步通信机制,消息的发送者不必一直等待到消息被成功处理才返回,而是立即返回。消息中间件负责处理网络通信,如果网络连接不可用,消息被暂存于队列当中,当网络畅通的时候在将消息转发给相应的应用程序或者服务,当然前提是这些服务订阅了该队列。如果在商品服务和订单服务之间使用消息中间件,既可以提高并发量,又降低服务之间的耦合度。
RabbitMQ 是一种消息队列的具体实现,由 Erlang 语言来编写,且 RabbitMQ 是基于 TCP 协议上的应用层协议 AMQP
概念
AMQP
:AMQP (Advanced Message Queuing Protocol,高级消息队列协议) 是一个进程间传递 异步消息 的 网络协议queue
:消息队列,用来保存消息,接受生产者消息,并供消费者消费producers
:生产消息,将消息丢入消息队列中consumers
:消费者,将消息从队列中取出进行消费exchange
:交换机,接收消息,按照路由规则将消息发送到绑定在交换机下的一个或者多个队列routingKey
:路由键,生产者将消息发送给交换器的时候,会发送一个routingKey
,用来指定路由规则
Hello world
流程图
producer
:生产者生产的消息并未直接发送至消息队列,而是发送至交换机,由交换机通过路由规则推送到相应的队列1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45public class Producer {
private static final String EXCHANGE_NAME = "exchange_direct_hello_world";
private static final String QUEUE_NAME = "queue_hello_world";
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admin");
factory.setPassword("admin");
// 连接到 rabbitmq 并获取其中到一个 channel
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明一个交换机
channel.exchangeDeclare(
EXCHANGE_NAME, // 交换机名称
BuiltinExchangeType.DIRECT // 交换机类型
// 四种类别
// DIRECT("direct"), FANOUT("fanout"), TOPIC("topic"), HEADERS("headers");
);
// 声明队列
channel.queueDeclare(
QUEUE_NAME, // 队列名称
false, // 是否需要持久化,设置为 false 也会存盘,但是服务重启后会丢失
false, // 排他性
false, // 在队列中最后一个元素被消费后是否删除队列
null // 附属参数
);
// 将队列帮定至交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 生产者
channel.basicPublish( // 不同的交换机类型有不同的路由规则
EXCHANGE_NAME, // 发送至到交换机
"", // 携带的 路由Key
null, // 携带的 参数 args
"Hello world!".getBytes(StandardCharsets.UTF_8));
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}consumer
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25public class Consumer {
private static final String QUEUE_NAME = "queue_hello_world";
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 从队列中读取消息
channel.basicConsume(
QUEUE_NAME,
true, // 如果服务器在传递消息时应考虑已确认的消息,则为true;如果服务器需要显式确认,则为false
(consumerTag, delivery) -> { // 在传递消息时回调
System.out.println(new String(delivery.getBody(), StandardCharsets.UTF_8));
},
consumerTag -> { // 取消使用者时的回调
}
);
}
}
四种交换机类型
RabbitMQ常用的交换器类型有direct、topic、fanout、headers四种。
Fanout
:该类型不处理路由 Key,会把所有发送到交换器的消息机由到所有绑定的队列中,优点是转发消息最快,性能最好1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82// 生产者
public class Producer {
private static final String EXCHANGE_NAME = "exchange_fanout_hello_world";
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admin");
factory.setPassword("admin");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明交换机
channel.exchangeDeclare(
EXCHANGE_NAME,
BuiltinExchangeType.FANOUT
);
// 生产者
channel.basicPublish(
EXCHANGE_NAME,
"",
null,
"Hello fanout!".getBytes(StandardCharsets.UTF_8));
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
// 消费者
public class Consumer {
private static final String EXCHANGE_NAME = "exchange_fanout_hello_world";
public static void main(String[] args) {
new Thread(new ConsumerRunnable("fanout_queue_01")).start();
new Thread(new ConsumerRunnable("fanout_queue_02")).start();
new Thread(new ConsumerRunnable("fanout_queue_03")).start();
}
private static class ConsumerRunnable implements Runnable {
private final String queueName;
public ConsumerRunnable(String queueName) {
this.queueName = queueName;
}
public void run() {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(
queueName,
false,
false,
false,
null
);
channel.queueBind(queueName, EXCHANGE_NAME, "");
// 从队列中读取消息
channel.basicConsume(
queueName,
true,
(consumerTag, delivery) -> {
System.out.println(queueName + " "
+ new String(delivery.getBody(), StandardCharsets.UTF_8));
},
consumerTag -> {
}
);
}
}
}Direct
:生产者将消息推送到交换机,交换机将消息按照routingKey
分发到绑定在当前交换机下,且routingKey
对应相等的队列中1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86// 生产者
public class Producer {
private static final String EXCHANGE_NAME = "exchange_direct_hello_world";
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admin");
factory.setPassword("admin");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明交换机
channel.exchangeDeclare(
EXCHANGE_NAME,
BuiltinExchangeType.DIRECT
);
// 生产者
channel.basicPublish(
EXCHANGE_NAME,
"hello",
null,
"Hello direct!".getBytes(StandardCharsets.UTF_8));
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
// 消费者
public class Consumer {
private static final String EXCHANGE_NAME = "exchange_direct_hello_world";
public static void main(String[] args) {
new Thread(new ConsumerRunnable("direct_queue_01")).start();
new Thread(new ConsumerRunnable("direct_queue_02")).start();
new Thread(new ConsumerRunnable("direct_queue_03")).start();
}
private static class ConsumerRunnable implements Runnable {
private final String queueName;
public ConsumerRunnable(String queueName) {
this.queueName = queueName;
}
public void run() {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(
queueName,
false,
false,
false,
null
);
if (queueName.charAt(queueName.length() - 1) == '3') {
channel.queueBind(queueName, EXCHANGE_NAME, "world");
} else {
channel.queueBind(queueName, EXCHANGE_NAME, "hello");
}
// 从队列中读取消息
channel.basicConsume(
queueName,
true,
(consumerTag, delivery) -> {
System.out.println(queueName + " "
+ new String(delivery.getBody(), StandardCharsets.UTF_8));
},
consumerTag -> {
}
);
}
}
}Topic
:与上述Direct
类型差不多,只是 routingKey 并不是指定唯一的,而是 模糊匹配注:
*
:仅代表一个词,必须是一个#
:代表零到多个
eg:两个队列
queue_01
:cn.qh.*
queue_02
:cn.qh.#
多个匹配:
cn.qh.hznu
:queue_01
、queue_02
cn.qh.hznu.hangzhou
:queue_02
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108// 生产者
public class Producer {
private static final String EXCHANGE_NAME = "exchange_topic_hello_world";
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admin");
factory.setPassword("admin");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明交换机
channel.exchangeDeclare(
EXCHANGE_NAME,
BuiltinExchangeType.TOPIC
);
// 生产者
channel.basicPublish(
EXCHANGE_NAME,
"cn.qh",
null,
"Hello topic!".getBytes(StandardCharsets.UTF_8));
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
// 消费者
public class Consumer {
private static final String EXCHANGE_NAME = "exchange_topic_hello_world";
public static void main(String[] args) {
new Thread(new ConsumerRunnable("topic_queue_01")).start();
new Thread(new ConsumerRunnable("topic_queue_02")).start();
new Thread(new ConsumerRunnable("topic_queue_03")).start();
}
private static class ConsumerRunnable implements Runnable {
private final String queueName;
public ConsumerRunnable(String queueName) {
this.queueName = queueName;
}
public void run() {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admin");
factory.setPassword("admin");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(
queueName,
false,
false,
false,
null
);
String routingKey = null;
switch (queueName.charAt(queueName.length() - 1)) {
case '1':
routingKey = "cn.qh";
break;
case '2':
routingKey = "cn.qh.*";
break;
case '3':
routingKey = "cn.qh.#";
break;
}
channel.queueBind(queueName, EXCHANGE_NAME, routingKey);
// 从队列中读取消息
channel.basicConsume(
queueName,
true,
(consumerTag, delivery) -> {
System.out.println(queueName + " "
+ new String(delivery.getBody(), StandardCharsets.UTF_8));
},
consumerTag -> {
}
);
}
}
}
/*
对应输入输出
routingKey = cn.qh
topic_queue_03 Hello topic!
topic_queue_01 Hello topic!
routingKey = cn.qh.hznu
topic_queue_03 Hello topic!
topic_queue_02 Hello topic!
routingKey = cn.qh.hznu.hangzhou
topic_queue_03 Hello topic!
*/Headers
:该类型的交换器不依赖路由规则来路由消息,而是根据消息内容中的headers属性进行匹配。headers类型交换器性能差,在实际中并不常用
消费模式
上文中生产者通过交换机将消息路由到对应的队列中,对应之存在一个消费者在消费队列消息,故陆续消费消息即可。
但多个消费者同时消费一个队列的消息时的情况有两种,分别如下:
轮询,即多个消费者按顺序逐个消费,默认情况下就是这种消费方式
具体的演示代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109// 生产者
public class Producer {
private static final String EXCHANGE_NAME = "exchange_hello_world";
public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("admin");
factory.setPassword("admin");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(
EXCHANGE_NAME,
BuiltinExchangeType.DIRECT
);
for (int i = 0; i < 10; i++) {
// 生产者
channel.basicPublish(
EXCHANGE_NAME,
"",
null,
("Hello world!" + i).getBytes(StandardCharsets.UTF_8));
}
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
}
}
// 消费者
public class Consumer {
// 创建连接工厂
private static final ConnectionFactory FACTORY = new ConnectionFactory();
private static final String QUEUE_NAME = "queue_hello_world";
private static final String EXCHANGE_NAME = "exchange_hello_world";
static {
FACTORY.setHost("localhost");
FACTORY.setUsername("admin");
FACTORY.setPassword("admin");
}
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = FACTORY.newConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(
QUEUE_NAME, // 队列名称
false, // 是否需要持久化,设置为 false 也会存盘,但是服务重启后会丢失
false, // 排他性
false, // 在队列中最后一个元素被消费后是否删除队列
null // 附属参数
);
// 将队列帮定至交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
new Thread(new ConsumerRunnable("consumer_01")).start();
new Thread(new ConsumerRunnable("consumer_02")).start();
}
private static class ConsumerRunnable implements Runnable {
private final String consumerName;
public ConsumerRunnable(String consumerName) {
this.consumerName = consumerName;
}
public void run() {
Connection connection = FACTORY.newConnection();
Channel channel = connection.createChannel();
// 从队列中读取消息
channel.basicConsume(
QUEUE_NAME,
true,
(consumerTag, delivery) -> {
System.out.println(consumerName + " "
+ new String(delivery.getBody(), StandardCharsets.UTF_8));
},
consumerTag -> {
}
);
}
}
}
/*
生产者产生 10 条消息放入队列中
2 个消费者从队列中消费消息
输出如下:
consumer_02 Hello world!1
consumer_01 Hello world!0
consumer_01 Hello world!2
consumer_01 Hello world!4
consumer_02 Hello world!3
consumer_01 Hello world!6
consumer_01 Hello world!8
consumer_02 Hello world!5
consumer_02 Hello world!7
consumer_02 Hello world!9
*/通过输出不难发现,消费者们交替消费
公平消费,在实际场景中使用时,轮询明显不能满足要求,大多数情况下,每个消费者的消费能力 (服务器配置、网络等原因) 是不同的,比如消费者一消费一个消息需要 200ms 而消费者二需要 1000ms,现任这种情况下完成所有消息的消费时间是最长的,故产生了公平消费这种消费方式,按照消费者消费能力进行分配
改变消费方式需要两个步骤
- 设置
autoAck = false
- 开启手动
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80// 生产者代码不需要变动
// 消费者
public class Consumer {
// 创建连接工厂
private static final ConnectionFactory FACTORY = new ConnectionFactory();
private static final String QUEUE_NAME = "queue_hello_world";
private static final String EXCHANGE_NAME = "exchange_hello_world";
static {
FACTORY.setHost("localhost");
FACTORY.setUsername("admin");
FACTORY.setPassword("admin");
}
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = FACTORY.newConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(
QUEUE_NAME, // 队列名称
false, // 是否需要持久化,设置为 false 也会存盘,但是服务重启后会丢失
false, // 排他性
false, // 在队列中最后一个元素被消费后是否删除队列
null // 附属参数
);
// 将队列帮定至交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
new Thread(new ConsumerRunnable("consumer_01")).start();
new Thread(new ConsumerRunnable("consumer_02")).start();
}
private static class ConsumerRunnable implements Runnable {
private final String consumerName;
public ConsumerRunnable(String consumerName) {
this.consumerName = consumerName;
}
public void run() {
Connection connection = FACTORY.newConnection();
Channel channel = connection.createChannel();
// 设置消费者单次从队列中取出的消息条数
channel.basicQos(1);
long sleepTime = 0;
if (consumerName.charAt(consumerName.length() - 1) == '1') {
sleepTime = 200;
} else {
sleepTime = 1000;
}
// 从队列中读取消息
long finalSleepTime = sleepTime;
channel.basicConsume(
QUEUE_NAME,
false, // 关闭自动 ack
(consumerTag, delivery) -> {
try {
Thread.sleep(finalSleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(consumerName + " "
+ new String(delivery.getBody(), StandardCharsets.UTF_8));
// 手动 ack
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
},
consumerTag -> {
}
);
}
}
}- 设置