集群搭建

安装RabbitMQ

前置要求

CentOS发行版的版本≥CentOS 8 Stream

镜像下载地址:https://mirrors.163.com/centos/8-stream/isos/x86_64/CentOS-Stream-8-20240318.0-x86_64-dvd1.iso

RabbitMQ安装方式官方指南:

image-20240325212425988

安装Erlang环境

  1. 创建yum库配置文件
1
vim /etc/yum.repos.d/rabbitmq.repo
  1. 加入配置内容

以下内容来自官方文档:https://www.rabbitmq.com/docs/install-rpm

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# In /etc/yum.repos.d/rabbitmq.repo

##
## Zero dependency Erlang RPM
##

[modern-erlang]
name=modern-erlang-el8
# uses a Cloudsmith mirror @ yum.novemberain.com in addition to its Cloudsmith upstream.
# Unlike Cloudsmith, the mirror does not have any traffic quotas
baseurl=https://yum1.novemberain.com/erlang/el/8/$basearch
https://yum2.novemberain.com/erlang/el/8/$basearch
https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/rpm/el/8/$basearch
repo_gpgcheck=1
enabled=1
gpgkey=https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-erlang.E495BB49CC4BBE5B.key
gpgcheck=1
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300
pkg_gpgcheck=1
autorefresh=1
type=rpm-md

[modern-erlang-noarch]
name=modern-erlang-el8-noarch
# uses a Cloudsmith mirror @ yum.novemberain.com.
# Unlike Cloudsmith, it does not have any traffic quotas
baseurl=https://yum1.novemberain.com/erlang/el/8/noarch
https://yum2.novemberain.com/erlang/el/8/noarch
https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/rpm/el/8/noarch
repo_gpgcheck=1
enabled=1
gpgkey=https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-erlang.E495BB49CC4BBE5B.key
https://github.com/rabbitmq/signing-keys/releases/download/3.0/rabbitmq-release-signing-key.asc
gpgcheck=1
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300
pkg_gpgcheck=1
autorefresh=1
type=rpm-md

[modern-erlang-source]
name=modern-erlang-el8-source
# uses a Cloudsmith mirror @ yum.novemberain.com.
# Unlike Cloudsmith, it does not have any traffic quotas
baseurl=https://yum1.novemberain.com/erlang/el/8/SRPMS
https://yum2.novemberain.com/erlang/el/8/SRPMS
https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/rpm/el/8/SRPMS
repo_gpgcheck=1
enabled=1
gpgkey=https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-erlang.E495BB49CC4BBE5B.key
https://github.com/rabbitmq/signing-keys/releases/download/3.0/rabbitmq-release-signing-key.asc
gpgcheck=1
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300
pkg_gpgcheck=1
autorefresh=1


##
## RabbitMQ Server
##

[rabbitmq-el8]
name=rabbitmq-el8
baseurl=https://yum2.novemberain.com/rabbitmq/el/8/$basearch
https://yum1.novemberain.com/rabbitmq/el/8/$basearch
https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/rpm/el/8/$basearch
repo_gpgcheck=1
enabled=1
# Cloudsmith's repository key and RabbitMQ package signing key
gpgkey=https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-server.9F4587F226208342.key
https://github.com/rabbitmq/signing-keys/releases/download/3.0/rabbitmq-release-signing-key.asc
gpgcheck=1
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300
pkg_gpgcheck=1
autorefresh=1
type=rpm-md

[rabbitmq-el8-noarch]
name=rabbitmq-el8-noarch
baseurl=https://yum2.novemberain.com/rabbitmq/el/8/noarch
https://yum1.novemberain.com/rabbitmq/el/8/noarch
https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/rpm/el/8/noarch
repo_gpgcheck=1
enabled=1
# Cloudsmith's repository key and RabbitMQ package signing key
gpgkey=https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-server.9F4587F226208342.key
https://github.com/rabbitmq/signing-keys/releases/download/3.0/rabbitmq-release-signing-key.asc
gpgcheck=1
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300
pkg_gpgcheck=1
autorefresh=1
type=rpm-md

[rabbitmq-el8-source]
name=rabbitmq-el8-source
baseurl=https://yum2.novemberain.com/rabbitmq/el/8/SRPMS
https://yum1.novemberain.com/rabbitmq/el/8/SRPMS
https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/rpm/el/8/SRPMS
repo_gpgcheck=1
enabled=1
gpgkey=https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-server.9F4587F226208342.key
gpgcheck=0
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300
pkg_gpgcheck=1
autorefresh=1
type=rpm-md
  1. 更新yum库

–nobest表示所需安装包即使不是最佳选择也接受

1
yum update -y --nobest
  1. 正式安装Erlang
1
yum install -y erlang

安装RabbitMQ

1
2
3
4
5
6
7
8
9
10
# 导入GPG密钥
rpm --import 'https://github.com/rabbitmq/signing-keys/releases/download/3.0/rabbitmq-release-signing-key.asc'
rpm --import 'https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-erlang.E495BB49CC4BBE5B.key'
rpm --import 'https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-server.9F4587F226208342.key'

# 下载 RPM 包
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.13.0/rabbitmq-server-3.13.0-1.el8.noarch.rpm

# 安装
rpm -ivh rabbitmq-server-3.13.0-1.el8.noarch.rpm

RabbitMQ基础配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
# 启用管理界面插件
rabbitmq-plugins enable rabbitmq_management

# 启动 RabbitMQ 服务:
systemctl start rabbitmq-server

# 将 RabbitMQ 服务设置为开机自动启动
systemctl enable rabbitmq-server

# 新增登录账号密码
rabbitmqctl add_user yuanyuan 123456

# 设置登录账号权限
rabbitmqctl set_user_tags yuanyuan administrator
rabbitmqctl set_permissions -p / yuanyuan ".*" ".*" ".*"

# 配置所有稳定功能 flag 启用
rabbitmqctl enable_feature_flag all

# 重启RabbitMQ服务生效
systemctl restart rabbitmq-server

收尾工作

1
rm -rf /etc/yum.repos.d/rabbitmq.repo

克隆VMWare虚拟机

目标

通过克隆操作,一共准备三台VMWare虚拟机

集群节点名称虚拟机 IP 地址
node01192.168.200.100
node02192.168.200.150
node03192.168.200.200

克隆虚拟机

image-20240325161211800

image-20240325161345886

image-20240325161409158

image-20240325161446094

image-20240325161659993

给新机设置 IP 地址

在CentOS 7中,可以使用nmcli命令行工具修改IP地址。以下是具体步骤:

  1. 查看网络连接信息:
1
nmcli con show
  1. 停止指定的网络连接(将<connection_name>替换为实际的网络连接名称):
1
nmcli con down <connection_name>
  1. 修改IP地址(将<connection_name>替换为实际的网络连接名称,将<new_ip_address>替换为新的IP地址,将<subnet_mask>替换为子网掩码,将<gateway>替换为网关):
1
2
3
4
# <new_ip_address>/<subnet_mask>这里是 CIDR 表示法
nmcli con mod <connection_name> ipv4.addresses <new_ip_address>/<subnet_mask>
nmcli con mod <connection_name> ipv4.gateway <gateway>
nmcli con mod <connection_name> ipv4.method manual
  1. 启动网络连接:
1
nmcli con up <connection_name>
  1. 验证新的IP地址是否生效:
1
ip addr show

修改主机名称

主机名称会被RabbitMQ作为集群中的节点名称,后面会用到,所以需要设置一下。

修改方式如下:

1
vim /etc/hostname

保险措施

为了在后续操作过程中,万一遇到操作失误,友情建议拍摄快照。

集群节点彼此发现

node01设置

  1. 设置 IP 地址到主机名称的映射

修改文件/etc/hosts,追加如下内容:

1
2
3
192.168.200.100 node01
192.168.200.150 node02
192.168.200.200 node03
  1. 查看当前RabbitMQ节点的Cookie值并记录
1
2
[root@node01 ~]# cat /var/lib/rabbitmq/.erlang.cookie 
NOTUPTIZIJONXDWWQPOJ # 你的可能不一样
  1. 重置节点应用
1
2
3
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app

node02设置

  1. 设置 IP 地址到主机名称的映射

修改文件/etc/hosts,追加如下内容:

1
2
3
192.168.200.100 node01
192.168.200.150 node02
192.168.200.200 node03
  1. 修改当前RabbitMQ节点的Cookie值

node02和node03都改成和node01一样:

1
vim /var/lib/rabbitmq/.erlang.cookie
  1. 重置节点应用并加入集群
1
2
3
4
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node01
rabbitmqctl start_app

node03设置

  1. 设置 IP 地址到主机名称的映射

修改文件/etc/hosts,追加如下内容:

1
2
3
192.168.200.100 node01
192.168.200.150 node02
192.168.200.200 node03
  1. 修改当前RabbitMQ节点的Cookie值

node02和node03都改成和node01一样:

1
vim /var/lib/rabbitmq/.erlang.cookie
  1. 重置节点应用并加入集群
1
2
3
4
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl join_cluster rabbit@node01
rabbitmqctl start_app
  1. 查看集群状态
1
rabbitmqctl cluster_status

image-20240325214047841

附录

如有需要踢出某个节点,则按下面操作执行:

1
2
3
4
5
6
7
# 被踢出的节点:
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app

# 节点1
rabbitmqctl forget_cluster_node rabbit@node02

负载均衡:Management UI

说明

  • 其实访问任何一个RabbitMQ实例的管理界面都是对集群操作,所以配置负载均衡通过统一入口访问在我们学习期间就是锦上添花
  • 先给管理界面做负载均衡,然后方便我们在管理界面上创建交换机、队列等操作

安装HAProxy

1
2
3
4
yum install -y haproxy
haproxy -v
systemctl start haproxy
systemctl enable haproxy

image-20231205145435058

修改配置文件

配置文件位置:

/etc/haproxy/haproxy.cfg

在配置文件末尾增加如下内容:

frontend rabbitmq_ui_frontend
bind 192.168.200.100:22222
mode http
default_backend rabbitmq_ui_backend

backend rabbitmq_ui_backend
mode http
balance roundrobin
option httpchk GET /
server rabbitmq_ui1 192.168.200.100:15672 check
server rabbitmq_ui2 192.168.200.150:15672 check
server rabbitmq_ui3 192.168.200.200:15672 check

设置SELinux策略,允许HAProxy拥有权限连接任意端口:

1
setsebool -P haproxy_connect_any=1

SELinux是Linux系统中的安全模块,它可以限制进程的权限以提高系统的安全性。在某些情况下,SELinux可能会阻止HAProxy绑定指定的端口,这就需要通过设置域(domain)的安全策略来解决此问题。

通过执行setsebool -P haproxy_connect_any=1命令,您已经为HAProxy设置了一个布尔值,允许HAProxy连接到任意端口。这样,HAProxy就可以成功绑定指定的socket,并正常工作。

重启HAProxy:

1
systemctl restart haproxy

测试效果

image-20231205203815753

负载均衡:核心功能

增加配置

frontend rabbitmq_frontend
bind 192.168.200.100:11111
mode tcp
default_backend rabbitmq_backend

backend rabbitmq_backend
mode tcp
balance roundrobin
server rabbitmq1 192.168.200.100:5672 check
server rabbitmq2 192.168.200.150:5672 check
server rabbitmq3 192.168.200.200:5672 check

重启HAProxy服务:

1
systemctl restart haproxy

测试

  1. 创建组件
  • 交换机:exchange.cluster.test
  • 队列:queue.cluster.test
  • 路由键:routing.key.cluster.test
  1. 创建生产者端程序

