整合SpringBoot

消费者工程

创建module

image-20240829182645356

配置pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.5</version>
</parent>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>

application.yml

增加RabbitMQ连接和日志打印的配置:

1
2
3
4
5
6
7
8
9
10
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: 123456
virtual-host: /
logging:
level:
blog.yuanyuan.rabbitmq.listener.MyMessageListener: info

主启动类

1
2
3
4
5
6
7
8
9
10
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitMQConsumerApp {

public static void main(String[] args) {
SpringApplication.run(RabbitMQConsumerApp.class, args);
}
}

监听器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import lombok.extern.slf4j.Slf4j;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MyMessageListener {

public static final String EXCHANGE_DIRECT = "exchange.direct.order";
public static final String ROUTING_KEY = "order";
public static final String QUEUE_NAME = "queue.order";

// 写法1:监听+在RabbitMQ服务器上创建交换机和队列
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = QUEUE_NAME, durable = "true"),
exchange = @Exchange(value = EXCHANGE_DIRECT),
key = {ROUTING_KEY}
))
public void processMessage(String dateString,
Message message,
Channel channel) {
log.info(dateString);
}

// 写法2:监听
// @RabbitListener(queues = {QUEUE_NAME})
// public void processMessage2(String dateString, Message message, Channel channel) {
// System.out.println("消费者接收到的消息:" + dateString);
// }
}

@RabbitListener注解属性对比

bindings属性

  • 表面作用:
    • 指定交换机和队列之间的绑定关系
    • 指定当前方法要监听的队列
  • 隐藏效果:如果RabbitMQ服务器上没有这里指定的交换机和队列,那么框架底层的代码会创建它们

queues属性

1
@RabbitListener(queues = {QUEUE_YUANYUAN})
  • 作用:指定当前方法要监听的队列
  • 注意:此时框架不会创建相关交换机和队列,必须提前创建好

生产者工程

创建module

配置pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.5</version>
</parent>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>

application.yml

1
2
3
4
5
6
7
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: 123456
virtual-host: /

主启动类

1
2
3
4
5
6
7
8
9
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitMQProducerApp {
public static void main(String[] args) {
SpringApplication.run(RabbitMQProducerApp.class, args);
}
}

测试程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class RabbitMQTest {

public static final String EXCHANGE_DIRECT = "exchange.direct.order";
public static final String ROUTING_KEY = "order";

@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void testSendMessage() {
rabbitTemplate.convertAndSend(
EXCHANGE_DIRECT,
ROUTING_KEY,
"Hello SpringBoot RabbitMQ!!!");
}
}

运行测试

先运行Consumer的SpringBoot程序,再运行测试程序生产消息:

image-20240829185058873

消息可靠性投递

在消息传送过程中,可能会出现各种故障:

故障情况1:消息没有发送到消息队列

  • 解决思路A:在生产者端进行确认,具体操作中我们会分别针对交换机和队列来确认,如果没有成功发送到消息队列服务器上,那就可以尝试重新发送
  • 解决思路B:为目标交换机指定备份交换机,当目标交换机投递失败时,把消息投递至备份交换机

故障情况2:消息队列服务器宕机导致内存中消息丢失

  • 解决思路:消息持久化到硬盘上,哪怕服务器重启也不会导致消息丢失

故障情况3:消费端宕机或抛异常导致消息没有成功被消费

  • 消费端消费消息成功,给服务器返回ACK信息,然后消息队列删除该消息
  • 消费端消费消息失败,给服务器端返回NACK信息,同时把消息恢复为待消费的状态,这样就可以再次取回消息,重试一次(当然,这就需要消费端接口支持幂等性)

生产者端消息确认机制

创建module

搭建环境

配置POM

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.5</version>
</parent>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>

主启动类

1
2
3
4
5
6
7
8
9
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitMQProducerApp {
public static void main(String[] args) {
SpringApplication.run(RabbitMQProducerApp.class, args);
}
}

YAML

注意:publisher-confirm-type和publisher-returns是两个必须要增加的配置,如果没有则本节功能不生效

1
2
3
4
5
6
7
8
9
10
11
12
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: 123456
virtual-host: /
publisher-confirm-type: CORRELATED # 交换机的确认
publisher-returns: true # 队列的确认
logging:
level:
blog.yuanyuan.rabbitmq.config.MQProducerAckConfig: info

创建配置类

目标

在这里我们为什么要创建这个配置类呢?首先,我们需要声明回调函数来接收RabbitMQ服务器返回的确认信息:

方法名方法功能所属接口接口所属类
confirm()确认消息是否发送到交换机ConfirmCallbackRabbitTemplate
returnedMessage()确认消息是否发送到队列ReturnsCallbackRabbitTemplate

然后,就是对RabbitTemplate的功能进行增强,因为回调函数所在对象必须设置到RabbitTemplate对象中才能生效。

原本RabbitTemplate对象并没有生产者端消息确认的功能,要给它设置对应的组件才可以。

而设置对应的组件,需要调用RabbitTemplate对象下面两个方法:

设置组件调用的方法所需对象类型
setConfirmCallback()ConfirmCallback接口类型
setReturnCallback()ReturnCallback接口类型

API说明

  1. ConfirmCallback接口

这是RabbitTemplate内部的一个接口,源代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* A callback for publisher confirmations.
*
*/
@FunctionalInterface
public interface ConfirmCallback {

/**
* Confirmation callback.
* @param correlationData correlation data for the callback.
* @param ack true for ack, false for nack
* @param cause An optional cause, for nack, when available, otherwise null.
*/
void confirm(@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause);

}

生产者端发送消息之后,回调confirm()方法

  • ack参数值为true:表示消息成功发送到了交换机
  • ack参数值为false:表示消息没有发送到交换机
  1. ReturnCallback接口

同样也RabbitTemplate内部的一个接口,源代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
* A callback for returned messages.
*
* @since 2.3
*/
@FunctionalInterface
public interface ReturnsCallback {

/**
* Returned message callback.
* @param returned the returned message and metadata.
*/
void returnedMessage(ReturnedMessage returned);

}

注意:接口中的returnedMessage()方法在消息没有发送到队列时调用

ReturnedMessage类中主要属性含义如下:

属性名类型含义
messageorg.springframework.amqp.core.Message消息以及消息相关数据
replyCodeint应答码,类似于HTTP响应状态码
replyTextString应答码说明
exchangeString交换机名称
routingKeyString路由键名称

配置类代码

  1. 要点1

加@Component注解,加入IOC容器

  1. 要点2

配置类自身实现ConfirmCallback、ReturnCallback这两个接口,然后通过this指针把配置类的对象设置到RabbitTemplate对象中。

操作封装到了一个专门的void init()方法中。

为了保证这个void init()方法在应用启动时被调用,我们使用@PostConstruct注解来修饰这个方法。

关于@PostConstruct注解大家可以参照以下说明:

@PostConstruct注解是Java中的一个标准注解,它用于指定在对象创建之后立即执行的方法。当使用依赖注入(如Spring框架)或者其他方式创建对象时,@PostConstruct注解可以确保在对象完全初始化之后,执行相应的方法。

使用@PostConstruct注解的方法必须满足以下条件:

  1. 方法不能有任何参数
  2. 方法必须是非静态的
  3. 方法不能返回任何值

当容器实例化一个带有@PostConstruct注解的Bean时,它会在调用构造函数之后,并在依赖注入完成之前调用被@PostConstruct注解标记的方法。这样,我们可以在该方法中进行一些初始化操作,比如读取配置文件、建立数据库连接等。

  1. 代码

有了以上说明,下面我们就可以展示配置类的整体代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {

@Resource
private RabbitTemplate rabbitTemplate;

@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}

@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("消息发送到交换机成功!数据:{}", correlationData);
} else {
log.info("消息发送到交换机失败!数据:{} 原因:{}", correlationData, cause);
}
}

@Override
public void returnedMessage(ReturnedMessage returned) {
log.info("消息主体: {}", new String(returned.getMessage().getBody()));
log.info("应答码: {}", returned.getReplyCode());
log.info("描述:{}", returned.getReplyText());
log.info("消息使用的交换器 exchange : {}", returned.getExchange());
log.info("消息使用的路由键 routing : {}", returned.getRoutingKey());
}
}

发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class RabbitMQTest {

public static final String EXCHANGE_DIRECT = "exchange.direct.order";
public static final String ROUTING_KEY = "order";

@Resource
private RabbitTemplate rabbitTemplate;

@Test
public void testSendMessage() {
rabbitTemplate.convertAndSend(
EXCHANGE_DIRECT,
ROUTING_KEY,
"Message Test Confirm RabbitMQ");
}
}

通过调整代码,测试如下三种情况:

  • 交换机正确、路由键正确
  • 交换机正确、路由键不正确,无法发送到队列
  • 交换机不正确,无法发送到交换机

运行测试

成功情况

image-20240830162039809

模拟失败情况

情况1:没有交换机

1
2
3
4
5
6
7
@Test
public void testFailSendMessage() {
rabbitTemplate.convertAndSend(
EXCHANGE_DIRECT + "~",
ROUTING_KEY,
"Message Test Confirm RabbitMQ");
}

image-20240830162423027

情况2:没有路由键

1
2
3
4
5
6
7
@Test
public void testFailSendMessage() {
rabbitTemplate.convertAndSend(
EXCHANGE_DIRECT,
ROUTING_KEY + "~",
"Message Test Confirm RabbitMQ");
}

image-20240830162629280

备份交换机

备份交换机原理

image-20240829190106123

创建备份交换机

创建备份交换机

注意:备份交换机一定要选择fanout类型,因为原交换机转入备份交换机时并不会指定路由键

image-20240830163044140

image-20231202183701454

创建备份交换机要绑定的队列

  1. 创建队列
image-20240830163223551

image-20231202183949674

  1. 绑定交换机

注意:这里是要和备份交换机绑定

image-20231203232801504

针对备份队列创建消费端监听器

1
2
3
4
5
6
7
8
9
10
11
12
13
public static final String EXCHANGE_DIRECT_BACKUP = "exchange.direct.order.backup";
public static final String QUEUE_NAME_BACKUP = "queue.order.backup";

