整合SpringBoot 消费者工程 创建module 配置pom.xml 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 <parent > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-parent</artifactId > <version > 3.1.5</version > </parent > <dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-amqp</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-web</artifactId > </dependency > <dependency > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > </dependency > </dependencies >
application.yml 增加RabbitMQ连接和日志打印的配置:
1 2 3 4 5 6 7 8 9 10 spring: rabbitmq: host: localhost port: 5672 username: guest password: 123456 virtual-host: / logging: level: blog.yuanyuan.rabbitmq.listener.MyMessageListener: info
主启动类 1 2 3 4 5 6 7 8 9 10 import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication public class RabbitMQConsumerApp { public static void main (String[] args) { SpringApplication.run(RabbitMQConsumerApp.class, args); } }
监听器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 import lombok.extern.slf4j.Slf4j;import com.rabbitmq.client.Channel;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.Exchange;import org.springframework.amqp.rabbit.annotation.Queue;import org.springframework.amqp.rabbit.annotation.QueueBinding;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;@Component @Slf4j public class MyMessageListener { public static final String EXCHANGE_DIRECT = "exchange.direct.order" ; public static final String ROUTING_KEY = "order" ; public static final String QUEUE_NAME = "queue.order" ; @RabbitListener(bindings = @QueueBinding( value = @Queue(value = QUEUE_NAME, durable = "true"), exchange = @Exchange(value = EXCHANGE_DIRECT), key = {ROUTING_KEY} )) public void processMessage (String dateString, Message message, Channel channel) { log.info(dateString); } }
@RabbitListener注解属性对比 bindings属性 表面作用:指定交换机和队列之间的绑定关系 指定当前方法要监听的队列 隐藏效果:如果RabbitMQ服务器上没有这里指定的交换机和队列,那么框架底层的代码会创建它们 queues属性 1 @RabbitListener(queues = {QUEUE_YUANYUAN})
作用:指定当前方法要监听的队列 注意 :此时框架不会创建相关交换机和队列,必须提前创建好生产者工程 创建module 配置pom.xml 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 <parent > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-parent</artifactId > <version > 3.1.5</version > </parent > <dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-amqp</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-test</artifactId > </dependency > </dependencies >
application.yml 1 2 3 4 5 6 7 spring: rabbitmq: host: localhost port: 5672 username: guest password: 123456 virtual-host: /
主启动类 1 2 3 4 5 6 7 8 9 import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication public class RabbitMQProducerApp { public static void main (String[] args) { SpringApplication.run(RabbitMQProducerApp.class, args); } }
测试程序 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import org.junit.jupiter.api.Test;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest public class RabbitMQTest { public static final String EXCHANGE_DIRECT = "exchange.direct.order" ; public static final String ROUTING_KEY = "order" ; @Autowired private RabbitTemplate rabbitTemplate; @Test public void testSendMessage () { rabbitTemplate.convertAndSend( EXCHANGE_DIRECT, ROUTING_KEY, "Hello SpringBoot RabbitMQ!!!" ); } }
运行测试 先运行Consumer的SpringBoot程序,再运行测试程序生产消息:
消息可靠性投递 在消息传送过程中,可能会出现各种故障:
故障情况1:消息没有发送到消息队列
解决思路A:在生产者端进行确认,具体操作中我们会分别针对交换机和队列来确认,如果没有成功发送到消息队列服务器上,那就可以尝试重新发送 解决思路B:为目标交换机指定备份交换机,当目标交换机投递失败时,把消息投递至备份交换机 故障情况2:消息队列服务器宕机导致内存中消息丢失
解决思路:消息持久化到硬盘上,哪怕服务器重启也不会导致消息丢失 故障情况3:消费端宕机或抛异常导致消息没有成功被消费
消费端消费消息成功,给服务器返回ACK信息,然后消息队列删除该消息 消费端消费消息失败,给服务器端返回NACK信息,同时把消息恢复为待消费的状态,这样就可以再次取回消息,重试一次(当然,这就需要消费端接口支持幂等性) 生产者端消息确认机制 创建module 搭建环境 配置POM 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 <parent > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-parent</artifactId > <version > 3.1.5</version > </parent > <dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-amqp</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-test</artifactId > </dependency > <dependency > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > </dependency > </dependencies >
主启动类 1 2 3 4 5 6 7 8 9 import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication public class RabbitMQProducerApp { public static void main (String[] args) { SpringApplication.run(RabbitMQProducerApp.class, args); } }
YAML 注意 :publisher-confirm-type和publisher-returns是两个必须要增加的配置,如果没有则本节功能不生效
1 2 3 4 5 6 7 8 9 10 11 12 spring: rabbitmq: host: localhost port: 5672 username: guest password: 123456 virtual-host: / publisher-confirm-type: CORRELATED publisher-returns: true logging: level: blog.yuanyuan.rabbitmq.config.MQProducerAckConfig: info
创建配置类 目标 在这里我们为什么要创建这个配置类呢?首先,我们需要声明回调函数来接收RabbitMQ服务器返回的确认信息:
方法名 方法功能 所属接口 接口所属类 confirm() 确认消息是否发送到交换机 ConfirmCallback RabbitTemplate returnedMessage() 确认消息是否发送到队列 ReturnsCallback RabbitTemplate
然后,就是对RabbitTemplate的功能进行增强,因为回调函数所在对象必须设置到RabbitTemplate对象中才能生效。
原本RabbitTemplate对象并没有生产者端消息确认的功能,要给它设置对应的组件才可以。
而设置对应的组件,需要调用RabbitTemplate对象下面两个方法:
设置组件调用的方法 所需对象类型 setConfirmCallback() ConfirmCallback接口类型 setReturnCallback() ReturnCallback接口类型
API说明 ConfirmCallback接口 这是RabbitTemplate内部的一个接口,源代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 @FunctionalInterface public interface ConfirmCallback { void confirm (@Nullable CorrelationData correlationData, boolean ack, @Nullable String cause) ; }
生产者端发送消息之后,回调confirm()方法
ack参数值为true:表示消息成功发送到了交换机 ack参数值为false:表示消息没有发送到交换机 ReturnCallback接口 同样也RabbitTemplate内部的一个接口,源代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 @FunctionalInterface public interface ReturnsCallback { void returnedMessage (ReturnedMessage returned) ; }
注意 :接口中的returnedMessage()方法仅 在消息没有 发送到队列时调用
ReturnedMessage类中主要属性含义如下:
属性名 类型 含义 message org.springframework.amqp.core.Message 消息以及消息相关数据 replyCode int 应答码,类似于HTTP响应状态码 replyText String 应答码说明 exchange String 交换机名称 routingKey String 路由键名称
配置类代码 要点1 加@Component注解,加入IOC容器
要点2 配置类自身实现ConfirmCallback、ReturnCallback这两个接口,然后通过this指针把配置类的对象设置到RabbitTemplate对象中。
操作封装到了一个专门的void init()方法中。
为了保证这个void init()方法在应用启动时被调用,我们使用@PostConstruct注解来修饰这个方法。
关于@PostConstruct注解大家可以参照以下说明:
@PostConstruct注解是Java中的一个标准注解 ,它用于指定在对象创建之后立即执行 的方法。当使用依赖注入(如Spring框架)或者其他方式创建对象时,@PostConstruct注解可以确保在对象完全初始化之后,执行相应的方法。
使用@PostConstruct注解的方法必须满足以下条件:
方法不能有任何参数 。方法必须是非静态的 。方法不能返回任何值 。当容器实例化一个带有@PostConstruct注解的Bean时,它会在调用构造函数之后 ,并在依赖注入完成之前 调用被@PostConstruct注解标记的方法。这样,我们可以在该方法中进行一些初始化操作,比如读取配置文件、建立数据库连接等。
代码 有了以上说明,下面我们就可以展示配置类的整体代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 import jakarta.annotation.PostConstruct;import jakarta.annotation.Resource;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.ReturnedMessage;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.stereotype.Component;@Component @Slf4j public class MQProducerAckConfig implements RabbitTemplate .ConfirmCallback, RabbitTemplate.ReturnsCallback { @Resource private RabbitTemplate rabbitTemplate; @PostConstruct public void init () { rabbitTemplate.setConfirmCallback(this ); rabbitTemplate.setReturnsCallback(this ); } @Override public void confirm (CorrelationData correlationData, boolean ack, String cause) { if (ack) { log.info("消息发送到交换机成功!数据:{}" , correlationData); } else { log.info("消息发送到交换机失败!数据:{} 原因:{}" , correlationData, cause); } } @Override public void returnedMessage (ReturnedMessage returned) { log.info("消息主体: {}" , new String (returned.getMessage().getBody())); log.info("应答码: {}" , returned.getReplyCode()); log.info("描述:{}" , returned.getReplyText()); log.info("消息使用的交换器 exchange : {}" , returned.getExchange()); log.info("消息使用的路由键 routing : {}" , returned.getRoutingKey()); } }
发送消息 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 import jakarta.annotation.Resource;import org.junit.jupiter.api.Test;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest public class RabbitMQTest { public static final String EXCHANGE_DIRECT = "exchange.direct.order" ; public static final String ROUTING_KEY = "order" ; @Resource private RabbitTemplate rabbitTemplate; @Test public void testSendMessage () { rabbitTemplate.convertAndSend( EXCHANGE_DIRECT, ROUTING_KEY, "Message Test Confirm RabbitMQ" ); } }
通过调整代码,测试如下三种情况:
交换机正确、路由键正确 交换机正确、路由键不正确,无法发送到队列 交换机不正确,无法发送到交换机 运行测试 成功情况
模拟失败情况 情况1:没有交换机
1 2 3 4 5 6 7 @Test public void testFailSendMessage () { rabbitTemplate.convertAndSend( EXCHANGE_DIRECT + "~" , ROUTING_KEY, "Message Test Confirm RabbitMQ" ); }
情况2:没有路由键
1 2 3 4 5 6 7 @Test public void testFailSendMessage () { rabbitTemplate.convertAndSend( EXCHANGE_DIRECT, ROUTING_KEY + "~" , "Message Test Confirm RabbitMQ" ); }
备份交换机 备份交换机原理
创建备份交换机 创建备份交换机 注意 :备份交换机一定 要选择fanout类型 ,因为原交换机转入备份交换机时并不会指定路由键
创建备份交换机要绑定的队列 创建队列
绑定交换机 注意 :这里是要和备份交换机绑定
针对备份队列创建消费端监听器 1 2 3 4 5 6 7 8 9 10 11 12 13 public static final String EXCHANGE_DIRECT_BACKUP = "exchange.direct.order.backup" ;public static final String QUEUE_NAME_BACKUP = "queue.order.backup" ;@RabbitListener(bindings = @QueueBinding( value = @Queue(value = QUEUE_NAME_BACKUP, durable = "true"), exchange = @Exchange(value = EXCHANGE_DIRECT_BACKUP), key = {""} )) public void processMessageBackup (String dateString, Message message, Channel channel) { log.info("BackUp: " + dateString); }
设定备份关系 原交换机删除 ·
重新创建原交换机
原交换机重新绑定原队列
测试 启动消费者端 发送消息,但是路由键不对,于是转入备份交换机 1 2 3 4 5 6 7 @Test public void testFailSendMessage () { rabbitTemplate.convertAndSend( EXCHANGE_DIRECT, ROUTING_KEY + "~" , "Message Test Backup RabbitMQ" ); }
交换机和队列持久化 测试非持久化交换机和队列 创建非持久化交换机
创建之后,可以在列表中看到:
创建非持久化队列
创建之后,可以在列表中看到:
绑定
发送消息 1 2 3 4 5 6 7 8 9 10 public static final String EXCHANGE_TRANSIENT = "exchange.transient.user" ;public static final String ROUTING_KEY_TRANSIENT = "user" ;@Test public void testSendMessageTransient () { rabbitTemplate.convertAndSend( EXCHANGE_TRANSIENT, ROUTING_KEY_TRANSIENT, "Hello transient user~~~" ); }
临时性的交换机和队列也能够接收消息,但如果RabbitMQ服务器重启之后会怎么样呢?
重启RabbitMQ服务器 重启之后,刚才临时性的交换机和队列都没了。在交换机和队列这二者中,队列是消息存储的容器,队列没了,消息就也跟着没了。
持久化的交换机和队列 我们其实不必专门创建持久化的交换机和队列,因为它们默认就是持久化的。接下来我们只需要确认一下:存放到队列中,尚未被消费端取走的消息,是否会随着RabbitMQ服务器重启而丢失?
发送消息 运行以前的发送消息方法即可,不过要关掉消费端程序
重启RabbitMQ服务器 再次查看消息 发现消息仍然还在
结论 在后台管理界面创建交换机和队列时,默认就是持久化的模式。
此时消息也是持久化的,不需要额外设置。
消费端消息确认 ACK ACK是acknowledge的缩写,表示已确认
默认情况 默认情况下,消费端取回消息后,默认会自动返回ACK确认消息,所以在前面的测试中消息被消费端消费之后,RabbitMQ得到ACK确认信息就会删除消息
但实际开发中,消费端根据消息队列投递的消息执行对应的业务,未必都能执行成功,如果希望能够多次重试,那么默认设定就不满足要求了
所以还是要修改成手动确认
创建消费端module 配置pom.xml 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 <parent > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-parent</artifactId > <version > 3.1.5</version > </parent > <dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-amqp</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-web</artifactId > </dependency > <dependency > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > </dependency > </dependencies >
application.yml 增加针对监听器的设置:
1 2 3 4 5 6 7 8 9 10 spring: rabbitmq: host: localhost port: 5672 username: guest password: 123456 virtual-host: / listener: simple: acknowledge-mode: manual
主启动类 1 2 3 4 5 6 7 8 9 import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication public class RabbitMQConsumerApp { public static void main (String[] args) { SpringApplication.run(RabbitMQConsumerApp.class, args); } }
消费端监听器 创建监听器类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 import com.rabbitmq.client.Channel;import org.springframework.amqp.core.Message;import org.springframework.stereotype.Component;@Component public class MyMessageListener { public static final String EXCHANGE_DIRECT = "exchange.direct.order" ; public static final String ROUTING_KEY = "order" ; public static final String QUEUE_NAME = "queue.order" ; public void processMessage (String dataString, Message message, Channel channel) { } }
在接收消息的方法上应用注解 1 2 3 4 5 6 7 8 9 10 11 12 13 14 @RabbitListener( // 设置绑定关系 bindings = @QueueBinding( // 配置队列信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除 value = @Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false"), // 配置交换机信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除 exchange = @Exchange(value = EXCHANGE_DIRECT, durable = "true", autoDelete = "false"), // 配置路由键信息 key = {ROUTING_KEY} )) public void processMessage (String dataString, Message message, Channel channel) {}
接收消息方法内部逻辑 业务处理成功:手动返回ACK信息,表示消息成功消费 业务处理失败:手动返回NACK信息,表示消息消费失败。此时有两种后续操作供选择:把消息重新放回消息队列,RabbitMQ会重新投递这条消息,那么消费端将重新消费这条消息——从而让业务代码再执行一遍 不把消息放回消息队列,返回reject信息表示拒绝,那么这条消息的处理就到此为止 相关API 我们需要引入deliveryTag:交付标签机制
deliveryTag是一个64位整数,消息往消费端投递时,会携带交付标签。
交付标签有啥用? 消费端把消息处理结果ACK、NACK、Reject等返回给Broker之后,Broker需要对对应的消息执行后续操作,例如删除消息、重新排队或标记为死信等等。那么Broker就必须知道它现在要操作的消息具体是哪一条。而deliveryTag作为消息的唯一标识就很好的满足了这个需求。 如果交换机是Fanout模式,同一个消息广播到了不同队列,deliveryTag会重复吗? 不会,deliveryTag在Broker范围内唯一 下面我们探讨的三个方法都是来自于com.rabbitmq.client.Channel 接口
basicAck()方法 方法功能:给Broker返回ACK确认信息,表示消息已经在消费端成功消费,这样Broker就可以把消息删除了 参数列表: 参数名称 含义 long deliveryTag Broker给每一条进入队列的消息都设定一个唯一标识 boolean multiple 取值为true:为小于、等于deliveryTag的消息批量返回ACK信息 取值为false:仅为指定的deliveryTag返回ACK信息
basicNack()方法 方法功能:给Broker返回NACK信息,表示消息在消费端消费失败,此时Broker的后续操作取决于参数requeue的值 参数列表: 参数名称 含义 long deliveryTag Broker给每一条进入队列的消息都设定一个唯一标识 boolean multiple 取值为true:为小于、等于deliveryTag的消息批量返回ACK信息 取值为false:仅为指定的deliveryTag返回ACK信息 boolean requeue 取值为true:Broker将消息重新放回队列,接下来会重新投递给消费端 取值为false:Broker将消息标记为已消费,不会放回队列
basicReject()方法 方法功能:根据指定的deliveryTag,对该消息表示拒绝 参数列表: 参数名称 含义 long deliveryTag Broker给每一条进入队列的消息都设定一个唯一标识 boolean requeue 取值为true:Broker将消息重新放回队列,接下来会重新投递给消费端 取值为false:Broker将消息标记为已消费,不会放回队列
basicNack()和basicReject()有啥区别?basicNack()有批量操作 basicReject()没有批量操作 完整代码示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.stereotype.Component;import org.springframework.amqp.rabbit.annotation.Exchange;import org.springframework.amqp.rabbit.annotation.Queue;import org.springframework.amqp.rabbit.annotation.QueueBinding;import org.springframework.amqp.rabbit.annotation.RabbitListener;import java.io.IOException;@Component @Slf4j public class MyMessageListener { public static final String EXCHANGE_DIRECT = "exchange.direct.order" ; public static final String ROUTING_KEY = "order" ; public static final String QUEUE_NAME = "queue.order" ; @RabbitListener( // 设置绑定关系 bindings = @QueueBinding( // 配置队列信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除 value = @Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false"), // 配置交换机信息:durable 设置为 true 表示队列持久化;autoDelete 设置为 false 表示关闭自动删除 exchange = @Exchange(value = EXCHANGE_DIRECT, durable = "true", autoDelete = "false"), // 配置路由键信息 key = {ROUTING_KEY} )) public void processMessage (String dataString, Message message, Channel channel) throws IOException { long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { log.info("消费端接收到消息内容:" + dataString); channel.basicAck(deliveryTag, false ); } catch (Exception e) { Boolean redelivered = message.getMessageProperties().getRedelivered(); if (!redelivered) { channel.basicNack(deliveryTag, false , true ); } else { channel.basicReject(deliveryTag, false ); } } } }
要点总结 要点1:把消息确认模式改为手动确认 要点2:调用Channel对象的方法返回信息ACK:Acknowledgement,表示消息处理成功 NACK:Negative Acknowledgement,表示消息处理失败 Reject:拒绝,同样表示消息处理失败 要点3:后续操作requeue为true:重新放回队列,重新投递,再次尝试 requeue为false:不放回队列,不重新投递 要点4:deliveryTag 消息的唯一标识,查找具体某一条消息的依据 消费端如果设定消息重新放回队列,Broker重新投递消息,那么消费端就可以再次消费消息,这是一种“重试”机制,这需要消费端代码支持“幂等性 ”
Prefetch 思路 生产者发送100个消息 对照两种情况:消费端没有设置prefetch参数:100个消息被全部取回 消费端设置prefetch参数为1:100个消息慢慢取回 生产者端代码 1 2 3 4 5 6 7 8 9 @Test public void testPrefetch () { for (int i = 0 ; i < 100 ; i++) { rabbitTemplate.convertAndSend( EXCHANGE_DIRECT, ROUTING_KEY, "Test prefetch" + i); } }
消费者端代码 1 2 3 4 5 6 @RabbitListener(queues = {QUEUE_NAME}) public void processMessagePrefetch (String dataString, Message message, Channel channel) throws Exception { log.info("消费端接收到消息内容:{}" , dataString); TimeUnit.SECONDS.sleep(1 ); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false ); }
一次性取完,缓慢ack。
测试 未使用prefetch 不要启动消费端程序,如果正在运行就把它停了 运行生产者端程序发送100条消息 查看队列中消息的情况:
说明:
Ready表示已经发送到队列的消息数量 Unacked表示已经发送到消费端但是消费端尚未返回ACK信息的消息数量 Total未被删除的消息总数 接下来启动消费端程序,再查看队列情况:
能看到消息全部被消费端取走了,正在逐个处理、确认,说明有多少消息消费端就并发处理多少 设定prefetch YAML配置 1 2 3 4 5 6 7 8 9 10 11 spring: rabbitmq: host: localhost port: 5672 username: guest password: 123456 virtual-host: / listener: simple: acknowledge-mode: manual prefetch: 1
Ready缓慢减少,一秒减少一个。
测试流程 停止消费端程序 运行生产者端程序发送100条消息 查看队列中消息的情况:
消息超时 给消息设定一个过期时间,超过这个时间没有被取走的消息就会被删除
我们可以从两个层面来给消息设定过期时间:
队列层面:在队列层面设定消息的过期时间,并不是队列的过期时间。意思是这 个队列中的消息全部使用同一个过期时间。 消息本身:给具体的某个消息设定过期时间 如果两个层面都做了设置,那么哪个时间短,哪个生效
队列层面设置 设置 创建交换机
别忘了设置绑定关系:
测试 不启动消费端程序 向设置了过期时间的队列中发送100条消息 1 2 3 4 5 6 7 8 9 10 11 12 public static final String EXCHANGE_TIMEOUT = "exchange.test.timeout" ;public static final String ROUTING_KEY_TIMEOUT = "routing.key.test.timeout" ;@Test public void testTimeout () { for (int i = 0 ; i < 100 ; i++) { rabbitTemplate.convertAndSend( EXCHANGE_TIMEOUT, ROUTING_KEY_TIMEOUT, "Test timeout" + i); } }
消息层面设置 删除queue-test-timeout,重新创建,只不过这次我们不设置过期时间,记得绑定交换机
设置 1 2 3 4 5 6 7 8 9 10 11 @Test public void testTimeoutQueue () { MessagePostProcessor postProcessor = message -> { message.getMessageProperties().setExpiration("8000" ); return message; }; rabbitTemplate.convertAndSend( EXCHANGE_TIMEOUT, ROUTING_KEY_TIMEOUT, "Test timeout queue" , postProcessor); }
查看效果 这次我们是发送到普通队列上:
死信 概述 概念:当一个消息无法被消费,它就变成了死信。 死信产生的原因大致有下面三种:
拒绝:消费者拒接消息,basicNack()/basicReject(),并且不把消息重新放入原目标队列,requeue=false 溢出:队列中消息数量到达限制。比如队列最大只能存储10条消息,且现在已经存储了10条,此时如果再发送一条消息进来,根据先进先出原则,队列中最早的消息会变成死信 超时:消息到达超时时间未被消费 死信的处理方式大致有下面三种:
丢弃:对不重要的消息直接丢弃,不做处理 入库:把死信写入数据库,日后处理 监听:消息变成死信后进入死信队列,我们专门设置消费端监听死信队列,做后续处理(通常采用) 测试相关准备 创建死信交换机和死信队列 常规设定即可,没有特殊设置:
死信交换机:exchange.dead.letter.video 死信队列:queue.dead.letter.video 死信路由键:routing.key.dead.letter.video 创建正常交换机和正常队列 注意 :一定要注意正常队列有诸多限定和设置,这样才能让无法处理的消息进入死信交换机
正常交换机:exchange.normal.video 正常队列:queue.normal.video 正常路由键:routing.key.normal.video 全部设置完成后参照如下:
相关常量声明 1 2 3 4 5 6 7 8 public static final String EXCHANGE_NORMAL = "exchange.normal.video" ; public static final String EXCHANGE_DEAD_LETTER = "exchange.dead.letter.video" ; public static final String ROUTING_KEY_NORMAL = "routing.key.normal.video" ; public static final String ROUTING_KEY_DEAD_LETTER = "routing.key.dead.letter.video" ; public static final String QUEUE_NORMAL = "queue.normal.video" ; public static final String QUEUE_DEAD_LETTER = "queue.dead.letter.video" ;
消费端拒收消息 发送消息的代码 1 2 3 4 5 6 7 8 @Test public void testSendMessageButReject () { rabbitTemplate .convertAndSend( EXCHANGE_NORMAL, ROUTING_KEY_NORMAL, "测试死信情况1:消息被拒绝" ); }
接收消息的代码 监听正常队列 1 2 3 4 5 6 @RabbitListener(queues = {QUEUE_NORMAL}) public void processMessageNormal (Message message, Channel channel) throws IOException { log.info("★[normal]消息接收到,但我拒绝。" ); channel.basicReject(message.getMessageProperties().getDeliveryTag(), false ); }
监听死信队列 1 2 3 4 5 6 7 @RabbitListener(queues = {QUEUE_DEAD_LETTER}) public void processMessageDead (String dataString, Message message, Channel channel) throws IOException { log.info("★[dead letter]dataString = {}" , dataString); log.info("★[dead letter]我是死信监听方法,我接收到了死信消息" ); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false ); }
执行结果
消息数量超过队列容纳极限 发送消息的代码 1 2 3 4 5 6 7 8 9 @Test public void testSendMultiMessage () { for (int i = 0 ; i < 20 ; i++) { rabbitTemplate.convertAndSend( EXCHANGE_NORMAL, ROUTING_KEY_NORMAL, "测试死信情况2:消息数量超过队列的最大容量" + i); } }
接收消息的代码 消息接收代码不再拒绝消息:
1 2 3 4 5 6 @RabbitListener(queues = {QUEUE_NORMAL}) public void processMessageNormal (Message message, Channel channel) throws IOException { log.info("★[normal]消息接收到。" ); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false ); }
重启微服务使代码修改生效。
执行效果 正常队列的参数如下图所示:
生产者发送20条消息之后,消费端死信队列接收到前10条消息:
消息超时未消费 发送消息的代码 正常发送一条消息即可,所以使用第一个例子的代码。
1 2 3 4 5 6 7 8 @Test public void testSendMessageTimeout () { rabbitTemplate .convertAndSend( EXCHANGE_NORMAL, ROUTING_KEY_NORMAL, "测试死信情况3:消息超时" ); }
执行效果 队列参数生效:
因为没有消费端监听程序,所以消息未超时前滞留在队列中:
消息超时后,进入死信队列:
延迟插件 插件简介 插件安装 确定卷映射目录 1 2 3 docker inspect rabbitmq | jq -r '.[0].Mounts[].Source' # 或者 docker inspect --format='{{range .Mounts}}{{.Source}}{{end}}' rabbitmq
运行结果:
下载延迟插件 官方文档说明页地址:https://www.rabbitmq.com/community-plugins.html
下载插件安装文件:
1 2 wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.13.0/rabbitmq_delayed_message_exchange-3.13.0.ez sudo mv rabbitmq_delayed_message_exchange-3.13.0.ez /home/docker/volumes/rabbitmq-plugin/_data # 将/home/docker/volumes/rabbitmq-plugin/_data替换为你的数据卷目录
启用插件 1 2 3 4 5 6 7 8 # 登录进入容器内部 docker exec -it rabbitmq /bin/bash # rabbitmq-plugins命令所在目录已经配置到$PATH 环境变量中了,可以直接调用 rabbitmq-plugins enable rabbitmq_delayed_message_exchange # 退出Docker容器 exit # 重启Docker容器 docker restart rabbitmq
确认 确认点1:查看当前节点已启用插件的列表:
确认点2:如果创建新交换机时可以在type中看到x-delayed-message选项,那就说明插件安装好了
创建交换机 rabbitmq_delayed_message_exchange插件在工作时要求交换机是x-delayed-message 类型才可以,创建方式如下:
关于x-delayed-type 参数的理解:
原本指定交换机类型的地方使用了x-delayed-message这个值,那么这个交换机除了支持延迟消息之外,到底是direct、fanout、topic这些类型中的哪一个呢?
这里就额外使用x-delayed-type来指定交换机本身的类型
创建队列并绑定交换机
代码测试 生产者端代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public static final String EXCHANGE_DELAY = "exchange.test.delay" ;public static final String ROUTING_KEY_DELAY = "routing.key.test.delay" ;@Test public void testSendDelayMessage () { rabbitTemplate.convertAndSend( EXCHANGE_DELAY, ROUTING_KEY_DELAY, "测试基于插件的延迟消息 [" + new SimpleDateFormat ("hh:mm:ss" ).format(new Date ()) + "]" , messageProcessor -> { messageProcessor.getMessageProperties().setHeader("x-delay" , "10000" ); return messageProcessor; }); }
消费者端代码 情况A:资源已创建 1 2 3 4 5 6 7 8 public static final String QUEUE_DELAY = "queue.test.delay" ;@RabbitListener(queues = {QUEUE_DELAY}) public void process (String dataString, Message message, Channel channel) throws IOException { log.info("[生产者]{}" , dataString); log.info("[消费者]{}" , new SimpleDateFormat ("hh:mm:ss" ).format(new Date ())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false ); }
情况B:资源未创建 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.Date; @Component @Slf4j public class MyDelayMessageListener { public static final String EXCHANGE_DELAY = "exchange.test.delay" ; public static final String ROUTING_KEY_DELAY = "routing.key.test.delay" ; public static final String QUEUE_DELAY = "queue.test.delay" ; @RabbitListener(bindings = @QueueBinding( value = @Queue(value = QUEUE_DELAY, durable = "true", autoDelete = "false"), exchange = @Exchange( value = EXCHANGE_DELAY, durable = "true", autoDelete = "false", type = "x-delayed-message", arguments = @Argument(name = "x-delayed-type", value = "direct")), key = {ROUTING_KEY_DELAY} )) public void process (String dataString, Message message, Channel channel) throws IOException { log.info("[生产者]" + dataString); log.info("[消费者]" + new SimpleDateFormat ("hh:mm:ss" ).format(new Date ())); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false ); } }
执行效果 交换机类型
生产者端效果 注意 :使用rabbitmq_delayed_message_exchange插件后,即使消息成功发送到队列上,也会导致returnedMessage()方法执行
消费者端效果 事务消息之生产者端 代码 新建module 引入依赖 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 <parent > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-parent</artifactId > <version > 3.1.5</version > </parent > <dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-amqp</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-test</artifactId > </dependency > <dependency > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > </dependency > </dependencies >
yaml配置 1 2 3 4 5 6 7 spring: rabbitmq: host: localhost port: 5672 username: guest password: 123456 virtual-host: /
主启动类 1 2 3 4 5 6 7 8 9 10 11 import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication public class RabbitMQProducerApp { public static void main (String[] args) { SpringApplication.run(RabbitMQProducerApp.class, args); } }
相关配置 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import lombok.Data;import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configuration @Data public class RabbitConfig { @Bean public RabbitTransactionManager transactionManager (CachingConnectionFactory connectionFactory) { return new RabbitTransactionManager (connectionFactory); } @Bean public RabbitTemplate rabbitTemplate (CachingConnectionFactory connectionFactory) { RabbitTemplate rabbitTemplate = new RabbitTemplate (connectionFactory); rabbitTemplate.setChannelTransacted(true ); return rabbitTemplate; } }
测试代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 import jakarta.annotation.Resource;import lombok.extern.slf4j.Slf4j;import org.junit.jupiter.api.Test;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest @Slf4j public class RabbitMQTest { public static final String EXCHANGE_NAME = "exchange.tx.dragon" ; public static final String ROUTING_KEY = "routing.key.tx.dragon" ; @Resource private RabbitTemplate rabbitTemplate; @Test public void testSendMessageInTx () { rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg ~~~01)" ); log.info("do bad:" + 10 / 0 ); rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg ~~~02)" ); } }
执行测试 未使用事务 抛出异常前的消息发送了,抛异常后的消息没有发送:
为了不影响后续操作,我们直接在管理界面这里把这条消息消费掉:
使用事务 说明 因为在junit中给测试方法使用@Transactional注解默认就会回滚,所以回滚操作需要使用@RollBack注解操控
测试提交事务的情况 1 2 3 4 5 6 7 8 9 @Test @Transactional @Rollback(value = false) public void testSendMessageInTx () { rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg [commit] ~~~01)" ); rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg [commit] ~~~02)" ); }
测试回滚事务的情况 1 2 3 4 5 6 7 8 9 10 11 @Test @Transactional @Rollback(value = true) public void testSendMessageInTx () { rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg [rollback] ~~~01)" ); log.info("do bad:" + 10 / 0 ); rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, "I am a dragon(tx msg [rollback] ~~~02)" ); }
总结 在生产者端使用事务消息和消费端没有关系 在生产者端使用事务消息仅仅是控制事务内的消息是否发送 提交事务就把事务内所有消息都发送到交换机 回滚事务则事务内任何消息都不会被发送 惰性队列 创建惰性队列 官网说明
队列可以创建为默认
或惰性
模式,模式指定方式是:
使用队列策略(建议) 设置queue.declare
参数 如果策略和队列参数同时指定,那么队列参数有更高优先级。如果队列模式是在声明时通过可选参数指定的,那么只能通过删除队列再重新创建来修改。
基于策略方式设定 1 2 3 4 5 # 登录Docker容器 docker exec -it rabbitmq /bin/bash # 运行rabbitmqctl命令 rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"lazy"}' --apply-to queues
命令解读:
rabbitmqctl命令所在目录是:/opt/rabbitmq/sbin,该目录已配置到Path环境变量
set_policy是子命令,表示设置策略
Lazy是当前要设置的策略名称,是我们自己自定义的,不是系统定义的
"^lazy-queue$"是用正则表达式限定的队列名称,凡是名称符合这个正则表达式的队列都会应用这里的设置
'{“queue-mode”:“lazy”}'是一个JSON格式的参数设置指定了队列的模式为"lazy"
–-apply-to参数指定该策略将应用于队列(queues)级别
命令执行后,所有名称符合正则表达式的队列都会应用指定策略,包括未来新创建的队列
如果需要修改队列模式可以执行如下命令(不必删除队列再重建):
1 rabbitmqctl set_policy Lazy "^lazy-queue$" '{"queue-mode":"default"}' --apply-to queues
在声明队列时使用参数设定 参数名称:x-queue-mode 可用参数值: 不设置就是取值为default Java代码原生API设置方式:
1 2 3 Map<String, Object> args = new HashMap <String, Object>(); args.put("x-queue-mode" , "lazy" ); channel.queueDeclare("myqueue" , false , false , false , args);
Java代码注解设置方式:
1 2 3 @Queue(value = QUEUE_NAME, durable = "true", autoDelete = "false", arguments = { @Argument(name = "x-queue-mode", value = "lazy") })
实操演练 生产者端代码 配置POM 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 <parent > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-parent</artifactId > <version > 3.1.5</version > </parent > <dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-amqp</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-test</artifactId > </dependency > <dependency > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > </dependency > </dependencies >
配置YAML 1 2 3 4 5 6 7 spring: rabbitmq: host: localhost port: 5672 username: guest password: 123456 virtual-host: /
主启动类 1 2 3 4 5 6 7 8 9 10 11 import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication public class RabbitMQLazyProducer { public static void main (String[] args) { SpringApplication.run(RabbitMQLazyProducer.class, args); } }
发送消息 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import jakarta.annotation.Resource;import org.junit.jupiter.api.Test;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest public class RabbitMQTest { public static final String EXCHANGE_LAZY_NAME = "exchange.test.lazy" ; public static final String ROUTING_LAZY_KEY = "routing.key.test.lazy" ; @Resource private RabbitTemplate rabbitTemplate; @Test public void testSendMessage () { rabbitTemplate.convertAndSend(EXCHANGE_LAZY_NAME, ROUTING_LAZY_KEY, "I am a message for test lazy queue." ); } }
消费者端代码 配置POM 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 <parent > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-parent</artifactId > <version > 3.1.5</version > </parent > <dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-amqp</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-web</artifactId > </dependency > <dependency > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > </dependency > </dependencies >
配置YAML 1 2 3 4 5 6 7 spring: rabbitmq: host: localhost port: 5672 username: guest password: 123456 virtual-host: /
主启动类 1 2 3 4 5 6 7 8 9 10 11 import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication public class RabbitMQLazyConsumerApp { public static void main (String[] args) { SpringApplication.run(RabbitMQLazyConsumerApp.class, args); } }
监听器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.*;import org.springframework.stereotype.Component;@Component @Slf4j public class MyLazyMessageProcessor { public static final String EXCHANGE_LAZY_NAME = "exchange.test.lazy" ; public static final String ROUTING_LAZY_KEY = "routing.key.test.lazy" ; public static final String QUEUE_LAZY_NAME = "queue.test.lazy" ; @RabbitListener(bindings = @QueueBinding( value = @Queue(value = QUEUE_LAZY_NAME, durable = "true", autoDelete = "false", arguments = { @Argument(name = "x-queue-mode", value = "lazy") }), exchange = @Exchange(value = EXCHANGE_LAZY_NAME, durable = "true", autoDelete = "false"), key = {ROUTING_LAZY_KEY} )) public void processMessageLazy (String data, Message message, Channel channel) { log.info("消费端接收到消息:" + data); } }
测试
总结 使用惰性队列的主要原因之一是支持非常长的队列(数百万条消息)。
由于各种原因,排队可能会变得很长:
消费者离线/崩溃/停机进行维护 突然出现消息进入高峰,生产者的速度超过了消费者 消费者比正常情况慢 优先级队列 创建相关资源 创建交换机 exchange.test.priority
创建队列 queue.test.priority
x-max-priority
队列绑定交换机
生产者发送消息 配置POM 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 <parent > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-parent</artifactId > <version > 3.1.5</version > </parent > <dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-amqp</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-test</artifactId > </dependency > <dependency > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > </dependency > </dependencies >
配置YAML 1 2 3 4 5 6 7 spring: rabbitmq: host: localhost port: 5672 username: guest password: 123456 virtual-host: /
主启动类 1 2 3 4 5 6 7 8 9 10 11 import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication public class RabbitMQPriorityProducer { public static void main (String[] args) { SpringApplication.run(RabbitMQPriorityProducer.class, args); } }
发送消息 不要启动消费者程序,让多条不同优先级的消息滞留在队列中 第一次发送优先级为1的消息 第二次发送优先级为2的消息 第三次发送优先级为3的消息 先发送的消息优先级低,后发送的消息优先级高,将来看看消费端是不是先收到优先级高的消息 第一次发送优先级为1的消息 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 import jakarta.annotation.Resource;import org.junit.jupiter.api.Test;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest public class RabbitMQTest { public static final String EXCHANGE_PRIORITY = "exchange.test.priority" ; public static final String ROUTING_KEY_PRIORITY = "routing.key.test.priority" ; @Resource private RabbitTemplate rabbitTemplate; @Test public void testSendMessage () { rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "I am a message with priority 1." , message->{ message.getMessageProperties().setPriority(1 ); return message; }); } }
第二次发送优先级为2的消息 1 2 3 4 5 6 7 @Test public void testSendMessage () { rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "I am a message with priority 2." , message->{ message.getMessageProperties().setPriority(2 ); return message; }); }
第三次发送优先级为3的消息 1 2 3 4 5 6 7 @Test public void testSendMessage () { rabbitTemplate.convertAndSend(EXCHANGE_PRIORITY, ROUTING_KEY_PRIORITY, "I am a message with priority 3." , message->{ message.getMessageProperties().setPriority(3 ); return message; }); }
消费端接收消息 配置POM 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 <parent > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-parent</artifactId > <version > 3.1.5</version > </parent > <dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-amqp</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-web</artifactId > </dependency > <dependency > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > </dependency > </dependencies >
配置YAML 1 2 3 4 5 6 7 spring: rabbitmq: host: localhost port: 5672 username: guest password: 123456 virtual-host: /
主启动类 1 2 3 4 5 6 7 8 9 10 11 import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication public class RabbitMQPriorityConsumer { public static void main (String[] args) { SpringApplication.run(RabbitMQPriorityConsumer.class, args); } }
监听器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.*;import org.springframework.stereotype.Component;@Slf4j @Component public class MyMessageProcessor { public static final String QUEUE_PRIORITY = "queue.test.priority" ; @RabbitListener(queues = {QUEUE_PRIORITY}) public void processPriorityMessage (String data, Message message, Channel channel) throws IOException { log.info(data); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false ); } }
测试效果 对于已经滞留服务器的消息,只要消费端一启动,就能够收到消息队列的投递,打印效果如下: