集群搭建 安装RabbitMQ 前置要求 CentOS发行版的版本≥CentOS 8 Stream
镜像下载地址:https://mirrors.163.com/centos/8-stream/isos/x86_64/CentOS-Stream-8-20240318.0-x86_64-dvd1.iso
RabbitMQ安装方式官方指南:
安装Erlang环境 创建yum库配置文件 1 vim /etc/yum.repos.d/rabbitmq.repo
加入配置内容 以下内容来自官方文档:https://www.rabbitmq.com/docs/install-rpm
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 [modern-erlang] name =modern-erlang-el8 baseurl =https://yum1.novemberain.com/erlang/el/8/$basearch https ://yum2.novemberain.com/erlang/el/8/$basearch https ://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/rpm/el/8/$basearch repo_gpgcheck =1 enabled =1 gpgkey =https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-erlang.E495BB49CC4BBE5B.key gpgcheck =1 sslverify =1 sslcacert =/etc/pki/tls/certs/ca-bundle.crt metadata_expire =300 pkg_gpgcheck =1 autorefresh =1 type =rpm-md [modern-erlang-noarch] name =modern-erlang-el8-noarch baseurl =https://yum1.novemberain.com/erlang/el/8/noarch https ://yum2.novemberain.com/erlang/el/8/noarch https ://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/rpm/el/8/noarch repo_gpgcheck =1 enabled =1 gpgkey =https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-erlang.E495BB49CC4BBE5B.key https ://github.com/rabbitmq/signing-keys/releases/download/3.0/rabbitmq-release-signing-key.asc gpgcheck =1 sslverify =1 sslcacert =/etc/pki/tls/certs/ca-bundle.crt metadata_expire =300 pkg_gpgcheck =1 autorefresh =1 type =rpm-md [modern-erlang-source] name =modern-erlang-el8-source baseurl =https://yum1.novemberain.com/erlang/el/8/SRPMS https ://yum2.novemberain.com/erlang/el/8/SRPMS https ://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/rpm/el/8/SRPMS repo_gpgcheck =1 enabled =1 gpgkey =https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-erlang.E495BB49CC4BBE5B.key https ://github.com/rabbitmq/signing-keys/releases/download/3.0/rabbitmq-release-signing-key.asc gpgcheck =1 sslverify =1 sslcacert =/etc/pki/tls/certs/ca-bundle.crt metadata_expire =300 pkg_gpgcheck =1 autorefresh =1 [rabbitmq-el8] name =rabbitmq-el8 baseurl =https://yum2.novemberain.com/rabbitmq/el/8/$basearch https ://yum1.novemberain.com/rabbitmq/el/8/$basearch https ://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/rpm/el/8/$basearch repo_gpgcheck =1 enabled =1 gpgkey =https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-server.9F4587F226208342.key https ://github.com/rabbitmq/signing-keys/releases/download/3.0/rabbitmq-release-signing-key.asc gpgcheck =1 sslverify =1 sslcacert =/etc/pki/tls/certs/ca-bundle.crt metadata_expire =300 pkg_gpgcheck =1 autorefresh =1 type =rpm-md [rabbitmq-el8-noarch] name =rabbitmq-el8-noarch baseurl =https://yum2.novemberain.com/rabbitmq/el/8/noarch https ://yum1.novemberain.com/rabbitmq/el/8/noarch https ://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/rpm/el/8/noarch repo_gpgcheck =1 enabled =1 gpgkey =https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-server.9F4587F226208342.key https ://github.com/rabbitmq/signing-keys/releases/download/3.0/rabbitmq-release-signing-key.asc gpgcheck =1 sslverify =1 sslcacert =/etc/pki/tls/certs/ca-bundle.crt metadata_expire =300 pkg_gpgcheck =1 autorefresh =1 type =rpm-md [rabbitmq-el8-source] name =rabbitmq-el8-source baseurl =https://yum2.novemberain.com/rabbitmq/el/8/SRPMS https ://yum1.novemberain.com/rabbitmq/el/8/SRPMS https ://dl.cloudsmith.io/public/rabbitmq/rabbitmq-server/rpm/el/8/SRPMS repo_gpgcheck =1 enabled =1 gpgkey =https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-server.9F4587F226208342.key gpgcheck =0 sslverify =1 sslcacert =/etc/pki/tls/certs/ca-bundle.crt metadata_expire =300 pkg_gpgcheck =1 autorefresh =1 type =rpm-md
更新yum库 –nobest表示所需安装包即使不是最佳选择也接受
正式安装Erlang 安装RabbitMQ 1 2 3 4 5 6 7 8 9 10 # 导入GPG密钥 rpm --import 'https://github.com/rabbitmq/signing-keys/releases/download/3.0/rabbitmq-release-signing-key.asc' rpm --import 'https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-erlang.E495BB49CC4BBE5B.key' rpm --import 'https://github.com/rabbitmq/signing-keys/releases/download/3.0/cloudsmith.rabbitmq-server.9F4587F226208342.key' # 下载 RPM 包 wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.13.0/rabbitmq-server-3.13.0-1.el8.noarch.rpm # 安装 rpm -ivh rabbitmq-server-3.13.0-1.el8.noarch.rpm
RabbitMQ基础配置 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 # 启用管理界面插件 rabbitmq-plugins enable rabbitmq_management # 启动 RabbitMQ 服务: systemctl start rabbitmq-server # 将 RabbitMQ 服务设置为开机自动启动 systemctl enable rabbitmq-server # 新增登录账号密码 rabbitmqctl add_user yuanyuan 123456 # 设置登录账号权限 rabbitmqctl set_user_tags yuanyuan administrator rabbitmqctl set_permissions -p / yuanyuan ".*" ".*" ".*" # 配置所有稳定功能 flag 启用 rabbitmqctl enable_feature_flag all # 重启RabbitMQ服务生效 systemctl restart rabbitmq-server
收尾工作 1 rm -rf /etc/yum.repos.d/rabbitmq.repo
克隆VMWare虚拟机 目标 通过克隆操作,一共准备三台VMWare虚拟机
集群节点名称 虚拟机 IP 地址 node01 192.168.200.100 node02 192.168.200.150 node03 192.168.200.200
克隆虚拟机
给新机设置 IP 地址 在CentOS 7中,可以使用nmcli
命令行工具修改IP地址。以下是具体步骤:
查看网络连接信息: 停止指定的网络连接(将<connection_name>
替换为实际的网络连接名称): 1 nmcli con down <connection_name>
修改IP地址(将<connection_name>
替换为实际的网络连接名称,将<new_ip_address>
替换为新的IP地址,将<subnet_mask>
替换为子网掩码,将<gateway>
替换为网关): 1 2 3 4 # <new_ip_address>/<subnet_mask>这里是 CIDR 表示法 nmcli con mod <connection_name> ipv4.addresses <new_ip_address>/<subnet_mask> nmcli con mod <connection_name> ipv4.gateway <gateway> nmcli con mod <connection_name> ipv4.method manual
启动网络连接: 1 nmcli con up <connection_name>
验证新的IP地址是否生效: 修改主机名称 主机名称会被RabbitMQ作为集群中的节点名称,后面会用到,所以需要设置一下。
修改方式如下:
保险措施 为了在后续操作过程中,万一遇到操作失误,友情建议拍摄快照。
集群节点彼此发现 node01设置 设置 IP 地址到主机名称的映射 修改文件/etc/hosts,追加如下内容:
1 2 3 192.168.200.100 node01 192.168.200.150 node02 192.168.200.200 node03
查看当前RabbitMQ节点的Cookie值并记录 1 2 [root@node01 ~]# cat /var/lib/rabbitmq/.erlang.cookie NOTUPTIZIJONXDWWQPOJ # 你的可能不一样
重置节点应用 1 2 3 rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl start_app
node02设置 设置 IP 地址到主机名称的映射 修改文件/etc/hosts,追加如下内容:
1 2 3 192.168.200.100 node01 192.168.200.150 node02 192.168.200.200 node03
修改当前RabbitMQ节点的Cookie值 node02和node03都改成和node01一样:
1 vim /var/lib/rabbitmq/.erlang.cookie
重置节点应用并加入集群 1 2 3 4 rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl join_cluster rabbit@node01 rabbitmqctl start_app
node03设置 设置 IP 地址到主机名称的映射 修改文件/etc/hosts,追加如下内容:
1 2 3 192.168.200.100 node01 192.168.200.150 node02 192.168.200.200 node03
修改当前RabbitMQ节点的Cookie值 node02和node03都改成和node01一样:
1 vim /var/lib/rabbitmq/.erlang.cookie
重置节点应用并加入集群 1 2 3 4 rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl join_cluster rabbit@node01 rabbitmqctl start_app
查看集群状态 1 rabbitmqctl cluster_status
附录 如有需要踢出某个节点,则按下面操作执行:
1 2 3 4 5 6 7 # 被踢出的节点: rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl start_app # 节点1 rabbitmqctl forget_cluster_node rabbit@node02
负载均衡:Management UI 说明 其实访问任何一个RabbitMQ实例的管理界面都是对集群操作,所以配置负载均衡通过统一入口访问在我们学习期间就是锦上添花 先给管理界面做负载均衡,然后方便我们在管理界面上创建交换机、队列等操作 安装HAProxy 1 2 3 4 yum install -y haproxy haproxy -v systemctl start haproxy systemctl enable haproxy
修改配置文件 配置文件位置:
/etc/haproxy/haproxy.cfg
在配置文件末尾增加如下内容:
frontend rabbitmq_ui_frontend bind 192.168.200.100:22222 mode http default_backend rabbitmq_ui_backend
backend rabbitmq_ui_backend mode http balance roundrobin option httpchk GET / server rabbitmq_ui1 192.168.200.100:15672 check server rabbitmq_ui2 192.168.200.150:15672 check server rabbitmq_ui3 192.168.200.200:15672 check
设置SELinux策略,允许HAProxy拥有权限连接任意端口:
1 setsebool -P haproxy_connect_any=1
SELinux是Linux系统中的安全模块,它可以限制进程的权限以提高系统的安全性。在某些情况下,SELinux可能会阻止HAProxy绑定指定的端口,这就需要通过设置域(domain)的安全策略来解决此问题。
通过执行setsebool -P haproxy_connect_any=1
命令,您已经为HAProxy设置了一个布尔值,允许HAProxy连接到任意端口。这样,HAProxy就可以成功绑定指定的socket,并正常工作。
重启HAProxy:
1 systemctl restart haproxy
测试效果
负载均衡:核心功能 增加配置 frontend rabbitmq_frontend bind 192.168.200.100:11111 mode tcp default_backend rabbitmq_backend
backend rabbitmq_backend mode tcp balance roundrobin server rabbitmq1 192.168.200.100:5672 check server rabbitmq2 192.168.200.150:5672 check server rabbitmq3 192.168.200.200:5672 check
重启HAProxy服务:
1 systemctl restart haproxy
测试 创建组件 交换机:exchange.cluster.test 队列:queue.cluster.test 路由键:routing.key.cluster.test 创建生产者端程序 [1]配置POM
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 <parent > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-parent</artifactId > <version > 3.1.5</version > </parent > <dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-amqp</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-test</artifactId > </dependency > <dependency > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > </dependency > </dependencies >
[2]主启动类
1 2 3 4 5 6 7 8 9 10 11 import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class RabbitMQProducerApp { public static void main (String[] args) { SpringApplication.run(RabbitMQProducerApp.class, args); } }
[3]配置YAML
1 2 3 4 5 6 7 8 9 10 11 12 spring: rabbitmq: host: 192.168 .200 .100 port: 11111 username: yuanyuan password: 123456 virtual-host: / publisher-confirm-type: CORRELATED publisher-returns: true logging: level: blog.yuanyuan.rabbitmq.config.MQProducerAckConfig: info
[4]配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 import jakarta.annotation.PostConstruct;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.ReturnedMessage;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Configuration;@Configuration @Slf4j public class MQProducerAckConfig implements RabbitTemplate .ConfirmCallback, RabbitTemplate.ReturnsCallback{ @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init () { rabbitTemplate.setConfirmCallback(this ); rabbitTemplate.setReturnsCallback(this ); } @Override public void confirm (CorrelationData correlationData, boolean ack, String cause) { if (ack) { log.info("消息发送到交换机成功!数据:" + correlationData); } else { log.info("消息发送到交换机失败!数据:" + correlationData + " 原因:" + cause); } } @Override public void returnedMessage (ReturnedMessage returned) { log.info("消息主体: " + new String (returned.getMessage().getBody())); log.info("应答码: " + returned.getReplyCode()); log.info("描述:" + returned.getReplyText()); log.info("消息使用的交换器 exchange : " + returned.getExchange()); log.info("消息使用的路由键 routing : " + returned.getRoutingKey()); } }
[5] Junit测试类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 import jakarta.annotation.Resource;import org.junit.jupiter.api.Test;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest public class RabbitMQTest { @Resource private RabbitTemplate rabbitTemplate; public static final String EXCHANGE_CLUSTER_TEST = "exchange.cluster.test" ; public static final String ROUTING_KEY_CLUSTER_TEST = "routing.key.cluster.test" ; @Test public void testSendMessage () { rabbitTemplate.convertAndSend(EXCHANGE_CLUSTER_TEST, ROUTING_KEY_CLUSTER_TEST, "message test cluster~~~" ); } }
创建消费端程序 [1]配置POM
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 <parent > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-parent</artifactId > <version > 3.1.5</version > </parent > <dependencies > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-amqp</artifactId > </dependency > <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-web</artifactId > </dependency > <dependency > <groupId > org.projectlombok</groupId > <artifactId > lombok</artifactId > </dependency > </dependencies >
[2]主启动类
1 2 3 4 5 6 7 8 9 10 11 import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class RabbitMQProducerApp { public static void main (String[] args) { SpringApplication.run(RabbitMQProducerApp.class, args); } }
[3]配置YAML
1 2 3 4 5 6 7 8 9 10 11 12 13 spring: rabbitmq: host: 192.168 .200 .100 port: 11111 username: yuanyuan password: 123456 virtual-host: / listener: simple: acknowledge-mode: manual logging: level: blog.yuanyuan.rabbitmq.listener.MyProcessor: info
[4]监听器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.io.IOException;@Component @Slf4j public class MyProcessor { @RabbitListener(queues = {"queue.cluster.test"}) public void processNormalQueueMessage (String data, Message message, Channel channel) throws IOException { log.info("消费端:" + data); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false ); } }
[5]运行效果
镜像队列(被仲裁队列取代) 提出问题 现在我们创建过的队列,它是属于节点1的:
现在我们停掉节点1的rabbit应用:
1 2 # 停止rabbit应用 rabbitmqctl stop_app
再次发送消息:
为了后续操作,再重新启动rabbit应用
创建策略使队列镜像化
创建新的队列 要求 :队列名称必须符合策略中指定的正则表达式
绑定交换机:
测试 节点1关闭rabbit应用
然后就发现两个镜像队列自动分布到了节点2和节点3上:
调整Java代码中的组件名称:
1 2 3 public static final String EXCHANGE_CLUSTER_TEST = "exchange.cluster.test" ;public static final String ROUTING_KEY_MIRROR_TEST = "routing.key.mirror.test" ;public static final String QUEUE_MIRROR_TEST = "mirror.queue.test" ;
仲裁队列 创建仲裁队列 说明 :鉴于仲裁队列的功能,肯定是需要在前面集群的基础上操作!
创建交换机 和仲裁队列绑定的交换机没有特殊,我们还是创建一个direct交换机即可
交换机名称:exchange.quorum.test
创建仲裁队列 队列名称:queue.quorum.test
绑定交换机 路由键:routing.key.quorum.test
测试仲裁队列 常规测试 像使用经典队列一样发送消息、消费消息
生产者端 1 2 3 4 5 6 7 public static final String EXCHANGE_QUORUM_TEST = "exchange.quorum.test" ;public static final String ROUTING_KEY_QUORUM_TEST = "routing.key.quorum.test" ;@Test public void testSendMessageToQuorum () { rabbitTemplate.convertAndSend(EXCHANGE_QUORUM_TEST, ROUTING_KEY_QUORUM_TEST, "message test quorum ~~~" ); }
消费者端 1 2 3 4 5 6 7 public static final String QUEUE_QUORUM_TEST = "queue.quorum.test" ;@RabbitListener(queues = {QUEUE_QUORUM_TEST}) public void quorumMessageProcess (String data, Message message, Channel channel) throws IOException { log.info("消费端:" + data); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false ); }
高可用测试 停止某个节点的rabbit应用 1 2 # 停止rabbit应用 rabbitmqctl stop_app
查看仲裁队列对应的节点情况
再次发送消息 收发消息仍然正常
Stream Queue Stream队列在目前企业实际应用非常少,真有特定场景需要使用肯定会倾向于使用Kafka,而不是RabbitMQ Stream
启用插件 说明 :只有启用了Stream插件,才能使用流式队列的完整功能
在集群每个节点中依次执行如下操作:
1 2 3 4 5 6 7 8 9 # 启用Stream插件 rabbitmq-plugins enable rabbitmq_stream # 重启rabbit应用 rabbitmqctl stop_app rabbitmqctl start_app # 查看插件状态 rabbitmq-plugins list
负载均衡 在文件/etc/haproxy/haproxy.cfg末尾追加:
1 2 3 4 5 6 7 8 9 10 11 frontend rabbitmq_stream_frontend bind 192.168.200.100:33333 mode tcp default_backend rabbitmq_stream_backend backend rabbitmq_stream_backend mode tcp balance roundrobin server rabbitmq1 192.168.200.100:5552 check server rabbitmq2 192.168.200.150:5552 check server rabbitmq3 192.168.200.200:5552 check
Java代码 引入依赖 Stream 专属 Java 客户端官方网址:https://github.com/rabbitmq/rabbitmq-stream-java-client
Stream 专属 Java 客户端官方文档网址:https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 <dependencies > <dependency > <groupId > com.rabbitmq</groupId > <artifactId > stream-client</artifactId > <version > 0.15.0</version > </dependency > <dependency > <groupId > org.slf4j</groupId > <artifactId > slf4j-api</artifactId > <version > 1.7.30</version > </dependency > <dependency > <groupId > ch.qos.logback</groupId > <artifactId > logback-classic</artifactId > <version > 1.2.3</version > </dependency > </dependencies >
创建Stream 说明 :不需要创建交换机
代码方式创建 1 2 3 4 5 6 7 8 9 10 Environment environment = Environment.builder() .host("192.168.200.100" ) .port(33333 ) .username("yuanyuan" ) .password("123456" ) .build(); environment.streamCreator().stream("stream.yuanyuan.test2" ).create(); environment.close();
ManagementUI创建
生产者端程序 内部机制说明 [1]官方文档
Internally, the Environment
will query the broker to find out about the topology of the stream and will create or re-use a connection to publish to the leader node of the stream.
翻译:
在内部,Environment将查询broker以了解流的拓扑结构,并将创建或重用连接以发布到流的 leader 节点。
[2]解析
在 Environment 中封装的连接信息仅负责连接到 broker Producer 在构建对象时会访问 broker 拉取集群中 Leader 的连接信息 将来实际访问的是集群中的 Leader 节点 Leader 的连接信息格式是:节点名称:端口号 [3]配置
为了让本机的应用程序知道 Leader 节点名称对应的 IP 地址,我们需要在本地配置 hosts 文件,建立从节点名称到 IP 地址的映射关系
示例代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 Environment environment = Environment.builder() .host("192.168.200.100" ) .port(33333 ) .username("yuanyuan" ) .password("123456" ) .build(); Producer producer = environment.producerBuilder() .stream("stream.yuanyuan.test" ) .build(); byte [] messagePayload = "hello rabbit stream" .getBytes(StandardCharsets.UTF_8);CountDownLatch countDownLatch = new CountDownLatch (1 );producer.send( producer.messageBuilder().addData(messagePayload).build(), confirmationStatus -> { if (confirmationStatus.isConfirmed()) { System.out.println("[生产者端]the message made it to the broker" ); } else { System.out.println("[生产者端]the message did not make it to the broker" ); } countDownLatch.countDown(); }); countDownLatch.await(); producer.close(); environment.close();
消费端程序 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 Environment environment = Environment.builder() .host("192.168.200.100" ) .port(33333 ) .username("yuanyuan" ) .password("123456" ) .build(); environment.consumerBuilder() .stream("stream.yuanyuan.test" ) .name("stream.yuanyuan.test.consumer" ) .autoTrackingStrategy() .builder() .messageHandler((offset, message) -> { byte [] bodyAsBinary = message.getBodyAsBinary(); String messageContent = new String (bodyAsBinary); System.out.println("[消费者端]messageContent = " + messageContent + " Offset=" + offset.offset()); }) .build();
指定偏移量消费 偏移量 官方文档说明 The offset is the place in the stream where the consumer starts consuming from. The possible values for the offset parameter are the following:
OffsetSpecification.first() : starting from the first available offset. If the stream has not been truncated , this means the beginning of the stream (offset 0). OffsetSpecification.last() : starting from the end of the stream and returning the last chunk of messages immediately (if the stream is not empty). OffsetSpecification.next() : starting from the next offset to be written. Contrary to OffsetSpecification.last()
, consuming with OffsetSpecification.next()
will not return anything if no-one is publishing to the stream. The broker will start sending messages to the consumer when messages are published to the stream. OffsetSpecification.offset(offset) : starting from the specified offset. 0 means consuming from the beginning of the stream (first messages). The client can also specify any number, for example the offset where it left off in a previous incarnation of the application. OffsetSpecification.timestamp(timestamp) : starting from the messages stored after the specified timestamp. Note consumers can receive messages published a bit before the specified timestamp. Application code can filter out those messages if necessary. 指定Offset消费 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 Environment environment = Environment.builder() .host("192.168.200.100" ) .port(33333 ) .username("yuanyuan" ) .password("123456" ) .build(); CountDownLatch countDownLatch = new CountDownLatch (1 );Consumer consumer = environment.consumerBuilder() .stream("stream.yuanyuan.test" ) .offset(OffsetSpecification.first()) .messageHandler((offset, message) -> { byte [] bodyAsBinary = message.getBodyAsBinary(); String messageContent = new String (bodyAsBinary); System.out.println("[消费者端]messageContent = " + messageContent); countDownLatch.countDown(); }) .build(); countDownLatch.await(); consumer.close();
对比 autoTrackingStrategy 方式:始终监听Stream中的新消息 指定偏移量方式:针对指定偏移量的消息消费之后就停止