DLT策略

该框架提供了几种与DLT合作的策略。 你可以提供DLT处理方法,使用默认的日志方法,或者干脆不使用DLT。 你也可以选择如果DLT处理失败会发生什么。spring-doc.cadn.net.cn

DLT处理方法

你可以指定处理该主题DLT的方法,以及处理失败时的行为。spring-doc.cadn.net.cn

为此你可以使用@DltHandler在该类方法中的注释,具有@RetryableTopic注释。 注意,所有@RetryableTopic该类内的注释方法。spring-doc.cadn.net.cn

@RetryableTopic
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}

@DltHandler
public void processDltMessage(MyPojo message) {
    // ... message processing, persistence, etc
}

DLT处理方法也可以通过以下方式提供RetryTopicConfigurationBuilder.dltHandlerMethod(String, String)方法,作为参数传递了 BEAN 名称和方法名,用于处理 DLT 的消息。spring-doc.cadn.net.cn

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .dltHandlerMethod("myCustomDltProcessor", "processDltMessage")
            .create(template);
}

@Component
public class MyCustomDltProcessor {

    private final MyDependency myDependency;

    public MyCustomDltProcessor(MyDependency myDependency) {
        this.myDependency = myDependency;
    }

    public void processDltMessage(MyPojo message) {
        // ... message processing, persistence, etc
    }
}
如果没有提供DLT处理程序,默认RetryTopicConfigurer.LoggingDltListenerHandlerMethod被使用。

从版本 2.8 开始,如果你不想在本应用中完全使用DLT,包括默认处理程序(或者你想推迟消耗),你可以独立于容器工厂控制 DLT 容器是否启动自动启动财产。spring-doc.cadn.net.cn

当使用@RetryableTopic注释,设autoStartDltHandler属性到false;使用配置构建器时,使用autoStartDltHandler(false).spring-doc.cadn.net.cn

你之后可以通过KafkaListenerEndpointRegistry.spring-doc.cadn.net.cn

DLT失效行为

如果DLT处理失败,有两种可能的行为:ALWAYS_RETRY_ON_ERRORFAIL_ON_ERROR.spring-doc.cadn.net.cn

在前者中,记录会被转发回 DLT 主题,这样就不会阻碍其他 DLT 记录的处理。 在后者中,消费者在不转发消息的情况下结束执行。spring-doc.cadn.net.cn

@RetryableTopic(dltProcessingFailureStrategy =
            DltStrategy.FAIL_ON_ERROR)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .dltHandlerMethod("myCustomDltProcessor", "processDltMessage")
            .doNotRetryOnDltFailure()
            .create(template);
}
默认行为是ALWAYS_RETRY_ON_ERROR.
从2.8.3版本开始,ALWAYS_RETRY_ON_ERROR如果记录导致致命异常抛出,则不会将记录路由回DLT, 例如:反序列化例外,因为通常此类例外总是会被抛出。

被视为致命的例外情况有:spring-doc.cadn.net.cn

您可以使用以下方法为该列表添加和移除异常目的地主题解析器豆。spring-doc.cadn.net.cn

更多信息请参见异常分类器spring-doc.cadn.net.cn

配置无DLT

该框架还提供了无需为主题配置DLT的可能性。 在这种情况下,重审用尽后,处理程序就此结束。spring-doc.cadn.net.cn

@RetryableTopic(dltProcessingFailureStrategy =
            DltStrategy.NO_DLT)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
    // ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .doNotConfigureDlt()
            .create(template);
}