HelloWorld

Project Setup

First create an empty project, then add a module.

Add the Kafka dependency (note that it is the spring-kafka dependency, not a starter):

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Configuration File

Rename the configuration file to use the .yml extension and add the following:

spring:
  kafka:
    # Kafka connection address (ip:port)
    bootstrap-servers: localhost:9092
    # Producer configuration
    # producer:
    # Consumer configuration
    # consumer:

Writing the Code

Producer (writing events)

Send an event to the hello topic.

@Component
public class EventProducer {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendEvent() {
        kafkaTemplate.send("hello", "hello kafka");
    }
}

Write a test method:

@SpringBootTest
class KafkaBaseApplicationTests {

    @Resource
    private EventProducer eventProducer;

    @Test
    void test01() {
        eventProducer.sendEvent();
    }
}

Run the test; if it passes, the message was sent successfully.

Consumer (reading events)

Consumer code:

@Component
public class EventConsumer {

    // Receive events with a listener
    @KafkaListener(topics = {"hello"})
    public void receive(String message) {
        System.out.println("接收到消息:" + message);
    }
}

Start the application, then rerun the test to send a message.

You can see that the message is received. But how do we read messages that were sent earlier?

Reading Previously Sent Messages

First, a few concepts to understand:

  1. Producer
  2. Consumer
  3. Topic
  4. Partition
  5. Offset

  1. In Kafka, each topic can have one or more partitions.
  2. If you create a topic without specifying a partition count, it defaults to 1 partition.
  3. The offset uniquely identifies each message's position within a partition, starting from 0.

By default, when a new consumer group starts, it consumes from the latest offset of each partition (the position right after the last message in that partition). To consume from the first message instead, set the consumer's auto.offset.reset to earliest.

Add the following to application.yml:

spring:
  kafka:
    # Consumer configuration
    consumer:
      auto-offset-reset: earliest
      # Possible values for auto-offset-reset: earliest, latest, none, exception
      # earliest: automatically reset the offset to the earliest offset
      # latest: automatically reset the offset to the latest offset
      # none: throw an exception to the consumer if no previous offset is found for the consumer group
      # exception: throw an exception to the consumer (not supported by spring-kafka)

Now simply start the application and you will see the two messages sent earlier.

If the same consumer group ID has already consumed this topic and Kafka has stored offsets for that group, then auto.offset.reset=earliest will not take effect, because Kafka only applies this setting when no committed offset can be found. In that case you must manually reset the offsets or use a new consumer group ID.

Manually resetting offsets:

  1. Reset to the earliest message: ./kafka-consumer-groups.sh --bootstrap-server <your-kafka-bootstrap-servers> --group <your-consumer-group> --topic <your-topic> --reset-offsets --to-earliest --execute

If this command reports an error, your application is still running; shut it down first.

Once the command prints the reset result, the reset has succeeded.

Now run the application again and the earlier messages are received as well.

  2. Reset to the latest message: ./kafka-consumer-groups.sh --bootstrap-server <your-kafka-bootstrap-servers> --group <your-consumer-group> --topic <your-topic> --reset-offsets --to-latest --execute

Sending Messages with the spring-kafka Producer

The producer client writes events into a Kafka topic.

Sending Messages with Message

Add a method to the EventProducer class:

public void sendEvent2() {
    // Build a Message object with the builder pattern
    Message<String> message = MessageBuilder.withPayload("hello kafka")
            .setHeader(KafkaHeaders.TOPIC, "hello") // set the target topic
            .build();
    kafkaTemplate.send(message);
}

Write a test method as before (it only needs to call the new method). Checking the message count with the Kafka plugin shows one new message.

Sending Messages with ProducerRecord

Add a method to the EventProducer class:

public void sendEvent3() {
    // Headers hold extra key-value information; after receiving the message, the consumer can read it back from the headers
    Headers headers = new RecordHeaders();
    headers.add("phone", "13723456789".getBytes(StandardCharsets.UTF_8));
    headers.add("id", "id-1".getBytes(StandardCharsets.UTF_8));
    // String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers
    ProducerRecord<String, String> record = new ProducerRecord<>(
            "hello",
            0,
            System.currentTimeMillis(),
            "k1",
            "hello kafka",
            headers
    );
    kafkaTemplate.send(record);
}

Write a test method as before, calling the new method.

Sending Messages to a Specific Partition

Add a method to the EventProducer class:

public void sendEvent4() {
    // String topic, Integer partition, Long timestamp, K key, @Nullable V data
    kafkaTemplate.send("hello", 0, System.currentTimeMillis(), "k2", "hello kafka");
}

Write a test method as before, calling the new method.

Sending to the Default Topic with sendDefault

Add a method to the EventProducer class:

public void sendEvent5() {
    // Integer partition, Long timestamp, K key, V data
    kafkaTemplate.sendDefault(0, System.currentTimeMillis(), "k3", "hello kafka");
}

Write a test method as before, calling the new method. This time it fails at runtime.

The error is caused by a missing topic: we need to configure a default topic in application.yml.

spring:
  kafka:
    template:
      default-topic: default-topic

Now it runs successfully.

What is the difference between kafkaTemplate.send(…) and kafkaTemplate.sendDefault(…)?
The main difference is whether the target topic has to be specified on every send.

  1. kafkaTemplate.send(…) requires you to explicitly specify the target topic.
  2. kafkaTemplate.sendDefault() does not require a target topic.

kafkaTemplate.send(…) suits scenarios where the target topic is determined dynamically by business logic or external input.
kafkaTemplate.sendDefault() suits scenarios where messages always go to one specific default topic.
kafkaTemplate.sendDefault() is a convenience method that sends to the default topic from the configuration; it is handy when the whole application sends to a single topic, reducing repeated code while still meeting the business need.
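
A minimal sketch of the difference, assuming the default-topic configured above; the class and topic names here are illustrative, not from the original project:

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class TopicChoiceProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public TopicChoiceProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendExplicit(String event) {
        // send(...) names the target topic on every call
        kafkaTemplate.send("order-events", event);
    }

    public void sendToDefault(String event) {
        // sendDefault(...) relies on spring.kafka.template.default-topic
        kafkaTemplate.sendDefault(event);
    }
}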

Getting the Producer Send Result

Both .send() and .sendDefault() return a CompletableFuture<SendResult<K, V>>.

CompletableFuture was introduced in Java 8 for asynchronous programming. It represents the result of an asynchronous computation, so the caller does not have to wait for the operation to finish before doing other work, which improves the application's responsiveness and throughput.

When kafkaTemplate.send() is called, Kafka may need some time to handle the message (network latency, serialization, cluster load, and so on). If send() were synchronous, it could block the calling thread until the message was sent or an error occurred, hurting performance, especially under high concurrency.

With CompletableFuture, .send() immediately returns an object representing the pending result instead of waiting for completion, so the calling thread can keep doing other work. When the send finishes (successfully or not), the CompletableFuture updates its state and lets us obtain the result via callbacks, blocking calls, and so on.

Blocking retrieval of the send result

Method 1: call the CompletableFuture's get() method and block until the send result is available.

Add a method to the EventProducer class:

public void sendEvent6() {
    CompletableFuture<SendResult<String, String>> completableFuture
            = kafkaTemplate.sendDefault(0, System.currentTimeMillis(), "k3", "hello kafka");
    // Get the send result through the CompletableFuture
    // Blocking wait
    try {
        SendResult<String, String> sendResult = completableFuture.get();
        if (sendResult.getRecordMetadata() != null) {
            // The Kafka broker has acknowledged the message
            System.out.println("消息发送成功:" + sendResult.getRecordMetadata().toString());
        }
        System.out.println("producerRecord:" + sendResult.getProducerRecord());
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

Write a test method as before, calling the new method, and check the console output.

Method 2: register a callback with thenAccept(), thenApply(), thenRun(), etc.; the callback runs when the CompletableFuture completes.

Add a method to the EventProducer class:

public void sendEvent7() {
    CompletableFuture<SendResult<String, String>> completableFuture
            = kafkaTemplate.sendDefault(0, System.currentTimeMillis(), "k3", "hello kafka");
    // Get the send result through the CompletableFuture
    // Non-blocking
    try {
        completableFuture.thenAccept((sendResult) -> {
            if (sendResult.getRecordMetadata() != null) {
                // The Kafka broker has acknowledged the message
                System.out.println("消息发送成功:" + sendResult.getRecordMetadata().toString());
            }
            System.out.println("producerRecord:" + sendResult.getProducerRecord());
        }).exceptionally((t) -> {
            t.printStackTrace(System.err);
            return null;
        });
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

Write a test method as before, calling the new method, and check the console output.

Sending Object Messages from the Producer

First create a User class:

@Builder
@AllArgsConstructor
@NoArgsConstructor
@Data
public class User {

    private int id;
    private String phone;
    private Date birthday;
}

Add the following to the EventProducer class:

@Resource
private KafkaTemplate<String, Object> kafkaTemplate2;

public void sendEvent8() {
    User user = User.builder().id(1001).phone("13723456789").birthday(new Date()).build();
    // Partition is null, so Kafka decides which partition the message goes to
    kafkaTemplate2.sendDefault(null, System.currentTimeMillis(), "k3", user);
}

Write a test method as before, calling the new method. Running it fails again.

The error is a serialization exception; we just need to add serializer configuration to application.yml:

spring:
  kafka:
    # Producer configuration
    producer:
      # The key serializer defaults to StringSerializer
      #key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # The value serializer also defaults to StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      # or
      #value-serializer: org.springframework.kafka.support.serializer.ToStringSerializer

Sending the message again now succeeds.

Kafka Core Concept: Replicas

Replica: to provide backup so that a node failure does not lose that node's partition data and Kafka can keep working, Kafka offers a replication mechanism; each partition of a topic has one or more replicas.

Replicas are divided into leader replicas and follower replicas:
Leader: the "primary" replica of a partition; producers send data to, and consumers consume data from, the leader replica.
Follower: a "secondary" replica that continuously syncs data from the leader to stay up to date; when the leader fails, one of the followers becomes the new leader.

The replica count cannot be 0 and cannot exceed the number of broker nodes, otherwise the topic cannot be created.

Specifying a Topic's Partitions and Replicas

Option 1: specify the partitions and replicas when creating the topic with Kafka's command-line tool:
./kafka-topics.sh --create --topic <topic-name> --partitions 3 --replication-factor 1 --bootstrap-server 127.0.0.1:9092

Option 2: specify the partitions and replicas in code.
kafkaTemplate.send("topic", message);
When you send a message with send() directly, Kafka auto-creates the topic, but such a topic has only one partition and one replica (only itself, no extra backup). We can add a configuration class to the project dedicated to initializing topics:

@Configuration
public class KafkaConfig {

    @Bean
    public NewTopic newTopic() {
        // Created only if the topic does not already exist
        return new NewTopic("hello-topic", 5, (short) 1);
    }
}

Run the application and a new topic appears with 5 partitions.

You can also update the topic from the configuration class:

// Update the topic
@Bean
public NewTopic updateNewTopic() {
    return new NewTopic("hello-topic", 9, (short) 1);
}

After running, the partition count becomes 9.

Note that the partition count can only be increased, never decreased.
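
The same increase can also be done programmatically with Kafka's AdminClient. This is only a hedged sketch reusing the topic name and partition count from the example above; requesting fewer partitions than the topic already has is rejected by the broker:

import java.util.Map;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitions;

public class IncreasePartitionsDemo {

    public static void main(String[] args) throws Exception {
        Map<String, Object> conf = Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(conf)) {
            // Grow hello-topic to 9 partitions; asking for fewer than the current count fails
            admin.createPartitions(Map.of("hello-topic", NewPartitions.increaseTo(9))).all().get();
        }
    }
}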

Producer Partitioning Strategies

Which partition does a message end up in, and what strategy decides that?
When a producer writes messages to a topic, Kafka distributes them across partitions according to different strategies.

  1. Default strategy: BuiltInPartitioner
    With a key: Utils.toPositive(Utils.murmur2(serializedKey)) % numPartitions
    Without a key: a random number % numPartitions

    public void sendEvent9() {
        User user = User.builder().id(1001).phone("13723456789").birthday(new Date()).build();
        // Partition is null, so Kafka decides which partition the message goes to
        kafkaTemplate2.send("hello-topic", null, System.currentTimeMillis(), "k9", user);
    }

    public void sendEvent10() {
        User user = User.builder().id(1001).phone("13723456789").birthday(new Date()).build();
        // Both partition and key are null
        kafkaTemplate2.send("hello-topic", user);
    }
  2. Round-robin strategy: RoundRobinPartitioner (implements the Partitioner interface)

    Add the following to the KafkaConfig configuration class:

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.producer.value-serializer}")
    private String valueSerializer;

    /**
     * Producer factory
     */
    public ProducerFactory<String, ?> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * Producer configuration
     */
    private Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class); // round-robin strategy: RoundRobinPartitioner
        return props;
    }

    /**
     * kafkaTemplate bean overriding the default kafkaTemplate
     */
    @Bean
    public KafkaTemplate<String, ?> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
  3. Custom strategy: one we define ourselves.

    Create a custom partitioner class:

    public class CustomPartitioner implements Partitioner {

        private final AtomicInteger nextPartition = new AtomicInteger(0);

        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
            int numPartitions = partitions.size();
            if (key == null) {
                // No key: rotate through the partitions round-robin
                // (floorMod keeps the value inside [0, numPartitions) even as the counter grows)
                int next = Math.floorMod(nextPartition.getAndIncrement(), numPartitions);
                System.out.println("分区值:" + next);
                return next;
            } else {
                // Key present: fall back to the default hashing strategy
                return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
            }
        }

        @Override
        public void close() {
        }

        @Override
        public void configure(Map<String, ?> map) {
        }
    }

    Then switch the producer config to it: props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class);

    Delete hello-topic and write a test method:

    @Test
    void test10() {
        for (int i = 0; i < 5; i++) {
            eventProducer.sendEvent10();
        }
    }

The Producer Send Flow

(Diagram of the producer send flow.)

Custom Producer Interceptor

We need to implement the ProducerInterceptor<K, V> interface:

public class CustomProducerInterceptor implements ProducerInterceptor<String, Object> {

    /**
     * Called before a message is sent; the message can be intercepted here,
     * e.g. to modify it or record a log entry.
     *
     * @param producerRecord the record about to be sent
     * @return ProducerRecord<String, Object>
     */
    @Override
    public ProducerRecord<String, Object> onSend(ProducerRecord<String, Object> producerRecord) {
        System.out.println("拦截器拦截到消息:" + producerRecord.toString());
        return producerRecord;
    }

    /**
     * Called when the broker acknowledges the message (or the send fails)
     *
     * @param recordMetadata metadata of the acknowledged record
     * @param e              the exception, if the send failed
     */
    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        if (recordMetadata != null) {
            System.out.println("服务器收到消息:" + recordMetadata.offset());
        } else {
            System.out.println("消息发送失败了,exception=" + e.getMessage());
        }
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> map) {
    }
}

Also add the following to the producerConfigs method of the KafkaConfig class:

// Register the interceptor
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomProducerInterceptor.class.getName());

Run the test class; the interceptor output appears in the console.

Receiving the Producer's Messages

Payload and Header

Create a new module with the same configuration as before, copy the consumer and producer packages from the previous module, and modify EventProducer:

@Component
public class EventProducer {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendEvent() {
        kafkaTemplate.send("helloTopic", "hello kafka");
    }
}

Modify EventConsumer:

@Component
public class EventConsumer {

    // @Payload marks the parameter as the message body
    // @Header marks the parameter as a message header
    @KafkaListener(topics = {"helloTopic"}, groupId = "helloGroup")
    public void onEvent(@Payload String event,
                        @Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic,
                        @Header(value = KafkaHeaders.RECEIVED_PARTITION) String partition) {
        System.out.println("接收到事件:" + event + ", topic:" + topic + ", partition:" + partition);
    }
}

application.yml

spring:
  application:
    name: kafka-02-base
  # Kafka connection address (ip:port)
  kafka:
    bootstrap-servers: localhost:9092
    # Producer configuration
    #producer:
    # Consumer configuration
    #consumer:

Test code:

@SpringBootTest
class KafkaBaseApplicationTests {

    @Resource
    private EventProducer eventProducer;

    @Test
    void text01() {
        eventProducer.sendEvent();
    }
}

Start the application, then run the test to send a message; the listener logs the event, topic, and partition.

Getting Full Details with ConsumerRecord

Modify EventConsumer:

@Component
public class EventConsumer {

    @KafkaListener(topics = {"helloTopic"}, groupId = "helloGroup")
    public void onEvent(@Payload String event,
                        @Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic,
                        @Header(value = KafkaHeaders.RECEIVED_PARTITION) String partition,
                        ConsumerRecord<String, String> record) {
        System.out.println("接收到事件:" + event + ", topic:" + topic + ", partition:" + partition);
        System.out.println("record:" + record.toString());
    }
}

Object Messages

Copy the model package from the previous module.

Add the pom dependency:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-json</artifactId>
</dependency>

A JSON utility class for objects:

public class JSONUtils {

    private static final ObjectMapper OBJECTMAPPER = new ObjectMapper();

    public static String toJson(Object object) {
        try {
            return OBJECTMAPPER.writeValueAsString(object);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    public static <T> T toBean(String json, Class<T> clazz) {
        try {
            return OBJECTMAPPER.readValue(json, clazz);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }
}

Add a method to EventProducer that sends a User object as a message:

@Component
public class EventProducer {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendEvent2() {
        User user = User.builder().id(1001).phone("13723456789").birthday(new Date()).build();
        String userJson = JSONUtils.toJson(user);
        kafkaTemplate.send("helloTopic", userJson);
    }
}

Add a consuming method to EventConsumer:

@Component
public class EventConsumer {

    @KafkaListener(topics = {"helloTopic"}, groupId = "helloGroup")
    public void onEvent2(String userJson,
                         @Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic,
                         @Header(value = KafkaHeaders.RECEIVED_PARTITION) String partition,
                         ConsumerRecord<String, String> record) {
        User user = JSONUtils.toBean(userJson, User.class);
        System.out.println("接收到事件:" + user + ", topic:" + topic + ", partition:" + partition);
        System.out.println("record:" + record.toString());
    }
}

Add a test method that calls eventProducer.sendEvent2().

Custom Configuration Properties

# Custom configuration
kafka:
  topic:
    name: helloTopic
  consumer:
    group: helloGroup

EventConsumer:

@Component
public class EventConsumer {

    @KafkaListener(topics = {"${kafka.topic.name}"}, groupId = "${kafka.consumer.group}")
    public void onEvent3(String userJson,
                         @Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic,
                         @Header(value = KafkaHeaders.RECEIVED_PARTITION) String partition,
                         ConsumerRecord<String, String> record) {
        User user = JSONUtils.toBean(userJson, User.class);
        System.out.println("接收到事件:" + user + ", topic:" + topic + ", partition:" + partition);
        System.out.println("record:" + record.toString());
    }
}

Manual Message Acknowledgment

EventConsumer:

@Component
public class EventConsumer {

    @KafkaListener(topics = {"${kafka.topic.name}"}, groupId = "${kafka.consumer.group}")
    public void onEvent4(String userJson,
                         @Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic,
                         @Header(value = KafkaHeaders.RECEIVED_PARTITION) String partition,
                         ConsumerRecord<String, String> record,
                         Acknowledgment ack) {
        try {
            User user = JSONUtils.toBean(userJson, User.class);
            System.out.println("接收到事件:" + user + ", topic:" + topic + ", partition:" + partition);
            System.out.println("record:" + record.toString());

            ack.acknowledge(); // Manually acknowledge: tell Kafka this message has been received (by default Kafka acknowledges automatically)
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Enable manual acknowledgment mode:

spring:
  kafka:
    listener:
      ack-mode: manual # manual acknowledgment

If you comment out ack.acknowledge();, the message gets consumed repeatedly: it is received again every time the application starts.

By default, after consuming a message the Kafka consumer automatically sends an acknowledgment to the Kafka server to indicate that the message was consumed successfully. In some scenarios, though, we want to acknowledge only after the message has been processed successfully, or to withhold the acknowledgment when processing fails so that Kafka can redeliver the message.
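
A minimal sketch of that pattern, assuming ack-mode: manual as configured above; processOrder is a hypothetical processing step, not part of the original project:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

@Component
public class OrderConsumer {

    @KafkaListener(topics = {"${kafka.topic.name}"}, groupId = "${kafka.consumer.group}")
    public void onEvent(String message, Acknowledgment ack) {
        try {
            processOrder(message);   // hypothetical business logic
            ack.acknowledge();       // commit the offset only after successful processing
        } catch (Exception e) {
            // No acknowledge: the offset is not committed, so the message
            // will be delivered again (e.g. after a restart or rebalance)
            System.err.println("processing failed, message will be redelivered: " + e.getMessage());
        }
    }

    private void processOrder(String message) {
        // placeholder for real processing
    }
}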

Consuming Specific Partitions and Offsets

EventConsumer:

@Component
public class EventConsumer {

    @KafkaListener(groupId = "${kafka.consumer.group}",
            topicPartitions = {
                    @TopicPartition(
                            topic = "${kafka.topic.name}",
                            partitions = {"0", "1", "2"},
                            partitionOffsets = {
                                    @PartitionOffset(partition = "3", initialOffset = "3"),
                                    @PartitionOffset(partition = "4", initialOffset = "3")
                            })
            })
    public void onEvent5(String userJson,
                         @Header(value = KafkaHeaders.RECEIVED_TOPIC) String topic,
                         @Header(value = KafkaHeaders.RECEIVED_PARTITION) String partition,
                         ConsumerRecord<String, String> record,
                         Acknowledgment ack) {
        User user = JSONUtils.toBean(userJson, User.class);
        System.out.println("接收到事件:" + user + ", topic:" + topic + ", partition:" + partition);
        System.out.println("record:" + record.toString());
        ack.acknowledge(); // Manually acknowledge the message
    }
}

Create a new KafkaConfig:

Initialize 5 partitions for testing.

@Configuration
public class KafkaConfig {

    @Bean
    public NewTopic newTopic() {
        return new NewTopic("helloTopic", 5, (short) 1);
    }
}

EventProducer:

@Component
public class EventProducer {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendEvent3() {
        for (int i = 0; i < 25; i++) {
            User user = User.builder().id(i).phone("13723456789" + i).birthday(new Date()).build();
            String userJson = JSONUtils.toJson(user);
            kafkaTemplate.send("helloTopic", "k" + i, userJson);
        }
    }
}

Clear the topic in Kafka, add a new test method, and send the messages.

Before running the test, comment out the entire @KafkaListener annotation.

Then uncomment @KafkaListener and start the Spring Boot application.

Only 3 messages are received. Why?

Because we set initialOffset = "3", reading starts from offset 3.

To read the historical messages, you can add the following configuration and manually reset the offsets, or use a new consumer group ID.

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest

# Use a new consumer group ID
kafka:
  consumer:
    group: helloGroup2

There are 19 messages in total:

接收到事件:User(id=15, phone=1372345678915, birthday=Mon Aug 12 18:15:34 CST 2024), topic:helloTopic, partition:3
record:ConsumerRecord(topic = helloTopic, partition = 3, leaderEpoch = 0, offset = 3, CreateTime = 1723457734050, serialized key size = 3, serialized value size = 58, headers = RecordHeaders(headers = [], isReadOnly = false), key = k15, value = {"id":15,"phone":"1372345678915","birthday":1723457734049})
接收到事件:User(id=18, phone=1372345678918, birthday=Mon Aug 12 18:15:34 CST 2024), topic:helloTopic, partition:3
record:ConsumerRecord(topic = helloTopic, partition = 3, leaderEpoch = 0, offset = 4, CreateTime = 1723457734052, serialized key size = 3, serialized value size = 58, headers = RecordHeaders(headers = [], isReadOnly = false), key = k18, value = {"id":18,"phone":"1372345678918","birthday":1723457734051})
接收到事件:User(id=23, phone=1372345678923, birthday=Mon Aug 12 18:15:34 CST 2024), topic:helloTopic, partition:3
record:ConsumerRecord(topic = helloTopic, partition = 3, leaderEpoch = 0, offset = 5, CreateTime = 1723457734053, serialized key size = 3, serialized value size = 58, headers = RecordHeaders(headers = [], isReadOnly = false), key = k23, value = {"id":23,"phone":"1372345678923","birthday":1723457734053})
接收到事件:User(id=0, phone=137234567890, birthday=Mon Aug 12 18:15:33 CST 2024), topic:helloTopic, partition:0
record:ConsumerRecord(topic = helloTopic, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1723457734037, serialized key size = 2, serialized value size = 56, headers = RecordHeaders(headers = [], isReadOnly = false), key = k0, value = {"id":0,"phone":"137234567890","birthday":1723457733972})
接收到事件:User(id=4, phone=137234567894, birthday=Mon Aug 12 18:15:34 CST 2024), topic:helloTopic, partition:0
record:ConsumerRecord(topic = helloTopic, partition = 0, leaderEpoch = 0, offset = 1, CreateTime = 1723457734048, serialized key size = 2, serialized value size = 56, headers = RecordHeaders(headers = [], isReadOnly = false), key = k4, value = {"id":4,"phone":"137234567894","birthday":1723457734048})
接收到事件:User(id=6, phone=137234567896, birthday=Mon Aug 12 18:15:34 CST 2024), topic:helloTopic, partition:0
record:ConsumerRecord(topic = helloTopic, partition = 0, leaderEpoch = 0, offset = 2, CreateTime = 1723457734048, serialized key size = 2, serialized value size = 56, headers = RecordHeaders(headers = [], isReadOnly = false), key = k6, value = {"id":6,"phone":"137234567896","birthday":1723457734048})
接收到事件:User(id=8, phone=137234567898, birthday=Mon Aug 12 18:15:34 CST 2024), topic:helloTopic, partition:0
record:ConsumerRecord(topic = helloTopic, partition = 0, leaderEpoch = 0, offset = 3, CreateTime = 1723457734048, serialized key size = 2, serialized value size = 56, headers = RecordHeaders(headers = [], isReadOnly = false), key = k8, value = {"id":8,"phone":"137234567898","birthday":1723457734048})
接收到事件:User(id=11, phone=1372345678911, birthday=Mon Aug 12 18:15:34 CST 2024), topic:helloTopic, partition:0
record:ConsumerRecord(topic = helloTopic, partition = 0, leaderEpoch = 0, offset = 4, CreateTime = 1723457734049, serialized key size = 3, serialized value size = 58, headers = RecordHeaders(headers = [], isReadOnly = false), key = k11, value = {"id":11,"phone":"1372345678911","birthday":1723457734048})
接收到事件:User(id=24, phone=1372345678924, birthday=Mon Aug 12 18:15:34 CST 2024), topic:helloTopic, partition:0
record:ConsumerRecord(topic = helloTopic, partition = 0, leaderEpoch = 0, offset = 5, CreateTime = 1723457734053, serialized key size = 3, serialized value size = 58, headers = RecordHeaders(headers = [], isReadOnly = false), key = k24, value = {"id":24,"phone":"1372345678924","birthday":1723457734053})
接收到事件:User(id=3, phone=137234567893, birthday=Mon Aug 12 18:15:34 CST 2024), topic:helloTopic, partition:1
record:ConsumerRecord(topic = helloTopic, partition = 1, leaderEpoch = 0, offset = 0, CreateTime = 1723457734048, serialized key size = 2, serialized value size = 56, headers = RecordHeaders(headers = [], isReadOnly = false), key = k3, value = {"id":3,"phone":"137234567893","birthday":1723457734048})
接收到事件:User(id=16, phone=1372345678916, birthday=Mon Aug 12 18:15:34 CST 2024), topic:helloTopic, partition:1
record:ConsumerRecord(topic = helloTopic, partition = 1, leaderEpoch = 0, offset = 1, CreateTime = 1723457734050, serialized key size = 3, serialized value size = 58, headers = RecordHeaders(headers = [], isReadOnly = false), key = k16, value = {"id":16,"phone":"1372345678916","birthday":1723457734050})
接收到事件:User(id=17, phone=1372345678917, birthday=Mon Aug 12 18:15:34 CST 2024), topic:helloTopic, partition:1
record:ConsumerRecord(topic = helloTopic, partition = 1, leaderEpoch = 0, offset = 2, CreateTime = 1723457734050, serialized key size = 3, serialized value size = 58, headers = RecordHeaders(headers = [], isReadOnly = false), key = k17, value = {"id":17,"phone":"1372345678917","birthday":1723457734050})
接收到事件:User(id=20, phone=1372345678920, birthday=Mon Aug 12 18:15:34 CST 2024), topic:helloTopic, partition:1
record:ConsumerRecord(topic = helloTopic, partition = 1, leaderEpoch = 0, offset = 3, CreateTime = 1723457734052, serialized key size = 3, serialized value size = 58, headers = RecordHeaders(headers = [], isReadOnly = false), key = k20, value = {"id":20,"phone":"1372345678920","birthday":1723457734052})
接收到事件:User(id=1, phone=137234567891, birthday=Mon Aug 12 18:15:34 CST 2024), topic:helloTopic, partition:2
record:ConsumerRecord(topic = helloTopic, partition = 2, leaderEpoch = 0, offset = 0, CreateTime = 1723457734047, serialized key size = 2, serialized value size = 56, headers = RecordHeaders(headers = [], isReadOnly = false), key = k1, value = {"id":1,"phone":"137234567891","birthday":1723457734047})
接收到事件:User(id=13, phone=1372345678913, birthday=Mon Aug 12 18:15:34 CST 2024), topic:helloTopic, partition:2
record:ConsumerRecord(topic = helloTopic, partition = 2, leaderEpoch = 0, offset = 1, CreateTime = 1723457734049, serialized key size = 3, serialized value size = 58, headers = RecordHeaders(headers = [], isReadOnly = false), key = k13, value = {"id":13,"phone":"1372345678913","birthday":1723457734049})
接收到事件:User(id=14, phone=1372345678914, birthday=Mon Aug 12 18:15:34 CST 2024), topic:helloTopic, partition:2
record:ConsumerRecord(topic = helloTopic, partition = 2, leaderEpoch = 0, offset = 2, CreateTime = 1723457734049, serialized key size = 3, serialized value size = 58, headers = RecordHeaders(headers = [], isReadOnly = false), key = k14, value = {"id":14,"phone":"1372345678914","birthday":1723457734049})
接收到事件:User(id=19, phone=1372345678919, birthday=Mon Aug 12 18:15:34 CST 2024), topic:helloTopic, partition:2
record:ConsumerRecord(topic = helloTopic, partition = 2, leaderEpoch = 0, offset = 3, CreateTime = 1723457734052, serialized key size = 3, serialized value size = 58, headers = RecordHeaders(headers = [], isReadOnly = false), key = k19, value = {"id":19,"phone":"1372345678919","birthday":1723457734052})
接收到事件:User(id=21, phone=1372345678921, birthday=Mon Aug 12 18:15:34 CST 2024), topic:helloTopic, partition:2
record:ConsumerRecord(topic = helloTopic, partition = 2, leaderEpoch = 0, offset = 4, CreateTime = 1723457734053, serialized key size = 3, serialized value size = 58, headers = RecordHeaders(headers = [], isReadOnly = false), key = k21, value = {"id":21,"phone":"1372345678921","birthday":1723457734052})
接收到事件:User(id=22, phone=1372345678922, birthday=Mon Aug 12 18:15:34 CST 2024), topic:helloTopic, partition:2
record:ConsumerRecord(topic = helloTopic, partition = 2, leaderEpoch = 0, offset = 5, CreateTime = 1723457734053, serialized key size = 3, serialized value size = 58, headers = RecordHeaders(headers = [], isReadOnly = false), key = k22, value = {"id":22,"phone":"1372345678922","birthday":1723457734053})

Batch Message Consumption

Create a new module with the same configuration as before.

  1. Enable batch consumption in application.yml:
spring:
  application:
    name: kafka-03-base
  # Kafka connection address (ip:port)
  kafka:
    bootstrap-servers: localhost:9092
    # Message listener configuration
    listener:
      type: batch # batch: batch mode; single: one record per listener call
    consumer:
      max-poll-records: 20 # maximum number of records fetched per poll
      auto-offset-reset: earliest # start consuming from the earliest messages
  2. Write the EventConsumer:
@Component
public class EventConsumer {

    //@KafkaListener(topics = {"batchTopic"}, groupId = "batchGroup")
    public void onEvent(List<ConsumerRecord<String, String>> records) {
        System.out.println("批量消费,records.size: " + records.size() + ", records: " + records);
    }
}
  3. Copy the model, utils, and producer packages from the previous module, add the JSON dependency, and modify EventProducer:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-json</artifactId>
</dependency>
@Component
public class EventProducer {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendEvent() {
        for (int i = 0; i < 125; i++) {
            User user = User.builder().id(i).phone("13723456789" + i).birthday(new Date()).build();
            String userJson = JSONUtils.toJson(user);
            kafkaTemplate.send("batchTopic", "k" + i, userJson);
        }
    }
}
  4. Write the test method:
@SpringBootTest
class KafkaBaseApplicationTests {

    @Resource
    private EventProducer eventProducer;

    @Test
    void test01() {
        eventProducer.sendEvent();
    }
}
  5. Run the test class.

  6. Uncomment the //@KafkaListener(topics = {"batchTopic"}, groupId = "batchGroup") line in EventConsumer and run the main application class.

Intercepting Messages on Consumption

Before a message is consumed, we can configure an interceptor to act on it before it is actually processed, for example to log it, modify its content, or run security checks.

Create a new module with the same configuration.

  1. Implement Kafka's ConsumerInterceptor interface:
/**
 * Custom consumer interceptor
 */
public class CustomConsumerInterceptor implements ConsumerInterceptor<String, String> {

    /**
     * Executed before the messages are consumed
     *
     * @param records the fetched records
     * @return the (possibly modified) records
     */
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        System.out.println("onConsumer方法执行:" + records);
        return records;
    }

    /**
     * Executed after the records have been fetched, before the offsets are committed
     *
     * @param offsets the offsets about to be committed
     */
    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        System.out.println("onCommit方法执行:" + offsets);
    }

    /**
     * Executed before the consumer is closed
     */
    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> map) {
    }
}
  2. Modify the configuration file:
spring:
  application:
    name: kafka-04-base
  # Kafka connection address (ip:port)
  kafka:
    bootstrap-servers: localhost:9092
    # Consumer configuration
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  3. Register this interceptor in the Kafka consumer's ConsumerFactory configuration:
@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.key-deserializer}")
    private String keyDeserializer;

    @Value("${spring.kafka.consumer.value-deserializer}")
    private String valueDeserializer;

    /**
     * Consumer configuration
     */
    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        // Register the interceptor
        props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomConsumerInterceptor.class.getName());
        return props;
    }

    /**
     * Consumer factory
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
        var listenerContainerFactory =
                new ConcurrentKafkaListenerContainerFactory<String, String>();
        listenerContainerFactory.setConsumerFactory(consumerFactory);
        return listenerContainerFactory;
    }
}
  4. Use our listener container factory bean when listening for messages:
@Component
public class EventConsumer {

    @KafkaListener(topics = {"intTopic"}, groupId = "intGroup", containerFactory = "kafkaListenerContainerFactory")
    public void onEvent(ConsumerRecord<String, String> record) {
        System.out.println("接收到事件:" + record);
    }
}
  5. Copy the model, producer, and util packages from the previous module.

  6. Add the JSON dependency.

  7. Modify EventProducer:

@Component
public class EventProducer {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendEvent() {
        User user = User.builder().id(100).phone("13723456789").birthday(new Date()).build();
        String userJson = JSONUtils.toJson(user);
        kafkaTemplate.send("intTopic", userJson);
    }
}
  8. Start the Spring Boot main application.

  9. Write the test class (same as before; not repeated from here on) and run the test method.

Message Forwarding

Message forwarding means application A receives a message from topicA, processes it, and forwards it to topicB, where application B listens for and receives it: once one application finishes its processing, it hands the message on to another application.

Consumer:

@Component
public class EventConsumer {

    @KafkaListener(topics = {"topicA"}, groupId = "aGroup")
    @SendTo(value = "topicB")
    public String onEventA(ConsumerRecord<String, String> record) {
        System.out.println("消费A消息:" + record);
        return record.value() + "--forward message";
    }

    @KafkaListener(topics = {"topicB"}, groupId = "bGroup")
    public void onEventB(ConsumerRecord<String, String> record) {
        System.out.println("消费B消息:" + record);
    }
}

Producer:

@Component
public class EventProducer {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendEvent() {
        User user = User.builder().id(100).phone("13723456789").birthday(new Date()).build();
        String userJson = JSONUtils.toJson(user);
        kafkaTemplate.send("topicA", userJson);
    }
}

Start the main application, then run the test method to send a message; both listeners log their records.

Consumer Partition Assignment Strategies

Kafka's partition assignment strategy for consumption decides which partitions of a topic are consumed by which consumers.

Kafka ships several partition assignment strategies. The default is RangeAssignor; besides RangeAssignor there are also:

  • RoundRobinAssignor
  • StickyAssignor
  • CooperativeStickyAssignor

Each strategy has its own characteristics; choose the one that fits your application's scenario and requirements.

RangeAssignor

Kafka's default consumer partition assignment strategy is RangeAssignor.

RangeAssignor distributes partitions evenly across the consumers in a group based on the number of consumers and the topic's partition count.

Assume:

  1. A topic myTopic has 10 partitions (p0 - p9).
  2. A consumer group has 3 consumers: consumer1, consumer2, consumer3.

RangeAssignor assigns them as follows:

  1. Compute each consumer's share: total partitions (10) / number of consumers (3) = 3, remainder 1 (see the sketch after this list)
    • In theory each consumer gets 3 partitions, but because of the remainder 1 the first consumer gets one extra
    • consumer1 (the first consumer) gets 3 + 1 = 4 partitions
    • consumer2 and consumer3 each get 3 partitions
  2. Concrete assignment: partitions 0 through 9 are handed out in order:
    • consumer1 gets partitions 0, 1, 2, 3
    • consumer2 gets partitions 4, 5, 6
    • consumer3 gets partitions 7, 8, 9
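
A tiny sketch of that arithmetic (not Kafka's actual implementation, just the same range calculation):

import java.util.ArrayList;
import java.util.List;

public class RangeAssignDemo {

    public static void main(String[] args) {
        int numPartitions = 10;
        int numConsumers = 3;
        int perConsumer = numPartitions / numConsumers; // 3
        int remainder = numPartitions % numConsumers;   // 1: the first `remainder` consumers each get one extra partition

        int start = 0;
        for (int c = 0; c < numConsumers; c++) {
            int count = perConsumer + (c < remainder ? 1 : 0);
            List<Integer> assigned = new ArrayList<>();
            for (int p = start; p < start + count; p++) {
                assigned.add(p);
            }
            start += count;
            // prints: consumer1 -> [0, 1, 2, 3], consumer2 -> [4, 5, 6], consumer3 -> [7, 8, 9]
            System.out.println("consumer" + (c + 1) + " -> " + assigned);
        }
    }
}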

Configuration file:

spring:
  application:
    name: kafka-06-base
  # Kafka connection address (ip:port)
  kafka:
    bootstrap-servers: localhost:9092
    # Consumer configuration
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest

Create 10 partitions:

@Configuration
public class KafkaConfig {

    @Bean
    public NewTopic newTopic() {
        return new NewTopic("myTopic", 10, (short) 1); // 10 partitions, 1 replica each
    }
}

Create 3 consumers:

@Component
public class EventConsumer {

    //@KafkaListener(topics = {"myTopic"}, groupId = "myGroup", concurrency = "3") // 3 threads, i.e. 3 consumers
    public void onEventA(ConsumerRecord<String, String> record) {
        System.out.println(Thread.currentThread().getName() + " --> 消费消息:" + record);
    }
}

Producer:

@Component
public class EventProducer {

    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendEvent() {
        for (int i = 0; i < 100; i++) {
            User user = User.builder().id(i).phone("13723456789" + i).birthday(new Date()).build();
            String userJson = JSONUtils.toJson(user);
            kafkaTemplate.send("myTopic", "k" + i, userJson);
        }
    }
}

Write a test to send the messages.

Uncomment @KafkaListener on the consumer and run the main application to consume.

RoundRobinAssignor

With the same example data, the RoundRobinAssignor strategy produces the following assignment:
C1: 0, 3, 6, 9
C2: 1, 4, 7
C3: 2, 5, 8

Configure the assignment strategy:

@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.key-deserializer}")
    private String keyDeserializer;

    @Value("${spring.kafka.consumer.value-deserializer}")
    private String valueDeserializer;

    @Value("${spring.kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    /**
     * Consumer configuration
     */
    private Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializer);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        // Use the round-robin partition assignment strategy for consumption
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RoundRobinAssignor.class.getName());
        return props;
    }

    /**
     * Consumer factory
     */
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
        var listenerContainerFactory =
                new ConcurrentKafkaListenerContainerFactory<String, String>();
        listenerContainerFactory.setConsumerFactory(consumerFactory);
        return listenerContainerFactory;
    }

    @Bean
    public NewTopic newTopic() {
        return new NewTopic("myTopic", 10, (short) 1); // 10 partitions, 1 replica each
    }
}

Modify the consumer:

@Component
public class EventConsumer {

    @KafkaListener(topics = {"myTopic"}, groupId = "myGroup",
            concurrency = "3", containerFactory = "kafkaListenerContainerFactory") // 3 threads, i.e. 3 consumers
    public void onEventA(ConsumerRecord<String, String> record) {
        System.out.println(Thread.currentThread().getId() + " --> 消费消息:" + record);
    }
}

Delete myTopic first.

Test with the same test method used for RangeAssignor.

StickyAssignor

The StickyAssignor strategy:
It tries to keep the existing assignment of partitions to consumers unchanged even when the group's membership changes, reducing unnecessary partition reassignment.
It preserves as much of the current assignment as possible and only adjusts partitions for consumers that join or leave, so most consumers keep consuming the partitions they already had and only a few take on extra ones; hence the name "sticky" assignment.

CooperativeStickyAssignor

The CooperativeStickyAssignor strategy:
Similar to StickyAssignor, but adds support for cooperative rebalancing: a consumer can notify the coordinator before it leaves the group, so the coordinator can plan partition migration in advance instead of reassigning partitions abruptly when a consumer suddenly leaves.

In practice, StickyAssignor or CooperativeStickyAssignor is recommended; switching to either one is a one-line config change, as sketched below.
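
A hedged sketch of that change inside the consumerConfigs() method shown earlier (both assignor classes live in org.apache.kafka.clients.consumer):

// Inside consumerConfigs() from the KafkaConfig class above:
// sticky assignment
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StickyAssignor.class.getName());
// or cooperative sticky assignment (incremental rebalancing)
props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, CooperativeStickyAssignor.class.getName());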

How Kafka Stores Events (Messages, Data)

All Kafka events (messages, data) are stored in the /tmp/kafka-logs directory, configurable via log.dirs=/tmp/kafka-logs.

All Kafka events are persisted as log files.

topic-partition

Kafka usually handles huge volumes of messages, so to keep log files from growing too large they are spread across multiple log directories, named <topic_name>-<partition_id>.

For example, creating a topic named firstTopic with 3 partitions produces 3 directories under the Kafka data directory (/tmp/kafka-logs): firstTopic-0, firstTopic-1, firstTopic-2. Each contains files such as:

  • 00000000000000000000.index - message index file
  • 00000000000000000000.log - message data file
  • 00000000000000000000.timeindex - message timestamp index file
  • 00000000000000000006.snapshot - snapshot file, so the producer can recover and continue after a failure or restart
  • leader-epoch-checkpoint - records each partition's current leader epoch and the start offset at which that leader began writing
  • partition.metadata - metadata about the specific partition

topic-offset

Each time a message is consumed and committed, the most recently consumed offset is saved.
Kafka has an internal topic called __consumer_offsets; the offsets committed by consumers are written into it. __consumer_offsets stores the offsets committed by each consumer group at a given moment, and it has 50 partitions by default.
The partition that stores a given consumer group's offsets is computed as:

Math.abs("groupid".hashCode()) % groupMetadataTopicPartitionCount;
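
For example, for the helloGroup consumer group used earlier and the default 50 partitions, a quick sketch of the calculation:

public class OffsetsPartitionDemo {

    public static void main(String[] args) {
        String groupId = "helloGroup";                 // consumer group from the examples above
        int groupMetadataTopicPartitionCount = 50;     // default partition count of __consumer_offsets
        int partition = Math.abs(groupId.hashCode()) % groupMetadataTopicPartitionCount;
        System.out.println("__consumer_offsets partition for " + groupId + ": " + partition);
    }
}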

Offsets in Detail

Producer Offset

The producer sends a message to a partition of a topic on a Kafka broker.

Kafka assigns every message a unique offset, which is that message's position within its partition.

Consumer Offset

The consumer offset records how far a consumer has read and where it should continue reading from.

Every consumer in a consumer group maintains its own offset independently. When a consumer reads messages from a partition, it records the offset it has read up to, so even if it crashes or restarts it can resume from where it left off without re-reading or missing messages.

The consumer offset is only recorded after a message has been consumed and the offset committed.

  1. When a consumer group starts listening, by default it starts from the latest position of each partition, i.e. the latest position becomes its consumer offset.
    • If the partition has never received a message, the latest position is 0.
    • If the partition already has messages, the latest position is the one right after the producer offset.
  2. After consuming a message, the offset is only updated once the consumer commits the acknowledgment (ack); without the commit it stays unchanged.
    Command line: ./kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group yourGroup --describe (replace yourGroup with your group).

Where a consumer starts consuming from depends on what its offset currently is.
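
The same information can be read programmatically with the AdminClient; a hedged sketch, reusing the helloGroup group ID from the examples above:

import java.util.Map;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class GroupOffsetsDemo {

    public static void main(String[] args) throws Exception {
        Map<String, Object> conf = Map.of(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(conf)) {
            // Committed offsets of the helloGroup consumer group, per topic-partition
            Map<TopicPartition, OffsetAndMetadata> offsets =
                    admin.listConsumerGroupOffsets("helloGroup")
                         .partitionsToOffsetAndMetadata()
                         .get();
            offsets.forEach((tp, om) -> System.out.println(tp + " -> " + om.offset()));
        }
    }
}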