|
该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 Spring for Apache Kafka 4.0.0! |
变革历史
3.2版本自3.1以来的新内容
本节介绍了从3.1版本到3.2版本所做的更改。 关于早期版本的变更,请参见更改历史。
Kafka 客户端版本
此版本需要 3.7.0 版本卡夫卡客户端.
Kafka 客户端 3.7.0 版本引入了新的消费者组协议。
更多细节及其局限性请参见KIP-848。
新的消费者组协议是抢先体验版本,不打算用于生产环境。
此版本仅建议用于测试。
因此,Spring for Apache Kafka 仅在该测试级别支持范围内支持这一新的消费者组协议Kafka-客户端本身。
默认情况下,Spring for Apache Kafka 使用经典的消费者组协议,在测试新的消费者组协议时,需要通过group.protocol消费者的财产。
测试支持变更
这牛皮纸模式在嵌入卡夫卡默认情况下,用户想要使用牛皮纸模式必须启用它。
这是由于在使用过程中观察到的某些不稳定性所致嵌入卡夫卡在牛皮纸模式,尤其是在测试新的消费者组协议时。
新的消费者组协议仅支持牛皮纸因此,在测试新协议时,这需要针对真实的Kafka集群进行,而不是基于KafkaClusterTestKit哪嵌入卡夫卡基于。
此外,在多场比赛中还观察到其他一些比赛条件卡夫卡听众方法:嵌入卡夫卡在牛皮纸模式。
在这些问题解决之前,牛皮纸默认嵌入卡夫卡将保持为false.
Kafka Streams 交互式查询支持
一个新的APIKafkaStreamsInteractiveQuerySupport用于访问Kafka Streams交互式查询中使用的可查询存储。
详情请参见 Kafka Streams 互动支持。
交易Id后缀策略
一个新的交易Id后缀策略引入接口以管理transactional.id后缀。
默认实现为DefaultTransactionIdSuffixStrategy设置时maxCache大于 0 可以重复使用transactional.id在特定范围内,否则通过递增计数器即时生成后缀。
更多信息请参见固定交易Id后缀。
异步@KafkaListener返回
@KafkaListener(和@KafkaHandler)方法现在可以返回异步返回类型包括CompletableFuture<?>,单核细胞增多症<?>以及Kotlin暂停功能。
更多信息请参见异步返回。
基于抛出异常将消息路由到自定义DLT
现在可以根据异常类型将消息重定向到自定义DLT,而异常在消息处理过程中抛出了异常。
重定向的规则可以通过以下方式设定RetryableTopic.exceptionBasedDltRouting或者RetryTopicConfigurationBuilder.dltRoutingRules.
自定义DLT以及其他重试和死字母主题都会自动生成。
更多信息请参见基于抛出异常将消息路由到自定义DLT。
弃用 ContainerProperties transactionManager 属性
退役transactionManager属性容器属性支持的KafkaAwareTransactionManager,这是一种比一般更窄的类型。PlatformTransactionManager.参见 ContainerProperties 和事务同步。
回滚处理后
一个新的AfterRollback处理器应用程序接口processBatch提供。
更多信息请参见后回滚处理器。
更改@RetryableTopic SameIntervalTopicReuseStrategy默认值
改变@RetryableTopic属性相同时间间隔主题再利用策略默认值为SINGLE_TOPIC.
关于最大区间指数延迟,请参见单一主题。
非阻塞重试支持类级@KafkaListener
非阻塞重试支持类@KafkaListener。 参见非阻挡重试。
支持流程@RetryableTopic在 RetryTopicConfigurationProvider 中的一个类上。
提供一个新的公共 API 供你查找RetryTopicConfiguration.
参见Find RetryTopicConfiguration
RetryTopicConfigurer support process MultiMethodKafkaListenerEndpoint.
这RetryTopicConfigurer支持流程与寄存器MultiMethodKafkaListenerEndpoint.
这MultiMethodKafkaListenerEndpoint提供接球手/二传手对于性质defaultMethod和方法.
修改EndpointCustomizer严格来说MethodKafkaListenerEndpoint类型。
这EndpointHandlerMethod添加新构造器,为所提供的豆构建实例。
提供新的职业EndpointHandlerMultiMethodTo Handler 多方法用于重试端点。
新的API方法,基于用户提供的函数寻找偏移量
消费者回拨提供了一个新的 API,用于基于用户自定义函数寻找偏移量,该函数以当前的偏移量为参数。
详情请参见 Seek API 文档。
@PartitionOffset SeekPosition 的支持
添加寻觅位置属性到@PartitionOffset支持TopicPartitionOffset.SeekPosition.
详情请参见手动分配。
TopicPartitionOffset 中新增的构造器,接受一个函数来计算 寻找 的偏移量
主题分区偏移有一个新的构造函数,该函数需要用户自给的函数来计算要寻找的偏移量。
当使用该构造函数时,框架会调用当前消费者偏移位置的输入参数。
详情请参见 Seek API 文档。
Spring Boot 应用名作为默认客户端 ID 前缀
对于定义应用程序名称的 Spring Boot 应用程序,现在使用该名称 作为某些客户端类型自动生成客户端ID的默认前缀。 详情请参见默认客户端ID前缀。
增强MessageListenerContainer的检索
ListenerContainerRegistry提供了两个新的 API,动态查找和过滤MessageListenerContainer实例。getListenerContainersMatching(Predicate<String> idMatcher)通过ID筛选,另一个是getListenerContainersMatching(BiPredicate<String, MessageListenerContainer> matcher)通过ID和容器属性进行筛选。
看@KafkaListener生命周期管理的API文档更多信息请见。
通过提供更多追踪标签增强观察
卡夫卡模板观察提供更多追踪标签(基数较低)。卡夫卡听众观察提供了新的API,用于查找高基数的键名以及更多的追踪标签(高基数或低基数)。
参见测距观察
3.1版自3.0版以来的新内容
本节介绍了从3.0版本到3.1版本所做的更改。 关于早期版本的变更,请参见更改历史。
EmbeddedKafkaBroker
现在提供了一种额外的实现可供使用牛皮纸而不是Zookeeper。
更多信息请参见嵌入式Kafka经纪人。
JsonDeserializer
当发生反序列化异常时,序列化例外消息不再包含以下格式的数据无法反序列化数据 [[123, 34, 98, 97, 122, ...;为每个数据字节设置数值数组并不实用,对于大数据可能会显得冗长。
当与ErrorHandlingDeserializer这反序列化例外发送到错误处理程序的 包含数据该属性包含无法反序列化的原始数据。
当不与ErrorHandlingDeserializer这卡夫卡消费者会持续针对同一记录发出异常,显示主题/分区/偏移量及Jackson抛出的原因。
容器后处理器
后处理可以通过指定 a 的豆子名称来应用于监听器容器容器后处理器在@KafkaListener注解。
这发生在容器创建后,且配置完成后容器定制器在集装箱工厂上配置。
更多信息请参见Container Factory。
ErrorHandlingDeserializer
你现在可以添加一个验证器该解序器;如果代表反串化器成功反序列化对象,但该对象验证失败,会抛出类似反序列化异常的异常。
这使得原始原始数据可以传递给错误处理程序。
看用ErrorHandlingDeserializer更多信息请见。
可重试主题
变更后缀-重试-5000自-重试什么时候@RetryableTopic(backoff = @Backoff(延迟 = 5000),尝试次数 = “2”,fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC).
如果你想保留后缀-重试-5000用@RetryableTopic(后退 = @Backoff(延迟 = 5000),尝试次数 = “2”).
更多信息请参见主题命名。
监听器容器变更
在手动分配分区时,使用零消费者group.id这AckMode现在自动被强制为手动.
更多信息请参见“手动分配所有分区”。
3.0版本自2.9版本以来的新内容
观察
现在支持使用测时器和使用Micrometer进行追踪的观测。 更多信息请参见观察。
本地图像
提供创建原生图像的支持。 更多信息请参见“本地图片”。
全球单嵌卡夫卡
嵌入的卡夫卡(EmbeddedKafkaBroker)现在可以作为整个测试计划的单一全局实例开始。
更多信息请参见“使用同一经纪人处理多个测试类别”。
可重试主题变更
该功能已不再被视为实验性(就其 API 而言),该功能自 2.7 版本起支持,但存在比平时更大的风险导致 API 变更失效。
本版本中,非阻塞重试基础设施的启动机制进行了调整,以避免某些应用中出现的应用初始化时序问题。
你现在可以设置不同的并发对于重试容器;默认情况下,并发与主容器相同。
@RetryableTopic现在可以作为自定义注释的元注释使用,包括支持以下@AliasFor性能。
更多信息请参见配置。
重试主题的默认复制因子现在为-1(使用经纪违约)。
如果你的经纪人是2.4版本之前的,你现在需要明确设置该属性。
你现在可以配置多个@RetryableTopic同一主题、同一应用上下文的听众。
此前,这是不可能的。
更多信息请参见“多个听众,同一主题”。
API正在发生破坏性的变化重试主题配置支持;具体来说,如果你覆盖了 的豆定义方法destinationTopicResolver,kafkaConsumerBackoffManager和/或retryTopicConfigurer;
这些方法现在需要ObjectProvider<RetryTopicComponentFactory>参数。
监听器容器变更
与消费者认证和授权失败相关的事件现在由容器发布。 更多信息请参见申请活动。
你现在可以自定义消费者线程使用的线程名称。 更多信息请参见容器线程命名。
容器性质restartAfterAuthException已被添加。
更多信息请参见监听器容器属性。
卡夫卡模板变化
本类返回的未来为完成未来s 代替可听未来s.
看用卡夫卡模板.
回复Kafka模板变化
本类返回的未来为完成未来s 代替可听未来s.
看用回复Kafka模板和请求/回复留言<?>s.
@KafkaListener变化
你现在可以使用自定义的关联头,它会在任何回复消息中被回应。
详见用回复Kafka模板更多信息请见。
你现在可以在整个批次处理完毕之前手动提交批次的部分内容。 更多信息请参见提交偏移量。
卡夫卡标题变化
四个常数卡夫卡标题2.9.x 中被弃用的内容现已被移除。
-
而不是
MESSAGE_KEY用钥匙. -
而不是
PARTITION_ID用分区
同样地RECEIVED_MESSAGE_KEY被替换为RECEIVED_KEY和RECEIVED_PARTITION_ID被替换为RECEIVED_PARTITION.
测试变更
3.0.7 版本引入了模拟消费者工厂和模拟制作工厂.
更多信息请参见“模拟消费者与生产者”条目。
从版本 3.0.10 开始,嵌入的 Kafka 代理默认设置 Spring Boot 属性spring.kafka.bootstrap-servers到嵌入经纪人的地址。
2.9版自2.8版以来的新内容
错误处理程序变更
这默认错误处理现在可以配置为暂停容器进行一次轮询,并使用上一次轮询的剩余结果,而不是寻找剩余记录的偏移量。
更多信息请参见 DefaultErrorHandler。
这默认错误处理现在有后退处理员财产。
更多信息请参见“退后操作员”条目。
监听器容器变更
拦截在Tx之前现在可以与所有事务管理器兼容(之前只有在KafkaAwareTransactionManager被使用)。
参见[interceptBeforeTx]。
一个新的集装箱物业暂停立刻提供了 ,使容器可以在当前记录处理完毕后暂停消费者,而不是等到处理完之前轮询的所有记录后才进行。
参见[pauseImmediate]。
与消费者认证和授权相关的事件
头部映射器变更
你现在可以配置哪些入站头应该被映射。 也提供2.8.8及更高版本。 更多信息请参见消息头。
卡夫卡模板变化
在3.0版本中,该类返回的未来将为完成未来s 代替可听未来s.
看用卡夫卡模板在使用本发布时,协助过渡。
回复Kafka模板变化
模板现在提供了一种方法,可以在回复容器上等待分配,以避免在回复容器初始化前发送请求时发生竞赛。
也提供2.8.8及更高版本。
看用回复Kafka模板.
在3.0版本中,该类返回的未来将为完成未来s 代替可听未来s.
看用回复Kafka模板和请求/回复留言<?>s在使用本发布时,协助过渡。
2.8版本自2.7版本以来的新内容
本节介绍了从2.7版本到2.8版本所做的更改。 关于早期版本的变更,请参见更改历史。
包装变更
与类型映射相关的类和接口已被从…support.converter自…support.mapping.
-
抽象JavaTypeMapper -
ClassMapper -
DefaultJackson2JavaTypeMapper -
Jackson2JavaTypeMapper
错误顺序的手动提交
监听器容器现在可以配置为接受手动偏移提交,顺序不乱(通常是异步的)。 容器会推迟提交,直到缺失的偏移量被确认。 更多信息请参见手动提交偏移量。
@KafkaListener变化
现在可以在方法本身上指定监听器方法是否是批处理监听器。 这使得同一容器工厂可以同时用于记录和批量监听器。
更多信息请参见[批次监听器]。
批处理监听器现在可以处理转换异常。
更多信息请参见“带有批处理错误处理器的转换错误”。
RecordFilterStrategy当与批处理监听器一起使用时,现在可以在一次调用中过滤整个批次。
更多信息请参见[批次听众]末尾的注释。
这@KafkaListener注释现在具有Filter属性,以覆盖容器工厂的RecordFilterStrategy只为这位听众准备。
这@KafkaListener注释现在具有信息属性;这用于填充新的监听器容器属性听众信息.
然后用来填充KafkaHeaders.LISTENER_INFO每个记录中的头部,可用于记录拦截者,RecordFilterStrategy或者说是听众本身。
更多信息请参见监听器信息头和 AbstractMessageListenerContainer 属性。
卡夫卡模板变化
现在你可以根据主题、分区和偏移量接收单条记录。
看用卡夫卡模板接收更多信息请见。
通用错误处理器添加
遗产通用错误处理器其记录批处理监听器的子接口层级已被新的单一接口取代通用错误处理器其实现对应于大多数遗留实现通用错误处理器.
参见容器错误处理程序和迁移自定义遗留错误处理器实现通用错误处理器更多信息请见。
监听器容器变更
这拦截在Tx之前容器属性现在为true默认。
这authorizationExceptionRetryInterval该物业已更名为authExceptionRetryInterval现应用于认证异常除了授权例外之前。
这两个例外都被视为致命,除非设置了该属性,否则容器会默认停止。
看用KafkaMessageListenerContainer以及监听器容器属性以获取更多信息。
串行器/解串器变更
这DelegatingByTopicSerializer和DelegatingByTopicDeserializer现已提供。
更多信息请参见“委派串行器和解串器”。
DeadLetter出版恢复变化
该物业stripPreviousExceptionHeaders现在是true默认。
现在有多种技术可以自定义输出记录中添加的头部。
更多信息请参见“管理死号记录头”。
可重试主题变更
现在你可以用同一个工厂来处理可重试和不可重试的主题。 更多信息请参见“指定听者容器工厂”。
现在全球范围内有一份可控的致命例外清单,这些例外会让失败的记录直接进入DLT。 请参阅异常分类器来了解如何管理它。
你现在可以结合使用阻断和非阻断重试。 更多信息请参见“组合阻塞和非阻塞重试”。
使用可重试主题功能时抛出的 KafkaBackOffException 现已在 DEBUG 级别被记录。 如果你需要将日志级别改回 WARN 或设置到其他级别,请参见更改 KafkaBackOffException 日志级别。
2.6与2.7之间的变化
Kafka 客户端版本
此版本需要 2.7.0卡夫卡客户端.
自2.7.1版本起,它也兼容2.8.0客户端;参见覆盖Spring Boot依赖。
使用主题进行非阻塞延迟重试
这一重要新功能在本版本中被加入。 当严格排序不重要时,失败的配送可以被发送到其他主题,稍后处理。 可以配置一系列此类重试主题,延迟会逐渐增加。 更多信息请参见非阻塞重试。
监听器容器变更
这仅有LogRecord元数据容器属性现在为true默认。
一个新的集装箱物业立即停止现已发布。
更多信息请参见监听器容器属性。
使用以下退避在投递尝试之间(例如:SeekToCurrentErrorHandler和DefaultAfterRollback处理器)现在会在容器停止后不久退出倒车间隔,而不是延迟停止。
错误处理程序和回滚后处理器,扩展失败记录处理器现在可以配置为一个或多个RetryListenerS用于接收关于重试和恢复进展的信息。
这记录拦截者现在,监听者返回后(通常或通过抛出异常)调用了额外的方法。
它还有一个子接口消费者意识记录拦截者.
此外,现在还有批次拦截者给批量听众。
更多信息请参见消息监听器容器。
@KafkaListener变化
你现在可以验证 的有效载荷参数@KafkaHandler方法(类级监听者)。
看@KafkaListener @Payload验证更多信息请见。
你现在可以设置原始记录标题属性消息信息转换器和批处理消息传递转换器这导致了消费者记录将加入改装后的留言<?>.
例如,如果你想使用死信出版恢复者在监听器错误处理程序中。
更多信息请参见监听器错误处理程序。
你现在可以修改了@KafkaListener应用初始化时的注释。
看@KafkaListener属性修改更多信息请见。
DeadLetter出版恢复变化
现在,如果键和值都无法反序列化,原始值会被发布到DLT。
之前,这个值是填充的,但密钥反序列化例外继续担任头部。
如果你对恢复器进行了子类化并覆盖了创建制作人记录方法。
此外,恢复器会在发布给目标解析器前验证目标解析器所选分区是否真实存在。
更多信息请参见“发布无效记录”。
ChainedKafkaTransactionManager已弃用
更多信息请参见交易。
卡夫卡流变更
默认情况下,StreamsBuilderFactoryBean现在配置为不清理本地州。
更多信息请参见配置。
卡夫卡管理员变化
新方法创建或修改主题和描述主题已被添加。卡夫卡管理员。新话题已添加以方便在单个 Bean 中配置多个主题。
更多信息请参见[配置主题]。
消息转换器变化
现在可以添加一个春季消息 智能消息转换器前往消息信息转换器,允许基于内容类型页眉。
更多信息请参见春季消息消息转换。
测 序@KafkaListeners
看开始@KafkaListeners 按顺序排列更多信息请见。
指数倒退与最大次数
一个新的退避提供了实现功能,使配置最大重试次数更加方便。
看指数倒退与最大次数实现更多信息请见。
条件委派错误处理程序
这些新的错误处理程序可以根据异常类型配置为委派给不同的错误处理程序。 更多信息请参见委派错误处理程序。
2.5与2.6之间的变化
监听器容器变更
默认EOSMode现在是试用版.
更多信息请参见“恰好一次语义”。
各种错误处理程序(扩展失败记录处理器) 以及DefaultAfterRollback处理器现在重置退避如果恢复失败。
此外,你现在还可以选择退避根据失败记录和/或例外情况使用。
你现在可以配置一个建议链在容器属性中。
更多信息请参见监听器容器属性。
当容器配置为发布时ListenerContainerIdleEventS,现在发布一个ListenerContainerNoLongerIdleEvent当发布闲置事件后收到记录。
更多信息请参见应用事件及检测空闲和无响应消费者。
@KafkaListener变更
使用手动分区分配时,你可以指定通配符来确定哪些分区应重置为初始偏移量。
此外,如果监听者实现消费者寻求,onPartitionsAssigned()在手动分配后被调用。
(同样也在2.5.5版本中加入)
更多信息请参见显式分区分配。
方便方法已被添加到摘要消费者寻求感知让寻找变得更容易。
更多信息请参见[寻求]。
ErrorHandler 变更
的子类失败记录处理器(例如:SeekToCurrentErrorHandler,DefaultAfterRollback处理器,恢复批处理错误处理如果异常类型与之前该记录发生的异常不同,则可以配置重置重试状态。
生产工厂变动
你现在可以为生产者设定最高年龄,超过后他们将被关闭并重新创造。 更多信息请参见交易。
你现在可以在默认KafkaProducerFactory已经被创造出来。
比如,如果你在凭证更改后需要更新 SSL 密钥/信任存储位置,这可能很有用。
看用默认KafkaProducerFactory更多信息请见。
2.4与2.5之间的变化
本节介绍了从2.4版本到2.5版本所做的更改。 关于早期版本的变更,请参见更改历史。
消费者/生产者工厂变更
默认的消费者和生产者工厂现在可以在消费者或生产者创建或关闭时调用回调。 提供了原生的微米度制实现。 更多信息请参见工厂听众。
你现在可以在运行时更改引导服务器属性,从而实现切换到另一个 Kafka 集群。 更多信息请参见“连接卡夫卡”。
StreamsBuilderFactoryBean变化
工厂豆现在可以在卡夫卡流创造或毁灭。
提供了原生微米度制的实现。
更多信息请参见KafkaStreams的微米支持。
投递尝试头
现在有选项可以添加一个头部,用于在使用特定错误处理程序和回滚处理器后跟踪投递尝试。 更多信息请参见“投递尝试”头部。
@KafkaListener变更
当需要时,默认回复头将自动填充@KafkaListener返回类型为留言<?>.
更多信息请参见回复类型信息<?>。
这KafkaHeaders.RECEIVED_MESSAGE_KEY不再包含零当输入记录具有零钥匙;头部完全省略。
@KafkaListener方法现在可以指定消费者记录元数据参数,而不是使用离散的首部来处理元数据,如主题、分区等。
更多信息请参见消费者记录元数据。
监听器容器变更
这assignmentCommitOption容器属性现在为LATEST_ONLY_NO_TX默认。
更多信息请参见监听器容器属性。
这subBatchPerPartition容器属性现在为true在使用交易时默认如此。
更多信息请参见交易。
一个新的恢复批处理错误处理现已提供。
现已支持静态组成员资格。 更多信息请参见消息监听器容器。
当配置增量/合作式再平衡时,如果偏移未能与非致命性RebalanceInProgressException,容器将尝试重新提交该实例剩余分区的偏移量,尤其是在重新平衡完成后。
默认的错误处理程序现在是SeekToCurrentErrorHandler对于唱片听众和恢复批处理错误处理给批量听众。
更多信息请参见容器错误处理程序。
你现在可以控制标准错误处理程序有意抛出的异常记录在哪一级。 更多信息请参见容器错误处理程序。
这getAssignmentsByClientId()新增了方法,使得确定并发容器中哪些消费者被分配到哪些分区变得更容易。
更多信息请参见监听器容器属性。
你现在可以抑制整个日志消费者记录错误的 s,调试日志等。
看仅有LogRecord元数据收录于监听器容器属性。
卡夫卡模板变更
这卡夫卡模板现在可以维持微米计时器。
更多信息请参见监测。
这卡夫卡模板现在可以配置为ProducerConfig属性可以覆盖生产工厂的属性。
看用卡夫卡模板更多信息请见。
一个RoutingKafkaTemplate现已提供。
看用RoutingKafkaTemplate更多信息请见。
你现在可以使用KafkaSendCallback而不是听众未来回应以获得更窄的例外,以便更容易提取失败者制作人唱片.
看用卡夫卡模板更多信息请见。
Kafka 字符串串行器/解串器
新增功能ToStringSerializer/字符串解串器以及相关的SerDe现已提供。
更多信息请参见字符串序列化。
JsonDeserializer
这JsonDeserializer现在可以更灵活地确定反序列化类型。
更多信息请参见“使用方法确定类型”。
分派串行器/解串器
这授权序列化器现在可以处理“标准”类型,当出站记录没有头部时。
更多信息请参见“委派串行器和解串器”。
测试变更
这KafkaTestUtils.consumerProps()助手纪录现已刷新ConsumerConfig.AUTO_OFFSET_RESET_CONFIG自最早默认。
更多信息请参见JUnit。
2.3 与 2.4 之间的变化
消费者意识RebalanceListener
喜欢消费者RebalanceListener,该接口现在增加了一种方法onPartitionsLost(分区丢失).
更多信息请参阅Apache Kafka文档。
与消费者RebalanceListener, 默认实现不调用在被取消的分区上.
监听器容器会在调用后调用该方法。onPartitionsLost(分区丢失);因此,实施时不应采取同样的做法消费者意识RebalanceListener.
更多信息请参见《重新平衡听众》结尾的重要说明。
卡夫卡模板
这卡夫卡模板现在支持非事务式发布,同时支持事务式发布。
看卡夫卡模板交易式与非事务式出版更多信息请见。
AggregatingReplyingKafkaTemplate
这发布策略现在是双消费者.
现在暂停后(以及记录到达时)都被宣布为止;第二个参数为true在暂停后判罚的情况下。
更多信息请参见“汇总多条回复”。
监听器容器
这容器属性提供authorizationExceptionRetryInterval允许监听器容器在任意后重试的选项授权例外由卡夫卡消费者.
请参见其JavaDocs和用KafkaMessageListenerContainer更多信息请见。
@KafkaListener
这@KafkaListener注释新增了一个属性分割可计算;默认为真。
当回复听者返回可迭代该属性控制返回结果是作为单条记录发送,还是为每个元素发送记录。
看使用@SendTo更多信息
批量监听器现在可以配置为批处理记录适配器;例如,这允许批量处理交易,而监听者一次只接收一条记录。
在默认实现中,a消费者记录恢复器可用于处理批处理中的错误,而不停止整个批处理——这在使用事务时可能有用。
更多信息请参见与批处理监听器的事务。
卡夫卡流
这StreamsBuilderFactoryBean接受新财产KafkaStreamsInfrastructureCustomizer.
这允许在创建流之前配置构建器和/或拓扑。
更多信息请参见春季管理。
2.2与2.3之间的变化
本节涵盖了从2.2版本到2.3版本所做的更改。
技巧、窍门和示例
新增一章“技巧、窍门与示例”。 请提交GitHub议题和/或拉取请求,以便在该章节中获得更多条目。
配置变更
从2.3.4版本开始,缺失主题致命container 属性默认为 false。
当这种情况成立时,如果代理当机,应用程序将无法启动;许多用户受到这一变化的影响;鉴于Kafka是一个高可用性平台,我们没预料到启动一个没有活跃代理的应用会成为常见的用例。
生产商与消费者工厂变革
这默认KafkaProducerFactory现在可以配置为每个线程创建一个生产者。
你也可以提供提供商<序列号生成器>构造函数中的实例,作为配置类(需要无 arg 构造器)或构造 的替代方案串行器实例,然后在所有生产者之间共享。
看用默认KafkaProducerFactory更多信息请见。
同样的选项也适用于提供商<去串行器>实例DefaultKafkaConsumerFactory.
看用KafkaMessageListenerContainer更多信息请见。
监听器容器变更
此前,错误处理器接收ListenerExecutionFailedException(实际听者例外为原因)当监听器被调用时(例如)@KafkaListeners).
本地人抛出的例外情况GenericMessageListeners 未变更地传递给错误处理程序。
现在ListenerExecutionFailedException总是参数(实际监听者例外为原因),提供对容器group.id财产。
由于监听器容器有自己的提交偏移机制,它更倾向于KafkaConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG存在false.
现在除非在消费者工厂中特别设置或容器的消费者属性覆盖,否则它会自动将其设置为虚假。
这ackOnError(失误)财产现在是false默认。
现在可以获得消费者的group.id在监听者方法中。
看获取消费者group.id更多信息请见。
容器有了新的属性记录拦截者允许在调用监听者之前检查或修改记录。
一个合成记录拦截者也提供以防你需要调用多个拦截器。
更多信息请参见消息监听器容器。
这消费者寻求它拥有新的方法,允许你针对起始、结束或当前位置进行寻址,并寻找大于或等于时间戳的第一个偏移量。
更多信息请参见[寻求]。
便利类摘要消费者寻求感知现已提供以简化寻球。
更多信息请参见[寻求]。
这容器属性提供idleBetweenPolls让监听器容器中的主循环在中间休眠的选项KafkaConsumer.poll()调用。
请参见其JavaDocs和用KafkaMessageListenerContainer更多信息请见。
使用AckMode.MANUAL(或MANUAL_IMMEDIATE)你现在可以通过呼叫来实现重新投递纳克在确认.
更多信息请参见提交偏移量。
现在可以用Micrometer监测听众表现定时器s.
更多信息请参见监测。
这些容器现在会发布与创业相关的更多消费者生命周期事件。 更多信息请参见申请活动。
事务批处理监听器现在可以支持僵尸围栏。 更多信息请参见交易。
监听器容器工厂现在可以配置为容器定制器在每个容器创建和配置后,进一步配置。
更多信息请参见集装箱工厂。
ErrorHandler 变更
这SeekToCurrentErrorHandler现在将某些例外视为致命,并对这些例外禁用重试,第一次失败时调用恢复者。
这SeekToCurrentErrorHandler和SeekToCurrentBatchErrorHandler现在可以配置为应用退避(线索睡眠)在送货尝试之间。
从版本 2.3.2 开始,恢复记录的偏移量将在错误处理程序恢复失败记录后返回时提交。
这死信出版恢复者,与ErrorHandlingDeserializer现在,将发送到死符主题的消息的有效载荷设置为无法反序列化的原始值。
以前,它是零以及提取反序列化例外从消息头部获取。
更多信息请参见“发布无效记录”。
主题构建器
新一班主题构建器为更方便地创建新主题 @Beans 代表自动主题配置。
更多信息请参见[配置主题]。
卡夫卡流变更
你现在可以对StreamsBuilderFactoryBean由@EnableKafkaStreams.
更多信息请参见流配置。
一个恢复反序列化异常处理程序现在提供了允许恢复有反序列化错误记录的记录。
它可以与死信出版恢复者把这些记录发给一个死信的话题。
更多信息请参见“从反序列化异常恢复”。
这HeaderEnricher提供了变换器,使用 SpEL 生成头部值。
更多信息请参见“标题增强器”。
这MessagingTransformer已提供。
这使得 Kafka 流拓扑能够与 Spring 消息组件(如 Spring 集成流程)交互。
看MessagingProcessor以及[从一个KStream] 了解更多信息。
JSON 组件变更
现在所有支持 JSON 的组件默认配置为 Jackson对象映射器由JacksonUtils.enhancedObjectMapper().
这JsonDeserializer现在提供类型参考基于 的构造器,用于更好地处理目标通用容器类型。
还有一个JacksonMime类型模块被引入用于序列化org.springframework.util.MimeType就是普通的弦。
更多信息请参见其JavaDocs及序列化、反序列化和消息转换。
一个ByteArrayJsonMessageConverter以及为所有 Json 转换器提供新的超类,JsonMessageConverter.
另外,还有StringOrBytes 串行器现已上市;它可以串行化字节[],字节和字符串在制作人唱片s.
更多信息请参见春季消息消息转换。
这JsonSerializer,JsonDeserializer和JsonSerde现在有流利的API,让程序化配置更简单。
更多信息请参见 javadocs、序列化、反序列化和消息转换,以及 Streams JSON 序列化与反序列化。
AggregatingReplyingKafkaTemplate
扩展了回复Kafka模板通过汇总多个接收者的回复。
更多信息请参见“汇总多条回复”。
交易变更
你现在可以覆盖生产工厂的transactionId前缀在卡夫卡模板和KafkaTransactionManager.
看transactionId前缀更多信息请见。
新的委托串行器/解串器
该框架现在提供了委托串行器和反串化器,利用头部实现具有多种键/值类型的记录的生成和消费。
更多信息请参见“委派串行器和解串器”。
新重试反串器
该框架现在提供了委托RetryingDeserializer当出现暂时错误(如网络问题)时,以重新尝试序列化。
更多信息请参见“Retry De串行器”。
2.1与2.2之间的变化
级别与套件变更
这容器属性课程已从......org.springframework.kafka.listener.config自org.springframework.kafka.listener.
这AckModeENUM 已被从摘要MessageListenerContainer自容器属性.
这setBatchErrorHandler()和setErrorHandler()方法已被从容器属性两人都去摘要MessageListenerContainer和摘要KafkaListenerContainerFactory.
回滚处理后
一个新的AfterRollback处理器提供策略。
更多信息请参见后回滚处理器。
ConcurrentKafkaListenerContainerFactory变化
你现在可以使用ConcurrentKafkaListenerContainerFactory创建和配置任意ConcurrentMessageListenerContainer,不仅仅是那些@KafkaListener附注。
更多信息请参见集装箱工厂。
监听器容器变更
一个新的容器属性(缺失主题致命)已被添加。
看用KafkaMessageListenerContainer更多信息请见。
一个ConsumerStoppedEvent当消费者停止时,会发出电。
更多信息请参见讨论串安全。
批量监听器也可以选择性地接收完整音频消费者记录<?, ?>宾语而非List<ConsumerRecord<?, ?>.
更多信息请参见[批次监听器]。
这DefaultAfterRollback处理器和SeekToCurrentErrorHandler现在可以恢复(跳过)那些不断失败的记录,并且默认情况下,在10次失败后才恢复。
它们可以配置为将失败记录发布到死信主题。
从版本 2.2.4 开始,用户的组 ID 可以在选择死字母主题名称时使用。
这ConsumerStoppingEvent已被添加。
更多信息请参见申请活动。
这SeekToCurrentErrorHandler现在可以配置为在容器配置为AckMode.MANUAL_IMMEDIATE(自2.2.4起)。
@KafkaListener变更
你现在可以覆盖并发和自动启动通过在注释上设置属性来实现监听器容器工厂的属性。
你现在可以添加配置,来确定哪些头部(如果有的话)会被复制到回复消息中。
看@KafkaListener注解更多信息请见。
你现在可以使用@KafkaListener作为你自己注释的元注释。
看@KafkaListener作为元注释更多信息请见。
现在配置验证器为@Payload验证。
看@KafkaListener @Payload验证更多信息请见。
你现在可以直接在注释上指定 Kafka 消费者属性;这些会覆盖消费者工厂(自2.2.4版本起)中定义的同名属性。 更多信息请参见注释属性。
头部映射变更
类型为 的头部模仿类型和媒体类型现在被映射为简单的字符串记录头价值。
之前,它们被映射为 JSON 格式,仅模仿类型被解码了。媒体类型无法解码。
它们现在只是为了互作性而简单字符串。
另外,还有DefaultKafkaHeaderMapper有新的addToStringClasses方法,允许通过以下方式指定应映射的类型toString()而不是 JSON。
更多信息请参见消息头。
嵌入的卡夫卡变更
这卡夫卡嵌入式类及其卡夫卡规则界面已被弃用,取而代之的是EmbeddedKafkaBroker以及JUnit 4嵌入卡夫卡规则包装纸。
这@EmbeddedKafka注释现在填充EmbeddedKafkaBroker用豆子代替被弃用的卡夫卡嵌入式.
这一变化允许使用@EmbeddedKafka在JUnit 5测试中。
这@EmbeddedKafka注释现在具有 属性端口指定填充EmbeddedKafkaBroker.
更多信息请参见测试应用。
JsonSerializer/Deserializer 增强
你现在可以通过使用生产者属性和消费者属性来提供类型映射信息。
解串器上新增了构造函数,允许用提供的目标类型覆盖类型头信息。
这JsonDeserializer现在默认移除所有类型信息首部。
你现在可以配置JsonDeserializer通过使用 Kafka 属性忽略类型信息头部(自 2.2.3 版本起)。
更多信息请参见串行化、反串列化和消息转换。
卡夫卡流变更
流的配置豆现在必须是KafkaStreamsConfiguration宾语而非StreamsConfig对象。
这StreamsBuilderFactoryBean已从包裹中移动…核心自…配置.
这KafkaStreamBrancher当条件分支建立在KStream实例。
更多信息请参见 Apache Kafka 流的支持与配置。
事务ID
当监听器容器启动交易时,transactional.id现在是transactionId前缀附加于<group.id>.<topic>.<partition>.
这一改动允许了对僵尸进行正确的围栏,如本文所述。
2.0 与 2.1 之间的变更
JSON 改进
这StringJsonMessageConverter和JsonSerializer现在在 中添加类型信息头,使转换器和JsonDeserializer在接收时创建特定类型,基于消息本身而非固定配置类型。
更多信息请参见串行化、反串列化和消息转换。
容器停止错误处理程序
现在,记录和批处理监听器都提供了容器错误处理程序,将监听器抛出的任何异常视为致命 / 他们停下了集装箱。 更多信息请参见处理例外。
暂停与继续容器
听众容器现在有暂停(pause)和简历()方法(自2.1.3版本起)。
更多信息请参见暂停和继续听众容器。
有状态重审
从2.1.3版本开始,你可以配置有状态重试。 更多信息请参见“状态重试”。
客户端ID
从2.1.1版本开始,你现在可以设置client.id前缀于@KafkaListener.
以前,要自定义客户端ID,每个监听器都需要一个独立的消费工厂(和容器工厂)。
前缀为-n在使用并发时提供唯一的客户端ID。
记录偏移提交
默认情况下,主题偏移提交的日志通过调试Logging层。
从版本 2.1.2 开始,新增了容器属性叫commitLogLevel可以指定这些消息的日志级别。
看用KafkaMessageListenerContainer更多信息请见。
默认@KafkaHandler
从2.1.3版本开始,你可以指定其中一个@KafkaHandler类级注释@KafkaListener作为默认。
看@KafkaListener在一门课上更多信息请见。
回复Kafka模板
从2.1.3版本开始,是卡夫卡模板提供给支持请求/回复语义。
看用回复Kafka模板更多信息请见。
2.0版本的迁移指南
请参阅2.0到2.1的迁移指南。
1.3 与 2.0 之间的变化
1.2与1.3之间的变化
交易支持
0.11.0.0客户端库增加了对事务的支持。
这KafkaTransactionManager并且新增了其他交易支持。
更多信息请参见交易。
对头部的支持
0.11.0.0客户端库增加了对消息头的支持。
这些现在可以映射到和映射到春季消息 消息头.
更多信息请参见消息头。
对卡夫卡时间戳的支持
卡夫卡模板现在支持一个 API 来添加带有时间戳的记录。
新增功能卡夫卡标题已经引入了关于时间戳支持。
还有,新的KafkaConditions.timestamp()和KafkaMatchers.hasTimestamp()新增了测试设施。
看用卡夫卡模板,@KafkaListener注解以及更多细节请参阅测试应用。
@KafkaListener变化
你现在可以配置KafkaListenerErrorHandler处理例外情况。
更多信息请参见处理例外。
默认情况下,@KafkaListener 身份证属性现在被用作group.id属性,覆盖消费者工厂中配置的属性(如果存在)。
此外,你可以显式配置组ID在注释上。
以前,你需要一个独立的容器工厂(和消费品工厂)来使用不同的设备group.id为听众设计的价值观。
恢复之前使用出厂配置的行为group.id,设idIsGroup(同名集团)注释上的属性为false.
@EmbeddedKafka注解
为方便起见,设有测试级别@EmbeddedKafka提供注释以注册卡夫卡嵌入式像豆子一样。
更多信息请参见测试应用。
Kerberos 配置
现在支持配置 Kerberos。 更多信息请参见JAAS和Kerberos。
1.0 与 1.1 之间的变化
寻求
你现在可以查找每个主题或划分的位置。 你可以用它在组管理时设置初始化时的初始位置,Kafka负责分配分区。 你还可以在应用执行的任意时刻检测到空闲容器。 更多信息请参见[寻求]。