@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = QUEUE_NAME_BACKUP, durable = "true"),
exchange = @Exchange(value = EXCHANGE_DIRECT_BACKUP),
key = {""}
))
public void processMessageBackup(String dateString,
Message message,
Channel channel) {
log.info("BackUp: " + dateString);
}

设定备份关系

原交换机删除

·

image-20231202184840124

重新创建原交换机

image-20231202185211633

image-20231202185342087

原交换机重新绑定原队列

image-20231202190111581

image-20231202185955138

image-20231202190036520

测试

  • 启动消费者端
  • 发送消息,但是路由键不对,于是转入备份交换机
1
2
3
4
5
6
7
@Test
public void testFailSendMessage() {
rabbitTemplate.convertAndSend(
EXCHANGE_DIRECT,
ROUTING_KEY + "~",
"Message Test Backup RabbitMQ");
}

image-20240830164615153

image-20240830164650605

交换机和队列持久化

测试非持久化交换机和队列

创建非持久化交换机

image-20231106192621173

创建之后,可以在列表中看到:

image-20231106192708597

创建非持久化队列

image-20231106195216265

创建之后,可以在列表中看到:

image-20231106195132627

绑定

image-20231106195748319

发送消息

1
2
3
4
5
6
7
8
9
10
public static final String EXCHANGE_TRANSIENT = "exchange.transient.user";
public static final String ROUTING_KEY_TRANSIENT = "user";

@Test
public void testSendMessageTransient() {
rabbitTemplate.convertAndSend(
EXCHANGE_TRANSIENT,
ROUTING_KEY_TRANSIENT,
"Hello transient user~~~");
}

临时性的交换机和队列也能够接收消息,但如果RabbitMQ服务器重启之后会怎么样呢?

重启RabbitMQ服务器

1
docker restart rabbitmq

重启之后,刚才临时性的交换机和队列都没了。在交换机和队列这二者中,队列是消息存储的容器,队列没了,消息就也跟着没了。

持久化的交换机和队列

我们其实不必专门创建持久化的交换机和队列,因为它们默认就是持久化的。接下来我们只需要确认一下:存放到队列中,尚未被消费端取走的消息,是否会随着RabbitMQ服务器重启而丢失?

发送消息

运行以前的发送消息方法即可,不过要关掉消费端程序

重启RabbitMQ服务器

1
docker restart rabbitmq

再次查看消息

发现消息仍然还在

结论

在后台管理界面创建交换机和队列时,默认就是持久化的模式。

此时消息也是持久化的,不需要额外设置。

消费端消息确认

ACK

ACK是acknowledge的缩写,表示已确认

默认情况

默认情况下,消费端取回消息后,默认会自动返回ACK确认消息,所以在前面的测试中消息被消费端消费之后,RabbitMQ得到ACK确认信息就会删除消息

但实际开发中,消费端根据消息队列投递的消息执行对应的业务,未必都能执行成功,如果希望能够多次重试,那么默认设定就不满足要求了

所以还是要修改成手动确认

创建消费端module

配置pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.5</version>
</parent>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>

application.yml

增加针对监听器的设置:

1
2
3
4
5
6
7
8
9
10
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: 123456
virtual-host: /
listener:
simple:
acknowledge-mode: manual # 把消息确认模式改为手动确认

主启动类

1
2
3
4
5
6
7
8
9
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitMQConsumerApp {
public static void main(String[] args) {
SpringApplication.run(RabbitMQConsumerApp.class, args);
}
}

消费端监听器

创建监听器类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.stereotype.Component;

@Component
public class MyMessageListener {

public static final String EXCHANGE_DIRECT = "exchange.direct.order";
public static final String ROUTING_KEY = "order";
public static final String QUEUE_NAME = "queue.order";

public void processMessage(String dataString, Message message, Channel channel) {

}

}

在接收消息的方法上应用注解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 修饰监听方法
@RabbitListener(
// 设置绑定关系
bindings = @QueueBinding(
// 配置队列信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除
value = @Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false"),
// 配置交换机信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除
exchange = @Exchange(value = EXCHANGE_DIRECT, durable = "true", autoDelete = "false"),
// 配置路由键信息
key = {ROUTING_KEY}
))
public void processMessage(String dataString, Message message, Channel channel) {

}

接收消息方法内部逻辑

  • 业务处理成功:手动返回ACK信息,表示消息成功消费
  • 业务处理失败:手动返回NACK信息,表示消息消费失败。此时有两种后续操作供选择:
    • 把消息重新放回消息队列,RabbitMQ会重新投递这条消息,那么消费端将重新消费这条消息——从而让业务代码再执行一遍
    • 不把消息放回消息队列,返回reject信息表示拒绝,那么这条消息的处理就到此为止

相关API

我们需要引入deliveryTag:交付标签机制

deliveryTag是一个64位整数,消息往消费端投递时,会携带交付标签。

  1. 交付标签有啥用?
  • 消费端把消息处理结果ACK、NACK、Reject等返回给Broker之后,Broker需要对对应的消息执行后续操作,例如删除消息、重新排队或标记为死信等等。那么Broker就必须知道它现在要操作的消息具体是哪一条。而deliveryTag作为消息的唯一标识就很好的满足了这个需求。
  1. 如果交换机是Fanout模式,同一个消息广播到了不同队列,deliveryTag会重复吗?
  • 不会,deliveryTag在Broker范围内唯一

