RabbitMQ笔记(一)

RabbitMQ笔记(一)

RabbitMQ 是目前非常热门的一款消息中间件,不管是互联网大厂还是中小企业都在大量使用

简介

使用场景

摘自 博客

以熟悉的电商场景为例,如果商品服务和订单服务是两个不同的微服务,在下单的过程中订单服务需要调用商品服务进行扣库存操作。

按照传统的方式,下单过程要等到调用完毕之后才能返回下单成功,如果网络产生波动等原因使得商品服务扣库存延迟或者失败,会带来较差的用户体验,如果在高并发的场景下,这样的处理显然是不合适的,那怎么进行优化呢?

这就需要消息队列登场了。消息队列提供一个异步通信机制,消息的发送者不必一直等待到消息被成功处理才返回,而是立即返回。消息中间件负责处理网络通信,如果网络连接不可用,消息被暂存于队列当中,当网络畅通的时候在将消息转发给相应的应用程序或者服务,当然前提是这些服务订阅了该队列。如果在商品服务和订单服务之间使用消息中间件,既可以提高并发量,又降低服务之间的耦合度。

RabbitMQ 是一种消息队列的具体实现,由 Erlang 语言来编写,且 RabbitMQ 是基于 TCP 协议上的应用层协议 AMQP

概念

  • AMQP:AMQP (Advanced Message Queuing Protocol,高级消息队列协议) 是一个进程间传递 异步消息网络协议

  • queue:消息队列,用来保存消息,接受生产者消息,并供消费者消费

  • producers:生产消息,将消息丢入消息队列中

  • consumers:消费者,将消息从队列中取出进行消费
  • exchange:交换机,接收消息,按照路由规则将消息发送到绑定在交换机下的一个或者多个队列
  • routingKey:路由键,生产者将消息发送给交换器的时候,会发送一个 routingKey,用来指定路由规则

Hello world

流程图

img

  • producer:生产者生产的消息并未直接发送至消息队列,而是发送至交换机,由交换机通过路由规则推送到相应的队列

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    public class Producer {
    private static final String EXCHANGE_NAME = "exchange_direct_hello_world";
    private static final String QUEUE_NAME = "queue_hello_world";

    public static void main(String[] args) {
    // 创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    factory.setUsername("admin");
    factory.setPassword("admin");

    // 连接到 rabbitmq 并获取其中到一个 channel
    try (Connection connection = factory.newConnection();
    Channel channel = connection.createChannel()) {
    // 声明一个交换机
    channel.exchangeDeclare(
    EXCHANGE_NAME, // 交换机名称
    BuiltinExchangeType.DIRECT // 交换机类型
    // 四种类别
    // DIRECT("direct"), FANOUT("fanout"), TOPIC("topic"), HEADERS("headers");
    );

    // 声明队列
    channel.queueDeclare(
    QUEUE_NAME, // 队列名称
    false, // 是否需要持久化,设置为 false 也会存盘,但是服务重启后会丢失
    false, // 排他性
    false, // 在队列中最后一个元素被消费后是否删除队列
    null // 附属参数
    );

    // 将队列帮定至交换机
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

    // 生产者
    channel.basicPublish( // 不同的交换机类型有不同的路由规则
    EXCHANGE_NAME, // 发送至到交换机
    "", // 携带的 路由Key
    null, // 携带的 参数 args
    "Hello world!".getBytes(StandardCharsets.UTF_8));
    } catch (IOException | TimeoutException e) {
    e.printStackTrace();
    }
    }
    }
  • consumer

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    public class Consumer {
    private static final String QUEUE_NAME = "queue_hello_world";

    public static void main(String[] args) throws IOException, TimeoutException {
    // 创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    factory.setUsername("admin");
    factory.setPassword("admin");

    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    // 从队列中读取消息
    channel.basicConsume(
    QUEUE_NAME,
    true, // 如果服务器在传递消息时应考虑已确认的消息,则为true;如果服务器需要显式确认,则为false
    (consumerTag, delivery) -> { // 在传递消息时回调
    System.out.println(new String(delivery.getBody(), StandardCharsets.UTF_8));
    },
    consumerTag -> { // 取消使用者时的回调
    }
    );
    }
    }

四种交换机类型

官方案例

