当前位置 : 首页 » 文章分类 :  开发  »  Spring-Cloud-Stream

Spring-Cloud-Stream

Spring Cloud Stream 笔记


spring-messaging

Spring Messaging 是 Spring Framework 中的一个模块,其作用就是统一消息的编程模型。

Message 消息体

Spring Messaging 中规定了 Message 的模型分为 payload 和 headers

package org.springframework.messaging;

public interface Message<T> {

    /**
     * Return the message payload.
     */
    T getPayload();

    /**
     * Return message headers for the message (never {@code null} but may be empty).
     */
    MessageHeaders getHeaders();

}

MessageChannel 发送消息

消息通道 MessageChannel 用于接收消息,调用 send 方法可以将消息发送至该消息通道中 :

package org.springframework.messaging;

@FunctionalInterface
public interface MessageChannel {
    long INDEFINITE_TIMEOUT = -1;

    default boolean send(Message<?> message) {
        return send(message, INDEFINITE_TIMEOUT);
    }

    // 发送消息直到消息被接收或到达超时时间
    boolean send(Message<?> message, long timeout);
}

SubscribableChannel 接收消息

通过消息通道的子接口可订阅的消息通道 SubscribableChannel 来进行消息消费,向其中注册消息处理器 MessageHandler

package org.springframework.messaging;

public interface SubscribableChannel extends MessageChannel {

    // 注册消息处理器
    boolean subscribe(MessageHandler handler);

    boolean unsubscribe(MessageHandler handler);
}

MessageHandler 消息处理器

MessageHandler 消息处理器用于处理消息

package org.springframework.messaging;

@FunctionalInterface
public interface MessageHandler {

    void handleMessage(Message<?> message) throws MessagingException;
}

Spring Cloud Stream 基本概念

Main Concepts
https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_main_concepts

干货|Spring Cloud Stream 体系及原理介绍
https://fangjian0423.github.io/2019/04/03/spring-cloud-stream-intro/


Binder 绑定器

The Binder Abstraction
https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-binder-abstraction

Spring Cloud Stream 屏蔽了底层消息中间件的实现细节,希望以统一的一套 API 来进行消息的发送/消费,底层消息中间件的实现细节由各消息中间件的 Binder 完成。

Binder 是提供与外部消息中间件集成的组件,会构造 Binding, 提供了 2 个方法分别是 bindConsumer 和 bindProducer 分别用于构造生产者和消费者。目前官方的实现有 Rabbit Binder 和 Kafka Binder, Spring Cloud Alibaba 内部实现了 RocketMQ Binder。


消费组

Consumer Groups
https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#consumer-groups

如果在同一个主题上的应用需要启动多个实例的时候,我们可以通过 spring.cloud.stream.bindings.input.group 属性为应用指定一个组名,这样这个应用的多个实例在接收到消息的时候, 只会有一个成员真正收到消息并进行处理。

默认情况下, 当没有为应用指定消费组的时候, Spring Cloud Stream 会为其分配一个独立的匿名消费组。 所以, 如果同一主题下的所有应用都没有被指定消费组的时候, 当有消息发布之后, 所有的应用都会对其进行消费, 因为它们各自都属于一个独立的组。 大部分情况下, 我们在创建 Spring Cloud Stream 应用的时候, 建议最好为其指定一个消费组,以防止对消息的重复处理, 除非该行为需要这样做(比如刷新所有实例的配置等)。


分区支持

Partitioning Support
https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#partitioning

Demystifying Spring Cloud Stream producers with Apache Kafka partitions
https://spring.io/blog/2021/02/03/demystifying-spring-cloud-stream-producers-with-apache-kafka-partitions


实例

Stream Processing with Spring Cloud Stream and Apache Kafka Streams. Part 1 - Programming Model
https://spring.io/blog/2019/12/02/stream-processing-with-spring-cloud-stream-and-apache-kafka-streams-part-1-programming-model


StreamBridge动态选择destination(目的地本身还是静态配置)