下面我们探讨的三个方法都是来自于com.rabbitmq.client.Channel接口

  1. basicAck()方法
  • 方法功能:给Broker返回ACK确认信息,表示消息已经在消费端成功消费,这样Broker就可以把消息删除了
  • 参数列表:
参数名称含义
long deliveryTagBroker给每一条进入队列的消息都设定一个唯一标识
boolean multiple取值为true:为小于、等于deliveryTag的消息批量返回ACK信息
取值为false:仅为指定的deliveryTag返回ACK信息
  1. basicNack()方法
  • 方法功能:给Broker返回NACK信息,表示消息在消费端消费失败,此时Broker的后续操作取决于参数requeue的值
  • 参数列表:
参数名称含义
long deliveryTagBroker给每一条进入队列的消息都设定一个唯一标识
boolean multiple取值为true:为小于、等于deliveryTag的消息批量返回ACK信息
取值为false:仅为指定的deliveryTag返回ACK信息
boolean requeue取值为true:Broker将消息重新放回队列,接下来会重新投递给消费端
取值为false:Broker将消息标记为已消费,不会放回队列
  1. basicReject()方法
  • 方法功能:根据指定的deliveryTag,对该消息表示拒绝
  • 参数列表:
参数名称含义
long deliveryTagBroker给每一条进入队列的消息都设定一个唯一标识
boolean requeue取值为true:Broker将消息重新放回队列,接下来会重新投递给消费端
取值为false:Broker将消息标记为已消费,不会放回队列
  • basicNack()和basicReject()有啥区别?
    • basicNack()有批量操作
    • basicReject()没有批量操作

完整代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.stereotype.Component;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;

import java.io.IOException;

@Component
@Slf4j
public class MyMessageListener {

public static final String EXCHANGE_DIRECT = "exchange.direct.order";
public static final String ROUTING_KEY = "order";
public static final String QUEUE_NAME = "queue.order";

// 修饰监听方法
@RabbitListener(
// 设置绑定关系
bindings = @QueueBinding(
// 配置队列信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除
value = @Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false"),
// 配置交换机信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除
exchange = @Exchange(value = EXCHANGE_DIRECT, durable = "true", autoDelete = "false"),
// 配置路由键信息
key = {ROUTING_KEY}
))
public void processMessage(String dataString, Message message, Channel channel) throws IOException {
// 1、获取当前消息的 deliveryTag 值备用
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 2、正常业务操作
log.info("消费端接收到消息内容:" + dataString);
// System.out.println(10 / 0);
// 3、给 RabbitMQ 服务器返回 ACK 确认信息
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
// 4、获取信息,看当前消息是否曾经被投递过
Boolean redelivered = message.getMessageProperties().getRedelivered();
if (!redelivered) {
// 5、如果没有被投递过,那就重新放回队列,重新投递,再试一次
channel.basicNack(deliveryTag, false, true);
} else {
// 6、如果已经被投递过,且这一次仍然进入了 catch 块,那么返回拒绝且不再放回队列
channel.basicReject(deliveryTag, false);
}
}
}
}

要点总结

  • 要点1:把消息确认模式改为手动确认
  • 要点2:调用Channel对象的方法返回信息
    • ACK:Acknowledgement,表示消息处理成功
    • NACK:Negative Acknowledgement,表示消息处理失败
    • Reject:拒绝,同样表示消息处理失败
  • 要点3:后续操作
    • requeue为true:重新放回队列,重新投递,再次尝试
    • requeue为false:不放回队列,不重新投递
  • 要点4:deliveryTag 消息的唯一标识,查找具体某一条消息的依据

消费端如果设定消息重新放回队列,Broker重新投递消息,那么消费端就可以再次消费消息,这是一种“重试”机制,这需要消费端代码支持“幂等性

Prefetch

思路

  • 生产者发送100个消息
  • 对照两种情况:
    • 消费端没有设置prefetch参数:100个消息被全部取回
    • 消费端设置prefetch参数为1:100个消息慢慢取回

生产者端代码

1
2
3
4
5
6
7
8
9
@Test  
public void testPrefetch() {
for (int i = 0; i < 100; i++) {
rabbitTemplate.convertAndSend(
EXCHANGE_DIRECT,
ROUTING_KEY,
"Test prefetch" + i);
}
}

image-20240830172531876

消费者端代码