RabbitMQ常用的交换器类型有direct、topic、fanout、headers四种。

  • Fanout:该类型不处理路由 Key,会把所有发送到交换器的消息机由到所有绑定的队列中,优点是转发消息最快,性能最好

    img

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    // 生产者
    public class Producer {
    private static final String EXCHANGE_NAME = "exchange_fanout_hello_world";

    public static void main(String[] args) {
    // 创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    factory.setUsername("admin");
    factory.setPassword("admin");

    try (Connection connection = factory.newConnection();
    Channel channel = connection.createChannel()) {
    // 声明交换机
    channel.exchangeDeclare(
    EXCHANGE_NAME,
    BuiltinExchangeType.FANOUT
    );

    // 生产者
    channel.basicPublish(
    EXCHANGE_NAME,
    "",
    null,
    "Hello fanout!".getBytes(StandardCharsets.UTF_8));
    } catch (IOException | TimeoutException e) {
    e.printStackTrace();
    }
    }
    }

    // 消费者
    public class Consumer {
    private static final String EXCHANGE_NAME = "exchange_fanout_hello_world";

    public static void main(String[] args) {
    new Thread(new ConsumerRunnable("fanout_queue_01")).start();
    new Thread(new ConsumerRunnable("fanout_queue_02")).start();
    new Thread(new ConsumerRunnable("fanout_queue_03")).start();
    }

    private static class ConsumerRunnable implements Runnable {
    private final String queueName;

    public ConsumerRunnable(String queueName) {
    this.queueName = queueName;
    }

    @SneakyThrows
    @Override
    public void run() {
    // 创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    factory.setUsername("admin");
    factory.setPassword("admin");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    // 声明队列
    channel.queueDeclare(
    queueName,
    false,
    false,
    false,
    null
    );
    channel.queueBind(queueName, EXCHANGE_NAME, "");

    // 从队列中读取消息
    channel.basicConsume(
    queueName,
    true,
    (consumerTag, delivery) -> {
    System.out.println(queueName + " "
    + new String(delivery.getBody(), StandardCharsets.UTF_8));
    },
    consumerTag -> {
    }
    );
    }
    }
    }
  • Direct:生产者将消息推送到交换机,交换机将消息按照 routingKey 分发到绑定在当前交换机下,且 routingKey 对应相等的队列中

    img

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    // 生产者
    public class Producer {
    private static final String EXCHANGE_NAME = "exchange_direct_hello_world";

    public static void main(String[] args) {
    // 创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    factory.setUsername("admin");
    factory.setPassword("admin");

    try (Connection connection = factory.newConnection();
    Channel channel = connection.createChannel()) {
    // 声明交换机
    channel.exchangeDeclare(
    EXCHANGE_NAME,
    BuiltinExchangeType.DIRECT
    );

    // 生产者
    channel.basicPublish(
    EXCHANGE_NAME,
    "hello",
    null,
    "Hello direct!".getBytes(StandardCharsets.UTF_8));
    } catch (IOException | TimeoutException e) {
    e.printStackTrace();
    }
    }
    }

    // 消费者
    public class Consumer {
    private static final String EXCHANGE_NAME = "exchange_direct_hello_world";

    public static void main(String[] args) {
    new Thread(new ConsumerRunnable("direct_queue_01")).start();
    new Thread(new ConsumerRunnable("direct_queue_02")).start();
    new Thread(new ConsumerRunnable("direct_queue_03")).start();
    }

    private static class ConsumerRunnable implements Runnable {
    private final String queueName;

    public ConsumerRunnable(String queueName) {
    this.queueName = queueName;
    }

    @SneakyThrows
    @Override
    public void run() {
    // 创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    factory.setUsername("admin");
    factory.setPassword("admin");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    // 声明队列
    channel.queueDeclare(
    queueName,
    false,
    false,
    false,
    null
    );
    if (queueName.charAt(queueName.length() - 1) == '3') {
    channel.queueBind(queueName, EXCHANGE_NAME, "world");
    } else {
    channel.queueBind(queueName, EXCHANGE_NAME, "hello");
    }

    // 从队列中读取消息
    channel.basicConsume(
    queueName,
    true,
    (consumerTag, delivery) -> {
    System.out.println(queueName + " "
    + new String(delivery.getBody(), StandardCharsets.UTF_8));
    },
    consumerTag -> {
    }
    );
    }
    }
    }
  • Topic:与上述 Direct 类型差不多,只是 routingKey 并不是指定唯一的,而是 模糊匹配

    img

    注:

    • *:仅代表一个词,必须是一个
    • #:代表零到多个

    eg:两个队列

    • queue_01cn.qh.*
    • queue_02cn.qh.#

    多个匹配:

    • cn.qh.hznuqueue_01queue_02
    • cn.qh.hznu.hangzhouqueue_02
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    // 生产者
    public class Producer {
    private static final String EXCHANGE_NAME = "exchange_topic_hello_world";

    public static void main(String[] args) {
    // 创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    factory.setUsername("admin");
    factory.setPassword("admin");

    try (Connection connection = factory.newConnection();
    Channel channel = connection.createChannel()) {
    // 声明交换机
    channel.exchangeDeclare(
    EXCHANGE_NAME,
    BuiltinExchangeType.TOPIC
    );

    // 生产者
    channel.basicPublish(
    EXCHANGE_NAME,
    "cn.qh",
    null,
    "Hello topic!".getBytes(StandardCharsets.UTF_8));
    } catch (IOException | TimeoutException e) {
    e.printStackTrace();
    }
    }
    }

    // 消费者
    public class Consumer {
    private static final String EXCHANGE_NAME = "exchange_topic_hello_world";

    public static void main(String[] args) {
    new Thread(new ConsumerRunnable("topic_queue_01")).start();
    new Thread(new ConsumerRunnable("topic_queue_02")).start();
    new Thread(new ConsumerRunnable("topic_queue_03")).start();
    }

    private static class ConsumerRunnable implements Runnable {
    private final String queueName;

    public ConsumerRunnable(String queueName) {
    this.queueName = queueName;
    }

    @SneakyThrows
    @Override
    public void run() {
    // 创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    factory.setUsername("admin");
    factory.setPassword("admin");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    // 声明队列
    channel.queueDeclare(
    queueName,
    false,
    false,
    false,
    null
    );
    String routingKey = null;
    switch (queueName.charAt(queueName.length() - 1)) {
    case '1':
    routingKey = "cn.qh";
    break;
    case '2':
    routingKey = "cn.qh.*";
    break;
    case '3':
    routingKey = "cn.qh.#";
    break;
    }
    channel.queueBind(queueName, EXCHANGE_NAME, routingKey);

    // 从队列中读取消息
    channel.basicConsume(
    queueName,
    true,
    (consumerTag, delivery) -> {
    System.out.println(queueName + " "
    + new String(delivery.getBody(), StandardCharsets.UTF_8));
    },
    consumerTag -> {
    }
    );
    }
    }
    }

    /*
    对应输入输出
    routingKey = cn.qh
    topic_queue_03 Hello topic!
    topic_queue_01 Hello topic!

    routingKey = cn.qh.hznu
    topic_queue_03 Hello topic!
    topic_queue_02 Hello topic!

    routingKey = cn.qh.hznu.hangzhou
    topic_queue_03 Hello topic!
    */
  • Headers:该类型的交换器不依赖路由规则来路由消息,而是根据消息内容中的headers属性进行匹配。headers类型交换器性能差,在实际中并不常用