Dynamic destinations with mutiple binders: #913
https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/913

Spring Cloud Stream - route to multiple dynamic destinations at runtime
https://stackoverflow.com/questions/62247159/spring-cloud-stream-route-to-multiple-dynamic-destinations-at-runtime

利用 StreamBridge 可以实现动态发送到不同的目的地 binding, 但这些 binding 也必须提前在配置文件中定义好。

@Bean
public Consumer<String> process() {
  return c -> {
    if (c.equals("first")) {
      System.out.println("Sending to first output");
      streamBridge.send("first-out-0", c);
    }
    else {
      System.out.println("Sending to second output");
      streamBridge.send("second-out-0", c);
    }
  };
}

上面代码中的 first-out-0second-out-0 是提前定义在配置文件中的 output binding

spring-cloud-stream-samples/multi-binder-samples/multi-binder-dynamic-destinations/
https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/multi-binder-samples/multi-binder-dynamic-destinations


Binder 绑定器

Binders
https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-binders

Classpath 中有多个绑定器时

当 classpath 中有多个绑定器(binder)时,配置时必须指明每个绑定(binding)的绑定器(binder)。

每个绑定器实现都包含一个 META-INF/spring.binders 属性文件,例如:
RabbitMQ 绑定器的是:

rabbit:\
org.springframework.cloud.stream.binder.rabbit.config.RabbitServiceAutoConfiguration

Kafka 绑定器的是

kafka:\
org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration

如果想自定义绑定器的话,也需要提供此文件。
此文件的 key 是绑定器的名字,value 是逗号分割的配置类列表,每个配置类里有且只有一个 org.springframework.cloud.stream.binder.Binder 接口的 Bean 定义,比如 KafkaBinderConfiguration 中是 KafkaMessageChannelBinder

绑定器选择

1、可以通过 spring.cloud.stream.defaultBinder 配置进行全局范围的绑定器选择,例如 spring.cloud.stream.defaultBinder=rabbit
2、或者可以在每个绑定(binding)配置中单独指定绑定器,例如包含一个 读绑定 input 和 写绑定 output 的应用配置,从 Kafka 读消息写入 RabbitMQ, 可通过下面的配置实现:

spring.cloud.stream.bindings.input.binder=kafka
spring.cloud.stream.bindings.output.binder=rabbit

配置选项

Configuration Options
https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_configuration_options


绑定服务属性

绑定服务选项在类 org.springframework.cloud.stream.config.BindingServiceProperties 中定义

defaultBinder 默认绑定器

spring.cloud.stream.defaultBinder
默认值:空
某个绑定(binding) 没有配置单独的绑定器时,会使用默认绑定器。


通用绑定属性

Binding Properties
https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#binding-properties

通过 spring.cloud.stream.bindings.<bindingName>.<property>=<value> 来配置绑定属性。bindingName 是要配置的绑定名。

为了避免重复,Spring Cloud Stream 支持给所有绑定设置默认属性,通过 spring.cloud.stream.default.<property>=<value>spring.cloud.stream.default.<producer|consumer>.<property>=<value> 来配置所有绑定的默认属性。例如 spring.cloud.stream.default.contentType=application/json

通用绑定属性在类 org.springframework.cloud.stream.config.BindingProperties 中定义。

destination 绑定的目的地(topic)

绑定的目的地,在不同的消息中间件中叫法各不同,比如 RabbitMQ 中是 exchange, Kafka 中是 topic.
如果绑定是一个 input 输入绑定,可以绑定到逗号分割的多个目的地。
如果绑定是一个 output 输出绑定,只能绑定到一个目的地。

group 消费组

绑定的消费组,只适用于 input 输入绑定。
默认值:null, 表示匿名消费组。

contentType 内容类型

绑定的内容类型
默认值:application/json

binder 绑定器

此绑定(binding)使用的绑定器。
默认值:null, 配置为 null 时会使用默认值绑定器(如果存在的话)