1
2
3
4
5
6
@RabbitListener(queues = {QUEUE_NAME})
public void processMessagePrefetch(String dataString, Message message, Channel channel) throws Exception {
log.info("消费端接收到消息内容:{}", dataString);
TimeUnit.SECONDS.sleep(1);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

image-20240830173038750

一次性取完,缓慢ack。

测试

未使用prefetch

  • 不要启动消费端程序,如果正在运行就把它停了
  • 运行生产者端程序发送100条消息
  • 查看队列中消息的情况:

image-20231107155915253

  • 说明:

    • Ready表示已经发送到队列的消息数量
    • Unacked表示已经发送到消费端但是消费端尚未返回ACK信息的消息数量
    • Total未被删除的消息总数
  • 接下来启动消费端程序,再查看队列情况:

image-20231107160233539

  • 能看到消息全部被消费端取走了,正在逐个处理、确认,说明有多少消息消费端就并发处理多少

设定prefetch

  1. YAML配置
1
2
3
4
5
6
7
8
9
10
11
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: 123456
virtual-host: /
listener:
simple:
acknowledge-mode: manual
prefetch: 1 # 设置每次最多从消息队列服务器取回多少消息

image-20240830173328802

Ready缓慢减少,一秒减少一个。

  1. 测试流程
  • 停止消费端程序
  • 运行生产者端程序发送100条消息
  • 查看队列中消息的情况:

image-20231107160820062

  • 接下来启动消费端程序,持续观察队列情况:

image-20231107160922632

image-20231107160936216

image-20231107160951639

  • 能看到消息不是一次性全部取回的,而是有个过程

消息超时

给消息设定一个过期时间,超过这个时间没有被取走的消息就会被删除

我们可以从两个层面来给消息设定过期时间:

  • 队列层面:在队列层面设定消息的过期时间,并不是队列的过期时间。意思是这
    个队列中的消息全部使用同一个过期时间。
  • 消息本身:给具体的某个消息设定过期时间

如果两个层面都做了设置,那么哪个时间短,哪个生效

队列层面设置

设置

创建交换机

image-20240830173749496 image-20240830173855392

别忘了设置绑定关系:

image-20240830174042615

测试

  • 不启动消费端程序
  • 向设置了过期时间的队列中发送100条消息
1
2
3
4
5
6
7
8
9
10
11
12
public static final String EXCHANGE_TIMEOUT = "exchange.test.timeout";
public static final String ROUTING_KEY_TIMEOUT = "routing.key.test.timeout";

@Test
public void testTimeout() {
for (int i = 0; i < 100; i++) {
rabbitTemplate.convertAndSend(
EXCHANGE_TIMEOUT,
ROUTING_KEY_TIMEOUT,
"Test timeout" + i);
}
}
  • 等5秒后,看是否全部被过期删除
image-20240830174505189

消息层面设置

删除queue-test-timeout,重新创建,只不过这次我们不设置过期时间,记得绑定交换机

image-20240830175011182

设置

1
2
3
4
5
6
7
8
9
10
11
@Test
public void testTimeoutQueue() {
// 创建消息后置处理器对象
MessagePostProcessor postProcessor = message -> {
// 8000毫秒后过期
message.getMessageProperties().setExpiration("8000");
return message;
};
rabbitTemplate.convertAndSend(
EXCHANGE_TIMEOUT, ROUTING_KEY_TIMEOUT, "Test timeout queue", postProcessor);
}

查看效果

这次我们是发送到普通队列上:

image-20240830175513717

死信

概述

概念:当一个消息无法被消费,它就变成了死信。
死信产生的原因大致有下面三种:

  • 拒绝:消费者拒接消息,basicNack()/basicReject(),并且不把消息重新放入原目标队列,requeue=false
  • 溢出:队列中消息数量到达限制。比如队列最大只能存储10条消息,且现在已经存储了10条,此时如果再发送一条消息进来,根据先进先出原则,队列中最早的消息会变成死信
  • 超时:消息到达超时时间未被消费

死信的处理方式大致有下面三种:

  • 丢弃:对不重要的消息直接丢弃,不做处理
  • 入库:把死信写入数据库,日后处理
  • 监听:消息变成死信后进入死信队列,我们专门设置消费端监听死信队列,做后续处理(通常采用)

测试相关准备

创建死信交换机和死信队列

常规设定即可,没有特殊设置:

  • 死信交换机:exchange.dead.letter.video
  • 死信队列:queue.dead.letter.video
  • 死信路由键:routing.key.dead.letter.video

创建正常交换机和正常队列

注意:一定要注意正常队列有诸多限定和设置,这样才能让无法处理的消息进入死信交换机

image-20240318165821774

  • 正常交换机:exchange.normal.video
  • 正常队列:queue.normal.video
  • 正常路由键:routing.key.normal.video

全部设置完成后参照如下:

image-20240318165927279

相关常量声明

1
2
3
4
5
6
7
8
public static final String EXCHANGE_NORMAL = "exchange.normal.video";  
public static final String EXCHANGE_DEAD_LETTER = "exchange.dead.letter.video";

public static final String ROUTING_KEY_NORMAL = "routing.key.normal.video";
public static final String ROUTING_KEY_DEAD_LETTER = "routing.key.dead.letter.video";

public static final String QUEUE_NORMAL = "queue.normal.video";
public static final String QUEUE_DEAD_LETTER = "queue.dead.letter.video";

消费端拒收消息

发送消息的代码

1
2
3
4
5
6
7
8
@Test  
public void testSendMessageButReject() {
rabbitTemplate
.convertAndSend(
EXCHANGE_NORMAL,
ROUTING_KEY_NORMAL,
"测试死信情况1:消息被拒绝");
}

接收消息的代码

  1. 监听正常队列
1
2
3
4
5
6
@RabbitListener(queues = {QUEUE_NORMAL})
public void processMessageNormal(Message message, Channel channel) throws IOException {
// 监听正常队列,但是拒绝消息
log.info("★[normal]消息接收到,但我拒绝。");
channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);
}
  1. 监听死信队列
