HelloRabbitMQ
目标
生产者发送消息,消费者接收消息,用最简单的方式实现
官网说明参见下面超链接:
RabbitMQ tutorial - “Hello World!” — RabbitMQ
具体操作
创建Java工程
- 创建一个空项目
- 新建一个模块
- 添加依赖
1 2 3 4 5 6 7
| <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.20.0</version> </dependency> </dependencies>
|
发送消息
- Java代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
public class Producer { public static void main(String[] args) throws Exception { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("localhost"); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("guest"); connectionFactory.setPassword("123456"); Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("simple_queue", true, false, false, null); String message = "你好;RabbitMQ!"; channel.basicPublish("", "simple_queue", null, message.getBytes()); System.out.println("已发送消息:" + message); channel.close(); connection.close(); } }
|
- 运行程序
- 查看效果
接收消息
- Java代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| import com.rabbitmq.client.*; import java.io.IOException;
public class Consumer { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("simple_queue",true,false,false,null); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("consumerTag:"+consumerTag); System.out.println("Exchange:"+envelope.getExchange()); System.out.println("RoutingKey:"+envelope.getRoutingKey()); System.out.println("properties:"+properties); System.out.println("body:"+new String(body)); } }; channel.basicConsume("simple_queue",true,consumer); } }
|
- 运行程序
- 查看后台管理界面
因为消息被消费掉了,所以RabbitMQ服务器上没有了,即消息数量为0:
工作模式
RabbitMQ有7种工作模式:
下面一一进行讲解
工作队列(Work Queues)模式
概述
本质上我们刚刚写的HelloWorld程序就是这种模式,只是简化到了最简单的情况:
- 生产者只有一个
- 发送一个消息
- 消费者也只有一个,消息也只能被这个消费者消费
所以HelloWorld也称为简单模式。
现在我们还原一下常规情况:
生产者代码
封装工具类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtil { public static final String HOST_ADDRESS = "localhost"; public static Connection getConnection() throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST_ADDRESS); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("123456"); Connection connection = factory.newConnection(); return connection; } }
|
编写代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| import blog.yuanyuan.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;
public class Producer { public static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true, false, false, null); for (int i = 1; i <= 10; i++) { String body = i + "hello rabbitmq!!!"; channel.basicPublish("", QUEUE_NAME, null, body.getBytes()); } channel.close(); connection.close(); } }
|
发送消息效果
消费者代码
编写代码
创建Consumer1和Consumer2。Consumer2只是类名和打印提示不同,代码完全一样。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| import blog.yuanyuan.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException;
public class Consumer1 { static final String QUEUE_NAME = "work_queue"; public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, true, false, false, null); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("Consumer1 body:" + new String(body)); } }; channel.basicConsume(QUEUE_NAME, true, consumer); } }
|
运行的时候先启动两个消费端程序,然后再启动生产者端程序。如果已经运行过生产者程序,则手动把work_queue队列删掉。
运行效果
最终两个消费端程序竞争结果如下:
结论
- 多个消费者监听同一个队列,则各消费者之间对同一个消息是竞争的关系。
- Work Queues工作模式适用于任务较重或任务较多的情况,多消费者分摊任务可以提高消息处理的效率。
发布订阅(Publish/Subscribe)模式
概述
引入新角色:交换机。这时生产者不是把消息直接发送到队列,而是发送到交换机。交换机接收消息,而如何处理消息取决于交换机的类型。
交换机有如下3种常见类型:
- Fanout:广播,将消息发送给所有绑定到交换机的队列
- Direct:定向,把消息交给符合指定routing key的队列
- Topic:通配符,把消息交给符合routing pattern(路由模式)的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
模式说明
组件之间关系:
工作机制:消息发送到交换机上,就会以广播的形式发送给所有已绑定队列
理解概念:
- Publish:发布,这里就是把消息发送到交换机上
- Subscribe:订阅,这里只要把队列和交换机绑定,事实上就形成了一种订阅关系
生产者代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| import blog.yuanyuan.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;
public class Producer { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_fanout"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null); String queue1Name = "test_fanout_queue1"; String queue2Name = "test_fanout_queue2"; channel.queueDeclare(queue1Name, true, false, false, null); channel.queueDeclare(queue2Name, true, false, false, null); channel.queueBind(queue1Name, exchangeName, ""); channel.queueBind(queue2Name, exchangeName, ""); String body = "日志信息:张三调用了findAll方法...日志级别:info..."; channel.basicPublish(exchangeName, "", null, body.getBytes()); channel.close(); connection.close(); } }
|
消费者代码
消费者1号
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| import blog.yuanyuan.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String queue1Name = "test_fanout_queue1"; channel.queueDeclare(queue1Name, true, false, false, null); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:" + new String(body)); System.out.println("队列 1 消费者 1 将日志信息打印到控制台....."); } }; channel.basicConsume(queue1Name, true, consumer); } }
|
消费者2号
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| import blog.yuanyuan.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer2 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String queue2Name = "test_fanout_queue2"; channel.queueDeclare(queue2Name, true, false, false, null); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:" + new String(body)); System.out.println("队列 2 消费者 2 将日志信息打印到控制台....."); } }; channel.basicConsume(queue2Name, true, consumer); } }
|
运行效果
还是先启动2个消费者,然后再运行生产者程序发送消息:
小结
查看交换机
交换机和队列的绑定关系如下图所示:
交换机需要与队列进行绑定,绑定之后;一个消息可以被多个消费者都收到。
发布订阅模式与工作队列模式的区别:
- 工作队列模式本质上是绑定默认交换机
- 发布订阅模式绑定指定交换机
- 监听同一个队列的消费端程序彼此之间是竞争关系
- 绑定同一个交换机的多个队列在发布订阅模式下,消息是广播的,每个队列都能接收到消息
路由(Routing)模式
概述
生产者代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| import blog.yuanyuan.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;
public class Producer { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_direct"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null); String queue1Name = "test_direct_queue1"; String queue2Name = "test_direct_queue2"; channel.queueDeclare(queue1Name, true, false, false, null); channel.queueDeclare(queue2Name, true, false, false, null); channel.queueBind(queue1Name, exchangeName, "error"); channel.queueBind(queue2Name, exchangeName, "info"); channel.queueBind(queue2Name, exchangeName, "error"); channel.queueBind(queue2Name, exchangeName, "warning"); String message = "日志信息:张三调用了delete方法.错误了,日志级别warning"; channel.basicPublish(exchangeName, "warning", null, message.getBytes()); System.out.println(message); channel.close(); connection.close(); } }
|
消费者代码
消费者1号
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| import blog.yuanyuan.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String queue1Name = "test_direct_queue1"; channel.queueDeclare(queue1Name, true, false, false, null); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:" + new String(body)); System.out.println("Consumer1 将日志信息打印到控制台....."); } }; channel.basicConsume(queue1Name, true, consumer); } }
|
消费者2号
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer2 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String queue2Name = "test_direct_queue2"; channel.queueDeclare(queue2Name, true, false, false, null); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("body:" + new String(body)); System.out.println("Consumer2 将日志信息存储到数据库....."); } }; channel.basicConsume(queue2Name, true, consumer); } }
|
运行结果
消费消息
绑定消息
如果一个交换机通过相同的routing key绑定了多个队列,就会有广播效果
主题(Topics)模式
概述
Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key的时候使用通配符。
Routingkey一般都是由一个或多个单词组成,多个单词之间以“.”分割,例如:item.insert
。
通配符规则:
生产者代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| import blog.yuanyuan.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection;
public class Producer {
public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String exchangeName = "test_topic"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null); String queue1Name = "test_topic_queue1"; String queue2Name = "test_topic_queue2"; channel.queueDeclare(queue1Name, true, false, false, null); channel.queueDeclare(queue2Name, true, false, false, null); channel.queueBind(queue1Name, exchangeName, "#.error"); channel.queueBind(queue1Name, exchangeName, "order.*"); channel.queueBind(queue2Name, exchangeName, "*.*"); String body = "[所在系统:order][日志级别:info][日志内容:订单生成,保存成功]"; channel.basicPublish(exchangeName, "order.info", null, body.getBytes()); body = "[所在系统:goods][日志级别:info][日志内容:商品发布成功]"; channel.basicPublish(exchangeName, "goods.info", null, body.getBytes()); body = "[所在系统:goods][日志级别:error][日志内容:商品发布失败]"; channel.basicPublish(exchangeName, "goods.error", null, body.getBytes()); channel.close(); connection.close(); } }
|
消费者代码
消费者1号
消费者1监听队列1:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| import blog.yuanyuan.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String QUEUE_NAME = "test_topic_queue1"; channel.queueDeclare(QUEUE_NAME, true, false, false, null); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { System.out.println("body:" + new String(body)); } }; channel.basicConsume(QUEUE_NAME, true, consumer); } }
|
消费者2号
消费者2监听队列2:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| import blog.yuanyuan.rabbitmq.util.ConnectionUtil; import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer2 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); String QUEUE_NAME = "test_topic_queue2"; channel.queueDeclare(QUEUE_NAME, true, false, false, null); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { System.out.println("body:" + new String(body)); } }; channel.basicConsume(QUEUE_NAME, true, consumer); } }
|
运行效果
绑定关系
交换机:
test_topic_queue1:
test_topic_queue2:
RPC
远程过程调用,本质上是同步调用,和我们使用OpenFeign调用远程接口一样,这不是典型的消息队列工作方式。
Publisher Confirms
发送端消息确认,是我们在进阶篇要探讨的『消息可靠性投递』的一部分。