[1]配置POM

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.5</version>
</parent>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>

[2]主启动类

1
2
3
4
5
6
7
8
9
10
11
import org.springframework.boot.SpringApplication;  
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitMQProducerApp {

public static void main(String[] args) {
SpringApplication.run(RabbitMQProducerApp.class, args);
}

}

[3]配置YAML

1
2
3
4
5
6
7
8
9
10
11
12
spring:
rabbitmq:
host: 192.168.200.100
port: 11111
username: yuanyuan
password: 123456
virtual-host: /
publisher-confirm-type: CORRELATED # 交换机的确认
publisher-returns: true # 队列的确认
logging:
level:
blog.yuanyuan.rabbitmq.config.MQProducerAckConfig: info

[4]配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;

@Configuration
@Slf4j
public class MQProducerAckConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback{

@Autowired
private RabbitTemplate rabbitTemplate;

@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}

@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
log.info("消息发送到交换机成功!数据:" + correlationData);
} else {
log.info("消息发送到交换机失败!数据:" + correlationData + " 原因:" + cause);
}
}

@Override
public void returnedMessage(ReturnedMessage returned) {
log.info("消息主体: " + new String(returned.getMessage().getBody()));
log.info("应答码: " + returned.getReplyCode());
log.info("描述:" + returned.getReplyText());
log.info("消息使用的交换器 exchange : " + returned.getExchange());
log.info("消息使用的路由键 routing : " + returned.getRoutingKey());
}
}

[5] Junit测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;

@SpringBootTest
public class RabbitMQTest {

@Resource
private RabbitTemplate rabbitTemplate;

public static final String EXCHANGE_CLUSTER_TEST = "exchange.cluster.test";
public static final String ROUTING_KEY_CLUSTER_TEST = "routing.key.cluster.test";

@Test
public void testSendMessage() {
rabbitTemplate.convertAndSend(EXCHANGE_CLUSTER_TEST, ROUTING_KEY_CLUSTER_TEST, "message test cluster~~~");
}

}

image-20231205221122749

image-20231205212739539

  1. 创建消费端程序

[1]配置POM

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.1.5</version>
</parent>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
</dependencies>

[2]主启动类

1
2
3
4
5
6
7
8
9
10
11
import org.springframework.boot.SpringApplication;  
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitMQProducerApp {

public static void main(String[] args) {
SpringApplication.run(RabbitMQProducerApp.class, args);
}

}

[3]配置YAML

1
2
3
4
5
6
7
8
9
10
11
12
13
spring:
rabbitmq:
host: 192.168.200.100
port: 11111
username: yuanyuan
password: 123456
virtual-host: /
listener:
simple:
acknowledge-mode: manual
logging:
level:
blog.yuanyuan.rabbitmq.listener.MyProcessor: info

[4]监听器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
@Slf4j
public class MyProcessor {

@RabbitListener(queues = {"queue.cluster.test"})
public void processNormalQueueMessage(String data, Message message, Channel channel)
throws IOException {
log.info("消费端:" + data);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}

[5]运行效果

image-20231205223533138

镜像队列(被仲裁队列取代)

提出问题

现在我们创建过的队列,它是属于节点1的:

image-20231205225033524

现在我们停掉节点1的rabbit应用:

1
2
# 停止rabbit应用
rabbitmqctl stop_app

image-20231205225933246

image-20231205231152119

再次发送消息:

image-20231205230041784

为了后续操作,再重新启动rabbit应用

1
rabbitmqctl start_app

创建策略使队列镜像化

image-20231206002833874

image-20231206002859324

image-20231206003534880

image-20231206003556037

创建新的队列

要求:队列名称必须符合策略中指定的正则表达式

image-20231206004507667

image-20231206004549707

image-20231206004648018

绑定交换机:

image-20231206013509788

测试

节点1关闭rabbit应用

image-20231206012159111

然后就发现两个镜像队列自动分布到了节点2和节点3上:

image-20231206012307700

调整Java代码中的组件名称:

1
2
3
public static final String EXCHANGE_CLUSTER_TEST = "exchange.cluster.test";
public static final String ROUTING_KEY_MIRROR_TEST = "routing.key.mirror.test";
public static final String QUEUE_MIRROR_TEST = "mirror.queue.test";

image-20231206015533314

image-20231206015618657

仲裁队列

创建仲裁队列

说明:鉴于仲裁队列的功能,肯定是需要在前面集群的基础上操作!

创建交换机

和仲裁队列绑定的交换机没有特殊,我们还是创建一个direct交换机即可

交换机名称:exchange.quorum.test

image-20231206164511238

创建仲裁队列

队列名称:queue.quorum.test

image-20231206164838398

image-20231206165113573

绑定交换机

路由键:routing.key.quorum.test

image-20231206164951831

测试仲裁队列

常规测试

像使用经典队列一样发送消息、消费消息

