|
对于最新稳定版本,请使用Spring for Apache Kafka 4.0.4! |
配置
从2.9版本开始,默认配置下,@EnableKafkaRetryTopic注解应该在带有@Configuration注解的类中使用。
这使得功能能够正确启动,并允许注入一些可以在运行时查找的功能组件。
如果不添加@EnableKafka,也不需要添加该注解,因为@EnableKafkaRetryTopic被@EnableKafka元注解所注解。 |
从那个版本开始,为了更高级的功能组件和全局功能的配置,`0` 类应该被扩展为 `1` 类,并且需要重写合适的的方法。 更多详细信息请参阅 配置全局设置和功能。
默认情况下,重试主题的容器将与主容器具有相同的并发性。
从3.0版本开始,您可以为重试容器设置不同的concurrency(可以在注解中设置,或在RetryConfigurationBuilder中设置)。
仅能使用上述技术之一,并且只能一个@Configuration类继承RetryTopicConfigurationSupport。 |
使用@RetryableTopic注解
为了配置一个标注为@KafkaListener的方法的重试主题和死信主题,你只需要在该方法上添加@RetryableTopic注解,Spring for Apache Kafka 将会使用默认配置自动创建所有必要的主题和消费者。
@RetryableTopic(kafkaTemplate = "myRetryableTopicKafkaTemplate")
@KafkaListener(topics = "my-annotated-topic", groupId = "myGroupId")
public void processMessage(MyPojo message) {
// ... message processing
}
您可以使用同一类中的方法来处理dlt消息,通过使用@DltHandler注解进行标注。
如果未提供DltHandler方法,则会创建一个默认的消费者,仅记录消费。
@DltHandler
public void processMessage(MyPojo message) {
// ... message processing, persistence, etc
}
如果未指定kafkaTemplate的名称,将查找名为defaultRetryTopicKafkaTemplate的bean。
如果未找到该bean将抛出异常。 |
从 3.0 版本开始,@RetryableTopic 注解可以作为元注解用于自定义注解;例如:
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@RetryableTopic
static @interface MetaAnnotatedRetryableTopic {
@AliasFor(attribute = "concurrency", annotation = RetryableTopic.class)
String parallelism() default "3";
}
使用RetryTopicConfigurationBean
您也可以通过在一个@Configuration注解类中创建RetryTopicConfiguration个beans来配置非阻塞重试支持。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.create(template);
}
这将为所有在方法上使用默认配置并带有@KafkaListener注解的主题创建重试主题和DLT,以及相应的消费者。KafkaTemplate实例用于消息转发。
为更精细地控制每个主题的非阻塞重试处理,可以提供多个 RetryTopicConfiguration bean。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackOff(3000)
.maxAttempts(5)
.concurrency(1)
.includeTopics("my-topic", "my-other-topic")
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.exponentialBackoff(1000, 2, 5000)
.maxAttempts(4)
.excludeTopics("my-topic", "my-other-topic")
.retryOn(MyException.class)
.create(template);
}
重试主题和DLT的消费者将被分配到一个具有组ID的消费者组,该组ID是由你在groupId参数中的@KafkaListener注解中提供的ID与主题后缀组合而成。如果你没有提供任何ID,它们将全部属于同一个组,重试主题上的重新平衡将导致主主题上的不必要的重新平衡。 |
如果消费者配置了ErrorHandlingDeserializer,以处理反序列化异常,重要的是要配置KafkaTemplate及其生产者使用能够处理普通对象以及来自反序列化异常的原始byte[]值的序列化器。
模板的通用值类型应为Object。
一种技术是使用DelegatingByTypeSerializer;以下是一个示例: |
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
MyNormalObject.class, new JsonSerializer<Object>())));
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
多个 @KafkaListener 注解可以用于同一主题,可与手动分区分配和非阻塞重试同时使用,但针对同一主题只会应用一个配置。
最好为此类主题使用一个单独的 RetryTopicConfiguration bean 进行配置;如果在同一主题上使用多个 @RetryableTopic 注解,所有注解的值必须相同,否则其中一个配置将应用于该主题所有监听器,其他注解的值会被忽略。 |
配置全局设置和功能
自 2.9 起,针对组件配置的先前组件覆盖方法已被移除(没有弃用,由于该 API 具有上述实验性质)。
这并不改变 RetryTopicConfiguration 个 beans 的方法——仅基础设施组件的配置受到影响。
现在应在一个(单一的)RetryTopicConfigurationSupport 类中扩展 @Configuration 类,并重写适当的方法。
以下是一个示例:
@EnableKafka
@Configuration
public class MyRetryTopicConfiguration extends RetryTopicConfigurationSupport {
@Override
protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
blockingRetries
.retryOn(MyBlockingRetriesException.class, MyOtherBlockingRetriesException.class)
.backOff(new FixedBackOff(3000, 3));
}
@Override
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
nonBlockingFatalExceptions.add(MyNonBlockingException.class);
}
@Override
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
// Use the new 2.9 mechanism to avoid re-fetching the same records after a pause
customizersConfigurer.customizeErrorHandler(eh -> {
eh.setSeekAfterError(false);
});
}
}
当使用这种配置方法时,不应使用@EnableKafkaRetryTopic注解,以防止由于重复的bean导致上下文启动失败。
请改用简单的@EnableKafka注解。 |
当 autoCreateTopics 为 true 时,主主题和重试主题将使用指定的分区数和复制因子创建。
从 3.0 版本开始,默认复制因子为 -1,即使用 broker 的默认值。
如果您的 broker 版本早于 2.4,则需要显式设置值。
要为特定主题(例如主主题或 DLT)覆盖这些值,只需添加一个 NewTopic @Bean 以及所需的属性;这将覆盖自动创建属性。
| 默认情况下,记录会使用接收到记录的原始分区发布到重试主题(s)。 如果重试主题的分区数少于主主题,你应该相应地配置框架;以下是一个示例。 |
@EnableKafka
@Configuration
public class Config extends RetryTopicConfigurationSupport {
@Override
protected Consumer<DeadLetterPublishingRecovererFactory> configureDeadLetterPublishingContainerFactory() {
return dlprf -> dlprf.setPartitionResolver((cr, nextTopic) -> null);
}
...
}
The parameters to the function are the consumer record and the name of the next topic.
你可以返回一个特定的分区号,或 null 表示应由 KafkaProducer 来决定分区。
默认情况下,重试主题中记录在重试过程中传递时,重试头(尝试次数、时间戳)的所有值都会保留。
从 2.9.6 版本开始,如果您只想保留这些头的最后一个值,请使用上面所示的 configureDeadLetterPublishingContainerFactory() 方法,将工厂的 retainAllRetryHeaderValues 属性设置为 false。