Consumer 属性

Consumer Properties
https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_consumer_properties

消费者属性在类 org.springframework.cloud.stream.binder.ConsumerProperties 中定义。
消费者属性的前缀必须是 spring.cloud.stream.bindings.<bindingName>.consumer., 例如 spring.cloud.stream.bindings.input.consumer.concurrency=3
可通过 spring.cloud.stream.default.consumer 设置默认消费者属性,例如 spring.cloud.stream.default.consumer.headerMode=none


concurrency 并发数

消费者并发数
默认值:1
默认并发数 spring.cloud.stream.default.consumer.concurrency=3
指定 input 绑定的并发数 spring.cloud.stream.bindings.input.consumer.concurrency=3


headerMode

none 禁用消息 payload 中的 header
headers 使用中间件的原生 header 机制。
embeddedHeaders 在消息 payload 中嵌入 header
默认值:依赖于绑定的具体实现。
headerMode 在消费非 Spring Cloud Stream 应用发出的消息时且不支持原生 header 时有用。

Unrecognized token ‘ÿ’: was expecting

错误:
Spring cloud stream kafka 消费者消息反序列化报错 Unrecognized token ‘ÿ’: was expecting

2021-05-27 17:13:03.543 [KafkaConsumerDestination{consumerDestinationName='masikkk-test-topic', partitions=1, dlqName='null'}.container-0-C-1] ERROR o.s.integration.handler.LoggingHandler.handleMessageInternal:187 - org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Unrecognized token 'ÿ': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"�contentType"application/json"{"msgCode":"msgcode-2","msgType":"ALERT","name":"name-2"}"; line: 1, column: 4]; nested exception is com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ÿ': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
 at [Source: (byte[])"�contentType"application/json"{"msgCode":"msgcode-2","msgType":"ALERT","name":"name-2"}"; line: 1, column: 4], failedMessage=GenericMessage [payload=byte[156], headers={kafka_offset=4, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@536fd40b, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=masikkk-test-topic, kafka_receivedTimestamp=1622106780457, contentType=application/json, kafka_groupId=group-local-mc-1}]
    at org.springframework.messaging.converter.MappingJackson2MessageConverter.convertFromInternal(MappingJackson2MessageConverter.java:235)

发现消息开头有个特殊字符。直接通过 kafka 命令行工具 ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic masikkk-test-topic --from-beginning 接收的消息也是有这个特殊字符

�
 contentType"application/json"{"msgCode":"msgcode-2","msgType":"ALERT","taskId":"taskid-2","appCode":"appcode-2","deviceId":"deviceid-2","payload":{}}

原因:
发送方 Spring stream 的 headerMode 是默认值 embeddedHeaders, 所以消息中会有一个 header 但接收方没有解析这个 header

解决:
headerMode 设置为 none (3.0之前是 raw ) 时会禁用 output 中的 header 迁入,将默认值 embeddedHeaders 改为 none


高级 Consumer 配置


Producer 属性

Producer Properties
https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_producer_properties

Producer 属性通过 org.springframework.cloud.stream.binder.ProducerProperties 类暴露。

下面的 Producer 绑定属性必须通过 spring.cloud.stream.bindings.<bindingName>.producer. 前缀定义,例如 spring.cloud.stream.bindings.func-out-0.producer.partitionKeyExpression=payload.id

可通过 spring.cloud.stream.default.producer 前缀设置所有 binding 的默认属性,例如 spring.cloud.stream.default.producer.partitionKeyExpression=payload.id

partitionKeyExpression

生产者分区的 SpEL 表达式。设置后,此绑定上的出站数据会被分区。
必须将 partitionCount 设置为大于 1 分区才会生效。
默认值:null

partitionCount

partition 个数。当启用 partitioning 时,此值必须设置为大于 1
在 Kafka 上,此值和目标 topic 分区数的较大者被使用。
默认值:1

