|
该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 Spring for Apache Kafka 4.0.0! |
配置
默认设置中,通过添加@RetryableTopic注释到@KafkaListener方法。
这是推荐且最简单的方法,因为它会自动配置所需的重试基础设施,并以默认设置创建重试和DLT主题。
要导入非阻塞重试基础设施并将其组件暴露为豆子,请注释 a@Configuration类为@EnableKafkaRetryTopic.
这使得对该功能组件的注入和运行时查找成为可能,并为高级和全局配置提供了基础。
其实也不必再加@EnableKafka,如果你加上这个注释,因为@EnableKafkaRetryTopic是元注释,记为@EnableKafka. |
为了高级和全球定制,请扩展重试主题配置支持在单一中@Configuration类并覆盖相关方法。
更多详情请参见“配置全局设置和功能”。
默认情况下,重试主题的容器与主容器具有相同的并发性。
从3.0版本开始,你可以设置不同的并发对于重试容器(无论是在注释上,还是在重试主题配置构建器).
|
只使用上述两种全局配置方法中的一种( |
使用@RetryableTopic注解
要配置重试主题和 DLT 以实现@KafkaListener注释方法,你只需要添加@RetryableTopic对它的注释和 Spring for Apache Kafka 会引导所有必要的主题和默认配置的消费者。
@RetryableTopic(kafkaTemplate = "myRetryableTopicKafkaTemplate")
@KafkaListener(topics = "my-annotated-topic", groupId = "myGroupId")
public void processMessage(MyPojo message) {
// ... message processing
}
自3.2版本起,@RetryableTopic对@KafkaListener支持的课程包括:
@RetryableTopic(listenerContainerFactory = "my-retry-topic-factory")
@KafkaListener(topics = "my-annotated-topic")
public class ClassLevelRetryListener {
@KafkaHandler
public void processMessage(MyPojo message) {
// ... message processing
}
}
你可以通过用@DltHandler注解。
如果没有提供DltHandler方法,会创建一个默认的消费者,只记录消耗情况。
@DltHandler
public void processMessage(MyPojo message) {
// ... message processing, persistence, etc
}
如果你不指定kafka模板,就用一个名字的豆子defaultRetryTopicKafkaTemplate会被查到。
如果找不到豆子,则会投出例外。 |
从3.0版本开始,@RetryableTopic注释可以作为自定义注释的元注释使用;例如:
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@RetryableTopic
static @interface MetaAnnotatedRetryableTopic {
@AliasFor(attribute = "concurrency", annotation = RetryableTopic.class)
String parallelism() default "3";
}
用RetryTopicConfiguration豆
你也可以通过创建非阻塞重试支持来配置RetryTopicConfiguration豆子@Configuration注释类。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.create(template);
}
这将为所有标注为@KafkaListener使用默认配置。 这卡夫卡模板实例是消息转发的必需。
为了更细致地控制每个主题的非阻塞性重审,建议多个RetryTopicConfiguration豆子可以提供。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(3000)
.maxAttempts(5)
.concurrency(1)
.includeTopics(List.of("my-topic", "my-other-topic"))
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.exponentialBackoff(1000, 2, 5000)
.maxAttempts(4)
.excludeTopics(List.of("my-topic", "my-other-topic"))
.retryOn(MyException.class)
.create(template);
}
重试主题和DLT的消费者会被分配到一个消费者组,组ID是你在组ID参数@KafkaListener带有主题后缀的注释。如果你没有提供任何注释,它们都会属于同一组,重试主题的重新平衡会导致主主题不必要的重新平衡。 |
如果消费者配置为ErrorHandlingDeserializer,为了处理反序列化异常,重要的是配置卡夫卡模板以及其制作者,配备一个既能处理普通对象也能处理原始内容的串行化器字节[]这些值是由反序列化异常产生的。模板的通用值类型应为对象. 一种方法是使用DelegatingByTypeSerializer; 以下是一个示例: |
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
MyNormalObject.class, new JsonSerializer<Object>())));
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
倍数@KafkaListener注释可以用于同一主题,无论是否手动分配分区,并配合非阻塞重试,但每个主题只会使用一种配置。最好只使用一个配置RetryTopicConfigurationBEAN 用于此类主题的配置;如果是多个@RetryableTopic注释是针对同一主题使用,所有注释的值都应相同,否则其中一个会被应用到该主题的所有监听者,而其他注释的值将被忽略。 |
配置全局设置和功能
自2.9版本起,之前用于配置组件的“豆”覆盖方法已被移除(由于API的实验性质,未被弃用)。这不会改变RetryTopicConfigurationBEANS方法——仅针对基础设施组件的配置。现在重试主题配置支持课程应扩展为(单一)@Configuration类,以及被覆盖的适当方法。示例如下:
@EnableKafka
@Configuration
public class MyRetryTopicConfiguration extends RetryTopicConfigurationSupport {
@Override
protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
blockingRetries
.retryOn(MyBlockingRetriesException.class, MyOtherBlockingRetriesException.class)
.backOff(new FixedBackOff(3000, 3));
}
@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
// Use the new 2.9 mechanism to avoid re-fetching the same records after a pause
customizersConfigurer.customizeErrorHandler(eh -> {
eh.setSeekAfterError(false);
});
}
}
使用这种配置方法时,@EnableKafkaRetryTopic注释不应用于防止因重复豆子而导致上下文无法启动。使用简单的@EnableKafka而是注释。 |
什么时候自动创建主题成立时,主主题和重试主题将以指定的分区数和复制因子创建。从3.0版本开始,默认复制因子为-1,即使用代理默认值。如果你的代理版本早于 2.4,你需要设置显式值。要覆盖特定主题(例如主主题或 DLT)的这些值,只需添加一个新主题 @Bean带有所需的属性;这将覆盖自动创建属性。
| 默认情况下,记录会使用接收记录的原始分区发布给重试主题。如果重试主题的分区数少于主主题,你应适当配置框架;以下示例。 |
@EnableKafka
@Configuration
public class Config extends RetryTopicConfigurationSupport {
@Override
protected Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPublishingContainerFactory() {
return dlprf -> dlprf.setPartitionResolver((cr, nextTopic) -> null);
}
...
}
函数的参数是消费者记录和下一个主题的名称。你可以返回特定的分区号,或者零用以表示卡夫卡制片人应该决定分区。
默认情况下,当记录通过重试主题时,所有重试头部的值(尝试次数、时间戳)都会被保留。从版本 2.9.6 开始,如果你只想保留这些头部的最后一个值,可以使用configureDeadLetterPublishingContainerFactory()上述设置工厂retainAllRetryHeaderValues属性到false.
查找RetryTopicConfiguration
尝试提供RetryTopicConfiguration通过从 一个 创建一个@RetryableTopic注释,或者如果没有注释,则从豆子容器中获取。
如果容器中发现了豆子,会检查是否应由这些实例处理这些主题。
如果@RetryableTopic注释为DltHandler注释方法会被查找。
自3.2版本起,提供新的API以创建RetryTopicConfiguration什么时候@RetryableTopic类注释:
@Bean
public RetryTopicConfiguration myRetryTopic() {
RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory);
return provider.findRetryConfigurationFor(topics, null, AnnotatedClass.class, bean);
}
@RetryableTopic
public static class AnnotatedClass {
// NoOps
}