1
2
3
4
5
6
7
@RabbitListener(queues = {QUEUE_DEAD_LETTER})
public void processMessageDead(String dataString, Message message, Channel channel) throws IOException {
// 监听死信队列
log.info("★[dead letter]dataString = {}", dataString);
log.info("★[dead letter]我是死信监听方法,我接收到了死信消息");
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

执行结果

image-20231107170523503

消息数量超过队列容纳极限

发送消息的代码

1
2
3
4
5
6
7
8
9
@Test  
public void testSendMultiMessage() {
for (int i = 0; i < 20; i++) {
rabbitTemplate.convertAndSend(
EXCHANGE_NORMAL,
ROUTING_KEY_NORMAL,
"测试死信情况2:消息数量超过队列的最大容量" + i);
}
}

接收消息的代码

消息接收代码不再拒绝消息:

1
2
3
4
5
6
@RabbitListener(queues = {QUEUE_NORMAL})
public void processMessageNormal(Message message, Channel channel) throws IOException {
// 监听正常队列
log.info("★[normal]消息接收到。");
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

重启微服务使代码修改生效。

执行效果

正常队列的参数如下图所示:

image-20231107171231765

生产者发送20条消息之后,消费端死信队列接收到前10条消息:

images

消息超时未消费

发送消息的代码

正常发送一条消息即可,所以使用第一个例子的代码。

1
2
3
4
5
6
7
8
@Test
public void testSendMessageTimeout() {
rabbitTemplate
.convertAndSend(
EXCHANGE_NORMAL,
ROUTING_KEY_NORMAL,
"测试死信情况3:消息超时");
}

执行效果

队列参数生效:

image-20231107172002297

因为没有消费端监听程序,所以消息未超时前滞留在队列中:

image-20231107172234849

消息超时后,进入死信队列:

image-20231107172042460

延迟插件

插件简介

插件安装

确定卷映射目录

1
2
3
docker inspect rabbitmq | jq -r '.[0].Mounts[].Source'
# 或者
docker inspect --format='{{range .Mounts}}{{.Source}}{{end}}' rabbitmq

运行结果:

image-20240830182100769

下载延迟插件

官方文档说明页地址:https://www.rabbitmq.com/community-plugins.html

image-20231107180045135

下载插件安装文件:

1
2
wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez
sudo mv rabbitmq_delayed_message_exchange-3.13.0.ez /home/docker/volumes/rabbitmq-plugin/_data # 将/home/docker/volumes/rabbitmq-plugin/_data替换为你的数据卷目录

启用插件

1
2
3
4
5
6
7
8
# 登录进入容器内部
docker exec -it rabbitmq /bin/bash
# rabbitmq-plugins命令所在目录已经配置到$PATH环境变量中了,可以直接调用
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 退出Docker容器
exit
# 重启Docker容器
docker restart rabbitmq

确认

确认点1:查看当前节点已启用插件的列表:

image-20240830182613739

确认点2:如果创建新交换机时可以在type中看到x-delayed-message选项,那就说明插件安装好了

image-20231107181914265

创建交换机

rabbitmq_delayed_message_exchange插件在工作时要求交换机是x-delayed-message类型才可以,创建方式如下:

image-20240830182910426

关于x-delayed-type参数的理解:

原本指定交换机类型的地方使用了x-delayed-message这个值,那么这个交换机除了支持延迟消息之外,到底是direct、fanout、topic这些类型中的哪一个呢?

这里就额外使用x-delayed-type来指定交换机本身的类型

创建队列并绑定交换机

image-20240830183042568

代码测试

生产者端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public static final String EXCHANGE_DELAY = "exchange.test.delay";
public static final String ROUTING_KEY_DELAY = "routing.key.test.delay";

@Test
public void testSendDelayMessage() {
rabbitTemplate.convertAndSend(
EXCHANGE_DELAY,
ROUTING_KEY_DELAY,
"测试基于插件的延迟消息 [" + new SimpleDateFormat("hh:mm:ss").format(new Date()) + "]",
messageProcessor -> {
// 设置延迟时间:以毫秒为单位
// x-delay 参数必须基于 x-delay-message-exchange 创建才能生效
messageProcessor.getMessageProperties().setHeader("x-delay", "10000");
return messageProcessor;
});
}

消费者端代码

  1. 情况A:资源已创建
1
2
3
4
5
6
7
8
public static final String QUEUE_DELAY = "queue.test.delay";

@RabbitListener(queues = {QUEUE_DELAY})
public void process(String dataString, Message message, Channel channel) throws IOException {
log.info("[生产者]{}", dataString);
log.info("[消费者]{}", new SimpleDateFormat("hh:mm:ss").format(new Date()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
  1. 情况B:资源未创建
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

@Component
@Slf4j
public class MyDelayMessageListener {

public static final String EXCHANGE_DELAY = "exchange.test.delay";
public static final String ROUTING_KEY_DELAY = "routing.key.test.delay";
public static final String QUEUE_DELAY = "queue.test.delay";

@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = QUEUE_DELAY, durable = "true", autoDelete = "false"),
exchange = @Exchange(
value = EXCHANGE_DELAY,
durable = "true",
autoDelete = "false",
type = "x-delayed-message",
arguments = @Argument(name = "x-delayed-type", value = "direct")),
key = {ROUTING_KEY_DELAY}
))
public void process(String dataString, Message message, Channel channel) throws IOException {
log.info("[生产者]" + dataString);
log.info("[消费者]" + new SimpleDateFormat("hh:mm:ss").format(new Date()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}

执行效果

  1. 交换机类型

image-20240319171359652

  1. 生产者端效果

注意:使用rabbitmq_delayed_message_exchange插件后,即使消息成功发送到队列上,也会导致returnedMessage()方法执行

image-20240830184332873
  1. 消费者端效果
image-20240830184025464

事务消息之生产者端

代码

新建module

引入依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.5</version>
</parent>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>

yaml配置

1
2
3
4
5
6
7
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: 123456
virtual-host: /

主启动类

1
2
3
4
5
6
7
8
9
10
11
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitMQProducerApp {

public static void main(String[] args) {
SpringApplication.run(RabbitMQProducerApp.class, args);
}

}

相关配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import lombok.Data;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@Data
public class RabbitConfig {

@Bean
public RabbitTransactionManager transactionManager(CachingConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}

@Bean
public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}
}

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
@Slf4j
public class RabbitMQTest {

public static final String EXCHANGE_NAME = "exchange.tx.dragon";
public static final String ROUTING_KEY = "routing.key.tx.dragon";

@Resource
private RabbitTemplate rabbitTemplate;

@Test
public void testSendMessageInTx() {
// 1、发送第一条消息
rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg ~~~01)");
// 2、抛出异常
log.info("do bad:" + 10 / 0);
// 3、发送第二条消息
rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg ~~~02)");
}

}

执行测试

未使用事务

抛出异常前的消息发送了,抛异常后的消息没有发送:

image-20231109131321901

image-20231109131413185

为了不影响后续操作,我们直接在管理界面这里把这条消息消费掉:

image-20231109131520985

image-20231109131611991

使用事务

  1. 说明

因为在junit中给测试方法使用@Transactional注解默认就会回滚,所以回滚操作需要使用@RollBack注解操控

  1. 测试提交事务的情况
1
2
3
4
5
6
7
8
9
@Test
@Transactional
@Rollback(value = false)
public void testSendMessageInTx() {
// 1、发送第一条消息
rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg [commit] ~~~01)");
// 2、发送第二条消息
rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg [commit] ~~~02)");
}

