对于最新稳定版本,请使用 Spring for Apache Kafka 4.0.0spring-doc.cadn.net.cn

配置

从2.9版本开始,默认配置中,@EnableKafkaRetryTopic注释应用于@Configuration注释类。这使得该功能能够正确引导,并允许在运行时注入部分功能组件以查找。spring-doc.cadn.net.cn

其实也不必再加@EnableKafka,如果你加上这个注释,因为@EnableKafkaRetryTopic是元注释,记为@EnableKafka.

此外,从该版本开始,为了更高级地配置功能组件和全局特性,重试主题配置支持类应扩展为@Configuration类别,以及相应的方法被覆盖。更多细节请参见配置全局设置和功能spring-doc.cadn.net.cn

默认情况下,重试主题的容器与主容器具有相同的并发。从 3.0 版本开始,你可以设置不同的并发对于重试容器(无论是在注释上,还是在RetryConfigurationBuilder).spring-doc.cadn.net.cn

上述技术中只能使用一种,且仅限其中一种@Configuration类可以扩展重试主题配置支持.

使用@RetryableTopic注解

要配置重试主题和 DLT 以实现@KafkaListener注释方法,你只需要添加@RetryableTopic对它的注释和 Spring for Apache Kafka 会引导所有必要的主题和默认配置的消费者。spring-doc.cadn.net.cn

@RetryableTopic(kafkaTemplate = "myRetryableTopicKafkaTemplate")
@KafkaListener(topics = "my-annotated-topic", groupId = "myGroupId")
public void processMessage(MyPojo message) {
    // ... message processing
}

自3.2版本起,@RetryableTopic对@KafkaListener支持的课程包括:spring-doc.cadn.net.cn

@RetryableTopic(listenerContainerFactory = "my-retry-topic-factory")
@KafkaListener(topics = "my-annotated-topic")
public class ClassLevelRetryListener {

    @KafkaHandler
    public void processMessage(MyPojo message) {
        // ... message processing
    }

}

你可以通过用@DltHandler注解。 如果没有提供DltHandler方法,会创建一个默认的消费者,只记录消耗情况。spring-doc.cadn.net.cn

@DltHandler
public void processMessage(MyPojo message) {
    // ... message processing, persistence, etc
}
如果你不指定kafka模板,就用一个名字的豆子defaultRetryTopicKafkaTemplate会被查询。如果找不到豆子,则会抛出例外。

从3.0版本开始,@RetryableTopic注释可以作为自定义注释的元注释使用; 例如:spring-doc.cadn.net.cn

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@RetryableTopic
static @interface MetaAnnotatedRetryableTopic {

    @AliasFor(attribute = "concurrency", annotation = RetryableTopic.class)
    String parallelism() default "3";

}

RetryTopicConfiguration

你也可以通过创建非阻塞重试支持来配置RetryTopicConfiguration豆子@Configuration注释类。spring-doc.cadn.net.cn

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .create(template);
}

这将为所有标注为@KafkaListener使用默认配置。这卡夫卡模板实例是消息转发的必需。spring-doc.cadn.net.cn

为了更细致地控制每个主题的非阻塞性重审,建议多个RetryTopicConfiguration豆子可以提供。spring-doc.cadn.net.cn

@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .fixedBackOff(3000)
            .maxAttempts(5)
            .concurrency(1)
            .includeTopics("my-topic", "my-other-topic")
            .create(template);
}

@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
    return RetryTopicConfigurationBuilder
            .newInstance()
            .exponentialBackoff(1000, 2, 5000)
            .maxAttempts(4)
            .excludeTopics("my-topic", "my-other-topic")
            .retryOn(MyException.class)
            .create(template);
}
重试主题和DLT的消费者会被分配到一个消费者组,组ID是你在组ID参数@KafkaListener附上主题后缀的注释。 如果你不提供任何选项,它们都会属于同一个群体,而在重试主题上重新平衡会导致主主题不必要的重新平衡。
如果消费者配置为ErrorHandlingDeserializer,为了处理反序列化异常,重要的是配置卡夫卡模板以及其制作者,配备一个既能处理普通对象也能处理原始内容的串行化器字节[]这些值是由反序列化例外产生的。 模板的通用值类型应为对象. 一种方法是使用DelegatingByTypeSerializer;以下是一个示例:
@Bean
public ProducerFactory<String, Object> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
        new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
               MyNormalObject.class, new JsonSerializer<Object>())));
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}
倍数@KafkaListener注释可以用于同一主题,无论是否手动分配分区,并配合非阻塞重试,但每个主题只会使用一种配置。 最好只用一个RetryTopicConfiguration用于此类主题配置的 BEAN;如果是多重的话@RetryableTopic注释是针对同一主题使用,所有注释的值都应相同,否则其中一个会被应用到该主题的所有监听者,而其他注释的值将被忽略。