配置所有 topic 的 partition 个数默认值 spring.cloud.stream.default.producer.partitionCount=3
配置某个 tpikc 的 partition 个数 spring.cloud.stream.bindings.<bindingName>.producer.partitionCount=3

如果 topic 已经参加,之后配置这个值会报错,说无法修改 partition 个数,但可配置 spring.cloud.stream.kafka.binder.autoAddPartitions 启动 partition 个数修改

org.springframework.cloud.stream.provisioning.ProvisioningException: The number of expected partitions was: 3, but 1 has been found instead.Consider either increasing the partition count of the topic or enabling `autoAddPartitions`
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicAndPartitions(KafkaTopicProvisioner.java:394)

Kafka Binder 配置项

https://docs.spring.io/spring-cloud-stream/docs/Brooklyn.RELEASE/reference/html/_apache_kafka_binder.html


startOffset

https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/current/reference/html/spring-cloud-stream-binder-kafka.html

新 group 从哪里开始消费,可配置为 earliestlatest
默认值 null 等价于 earliest


autoAddPartitions

spring.cloud.stream.kafka.binder.autoAddPartitions
默认值 false


问题

MessageDispatchingException: Dispatcher has no subscribers

org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=9349ef3f-bf14-47ed-ba62-5a07d9f9462f, headers={kafka_offset=125, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@1fccd0d6, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedMessageKey=9349ef3f-bf14-47ed-ba62-5a07d9f9462f, kafka_receivedTopic=topic2, kafka_receivedTimestamp=1602175742577, contentType=application/json, kafka_groupId=topologyR}]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)

原因:
spring-cloud-stream 的 bug

解决:
升级 spring-cloud-stream 到 3.0.9 及以上即可。

Dispatcher has no subscribers when value serializer is set for reactive producer in Spring boot 2.3.3 #973
https://github.com/spring-cloud/spring-cloud-stream-binder-kafka/issues/973


无kafka服务无法启动

使用 spring-cloud-stream 连接 kafka 默认如果 kafka 连不上则 spring boot 服务也起不来,在连接 kafka 是可选的服务中很难受。


Spring-Cloud-Stream 连接 Kafka 实例

添加依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

定义输入输出channel

public interface AlertSource {
    String ALERT_INPUT = "alertInput";

    String ALERT_OUTPUT = "alertOutput";

    @Input(ALERT_INPUT)
    MessageChannel input();

    @Output(ALERT_OUTPUT)
    MessageChannel output();
}

主类或配置类增加 binding

@EnableBinding({AlertSource.class})

生产者

@Component
@RequiredArgsConstructor
public class AlertProducer {
    private final AlertSource alertSource;

    void publish(MessageVO message) {
        log.debug("publish message: {}", JsonMappers.Normal.toJson(message));
        alertSource.output().send(MessageBuilder.withPayload(message).build());
    }
}

消费者

@Component
public class AlertConsumer {
    @Override
    @StreamListener(AlertSource.ALERT_INPUT)
    public void handle(@Payload MessageVO messageVO) {
        log.info("接收到消息: {}", JsonMappers.Normal.toJson(messageVO));
    }
}

配置项

1、配置项里将 channel 和 topic 绑定。
2、可自动创建 topic

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092        
          autoAddPartitions: true
      default:
        headerMode: none
        producer:
          partitionCount: 12
          partitionKeyExpression: payload.code
        consumer:
          concurrency: 12
      defaultBinder: kafka
      bindings:
        # 通道名
        alertOutput:
          # 消息发往的目的地,对应topic
          destination: alert-topic
        alertInput:
          destination: alert-topic
          group: group-local

上一篇 Spring-Data-MongoDB

下一篇 Apache-ab Web服务基准测试工具

阅读
评论
3.1k
阅读预计14分钟
创建日期 2021-05-17
修改日期 2022-08-22
类别

页面信息

location:
protocol:
host:
hostname:
origin:
pathname:
href:
document:
referrer:
navigator:
platform:
userAgent:

评论