HelloRabbitMQ

目标

生产者发送消息,消费者接收消息,用最简单的方式实现

官网说明参见下面超链接:

RabbitMQ tutorial - “Hello World!” — RabbitMQ

image-20240829163431320

具体操作

创建Java工程

  1. 创建一个空项目
image-20240829163221890
  1. 新建一个模块
image-20240829163335410
  1. 添加依赖
1
2
3
4
5
6
7
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.20.0</version>
</dependency>
</dependencies>

发送消息

  1. Java代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {
public static void main(String[] args) throws Exception {
// 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 设置主机地址
connectionFactory.setHost("localhost");
// 设置连接端口号:默认为 5672
connectionFactory.setPort(5672);
// 虚拟主机名称:默认为 /
connectionFactory.setVirtualHost("/");
// 设置连接用户名;默认为guest
connectionFactory.setUsername("guest");
// 设置连接密码;默认为guest
connectionFactory.setPassword("123456");
// 创建连接
Connection connection = connectionFactory.newConnection();
// 创建频道
Channel channel = connection.createChannel();
// 声明(创建)队列
// queue 参数1:队列名称
// durable 参数2:是否定义持久化队列,当 MQ 重启之后还在
// exclusive 参数3:是否独占本次连接。若独占,只能有一个消费者监听这个队列且 Connection 关闭时删除这个队列
// autoDelete 参数4:是否在不使用的时候自动删除队列,也就是在没有Consumer时自动删除
// arguments 参数5:队列其它参数
channel.queueDeclare("simple_queue", true, false, false, null);
// 要发送的信息
String message = "你好;RabbitMQ!";
// 参数1:交换机名称,如果没有指定则使用默认Default Exchange
// 参数2:路由key,简单模式可以传递队列名称
// 参数3:配置信息
// 参数4:消息内容
channel.basicPublish("", "simple_queue", null, message.getBytes());
System.out.println("已发送消息:" + message);
// 关闭资源
channel.close();
connection.close();
}
}

  1. 运行程序

image-20240829164554308

  1. 查看效果

image-20240829164747776

image-20240829164917637

接收消息

  1. Java代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import com.rabbitmq.client.*;
import java.io.IOException;

public class Consumer {
public static void main(String[] args) throws Exception {
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 2. 设置参数
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("123456");
// 3. 创建连接 Connection
Connection connection = factory.newConnection();
// 4. 创建Channel
Channel channel = connection.createChannel();
// 5. 创建队列
// 如果没有一个名字叫simple_queue的队列,则会创建该队列,如果有则不会创建
// 参数1. queue:队列名称
// 参数2. durable:是否持久化。如果持久化,则当MQ重启之后还在
// 参数3. exclusive:是否独占。
// 参数4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
// 参数5. arguments:其它参数。
channel.queueDeclare("simple_queue",true,false,false,null);
// 接收消息
DefaultConsumer consumer = new DefaultConsumer(channel){
// 回调方法,当收到消息后,会自动执行该方法
// 参数1. consumerTag:标识
// 参数2. envelope:获取一些信息,交换机,路由key...
// 参数3. properties:配置信息
// 参数4. body:数据
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey());
System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
}
};
// 参数1. queue:队列名称
// 参数2. autoAck:是否自动确认,类似咱们发短信,发送成功会收到一个确认消息
// 参数3. callback:回调对象
// 消费者类似一个监听程序,主要是用来监听消息
channel.basicConsume("simple_queue",true,consumer);
}
}
  1. 运行程序

image-20240829165326753

  1. 查看后台管理界面

因为消息被消费掉了,所以RabbitMQ服务器上没有了,即消息数量为0:

image-20240829165347416

工作模式

RabbitMQ有7种工作模式:

image-20240829165839070

下面一一进行讲解

工作队列(Work Queues)模式

概述

本质上我们刚刚写的HelloWorld程序就是这种模式,只是简化到了最简单的情况:

  • 生产者只有一个
  • 发送一个消息
  • 消费者也只有一个,消息也只能被这个消费者消费

所以HelloWorld也称为简单模式。

现在我们还原一下常规情况:

  • 生产者发送多个消息
  • 由多个消费者来竞争
  • 谁抢到算谁的
image-20240829170029591

生产者代码

封装工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConnectionUtil {
public static final String HOST_ADDRESS = "localhost";
public static Connection getConnection() throws Exception {
// 定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置服务地址
factory.setHost(HOST_ADDRESS);
// 端口
factory.setPort(5672);
//设置账号信息,用户名、密码、vhost
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("123456");
// 通过工程获取连接
Connection connection = factory.newConnection();
return connection;
}
}