image-20231109132025204

image-20231109132112164

  1. 测试回滚事务的情况
1
2
3
4
5
6
7
8
9
10
11
@Test
@Transactional
@Rollback(value = true)
public void testSendMessageInTx() {
// 1、发送第一条消息
rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg [rollback] ~~~01)");
// 2、抛出异常
log.info("do bad:" + 10 / 0);
// 3、发送第二条消息
rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg [rollback] ~~~02)");
}

image-20231109132312914

总结

  • 在生产者端使用事务消息和消费端没有关系
  • 在生产者端使用事务消息仅仅是控制事务内的消息是否发送
  • 提交事务就把事务内所有消息都发送到交换机
  • 回滚事务则事务内任何消息都不会被发送

事务控制对消费者端无效!!!

惰性队列

创建惰性队列

官网说明

image-20231110110607266

队列可以创建为默认惰性模式,模式指定方式是:

  • 使用队列策略(建议)
  • 设置queue.declare参数

如果策略和队列参数同时指定,那么队列参数有更高优先级。如果队列模式是在声明时通过可选参数指定的,那么只能通过删除队列再重新创建来修改。

基于策略方式设定

1
2
3
4
5
# 登录Docker容器
docker exec -it rabbitmq /bin/bash

# 运行rabbitmqctl命令
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues

命令解读:

  • rabbitmqctl命令所在目录是:/opt/rabbitmq/sbin,该目录已配置到Path环境变量

  • set_policy是子命令,表示设置策略

  • Lazy是当前要设置的策略名称,是我们自己自定义的,不是系统定义的

  • "^lazy-queue$"是用正则表达式限定的队列名称,凡是名称符合这个正则表达式的队列都会应用这里的设置

  • '{“queue-mode”:“lazy”}'是一个JSON格式的参数设置指定了队列的模式为"lazy"

  • –-apply-to参数指定该策略将应用于队列(queues)级别

  • 命令执行后,所有名称符合正则表达式的队列都会应用指定策略,包括未来新创建的队列

如果需要修改队列模式可以执行如下命令(不必删除队列再重建):

1
rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"default"}' --apply-to queues

在声明队列时使用参数设定

  • 参数名称:x-queue-mode
  • 可用参数值:
    • default
    • lazy
  • 不设置就是取值为default

Java代码原生API设置方式:

1
2
3
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-queue-mode", "lazy");
channel.queueDeclare("myqueue", false, false, false, args);

Java代码注解设置方式:

1
2
3
@Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false", arguments = {
@Argument(name = "x-queue-mode", value = "lazy")
})

