Spring-Kafka
Spring for Apache Kafka
https://spring.io/projects/spring-kafka
Spring Kafka 2.x: basic usage for producing and consuming messages
https://juejin.im/entry/5a9e65296fb9a028dc408af5
Spring-Kafka Quick Start Configuration
Add the dependency
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
spring.kafka configuration
A minimal setup needs only the following core settings:
- Kafka broker address
- serializer and deserializer classes
- consumer group
- the package containing the message classes added to the trusted packages
spring:
  kafka:
    bootstrap-servers: localhost:9092 # Kafka broker address; multiple brokers can be listed, comma-separated
    consumer:
      group-id: group-my
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "com.masikkk.*"
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
Sending messages
Inject a KafkaTemplate and call send() to publish messages:
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaProducer {
    private final KafkaTemplate<String, Object> kafkaTemplate;

    public void publish(Message message) {
        // the key selects the partition; the value is serialized by the configured JsonSerializer
        kafkaTemplate.send("masikkk-topic", message.getPartitionKey(), message);
    }
}
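send() is asynchronous; in spring-kafka 2.x it returns a ListenableFuture (3.x returns a CompletableFuture instead). A minimal sketch of attaching a result callback to the publish above, assuming the 2.x API:
public void publishWithCallback(Message message) {
    // the callback logs the partition/offset on success, or the error on failure
    kafkaTemplate.send("masikkk-topic", message.getPartitionKey(), message)
            .addCallback(
                    result -> log.info("Sent to partition {} at offset {}",
                            result.getRecordMetadata().partition(),
                            result.getRecordMetadata().offset()),
                    ex -> log.error("Send failed", ex));
}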
Receiving messages
Annotate a method with @KafkaListener to receive messages:
@Slf4j
@Component
public class KafkaConsumer {
    @KafkaListener(topics = {"topic1", "topic2"}, concurrency = "12")
    public void handle(MessageVO messageVO) {
        log.info("Received message: {}", JsonMappers.Normal.toJson(messageVO));
    }
}
Manual acknowledgment (ack)
@KafkaListener(topics = "#{kafkaTopicName}", groupId = "#{topicGroupId}")
public void processMessage(ConsumerRecords<Integer, String> consumerRecords, Acknowledgment ack) {
    try {
        consumerRecords.forEach(consumerRecord -> {
            // process the record
        });
    } catch (Exception e) {
        log.error("Kafka message consumption failed: {}", e.getMessage(), e);
    } finally {
        // ack in finally so the offset is committed even when an exception is caught;
        // otherwise the commit would be skipped and the batch consumed again
        ack.acknowledge();
    }
}
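The Acknowledgment parameter is only injected when the container runs in a manual ack mode; a minimal sketch of the matching configuration, assuming the standard Spring Boot properties:
spring:
  kafka:
    consumer:
      enable-auto-commit: false   # manual acks require kafka-client auto-commit to be off
    listener:
      type: batch                 # the listener above receives a whole ConsumerRecords batch
      ack-mode: manual_immediate  # or manual, which batches the commits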
Spring-Kafka Configuration
Spring-Kafka producer default configuration
The org.apache.kafka.clients.producer.ProducerConfig class in the kafka-clients package documents each config option, and the static initializer block holds the default values. Two commonly tuned options:
spring:
  kafka:
    producer:
      retries: 3 # when greater than 0, a failed send is retried this many times. Default 2147483647
      acks: 1 # 0 = no acknowledgment; 1 = only the leader acknowledges; all = the leader and all in-sync replicas acknowledge. Default 1
Producer defaults printed in the log at Spring Boot startup:
2022-08-23 09:44:11.744 [http-nio-8778-exec-8] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [kafka-service:9092]
buffer.memory = 33554432
client.dns.lookup = use_all_dns_ips
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
internal.auto.downgrade.txn.commit = true
key.serializer = class org.apache.kafka.common.serialization.StringSerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
socket.connection.setup.timeout.max.ms = 127000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer
Spring-Kafka consumer default configuration
The org.apache.kafka.clients.consumer.ConsumerConfig class in the kafka-clients package documents each config option, and the static initializer block holds the default values.
Consumer defaults printed in the log at Spring Boot startup:
2022-08-23 09:43:23.370 [main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [kafka-service:9092]
check.crcs = true
client.dns.lookup = use_all_dns_ips
client.id = consumer-group-mc-1
client.rack =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = group-mc
group.instance.id = null
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
internal.throw.on.fetch.stable.offset.unsupported = false
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
session.timeout.ms = 10000
socket.connection.setup.timeout.max.ms = 127000
socket.connection.setup.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.engine.factory.class = null
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.certificate.chain = null
ssl.keystore.key = null
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.certificates = null
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.springframework.kafka.support.serializer.JsonDeserializer
Getting KafkaProperties
Inject KafkaProperties into any component to access the Spring-Kafka configuration parameters; as long as Spring-Kafka is configured, this configuration bean exists automatically.
For example:
@Configuration
public class KafkaConfiguration {
    @Autowired
    private KafkaProperties kafkaProperties;
}
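A typical use is turning the bound spring.kafka.* properties into plain kafka-client config maps, e.g. for building custom factories; a sketch using KafkaProperties' build methods:
// raw config maps derived from the spring.kafka.* properties
Map<String, Object> consumerProps = kafkaProperties.buildConsumerProperties();
Map<String, Object> producerProps = kafkaProperties.buildProducerProperties();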
Concurrent consumption
Add concurrency = "12" to the @KafkaListener annotation, or configure:
spring:
  kafka:
    listener:
      # number of concurrent consumer threads listening for data; default is 1
      concurrency: 12
Note that concurrency higher than the topic's partition count leaves the extra consumers idle.
Consuming with the ConsumerRecord class
The benefit of receiving messages as a ConsumerRecord: the record carries the partition info, message headers, message body, and so on, so it is a good choice when the business logic needs those fields.
@KafkaListener(topics = "xxxx")
public void consumerListener(ConsumerRecord<Integer, String> record) {
    log.info("receive : " + record.toString());
}
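The accessors expose the origin and metadata of each message explicitly; a sketch (the topic name is a placeholder):
@KafkaListener(topics = "xxxx")
public void consumerListener(ConsumerRecord<Integer, String> record) {
    // partition and offset identify where the record came from; headers carry metadata
    log.info("topic={} partition={} offset={} key={} value={}",
            record.topic(), record.partition(), record.offset(), record.key(), record.value());
    record.headers().forEach(h -> log.info("header {}={}", h.key(), new String(h.value())));
}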
Automatically creating/updating topics
Kafka creates topics automatically by default
Kafka auto-creates topics out of the box: the broker config auto.create.topics.enable defaults to true, so a topic is created automatically when it does not exist. The partition count is taken from num.partitions in the broker's server.properties.
num.partitions defaults to 1: when a topic is created without an explicit partition count, it supplies the default number of partitions, as shown below.
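The two broker-side settings, as they would appear in server.properties (the values shown are the defaults):
# create a topic automatically on first use
auto.create.topics.enable=true
# default partition count for auto-created topics
num.partitions=1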
Creating/updating topics with NewTopic beans at Spring startup
Exposing NewTopic beans makes Spring create the topics automatically at startup.
If a topic already exists with a different partition count, the partition count is updated automatically, but it can only be increased, never decreased (the replication factor of an existing topic is left unchanged).
In the simplest case, new NewTopic(topicName, partitions, replicationFactor) creates the topic:
@Configuration
public class KafkaConfiguration {
    @Bean
    public NewTopic newTopic() {
        // create the topic with 12 partitions and replication factor 1
        return new NewTopic("my-topic", 12, (short) 1);
    }
}
Topic-level configs can also be set:
@Bean
public NewTopic topic2() {
    return TopicBuilder.name("thing2")
            .partitions(10)
            .replicas(3)
            .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
            .build();
}
4.1.2. Configuring Topics
https://docs.spring.io/spring-kafka/reference/html/#configuring-topics
Creating topics in bulk by registering NewTopic beans into the context
The blog below describes a way to configure multiple topics and register a NewTopic bean for each by looping over GenericWebApplicationContext; when there are many topics, the approach sketched after the link works well:
Spring Boot + Kafka: creating topics
https://juejin.cn/post/7071055307094360100
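A minimal sketch of that approach, assuming the topic names come from configuration (the names and counts here are hypothetical):
import java.util.List;
import javax.annotation.PostConstruct;
import lombok.RequiredArgsConstructor;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.stereotype.Component;
import org.springframework.web.context.support.GenericWebApplicationContext;

@Component
@RequiredArgsConstructor
public class TopicRegistrar {
    private final GenericWebApplicationContext context;

    @PostConstruct
    public void registerTopics() {
        // hypothetical names; in practice loaded from a config property
        List<String> topics = List.of("topic-a", "topic-b", "topic-c");
        // one NewTopic bean per name; KafkaAdmin picks them up at startup
        topics.forEach(name ->
                context.registerBean(name, NewTopic.class, () -> new NewTopic(name, 12, (short) 1)));
    }
}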
Exception handling
Handling consumption exceptions
1. Provide a ConsumerAwareListenerErrorHandler bean that handles the messages whose consumption failed:
@Slf4j
@Configuration
public class KafkaConfiguration {
    @Bean
    public ConsumerAwareListenerErrorHandler listenerErrorHandler() {
        return (message, exception, consumer) -> {
            log.error("Kafka consumption error {}", message.getPayload(), exception);
            return null;
        };
    }
}
2. Reference it in the errorHandler attribute of @KafkaListener; whenever the handle method throws an exception, it is routed into the ConsumerAwareListenerErrorHandler:
@KafkaListener(topics = {"topic1", "topic2"}, errorHandler = "listenerErrorHandler")
public void handle(MessageVO messageVO) {
    ...
}
Handling poison-pill messages with ErrorHandlingDeserializer
ConsumerAwareListenerErrorHandler only covers exceptions thrown while consuming a message; if deserialization fails, the @KafkaListener method is never entered at all, so it cannot help.
When the Kafka consumer fails to deserialize a record, the poison-pill phenomenon appears: the consumer gets stuck in a "deserialization fails, retry, deserialization fails again" loop, never reaches the @KafkaListener method, can no longer process any subsequent messages, hammers CPU and I/O, and can dump tens of GB of logs onto disk within a short time. The damage is severe.
Problem:
The Kafka consumer fails to deserialize and floods the log:
2022-08-23 18:25:25.370 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer.error:149 - Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:194)
...
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition alert-push-topic-5.4-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided
Solution:
Use ErrorHandlingDeserializer to handle deserialization failures: set both the consumer's key-deserializer and value-deserializer to org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
and delegate to the concrete key and value deserializers:
spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
When key or value deserialization fails, ErrorHandlingDeserializer ensures the poison-pill message is disposed of and logged, so the consumer offset can move forward and the consumer can keep processing subsequent messages.
Internally, ErrorHandlingDeserializer first deserializes with the delegate and catches any exception:
public class ErrorHandlingDeserializer<T> implements Deserializer<T> {
    @Override
    public T deserialize(String topic, Headers headers, byte[] data) {
        try {
            return this.delegate.deserialize(topic, headers, data);
        }
        catch (Exception e) {
            deserializationException(headers, data, e);
            return recoverFromSupplier(topic, headers, data, e);
        }
    }
}
Full configuration:
spring:
  kafka:
    consumer:
      group-id: group-my
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring.json.trusted.packages: "com.masikkk.*"
        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
    listener:
      concurrency: 12
AckMode message acknowledgment
- RECORD: commit after each record is processed
- BATCH: (default) commit once per poll batch; the frequency depends on how often poll is called
- TIME: commit at intervals of ackTime (how does this differ from the auto commit interval?)
- COUNT: commit after ackCount acknowledgments have accumulated
- COUNT_TIME: commit when either the ackTime or the ackCount condition is met first
- MANUAL: the listener is responsible for acking, but commits are still batched behind the scenes
- MANUAL_IMMEDIATE: the listener acks, and every call commits immediately
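The mode and its thresholds map onto Spring Boot's listener properties; a sketch using the standard spring.kafka.listener keys:
spring:
  kafka:
    listener:
      ack-mode: count_time
      ack-time: 5s   # commit at most every 5 seconds...
      ack-count: 100 # ...or after 100 records, whichever comes first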
On Spring for Kafka's AckMode
https://juejin.im/post/59e0528df265da43133c2ab5
spring-kafka's automatic offset commit mechanism
With enable.auto.commit set to false, spring-kafka's own offset commit mechanism is used; with enable.auto.commit set to true, kafka-client's default commit mode applies.
spring-kafka checks whether enable.auto.commit is false. When it is, spring-kafka starts an invoker whose job is to run a thread that consumes the data. That thread does not read directly from Kafka; it reads from a blocking queue that spring-kafka creates itself.
The main loop, as the source code shows, works like this when auto-commit is off: it first commits the offsets of the records processed so far, and only then fetches new data from Kafka.
The fetched records are pushed into that blocking queue for the consumer thread to process; if the queue is full and the records cannot be enqueued, spring-kafka calls the consumer's pause method, and the consumer stops fetching further data from Kafka.
Recommendation:
When using spring-kafka, set kafka-client's enable.auto.commit to false, disabling kafka-client's automatic offset commit in favor of spring-kafka's commit mechanism, as in the sketch below.
We once hit a case where a consumption timeout made the automatic commit fail, so the offset was never updated at all. spring-kafka provides several commit strategies that can still commit offsets even when a batch of messages has not finished consuming, which avoids the endless re-consumption caused by commits that never succeed.
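The corresponding Spring Boot property, as a sketch:
spring:
  kafka:
    consumer:
      enable-auto-commit: false # let spring-kafka, not kafka-client, commit offsets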
An analysis of using spring-kafka on the consumer side
https://juejin.im/entry/5a6e8dea518825732472710c
Options when the Kafka consumer's processing capacity is very low
https://www.jianshu.com/p/4e00dff97f39
Manually configuring a KafkaListenerContainerFactory
Before a listener container (ListenerContainer) is created, a listener container factory (ListenerContainerFactory) is needed; by default, Spring configures one from the spring.kafka settings in application.properties/yml.
Toggling the Kafka connection with a config property
Problem:
Once Spring-Kafka is configured, the service connects to Kafka at startup, and if it cannot connect it keeps logging warnings, even though the service itself still works:
2022-08-24 15:38:50.437 [kafka-admin-client-thread | adminclient-1] WARN org.apache.kafka.clients.NetworkClient.processDisconnection:782 - [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9093) could not be established. Broker may not be available.
Customize the KafkaListenerContainerFactory so that the Kafka listener is only enabled when app.kafkaEnabled=true.
If no Kafka server is running in the environment, setting app.kafkaEnabled=false skips connecting to the Kafka broker, so there is no constant stream of warnings about failed connections:
@Slf4j
@Configuration
@RequiredArgsConstructor
@ConditionalOnProperty(prefix = "app", name = "kafkaEnabled", havingValue = "true")
public class KafkaConfiguration {
    private final KafkaProperties kafkaProperties;

    public static final String CONTAINER_NAME = "my-container";

    @Bean(CONTAINER_NAME)
    public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConcurrency(kafkaProperties.getListener().getConcurrency());
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties()));
        return factory;
    }

    @Bean
    public KafkaConsumer kafkaConsumer() {
        return new KafkaConsumer();
    }
}
If the @KafkaListener attributes do not specify a containerFactory, Spring Boot injects the containerFactory bean named kafkaListenerContainerFactory by default:
public class KafkaConsumer {
    @KafkaListener(topics = {"topic1"}, containerFactory = KafkaConfiguration.CONTAINER_NAME)
    public void handle(Message message) {
        ...
    }
}
Spring-Kafka / Spring Integration Kafka / kafka-clients version compatibility
Compatibility matrix of spring-kafka versions against the matching Spring Integration Kafka and kafka-clients versions (the middle column is the Spring Integration for Apache Kafka version, not the broker version):

| Spring-Kafka | Spring Integration Kafka | kafka-clients |
| --- | --- | --- |
| 2.3.x | 3.2.x | 2.3.1 |
| 2.2.x | 3.1.x | 2.0.1, 2.1.x, 2.2.x |
| 2.1.x | 3.0.x | 1.0.x, 1.1.x, 2.0.0 |
| 1.3.x | 2.3.x | 0.11.0.x, 1.0.x |
The version we use:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.3.1.RELEASE</version>
</dependency>
Example Spring KafkaTemplate configuration class
@EnableKafka
@Configuration
public class KafkaConfiguration {
    @Value("${kafka.bootstrap.servers}")
    private String brokers;

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    public ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
Troubleshooting
The class 'com.xxx.Message' is not in the trusted packages
Problem: Kafka consumption fails with:
2022-08-22 18:41:12.194 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer.error:149 - Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:194)
...
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition structured-data-push-topic-5 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'com.xxx.Message' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:126)
Solution:
Add the package containing the message body VO to the trusted-packages configuration:
spring:
  kafka:
    consumer:
      properties:
        spring.json.trusted.packages: "com.masikkk.*"
https://stackoverflow.com/questions/51688924/spring-kafka-the-class-is-not-in-the-trusted-packages
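Alternatively, the trusted packages can be set programmatically when the consumer factory is built by hand; a sketch (consumerProps is assumed to hold the other consumer configs):
// trust the message packages on the JsonDeserializer itself instead of via properties
JsonDeserializer<Object> valueDeserializer = new JsonDeserializer<>();
valueDeserializer.addTrustedPackages("com.masikkk.*");
DefaultKafkaConsumerFactory<String, Object> consumerFactory = new DefaultKafkaConsumerFactory<>(
        consumerProps, new StringDeserializer(), valueDeserializer);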
Dynamically creating consumers
Dynamically creating Kafka consumers (updating topics and servers at runtime)
https://blog.csdn.net/weixin_41422086/article/details/104849127