  1. 生产者端
1
2
3
4
5
6
7
public static final String EXCHANGE_QUORUM_TEST = "exchange.quorum.test";
public static final String ROUTING_KEY_QUORUM_TEST = "routing.key.quorum.test";

@Test
public void testSendMessageToQuorum() {
rabbitTemplate.convertAndSend(EXCHANGE_QUORUM_TEST, ROUTING_KEY_QUORUM_TEST, "message test quorum ~~~");
}

image-20231206170401658

  1. 消费者端
1
2
3
4
5
6
7
public static final String QUEUE_QUORUM_TEST = "queue.quorum.test";

@RabbitListener(queues = {QUEUE_QUORUM_TEST})
public void quorumMessageProcess(String data, Message message, Channel channel) throws IOException {
log.info("消费端:" + data);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

image-20231206170424609

高可用测试

  1. 停止某个节点的rabbit应用
1
2
# 停止rabbit应用
rabbitmqctl stop_app
  1. 查看仲裁队列对应的节点情况

image-20231206170906222

  1. 再次发送消息

收发消息仍然正常

Stream Queue

Stream队列在目前企业实际应用非常少,真有特定场景需要使用肯定会倾向于使用Kafka,而不是RabbitMQ Stream

启用插件

说明:只有启用了Stream插件,才能使用流式队列的完整功能

在集群每个节点中依次执行如下操作:

1
2
3
4
5
6
7
8
9
# 启用Stream插件
rabbitmq-plugins enable rabbitmq_stream

# 重启rabbit应用
rabbitmqctl stop_app
rabbitmqctl start_app

# 查看插件状态
rabbitmq-plugins list

image-20240326140914228

负载均衡

在文件/etc/haproxy/haproxy.cfg末尾追加:

1
2
3
4
5
6
7
8
9
10
11
frontend rabbitmq_stream_frontend
bind 192.168.200.100:33333
mode tcp
default_backend rabbitmq_stream_backend

backend rabbitmq_stream_backend
mode tcp
balance roundrobin
server rabbitmq1 192.168.200.100:5552 check
server rabbitmq2 192.168.200.150:5552 check
server rabbitmq3 192.168.200.200:5552 check

Java代码

引入依赖

Stream 专属 Java 客户端官方网址:https://github.com/rabbitmq/rabbitmq-stream-java-client

Stream 专属 Java 客户端官方文档网址:https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>stream-client</artifactId>
<version>0.15.0</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.30</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
</dependency>
</dependencies>

创建Stream

说明:不需要创建交换机

  1. 代码方式创建
1
2
3
4
5
6
7
8
9
10
Environment environment = Environment.builder()
.host("192.168.200.100")
.port(33333)
.username("yuanyuan")
.password("123456")
.build();

environment.streamCreator().stream("stream.yuanyuan.test2").create();

environment.close();
  1. ManagementUI创建

image-20240830202833750

生产者端程序

  1. 内部机制说明

[1]官方文档

Internally, the Environment will query the broker to find out about the topology of the stream and will create or re-use a connection to publish to the leader node of the stream.

翻译:

在内部,Environment将查询broker以了解流的拓扑结构,并将创建或重用连接以发布到流的 leader 节点。

[2]解析

  • 在 Environment 中封装的连接信息仅负责连接到 broker
  • Producer 在构建对象时会访问 broker 拉取集群中 Leader 的连接信息
  • 将来实际访问的是集群中的 Leader 节点
  • Leader 的连接信息格式是:节点名称:端口号

[3]配置

为了让本机的应用程序知道 Leader 节点名称对应的 IP 地址,我们需要在本地配置 hosts 文件,建立从节点名称到 IP 地址的映射关系

image-20240326145508656

  1. 示例代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
Environment environment = Environment.builder()
.host("192.168.200.100")
.port(33333)
.username("yuanyuan")
.password("123456")
.build();

Producer producer = environment.producerBuilder()
.stream("stream.yuanyuan.test")
.build();

byte[] messagePayload = "hello rabbit stream".getBytes(StandardCharsets.UTF_8);

CountDownLatch countDownLatch = new CountDownLatch(1);

producer.send(
producer.messageBuilder().addData(messagePayload).build(),
confirmationStatus -> {
if (confirmationStatus.isConfirmed()) {
System.out.println("[生产者端]the message made it to the broker");
} else {
System.out.println("[生产者端]the message did not make it to the broker");
}

countDownLatch.countDown();
});

countDownLatch.await();

producer.close();

environment.close();

消费端程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Environment environment = Environment.builder()
.host("192.168.200.100")
.port(33333)
.username("yuanyuan")
.password("123456")
.build();

environment.consumerBuilder()
.stream("stream.yuanyuan.test")
.name("stream.yuanyuan.test.consumer")
.autoTrackingStrategy()
.builder()
.messageHandler((offset, message) -> {
byte[] bodyAsBinary = message.getBodyAsBinary();
String messageContent = new String(bodyAsBinary);
System.out.println("[消费者端]messageContent = " + messageContent + " Offset=" + offset.offset());
})
.build();

指定偏移量消费

偏移量

官方文档说明

The offset is the place in the stream where the consumer starts consuming from. The possible values for the offset parameter are the following:

  • OffsetSpecification.first(): starting from the first available offset. If the stream has not been truncated, this means the beginning of the stream (offset 0).
  • OffsetSpecification.last(): starting from the end of the stream and returning the last chunk of messages immediately (if the stream is not empty).
  • OffsetSpecification.next(): starting from the next offset to be written. Contrary to OffsetSpecification.last(), consuming with OffsetSpecification.next() will not return anything if no-one is publishing to the stream. The broker will start sending messages to the consumer when messages are published to the stream.
  • OffsetSpecification.offset(offset): starting from the specified offset. 0 means consuming from the beginning of the stream (first messages). The client can also specify any number, for example the offset where it left off in a previous incarnation of the application.
  • OffsetSpecification.timestamp(timestamp): starting from the messages stored after the specified timestamp. Note consumers can receive messages published a bit before the specified timestamp. Application code can filter out those messages if necessary.

指定Offset消费

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Environment environment = Environment.builder()
.host("192.168.200.100")
.port(33333)
.username("yuanyuan")
.password("123456")
.build();

CountDownLatch countDownLatch = new CountDownLatch(1);

Consumer consumer = environment.consumerBuilder()
.stream("stream.yuanyuan.test")
.offset(OffsetSpecification.first())
.messageHandler((offset, message) -> {
byte[] bodyAsBinary = message.getBodyAsBinary();
String messageContent = new String(bodyAsBinary);
System.out.println("[消费者端]messageContent = " + messageContent);
countDownLatch.countDown();
})
.build();

countDownLatch.await();

consumer.close();

对比

  • autoTrackingStrategy 方式:始终监听Stream中的新消息
  • 指定偏移量方式:针对指定偏移量的消息消费之后就停止