编写代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import blog.yuanyuan.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer {
public static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
for (int i = 1; i <= 10; i++) {
String body = i + "hello rabbitmq!!!";
channel.basicPublish("", QUEUE_NAME, null, body.getBytes());
}
channel.close();
connection.close();
}
}

发送消息效果

image-20240829170843042

消费者代码

编写代码

创建Consumer1和Consumer2。Consumer2只是类名和打印提示不同,代码完全一样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import blog.yuanyuan.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;

public class Consumer1 {
static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Consumer1 body:" + new String(body));
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}

运行的时候先启动两个消费端程序,然后再启动生产者端程序。如果已经运行过生产者程序,则手动把work_queue队列删掉。

运行效果

最终两个消费端程序竞争结果如下:

image-20240829171454720

结论

  • 多个消费者监听同一个队列,则各消费者之间对同一个消息是竞争的关系。
  • Work Queues工作模式适用于任务较重或任务较多的情况,多消费者分摊任务可以提高消息处理的效率。

发布订阅(Publish/Subscribe)模式

概述

引入新角色:交换机。这时生产者不是把消息直接发送到队列,而是发送到交换机。交换机接收消息,而如何处理消息取决于交换机的类型。

交换机有如下3种常见类型:

  • Fanout:广播,将消息发送给所有绑定到交换机的队列
  • Direct:定向,把消息交给符合指定routing key的队列
  • Topic:通配符,把消息交给符合routing pattern(路由模式)的队列

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

模式说明

组件之间关系:

  • 生产者把消息发送到交换机
  • 队列直接和交换机绑定

工作机制:消息发送到交换机上,就会以广播的形式发送给所有已绑定队列

理解概念:

  • Publish:发布,这里就是把消息发送到交换机上
  • Subscribe:订阅,这里只要把队列和交换机绑定,事实上就形成了一种订阅关系

生产者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import blog.yuanyuan.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer {
public static void main(String[] args) throws Exception {
// 1、获取连接
Connection connection = ConnectionUtil.getConnection();
// 2、创建频道
Channel channel = connection.createChannel();
// 参数1. exchange:交换机名称
// 参数2. type:交换机类型
// DIRECT("direct"):定向
// FANOUT("fanout"):扇形(广播),发送消息到每一个与之绑定队列。
// TOPIC("topic"):通配符的方式
// HEADERS("headers"):参数匹配
// 参数3. durable:是否持久化
// 参数4. autoDelete:自动删除
// 参数5. internal:内部使用。一般false
// 参数6. arguments:其它参数
String exchangeName = "test_fanout";
// 3、创建交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null);
// 4、创建队列
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue1Name, true, false, false, null);
channel.queueDeclare(queue2Name, true, false, false, null);
// 5、绑定队列和交换机
// 参数1. queue:队列名称
// 参数2. exchange:交换机名称
// 参数3. routingKey:路由键,绑定规则
// 如果交换机的类型为fanout,routingKey设置为""
channel.queueBind(queue1Name, exchangeName, "");
channel.queueBind(queue2Name, exchangeName, "");
String body = "日志信息:张三调用了findAll方法...日志级别:info...";
// 6、发送消息
channel.basicPublish(exchangeName, "", null, body.getBytes());
// 7、释放资源
channel.close();
connection.close();
}
}

消费者代码

消费者1号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import blog.yuanyuan.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String queue1Name = "test_fanout_queue1";
channel.queueDeclare(queue1Name, true, false, false, null);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:" + new String(body));
System.out.println("队列 1 消费者 1 将日志信息打印到控制台.....");
}
};
channel.basicConsume(queue1Name, true, consumer);
}
}

消费者2号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import blog.yuanyuan.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue2Name, true, false, false, null);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:" + new String(body));
System.out.println("队列 2 消费者 2 将日志信息打印到控制台.....");
}
};
channel.basicConsume(queue2Name, true, consumer);
}
}

运行效果

还是先启动2个消费者,然后再运行生产者程序发送消息:

image-20240829173404929

小结

查看交换机

image-20240829173642626

交换机和队列的绑定关系如下图所示:

image-20240829173722176

交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。

发布订阅模式与工作队列模式的区别:

  • 工作队列模式本质上是绑定默认交换机
  • 发布订阅模式绑定指定交换机
  • 监听同一个队列的消费端程序彼此之间是竞争关系
  • 绑定同一个交换机的多个队列在发布订阅模式下,消息是广播的,每个队列都能接收到消息

路由(Routing)模式

