
Spring-Kafka


Spring for Apache Kafka
https://spring.io/projects/spring-kafka

Spring Kafka 2.x: basic usage for producing and consuming messages
https://juejin.im/entry/5a9e65296fb9a028dc408af5


Spring-Kafka Quick Start Configuration

Add the dependency

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

spring.kafka configuration

A basic setup only needs the following core configuration:

  • Kafka broker addresses
  • key/value serializers and deserializers
  • consumer group
  • add the package containing the message class to the trusted packages

spring:
  kafka:
    bootstrap-servers: localhost:9092  # Kafka broker addresses; multiple brokers can be listed, comma-separated
    consumer:
      group-id: group-my
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "com.masikkk.*"
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

Sending messages

Inject KafkaTemplate and you can send messages:

@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaProducer {
    private final KafkaTemplate<String, Object> kafkaTemplate;

    public void publish(Message message) {
        kafkaTemplate.send("masikkk-topic", message.getPartitionKey(), message);
    }
}
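
In spring-kafka 2.x, send() returns a ListenableFuture<SendResult<K, V>>, so a callback can be attached to log whether the send succeeded. A minimal sketch under that assumption (publishWithCallback is only an illustrative name):

public void publishWithCallback(Message message) {
    kafkaTemplate.send("masikkk-topic", message.getPartitionKey(), message)
            .addCallback(
                    result -> log.info("Sent to partition {} at offset {}",
                            result.getRecordMetadata().partition(), result.getRecordMetadata().offset()),
                    ex -> log.error("Send failed", ex));
}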

Receiving messages

Annotate a method with @KafkaListener to receive messages:

@Slf4j
@Component
public class KafkaConsumer {
    @KafkaListener(topics = {"topic1", "topic2"}, concurrency = "12")
    public void handle(MessageVO messageVO) {
        log.info("Received message: {}", JsonMappers.Normal.toJson(messageVO));
    }
}

Manual acknowledgment (ack)

@KafkaListener(topics = "#{kafkaTopicName}", groupId = "#{topicGroupId}")
public void processMessage(ConsumerRecords<Integer, String> consumerRecords, Acknowledgment ack) {
    try {
        consumerRecords.forEach(consumerRecord -> {
            // process each record
        });
    } catch (Exception e) {
        log.error("Kafka message consumption failed: {}", e.getMessage(), e);
    } finally {
        // always commit manually in finally, so that a caught exception cannot skip
        // the commit and cause the batch to be consumed again
        ack.acknowledge();
    }
}
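
Manual acknowledgment also requires the container's ack mode to be MANUAL or MANUAL_IMMEDIATE and the kafka-client auto commit to be off. A minimal configuration sketch (values are illustrative):

spring:
  kafka:
    consumer:
      enable-auto-commit: false   # let the listener container control offset commits
    listener:
      ack-mode: manual_immediate  # commit as soon as Acknowledgment.acknowledge() is called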

Spring-Kafka Configuration

Spring-Kafka producer defaults

The org.apache.kafka.clients.producer.ProducerConfig class in the kafka-clients package explains every configuration option; the default values are defined in its static block.

retries: 3 # When greater than 0, failed sends are retried; this is the number of retries. Default 2147483647
acks: 1 # 0 - no acknowledgment; 1 - leader acknowledges; all - leader and all in-sync followers acknowledge. Default 1

Producer defaults printed in the log at Spring Boot startup:

2022-08-23 09:44:11.744 [http-nio-8778-exec-8] INFO  org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
        acks = 1
        batch.size = 16384
        bootstrap.servers = [kafka-service:9092]
        buffer.memory = 33554432
        client.dns.lookup = use_all_dns_ips
        client.id = producer-1
        compression.type = none
        connections.max.idle.ms = 540000
        delivery.timeout.ms = 120000
        enable.idempotence = false
        interceptor.classes = []
        internal.auto.downgrade.txn.commit = true
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        linger.ms = 0
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metadata.max.idle.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 2147483647
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        socket.connection.setup.timeout.max.ms = 127000
        socket.connection.setup.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.2
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer

Spring-Kafka consumer defaults

The org.apache.kafka.clients.consumer.ConsumerConfig class in the kafka-clients package explains every configuration option; the default values are defined in its static block.

Consumer defaults printed in the log at Spring Boot startup:

2022-08-23 09:43:23.370 [main] INFO  org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
        allow.auto.create.topics = true
        auto.commit.interval.ms = 5000
        auto.offset.reset = latest
        bootstrap.servers = [kafka-service:9092]
        check.crcs = true
        client.dns.lookup = use_all_dns_ips
        client.id = consumer-group-mc-1
        client.rack =
        connections.max.idle.ms = 540000
        default.api.timeout.ms = 60000
        enable.auto.commit = false
        exclude.internal.topics = true
        fetch.max.bytes = 52428800
        fetch.max.wait.ms = 500
        fetch.min.bytes = 1
        group.id = group-mc
        group.instance.id = null
        heartbeat.interval.ms = 3000
        interceptor.classes = []
        internal.leave.group.on.close = true
        internal.throw.on.fetch.stable.offset.unsupported = false
        isolation.level = read_uncommitted
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        max.partition.fetch.bytes = 1048576
        max.poll.interval.ms = 300000
        max.poll.records = 500
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
        receive.buffer.bytes = 65536
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        session.timeout.ms = 10000
        socket.connection.setup.timeout.max.ms = 127000
        socket.connection.setup.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.2
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        value.deserializer = class org.springframework.kafka.support.serializer.JsonDeserializer

Getting KafkaProperties

Inject KafkaProperties into any component to get Spring-Kafka's configuration parameters; once Spring-Kafka is configured, this configuration bean is available automatically.
For example:

@Configuration
public class KafkaConfiguration {
    @Autowired
    private KafkaProperties kafkaProperties;
}
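
The injected KafkaProperties can then be used to build custom factories. A minimal sketch inside the configuration class above, using Spring Boot's KafkaProperties helper methods (customKafkaTemplate is only an illustrative bean name):

@Bean
public KafkaTemplate<String, Object> customKafkaTemplate() {
    // turn the spring.kafka.producer.* settings into a plain properties map
    Map<String, Object> producerProps = kafkaProperties.buildProducerProperties();
    return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(producerProps));
}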

Concurrent consumption

Add concurrency = "12" to @KafkaListener,
or configure:

spring:
  kafka:
    listener:
      # Number of concurrent consumers, i.e. how many consumer threads listen at the same time; default 1
      concurrency: 12

Consuming with the ConsumerRecord class

The benefit of receiving messages as a ConsumerRecord:
ConsumerRecord carries the partition information, message headers, message body and so on, so when the business logic needs these details, ConsumerRecord is a good choice.

@KafkaListener(topics = "xxxx")
public void consumerListener(ConsumerRecord<Integer, String> record) {
    log.info("receive : " + record.toString());
}

Automatically creating/updating topics

Kafka auto-creates topics by default

Kafka creates topics automatically by default: in the broker configuration, auto.create.topics.enable defaults to true, which means a topic is created automatically when it does not exist. The number of partitions comes from num.partitions in the broker's server.properties, which defaults to 1.
num.partitions defaults to 1; when a topic is created without an explicit partition count, this value becomes the topic's number of partitions.
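
For reference, the relevant broker-side settings in server.properties (the values shown are Kafka's defaults):

# server.properties
auto.create.topics.enable=true
num.partitions=1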

Creating/updating topics by exposing NewTopic beans at Spring startup

Expose a NewTopic bean and the topic is created automatically at Spring startup.
If the topic already exists with a different number of partitions, the partition count can be updated automatically, but it can only be increased, never decreased.

In the simplest case, the topic can be created directly with new NewTopic(topicName, partitions, replicationFactor):

@Configuration
public class KafkaConfiguration {
    @Bean
    public NewTopic newTopic() {
        // create the topic with 12 partitions and 1 replica
        return new NewTopic("my-topic", 12, (short) 1);
    }
}

Topic-level parameters can also be configured:

@Bean
public NewTopic topic2() {
    return TopicBuilder.name("thing2")
            .partitions(10)
            .replicas(3)
            .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
            .build();
}

4.1.2. Configuring Topics
https://docs.spring.io/spring-kafka/reference/html/#configuring-topics

Creating topics in bulk by registering NewTopic beans into the context

The blog post below shows a way to configure multiple topics by looping over them and registering NewTopic beans into the GenericWebApplicationContext; this is useful when there are many topics (see the sketch after the link):

Spring Boot + Kafka: creating topics
https://juejin.cn/post/7071055307094360100
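
A minimal sketch of that approach, assuming a list of topic names coming from our own configuration (names and counts here are placeholders):

@Configuration
public class BatchTopicConfiguration {
    @Autowired
    private GenericWebApplicationContext context;

    @PostConstruct
    public void createTopics() {
        List<String> topicNames = Arrays.asList("topic-a", "topic-b", "topic-c");
        // register one NewTopic bean per topic; KafkaAdmin picks them up and creates the topics at startup
        topicNames.forEach(name ->
                context.registerBean(name, NewTopic.class, () -> new NewTopic(name, 12, (short) 1)));
    }
}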


Exception handling

Handling consumption exceptions

1. Implement a ConsumerAwareListenerErrorHandler that handles messages whose consumption failed

@Slf4j
@Configuration
public class KafkaConfiguration {
    @Bean
    public ConsumerAwareListenerErrorHandler listenerErrorHandler() {
        return (message, exception, consumer) -> {
            log.error("Kafka consumption error {}", message.getPayload(), exception);
            return null;
        };
    }
}

2. Reference it in the errorHandler attribute of @KafkaListener; when the handle method throws an exception, it is passed to the ConsumerAwareListenerErrorHandler:

@KafkaListener(topics = {"topic1", "topic2"}, errorHandler = "listenerErrorHandler")
public void handle(MessageVO messageVO) {
  ...
}

Handling poison-pill messages with ErrorHandlingDeserializer

ConsumerAwareListenerErrorHandler can only handle exceptions thrown while consuming a message; if deserialization fails, the message never reaches the @KafkaListener method at all, so that handler cannot deal with it.
When a Kafka consumer fails to deserialize a record, you get the poison-pill phenomenon: the consumer is stuck in a "deserialize, fail, retry" loop, never enters the @KafkaListener method, can no longer process subsequent messages, burns CPU and I/O, and can write tens of gigabytes of logs to disk within a short time. The damage is severe.

Problem:
The Kafka consumer fails to deserialize and floods the log:

2022-08-23 18:25:25.370 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer.error:149 - Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:194)
...
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition alert-push-topic-5.4-0 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalStateException: No type information in headers and no default type provided

Solution:
Use ErrorHandlingDeserializer to handle deserialization failures: configure both the consumer's key-deserializer and value-deserializer as org.springframework.kafka.support.serializer.ErrorHandlingDeserializer,
and delegate to the concrete key and value deserializers:

spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer

When key or value deserialization fails, ErrorHandlingDeserializer makes sure the poison-pill message is dealt with and logged, so the consumer offset can move forward and the consumer can continue processing subsequent messages.

Internally, ErrorHandlingDeserializer first deserializes with the delegate and catches and handles any exception:

public class ErrorHandlingDeserializer<T> implements Deserializer<T> {
    @Override
    public T deserialize(String topic, Headers headers, byte[] data) {
        try {
            return this.delegate.deserialize(topic, headers, data);
        }
        catch (Exception e) {
            deserializationException(headers, data, e);
            return recoverFromSupplier(topic, headers, data, e);
        }
    }
}

Full configuration:

spring:
  kafka:
    consumer:
      group-id: group-my
      key-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        spring.json.trusted.packages: "com.masikkk.*"
        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
        spring.deserializer.value.delegate.class: org.springframework.kafka.support.serializer.JsonDeserializer
    listener:
      concurrency: 12

AckMode message acknowledgment

RECORD: commit after each record is processed
BATCH: (default) commit once per poll batch; the frequency depends on how often poll is called
TIME: commit every ackTime interval (how does this differ from auto commit interval?)
COUNT: commit once ackCount acknowledgments have accumulated
COUNT_TIME: commit when either the ackTime or the ackCount condition is met first
MANUAL: the listener is responsible for acking, but the commits are still batched behind the scenes
MANUAL_IMMEDIATE: the listener is responsible for acking, and every call commits immediately
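
The ack mode and its thresholds can be set through Spring Boot configuration; a minimal sketch (the values are only illustrative):

spring:
  kafka:
    listener:
      ack-mode: count_time  # commit when either threshold below is reached first
      ack-count: 100        # commit after 100 records have been processed
      ack-time: 5s          # or after 5 seconds, whichever comes first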

A look at Spring for Kafka's AckMode
https://juejin.im/post/59e0528df265da43133c2ab5


spring-kafka's automatic offset commit mechanism

When enable.auto.commit is set to false, spring-kafka's own offset commit mechanism is used.
When enable.auto.commit is set to true, the kafka-client's default commit mode is used.

spring-kafka checks whether enable.auto.commit is false; when it is,
spring-kafka starts an invoker whose purpose is to run a thread that consumes the data. That thread does not read directly from Kafka; it reads from a blocking queue created by spring-kafka itself.
The container then enters a loop: the source code shows that with auto commit turned off, it first commits the offsets of the records it has already processed and only then fetches new data from Kafka.
The fetched records are handed to the blocking queue mentioned above and consumed by that thread. If the queue is full and the fetched records cannot be enqueued, spring-kafka calls Kafka's pause method, and the consumer stops fetching more data from Kafka.

Recommendation:
When using spring-kafka, set the kafka-client's enable.auto.commit to false, i.e. disable the kafka-client's automatic offset commit and rely on spring-kafka's offset commit mechanism instead.
We have previously run into auto commits failing because consumption timed out, so the offset was never updated. spring-kafka provides several commit strategies that can still commit offsets even when a batch of messages has not been fully consumed, which avoids the endless re-consumption caused by offsets that can never be committed.

An analysis and summary of spring-kafka usage on the consumer side
https://juejin.im/entry/5a6e8dea518825732472710c

Approaches for dealing with very low Kafka consumer throughput
https://www.jianshu.com/p/4e00dff97f39


Manually configuring a KafkaListenerContainerFactory

Before a listener container (ListenerContainer) can be created, a listener container factory (ListenerContainerFactory) is needed. By default Spring configures one for you based on the spring.kafka settings in application.properties/yml.

Controlling whether the Kafka connection is enabled with a property

Problem:
Once Spring-Kafka is configured, the service connects to Kafka at startup; if the broker is unreachable it keeps logging warnings, even though the service itself still works:

2022-08-24 15:38:50.437 [kafka-admin-client-thread | adminclient-1] WARN  org.apache.kafka.clients.NetworkClient.processDisconnection:782 - [AdminClient clientId=adminclient-1] Connection to node -1 (localhost/127.0.0.1:9093) could not be established. Broker may not be available.

Configure a KafkaListenerContainerFactory yourself so that the Kafka listener is only enabled when app.kafkaEnabled=true.
If no Kafka broker is available in the environment, set app.kafkaEnabled=false so the application does not connect to a broker and does not keep logging warnings about failed connections.

@Slf4j
@Configuration
@RequiredArgsConstructor
@ConditionalOnProperty(prefix = "app", name = "kafkaEnabled", havingValue = "true")
public class KafkaConfiguration {
    private final KafkaProperties kafkaProperties;

    public static final String CONTAINER_NAME = "my-container";

    @Bean(CONTAINER_NAME)
    public ConcurrentKafkaListenerContainerFactory<Object, Object> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConcurrency(kafkaProperties.getListener().getConcurrency());
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties()));
        return factory;
    }

    @Bean
    public KafkaConsumer kafkaConsumer() {
        return new KafkaConsumer();
    }
}

If containerFactory is not specified on @KafkaListener, Spring Boot injects the containerFactory bean named kafkaListenerContainerFactory by default.

public class KafkaConsumer {
    @KafkaListener(topics = {"topic1"}, containerFactory = KafkaConfiguration.CONTAINER_NAME)
    public void handle(Message message) {
      ...
    }
}

Spring-Kafka / Apache Kafka / kafka-clients version compatibility

Compatibility matrix of spring-kafka with the corresponding Spring Integration for Apache Kafka and kafka-clients versions:

Spring for Apache Kafka | Spring Integration for Apache Kafka | kafka-clients
2.3.x                   | 3.2.x                               | 2.3.1
2.2.x                   | 3.1.x                               | 2.0.1, 2.1.x, 2.2.x
2.1.x                   | 3.0.x                               | 1.0.x, 1.1.x, 2.0.0
1.3.x                   | 2.3.x                               | 0.11.0.x, 1.0.x

The version we use

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>1.3.1.RELEASE</version>
</dependency>

Example Spring KafkaTemplate configuration class

@EnableKafka
@Configuration
public class KafkaConfiguration {
    @Value("${kafka.bootstrap.servers}")
    private String brokers;

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return props;
    }

    public ProducerFactory<String, Object> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Problems

The class ‘com.xxx.Message’ is not in the trusted packages

Problem: Kafka consumption fails with:

2022-08-22 18:41:12.194 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer.error:149 - Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer
    at org.springframework.kafka.listener.SeekUtils.seekOrRecover(SeekUtils.java:194)
...
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition structured-data-push-topic-5 at offset 0. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'com.xxx.Message' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
    at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:126)

Solution:
Add the package containing the message VO to the trusted packages configuration:

spring:
  kafka:
    consumer:
      properties:
        spring.json.trusted.packages: "com.masikkk.*"

https://stackoverflow.com/questions/51688924/spring-kafka-the-class-is-not-in-the-trusted-packages


Dynamically creating consumers

Dynamically creating Kafka consumers (updating topics and servers at runtime)
https://blog.csdn.net/weixin_41422086/article/details/104849127
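
A minimal sketch of creating a listener container programmatically at runtime (topic, group id and broker address are placeholders):

Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "dynamic-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

ContainerProperties containerProps = new ContainerProperties("dynamic-topic");
containerProps.setMessageListener((MessageListener<String, String>) record ->
        log.info("received: {}", record.value()));

KafkaMessageListenerContainer<String, String> container =
        new KafkaMessageListenerContainer<>(new DefaultKafkaConsumerFactory<>(props), containerProps);
container.start();   // begin consuming; call container.stop() to shut the consumer down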