实操演练

生产者端代码

  1. 配置POM
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.5</version>
</parent>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
  1. 配置YAML
1
2
3
4
5
6
7
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: 123456
virtual-host: /
  1. 主启动类
1
2
3
4
5
6
7
8
9
10
11
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitMQLazyProducer {

public static void main(String[] args) {
SpringApplication.run(RabbitMQLazyProducer.class, args);
}

}
  1. 发送消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class RabbitMQTest {

public static final String EXCHANGE_LAZY_NAME = "exchange.test.lazy";
public static final String ROUTING_LAZY_KEY = "routing.key.test.lazy";

@Resource
private RabbitTemplate rabbitTemplate;

@Test
public void testSendMessage() {
rabbitTemplate.convertAndSend(EXCHANGE_LAZY_NAME, ROUTING_LAZY_KEY, "I am a message for test lazy queue.");
}

}

消费者端代码

  1. 配置POM
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.5</version>
</parent>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>
  1. 配置YAML
1
2
3
4
5
6
7
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: 123456
virtual-host: /
  1. 主启动类
1
2
3
4
5
6
7
8
9
10
11
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitMQLazyConsumerApp {

public static void main(String[] args) {
SpringApplication.run(RabbitMQLazyConsumerApp.class, args);
}

}
  1. 监听器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MyLazyMessageProcessor {

public static final String EXCHANGE_LAZY_NAME = "exchange.test.lazy";
public static final String ROUTING_LAZY_KEY = "routing.key.test.lazy";
public static final String QUEUE_LAZY_NAME = "queue.test.lazy";

@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = QUEUE_LAZY_NAME, durable = "true", autoDelete = "false", arguments = {
@Argument(name = "x-queue-mode", value = "lazy")
}),
exchange = @Exchange(value = EXCHANGE_LAZY_NAME, durable = "true", autoDelete = "false"),
key = {ROUTING_LAZY_KEY}
))
public void processMessageLazy(String data, Message message, Channel channel) {
log.info("消费端接收到消息:" + data);
}

}

测试

  • 先启动消费端

  • 基于消费端@RabbitListener注解中的配置,自动创建了队列

image-20231110201151470

  • 发送消息

总结

使用惰性队列的主要原因之一是支持非常长的队列(数百万条消息)。

由于各种原因,排队可能会变得很长:

  • 消费者离线/崩溃/停机进行维护
  • 突然出现消息进入高峰,生产者的速度超过了消费者
  • 消费者比正常情况慢

优先级队列

创建相关资源

创建交换机

exchange.test.priority

image-20231110234945082

创建队列

queue.test.priority

x-max-priority

image-20231110235404630

image-20231110235707445

队列绑定交换机

image-20231110235749304

image-20231110235808541

生产者发送消息

配置POM

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.5</version>
</parent>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>

配置YAML

1
2
3
4
5
6
7
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: 123456
virtual-host: /

主启动类

1
2
3
4
5
6
7
8
9
10
11
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitMQPriorityProducer {

public static void main(String[] args) {
SpringApplication.run(RabbitMQPriorityProducer.class, args);
}

}

发送消息

  • 不要启动消费者程序,让多条不同优先级的消息滞留在队列中
  • 第一次发送优先级为1的消息
  • 第二次发送优先级为2的消息
  • 第三次发送优先级为3的消息
  • 先发送的消息优先级低,后发送的消息优先级高,将来看看消费端是不是先收到优先级高的消息
  1. 第一次发送优先级为1的消息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class RabbitMQTest {

public static final String EXCHANGE_PRIORITY = "exchange.test.priority";
public static final String ROUTING_KEY_PRIORITY = "routing.key.test.priority";

@Resource
private RabbitTemplate rabbitTemplate;

@Test
public void testSendMessage() {
rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "I am a message with priority 1.", message->{
message.getMessageProperties().setPriority(1);
return message;
});
}

}
  1. 第二次发送优先级为2的消息
1
2
3
4
5
6
7
@Test
public void testSendMessage() {
rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "I am a message with priority 2.", message->{
message.getMessageProperties().setPriority(2);
return message;
});
}
  1. 第三次发送优先级为3的消息
1
2
3
4
5
6
7
@Test
public void testSendMessage() {
rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "I am a message with priority 3.", message->{
message.getMessageProperties().setPriority(3);
return message;
});
}
image-20240830190125314

消费端接收消息

配置POM

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.5</version>
</parent>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>

配置YAML

1
2
3
4
5
6
7
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: 123456
virtual-host: /

主启动类

1
2
3
4
5
6
7
8
9
10
11
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitMQPriorityConsumer {

public static void main(String[] args) {
SpringApplication.run(RabbitMQPriorityConsumer.class, args);
}

}

监听器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class MyMessageProcessor {

public static final String QUEUE_PRIORITY = "queue.test.priority";

@RabbitListener(queues = {QUEUE_PRIORITY})
public void processPriorityMessage(String data, Message message, Channel channel) throws IOException {
log.info(data);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

}

测试效果

对于已经滞留服务器的消息,只要消费端一启动,就能够收到消息队列的投递,打印效果如下:

image-20231111003358425