消费模式

上文中生产者通过交换机将消息路由到对应的队列中,对应之存在一个消费者在消费队列消息,故陆续消费消息即可。

但多个消费者同时消费一个队列的消息时的情况有两种,分别如下:

  • 轮询,即多个消费者按顺序逐个消费,默认情况下就是这种消费方式

    具体的演示代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    // 生产者
    public class Producer {
    private static final String EXCHANGE_NAME = "exchange_hello_world";

    public static void main(String[] args) {
    // 创建连接工厂
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    factory.setUsername("admin");
    factory.setPassword("admin");

    try (Connection connection = factory.newConnection();
    Channel channel = connection.createChannel()) {
    channel.exchangeDeclare(
    EXCHANGE_NAME,
    BuiltinExchangeType.DIRECT
    );

    for (int i = 0; i < 10; i++) {
    // 生产者
    channel.basicPublish(
    EXCHANGE_NAME,
    "",
    null,
    ("Hello world!" + i).getBytes(StandardCharsets.UTF_8));
    }
    } catch (IOException | TimeoutException e) {
    e.printStackTrace();
    }
    }
    }

    // 消费者
    public class Consumer {
    // 创建连接工厂
    private static final ConnectionFactory FACTORY = new ConnectionFactory();

    private static final String QUEUE_NAME = "queue_hello_world";
    private static final String EXCHANGE_NAME = "exchange_hello_world";

    static {
    FACTORY.setHost("localhost");
    FACTORY.setUsername("admin");
    FACTORY.setPassword("admin");
    }

    public static void main(String[] args) throws IOException, TimeoutException {
    Connection connection = FACTORY.newConnection();
    Channel channel = connection.createChannel();

    // 声明队列
    channel.queueDeclare(
    QUEUE_NAME, // 队列名称
    false, // 是否需要持久化,设置为 false 也会存盘,但是服务重启后会丢失
    false, // 排他性
    false, // 在队列中最后一个元素被消费后是否删除队列
    null // 附属参数
    );

    // 将队列帮定至交换机
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

    new Thread(new ConsumerRunnable("consumer_01")).start();
    new Thread(new ConsumerRunnable("consumer_02")).start();

    }

    private static class ConsumerRunnable implements Runnable {
    private final String consumerName;

    public ConsumerRunnable(String consumerName) {
    this.consumerName = consumerName;
    }

    @SneakyThrows
    @Override
    public void run() {
    Connection connection = FACTORY.newConnection();
    Channel channel = connection.createChannel();
    // 从队列中读取消息
    channel.basicConsume(
    QUEUE_NAME,
    true,
    (consumerTag, delivery) -> {
    System.out.println(consumerName + " "
    + new String(delivery.getBody(), StandardCharsets.UTF_8));
    },
    consumerTag -> {
    }
    );
    }
    }
    }

    /*
    生产者产生 10 条消息放入队列中
    2 个消费者从队列中消费消息
    输出如下:
    consumer_02 Hello world!1
    consumer_01 Hello world!0
    consumer_01 Hello world!2
    consumer_01 Hello world!4
    consumer_02 Hello world!3
    consumer_01 Hello world!6
    consumer_01 Hello world!8
    consumer_02 Hello world!5
    consumer_02 Hello world!7
    consumer_02 Hello world!9
    */

    通过输出不难发现,消费者们交替消费

  • 公平消费,在实际场景中使用时,轮询明显不能满足要求,大多数情况下,每个消费者的消费能力 (服务器配置、网络等原因) 是不同的,比如消费者一消费一个消息需要 200ms 而消费者二需要 1000ms,现任这种情况下完成所有消息的消费时间是最长的,故产生了公平消费这种消费方式,按照消费者消费能力进行分配

    改变消费方式需要两个步骤

    • 设置 autoAck = false
    • 开启手动 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    // 生产者代码不需要变动

    // 消费者
    public class Consumer {
    // 创建连接工厂
    private static final ConnectionFactory FACTORY = new ConnectionFactory();

    private static final String QUEUE_NAME = "queue_hello_world";
    private static final String EXCHANGE_NAME = "exchange_hello_world";

    static {
    FACTORY.setHost("localhost");
    FACTORY.setUsername("admin");
    FACTORY.setPassword("admin");
    }

    public static void main(String[] args) throws IOException, TimeoutException {
    Connection connection = FACTORY.newConnection();
    Channel channel = connection.createChannel();

    // 声明队列
    channel.queueDeclare(
    QUEUE_NAME, // 队列名称
    false, // 是否需要持久化,设置为 false 也会存盘,但是服务重启后会丢失
    false, // 排他性
    false, // 在队列中最后一个元素被消费后是否删除队列
    null // 附属参数
    );

    // 将队列帮定至交换机
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

    new Thread(new ConsumerRunnable("consumer_01")).start();
    new Thread(new ConsumerRunnable("consumer_02")).start();

    }

    private static class ConsumerRunnable implements Runnable {
    private final String consumerName;

    public ConsumerRunnable(String consumerName) {
    this.consumerName = consumerName;
    }

    @SneakyThrows
    @Override
    public void run() {
    Connection connection = FACTORY.newConnection();
    Channel channel = connection.createChannel();
    // 设置消费者单次从队列中取出的消息条数
    channel.basicQos(1);

    long sleepTime = 0;
    if (consumerName.charAt(consumerName.length() - 1) == '1') {
    sleepTime = 200;
    } else {
    sleepTime = 1000;
    }
    // 从队列中读取消息
    long finalSleepTime = sleepTime;
    channel.basicConsume(
    QUEUE_NAME,
    false, // 关闭自动 ack
    (consumerTag, delivery) -> {
    try {
    Thread.sleep(finalSleepTime);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    System.out.println(consumerName + " "
    + new String(delivery.getBody(), StandardCharsets.UTF_8));
    // 手动 ack
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    },
    consumerTag -> {
    }
    );
    }
    }
    }