概述

  • 通过『路由绑定』的方式,把交换机和队列关联起来

  • 交换机和队列通过路由键进行绑定

  • 生产者发送消息时不仅要指定交换机,还要指定路由键

  • 交换机接收到消息会发送到路由键绑定的队列

  • 在编码上与 Publish/Subscribe发布与订阅模式的区别:

    • 交换机的类型为:Direct

    • 队列绑定交换机的时候需要指定routing key

image-20240829173952601

实际开发中用得最多

生产者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import blog.yuanyuan.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_direct";
// 创建交换机
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null);
// 创建队列
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
// 声明(创建)队列
channel.queueDeclare(queue1Name, true, false, false, null);
channel.queueDeclare(queue2Name, true, false, false, null);
// 队列绑定交换机
// 队列1绑定error
channel.queueBind(queue1Name, exchangeName, "error");
// 队列2绑定info error warning
channel.queueBind(queue2Name, exchangeName, "info");
channel.queueBind(queue2Name, exchangeName, "error");
channel.queueBind(queue2Name, exchangeName, "warning");
String message = "日志信息:张三调用了delete方法.错误了,日志级别warning";
// 发送消息
channel.basicPublish(exchangeName, "warning", null, message.getBytes());
System.out.println(message);
// 释放资源
channel.close();
connection.close();
}
}

消费者代码

消费者1号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import blog.yuanyuan.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String queue1Name = "test_direct_queue1";
channel.queueDeclare(queue1Name, true, false, false, null);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:" + new String(body));
System.out.println("Consumer1 将日志信息打印到控制台.....");
}
};
channel.basicConsume(queue1Name, true, consumer);
}
}

消费者2号

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String queue2Name = "test_direct_queue2";
channel.queueDeclare(queue2Name, true, false, false, null);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:" + new String(body));
System.out.println("Consumer2 将日志信息存储到数据库.....");
}
};
channel.basicConsume(queue2Name, true, consumer);
}
}

运行结果

消费消息

image-20240829174521779

绑定消息

image-20240829174638324

如果一个交换机通过相同的routing key绑定了多个队列,就会有广播效果

主题(Topics)模式

概述

Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key的时候使用通配符。

Routingkey一般都是由一个或多个单词组成,多个单词之间以“.”分割,例如:item.insert
通配符规则:

  • #:匹配零个或多个词
  • *:匹配一个词

生产者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import blog.yuanyuan.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

public class Producer {

public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_topic";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null);
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
channel.queueDeclare(queue1Name, true, false, false, null);
channel.queueDeclare(queue2Name, true, false, false, null);
// 绑定队列和交换机
// 参数1. queue:队列名称
// 参数2. exchange:交换机名称
// 参数3. routingKey:路由键,绑定规则
// 如果交换机的类型为fanout ,routingKey设置为""
// routing key 常用格式:系统的名称.日志的级别。
// 需求: 所有error级别的日志存入数据库,所有order系统的日志存入数据库
channel.queueBind(queue1Name, exchangeName, "#.error");
channel.queueBind(queue1Name, exchangeName, "order.*");
channel.queueBind(queue2Name, exchangeName, "*.*");
// 分别发送消息到队列:order.info、goods.info、goods.error
String body = "[所在系统:order][日志级别:info][日志内容:订单生成,保存成功]";
channel.basicPublish(exchangeName, "order.info", null, body.getBytes());
body = "[所在系统:goods][日志级别:info][日志内容:商品发布成功]";
channel.basicPublish(exchangeName, "goods.info", null, body.getBytes());
body = "[所在系统:goods][日志级别:error][日志内容:商品发布失败]";
channel.basicPublish(exchangeName, "goods.error", null, body.getBytes());
channel.close();
connection.close();
}
}

消费者代码

消费者1号

消费者1监听队列1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import blog.yuanyuan.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String QUEUE_NAME = "test_topic_queue1";
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
System.out.println("body:" + new String(body));
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}

消费者2号

消费者2监听队列2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import blog.yuanyuan.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumer2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
String QUEUE_NAME = "test_topic_queue2";
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
System.out.println("body:" + new String(body));
}
};
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}

运行效果

image-20240829180034851

绑定关系

交换机:

image-20240829180143352

test_topic_queue1:

image-20240829180300143

test_topic_queue2:

image-20240829180357729

RPC

远程过程调用,本质上是同步调用,和我们使用OpenFeign调用远程接口一样,这不是典型的消息队列工作方式。

image-20240829180748244

Publisher Confirms

发送端消息确认,是我们在进阶篇要探讨的『消息可靠性投递』的一部分。