|
对于最新稳定版本,请使用Spring for Apache Kafka 4.0.4! |
Apache Kafka Streams 支持
从版本 1.1.4 开始,Spring for Apache Kafka 提供了对 Kafka Streams 的全面支持。
要在 Spring 应用程序中使用它,kafka-streams jar 必须在 classpath 上。
它是 Spring for Apache Kafka 项目的可选依赖,并且不会通过传递依赖进行下载。
基础
The reference Apache Kafka Streams documentation suggests the following way of using the API:
// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.
StreamsBuilder builder = ...; // when using the Kafka Streams DSL
// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;
KafkaStreams streams = new KafkaStreams(builder, config);
// Start the Kafka Streams instance
streams.start();
// Stop the Kafka Streams instance
streams.close();
我们有两个主要组件:
-
StreamsBuilder: 通过API构建KStream(或KTable)实例。 -
KafkaStreams: 用于管理这些实例的生命周期。
所有由单个 KStream 通过 KafkaStreams 曝光给 StreamsBuilder 的 StreamsBuilder 实例都会在相同的时间启动和停止,即使它们有不同的逻辑。
换句话说,由一个 StreamsBuilder 定义的所有流都与单个生命周期控制绑定。
一旦 KafkaStreams 实例被 streams.close() 关闭,就无法重新启动。
相反,必须创建一个新的 KafkaStreams 实例以重新开始流处理。 |
Spring 管理
为简化从 Spring 应用上下文视角使用 Kafka Streams,并通过容器使用生命周期管理,Spring for Apache Kafka 引入了 StreamsBuilderFactoryBean。
这是 AbstractFactoryBean 的实现,用于将 StreamsBuilder 的单例实例作为 bean 暴露出来。
以下示例创建了这样的 bean:
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
从 2.2 版本开始,stream 配置现在以一个 KafkaStreamsConfiguration 对象提供,而不是一个 StreamsConfig 对象。 |
The StreamsBuilderFactoryBean 也实现了 SmartLifecycle 以管理内部 KafkaStreams 实例的生命周期。
类似 Kafka Streams API,你必须在启动 KafkaStreams 之前定义 KStream 实例。
同样适用于 Spring API for Kafka Streams。
因此,当你在 StreamsBuilderFactoryBean 上使用默认的 autoStartup = true 时,你必须在应用程序上下文刷新之前在 StreamsBuilder 上声明 KStream 实例。
例如,KStream 可以是一个常规的 bean 定义,而 Kafka Streams API 的使用不受影响。
以下示例展示了如何实现:
@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
// Fluent KStream API
return stream;
}
如果你希望手动控制生命周期(例如根据某些条件停止和启动),可以通过使用工厂bean(&)加上StreamsBuilderFactoryBean bean的前缀(prefix)来直接引用StreamsBuilderFactoryBean bean。
由于StreamsBuilderFactoryBean使用其内部的KafkaStreams实例,因此可以安全地停止并重新启动它。
每次start()调用都会创建一个新的KafkaStreams。
如果你希望为不同的KStream实例分别控制生命周期,也可以考虑使用不同的StreamsBuilderFactoryBean实例。
您还可以在 StreamsBuilderFactoryBean 上指定 KafkaStreams.StateListener、Thread.UncaughtExceptionHandler 和 StateRestoreListener 选项,这些选项会被委托给内部的 KafkaStreams 实例。
此外,从 版本 2.1.5 开始,您还可以通过使用 KafkaStreamsCustomizer 回调接口来配置内部的 KafkaStreams 实例,而无需间接设置这些选项。注意,KafkaStreamsCustomizer 将覆盖由 StreamsBuilderFactoryBean 提供的选项。
如果您需要直接执行某些 KafkaStreams 操作,可以通过使用 StreamsBuilderFactoryBean.getKafkaStreams() 访问内部的 KafkaStreams 实例。
您可以按类型注入 StreamsBuilderFactoryBean bean,但请确保在 bean 定义中使用完整的类型,如下所示:
@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
或者,如果你使用接口类型的 bean 定义,可以添加 @Qualifier 以通过名称进行注入。
以下示例展示了如何操作:
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
@Qualifier("&myKStreamBuilder")
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
从版本 2.4.1 开始,工厂 bean 新增了一个类型为 KafkaStreamsInfrastructureCustomizer 的 infrastructureCustomizer 属性;这允许自定义 StreamsBuilder(例如添加一个状态存储)和/或在创建流之前自定义 Topology。
public interface KafkaStreamsInfrastructureCustomizer {
void configureBuilder(StreamsBuilder builder);
void configureTopology(Topology topology);
}
默认提供无操作实现,以避免在不需要某方法时同时实现两个方法。
一个 CompositeKafkaStreamsInfrastructureCustomizer 提供,用于在需要应用多个自定义器时使用。
Kafka Streams 微计量支持
从2.5.3版本开始,你可以配置一个KafkaStreamsMicrometerListener,以自动为工厂bean管理的KafkaStreams对象注册micrometer meters:
streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
流式处理的 JSON 序列化和反序列化
对于在向topics或state stores读取或写入数据时以JSON格式进行序列化和反序列化,Spring for Apache Kafka提供了一个使用JSON的JsonSerde实现,将其委托给序列化、反序列化和消息转换中描述的JsonSerializer和JsonDeserializer。
JsonSerde实现通过其构造函数提供相同的配置选项(目标类型或ObjectMapper)。
在以下示例中,我们使用JsonSerde来序列化和反序列化Kafka流中Cat负载(JsonSerde可以以类似方式在需要实例的地方使用):
stream.through(Serdes.Integer(), new JsonSerde<>(Cat.class), "cats");
在使用2.3版本之后,当程序化地构建用于生产者/消费者的序列化器/反序列化器时,您可以使用流畅API(fluent API),这简化了配置。
stream.through(
new JsonSerde<>(MyKeyType.class)
.forKeys()
.noTypeInfo(),
new JsonSerde<>(MyValueType.class)
.noTypeInfo(),
"myTypes");
使用KafkaStreamBrancher
The KafkaStreamBrancher类引入了一种在 KStream 之上构建更方便的条件分支的方式。
考虑以下不使用 KafkaStreamBrancher 的示例:
KStream<String, String>[] branches = builder.stream("source").branch(
(key, value) -> value.contains("A"),
(key, value) -> value.contains("B"),
(key, value) -> true
);
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");
以下示例使用 KafkaStreamBrancher:
new KafkaStreamBrancher<String, String>()
.branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
.branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
//default branch should not necessarily be defined in the end of the chain!
.defaultBranch(ks -> ks.to("C"))
.onTopOf(builder.stream("source"));
//onTopOf method returns the provided stream so we can continue with method chaining
配置
要配置Kafka Streams环境,需要一个StreamsBuilderFactoryBean实例。
查看Apache Kafka的文档以了解所有可能的选项。
从 2.2 版本开始,stream 配置现在以一个 KafkaStreamsConfiguration 对象提供,而不是以一个 StreamsConfig 提供。 |
为了避免在大多数情况下编写样板代码,特别是在开发微服务时,Spring for Apache Kafka 提供了 @EnableKafkaStreams 注解,你应该将其放在 @Configuration 类上。 你所需要做的就是声明一个名为 defaultKafkaStreamsConfig 的 KafkaStreamsConfiguration bean。 一个名为 defaultKafkaStreamsBuilder 的 StreamsBuilderFactoryBean bean 会自动在应用上下文中声明。 你也可以声明并使用任何额外的 StreamsBuilderFactoryBean bean。 你可以通过提供一个实现 StreamsBuilderFactoryBeanConfigurer 的 bean 来对该 bean 进行额外的自定义。 如果有多个这样的 bean,它们将根据其 Ordered.order 属性被应用。
默认情况下,当工厂bean被停止时,KafkaStreams.cleanUp()方法会被调用。
从2.1.2版本开始,工厂bean新增了带有一个CleanupConfig对象的构造函数,该对象具有属性,可让您在start()或stop()时调用cleanUp()方法,或两者皆不调用。
从2.7版本开始,默认情况下从不清理本地状态。
头部信息增强器
Version 3.0 added the HeaderEnricherProcessor extension of ContextualProcessor; providing the same functionality as the deprecated HeaderEnricher which implemented the deprecated Transformer interface.
This can be used to add headers within the stream processing; the header values are SpEL expressions; the root object of the expression evaluation has 3 properties:
-
record-org.apache.kafka.streams.processor.api.Record(key,value,timestamp,headers) -
key- 当前记录的键 -
value- 当前记录的值 -
context- theProcessorContext, allowing access to the current record metadata
这些表达式必须返回一个 byte[] 或一个 String(这将使用 UTF-8 转换为 byte[])。
使用 enricher 于流中:
.process(() -> new HeaderEnricherProcessor(expressions))
处理器不会更改key或value;它只是添加头信息。
| 您需要为每条记录创建一个新的实例。 |
.process(() -> new HeaderEnricherProcessor<..., ...>(expressionMap))
这里是简单的示例,添加一个字面量标题和一个变量:
Map<String, Expression> headers = new HashMap<>();
headers.put("header1", new LiteralExpression("value1"));
SpelExpressionParser parser = new SpelExpressionParser();
headers.put("header2", parser.parseExpression("record.timestamp() + ' @' + record.offset()"));
ProcessorSupplier supplier = () -> new HeaderEnricher<String, String>(headers);
KStream<String, String> stream = builder.stream(INPUT);
stream
.process(() -> supplier)
.to(OUTPUT);
MessagingProcessor
版本 3.0 增加了 MessagingProcessor 扩展的 ContextualProcessor,提供了与已弃用的 MessagingTransformer 相同的功能,该 MessagingTransformer 实现了已弃用的 Transformer 接口。
这允许 Kafka Streams 构型与 Spring 消息组件(如 Spring 集成流)进行交互。
转换器需要一个 MessagingFunction 的实现。
@FunctionalInterface
public interface MessagingFunction {
Message<?> exchange(Message<?> message);
}
Spring Integration 自动提供了一个实现,使用其 GatewayProxyFactoryBean。它还需要一个 MessagingMessageConverter 来将键、值和元数据(包括标头)转换为 Spring Messaging 的 Message<?>。更多详细信息,请参见 [从 KStream 调用 Spring Integration 流程].
反序列化异常的恢复
版本 2.3 引入了 RecoveringDeserializationExceptionHandler,当发生反序列化异常时可以采取一些操作。
参阅关于 DeserializationExceptionHandler 的 Kafka 文档,其中 RecoveringDeserializationExceptionHandler 是其实现。
RecoveringDeserializationExceptionHandler 配置使用 4 实现。
框架提供了 DeadLetterPublishingRecoverer,它可以将失败的记录发送到死信主题。
参阅 发布死信记录 了解更多关于此恢复器的信息。
配置恢复器,请在流配置中添加以下属性:
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
...
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
RecoveringDeserializationExceptionHandler.class);
props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer());
...
return new KafkaStreamsConfiguration(props);
}
@Bean
public DeadLetterPublishingRecoverer recoverer() {
return new DeadLetterPublishingRecoverer(kafkaTemplate(),
(record, ex) -> new TopicPartition("recovererDLQ", -1));
}
当然,recoverer() 这个 bean 可以是 ConsumerRecordRecoverer 的自己实现。
Kafka Streams 示例
The following example combines all the topics we have covered in this chapter:
@Configuration
@EnableKafka
@EnableKafkaStreams
public class KafkaStreamsConfig {
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
return new KafkaStreamsConfiguration(props);
}
@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
return fb -> fb.setStateListener((newState, oldState) -> {
System.out.println("State transition from " + oldState + " to " + newState);
});
}
@Bean
public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
stream
.mapValues((ValueMapper<String, String>) String::toUpperCase)
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMillis(1_000)))
.reduce((String value1, String value2) -> value1 + value2,
Named.as("windowStore"))
.toStream()
.map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
.filter((i, s) -> s.length() > 40)
.to("streamingTopic2");
stream.print(Printed.toSysOut());
return stream;
}
}