Kafka3.7.0:3.SpringBoot集成
HelloWorld
项目搭建
首先新建一个空项目,然后添加一个模块
Kafka相关依赖,不是starter依赖
1 | <dependency> |
配置文件
修改配置文件后缀为.yml,添加如下配置:
1 | spring: |
程序编写
生产者(写入事件)
向hello Topic发送一个Events
1 |
|
编写测试方法
1 |
|
运行测试代码,测试通过即发送成功。
消费者(读取事件)
消费者代码
1 |
|
运行程序,同时重新运行测试代码发送一条消息
可以发现接收到了消息。那么怎么读取之前发的消息呢?
读取之前发的消息
首先,我们需要理解几个概念:
- 生产者Producer
- 消费者Consumer
- 主题Topic
- 分区Partition
- 偏移量Offset
- Kafka中,每个topic可以有一个或多个partition
- 当创建topic时,如果不指定该topic的partition数量,那么默认就是1个partition
- offset是标识每个分区中消息的唯一位置,从0开始
默认情况下,当启动一个新的消费者组时,它会从每个分区的最新偏移量(即该分区中最后一条消息的下一个位置)开始消费。如果希望从第一条消息开始消费,需要将消费者的auto.offset.reset设置为earliest。
在application.yml新增
1 | spring: |
此时只需运行程序即可看到之前发送的两条消息
如果之前已经用相同的消费者组ID消费过该主题,并且Kafka已经保存了该消费者组的偏移量,那么即使你设置了auto.offset.reset=earliest,该设置也不会生效,因为Kafka只会在找不到偏移量时使用这个配置。在这种情况下,你需要手动重置偏移量或使用一个新的消费者组ID
手动重置偏移量:
- 重置到第一条:./kafka-consumer-groups.sh --bootstrap-server <你的kafka-bootstrap-servers> --group <你的consumer-group> --topic <你的topic> --reset-offsets --to-earliest --execute
出现下方错误,说明你的程序正在运行,请关掉
出现下面的界面就说明重置成功了!!!
此时再次运行程序,也可收到之前的消息。
- 重置到最后一条:./kafka-consumer-groups.sh --bootstrap-server <你的kafka-bootstrap-servers> --group <你的consumer-group> --topic <你的topic> --reset-offsets --to-latest --execute
spring-kafka生产者发送消息
生产者客户端向kafka的主题topic中写入事件
使用Message发送消息
在EventProducer类中添加方法
1 | public void sendEvent2() { |
同时编写测试方法测试,代码与之前测试代码几乎相同,只需调用新写的方法即可。使用Kafka插件查看消息数,发现新增了一个。
使用ProducerRecord发送消息
在EventProducer类中添加方法
1 | public void sendEvent3() { |
同时编写测试方法测试,代码与之前测试代码几乎相同,只需调用新写的方法即可。
发送指定分区消息
在EventProducer类中添加方法
1 | public void sendEvent4() { |
同时编写测试方法测试,代码与之前测试代码几乎相同,只需调用新写的方法即可。
使用sendDefault发送默认topic消息
在EventProducer类中添加方法
1 | public void sendEvent5() { |
同时编写测试方法测试,代码与之前测试代码几乎相同,只需调用新写的方法即可。运行时报错:
喔,原来是没有topic,我们需要在application.yml中配置默认topic。
1 | spring: |
这时运行就成功了。
kafkaTemplate.send(…) 和 kafkaTemplate.sendDefault(…) 的区别?
主要区别是发送消息到Kafka时是否每次都需要指定主题topic
- kafkaTemplate.send(…) 该方法需要明确地指定要发送消息的目标主题topic
- kafkaTemplate.sendDefault() 该方法不需要指定要发送消息的目标主题topic
kafkaTemplate.send(…) 方法适用于需要根据业务逻辑或外部输入动态确定消息目标topic的场景
kafkaTemplate.sendDefault() 方法适用于总是需要将消息发送到特定默认topic的场景;
kafkaTemplate.sendDefault() 是一个便捷方法,它使用配置中指定的默认主题topic来发送消息,如果应用中所有消息都发送到同一个主题时采用该方法非常方便,可以减少代码的重复或满足特定的业务需求
获取生产者消息发送结果
.send()方法和.sendDefault()方法都返回CompletableFuture<SendResult<K, V>>
CompletableFuture 是Java 8中引入的一个类,用于异步编程,它表示一个异步计算的结果,这个特性使得调用者不必等待操作完成就能继续执行其他任务,从而提高了应用程序的响应速度和吞吐量
因为调用 kafkaTemplate.send() 方法发送消息时,Kafka可能需要一些时间来处理该消息(例如:网络延迟、消息序列化、Kafka集群的负载等),如果 send() 方法是同步的,那么发送消息可能会阻塞调用线程,直到消息发送成功或发生错误,这会导致应用程序的性能下降,尤其是在高并发场景下
使用 CompletableFuture,.send() 方法可以立即返回一个表示异步操作结果的未来对象,而不是等待操作完成,这样,调用线程可以继续执行其他任务,而不必等待消息发送完成。当消息发送完成时(无论是成功还是失败),CompletableFuture会相应地更新其状态,并允许我们通过回调、阻塞等方式来获取操作结果
阻塞式获取生产者发送的消息
方法一:调用CompletableFuture的get()方法,同步阻塞等待发送结果
在EventProducer类中添加方法
1 | public void sendEvent6() { |
同时编写测试方法测试,代码与之前测试代码几乎相同,只需调用新写的方法即可。运行结果:
方法二:使用 thenAccept(), thenApply(), thenRun() 等方法来注册回调函数,回调函数将在 CompletableFuture 完成时被执行
在EventProducer类中添加方法
1 | public void sendEvent7() { |
同时编写测试方法测试,代码与之前测试代码几乎相同,只需调用新写的方法即可。运行结果:
生产者发送对象消息
先创建一个User类
1 |
|
在EventProducer类中添加方法
1 |
|
同时编写测试方法测试,代码与之前测试代码几乎相同,只需调用新写的方法即可。运行发现又报错了o(╥﹏╥)o
发现是序列化异常,此时我们只需在application.yml添加序列化配置即可:
1 | spring: |
此时再次发送消息就成功了。
Kafka的核心概念:Replica副本
Replica:副本,为实现备份功能,保证集群中的某个节点发生故障时,该节点上的partition数据不丢失,且 Kafka仍然能够继续工作,Kafka提供了副本机制,一个topic的每个分区都有1个或多个副本
Replica副本分为Leader Replica和Follower Replica:
Leader:每个分区多个副本中的“主”副本,生产者发送数据以及消费者消费数据,都是来自leader副本
Follower:每个分区多个副本中的“从”副本,实时从leader副本中同步数据,保持和leader副本数据的同步,leader副本发生故障时,某个follower副本会成为新的leader副本
设置副本个数不能为0,也不能大于节点个数,否则将不能创建Topic
指定topic的分区和副本
方式一:通过Kafka提供的命令行工具在创建topic时指定分区和副本
./kafka-topics.sh --create --topic <主题名> --partitions 3 --replication-factor 1 --bootstrap-server 127.0.0.1:9092
方式二:执行代码时指定分区和副本
kafkaTemplate.send(“topic”, message);
直接使用send()方法发送消息时,kafka会帮我们自动完成topic的创建工作,但这种情况下创建的topic默认只有一个分区,分区有1个副本,也就是有它自己本身的副本,没有额外的副本备份。我们可以在项目中新建一个配置类专门用来初始化topic:
1 |
|
运行程序,可以发现多了一个topic,分区数为5:
可以在配置类中更新topic:
1 | //对topic进行更新 |
运行后发现分区数变为了9
分区数只能增大不能减小
生产者发送消息的分区策略
消息发到哪个分区中?是什么策略?
生产者写入消息到topic,Kafka将依据不同的策略将数据分配到不同的分区中
默认分配策略:BuiltInPartitioner
有key:Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions;
没有key:使用随机数 % numPartitions1
2
3
4
5
6
7
8
9
10public void sendEvent9() {
User user = User.builder().id(1001).phone("13723456789").birthday(new Date()).build();
//分区为空,让kafka自己决定把消息发送到哪个分区
kafkaTemplate2.send("hello-topic", null, System.currentTimeMillis(), "k9", user);
}
public void sendEvent10() {
User user = User.builder().id(1001).phone("13723456789").birthday(new Date()).build();
//分区和key为空
kafkaTemplate2.send("hello-topic", user);
}轮询分配策略:RoundRobinPartitioner (接口:Partitioner)
在KafkaConfig配置类中添加:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
private String bootstrapServers;
private String valueSerializer;
/**
* 生产者工厂
*/
public ProducerFactory<String, ?> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
/**
* 生产者配置
*/
private Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class); //轮询分配策略:RoundRobinPartitioner
return props;
}
/**
* kafkaTemplate 覆盖默认的kafkaTemplate
*/
public KafkaTemplate<String, ?> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}自定义分配策略:我们自己定义
新建自定义分区策略类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33public class CustomPartitioner implements Partitioner {
private AtomicInteger nextPartition = new AtomicInteger(0);
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (key == null) {
//轮询策略选择分区
int next = nextPartition.getAndIncrement();
if (next >= numPartitions) {
nextPartition.compareAndSet(next, 0);
}
System.out.println("分区值:" + next);
return next;
} else {
//key不为空,使用默认的分区策略
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
public void close() {
}
public void configure(Map<String, ?> map) {
}
}修改props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);
删除hello-topic,编写测试方法:
1
2
3
4
5
6
void test10() {
for (int i = 0; i < 5; i++) {
eventProducer.sendEvent10();
}
}
生产者发送消息的流程
自定义消息发送拦截器
我们需要实现ProducerInterceptor<K, V>接口:
1 | public class CustomProducerInterceptor implements ProducerInterceptor<String, Object> { |
同时在KafkaConfig配置类的producerConfigs方法中添加:
1 | //添加拦截器 |
运行测试类,结果如下:
获取生产者发送的消息
Payload与Header
新建一个module,配置跟之前一样,将上一个module中的consumer和producer文件夹复制一份过来,修改EventProducer代码:
1 |
|
修改EventConsumer代码:
1 |
|
application.yml
1 | spring: |
测试代码:
1 |
|
运行程序,然后运行测试代码发送消息:
ConsumerRecord获取详细内容
修改EventConsumer:
1 |
|
对象消息
复制之前的model文件夹
添加pom依赖:
1 | <dependency> |
对象JSON工具类:
1 | public class JSONUtils { |
EventProducer新增发送user对象消息:
1 |
|
EventConsumer添加消费方法:
1 |
|
新增测试方法,调用eventProducer.sendEvent2()。
自定义配置
1 | # 自定义配置 |
EventConsumer:
1 |
|
手动消息确认
EventConsumer:
1 |
|
添加手动确认模式:
1 | spring: |
如果注释掉 ack.acknowledge();
,会出现消息多次消费问题:
每次启动都会收到该消息。
默认情况下,Kafka消费者消费消息后会自动发送确认信息给Kafka服务器,表示消息已经被成功消费。但在某些场景下,我们希望在消息处理成功后再发送确认,或者在消息处理失败时选择不发送确认,以便Kafka能够重新发送该消息。
指定消费消息
EventConsumer:
1 |
|
新建KafkaConfig:
初始化5个分区用于测试。
1 |
|
EventProducer:
1 |
|
清空kafka中topic,新建测试方法,发送消息:
运行测试代码前,请将KafkaListener注解所有内容注释掉
打开KafkaListener注释,启动SpringBoot应用程序:
发现只接收到3条消息,为什么呢?
这是因为我们设置的 initialOffset = "3"
,所以会从3开始读消息。
你可以添加如下配置并手动重置偏移量或者使用一个新的消费组ID读取历史消息。
1 | spring: |
一共19条消息:
1 | 接收到事件:User(id=15, phone=1372345678915, birthday=Mon Aug 12 18:15:34 CST 2024), topic:helloTopic, partition:3 |
批量消费消息
新建一个module,配置和之前的一样。
- 设置application.yml开启批量消费
1 | spring: |
- 编写EventConsumer
1 |
|
- 将上一个module的model,utils,producer文件夹拷贝过来,添加json依赖,修改EventProducer
1 | <dependency> |
1 |
|
- 编写测试方法
1 |
|
- 运行测试类
- 取消EventConsumer的
//@KafkaListener(topics = {"batchTopic"}, groupId = "batchGroup")
的注释,运行主程序类
消费消息时的消息拦截
在消息消费之前,我们可以通过配置拦截器对消息进行拦截,在消息被实际处理之前对其进行一些操作,例如记录日志、修改消息内容或执行一些安全检查等。
新建一个module,配置不变
- 实现kafka的ConsumerInterceptor拦截器接口
1 | /** |
- 修改配置文件
1 | spring: |
- 在Kafka消费者的ConsumerFactory配置中注册这个拦截器
1 |
|
- 监听消息时使用我们的监听器容器工厂Bean
1 |
|
复制上一个module的model,producer,util文件夹
添加json依赖
修改EventProducer
1 |
|
- 启动SpringBoot主程序
- 编写测试类(跟之前一样,以后都不赘述),运行测试方法
消息转发
消息转发就是应用A从TopicA接收到消息,经过处理后转发到TopicB,再由应用B监听接收该消息,即一个应用处理完成后将该消息转发至其他应用处理。
消费者:
1 |
|
生产者:
1 |
|
启动主程序,然后启动测试方法发送消息:
消息消费的分区策略
Kafka消费消息时的分区策略:是指Kafka主题topic中哪些分区应该由哪些消费者来消费
Kafka有多种分区分配策略,默认的分区分配策略是 RangeAssignor
,除了RangeAssignor策略外,Kafka还有其他分区分配策略:
- RoundRobinAssignor
- StickyAssignor
- CooperativeStickyAssignor
这些策略各有特点,可以根据实际的应用场景和需求来选择适合的分区分配策略。
RangeAssignor
Kafka默认的消费分区分配策略:RangeAssignor
RangeAssignor策略是根据消费者组内的消费者数量和主题的分区数量,来均匀地为每个消费者分配分区。
假设:
- 一个主题myTopic有10个分区(p0 - p9)
- 一个消费者组内有3个消费者:consumer1、consumer2、consumer3
RangeAssignor消费分区策略:
- 计算每个消费者应得的分区数:分区总数(10)/ 消费者数量(3)= 3 … 余1
- 每个消费者理论上应该得到3个分区,但由于有余数1,所以前1个消费者会多得到一个分区
- consumer1(作为第一个消费者)将得到 3 + 1 = 4 个分区
- consumer2 和 consumer3 将各得到 3 个分区
- 具体分配:分区编号从0到9,按照编号顺序为消费者分配分区:
- consumer1 将分配得到分区 0、1、2、3
- consumer2 将分配得到分区 4、5、6
- consumer3 将分配得到分区 7、8、9
配置文件:
1 | spring: |
创建10个分区:
1 |
|
创建3个消费者:
1 |
|
生产者:
1 |
|
编写测试程序发送消息:
取消消费者上@KafkaListener的注释,运行主程序进行消费:
RoundRobinAssignor
继续以前面的例子数据,采用RoundRobinAssignor策略进行测试,得到的结果如下:
C1: 0, 3, 6, 9
C2: 1, 4, 7
C3: 2, 5, 8
配置消费策略:
1 |
|
修改消费者:
1 |
|
请先删除myTopic
使用RangeAssignor的测试方法进行测试:
StickyAssignor
StickyAssignor消费分区策略:
尽可能保持消费者与分区之间的分配关系不变,即使消费组的消费者成员发生变化,减少不必要的分区重分配;
尽量保持现有的分区分配不变,仅对新加入的消费者或离开的消费者进行分区调整。这样,大多数消费者可以继续消费它们之前消费的分区,只有少数消费者需要处理额外的分区;所以叫“粘性”分配。
CooperativeStickyAssignor
CooperativeStickyAssignor消费分区策略:
与 StickyAssignor 类似,但增加了对协作式重新平衡的支持,即消费者可以在它离开消费者组之前通知协调器,以便协调器可以预先计划分区迁移,而不是在消费者突然离开时立即进行分区重分配。
实际工作中,推荐使用StickyAssignor或者CooperativeStickyAssignor
Kafka事件(消息、数据)的存储
kafka的所有事件(消息、数据)都存储在/tmp/kafka-logs目录中,可通过log.dirs=/tmp/kafka-logs配置
Kafka的所有事件(消息、数据)都是以日志文件的方式来保存
topic-partition
Kafka一般都是海量的消息数据,为了避免日志文件过大,日志文件被存放在多个日志目录下,日志目录的命名规则为:<topic_name>-<partition_id>
比如创建一个名为 firstTopic 的 topic,其中有 3 个 partition,那么在 kafka 的数据目录(/tmp/kafka-log)中就有 3 个目录,firstTopic-0
、firstTopic-1
、firstTopic-2
:
- 00000000000000000000.index 消息索引文件
- 00000000000000000000.log 消息数据文件
- 00000000000000000000.timeindex 消息的时间戳索引文件
- 00000000000000000006.snapshot 快照文件,生产者发生故障或重启时能够恢复并继续之前的操作
- leader-epoch-checkpoint 记录每个分区当前领导者的epoch以及领导者开始写入消息时的起始偏移量
- partition.metadata 存储关于特定分区的元数据(metadata)信息
topic-offset
每次消费一个消息并且提交以后,会保存当前消费到的最近的一个offset
在kafka中,有一个__consumer_offsets的topic, 消费者消费提交的offset信息会写入到 该topic中,__consumer_offsets保存了每个consumer group某一时刻提交的offset信息,__consumer_offsets默认有50个分区
consumer_group 保存在哪个分区中的计算公式:
1 | Math.abs("groupid".hashCode())%groupMetadataTopicPartitionCount; |
Offset详解
生产者Offset
生产者发送一条消息到Kafka的broker的某个topic下某个partition中
Kafka内部会为每条消息分配一个唯一的offset,该offset就是该消息在partition中的位置
消费者offset
消费者offset是消费者需要知道自己已经读取到哪个位置了,接下来需要从哪个位置开始继续读取消息。
每个消费者组(Consumer Group)中的消费者都会独立地维护自己的offset,当消费者从某个partition读取消息时,它会记录当前读取到的offset,这样,即使消费者崩溃或重启,它也可以从上次读取的位置继续读取,而不会重复读取或遗漏消息。
消费者offset需要消费消息并提交后才记录offset
每个消费者组启动开始监听消息,默认从消息的最新的位置开始监听消息,即把最新的位置作为消费者offset
- 分区中还没有发送过消息,则最新的位置就是0
- 分区中已经发送过消息,则最新的位置就是生产者offset的下一个位置
消费者消费消息后,如果不提交确认(ack),则offset不更新,提交了才更新
命令行命令:./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group yourGroup --describe
。yourGroup:改为你的group
消费者从什么位置开始消费,就看消费者的offset是多少