|
该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 Spring for Apache Kafka 4.0.0! |
特征
大多数功能都适用于@RetryableTopic注释和RetryTopicConfiguration豆。
BackOff 配置
BackOff 配置依赖于退避政策来自春季重试项目。
包括:
-
修复后退
-
指数式退缩
-
随机指数式后退
-
制服随机撤退
-
不退缩
-
自定义退让
@RetryableTopic(attempts = 5,
backOff = @BackOff(delay = 1000, multiplier = 2, maxDelay = 5000))
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(3_000)
.maxAttempts(4)
.create(template);
}
你也可以提供 Spring Retry 的自定义实现睡觉退房政策接口:
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.customBackoff(new MyCustomBackOffPolicy())
.maxAttempts(5)
.create(template);
}
默认的退避策略是固定退后政策最多尝试3次,间隔1000毫秒。 |
默认最大延迟为30秒指数退让政策.
如果你的退后政策需要比这个更大的延迟,请调整最大延迟相应地拥有财产。 |
第一次尝试被计入最大尝试次数,所以如果你提供一个最大尝试次数4次的价值是原始尝试加上3次重试。 |
全球暂停
你可以为重试过程设置全局超时。 如果到达该时间,下次消费者抛出异常时,消息会直接发送到DLT,或者如果没有DLT可用,则直接终止处理。
@RetryableTopic(backOff = @BackOff(2_000), timeout = 5_000)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(2_000)
.timeoutAfter(5_000)
.create(template);
}
| 默认情况下不设置超时,也可以通过提供 -1 作为超时值来实现。 |
异常分类器
你可以指定想重试哪些例外,哪些不重试。 你也可以设置它遍历原因以查找嵌套异常。
@RetryableTopic(include = {MyRetryException.class, MyOtherRetryException.class}, traversingCauses = "true")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
throw new RuntimeException(new MyRetryException()); // will retry
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.notRetryOn(MyDontRetryException.class)
.create(template);
}
| 默认行为是对所有异常重试,而不是遍历原因。 |
自2.8.3版本起,出现了全局的致命异常列表,这些异常会导致记录被发送到DLT,且不需重试。
关于致命异常的默认列表,请参见 DefaultErrorHandler。
你可以通过覆盖配置非阻塞重试在 A 中的方法@Configuration扩展类重试主题配置支持.
更多信息请参见配置全局设置和功能。
@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
| 要禁用致命例外的分类,只需清除提供的列表即可。 |
包含与排除主题
你可以决定哪些主题会由RetryTopicConfiguration通过 .includeTopic(String topic)、.includeTopics(Collection<String>topics)、.excludeTopic(字符串主题)和 .excludeTopics(Collection<String> topics)方法进行 bean 处理。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.includeTopics(List.of("my-included-topic", "my-other-included-topic"))
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.excludeTopic("my-excluded-topic")
.create(template);
}
| 默认行为是包含所有主题。 |
主题 AutoCreation
除非另有说明,框架将自动创建所需的主题,使用新主题被卡夫卡管理员豆。
你可以指定分区数量和创建主题的复制因子,也可以关闭此功能。
从3.0版本开始,默认复制因子为-1,即使用经纪商违约。
如果你的经纪商版本早于 2.4,你需要设置明确的数值。
| 请注意,如果你没有使用 Spring Boot,必须提供 KafkaAdmin 的 bean 才能使用此功能。 |
@RetryableTopic(numPartitions = "2", replicationFactor = "3")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@RetryableTopic(autoCreateTopics = "false")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.autoCreateTopicsWith(2, 3)
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.doNotAutoCreateRetryTopics()
.create(template);
}
| 默认情况下,主题自动创建,只有一个分区,复制因子为-1(即使用代理默认)。 如果你的经纪商版本早于 2.4,你需要设置明确的数值。 |
故障头部管理
在考虑如何管理失败头部(原始头部和异常头部)时,框架委派给死信出版恢复者决定是添加还是替换头部。
默认情况下,它显式地设置了附加原始标题自false并且stripPreviousExceptionHeaders与默认使用的DeadLetter出版恢复.
这意味着默认配置中只有第一个“原始”和最后一个例外头会被保留。 这是为了避免在涉及多次重试步骤时产生过大的消息(例如栈跟踪头)。
更多信息请参见“管理死号记录头”。
要重新配置框架以使用这些属性的不同设置,可以配置死信出版恢复者通过覆盖configureCustomizers在 A 中的方法@Configuration扩展类重试主题配置支持.
详情请参见配置全局设置和功能。
@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
customizersConfigurer.customizeDeadLetterPublishingRecoverer(dlpr -> {
dlpr.setAppendOriginalHeaders(true);
dlpr.setStripPreviousExceptionHeaders(false);
});
}
从2.8.4版本开始,如果你想添加自定义头部(除了工厂添加的重试信息头部外,还可以添加一个头部函数前往工厂——factory.setHeadersFunction((rec, ex) -> { ... }).
默认情况下,添加的任何头部都是累积的——Kafka 头部可以包含多个值。
从2.9.5版本开始,如果头函数返回的 包含一个类型的头部DeadLetterPublishingRecoverer.SingleRecordHeader那么该头部的现有值将被移除,只剩下新的单一值。
自定义死信出版恢复器
如故障头部管理所示,可以自定义默认设置死信出版恢复者由框架创建的实例。
然而,对于某些用例,需要对死信出版恢复者例如,可以覆盖createProducerRecord()修改发送到重试(或死信)主题的内容。
从3.0.9版本开始,你可以覆盖RetryTopicConfigurationSupport.configureDeadLetterPublishingContainerFactory()提供死信出版者创作者例如:
@Override
protected Consumer<DeadLetterPublishingRecovererFactory>
configureDeadLetterPublishingContainerFactory() {
return (factory) -> factory.setDeadLetterPublisherCreator(
(templateResolver, destinationResolver) ->
new CustomDLPR(templateResolver, destinationResolver));
}
建议在构建自定义实例时使用提供的解析器。
基于抛出异常将消息路由到自定义DLT
从3.2.0版本开始,可以根据异常类型(在处理过程中抛出的异常类型)将消息路由到自定义DLT。
为此,需要指定路由。
路由定制包括对额外目的地的指定。
目的地又包括两个设置:后缀和异常.
当异常类型指定于异常被抛弃,DLT中包含后缀在考虑通用DLT之前,将被视为消息的目标主题。
使用注释或RetryTopicConfiguration豆:
@RetryableTopic(exceptionBasedDltRouting = {
@ExceptionBasedDltDestination(
suffix = "-deserialization", exceptions = {DeserializationException.class}
)}
)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.dltRoutingRules(Map.of("-deserialization", Set.of(DeserializationException.class)))
.create(template);
}
后缀发生在将军之前dltTopic后缀在自定义DLT名称中。
结合所展示的例子,导致反序列化例外将被引导至我的注释主题反序列化 DLT而不是我的注释主题DLT.
自定义DLT将按照主题自动创建中所述的相同规则创建。