该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 Spring for Apache Kafka 4.0.0spring-doc.cadn.net.cn

特征

大多数功能都适用于@RetryableTopic注释和RetryTopicConfiguration豆。spring-doc.cadn.net.cn

BackOff 配置

BackOff 配置依赖于退避政策来自春季重试项目。spring-doc.cadn.net.cn

@RetryableTopic(attempts = 5,
    backOff = @BackOff(delay = 1000, multiplier = 2, maxDelay = 5000))
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackOff(3_000)
            .maxAttempts(4)
            .create(template);
}

你也可以提供 Spring Retry 的自定义实现睡觉退房政策接口:spring-doc.cadn.net.cn

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .customBackoff(new MyCustomBackOffPolicy())
            .maxAttempts(5)
            .create(template);
}
默认的退避策略是固定退后政策最多尝试3次,间隔1000毫秒。
默认最大延迟为30秒指数退让政策. 如果你的退后政策需要比这个更大的延迟,请调整最大延迟相应地拥有财产。
第一次尝试被计入最大尝试次数,所以如果你提供一个最大尝试次数4次的价值是原始尝试加上3次重试。

全球暂停

你可以为重试过程设置全局超时。 如果到达该时间,下次消费者抛出异常时,消息会直接发送到DLT,或者如果没有DLT可用,则直接终止处理。spring-doc.cadn.net.cn

@RetryableTopic(backOff = @BackOff(2_000), timeout = 5_000)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackOff(2_000)
            .timeoutAfter(5_000)
            .create(template);
}
默认情况下不设置超时,也可以通过提供 -1 作为超时值来实现。

异常分类器

你可以指定想重试哪些例外,哪些不重试。 你也可以设置它遍历原因以查找嵌套异常。spring-doc.cadn.net.cn

@RetryableTopic(include = {MyRetryException.class, MyOtherRetryException.class}, traversingCauses = "true")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    throw new RuntimeException(new MyRetryException()); // will retry
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .notRetryOn(MyDontRetryException.class)
            .create(template);
}
默认行为是对所有异常重试,而不是遍历原因。

自2.8.3版本起,出现了全局的致命异常列表,这些异常会导致记录被发送到DLT,且不需重试。 关于致命异常的默认列表,请参见 DefaultErrorHandler。 你可以通过覆盖配置非阻塞重试在 A 中的方法@Configuration扩展类重试主题配置支持. 更多信息请参见配置全局设置和功能spring-doc.cadn.net.cn

@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
    nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
要禁用致命例外的分类,只需清除提供的列表即可。

包含与排除主题

你可以决定哪些主题会由RetryTopicConfiguration通过 .includeTopic(String topic)、.includeTopics(Collection<String>topics)、.excludeTopic(字符串主题)和 .excludeTopics(Collection<String> topics)方法进行 bean 处理。spring-doc.cadn.net.cn

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .includeTopics(List.of("my-included-topic", "my-other-included-topic"))
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .excludeTopic("my-excluded-topic")
            .create(template);
}
默认行为是包含所有主题。

主题 AutoCreation

除非另有说明,框架将自动创建所需的主题,使用新主题卡夫卡管理员豆。 你可以指定分区数量和创建主题的复制因子,也可以关闭此功能。 从3.0版本开始,默认复制因子为-1,即使用经纪商违约。 如果你的经纪商版本早于 2.4,你需要设置明确的数值。spring-doc.cadn.net.cn

请注意,如果你没有使用 Spring Boot,必须提供 KafkaAdmin 的 bean 才能使用此功能。
@RetryableTopic(numPartitions = "2", replicationFactor = "3")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}

@RetryableTopic(autoCreateTopics = "false")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .autoCreateTopicsWith(2, 3)
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .doNotAutoCreateRetryTopics()
            .create(template);
}
默认情况下,主题自动创建,只有一个分区,复制因子为-1(即使用代理默认)。 如果你的经纪商版本早于 2.4,你需要设置明确的数值。

故障头部管理

在考虑如何管理失败头部(原始头部和异常头部)时,框架委派给死信出版恢复者决定是添加还是替换头部。spring-doc.cadn.net.cn

默认情况下,它显式地设置了附加原始标题false并且stripPreviousExceptionHeaders与默认使用的DeadLetter出版恢复.spring-doc.cadn.net.cn

这意味着默认配置中只有第一个“原始”和最后一个例外头会被保留。 这是为了避免在涉及多次重试步骤时产生过大的消息(例如栈跟踪头)。spring-doc.cadn.net.cn

要重新配置框架以使用这些属性的不同设置,可以配置死信出版恢复者通过覆盖configureCustomizers在 A 中的方法@Configuration扩展类重试主题配置支持. 详情请参见配置全局设置和功能spring-doc.cadn.net.cn

@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
    customizersConfigurer.customizeDeadLetterPublishingRecoverer(dlpr -> {
        dlpr.setAppendOriginalHeaders(true);
        dlpr.setStripPreviousExceptionHeaders(false);
    });
}

从2.8.4版本开始,如果你想添加自定义头部(除了工厂添加的重试信息头部外,还可以添加一个头部函数前往工厂——factory.setHeadersFunction((rec, ex) -> { ... }).spring-doc.cadn.net.cn

默认情况下,添加的任何头部都是累积的——Kafka 头部可以包含多个值。 从2.9.5版本开始,如果函数返回的 包含一个类型的头部DeadLetterPublishingRecoverer.SingleRecordHeader那么该头部的现有值将被移除,只剩下新的单一值。spring-doc.cadn.net.cn

自定义死信出版恢复器

如故障头部管理所示,可以自定义默认设置死信出版恢复者由框架创建的实例。 然而,对于某些用例,需要对死信出版恢复者例如,可以覆盖createProducerRecord()修改发送到重试(或死信)主题的内容。 从3.0.9版本开始,你可以覆盖RetryTopicConfigurationSupport.configureDeadLetterPublishingContainerFactory()提供死信出版者创作者例如:spring-doc.cadn.net.cn

@Override
protected Consumer<DeadLetterPublishingRecovererFactory>
        configureDeadLetterPublishingContainerFactory() {

    return (factory) -> factory.setDeadLetterPublisherCreator(
            (templateResolver, destinationResolver) ->
                    new CustomDLPR(templateResolver, destinationResolver));
}

建议在构建自定义实例时使用提供的解析器。spring-doc.cadn.net.cn

基于抛出异常将消息路由到自定义DLT

从3.2.0版本开始,可以根据异常类型(在处理过程中抛出的异常类型)将消息路由到自定义DLT。 为此,需要指定路由。 路由定制包括对额外目的地的指定。 目的地又包括两个设置:后缀异常. 当异常类型指定于异常被抛弃,DLT中包含后缀在考虑通用DLT之前,将被视为消息的目标主题。 使用注释或RetryTopicConfiguration豆:spring-doc.cadn.net.cn

@RetryableTopic(exceptionBasedDltRouting = {
    @ExceptionBasedDltDestination(
        suffix = "-deserialization", exceptions = {DeserializationException.class}
    )}
)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .dltRoutingRules(Map.of("-deserialization", Set.of(DeserializationException.class)))
            .create(template);
}

后缀发生在将军之前dltTopic后缀在自定义DLT名称中。 结合所展示的例子,导致反序列化例外将被引导至我的注释主题反序列化 DLT而不是我的注释主题DLT. 自定义DLT将按照主题自动创建中所述的相同规则创建。spring-doc.cadn.net.cn