配置全局设置和功能

自2.9版本起,之前用于配置组件的“bean”覆盖方法已被移除(由于API的实验性质,未被弃用)。 这并不改变RetryTopicConfigurationBEANS方法——仅针对基础设施组件的配置。 现在重试主题配置支持课程应扩展为(单一)@Configuration类,以及被覆盖的正确方法。 举个例子:spring-doc.cadn.net.cn

@EnableKafka
@Configuration
public class MyRetryTopicConfiguration extends RetryTopicConfigurationSupport {

    @Override
    protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
        blockingRetries
                .retryOn(MyBlockingRetriesException.class, MyOtherBlockingRetriesException.class)
                .backOff(new FixedBackOff(3000, 3));
    }

    @Override
    protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
        nonBlockingFatalExceptions.add(MyNonBlockingException.class);
    }

    @Override
    protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
        // Use the new 2.9 mechanism to avoid re-fetching the same records after a pause
        customizersConfigurer.customizeErrorHandler(eh -> {
            eh.setSeekAfterError(false);
        });
    }

}
使用这种配置方法时,@EnableKafkaRetryTopic注释不应用于防止因重复豆子而导致上下文启动失败。 用简单的@EnableKafka而是注释。

什么时候自动创建主题成立时,主主题和重试主题将按照指定的分区数和复制因子创建。 从3.0版本开始,默认复制因子为-1,即使用经纪商违约。 如果你的经纪商版本早于 2.4,你需要设置明确的数值。 要覆盖特定主题(例如主主题或DLT)的这些值,只需添加一个新主题 @Bean具备所需的性质;这会覆盖自动创建属性。spring-doc.cadn.net.cn

默认情况下,记录会通过接收记录的原始分区发布给重试主题。 如果重试主题的分区数少于主主题,你应适当配置框架;以下是一个示例。
@EnableKafka
@Configuration
public class Config extends RetryTopicConfigurationSupport {

    @Override
    protected Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPublishingContainerFactory() {
        return dlprf -> dlprf.setPartitionResolver((cr, nextTopic) -> null);
    }

    ...

}

函数的参数是消费者记录和下一个主题的名称。 你可以返回特定的分区号,或者用以表示卡夫卡制片人应该决定分区。spring-doc.cadn.net.cn

默认情况下,当记录通过重试主题时,所有重试头部的值(尝试次数、时间戳)都会被保留。 从2.9.6版本开始,如果你只想保留这些头部的最后一个值,可以使用configureDeadLetterPublishingContainerFactory()上述设置工厂retainAllRetryHeaderValues属性到false.spring-doc.cadn.net.cn

查找RetryTopicConfiguration

尝试提供RetryTopicConfiguration通过从 一个 创建一个@RetryableTopic注释,或者如果没有注释,则从豆子容器中获取。spring-doc.cadn.net.cn

如果容器中发现了豆子,会检查是否应由这些实例处理这些主题。spring-doc.cadn.net.cn

如果@RetryableTopic注释为DltHandler注释方法会被查找。spring-doc.cadn.net.cn

自3.2版本起,提供新的API以创建RetryTopicConfiguration什么时候@RetryableTopic类注释:spring-doc.cadn.net.cn

@Bean
public RetryTopicConfiguration myRetryTopic() {
    RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory);
    return provider.findRetryConfigurationFor(topics, null, AnnotatedClass.class, bean);
}

@RetryableTopic
public static class AnnotatedClass {
    // NoOps
}