4。参考信息
4.1. 使用Spring处理Apache Kafka
此部分提供了关于使用 Spring 与 Apache Kafka 遇到的各种问题的详细解释。 对于快速但不详细的介绍,请参见 快速入门。
4.1.1. 连接到 Kafka
从 2.5 版本开始,每个类都扩展了 KafkaResourceFactory。
这允许在运行时通过向其配置添加 Supplier<String> 来更改引导服务器: setBootstrapServersSupplier(() → …)。
这将对所有新连接调用以获取服务器列表。
生产者和消费者通常具有长生命周期。
要关闭现有生产者,请在 DefaultKafkaProducerFactory 上调用 reset()。
要关闭现有消费者,请在 KafkaListenerEndpointRegistry 上调用 stop()(然后 start()),并在/或在任何其他监听器容器 bean 上的 stop() 和 start() 调用。
为了方便,框架还提供了一个ABSwitchCluster,它支持两组引导服务器;任何时候其中一组处于活动状态。
配置ABSwitchCluster并将它添加到生产者和消费者工厂中,以及调用setBootstrapServersSupplier()的KafkaAdmin。
当您想切换时,请调用primary()或secondary()并调用reset()以建立新的连接(对于生产者);对于消费者,请调用stop()和start()所有监听器容器。
在使用@KafkaListener时,请将stop()和start()作为KafkaListenerEndpointRegistry bean。
查看更多Java文档信息。
工厂监听器
从版本 2.5 开始,DefaultKafkaProducerFactory 和 DefaultKafkaConsumerFactory 可以通过配置 Listener 来接收生产者或消费者创建或关闭的通知。
interface Listener<K, V> {
default void producerAdded(String id, Producer<K, V> producer) {
}
default void producerRemoved(String id, Producer<K, V> producer) {
}
}
interface Listener<K, V> {
default void consumerAdded(String id, Consumer<K, V> consumer) {
}
default void consumerRemoved(String id, Consumer<K, V> consumer) {
}
}
在每种情况下,id都是通过将创建后从metrics()获得的client-id属性附加到工厂beanName属性来创建的,并用.分隔。
这些侦听器可以用于创建和绑定一个 Micrometer KafkaClientMetrics 实例,当新客户端被创建时(并在关闭客户端时将其关闭)。
框架提供了实现这一功能的监听器;详见Micrometer 原生度量指标。
4.1.2. 配置主题
如果你在应用程序上下文中定义了一个KafkaAdmin类型的bean,它会自动将主题添加到经纪人。
要做到这一点,你可以在应用程序上下文中为每个主题添加一个NewTopic @Bean。
2.3版引入了一个新的类TopicBuilder,以更方便地创建此类bean。
以下示例展示了如何实现:
@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topic1() {
return TopicBuilder.name("thing1")
.partitions(10)
.replicas(3)
.compact()
.build();
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name("thing2")
.partitions(10)
.replicas(3)
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}
@Bean
public NewTopic topic3() {
return TopicBuilder.name("thing3")
.assignReplicas(0, Arrays.asList(0, 1))
.assignReplicas(1, Arrays.asList(1, 2))
.assignReplicas(2, Arrays.asList(2, 0))
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}
@Bean
fun admin() = KafkaAdmin(mapOf(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092"))
@Bean
fun topic1() =
TopicBuilder.name("thing1")
.partitions(10)
.replicas(3)
.compact()
.build()
@Bean
fun topic2() =
TopicBuilder.name("thing2")
.partitions(10)
.replicas(3)
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build()
@Bean
fun topic3() =
TopicBuilder.name("thing3")
.assignReplicas(0, Arrays.asList(0, 1))
.assignReplicas(1, Arrays.asList(1, 2))
.assignReplicas(2, Arrays.asList(2, 0))
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build()
从 2.6 版本开始,可以省略 .partitions() 和/或 replicas(),那些属性将应用 broker 的默认值。
支持此功能的 broker 版本必须至少为 2.4.0 - 请参见 KIP-464。
@Bean
public NewTopic topic4() {
return TopicBuilder.name("defaultBoth")
.build();
}
@Bean
public NewTopic topic5() {
return TopicBuilder.name("defaultPart")
.replicas(1)
.build();
}
@Bean
public NewTopic topic6() {
return TopicBuilder.name("defaultRepl")
.partitions(3)
.build();
}
@Bean
fun topic4() = TopicBuilder.name("defaultBoth").build()
@Bean
fun topic5() = TopicBuilder.name("defaultPart").replicas(1).build()
@Bean
fun topic6() = TopicBuilder.name("defaultRepl").partitions(3).build()
从 2.7 版本开始,你可以在一个 KafkaAdmin.NewTopics 组件定义中声明多个 NewTopic:
@Bean
public KafkaAdmin.NewTopics topics456() {
return new NewTopics(
TopicBuilder.name("defaultBoth")
.build(),
TopicBuilder.name("defaultPart")
.replicas(1)
.build(),
TopicBuilder.name("defaultRepl")
.partitions(3)
.build());
}
@Bean
fun topics456() = KafkaAdmin.NewTopics(
TopicBuilder.name("defaultBoth")
.build(),
TopicBuilder.name("defaultPart")
.replicas(1)
.build(),
TopicBuilder.name("defaultRepl")
.partitions(3)
.build()
)
当使用 Spring Boot 时,会自动注册一个 KafkaAdmin bean,因此只需要 NewTopic(和/或 NewTopics)@Beans。 |
默认情况下,如果经纪人不可用,会记录一条消息,但上下文会继续加载。
你可以通过编程方式调用管理员的 initialize() 方法稍后重试。
如果你希望将此条件视为致命,请将管理员的 fatalIfBrokerNotAvailable 属性设置为 true。
上下文将无法初始化。
如果 broker 支持(1.0.0 或更高),当发现某个现有主题的分区数少于 NewTopic.numPartitions 时,管理员可以增加分区数量。 |
从 2.7 版本开始,KafkaAdmin 提供了在运行时创建和检查主题的方法。
-
createOrModifyTopics -
describeTopics
对于更高级的功能,您可以直接使用AdminClient。
以下示例展示了如何操作:
@Autowired
private KafkaAdmin admin;
...
AdminClient client = AdminClient.create(admin.getConfigurationProperties());
...
client.close();
4.1.3. 发送消息
本部分介绍了如何发送消息。
使用KafkaTemplate
本部分介绍了如何使用 KafkaTemplate 来发送消息。
概述
The KafkaTemplate wraps a producer and provides convenience methods to send data to Kafka topics.
The following listing shows the relevant methods from KafkaTemplate:
ListenableFuture<SendResult<K, V>> sendDefault(V data);
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, V data);
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);
ListenableFuture<SendResult<K, V>> send(Message<?> message);
Map<MetricName, ? extends Metric> metrics();
List<PartitionInfo> partitionsFor(String topic);
<T> T execute(ProducerCallback<K, V, T> callback);
// Flush the producer.
void flush();
interface ProducerCallback<K, V, T> {
T doInKafka(Producer<K, V> producer);
}
有关更多详细信息,请参阅javadoc。
The sendDefault API 需要将默认主题提供给模板。
The API 接收一个 timestamp 参数并将其时间戳存储在记录中。
用户提供的时间戳如何存储取决于 Kafka 主题上配置的时间戳类型。
如果主题配置为使用 CREATE_TIME,则会记录(或生成)用户指定的时间戳。
如果主题配置为使用 LOG_APPEND_TIME,则忽略用户指定的时间戳,并由代理添加本地时间戳。
要使用模板,您可以配置一个生产者工厂并将其提供给模板的构造函数。 以下示例展示了如何操作:
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// See https://kafka.apache.org/documentation/#producerconfigs for more properties
return props;
}
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory());
}
从版本2.5开始,您可以现在重写工厂的ProducerConfig属性,以从同一个工厂创建具有不同生产者配置的模板。
@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
return new KafkaTemplate<>(pf);
}
@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
return new KafkaTemplate<>(pf,
Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}
注意,类型为 ProducerFactory<?, ?>(例如由 Spring Boot 自动配置的)的 bean 可以通过不同的具体泛型类型进行引用。
您也可以通过使用标准 <bean/> 定义来配置模板。
然后,要使用模板,你可以调用其方法。
当您使用带有Message<?>参数的方法时,主题、分区和键信息将通过包含以下项目的消息头提供:
-
KafkaHeaders.TOPIC -
KafkaHeaders.PARTITION_ID -
KafkaHeaders.MESSAGE_KEY -
KafkaHeaders.TIMESTAMP
消息有效载荷就是数据。
可选地,你可以用 ProducerListener 配置 KafkaTemplate,以在发送结果(成功或失败)时异步回调,而不是等待 Future 完成。
以下代码示例显示了 ProducerListener 接口的定义:
public interface ProducerListener<K, V> {
void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata);
void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata,
Exception exception);
}
默认情况下,模板配置为 LoggingProducerListener,这会记录错误,当发送成功时则不执行任何操作。
为了方便,如果您只想实现其中一个方法,已提供默认的方法实现。
请注意,send方法返回一个 ListenableFuture<SendResult>。
您可以使用监听器注册一个回调,以异步接收send的结果。
以下示例展示了如何实现:
ListenableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
@Override
public void onSuccess(SendResult<Integer, String> result) {
...
}
@Override
public void onFailure(Throwable ex) {
...
}
});
SendResult有兩個屬性,即ProducerRecord和RecordMetadata。
請參閱Kafka API文檔以了解有關這些物件的資訊。
The Throwable in onFailure 可以转换为一个 KafkaProducerException;其 failedProducerRecord 属性包含失败的记录。
使用版本 2.5 后,您可以使用 KafkaSendCallback 替代 ListenableFutureCallback,从而更容易提取失败的 ProducerRecord,避免需要将 Throwable 进行类型转换:
ListenableFuture<SendResult<Integer, String>> future = template.send("topic", 1, "thing");
future.addCallback(new KafkaSendCallback<Integer, String>() {
@Override
public void onSuccess(SendResult<Integer, String> result) {
...
}
@Override
public void onFailure(KafkaProducerException ex) {
ProducerRecord<Integer, String> failed = ex.getFailedProducerRecord();
...
}
});
您也可以使用一对 lambda 表达式:
ListenableFuture<SendResult<Integer, String>> future = template.send("topic", 1, "thing");
future.addCallback(result -> {
...
}, (KafkaFailureCallback<Integer, String>) ex -> {
ProducerRecord<Integer, String> failed = ex.getFailedProducerRecord();
...
});
如果您希望阻塞发送线程以等待结果,可以调用future的get()方法;推荐使用带有超时的方法。
您可能希望在等待之前调用flush(),或者出于方便考虑,模板有一个带autoFlush参数的构造函数,该构造函数会使模板在每次发送时flush()。
仅当设置了linger.ms生产者属性并希望立即发送一个不完整的批次时才需要刷新。
示例
本部分展示了向Kafka发送消息的示例:
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
ListenableFuture<SendResult<Integer, String>> future = template.send(record);
future.addCallback(new KafkaSendCallback<Integer, String>() {
@Override
public void onSuccess(SendResult<Integer, String> result) {
handleSuccess(data);
}
@Override
public void onFailure(KafkaProducerException ex) {
handleFailure(data, record, ex);
}
});
}
public void sendToKafka(final MyOutputData data) {
final ProducerRecord<String, String> record = createRecord(data);
try {
template.send(record).get(10, TimeUnit.SECONDS);
handleSuccess(data);
}
catch (ExecutionException e) {
handleFailure(data, record, e.getCause());
}
catch (TimeoutException | InterruptedException e) {
handleFailure(data, record, e);
}
}
注意 ExecutionException 的原因是 KafkaProducerException 通过 failedProducerRecord 属性。
使用RoutingKafkaTemplate
从 2.5 版本开始,您可以使用一个 RoutingKafkaTemplate 来在运行时选择 producer,基于目的地 topic 的名称。
The routing template does not support transactions, execute, flush, or metrics operations because the topic is not known for those operations. |
该模板需要一个将 java.util.regex.Pattern 映射到 ProducerFactory<Object, Object> 的实例的映射。
该映射应有序(例如一个 LinkedHashMap),因为它将按顺序遍历;你应该在前面添加更具体的模式。
以下简单的 Spring Boot 应用程序通过使用相同的模板向不同的主题发送消息,每个主题使用不同的值序列化器。
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
ProducerFactory<Object, Object> pf) {
// Clone the PF with a different Serializer, register with Spring for shutdown
Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
context.registerBean(DefaultKafkaProducerFactory.class, "bytesPF", bytesPF);
Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
map.put(Pattern.compile("two"), bytesPF);
map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
return new RoutingKafkaTemplate(map);
}
@Bean
public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
return args -> {
routingTemplate.send("one", "thing1");
routingTemplate.send("two", "thing2".getBytes());
};
}
}
The corresponding @KafkaListener s for this example are shown in Annotation Properties.
对于另一种实现类似结果的技术,但具有将不同类型发送到同一主题的能力,请参阅委派序列化器和反序列化器。
使用DefaultKafkaProducerFactory
正如在使用KafkaTemplate中所述,使用ProducerFactory来创建生产者。
当不使用事务时,默认情况下,DefaultKafkaProducerFactory会创建一个用于所有客户端的单例生产者,这在KafkaProducer的JavaDoc中被推荐。
但是,如果你在模板上调用flush(),这可能会导致其他使用相同生产者的线程延迟。
从版本2.3开始,DefaultKafkaProducerFactory有了一个新的属性producerPerThread。
当设置为true时,工厂将为每个线程创建(并缓存)一个独立的生产者,以避免这个问题。
When producerPerThread is true, user code must call closeThreadBoundProducer() on the factory when the producer is no longer needed.
This will physically close the producer and remove it from the ThreadLocal.
Calling reset() or destroy() will not clean up these producers. |
当创建一个DefaultKafkaProducerFactory时,可以通过调用仅接受一个属性映射(属性)Map的构造函数从配置中获取key和/或value的Serializer类(参见使用KafkaTemplate示例),或者将Serializer实例传递给DefaultKafkaProducerFactory构造函数(在这种情况下,所有Producer共享相同的实例)。
或者,可以提供Supplier<Serializer>(从2.3版本开始)来获取每个Producer的单独Serializer实例:
@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}
@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}
自从2.5.10版本起,现在可以在工厂创建之后更新生产者属性。
这在例如凭证变更后需要更新SSL密钥/信任存储位置的情况下可能会很有用。
这些更改不会影响现有的生产者实例;调用reset()关闭所有现有生产者,以便使用新的属性创建新的生产者。
注意:您不能将事务性生产者工厂更改为非事务性,反之亦然。
两个新的方法现在已提供:
void updateConfigs(Map<String, Object> updates);
void removeConfig(String configKey);
从 2.8 版本开始,如果你以对象形式提供序列化器(通过构造函数或 setter 提供),工厂将调用 configure() 方法来使用配置属性配置它们。
使用ReplyingKafkaTemplate
版本 2.1.3 引入了一个 KafkaTemplate 的子类,以提供请求/回复语义。
该类命名为 ReplyingKafkaTemplate,并具有两个额外的方法;以下显示了方法签名:
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);
RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
Duration replyTimeout);
(也参见 使用 Message<?>s 的请求/回复)。
结果是一个result,它会异步地填充结果(或在超时时抛出异常)。
结果还有一个property属性,它是调用method的结果。
你可以使用这个future来确定发送操作的结果。
如果使用第一个方法,或者replyTimeout参数为null时,将使用模板的defaultReplyTimeout属性(默认为5秒)。
从 2.8.8 版本开始,模板新增了一个方法 waitForAssignment。
这在回复容器配置为 auto.offset.reset=latest 以避免在容器初始化之前先发送一个请求和回复时特别有用。
在使用手动分区分配(无组管理)时,等待的持续时间必须大于容器的 pollTimeout 属性,因为通知将在第一次轮询完成后才发送。 |
以下这个 Spring Boot 应用程序展示了如何使用该功能的示例:
@SpringBootApplication
public class KRequestingApplication {
public static void main(String[] args) {
SpringApplication.run(KRequestingApplication.class, args).close();
}
@Bean
public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
return args -> {
if (!template.waitForAssignment(Duration.ofSeconds(10))) {
throw new IllegalStateException("Reply container did not initialize");
}
ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
System.out.println("Sent ok: " + sendResult.getRecordMetadata());
ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
System.out.println("Return value: " + consumerRecord.value());
};
}
@Bean
public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
ProducerFactory<String, String> pf,
ConcurrentMessageListenerContainer<String, String> repliesContainer) {
return new ReplyingKafkaTemplate<>(pf, repliesContainer);
}
@Bean
public ConcurrentMessageListenerContainer<String, String> repliesContainer(
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> repliesContainer =
containerFactory.createContainer("kReplies");
repliesContainer.getContainerProperties().setGroupId("repliesGroup");
repliesContainer.setAutoStartup(false);
return repliesContainer;
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kRequests")
.partitions(10)
.replicas(2)
.build();
}
@Bean
public NewTopic kReplies() {
return TopicBuilder.name("kReplies")
.partitions(10)
.replicas(2)
.build();
}
}
请注意,我们可以使用 Boot 自动配置的容器工厂来创建回复容器。
如果回复使用了非平凡的反序列化器,请考虑使用一个ErrorHandlingDeserializer,该代码委托给您已配置的反序列化器。当如此配置时,RequestReplyFuture将异常完成,并且您可以捕获ExecutionException,其中包含在cause属性中的DeserializationException。
从2.6.7版本开始,除了检测DeserializationException之外,模板还会调用replyErrorChecker函数(如果提供了的话)。如果该函数返回异常,则未来将完成为异常状态。
这是一个示例:
template.setReplyErrorChecker(record -> {
Header error = record.headers().lastHeader("serverSentAnError");
if (error != null) {
return new MyException(new String(error.value()));
}
else {
return null;
}
});
...
RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
try {
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, String> consumerRecord = future.get(10, TimeUnit.SECONDS);
...
}
catch (InterruptedException e) {
...
}
catch (ExecutionException e) {
if (e.getCause instanceof MyException) {
...
}
}
catch (TimeoutException e) {
...
}
该模板设置了一个标题(默认命名为KafkaHeaders.CORRELATION_ID),服务器端必须将其回显。
在这种情况下,以下@KafkaListener应用程序会响应:
@SpringBootApplication
public class KReplyingApplication {
public static void main(String[] args) {
SpringApplication.run(KReplyingApplication.class, args);
}
@KafkaListener(id="server", topics = "kRequests")
@SendTo // use default replyTo expression
public String listen(String in) {
System.out.println("Server received: " + in);
return in.toUpperCase();
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kRequests")
.partitions(10)
.replicas(2)
.build();
}
@Bean // not required if Jackson is on the classpath
public MessagingMessageConverter simpleMapperConverter() {
MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
return messagingMessageConverter;
}
}
基础设施会回显关联ID并确定回复主题。
见 使用 @SendTo 传递转发侦听器结果 以获取更多关于发送回复的信息。 模板使用默认的标题 KafKaHeaders.REPLY_TOPIC 来指示回复的目标主题。
从版本 2.2 开始,模板会尝试检测回复主题或分区。如果容器被配置为监听单个主题或者单个 TopicPartitionOffset,则使用它来设置回复头信息。如果容器以其他方式配置,则用户必须手动设置回复头信息。在这种情况下,在初始化期间会写入一条 INFO 级别的日志消息。以下示例使用了 KafkaHeaders.REPLY_TOPIC:
record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));
当您使用单个回复 TopicPartitionOffset 进行配置时,只要每个实例监听不同的分区,就可以为多个模板使用相同的回复主题。
在使用单个回复主题进行配置时,每个实例必须使用不同的 group.id。
在这种情况下,所有实例都会收到每个回复,但只有发送请求的实例才能找到关联ID。
这可能对自动扩展有用,但会增加额外的网络流量开销,并且需要花费少量成本来丢弃每个不需要的回复。
当您使用此设置时,我们建议将模板的 sharedReplyTopic 设置为 true,这样可以将意外回复的日志级别从默认的 ERROR 降低到 DEBUG。
以下是一个配置回复容器以使用相同的共享回复主题的示例:
@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(
ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {
ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic2");
container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // unique
Properties props = new Properties();
props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
container.getContainerProperties().setKafkaConsumerProperties(props);
return container;
}
| 如果您有多个客户端实例,并且没有按照上一节所述进行配置,那么每个实例都需要一个专用的回复主题。 另一种方法是设置
服务器必须使用此标头将回复路由到正确的分区( 不过在这种情况下,回复容器不应使用Kafka的组管理功能,而应被配置为监听固定分区(通过在它的 |
如果 DefaultKafkaHeaderMapper 需要Jackson在类路径中(对于@KafkaListener)。 如果没有可用,消息转换器没有头映射器,因此您必须配置一个MessagingMessageConverter与SimpleKafkaHeaderMapper,如前所述。 |
默认情况下,使用3个header:
-
KafkaHeaders.CORRELATION_ID- 用于将回复与请求关联起来 -
KafkaHeaders.REPLY_TOPIC- 用于告诉服务器回复的位置 -
KafkaHeaders.REPLY_PARTITION- (可选)用于告诉服务器回复哪个分区
这些标题名称由@KafkaListener基础设施使用来路由回复。
从 2.3 版本开始,您可以自定义标题名称——模板有三个属性 correlationHeaderName、replyTopicHeaderName 和 replyPartitionHeaderName。如果您的服务器不是 Spring 应用程序(或不使用 @KafkaListener),这会很有用。
请求/回复与Message<?> s
版本 2.7 在 ReplyingKafkaTemplate 上添加了方法,用于发送和接收 spring-messaging 的 Message<?> 抽象:
RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);
<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message,
ParameterizedTypeReference<P> returnType);
这些将使用模板的默认replyTimeout,还有可以接受超时时间作为方法调用参数的重载版本。
如果消费者Deserializer或模板的MessageConverter可以在没有附加信息的情况下通过配置或回复消息中的类型元数据转换负载,则使用第一种方法。
如果需要为返回类型提供类型信息以协助消息转换器,请使用第二种方法。这种方法还允许相同的模板接收不同类型,即使回复中没有类型元数据(例如服务器端不是Spring应用程序时)。以下是后一种情况的示例:
@Bean
ReplyingKafkaTemplate<String, String, String> template(
ProducerFactory<String, String> pf,
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
ConcurrentMessageListenerContainer<String, String> replyContainer =
factory.createContainer("replies");
replyContainer.getContainerProperties().setGroupId("request.replies");
ReplyingKafkaTemplate<String, String, String> template =
new ReplyingKafkaTemplate<>(pf, replyContainer);
template.setMessageConverter(new ByteArrayJsonMessageConverter());
template.setDefaultTopic("requests");
return template;
}
@Bean
fun template(
pf: ProducerFactory<String?, String>?,
factory: ConcurrentKafkaListenerContainerFactory<String?, String?>
): ReplyingKafkaTemplate<String?, String, String?> {
val replyContainer = factory.createContainer("replies")
replyContainer.containerProperties.groupId = "request.replies"
val template = ReplyingKafkaTemplate(pf, replyContainer)
template.messageConverter = ByteArrayJsonMessageConverter()
template.defaultTopic = "requests"
return template
}
RequestReplyTypedMessageFuture<String, String, Thing> future1 =
template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
new ParameterizedTypeReference<Thing>() { });
log.info(future1.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
Thing thing = future1.get(10, TimeUnit.SECONDS).getPayload();
log.info(thing.toString());
RequestReplyTypedMessageFuture<String, String, List<Thing>> future2 =
template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
new ParameterizedTypeReference<List<Thing>>() { });
log.info(future2.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
List<Thing> things = future2.get(10, TimeUnit.SECONDS).getPayload();
things.forEach(thing1 -> log.info(thing1.toString()));
val future1: RequestReplyTypedMessageFuture<String?, String?, Thing?>? =
template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
object : ParameterizedTypeReference<Thing?>() {})
log.info(future1?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata?.toString())
val thing = future1?.get(10, TimeUnit.SECONDS)?.payload
log.info(thing.toString())
val future2: RequestReplyTypedMessageFuture<String?, String?, List<Thing?>?>? =
template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
object : ParameterizedTypeReference<List<Thing?>?>() {})
log.info(future2?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata.toString())
val things = future2?.get(10, TimeUnit.SECONDS)?.payload
things?.forEach(Consumer { thing1: Thing? -> log.info(thing1.toString()) })
回复类型消息<?>
当 @KafkaListener 返回 Message<?> 时,在2.5版本之前的版本中,必须填充回复主题和相关id标头。在此示例中,我们使用请求中的回复主题标头:
@KafkaListener(id = "requestor", topics = "request")
@SendTo
public Message<?> messageReturn(String in) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.TOPIC, replyTo)
.setHeader(KafkaHeaders.MESSAGE_KEY, 42)
.setHeader(KafkaHeaders.CORRELATION_ID, correlation)
.build();
}
这还展示了如何在回复记录上设置一个键。
从版本 2.5 开始,如果检测到这些标题缺失,框架将使用主题填充它们——无论是从 @SendTo 值确定的主题还是传入的 KafkaHeaders.REPLY_TOPIC 标题(如果存在的话)。它还会回显传入的 KafkaHeaders.CORRELATION_ID 和 KafkaHeaders.REPLY_PARTITION,如果它们存在。
@KafkaListener(id = "requestor", topics = "request")
@SendTo // default REPLY_TOPIC header
public Message<?> messageReturn(String in) {
return MessageBuilder.withPayload(in.toUpperCase())
.setHeader(KafkaHeaders.MESSAGE_KEY, 42)
.build();
}
聚合多个回复
模板在使用ReplyingKafkaTemplate中仅适用于单个请求/回复场景。对于单一消息的多个接收者都返回一个回复的情况,可以使用AggregatingReplyingKafkaTemplate。这是散射-聚集企业集成模式的客户端实现。
与 ReplyingKafkaTemplate 类似,AggregatingReplyingKafkaTemplate 构造函数需要一个生产者工厂和一个监听容器来接收回复;它还有一个第三个参数 BiPredicate<List<ConsumerRecord<K, R>>, Boolean> releaseStrategy,每次接收到回复时都会被调用;当该谓词返回 true 时,使用 ConsumerRecord 的集合来完成由 sendAndReceive 方法返回的 Future。
有一个额外的属性 returnPartialOnTimeout(默认为 false)。 当将其设置为 true 时,与使用 KafkaReplyTimeoutException 完成未来不同,部分结果在至少收到一条回复记录的情况下会正常完成未来。
从版本 2.3.5 开始,如果超时(returnPartialOnTimeout 是 true),也会调用谓词。 第一个参数是当前记录列表;第二个参数为 true 表示此调用是由超时引起的。 谓词可以修改记录列表。
AggregatingReplyingKafkaTemplate<Integer, String, String> template =
new AggregatingReplyingKafkaTemplate<>(producerFactory, container,
coll -> coll.size() == releaseSize);
...
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord =
future.get(30, TimeUnit.SECONDS);
请注意,返回类型是 ConsumerRecord,其值是一个包含多个 ConsumerRecord 的集合。 ConsumerRecord 是模板合成的“外部”记录,并非真正的记录,用于持有实际回复记录。
当正常发布发生时(发布策略返回 true),主题设置为 aggregatedResults;如果 returnPartialOnTimeout 为 true 并且超时发生(并且至少收到一条回复记录),则主题设置为 partialResultsAfterTimeout。
该模板提供了这些“主题”名称的常量静态变量:
/**
* Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
* results in its value after a normal release by the release strategy.
*/
public static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";
/**
* Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
* results in its value after a timeout.
*/
public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout";
实际的 ConsumerRecord 包含从其中接收回复的实际主题。
监听器容器对于回复必须配置为AckMode.MANUAL或AckMode.MANUAL_IMMEDIATE; 消费者属性enable.auto.commit必须设置为false(自版本2.3起默认值)。
为了避免任何可能的消息丢失,模板仅在没有待处理请求时提交偏移量,即当最后一个待处理请求被释放策略释放时。 重新分配后,可能会有重复回复的交付;这些对于正在飞行中的请求将被忽略;当你收到重复回复时,可能会看到错误日志消息。 |
如果您使用了ErrorHandlingDeserializer与这个聚合模板一起使用,框架将不会自动检测DeserializationException。
相反,记录(带有null值)将以完整形式返回,并包含反序列化异常信息在头部。
建议应用程序调用实用方法ReplyingKafkaTemplate.checkDeserialization()来确定是否发生了反序列化异常。
请参阅其javadoc以获取更多信息。
此聚合模板也不调用replyErrorChecker;您应该对回复中的每个元素进行检查。 |
4.1.4. 接收消息
您可以通过配置一个MessageListenerContainer并提供一个消息监听器,或使用@KafkaListener注解来接收消息。
消息监听器
当您使用消息监听器容器时,必须提供一个监听器来接收数据。 目前支持八种消息监听器接口。 以下列表显示了这些接口:
public interface MessageListener<K, V> { (1)
void onMessage(ConsumerRecord<K, V> data);
}
public interface AcknowledgingMessageListener<K, V> { (2)
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment);
}
public interface ConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { (3)
void onMessage(ConsumerRecord<K, V> data, Consumer<?, ?> consumer);
}
public interface AcknowledgingConsumerAwareMessageListener<K, V> extends MessageListener<K, V> { (4)
void onMessage(ConsumerRecord<K, V> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
public interface BatchMessageListener<K, V> { (5)
void onMessage(List<ConsumerRecord<K, V>> data);
}
public interface BatchAcknowledgingMessageListener<K, V> { (6)
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}
public interface BatchConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { (7)
void onMessage(List<ConsumerRecord<K, V>> data, Consumer<?, ?> consumer);
}
public interface BatchAcknowledgingConsumerAwareMessageListener<K, V> extends BatchMessageListener<K, V> { (8)
void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment, Consumer<?, ?> consumer);
}
| 1 | 使用此接口处理从 Kafka 消费者poll()操作接收到的单个ConsumerRecord实例,当使用自动提交或容器管理的提交方法之一时。 |
| 2 | 使用此接口处理从 Kafka 消费者poll()操作中接收到的单个ConsumerRecord实例,在使用手动提交方法之一时。
提交方法 |
| 3 | 使用此接口处理从 Kafka 消费者 poll() 操作接收到的单个 ConsumerRecord 实例,当使用自动提交或一个容器管理的 提交方法 时。
访问 Consumer 对象。 |
| 4 | 使用此接口来处理从 Kafka 消费者 poll() 操作中接收到的单个 ConsumerRecord 实例。当使用手动提交方法之一时。
访问 Consumer 对象是可以提供的。 |
| 5 | 使用此接口处理从 Kafka 消费者 poll() 操作接收到的所有 ConsumerRecord 实例。当使用自动提交或容器管理的 提交方法 时,请使用此接口。
不支持 AckMode.RECORD,因为监听器会获得完整的批次数据。 |
| 6 | 使用此接口处理从Kafka消费者poll()操作接收的所有ConsumerRecord实例,当使用其中一种手动提交方法时。 |
| 7 | 使用此接口处理从 Kafka 消费者 poll() 操作接收到的所有 ConsumerRecord 实例。当使用自动提交或容器管理的 提交方法 时,请使用此接口。
不支持 AckMode.RECORD,因为监听器会接收到完整的批次。
提供了对 Consumer 对象的访问权限。 |
| 8 | 使用此接口处理从 Kafka 消费者 poll() 操作接收到的所有 ConsumerRecord 实例。
可以访问 Consumer 对象。 |
The Consumer object is not thread-safe.
You must only invoke its methods on the thread that calls the listener. |
您不应该在监听器中执行任何Consumer<?, ?>方法来影响消费者的持仓或已提交的偏移量;容器需要管理此类信息。 |
消息监听器容器
两个 MessageListenerContainer 实现被提供:
-
KafkaMessageListenerContainer -
ConcurrentMessageListenerContainer
The KafkaMessageListenerContainer 接收所有主题或分区的所有消息,使用单个线程。
The ConcurrentMessageListenerContainer 代理给一个或多个 KafkaMessageListenerContainer 实例以提供多线程消费。
从版本2.2.7开始,您可以向监听器容器添加一个RecordInterceptor;它将在调用监听器之前被调用,允许检查或修改记录。
如果拦截器返回null,则不会调用监听器。
从版本2.7开始,它还具有额外的方法,在监听器退出(正常结束或通过抛出异常)之后会被调用。
此外,从版本2.7开始,现在有一个BatchInterceptor,提供了类似的功能用于批量监听器。Batch Listeners
此外,ConsumerAwareRecordInterceptor(和BatchInterceptor)提供对Consumer<?, ?>的访问。
这可能被用来,在拦截器中访问消费者指标等。
| 你不应在这些拦截器中执行任何会影响消费者持仓或已提交偏移量的方法;容器需要管理此类信息。 |
如果拦截器修改了记录(通过创建新记录),则必须保持topic、partition和offset不变,以免产生意外的副作用,例如记录丢失。 |
可以使用 CompositeRecordInterceptor 和 CompositeBatchInterceptor 来调用多个拦截器。
默认情况下,从版本2.8开始,使用事务时,拦截器会在事务开始前被调用。
您可以将侦听器容器的interceptBeforeTx属性设置为false,以在事务开始后而不是在事务开始前调用拦截器。
从版本 2.3.8、2.4.6 开始,当并发度大于一时,ConcurrentMessageListenerContainer 现在支持静态成员资格。group.instance.id 在 1 处附加了 -n 和 n。这与增加的 session.timeout.ms 结合使用,可以减少重新平衡事件,例如,在应用程序实例重启时。
使用KafkaMessageListenerContainer
以下构造函数可用:
public KafkaMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
它接收一个 ConsumerFactory 和有关主题和分区以及其他配置的信息,在一个 ContainerProperties 对象中。 ContainerProperties 具有以下构造函数:
public ContainerProperties(TopicPartitionOffset... topicPartitions)
public ContainerProperties(String... topics)
public ContainerProperties(Pattern topicPattern)
第一个构造函数采用TopicPartitionOffset个参数的数组,明确指示容器使用哪些分区(使用消费者assign()方法),并可选择一个初始偏移量。
正数值默认为绝对偏移量。
负数值默认相对于当前分区中的最后一个偏移量。
提供了第二个构造函数TopicPartitionOffset,它额外接受一个boolean参数。
如果这个值是true,则初始偏移量(正或负)相对于此消费者的当前位置。
当容器启动时应用这些偏移量。
第二个构造函数采用话题数组,Kafka根据group.id属性分配分区——在组中分布分区。
第三个构造函数使用正则表达式Pattern来选择话题。
要向容器分配一个 MessageListener,可以在创建容器时使用 ContainerProps.setMessageListener 方法。 以下示例显示了如何操作:
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
containerProps.setMessageListener(new MessageListener<Integer, String>() {
...
});
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<>(consumerProps());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
请注意,当创建DefaultKafkaConsumerFactory时,使用仅包含上述属性的构造函数意味着从配置中选择键和值Deserializer类。
或者,可以将Deserializer实例传递给DefaultKafkaConsumerFactory构造函数来获取密钥和/或值,在这种情况下所有消费者共享相同的实例。
另一种选择是提供Supplier<Deserializer>(从版本2.3开始),这些将在每个Consumer中用于获得单独的Deserializer实例:
DefaultKafkaConsumerFactory<Integer, CustomValue> cf =
new DefaultKafkaConsumerFactory<>(consumerProps(), null, () -> new CustomValueDeserializer());
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProps);
return container;
refer to the javadoc for ContainerProperties for more information about the various properties that you can set.
从版本 2.1.1 开始,新增了一个名为 logContainerConfig 的属性。
当启用 true 和 INFO 日志记录时,每个监听器容器都会写入一条日志消息,总结其配置属性。
默认情况下,主题偏移量提交的记录级别为DEBUG。 从版本2.1.2开始,在ContainerProperties中有一个属性commitLogLevel可让您指定这些消息的日志级别。 例如,要将日志级别更改为INFO,可以使用containerProperties.setCommitLogLevel(LogIfLevelEnabled.Level.INFO);。
从版本 2.2 开始,新增了一个名为 missingTopicsFatal 的容器属性(自 2.3.4 版本起默认为 false)。如果任何配置的主题在代理上不存在,则会阻止容器启动。如果容器配置为监听主题模式(正则表达式),则此设置不适用。
以前,容器线程会在 consumer.poll() 方法中循环等待主题出现,并记录大量日志消息。除了日志外,没有其他迹象表明存在问题。
从版本 2.8 开始,引入了一个新的容器属性 authExceptionRetryInterval。当容器从 KafkaConsumer 获取到任何 AuthenticationException 或 AuthorizationException 后,此属性会导致容器重试获取消息。例如,配置的用户被拒绝访问读取某个主题或凭据不正确时就可能发生这种情况。定义 authExceptionRetryInterval 允许在授予适当权限后容器恢复。
| 默认情况下,未配置任何间隔时间 - 认证和授权错误被视为致命错误,这将导致容器停止。 |
从版本 2.8 开始,在创建消费者工厂时,如果您以对象形式提供反序列化器(在构造函数中或通过 setter 方法),则工厂将调用 configure() 方法使用配置属性来配置它们。
使用ConcurrentMessageListenerContainer
单个构造函数类似于KafkaListenerContainer构造函数。下列表示了构造函数的签名:
public ConcurrentMessageListenerContainer(ConsumerFactory<K, V> consumerFactory,
ContainerProperties containerProperties)
它还有一个 concurrency 属性。 例如,container.setConcurrency(3) 创建了三个 KafkaMessageListenerContainer 实例。
对于第一个构造函数,Kafka 利用其组管理能力将分区分配给消费者。
|
当监听多个主题时,默认的分区分配可能不符合您的预期。 例如,如果您有三个主题,每个主题都有五个分区,并且您想使用 使用Spring Boot时,可以按如下方式设置策略:
|
当容器属性配置为TopicPartitionOffset时,ConcurrentMessageListenerContainer会将TopicPartitionOffset实例分布到代理KafkaMessageListenerContainer实例上。
例如,如果提供了六个TopicPartitionOffset实例且concurrency是3;每个容器获得两个分区。
对于五个TopicPartitionOffset实例,两个容器各获得两个分区,第三个容器获得一个分区。
如果concurrency大于TopicPartitions的数量,则调整concurrency使得每个容器获得一个分区。
如果设置了client.id属性,则会在其后面附加-n,其中n是与并发性相对应的消费者实例。当启用JMX时,需要此操作以提供唯一的MBean名称。 |
从版本 1.3 开始,MessageListenerContainer 提供了对底层 KafkaConsumer 的指标的访问。在 ConcurrentMessageListenerContainer 的情况下,metrics() 方法返回所有目标 KafkaMessageListenerContainer 实例的指标。Map<MetricName, ? extends Metric> 根据为底层 KafkaConsumer 提供的 client-id 对指标进行分组。
从版本2.3开始,ContainerProperties提供了一个idleBetweenPolls选项,允许监听器容器中的主循环在两次KafkaConsumer.poll()调用之间休眠。实际的睡眠间隔是从提供的选项和max.poll.interval.ms消费者配置与当前记录批处理时间之间的差值中选择最小值。
提交偏移量
提交偏移量有多种选择。
如果enable.auto.commit消费者属性为true,则Kafka会根据其配置自动提交偏移量。
如果是false,容器支持几种AckMode设置(将在下一列表中描述)。
默认的AckMode是BATCH。
从版本2.3开始,除非在配置中明确设置了框架,否则将enable.auto.commit设为false。
以前,如果未设置该属性,则使用Kafka默认值(true)。
消费者 poll() 方法返回一个或多个 ConsumerRecords。
对每个记录调用MessageListener。
以下列表描述了容器针对每个 AckMode 执行的操作(当未使用事务时):
-
RECORD: 记录监听器在处理完记录后返回时提交偏移量。 -
BATCH: 在处理完poll()返回的所有记录后提交偏移量。 -
TIME: 在处理完由poll()返回的所有记录时提交偏移量,只要自上次提交以来已超过ackTime。 -
COUNT: 确保自上次提交以来已收到ackCount条记录,并在由poll()返回的所有记录处理完成后提交偏移量。 -
COUNT_TIME: 类似于TIME和COUNT,但如果任一条件为true,则执行提交操作。 -
MANUAL: 消息监听器负责acknowledge()的Acknowledgment。在那之后,应用与BATCH相同的语义。 -
MANUAL_IMMEDIATE: 当监听器调用Acknowledgment.acknowledge()方法时,立即提交偏移量。
当使用事务时,偏移量将发送到事务中,并且语义等同于RECORD或BATCH,具体取决于监听器类型(记录或批次)。
0 和 1 需要监听者为 AcknowledgingMessageListener 或 BatchAcknowledgingMessageListener。
参见 消息监听器。 |
根据 syncCommits 容器属性,使用消费者上的 commitSync() 或 commitAsync() 方法。syncCommits 默认为 true;另请参阅 setSyncCommitTimeout。查看 setCommitCallback 获取异步提交的结果;默认回调是 LoggingCommitCallback,该回调会记录错误(在调试级别上记录成功)。
因为监听容器有自己的提交偏移量的机制,它更偏好Kafka ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG被设置为false。
从2.3版本开始,除非在消费者工厂或容器的消费者属性中特别设置,否则它会无条件地将其设为false。
代码Acknowledgment具有以下方法:
public interface Acknowledgment {
void acknowledge();
}
此方法使监听器能够控制何时提交偏移量。
从版本 2.3 开始,Acknowledgment 接口有两个额外的方法 nack(long sleep) 和 nack(int index, long sleep)。第一个方法用于记录监听器,第二个方法用于批处理监听器。为您的监听器类型调用错误的方法将抛出一个 IllegalStateException。
如果您想提交一个部分批次,请使用 nack()。在使用事务时,设置AckMode到MANUAL;调用nack()将成功处理的记录偏移量发送给事务。 |
nack()只能在调用监听器的消费者线程上调用。 |
nack() 是不允许在使用 乱序提交 时使用的。 |
使用记录监听器时,调用 nack() 会提交任何待处理的偏移量,丢弃上一次轮询中的剩余记录,并在其分区上执行查找操作,以便在下一次 poll() 中重新传递失败的记录和未处理的记录。可以通过设置 sleep 参数来暂停消费者在重新交付之前的运行。这与将容器配置为具有 DefaultErrorHandler 并抛出异常的功能类似。
使用批处理侦听器时,可以指定发生故障的批次中的索引。
当调用 nack() 时,将提交该索引之前记录的偏移量,并对失败和丢弃的记录所在的分区执行寻址操作,以便它们将在下一个poll()中重新交付。
See 容器错误处理程序 以获取更多详细信息。
| 消费者在睡眠期间被暂停,以便继续轮询代理以保持消费者的活跃状态。 实际的睡眠时间及其分辨率取决于容器的 最小睡眠时间等于 对于较小的睡眠时间或为了提高准确性,请考虑减少容器的 |
手动提交偏移量
当使用 AckMode.MANUAL 或 AckMode.MANUAL_IMMEDIATE 时,确认必须按顺序进行,因为 Kafka 不为每个记录维护状态,只维护每个 group/partition 的已提交偏移量。
从 2.8 版本开始,现在可以将容器属性设置为 asyncAcks,这允许由 poll 返回的记录的确认可以按任意顺序进行。
监听容器会推迟不按顺序的提交,直到缺少的确认收到。
消费者在所有上一次 poll 的偏移量提交之前会被暂停(不再有新记录被传递)。
| 虽然该功能允许应用程序异步处理记录,但应理解的是,这会增加在发生故障后出现重复投递的可能性。 |
@KafkaListener注解
The @KafkaListener 注解用于将 bean 方法指定为监听器容器的监听器。
该 bean 会被包装在 MessagingMessageListenerAdapter 中,可根据需要配置各种特性,例如转换器以将数据转换为匹配方法参数。
您可以通过SpEL在注解上配置大多数属性,使用#{…}或属性占位符(${…})。
请参见Javadoc以获取更多信息。
记录监听器
@KafkaListener 注解为简单的 POJO 监听器提供了一种机制。
以下示例展示了如何使用它:
public class Listener {
@KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
public void listen(String data) {
...
}
}
这种机制要求在您要使用的 @EnableKafka 类之一上使用 @Configuration 注解,并使用一个监听器容器工厂来配置底层的 ConcurrentMessageListenerContainer。
默认情况下,期望有一个名为 kafkaListenerContainerFactory 的 bean。
以下示例展示了如何使用 ConcurrentMessageListenerContainer:
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
...
return props;
}
}
请注意,要设置容器属性,必须使用在工厂上的getContainerProperties()方法。
它用作实际注入到容器中的属性的模板。
从版本 2.1.1 开始,您可以通过注解创建的消费者现在可以设置 client.id 属性。
clientIdPrefix 会附加 -n,其中 n 是在使用并发时表示容器编号的整数。
从 2.2 版本开始,您可以使用注解本身上的属性来覆盖容器工厂的 concurrency 和 autoStartup 属性。
这些属性可以是简单值、属性占位符或 SpEL 表达式。
以下示例展示了如何实现:
@KafkaListener(id = "myListener", topics = "myTopic",
autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
...
}
显式分区分配
您也可以通过显式主题和分区(可选地,以及它们的初始偏移)配置POJO侦听器。 以下示例展示了如何进行配置:
@KafkaListener(id = "thing2", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
您可以为每个分区在partitions或partitionOffsets属性中指定一个,但不能同时指定两者。
如同大多数注解属性一样,你可以使用 SpEL 表达式;有关如何生成大量分区的示例,请参见 手动为所有分区赋值。
从版本2.5.5开始,您可以对所有分配的分区应用初始偏移:
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" },
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
The * 通配符表示 partitions 属性中的所有分区。
在每个 @TopicPartition 中,@PartitionOffset 与通配符的组合只能出现一次。
此外,当监听器实现ConsumerSeekAware时,即使使用手动赋值,onPartitionsAssigned现在也会被调用,这允许在该时间点进行任意的跳转操作。
从2.6.4版本开始,您可以指定一个由逗号分隔的分区列表,或分区范围:
@KafkaListener(id = "pp", autoStartup = "false",
topicPartitions = @TopicPartition(topic = "topic1",
partitions = "0-5, 7, 10-15"))
public void process(String in) {
...
}
范围是包含在内的;上面的例子将分配分区 0, 1, 2, 3, 4, 5, 7, 10, 11, 12, 13, 14, 15。
相同的技巧也可以用于指定初始偏移量:
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1",
partitionOffsets = @PartitionOffset(partition = "0-5", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
初始偏移将应用于所有6个分区。
手动确认
当使用手动AckMode时,你还可以为监听器提供Acknowledgment。
以下示例还展示了如何使用不同的容器工厂。
@KafkaListener(id = "cat", topics = "myTopic",
containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
...
ack.acknowledge();
}
消费者记录元数据
最后,可以从消息头获取记录的元数据。 您可以使用以下标题名称来检索消息的标题:
-
KafkaHeaders.OFFSET -
KafkaHeaders.RECEIVED_MESSAGE_KEY -
KafkaHeaders.RECEIVED_TOPIC -
KafkaHeaders.RECEIVED_PARTITION_ID -
KafkaHeaders.RECEIVED_TIMESTAMP -
KafkaHeaders.TIMESTAMP_TYPE
从2.5版本开始,如果传入的记录具有null键,则不再包含RECEIVED_MESSAGE_KEY;在此之前,标题会被填充为null值。
此更改是为了使框架与spring-messaging约定保持一致,其中null值的标题不存在。
以下示例显示了如何使用 headers:
@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
@Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
) {
...
}
从 2.5 版本开始,您可以使用一个 ConsumerRecordMetadata 参数来接收记录元数据,而不再使用单独的头信息。
@KafkaListener(...)
public void listen(String str, ConsumerRecordMetadata meta) {
...
}
这包含了来自ConsumerRecord的所有数据,但不包含键和值。
批次监听器
自版本 1.1 起,您可以配置方法接收从消费者拉取的整个批次的消息记录。
要配置监听器工厂以创建批量监听器,可以设置 @KafkaListener 属性。以下示例展示了如何进行配置:
@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true); // <<<<<<<<<<<<<<<<<<<<<<<<<
return factory;
}
| 从版本 2.8 开始,您可以在 @0 注解的 1 属性上覆盖工厂的 0 属性。这与对 容器错误处理程序 的更改一起使用,使得同一个工厂可以用于记录和批处理监听器。 |
以下示例显示如何接收一个payload列表:
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
...
}
主题、分区、偏移量等信息在与负载相对应的头信息中可用。以下示例展示了如何使用这些头信息:
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
或者,你也可以通过每个偏移接收一个List或一个Message<?>个对象,以及消息中的其他详细信息,但该方法只能定义一个该参数(在使用手动提交时的额外Acknowledgment参数,以及/或Consumer<?, ?>参数除外)。
以下示例展示了如何实现:
@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen14(List<Message<?>> list) {
...
}
@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen15(List<Message<?>> list, Acknowledgment ack) {
...
}
@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen16(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
...
}
在这种情况下,不会对有效负载执行转换。
如果配置了BatchMessagingMessageConverter为RecordMessageConverter,你也可以向Message参数添加泛型类型,并且payload会被转换。更多信息请参见批量监听器中的Payload转换。
您还可以接收一个ConsumerRecord<?, ?>对象列表,但必须是方法上唯一的参数(在使用手动提交和Consumer<?, ?>个参数时可选定义Acknowledgment)。下面的例子展示了如何实现:
@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
...
}
@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
...
}
从版本2.2开始,监听器可以接收ConsumerRecords<?, ?>方法返回的完整的poll()对象,使监听器能够访问其他额外的方法,例如partitions()(它返回列表中的TopicPartition实例)和records(TopicPartition)(获取选择性记录)。
同样,此参数必须是该方法上唯一的参数(除了使用手动提交或Acknowledgment参数时可选的Consumer<?, ?>)
下面的例子展示了如何实现:
@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory")
public void pollResults(ConsumerRecords<?, ?> records) {
...
}
如果容器工厂配置了 RecordFilterStrategy,则对于 ConsumerRecords<?, ?> 监听器会被忽略,并发出 WARN 日志消息。只有在使用 <List<?>> 形式的监听器时才能对记录进行过滤。默认情况下,每次只过滤一条记录;从版本 2.8 开始,您可以覆盖 filterBatch 来一次性调用过滤整个批次。 |
注解属性
从版本 2.0 开始,如果存在 id 属性,则将其用作 Kafka 消费者 group.id 属性,并覆盖消费者工厂中配置的属性(如果存在)。
您也可以显式设置 groupId 或将 idIsGroup 设置为 false 来恢复以前使用消费者工厂 group.id 的行为。
您可以在大多数注解属性中使用属性占位符或SpEL表达式,如下例所示:
@KafkaListener(topics = "${some.property}")
@KafkaListener(topics = "#{someBean.someProperty}",
groupId = "#{someBean.someProperty}.group")
从版本2.1.2开始,SpEL表达式支持一个特殊的标记:__listener。它是一个伪bean名称,表示当前注解所在的bean实例。
考虑以下示例:
@Bean
public Listener listener1() {
return new Listener("topic1");
}
@Bean
public Listener listener2() {
return new Listener("topic2");
}
在前面的示例中提供了这些bean后,我们可以使用以下内容:
public class Listener {
private final String topic;
public Listener(String topic) {
this.topic = topic;
}
@KafkaListener(topics = "#{__listener.topic}",
groupId = "#{__listener.topic}.group")
public void listen(...) {
...
}
public String getTopic() {
return this.topic;
}
}
如果在极少数情况下您有一个实际名为__listener的bean,可以通过使用beanRef属性来更改表达式标记。以下示例显示了如何操作:
@KafkaListener(beanRef = "__x", topics = "#{__x.topic}",
groupId = "#{__x.topic}.group")
从版本 2.2.4 开始,您可以在注解中直接指定 Kafka 消费者属性,这些属性将覆盖消费者工厂中配置的同名属性。不能以这种方式指定 group.id 和 client.id 属性;它们将被忽略;请使用 groupId 和 clientIdPrefix 注解属性来指定这些。
The properties are specified as individual strings with the normal Java Properties file format: foo:bar, foo=bar, or foo bar.
@KafkaListener(topics = "myTopic", groupId = "group", properties = {
"max.poll.interval.ms:60000",
ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
})
以下是对使用RoutingKafkaTemplate示例中的相应监听器的说明。
@KafkaListener(id = "one", topics = "one")
public void listen1(String in) {
System.out.println("1: " + in);
}
@KafkaListener(id = "two", topics = "two",
properties = "value.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
public void listen2(byte[] in) {
System.out.println("2: " + new String(in));
}
获取消费者group.id
当在多个容器中运行相同的监听器代码时,可能需要能够确定记录来自哪个容器(通过其group.id消费者属性来标识)。
您可以在监听器线程上调用 KafkaUtils.getConsumerGroupId() 来执行此操作。
或者,您可以在方法参数中访问组ID。
@KafkaListener(id = "bar", topicPattern = "${topicTwo:annotated2}", exposeGroupId = "${always:true}")
public void listener(@Payload String foo,
@Header(KafkaHeaders.GROUP_ID) String groupId) {
...
}
此功能在记录监听器和批次监听器中可用,这些监听器接收一个 List<?> 的记录。
它 不 在接收一个 ConsumerRecords<?, ?> 参数的批次监听器中可用。
在这种情况下,请使用 KafkaUtils 机制。 |
容器线程命名
Listener containers currently use two task executors, one to invoke the consumer and another that is used to invoke the listener when the kafka consumer property enable.auto.commit is false. You can provide custom executors by setting the consumerExecutor and listenerExecutor properties of the container’s ContainerProperties. When using pooled executors, be sure that enough threads are available to handle the concurrency across all the containers in which they are used. When using the ConcurrentMessageListenerContainer, a thread from each is used for each consumer (concurrency).
如果您不提供消费者执行器,将使用 SimpleAsyncTaskExecutor。这个执行器创建的线程名称类似于 <beanName>-C-1(消费者线程)。对于 ConcurrentMessageListenerContainer,线程名称的 <beanName> 部分变成了 <beanName>-m,其中 m 代表消费者实例。每次容器启动时,n 就会递增。因此,如果bean名称为 container,则在容器首次启动后,该容器中的线程将被命名为 container-0-C-1、container-1-C-1 等;在停止并随后启动之后,将被命名为 container-0-C-2、container-1-C-2 等。
@KafkaListener作为元注解
从 2.2 版本开始,您可以现在使用 @KafkaListener 作为元注解。
以下示例展示了如何操作:
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@KafkaListener
public @interface MyThreeConsumersListener {
@AliasFor(annotation = KafkaListener.class, attribute = "id")
String id();
@AliasFor(annotation = KafkaListener.class, attribute = "topics")
String[] topics();
@AliasFor(annotation = KafkaListener.class, attribute = "concurrency")
String concurrency() default "3";
}
You must 为至少一个 topics、topicPattern 或 topicPartitions 别名(通常还需要为 id 或 groupId 别名,除非在消费者工厂配置中指定了 group.id)进行别名设置。
以下示例展示了如何实现:
@MyThreeConsumersListener(id = "my.group", topics = "my.topic")
public void listen1(String in) {
...
}
@KafkaListener 在一个类上
当在类级别使用 @KafkaListener 时,必须在方法级别指定 @KafkaHandler。
当消息被传递时,转换后的消息负载类型将决定调用哪个方法。
以下示例展示了如何实现:
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String foo) {
...
}
@KafkaHandler
public void listen(Integer bar) {
...
}
@KafkaHandler(isDefault = true)
public void listenDefault(Object object) {
...
}
}
Starting with version 2.1.3, you can designate a @KafkaHandler method as the default method that is invoked if there is no match on other methods.
At most, one method can be so designated.
When using @KafkaHandler methods, the payload must have already been converted to the domain object (so the match can be performed).
Use a custom deserializer, the JsonDeserializer, or the JsonMessageConverter with its TypePrecedence set to TYPE_ID.
See Serialization, Deserialization, and Message Conversion for more information.
由于Spring解析方法参数的一些限制,一个默认的@KafkaHandler不能接收离散头;它必须使用讨论在消费者记录元数据中的ConsumerRecordMetadata。 |
例如:
@KafkaHandler(isDefault = true)
public void listenDefault(Object object, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
...
}
如果对象是 String,则不会工作;topic 参数也会获得对 object 的引用。
如果你需要在默认方法中使用记录的元数据,请使用以下内容:
@KafkaHandler(isDefault = true)
void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
String topic = meta.topic();
...
}
@KafkaListener属性修改
从 2.7.2 版本开始,您可以在容器创建之前程序化地修改注解属性。
为此,请向应用上下文添加一个或多个 KafkaListenerAnnotationBeanPostProcessor.AnnotationEnhancer。
AnnotationEnhancer 是一个 BiFunction<Map<String, Object>, AnnotatedElement, Map<String, Object>,必须返回属性的映射。
属性值可以包含 SpEL 和/或属性占位符;增强器在任何解析之前被调用。
如果存在多个增强器并且实现了 Ordered,它们将按顺序被调用。
AnnotationEnhancer 需要声明 static 个 bean 定义,因为它们在应用程序上下文生命周期的非常早期阶段是必需的。 |
一个示例如下:
@Bean
public static AnnotationEnhancer groupIdEnhancer() {
return (attrs, element) -> {
attrs.put("groupId", attrs.get("id") + "." + (element instanceof Class
? ((Class<?>) element).getSimpleName()
: ((Method) element).getDeclaringClass().getSimpleName()
+ "." + ((Method) element).getName()));
return attrs;
};
}
@KafkaListener生命周期管理
带有@KafkaListener个注解创建的监听器容器并非应用程序上下文中的 bean。相反,它们是通过类型为 KafkaListenerEndpointRegistry 的基础设施 bean 注册的。该bean由框架自动声明,负责管理容器的生命周期;它会自动启动所有将autoStartup设置为true的容器。所有由所有容器工厂创建的容器必须在同一 phase。See Listener Container Auto Startup for more information.您可以通过使用注册表程序化地管理生命周期。启动或停止注册表将启动或停止所有已注册的容器。Alternatively, you can get a reference to an individual container by using its id attribute.您可以在注解中设置autoStartup,这会覆盖容器工厂中配置的默认设置。您可以从应用程序上下文获取到 bean 的引用,例如通过自动装配,来管理其已注册的容器。以下示例展示了如何操作:
@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
public void listen(...) { ... }
@Autowired
private KafkaListenerEndpointRegistry registry;
...
this.registry.getListenerContainer("myContainer").start();
...
注册表仅维护其管理的容器的生命周期;声明为 beans 的容器不由注册表管理,可以从应用上下文中获取。
可以通过调用注册表的 getListenerContainers() 方法获取一组受管理的容器。
版本 2.2.5 增加了方便的方法 getAllListenerContainers(),返回所有容器的集合,包括注册表管理的和声明为 beans 的。
返回的集合将包含任何已初始化的 prototype beans,但不会初始化任何 lazy bean 声明。
在应用程序上下文刷新后注册的端点将立即启动,无论其 autoStartup 属性,以遵守 SmartLifecycle 合同,其中 autoStartup 仅在应用程序上下文初始化期间考虑。
一个晚注册的示例是一个在原型作用域中的 bean,其实例在上下文初始化后创建。
从 2.8.7 版本开始,您可以将注册表的 alwaysStartAfterRefresh 属性设置为 false,然后容器的 autoStartup 属性将定义容器是否启动。 |
@KafkaListener @Payload验证
从 2.2 版本开始,现在可以添加一个 Validator 来验证 @KafkaListener @Payload 个参数。
此前,您需要配置一个自定义的 DefaultMessageHandlerMethodFactory 并将其添加到注册器中。
现在,可以直接将验证器添加到注册器本身。
以下代码展示了如何操作:
@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {
...
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setValidator(new MyValidator());
}
}
当您使用 Spring Boot 与验证Starters时,LocalValidatorFactoryBean 会自动配置,如下示例所示: |
@Configuration
@EnableKafka
public class Config implements KafkaListenerConfigurer {
@Autowired
private LocalValidatorFactoryBean validator;
...
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setValidator(this.validator);
}
}
以下示例展示了如何验证:
public static class ValidatedClass {
@Max(10)
private int bar;
public int getBar() {
return this.bar;
}
public void setBar(int bar) {
this.bar = bar;
}
}
@KafkaListener(id="validated", topics = "annotated35", errorHandler = "validationErrorHandler",
containerFactory = "kafkaJsonListenerContainerFactory")
public void validatedListener(@Payload @Valid ValidatedClass val) {
...
}
@Bean
public KafkaListenerErrorHandler validationErrorHandler() {
return (m, e) -> {
...
};
}
Starting with version 2.5.11, validation now works on payloads for @KafkaHandler methods in a class-level listener.
See @KafkaListener on a Class.
重新平衡监听器
ContainerProperties有一个名为consumerRebalanceListener的属性,该属性采用Kafka客户端的ConsumerRebalanceListener接口的实现。
如果未提供此属性,容器会配置一个日志监听器,将重平衡事件记录在INFO级别日志中。
框架还添加了一个子接口ConsumerAwareRebalanceListener。
以下代码片段显示了ConsumerAwareRebalanceListener接口的定义:
public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {
void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
}
注意,当分区被回收时,有两个回调:第一个会立即调用。 第二个会在任何待处理的偏移量提交之后调用。 这在你希望将偏移量维护在某个外部存储库时很有用,如下例所示:
containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
@Override
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// acknowledge any pending Acknowledgments (if using manual acks)
}
@Override
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// ...
store(consumer.position(partition));
// ...
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// ...
consumer.seek(partition, offsetTracker.getOffset() + 1);
// ...
}
});
从版本2.4开始,增加了一个新的方法onPartitionsLost()(类似于ConsumerRebalanceLister中的同名方法)。
在向监听器容器提供自定义监听器(无论是哪种类型)时,重要的是确保你的实现不从 这是因为监听器容器会在调用你实现的 |
使用 Forwarding Listener 结果@SendTo
从 2.0 版本开始,如果你同时用一个带有 @SendTo 注解的注解标注一个 @KafkaListener,并且方法调用返回结果,该结果将转发到由 @SendTo 指定的主题。
The @SendTo 值可以有多种形式:
-
@SendTo("someTopic")路由到字面主题 -
@SendTo("#{someExpression}")在应用上下文初始化期间评估表达式一次,然后路由到由该评估确定的主题。 -
@SendTo("!{someExpression}")路由到通过运行时评估表达式确定的主题。#root对象用于评估,具有三个属性:-
request: The inboundConsumerRecord(orConsumerRecordsobject for a batch listener)) -
source: Theorg.springframework.messaging.Message<?>转换自request. -
result: 该方法的返回结果。
-
-
@SendTo(无属性): 此处视为!{source.headers['kafka_replyTopic']}(自 2.1.3 版本起)。
从 2.1.11 版本和 2.2.1 版本开始,property placeholders 会在 @SendTo 值中解析。
表达式求值的结果必须是一个 String,该值代表主题名称。
以下示例展示了使用 @SendTo 的各种方法:
@KafkaListener(topics = "annotated21")
@SendTo("!{request.value()}") // runtime SpEL
public String replyingListener(String in) {
...
}
@KafkaListener(topics = "${some.property:annotated22}")
@SendTo("#{myBean.replyTopic}") // config time SpEL
public Collection<String> replyingBatchListener(List<String> in) {
...
}
@KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler")
@SendTo("annotated23reply") // static reply topic definition
public String replyingListenerWithErrorHandler(String in) {
...
}
...
@KafkaListener(topics = "annotated25")
@SendTo("annotated25reply1")
public class MultiListenerSendTo {
@KafkaHandler
public String foo(String in) {
...
}
@KafkaHandler
@SendTo("!{'annotated25reply2'}")
public String bar(@Payload(required = false) KafkaNull nul,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) {
...
}
}
为了支持@SendTo,监听器容器工厂必须为其replyTemplate属性提供一个KafkaTemplate,用于发送回复。
这应该是一个KafkaTemplate而不是一个ReplyingKafkaTemplate,后者在客户端用于请求/回复处理。
当使用Spring Boot时,Starters会自动将模板配置到工厂中;当自定义工厂时,必须如下面的示例所示设置。 |
从 2.2 版本开始,您可以向监听器容器工厂添加一个 ReplyHeadersConfigurer。
这将用于确定要在回复消息中设置哪些头信息。
以下示例显示了如何添加一个 ReplyHeadersConfigurer:
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(cf());
factory.setReplyTemplate(template());
factory.setReplyHeadersConfigurer((k, v) -> k.equals("cat"));
return factory;
}
您也可以根据需要添加更多头信息。 以下示例展示了如何操作:
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(cf());
factory.setReplyTemplate(template());
factory.setReplyHeadersConfigurer(new ReplyHeadersConfigurer() {
@Override
public boolean shouldCopy(String headerName, Object headerValue) {
return false;
}
@Override
public Map<String, Object> additionalHeaders() {
return Collections.singletonMap("qux", "fiz");
}
});
return factory;
}
当您使用@SendTo时,必须在ConcurrentKafkaListenerContainerFactory的KafkaTemplate属性中配置一个replyTemplate以执行发送操作。
除非您使用请求/回复语义,否则仅使用简单的send(topic, value)方法,因此您可能希望创建一个子类以生成分区或键。
以下示例展示了如何实现这一点: |
@Bean
public KafkaTemplate<String, String> myReplyingTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory()) {
@Override
public ListenableFuture<SendResult<String, String>> send(String topic, String data) {
return super.send(topic, partitionForData(data), keyForData(data), data);
}
...
};
}
|
如果监听器方法返回
|
使用请求/回复语义时,发送方可以请求目标分区。
|
您可以使用
See 处理异常 获取更多详细信息。 |
如果监听器方法返回Iterable,则默认情况下会为每个元素发送一个记录作为值。从版本2.3.5开始,在 这需要在回复模板的生产者配置中有一个合适的序列化程序。 但是,如果回复是 |
过滤消息
在某些场景中,例如重新平衡时,已经处理过的消息可能会被重新分发。
框架无法知道这样的消息是否已经被处理过。
这是应用程序级别的功能。
这被称为
The Spring for Apache Kafka 项目也通过 FilteringMessageListenerAdapter 类提供了一些帮助,该类可以包装你的 MessageListener。
此类接受一个 RecordFilterStrategy 的实现,其中你实现 filter 方法以指示消息是重复的并应被丢弃。
此类还有一个名为 ackDiscarded 的额外属性,用于指示适配器是否应确认丢弃的记录。
它默认值为 false。
当您使用 @KafkaListener 时,请在容器工厂上设置 RecordFilterStrategy(可选地设置 ackDiscarded),以便将监听器包装在适当的过滤适配器中。
在另外一种情况下,当您使用批处理消息监听器时,会提供一个 FilteringBatchMessageListenerAdapter。
当 @KafkaListener 接收到一个 List<ConsumerRecord<?, ?>> 而不是 ConsumerRecords 时,FilteringBatchMessageListenerAdapter 会被忽略,因为 ConsumerRecords 是不可变的。 |
从 2.8.4 版本开始,您可以使用在监听器注解上的 filter 属性来覆盖监听器容器工厂的默认 RecordFilterStrategy。
@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
public void listen(Thing thing) {
...
}
重试投递
在处理异常中看到了DefaultErrorHandler。
开始@KafkaListener<p>在Sequence中进行选择</p>
一个常见的用例是在另一个监听器消费完主题中的所有记录后再启动一个监听器。
例如,您可能希望在处理来自其他主题的记录之前将一个或多个压缩主题的内容加载到内存中。
从版本2.7.3开始,引入了一个新的组件ContainerGroupSequencer。
它使用@KafkaListenercontainerGroup属性来分组容器,并在当前组中的所有容器都闲置时启动下一组的容器。
最好用一个例子来说明。
@KafkaListener(id = "listen1", topics = "topic1", containerGroup = "g1", concurrency = "2")
public void listen1(String in) {
}
@KafkaListener(id = "listen2", topics = "topic2", containerGroup = "g1", concurrency = "2")
public void listen2(String in) {
}
@KafkaListener(id = "listen3", topics = "topic3", containerGroup = "g2", concurrency = "2")
public void listen3(String in) {
}
@KafkaListener(id = "listen4", topics = "topic4", containerGroup = "g2", concurrency = "2")
public void listen4(String in) {
}
@Bean
ContainerGroupSequencer sequencer(KafkaListenerEndpointRegistry registry) {
return new ContainerGroupSequencer(registry, 5000, "g1", "g2");
}
在此,我们有两个组的四个监听器,g1 和 g2。
在应用上下文初始化时, sequencer 会将提供的组中的所有容器的 autoStartup 属性设置为 false。
它还会为任何不已经设置了该属性的容器(在这种情况下为其提供一个值,即 5000ms)设置 idleEventInterval。
然后,在应用上下文启动 sequencer 后,首先开始第一组中的容器。
当接收到 ListenerContainerIdleEvent 时,每个容器内的各个子容器会停止。
当一个容器的所有子容器都停止后,该容器也会被停止。
当一组中的所有容器都被停止后,将启动下一组的容器。
对组和组中容器的数量没有限制。
默认情况下,最终组(上例中的g2)中的容器在空闲时不会停止。要修改该行为,请将定时器上的stopLastGroupWhenIdle设置为true。
作为补充说明;之前,每个组中的容器被添加到一个类型为Collection<MessageListenerContainer>的bean中,该bean的名字是containerGroup。
这些集合现在已弃用,取而代之的是类型为ContainerGroup的bean,其名称为组名后缀加上.group;在上面的例子中,会有两个bean g1.group和g2.group。
类型为Collection的beans将在未来的版本中被移除。
使用KafkaTemplateto Receive
本部分介绍了如何使用 KafkaTemplate 来接收消息。
从 2.8 版本开始,模板有四个 receive() 方法:
ConsumerRecord<K, V> receive(String topic, int partition, long offset);
ConsumerRecord<K, V> receive(String topic, int partition, long offset, Duration pollTimeout);
ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested);
ConsumerRecords<K, V> receive(Collection<TopicPartitionOffset> requested, Duration pollTimeout);
正如您所见,您需要知道要检索的记录的分区和偏移量;每次操作都会创建(并在操作完成后关闭)一个新的 Consumer。
使用最后两种方法时,每个记录都会单独检索,然后将结果组装成一个 ConsumerRecords 对象。
在为请求创建 TopicPartitionOffsets 时,仅支持正的绝对偏移量。
4.1.5. Listener Container Properties
| 属性 | 默认 | 描述 |
|---|---|---|
1 |
在提交待处理偏移量之前记录的条目数量,当 |
|
|
一个由 |
|
批处理 |
controls how often offsets are committed - see 提交偏移量. |
|
5000 |
在 |
|
LATEST_ONLY _NO_TX |
是否在分配时提交初始位置;默认情况下,只有当 |
|
|
当不为 null 时,在 Kafka 客户端 抛出 1 或 2 时,两次轮询之间休眠 0 秒。 当为 null 时,此类异常被视为致命,容器将停止。 |
|
(empty string) |
一个用于 |
|
false |
将 |
|
false |
将 |
|
|
当存在且参数为 |
|
|
一个提供程序用于 |
|
DEBUG |
提交偏移量日志的记录级别。 |
|
|
一个重新均衡监听器;参见重新均衡监听器。 |
|
30s |
在消费者开始之前等待的超时时间(以毫秒为单位);例如,当你使用线程数不足的任务执行器时,可能会发生这种情况。 |
|
|
A task executor to run the consumer threads.
The default executor creates threads named |
|
|
见 投递尝试标题。 |
|
|
Exactly Once Semantics模式;请参见Exactly Once Semantics。 |
|
|
当消费由事务性生产者产生的记录时,如果消费者位于分区的末尾,由于用于表示事务提交/回滚的伪记录,以及可能存在的已回滚记录,可能会错误地报告滞后值大于0。这在功能上不影响消费者,但一些用户表达了对“滞后”非零的担忧。
将此属性设置为 |
|
|
覆盖消费者 |
|
5.0 |
在未接收到任何记录之前应用于 |
|
0 |
用于在轮询之间让线程休眠以减慢传递速度。
处理一批记录所需时间加上此值必须小于 |
|
|
当设置时,启用发布 |
|
|
当启用时,将发布 |
|
无 |
用于覆盖在消费者工厂上配置的任意消费者属性。 |
|
|
设置为true以在INFO级别日志所有容器属性。 |
|
|
消息监听器。 |
|
|
是否维护用于消费者线程的 Micrometer 定时器。 |
|
|
当为真时,如果配置的主题未在代理中存在,则防止容器启动。 |
|
30s |
多久检查一次消费者线程中的0状态。 查看代码1和2。 |
|
3.0 |
乘以 |
|
|
设置为 false 以在错误、调试日志等中记录完整的消费者记录(而不是仅仅记录 |
|
5000 |
超时时间传递给 |
|
|
一个用于运行消费者监控任务的调度器。 |
|
10000 |
在发布容器已停止事件之前,阻塞 |
|
|
停止监听容器如果抛出 |
|
|
容器停止时,在当前记录处理完毕后停止处理,而不是在处理完之前轮询的所有记录之后再停止。 |
|
参见描述。 |
当使用批处理监听器时,如果这是 |
|
|
当 |
|
|
偏移量提交是使用同步还是异步方式;请参阅 |
|
n/a |
配置的主题、主题模式或显式分配的主题/分区。 |
|
|
见 交易。 |
| 属性 | 默认 | 描述 |
|---|---|---|
|
事务回滚后要调用的 |
|
应用程序上下文 |
事件发布者。 |
|
参见描述。 |
已弃用 - 请参阅 |
|
|
在调用批处理监听器之前设置一个 |
|
Bean 名称 |
容器的bean名称;对于子容器,后缀为 |
|
参见描述。 |
|
|
|
容器属性实例。 |
|
参见描述。 |
已弃用 - 请参阅 |
|
参见描述。 |
已弃用 - 请参阅 |
|
参见描述。 |
如果存在,则为 |
|
|
确定在事务开始前还是后调用 |
|
参见描述。 |
用户配置容器或 |
|
null |
要填充到 |
|
(只读) |
如果请求了消费者暂停,则为 True。 |
|
|
设置 |
|
30s |
当 |
| 属性 | 默认 | 描述 |
|---|---|---|
(只读) |
分配给此容器(显式或隐式)的分区。 |
|
(只读) |
分配给此容器(显式或隐式)的分区。 |
|
|
由并发容器用于为每个子容器的消费者提供唯一的 |
|
n/a |
如果已请求暂停且消费者实际上已经暂停,则为 True。 |
| 属性 | 默认 | 描述 |
|---|---|---|
|
当 |
|
(只读) |
此容器的子节点 |
|
(只读) |
此容器的子容器 |
|
1 |
要管理的子节点 |
|
n/a |
如果已请求暂停并且所有子容器的消费者实际上已经暂停,则为 True。 |
|
n/a |
对所有子级 |
4.1.6. 动态创建容器
有几种技术可以用来在运行时创建监听器容器。 本部分将探讨其中的一些技术。
消息监听器实现
如果你直接实现自己的监听器,你可以简单地使用容器工厂来为该监听器创建一个原始容器:
public class MyListener implements MessageListener<String, String> {
@Override
public void onMessage(ConsumerRecord<String, String> data) {
// ...
}
}
private ConcurrentMessageListenerContainer<String, String> createContainer(
ConcurrentKafkaListenerContainerFactory<String, String> factory, String topic, String group) {
ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(topic);
container.getContainerProperties().setMessageListener(new MyListener());
container.getContainerProperties().setGroupId(group);
container.setBeanName(group);
container.start();
return container;
}
class MyListener : MessageListener<String?, String?> {
override fun onMessage(data: ConsumerRecord<String?, String?>) {
// ...
}
}
private fun createContainer(
factory: ConcurrentKafkaListenerContainerFactory<String, String>, topic: String, group: String
): ConcurrentMessageListenerContainer<String, String> {
val container = factory.createContainer(topic)
container.containerProperties.messageListener = MyListener()
container.containerProperties.groupId = group
container.beanName = group
container.start()
return container
}
原型Bean
用于被@KafkaListener注解标记的方法的容器可以在声明该bean为prototype时动态创建:
public class MyPojo {
private final String id;
private final String topic;
public MyPojo(String id, String topic) {
this.id = id;
this.topic = topic;
}
public String getId() {
return this.id;
}
public String getTopic() {
return this.topic;
}
@KafkaListener(id = "#{__listener.id}", topics = "#{__listener.topic}")
public void listen(String in) {
System.out.println(in);
}
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
MyPojo pojo(String id, String topic) {
return new MyPojo(id, topic);
}
applicationContext.getBean(MyPojo.class, "one", "topic2");
applicationContext.getBean(MyPojo.class, "two", "topic3");
class MyPojo(id: String?, topic: String?) {
@KafkaListener(id = "#{__listener.id}", topics = ["#{__listener.topics}"])
fun listen(`in`: String?) {
println(`in`)
}
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
fun pojo(id: String?, topic: String?): MyPojo {
return MyPojo(id, topic)
}
applicationContext.getBean(MyPojo::class.java, "one", arrayOf("topic2"))
applicationContext.getBean(MyPojo::class.java, "two", arrayOf("topic3"))
监听器必须具有唯一的ID。
从2.8.9版本开始,KafkaListenerEndpointRegistry新增了方法unregisterListenerContainer(String id),允许您重用ID。
取消注册容器不会stop()容器本身,您必须自行执行此操作。 |
4.1.7. 应用事件
以下由监听器容器发布并由其消费者监听的 Spring 应用程序事件:
-
ConsumerStartingEvent- 发布于消费者线程首次启动时,在其开始轮询之前。 -
ConsumerStartedEvent- 发布当消费者即将开始拉取时。 -
ConsumerFailedToStartEvent- 在consumerStartTimeout容器属性内若没有发布ConsumerStartingEvent,则显示此内容。这可能表明配置的任务执行器线程不足,无法支持其使用的容器及其并发性。当出现此条件时还会记录错误信息。 -
ListenerContainerIdleEvent: 在未收到idleInterval条消息时触发发布(如果已配置)。 -
ListenerContainerNoLongerIdleEvent: 在之前发布过一个ListenerContainerIdleEvent之后消费记录时发布。 -
ListenerContainerPartitionIdleEvent: 在从该分区未收到消息的持续时间为idlePartitionEventInterval(如果已配置)时发布。 -
ListenerContainerPartitionNoLongerIdleEvent: 在从之前发布过ListenerContainerPartitionIdleEvent的分区消费记录时发布。 -
NonResponsiveConsumerEvent: 在消费者似乎在poll方法中被阻塞时发布。 "} -
ConsumerPartitionPausedEvent: 在分区暂停时由每个消费者发布。 -
ConsumerPartitionResumedEvent: 每个消费者在分区恢复时发布。 -
ConsumerPausedEvent: 每个消费者在容器暂停时发布。 -
ConsumerResumedEvent: 每个消费者在容器恢复时发布。 -
ConsumerStoppingEvent: 在消费者停止之前由每个消费者发布。 -
ConsumerStoppedEvent: 在消费者关闭后发布。 见 线程安全性. -
ContainerStoppedEvent: 在所有消费者都停止时发布。
默认情况下,应用程序上下文的事件多播器会在调用线程上触发事件监听器。
如果你将多播器更改为使用异步执行器,那么在事件包含对消费者的引用时,不要调用任何Consumer方法。 |
代码 ListenerContainerIdleEvent 具有以下属性:
-
source: 事件的发布者监听器容器实例。 -
container: 侦听器容器或其父侦听器容器,如果源容器是子容器。 -
id: 监听器ID(或容器bean名称)。 -
idleTime: 容器在发布该事件时已空闲的时间。 -
topicPartitions: 生成事件时容器被分配的主题和分区。 -
consumer: 对 KafkaConsumer对象的引用。 例如,如果消费者之前调用了pause()方法,它可以在事件收到时进行resume()处理。 -
paused: 当前容器是否已暂停。 查看暂停和恢复侦听器容器以获取更多信息。
The ListenerContainerNoLongerIdleEvent 具有相同的属性,除了 idleTime 和 paused。
代码 ListenerContainerPartitionIdleEvent 具有以下属性:
-
source: 事件的发布者监听器容器实例。 -
container: 侦听器容器或其父侦听器容器,如果源容器是子容器。 -
id: 监听器ID(或容器bean名称)。 -
idleTime: 事件发布时,时间分区的消费处于空闲状态。 -
topicPartition: 触发事件的主题和分区。 -
consumer: 对 KafkaConsumer对象的引用。 例如,如果消费者之前调用了pause()方法,它可以在事件收到时进行resume()处理。 -
paused: 是否当前暂停了该消费者的分区消费。 请参阅 暂停和恢复监听器容器 以获取更多信息。
The ListenerContainerPartitionNoLongerIdleEvent 具有相同的属性,除了 idleTime 和 paused。
代码 NonResponsiveConsumerEvent 具有以下属性:
-
source: 事件的发布者监听器容器实例。 -
container: 侦听器容器或其父侦听器容器,如果源容器是子容器。 -
id: 监听器ID(或容器bean名称)。 -
timeSinceLastPoll: 容器上一次调用poll()之前的时刻。 -
topicPartitions: 生成事件时容器被分配的主题和分区。 -
consumer: 对 KafkaConsumer对象的引用。 例如,如果消费者之前调用了pause()方法,它可以在事件收到时进行resume()处理。 -
paused: 当前容器是否已暂停。 查看暂停和恢复侦听器容器以获取更多信息。
事件 ConsumerPausedEvent、ConsumerResumedEvent 和 ConsumerStopping 具有以下属性:
-
source: 事件的发布者监听器容器实例。 -
container: 侦听器容器或其父侦听器容器,如果源容器是子容器。 -
partitions: 涉及的TopicPartition实例。
事件ConsumerPartitionPausedEvent和ConsumerPartitionResumedEvent具有以下属性:
-
source: 事件的发布者监听器容器实例。 -
container: 侦听器容器或其父侦听器容器,如果源容器是子容器。 -
partition: Spring框架涉及的TopicPartition实例。
The ConsumerStartingEvent, ConsumerStartingEvent, ConsumerFailedToStartEvent, ConsumerStoppedEvent and ContainerStoppedEvent 事件具有以下属性:
-
source: 事件的发布者监听器容器实例。 -
container: 侦听器容器或其父侦听器容器,如果源容器是子容器。
所有容器(无论是子容器还是父容器)都发布 ContainerStoppedEvent。 对于一个父容器,源属性和容器属性是相同的。
此外,ConsumerStoppedEvent还具有以下附加属性:
-
reason-
NORMAL- 消费者正常停止(容器已停止)。 -
ERROR- 当抛出java.lang.Error时。 -
FENCED- 事务性生产者被封存,stopContainerWhenFenced容器属性为true。 -
AUTH- 抛出了AuthenticationException或AuthorizationException,且未配置authExceptionRetryInterval。 -
NO_OFFSET- 分区没有偏移量,且auto.offset.reset策略是none。
-
在发生这种情况后,您可以使用此事件重启容器:
if (event.getReason.equals(Reason.FENCED)) {
event.getSource(MessageListenerContainer.class).start();
}
检测空闲和无响应的消费者
虽然异步消费者效率很高,但一个问题在于检测它们何时处于空闲状态。如果您希望在一段时间内没有收到消息时采取某些操作,那么这个问题就显得尤为重要。
您可以配置侦听器容器,当一段时间内没有消息传递时发布一个ListenerContainerIdleEvent。在容器空闲期间,每隔idleEventInterval毫秒就会发布一个事件。
要配置此功能,请在容器上设置idleEventInterval。以下示例显示了如何操作:
@Bean
public KafkaMessageListenerContainer(ConsumerFactory<String, String> consumerFactory) {
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
...
containerProps.setIdleEventInterval(60000L);
...
KafkaMessageListenerContainer<String, String> container = new KafKaMessageListenerContainer<>(...);
return container;
}
以下示例显示了如何为@KafkaListener设置idleEventInterval:
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.getContainerProperties().setIdleEventInterval(60000L);
...
return factory;
}
在这些情况下,每当容器空闲时,每分钟都会发布一次事件。
如果由于某种原因,消费者poll()方法没有退出,则不会收到任何消息,并且无法生成空闲事件(这是早期版本的kafka-clients中的一个问题,当时代理不可达)。在这种情况下,如果轮询在3x内未返回NonResponsiveConsumerEvent,则容器会发布pollTimeout属性。默认情况下,每个容器每30秒执行一次此检查。可以通过设置monitorInterval(默认为30秒)和noPollThreshold(默认为3.0)属性来修改此行为。ContainerProperties配置监听器容器时使用。为了防止因竞争条件而产生虚假事件,应确保noPollThreshold大于1.0。接收到此类事件后可以停止容器,从而唤醒消费者以便其停止。
从版本 2.6.2 开始,如果容器已发布ListenerContainerIdleEvent,则在接收到后续记录时会发布ListenerContainerNoLongerIdleEvent。
事件消费
通过实现ApplicationListener,您可以捕获这些事件——无论是通用监听器还是仅接收此特定事件的监听器。
您还可以使用@EventListener,它是在Spring Framework 4.2中引入的。
The next example combines @KafkaListener and @EventListener into a single class.
You should understand that the application listener gets events for all containers, so you may need to check the listener ID if you want to take specific action based on which container is idle.
You can also use the @EventListener condition for this purpose.
有关事件属性的信息,请参阅应用程序事件。
事件通常在消费者线程上发布,因此可以安全地与Consumer对象进行交互。
以下示例同时使用@KafkaListener和@EventListener:
public class Listener {
@KafkaListener(id = "qux", topics = "annotated")
public void listen4(@Payload String foo, Acknowledgment ack) {
...
}
@EventListener(condition = "event.listenerId.startsWith('qux-')")
public void eventHandler(ListenerContainerIdleEvent event) {
...
}
}
| 事件监听器会看到所有容器的事件。 因此,在前面的示例中,我们根据监听器 ID 缩小了接收到的事件范围。 由于为 这就是为什么我们在条件中使用 |
如果您希望在空闲事件中停止监听器容器,请不要在调用监听器的线程上调用container.stop()。这样做会导致延迟并产生不必要的日志消息。相反,您应该将事件传递给一个不同的线程,该线程可以随后停止容器。
另外,如果您使用的容器是一个子容器,则不应直接stop()停止容器实例。你应该停止并发容器而不是直接操作容器实例。 |
空闲时的位置
请注意,当检测到空闲时,可以通过在侦听器中实现ConsumerSeekAware来获取当前位置。
请参阅Seeking to a Specific Offset中的onIdleContainer()。
4.1.8. Topic/Partition 初始偏移量
有几种方式可以设置分区的初始偏移。
手动分配分区时,您可以在配置的TopicPartitionOffset参数中设置初始偏移量(如需设定)(参见消息监听容器)。
您还可以在任何时间点.seek()到特定的偏移量。
当您使用其中经纪人分配分区的组管理时:
-
对于一个新的
group.id,初始偏移由auto.offset.reset消费者属性(earliest或latest)确定。 -
对于现有的组ID,初始偏移量是该组ID的当前偏移量。 但是,您可以在初始化时(或之后任何时候)跳转到特定的偏移量。
4.1.9. 寻求到特定偏移量
为了搜索,您的监听器必须实现 ConsumerSeekAware,其包含以下方法:
void registerSeekCallback(ConsumerSeekCallback callback);
void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
void onPartitionsRevoked(Collection<TopicPartition> partitions)
void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
The registerSeekCallback 会在容器启动时以及分区分配时被调用。
你应该在初始化后任意时间点使用此回调时调用它。
你应该保存对该回调的引用。
如果你在同一 ConcurrentMessageListenerContainer 中使用相同的监听器,你应该将回调保存在一个 ThreadLocal 或其他以监听器 Thread 为键的结构中。
当使用分组管理时,在分配分区时会调用onPartitionsAssigned。
你可以通过这种方式,例如在调用回调方法时设置分区的初始偏移量。
你也可以使用此方法将当前线程的回调与分配的分区关联起来(参见下面的例子)。
必须使用传入的回调参数而不是registerSeekCallback传递的参数。
从2.5.5版本开始,即使在使用手动分区分配时也会调用此方法。
onPartitionsRevoked 在容器停止或Kafka撤销分配时被调用。
你应该丢弃此线程的回调并移除与撤销分区的任何关联。
回调具有以下方法:
void seek(String topic, int partition, long offset);
void seekToBeginning(String topic, int partition);
void seekToBeginning(Collection=<TopicPartitions> partitions);
void seekToEnd(String topic, int partition);
void seekToEnd(Collection=<TopicPartitions> partitions);
void seekRelative(String topic, int partition, long offset, boolean toCurrent);
void seekToTimestamp(String topic, int partition, long timestamp);
void seekToTimestamp(Collection<TopicPartition> topicPartitions, long timestamp);
seekRelative 在 2.3 版本中添加,用于执行相对定位。
-
offset开始于分区末尾的负数和toCurrentfalse- 相对于分区末尾的查找。 -
offset正向且toCurrentfalse- 相对于分区的开始进行查找。 -
offset为负数且toCurrenttrue- 相对于当前位置进行查找(回退)。 -
offset正向且toCurrenttrue- 相对于当前位置(快进)。
The seekToTimestamp 方法在 2.3 版本中也已添加。
当在 onIdleContainer 或 onPartitionsAssigned 方法中为多个分区查找相同时间戳时,优先选择第二种方法,因为在一个调用到消费者 offsetsForTimes 方法中查找时间戳对应偏移量更为高效。
当从其他位置调用时,容器会收集所有时间戳查找请求,并在一个调用到 offsetsForTimes 上进行处理。 |
您也可以在检测到空闲容器时从onIdleContainer()执行 seek 操作。
请参阅检测空闲和无响应消费者以了解如何启用空闲容器检测。
The seekToBeginning 方法接受一个集合作为参数,在处理压缩主题时特别有用,例如每次应用程序启动时都希望定位到主题的开始位置: |
public class MyListener implements ConsumerSeekAware {
...
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
callback.seekToBeginning(assignments.keySet());
}
}
在运行时任意查找,请使用来自registerSeekCallback的适当线程的回调引用。
这里是使用回调的简单Spring Boot应用程序示例;它向主题发送10条记录;在控制台输入<Enter>会使所有分区跳转到开头。
@SpringBootApplication
public class SeekExampleApplication {
public static void main(String[] args) {
SpringApplication.run(SeekExampleApplication.class, args);
}
@Bean
public ApplicationRunner runner(Listener listener, KafkaTemplate<String, String> template) {
return args -> {
IntStream.range(0, 10).forEach(i -> template.send(
new ProducerRecord<>("seekExample", i % 3, "foo", "bar")));
while (true) {
System.in.read();
listener.seekToStart();
}
};
}
@Bean
public NewTopic topic() {
return new NewTopic("seekExample", 3, (short) 1);
}
}
@Component
class Listener implements ConsumerSeekAware {
private static final Logger logger = LoggerFactory.getLogger(Listener.class);
private final ThreadLocal<ConsumerSeekCallback> callbackForThread = new ThreadLocal<>();
private final Map<TopicPartition, ConsumerSeekCallback> callbacks = new ConcurrentHashMap<>();
@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
this.callbackForThread.set(callback);
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
assignments.keySet().forEach(tp -> this.callbacks.put(tp, this.callbackForThread.get()));
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
partitions.forEach(tp -> this.callbacks.remove(tp));
this.callbackForThread.remove();
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
}
@KafkaListener(id = "seekExample", topics = "seekExample", concurrency = "3")
public void listen(ConsumerRecord<String, String> in) {
logger.info(in.toString());
}
public void seekToStart() {
this.callbacks.forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
}
}
为使事情更简单,版本 2.3 增加了 AbstractConsumerSeekAware 类,用于跟踪某个主题/分区应使用的回调。
以下示例展示了如何在容器空闲时,将每个分区的游标seek到上次处理的最后一条记录。
它还提供了方法,允许任意外部调用来将分区回退一位记录。
public class SeekToLastOnIdleListener extends AbstractConsumerSeekAware {
@KafkaListener(id = "seekOnIdle", topics = "seekOnIdle")
public void listen(String in) {
...
}
@Override
public void onIdleContainer(Map<org.apache.kafka.common.TopicPartition, Long> assignments,
ConsumerSeekCallback callback) {
assignments.keySet().forEach(tp -> callback.seekRelative(tp.topic(), tp.partition(), -1, true));
}
/**
* Rewind all partitions one record.
*/
public void rewindAllOneRecord() {
getSeekCallbacks()
.forEach((tp, callback) ->
callback.seekRelative(tp.topic(), tp.partition(), -1, true));
}
/**
* Rewind one partition one record.
*/
public void rewindOnePartitionOneRecord(String topic, int partition) {
getSeekCallbackFor(new org.apache.kafka.common.TopicPartition(topic, partition))
.seekRelative(topic, partition, -1, true);
}
}
版本 2.6 为抽象类新增了便利方法:
-
seekToBeginning()- 将所有分配的分区移到开头 -
seekToEnd()- 将所有分配的分区移至末尾 -
seekToTimestamp(long time)- 将所有已分配的分区定位到由该时间戳表示的偏移量。
示例:
public class MyListener extends AbstractConsumerSeekAware {
@KafkaListener(...)
void listn(...) {
...
}
}
public class SomeOtherBean {
MyListener listener;
...
void someMethod() {
this.listener.seekToTimestamp(System.currentTimeMillis - 60_000);
}
}
4.1.10. 容器工厂
作为在@KafkaListener 注解中讨论的,ConcurrentKafkaListenerContainerFactory 用于创建带有注解的方法容器。
从 2.2 版本开始,您可以使用相同的工厂来创建任何 ConcurrentMessageListenerContainer。
这可能在您需要创建具有相似属性的多个容器,或者希望使用某些外部配置的工厂(如由 Spring Boot 自动配置提供的工厂)时很有用。
一旦创建了容器,您可以进一步修改其属性,其中许多属性是通过使用 container.getContainerProperties() 设置的。
以下示例配置了一个 ConcurrentMessageListenerContainer:
@Bean
public ConcurrentMessageListenerContainer<String, String>(
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
ConcurrentMessageListenerContainer<String, String> container =
factory.createContainer("topic1", "topic2");
container.setMessageListener(m -> { ... } );
return container;
}
通过这种方式创建的容器不会被添加到端点注册表中。
它们应该作为@Bean定义创建,以便与应用程序上下文注册。 |
从 2.3.4 版本开始,你可以在工厂后添加一个 0 来进一步配置每个在创建和配置后生成的容器。
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.setContainerCustomizer(container -> { /* customize the container */ });
return factory;
}
4.1.11. 线程安全性
当使用并发消息监听器容器时,一个监听器实例会被所有消费者线程调用。 监听器因此需要是线程安全的,最好使用无状态监听器。 如果无法使您的监听器线程安全,或者添加同步会显著降低并发带来的好处,您可以使用以下几种技术之一:
-
使用
n容器与concurrency=1配合,并使用原型作用域的MessageListener组件bean,使得每个容器都获得自己的实例(这在使用@KafkaListener时是不行的)。 -
在
ThreadLocal<?>实例中保持状态。 -
具有单例作用域的监听器委托到在
SimpleThreadScope中声明的bean(或类似的作用域)。
To facilitate cleaning up thread state (for the second and third items in the preceding list), 自版本 2.2 起,监听器容器在每个线程退出时发布一个 ConsumerStoppedEvent。
您可以使用 ApplicationListener 或 @EventListener 方法移除 ThreadLocal<?> 实例或 remove() 个线程作用域的 bean。
请注意,SimpleThreadScope 不会销毁具有销毁接口(如 DisposableBean)的 bean,因此您应 destroy() 该实例 yourself。
| 默认情况下,应用程序上下文的事件多播器会在调用线程上触发事件监听器。 如果你将多播器更改为使用异步执行器,线程清理将不再生效。 |
4.1.12. 监控
监控监听器性能
从版本 2.3 开始,监听器容器将在类路径上检测到 `1` 并且应用程序上下文中存在一个 `2` 的情况下自动为监听器创建和更新 Micrometer `0`。这些计时器可以通过将 `3` 配置属性设置为 `5` 来禁用。
维护两个计时器——一个用于监听器调用成功,另一个用于失败。
计时器命名为 spring.kafka.listener,并具有以下标签:
-
name: (容器bean名称) -
result:successorfailure -
exception:noneorListenerExecutionFailedException
您可以通过设置ContainerProperties属性来添加额外的标签。
使用并发容器时,为每个线程创建计时器,并且在name标签后附加-n,其中n是0到concurrency-1。 |
监控 KafkaTemplate 性能
从版本 2.5 开始,如果类路径中检测到 Micrometer,并且应用程序上下文中存在一个 MeterRegistry,则模板将自动创建和更新发送操作的 Micrometer Timer。
可以通过将模板的 micrometerEnabled 属性设置为 false 来禁用计时器。
维护两个计时器——一个用于监听器调用成功,另一个用于失败。
计时器命名为 spring.kafka.template,并具有以下标签:
-
name: (模板bean名称) -
result:successorfailure -
exception:none或失败时的异常类名
您可以使用模板的micrometerTags属性添加其他标签。
Micrometer 原生指标
自2.5版本起,框架提供了Factory Listeners来管理Micrometer KafkaClientMetrics实例,在生产者和消费者创建和关闭时对其进行管理。
要启用此功能,请简单地向您的生产者和消费者工厂添加监听器:
@Bean
public ConsumerFactory<String, String> myConsumerFactory() {
Map<String, Object> configs = consumerConfigs();
...
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(configs);
...
cf.addListener(new MicrometerConsumerListener<String, String>(meterRegistry(),
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
...
return cf;
}
@Bean
public ProducerFactory<String, String> myProducerFactory() {
Map<String, Object> configs = producerConfigs();
configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
...
DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
...
pf.addListener(new MicrometerProducerListener<String, String>(meterRegistry(),
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
...
return pf;
}
传递给监听器的消费者/生产者 id 将使用标签名称 spring.id 添加到计数器的标签中。
double count = this.meterRegistry.get("kafka.producer.node.incoming.byte.total")
.tag("customTag", "customTagValue")
.tag("spring.id", "myProducerFactory.myClientId-1")
.functionCounter()
.count()
类似的功能还提供给了StreamsBuilderFactoryBean- 请参见KafkaStreams Micrometer 支持。
4.1.13. 事务
本部分描述了Spring for Apache Kafka如何支持事务。
概述
0.11.0.0 客户端库增加了对事务的支持。 Spring for Apache Kafka 通过以下方式提供支持:
-
KafkaTransactionManager: 用于与常规Spring事务支持一起使用(@Transactional,TransactionTemplate等)。 -
事务性
KafkaMessageListenerContainer -
使用 0 的本地事务
-
与其他事务管理器的事务同步
transactions are enabled by providing the DefaultKafkaProducerFactory with a transactionIdPrefix. In that case, instead of managing a single shared Producer, the factory maintains a cache of transactional producers. When the user calls close() on a producer, it is returned to the cache for reuse instead of actually being closed. The transactional.id property of each producer is transactionIdPrefix + n, where n starts with 0 and is incremented for each new producer, unless the transaction is started by a listener container with a record-based listener. In that case, the transactional.id is <transactionIdPrefix>.<group.id>.<topic>.<partition>. This is to properly support fencing zombies, as described here. This new behavior was added in versions 1.3.7, 2.0.6, 2.1.10, and 2.2.0. If you wish to revert to the previous behavior, you can set the producerPerConsumerPartition property on the DefaultKafkaProducerFactory to false.
| 在使用批处理监听器时,事务是受支持的,默认情况下,由于一个批次可能包含多个主题或分区的数据,因此不支持僵尸隔离。
但是从版本2.3.2开始,在设置了容器属性 自版本2.5起,当启用了事务( 另外参见一次即最终语义。 |
也请参见 transactionIdPrefix。
使用Spring Boot,只需要设置spring.kafka.producer.transaction-id-prefix属性——Boot将会自动配置一个KafkaTransactionManager bean并将其注入到监听容器中。
从 2.5.8 版本开始,你现在可以在生产者工厂上配置 maxAge 属性。这在使用事务性生产者时很有用,这些生产者可能会闲置直到代理的 transactional.id.expiration.ms。
当前的 kafka-clients 可能会导致在没有重新分配的情况下出现 ProducerFencedException。通过将 maxAge 设置为小于 transactional.id.expiration.ms 的值,工厂将在生产者超出最大年龄后刷新它。 |
使用KafkaTransactionManager
The KafkaTransactionManager 是 Spring Framework 的 PlatformTransactionManager 的实现。
它在构造函数中提供对生产者工厂的引用。
如果你提供自定义的生产者工厂,它必须支持事务。
参见 ProducerFactory.transactionCapable()。
You can use the KafkaTransactionManager with normal Spring transaction support (@Transactional, TransactionTemplate, and others).
If a transaction is active, any KafkaTemplate operations performed within the scope of the transaction use the transaction’s Producer.
The manager commits or rolls back the transaction, depending on success or failure.
You must configure the KafkaTemplate to use the same ProducerFactory as the transaction manager
事务同步
本部分指的是生产者发起的事务(由监听器容器启动的事务之外的事务);有关容器启动事务时的链式事务信息,请参见 使用消费者发起的事务。
如果您想要将记录发送到kafka并执行一些数据库更新,您可以使用正常的Spring事务管理,例如使用一个 DataSourceTransactionManager。
@Transactional
public void process(List<Thing> things) {
things.forEach(thing -> this.kafkaTemplate.send("topic", thing));
updateDb(things);
}
拦截器对于注解@Transactional开始事务,而KafkaTemplate将与该事务管理器同步一个事务;每个发送都将参与该事务。
当方法退出时,数据库事务将提交,随后是Kafka事务。
如果您希望按相反的顺序执行提交(先Kafka后数据库),请使用嵌套的@Transactional方法,外层方法配置为使用DataSourceTransactionManager,内层方法配置为使用KafkaTransactionManager。
见与其他事务管理器一起在Kafka-first或DB-first配置中同步JDBC和Kafka事务的应用程序示例,了解有关Kafka事务的其他示例。
| 从2.5.17、2.6.12、2.7.9和2.8.0版本开始,如果同步事务中的提交失败(在主事务提交之后),异常将被抛给调用者。 此前,这种情况会被静默忽略(仅在调试日志中记录)。 应用程序应在必要时采取补救措施,以补偿已提交的主事务。 |
使用消费者发起的事务
The ChainedKafkaTransactionManager 已在 2.7 版本中弃用;有关其超类 ChainedTransactionManager 的更多信息,请参见 JavaDocs。
替代方案是使用 KafkaTransactionManager 在容器中启动 Kafka 事务,并使用 @Transactional 注解监听器方法以启动另一个事务。
见与其他事务管理器链式JDBC和Kafka事务的Kafka事务示例,了解一个示例应用程序。
KafkaTemplate本地事务
你可以使用 KafkaTemplate 在本地事务中执行一系列操作。
以下示例展示了如何操作:
boolean result = template.executeInTransaction(t -> {
t.sendDefault("thing1", "thing2");
t.sendDefault("cat", "hat");
return true;
});
The argument in the callback is the template itself (this).
If the callback exits normally, the transaction is committed.
If an exception is thrown, the transaction is rolled back。
如果正在处理一个KafkaTransactionManager(或同步化的)事务,则不使用它。
相反,将使用一个新的"嵌套"事务。 |
transactionIdPrefix
如概览部分所述,生产者工厂通过此属性配置以构建生产者transactional.id属性。
在使用EOSMode.ALPHA运行多个应用程序实例时,必须使该属性在同一组所有实例上相同,以便满足监听器容器线程上产生记录时的围栏僵尸问题(概览中也提到了这一点)。
然而,在使用由监听器容器未启动的事务产生的记录时,每个实例上的前缀必须不同。
版本2.3简化了此配置过程,特别是对于Spring Boot应用程序。
在之前的版本中,您需要创建两个生产者工厂和KafkaTemplate-一个用于监听器容器线程上产生记录,另一个用于由kafkaTemplate.executeInTransaction()或事务拦截器上的@Transactional方法启动的独立事务。
现在,您可以在KafkaTemplate上覆盖工厂的transactionalIdPrefix和KafkaTransactionManager。
当使用事务管理器和模板为监听容器时,通常会将此设置为生产者工厂属性的默认值。
在使用 EOSMode.ALPHA 时,所有应用实例中的该值应该相同。
当使用 EOSMode.BETA 时,不再需要对消费者发起的事务使用相同的 transactional.id;实际上,在每个实例上都必须是唯一的,就像生产者发起的事务一样。
对于模板(或 @Transaction 的事务管理器)启动的事务,您应该分别在模板和事务管理器上设置该属性。
此属性在每个应用实例上必须有不同的值。
| 这个(对于 `0` 的不同规则)问题在使用 `1` 时已经得到解决(需要代理版本 >= 2.5);详见 精确一次语义。 |
KafkaTemplate事务性与非事务性发布
通常,当一个`KafkaTemplate`是事务性的(通过具有事务能力的生产者工厂进行配置),需要使用事务。
事务可以通过调用`TransactionTemplate`、`@Transactional`方法中的任意一种、或者在配置有`KafkaTransactionManager`的监听器容器的情况下启动。
任何尝试在事务范围之外使用模板的行为都会导致模板抛出一个`IllegalStateException`异常。
从版本2.4.3开始,你可以将模板的`allowNonTransactional`属性设置为`true`。
在这种情况下,模板将在没有事务的情况下允许操作运行,并通过调用`ProducerFactory`的`createNonTransactionalProducer()`方法实现;生产者将像平常一样被缓存或线程绑定以供重用。
请参阅使用`DefaultKafkaProducerFactory`。
带有批处理监听器的事务
当使用事务时,如果监听器发生故障,将会调用AfterRollbackProcessor来在回滚操作完成后采取一些行动。
使用默认的AfterRollbackProcessor与记录监听器一起时,会进行重新定位以确保失败的记录将被重新传输。
然而,对于批处理监听器而言,整个批次都将被重新传输,因为框架不知道批次中的哪个记录发生了故障。
有关更多信息,请参见回滚后处理器。
当使用批量监听器时,2.4.2 版本引入了一种处理在处理批次过程中出现错误的替代机制;这里的BatchToRecordAdapter是一个关键点。
当配置了具有 batchListener 为 true 的容器工厂和 BatchToRecordAdapter 后,监听器每次被调用时会处理一个记录。
这使得在批次内进行错误处理成为可能,并且根据异常类型可以停止整个批次的处理。
默认提供了BatchToRecordAdapter,它可以与标准的ConsumerRecordRecoverer如DeadLetterPublishingRecoverer一起配置。
以下测试用例配置片段说明了如何使用此功能:
public static class TestListener {
final List<String> values = new ArrayList<>();
@KafkaListener(id = "batchRecordAdapter", topics = "test")
public void listen(String data) {
values.add(data);
if ("bar".equals(data)) {
throw new RuntimeException("reject partial");
}
}
}
@Configuration
@EnableKafka
public static class Config {
ConsumerRecord<?, ?> failed;
@Bean
public TestListener test() {
return new TestListener();
}
@Bean
public ConsumerFactory<?, ?> consumerFactory() {
return mock(ConsumerFactory.class);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setBatchToRecordAdapter(new DefaultBatchToRecordAdapter<>((record, ex) -> {
this.failed = record;
}));
return factory;
}
}
4.1.14. 至少一次语义
您可以用KafkaAwareTransactionManager实例提供一个监听器容器。
当这样配置时,容器在调用监听器之前启动事务。
监听器执行的任何KafkaTemplate操作都将参与该事务。
如果监听器成功处理记录(或使用BatchMessageListener处理多个记录),则容器通过调用producer.sendOffsetsToTransaction()将偏移量发送给事务管理器,以便在提交事务之前完成。
如果监听器抛出异常,则事务被回滚,并且消费者会被重定位以在下一次拉取时重新获取回滚的记录。
有关更多信息以及处理反复失败的记录,请参见事务后处理器。
使用事务启用精确一次语义(EOS)。
这表示,对于一个read→process-write序列,可以保证该序列恰好完成一次。
(读取和处理具有至少一次的语义)。
Spring for Apache Kafka 版本 2.5 及其以后支持两种 EOS 模式:
-
ALPHA- alias forV1(已弃用) -
BETA- alias forV2(已弃用) -
V1- akatransactional.id抢占 (自版本 0.11.0.0 起) -
V2- 即 fetch-offset-request 防护(自 2.5 版本起)
使用模式 V1,如果启动了另一个具有相同 transactional.id 的实例,则生产者会被“围栏”。Spring 通过为每个 group.id/topic/partition 使用一个 Producer 来管理这一点;当发生重新平衡时,新实例将使用相同的 transactional.id,而旧生产者则会被围栏。
当模式为 V2 时,不需要为每个 group.id/topic/partition 都有一个生产者,因为消费者元数据会随事务的偏移量一起发送,经纪人可以使用该信息来判断生产者是否被围栏(fenced)。
Starting with version 2.6, the default EOSMode is V2.
要配置容器使用模式 ALPHA,请将容器属性 EOSMode 设置为 ALPHA,以恢复到以前的行为。
如果您使用的是 如果您使用的broker早于2.5版,则必须将 |
当您的代理升级到2.5或更高版本时,您应该将模式切换为V2,但是生产者的数量将保持不变。 然后,您可以将应用程序进行滚动升级,将producerPerConsumerPartition设置为false以减少生产者的数量;您也不再需要设置subBatchPerPartition容器属性。
如果您所在的经纪人已经是2.5或更新版本,您应该将DefaultKafkaProducerFactory producerPerConsumerPartition属性设置为false,以减少所需生产者的数量。
当使用 |
当使用V2模式时,不再需要将subBatchPerPartition设置为true;当EOSMode是V2时,默认会设置为false。
参考 KIP-447 获取更多信息。
V1 和 V2 之前是 ALPHA 和 BETA;为了与 KIP-732 对齐,它们已被更改。
4.1.15. 将Spring Bean连接到生产者/消费者拦截器
Apache Kafka 提供了一种机制,用于向生产者和消费者添加拦截器。这些对象由 Kafka 管理,而不是 Spring,因此正常的 Spring 依赖注入无法用于连接依赖的 Spring Bean。但是,您可以使用拦截器的 config() 方法手动连接这些依赖项。下面的 Spring Boot 应用程序展示了如何通过覆盖 boot 的默认工厂来实现这一点,将一些依赖的 Bean 添加到配置属性中,从而将它们融入应用程序。
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(SomeBean someBean) {
Map<String, Object> consumerProperties = new HashMap<>();
// consumerProperties.put(..., ...)
// ...
consumerProperties.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, MyConsumerInterceptor.class.getName());
consumerProperties.put("some.bean", someBean);
return new DefaultKafkaConsumerFactory<>(consumerProperties);
}
@Bean
public ProducerFactory<?, ?> kafkaProducerFactory(SomeBean someBean) {
Map<String, Object> producerProperties = new HashMap<>();
// producerProperties.put(..., ...)
// ...
Map<String, Object> producerProperties = properties.buildProducerProperties();
producerProperties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, MyProducerInterceptor.class.getName());
producerProperties.put("some.bean", someBean);
DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory<>(producerProperties);
return factory;
}
@Bean
public SomeBean someBean() {
return new SomeBean();
}
@KafkaListener(id = "kgk897", topics = "kgh897")
public void listen(String in) {
System.out.println("Received " + in);
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> template.send("kgh897", "test");
}
@Bean
public NewTopic kRequests() {
return TopicBuilder.name("kgh897")
.partitions(1)
.replicas(1)
.build();
}
}
public class SomeBean {
public void someMethod(String what) {
System.out.println(what + " in my foo bean");
}
}
public class MyProducerInterceptor implements ProducerInterceptor<String, String> {
private SomeBean bean;
@Override
public void configure(Map<String, ?> configs) {
this.bean = (SomeBean) configs.get("some.bean");
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
this.bean.someMethod("producer interceptor");
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
}
@Override
public void close() {
}
}
public class MyConsumerInterceptor implements ConsumerInterceptor<String, String> {
private SomeBean bean;
@Override
public void configure(Map<String, ?> configs) {
this.bean = (SomeBean) configs.get("some.bean");
}
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
this.bean.someMethod("consumer interceptor");
return records;
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
}
@Override
public void close() {
}
}
结果:
producer interceptor in my foo bean
consumer interceptor in my foo bean
Received test
4.1.16. 暂停和恢复侦听器容器
版本 2.1.3 为监听器容器新增了 pause() 和 resume() 方法。
此前,您可以在 ConsumerAwareMessageListener 中暂停消费者,并通过监听 ListenerContainerIdleEvent 事件来恢复,这提供了对 Consumer 对象的访问。
虽然可以通过事件监听器在空闲容器中暂停消费者,但在某些情况下这并非线程安全,因为无法保证事件监听器在消费者线程中被调用。
为了安全地暂停和恢复消费者,应使用监听器容器上的 pause 和 resume 方法。
一个 pause() 在下一次 poll() 之前生效;一个 resume() 在当前 poll() 返回后生效。
当容器被暂停时,它会继续 poll() 消费者,避免在使用组管理时发生重新平衡,但不会检索任何记录。
有关更多信息,请参阅 Kafka 文档。
从版本 2.1.5 开始,您可以调用 isPauseRequested() 来检查 pause() 是否已经调用。
然而,消费者可能尚未实际暂停。
isConsumerPaused() 返回 true 当所有 Consumer 实例实际上已暂停。
此外(自 2.1.5 起也包括),ConsumerPausedEvent 和 ConsumerResumedEvent 实例会以 source 属性作为容器发布,而涉及在 partitions 属性中的 TopicPartition 实例也会被发布。
以下这个简单的Spring Boot应用程序演示了如何使用容器注册表获取对某个方法的容器的引用,并暂停或恢复其消费者,以及接收相应的事件:
@SpringBootApplication
public class Application implements ApplicationListener<KafkaEvent> {
public static void main(String[] args) {
SpringApplication.run(Application.class, args).close();
}
@Override
public void onApplicationEvent(KafkaEvent event) {
System.out.println(event);
}
@Bean
public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
KafkaTemplate<String, String> template) {
return args -> {
template.send("pause.resume.topic", "thing1");
Thread.sleep(10_000);
System.out.println("pausing");
registry.getListenerContainer("pause.resume").pause();
Thread.sleep(10_000);
template.send("pause.resume.topic", "thing2");
Thread.sleep(10_000);
System.out.println("resuming");
registry.getListenerContainer("pause.resume").resume();
Thread.sleep(10_000);
};
}
@KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
public void listen(String in) {
System.out.println(in);
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("pause.resume.topic")
.partitions(2)
.replicas(1)
.build();
}
}
以下代码列表显示了前述示例的结果:
partitions assigned: [pause.resume.topic-1, pause.resume.topic-0]
thing1
pausing
ConsumerPausedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
resuming
ConsumerResumedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
thing2
4.1.17. 在侦听器容器上暂停和恢复分区
自从版本2.7以来,你可以通过在监听容器中使用pausePartition(TopicPartition topicPartition)和resumePartition(TopicPartition topicPartition)方法暂停和恢复分配给该消费者的特定分区的消费。 暂停和恢复分别发生在poll()之前和之后,类似于pause()和resume()方法。 isPartitionPauseRequested()方法返回true如果已经请求了对该分区的暂停。 isPartitionPaused()方法返回true如果该分区实际上已经被暂停。
自从2.7版本开始,ConsumerPartitionPausedEvent和ConsumerPartitionResumedEvent实例将作为容器的source属性和TopicPartition实例发布。
4.1.18. 序列化,反序列化,和消息转换
概述
Spring Boot 提供了用于配置和管理应用程序的高级 API。
它在 org.apache.kafka.common.serialization.Serializer<T> 和
org.apache.kafka.common.serialization.Deserializer<T> 的抽象中提供了一些内置实现。
同时,我们可以使用 Producer 或 Consumer 配置属性来指定配置类和配置属性。
以下示例展示了如何进行配置:
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
...
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
对于更复杂或特定的情况,KafkaConsumer(因此也是KafkaProducer)提供了重载构造函数,可以接受Serializer和Deserializer的实例,分别用于keys和values。
当您使用此API时,DefaultKafkaProducerFactory 和 DefaultKafkaConsumerFactory 也通过构造函数或setter方法提供属性,用于将自定义的 Serializer 和 Deserializer 实例注入到目标 Producer 或 Consumer 中。
此外,您还可以通过构造函数传入 Supplier<Serializer> 或 Supplier<Deserializer> 实例 - 这些 Supplier 在创建每个 Producer 或 Consumer 时被调用。
字符串序列化
自 2.5 版本起,Spring for Apache Kafka 提供了用于实体的字符串表示的 ToStringSerializer 和 ParseStringDeserializer 类。它们依赖于方法 toString 和某些 Function<String> 或 BiFunction<String, Headers> 来解析字符串并填充实例的属性。通常,这会调用类上的静态方法,例如 parse:
ToStringSerializer<Thing> thingSerializer = new ToStringSerializer<>();
//...
ParseStringDeserializer<Thing> deserializer = new ParseStringDeserializer<>(Thing::parse);
默认情况下,ToStringSerializer 配置为在记录 Headers 中传达关于序列化实体的类型信息。
可以通过将 addTypeInfo 属性设置为 false 来禁用此功能。
接收方的 ParseStringDeserializer 可以使用这些信息。
-
ToStringSerializer.ADD_TYPE_INFO_HEADERS(默认true): 您可以将其设置为false以在ToStringSerializer(设置addTypeInfo属性) 时禁用此功能。
ParseStringDeserializer<Object> deserializer = new ParseStringDeserializer<>((str, headers) -> {
byte[] header = headers.lastHeader(ToStringSerializer.VALUE_TYPE).value();
String entityType = new String(header);
if (entityType.contains("Thing")) {
return Thing.parse(str);
}
else {
// ...parsing logic
}
});
你可以通过默认值为UTF-8的Charset来配置将String转换为/从byte[]转换的Charset。
你可以使用 ConsumerConfig 属性通过解析器方法的名称来配置反序列化器:
-
ParseStringDeserializer.KEY_PARSER -
ParseStringDeserializer.VALUE_PARSER
The properties must contain the fully qualified name of the class followed by the method name, separated by a period ..
The method must be static and have a signature of either (String, Headers) or (String).
一个 ToFromStringSerde 也已提供,用于与 Kafka Streams 一起使用。
JSON
Spring for Apache Kafka 也提供了基于 Jackson JSON 对象映射器的 JsonSerializer 和 JsonDeserializer 实现。
JsonSerializer 允许将任何 Java 对象写入为 JSON byte[]。
JsonDeserializer 需要额外的 Class<?> targetType 参数以允许将消费的 byte[] 反序列化到正确的目标对象。
以下示例展示了如何创建一个 JsonDeserializer:
JsonDeserializer<Thing> thingDeserializer = new JsonDeserializer<>(Thing.class);
您可以自定义JsonSerializer和JsonDeserializer,使用一个ObjectMapper。
您还可以扩展它们,在configure(Map<String, ?> configs, boolean isKey)方法中实现特定的配置逻辑。
从版本 2.3 开始,所有 JSON-aware 组件默认配置为带有 JacksonUtils.enhancedObjectMapper() 的实例,该实例会禁用 MapperFeature.DEFAULT_VIEW_INCLUSION 和 DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES 特性。
此外,该实例还包含用于自定义数据类型的知名模块,例如 Java 时间和 Kotlin 支持。
请参阅 JacksonUtils.enhancedObjectMapper() JavaDocs 以获取更多信息。
此方法还注册了将 org.springframework.util.MimeType 对象序列化为 plain string 的功能,以实现跨平台网络兼容性。
可以在应用程序上下文中注册一个 JacksonMimeTypeModule 作为 bean,并会自动配置到 Spring Boot ObjectMapper 实例。
也从2.3版本开始,JsonDeserializer提供了基于TypeReference的构造函数,以便更好地处理目标泛型容器类型。
从 2.1 版本开始,你可以在记录 Headers 中传达类型信息,允许处理多种类型。
此外,你可以通过以下 Kafka 属性配置序列化器和反序列化器。
如果你为 KafkaConsumer 和 KafkaProducer 分别提供了 Serializer 和 Deserializer 实例,则这些属性将不起作用。
配置属性
-
JsonSerializer.ADD_TYPE_INFO_HEADERS(默认true): 您可以将其设置为false以在JsonSerializer(设置addTypeInfo属性) 时禁用此功能。 -
JsonSerializer.TYPE_MAPPINGS(默认empty): 请参见 Mapping Types。 -
JsonDeserializer.USE_TYPE_INFO_HEADERS(默认true):您可以将其设置为false以忽略序列化器设置的头信息。 -
JsonDeserializer.REMOVE_TYPE_INFO_HEADERS(默认true):您可以将其设置为false以保留序列化器设置的标头。 -
JsonDeserializer.KEY_DEFAULT_TYPE: 无标题信息时键的反序列化回退类型。 -
JsonDeserializer.VALUE_DEFAULT_TYPE: 无头信息时的反序列化回退类型。 -
JsonDeserializer.TRUSTED_PACKAGES(defaultjava.util,java.lang):逗号分隔的包模式列表,允许反序列化。*表示反序列化所有。 -
JsonDeserializer.TYPE_MAPPINGS(默认empty): 请参见 Mapping Types。 -
JsonDeserializer.KEY_TYPE_METHOD(默认empty): 请参见 使用方法确定类型。 -
JsonDeserializer.VALUE_TYPE_METHOD(默认empty): 请参见 使用方法确定类型。
从 2.2 版本开始,反序列化器会移除由序列化器添加的类型信息头(如果存在)。可以通过将 removeTypeHeaders 属性设置为 false 来恢复之前的处理方式,既可以直接设置在反序列化器上,也可以使用前面描述的配置属性。
从 2.8 版本开始,如果你像在 程序化构造 中所示那样程序化地构造序列化器或反序列化器,只要没有使用任何属性进行显式设置(使用 set*() 方法或使用流畅 API),上述属性将由工厂应用。此前,在程序化创建时,配置属性从未被应用;如果直接在对象上显式设置属性,则情况仍如此。 |
映射类型
从 2.2 版本开始,使用 JSON 时,可以通过前面列出的属性提供类型映射。
此前,您需要在序列化器和反序列化器中自定义类型映射器。
映射由前面列表中属性的以逗号分隔的 token:className 对组成。
在出站方向,负载的类名被映射到相应的 token。
在入站方向,类型头中的 token 被映射到相应的类名。
以下示例创建了一组映射:
senderProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
senderProps.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.mycat.Cat, hat:com.myhat.hat");
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
consumerProps.put(JsonDeSerializer.TYPE_MAPPINGS, "cat:com.yourcat.Cat, hat:com.yourhat.hat");
| 对应的对象必须兼容。 |
如果使用 Spring Boot,你可以在 application.properties(或 yaml)文件中提供这些属性。
以下示例展示了如何操作:
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.properties.spring.json.type.mapping=cat:com.mycat.Cat,hat:com.myhat.Hat
|
您只能使用属性执行简单的配置。 对于更高级的配置(例如,在序列化程序和反序列化程序中使用自定义的
构造器也提供了 setter,作为使用这些构造器的替代方案。 |
从2.2版开始,您可以显式配置反序列化程序以使用提供的目标类型并忽略标头中的类型信息,方法是使用其中一个重载构造函数,该构造函数具有布尔值useHeadersIfPresent(默认为true)。 以下示例演示了如何执行此操作: \ "
}
DefaultKafkaConsumerFactory<Integer, Cat1> cf = new DefaultKafkaConsumerFactory<>(props,
new IntegerDeserializer(), new JsonDeserializer<>(Cat1.class, false));
使用方法确定类型
从2.5版本开始,您可以使用属性配置反序列化器,通过调用方法来确定目标类型。
如果存在该配置,则会覆盖上述讨论的其他技术。
这在数据由不使用Spring序列化的应用程序发布,且需要根据数据或其他头信息反序列化到不同类型时很有用。
将这些属性设置为方法名:完全限定类名后跟方法名,用点 . 分隔。
该方法必须声明为 public static,并具有以下三种签名之一:(String topic, byte[] data, Headers headers)、(byte[] data, Headers headers) 或 (byte[] data),并且返回一个 Jackson JavaType。
-
JsonDeserializer.KEY_TYPE_METHOD:spring.json.key.type.method -
JsonDeserializer.VALUE_TYPE_METHOD:spring.json.value.type.method
你可以使用任意的头信息或检查数据以确定类型。
JavaType thing1Type = TypeFactory.defaultInstance().constructType(Thing1.class);
JavaType thing2Type = TypeFactory.defaultInstance().constructType(Thing2.class);
public static JavaType thingOneOrThingTwo(byte[] data, Headers headers) {
// {"thisIsAFieldInThing1":"value", ...
if (data[21] == '1') {
return thing1Type;
}
else {
return thing2Type;
}
}
对于更复杂的数据检查,可以考虑使用JsonPath或类似方法,但确定类型的测试越简单,处理过程就越高效。
以下是一个通过程序方式创建反序列化器的示例(当在提供消费者工厂的构造函数中传入反序列化器时):
JsonDeserializer<Object> deser = new JsonDeserializer<>()
.trustedPackages("*")
.typeResolver(SomeClass::thing1Thing2JavaTypeForTopic);
...
public static JavaType thing1Thing2JavaTypeForTopic(String topic, byte[] data, Headers headers) {
...
}
程序化构建
在使用2.3版本之后,当程序化地构建用于生产者/消费者的序列化器/反序列化器时,您可以使用流畅API(fluent API),这简化了配置。
@Bean
public ProducerFactory<MyKeyType, MyValueType> pf() {
Map<String, Object> props = new HashMap<>();
// props.put(..., ...)
// ...
DefaultKafkaProducerFactory<MyKeyType, MyValueType> pf = new DefaultKafkaProducerFactory<>(props,
new JsonSerializer<MyKeyType>()
.forKeys()
.noTypeInfo(),
new JsonSerializer<MyValueType>()
.noTypeInfo());
return pf;
}
@Bean
public ConsumerFactory<MyKeyType, MyValueType> cf() {
Map<String, Object> props = new HashMap<>();
// props.put(..., ...)
// ...
DefaultKafkaConsumerFactory<MyKeyType, MyValueType> cf = new DefaultKafkaConsumerFactory<>(props,
new JsonDeserializer<>(MyKeyType.class)
.forKeys()
.ignoreTypeHeaders(),
new JsonDeserializer<>(MyValueType.class)
.ignoreTypeHeaders());
return cf;
}
为了程序化地提供类型映射,类似于 使用方法确定类型,使用 typeFunction 属性。
JsonDeserializer<Object> deser = new JsonDeserializer<>()
.trustedPackages("*")
.typeFunction(MyUtils::thingOneOrThingTwo);
或者,只要不使用流畅API来配置属性,或者使用set*()方法进行设置,工厂将使用配置属性来配置序列化器/反序列化器;参见配置属性。
委托式序列化器和反序列化器
使用标题
版本 2.3 引入了 DelegatingSerializer 和 DelegatingDeserializer,这允许生产者产生并消费者用不同类型的键和/或值记录。
生产者必须设置一个头 DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR 到一个选择器值,该值用于选择用于值的序列化器和 DelegatingSerializer.KEY_SERIALIZATION_SELECTOR 用于键;如果未找到匹配项,将抛出 IllegalStateException。
对于输入记录,反序列化器使用相同的头来选择要使用的反序列化器;如果未找到匹配项或该头不存在,则返回原始byte[]。
You can configure the map of selector to Serializer / Deserializer via a constructor, or you can configure it via Kafka producer/consumer properties with the keys DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG and DelegatingSerializer.KEY_SERIALIZATION_SELECTOR_CONFIG.
For the serializer, the producer property can be a Map<String, Object> where the key is the selector and the value is a Serializer instance, a serializer Class or the class name.
The property can also be a String of comma-delimited map entries, as shown below.
反序列化器的消费者属性可以是一个 Map<String, Object>,其中键是选择器,值是一个 Deserializer 实例,一个 deserializer Class 或类名。该属性也可以是一个用逗号分隔的映射条目的字符串,如下所示。
使用属性进行配置时,请使用以下语法:
producerProps.put(DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
"thing1:com.example.MyThing1Serializer, thing2:com.example.MyThing2Serializer")
consumerProps.put(DelegatingDeserializer.VALUE_SERIALIZATION_SELECTOR_CONFIG,
"thing1:com.example.MyThing1Deserializer, thing2:com.example.MyThing2Deserializer")
生产者则会将DelegatingSerializer.VALUE_SERIALIZATION_SELECTOR头设置为thing1或thing2。
这种技术支持将不同类型发送到同一主题(或不同主题)。
从 2.5.1 版本开始,如果类型(键或值)是 Serdes 所支持的标准类型之一(如 Long, Integer, 等),则无需设置 selector 头部;序列化器会将该头部设置为该类型的类名。对于这些类型无需配置序列化器或反序列化器,它们会在首次使用时动态创建。 |
对于另一种将不同类型发送到不同主题的技术,请参阅使用RoutingKafkaTemplate。
By 类型
版本 2.8 引入了 DelegatingByTypeSerializer。
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
null, new DelegatingByTypeSerializer(Map.of(
byte[].class, new ByteArraySerializer(),
Bytes.class, new BytesSerializer(),
String.class, new StringSerializer())));
}
从2.8.3版本开始,您可以配置序列化器检查映射键是否可以从目标对象继承,这对于委托序列化器可以序列化子类的情况很有用。在这种情况下,如果存在模棱两可的匹配,则需要提供一个有序的Map,例如一个LinkedHashMap。
By Topic
从 2.8 版本开始,DelegatingByTopicSerializer 和 DelegatingByTopicDeserializer 允许根据主题名称选择序列化器/反序列化器。
使用正则表达式 Pattern 来查找要使用的实例。
可以通过构造函数或属性配置映射(以逗号分隔的 pattern:serializer 列表)。
producerConfigs.put(DelegatingByTopicSerializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
"topic[0-4]:" + ByteArraySerializer.class.getName()
+ ", topic[5-9]:" + StringSerializer.class.getName());
...
ConsumerConfigs.put(DelegatingByTopicDeserializer.VALUE_SERIALIZATION_TOPIC_CONFIG,
"topic[0-4]:" + ByteArrayDeserializer.class.getName()
+ ", topic[5-9]:" + StringDeserializer.class.getName());
使用 KEY_SERIALIZATION_TOPIC_CONFIG 时用于键的用途。
@Bean
public ProducerFactory<Integer, Object> producerFactory(Map<String, Object> config) {
return new DefaultKafkaProducerFactory<>(config,
null,
new DelegatingByTopicSerializer(Map.of(
Pattern.compile("topic[0-4]"), new ByteArraySerializer(),
Pattern.compile("topic[5-9]"), new StringSerializer())),
new JsonSerializer<Object>()); // default
}
您可以在没有模式匹配时,使用 DelegatingByTopicSerialization.KEY_SERIALIZATION_TOPIC_DEFAULT 和 DelegatingByTopicSerialization.VALUE_SERIALIZATION_TOPIC_DEFAULT 指定默认的序列化器/反序列化器。
一个额外的属性 DelegatingByTopicSerialization.CASE_SENSITIVE(默认值 true),当设置为 false 时会使主题查找不区分大小写。
重试反序列化器
The RetryingDeserializer uses a delegate Deserializer and RetryTemplate to retry deserialization when the delegate might have transient errors, such a network issues, during deserialization.
ConsumerFactory cf = new DefaultKafkaConsumerFactory(myConsumerConfigs,
new RetryingDeserializer(myUnreliableKeyDeserializer, retryTemplate),
new RetryingDeserializer(myUnreliableValueDeserializer, retryTemplate));
参考 spring-retry 项目了解如何配置 RetryTemplate 的重试策略、退避策略等。
Spring 消息 Messaging 消息转换
虽然从低级Kafka的Consumer和Producer视角来看,Serializer和Deserializer API很简单且灵活,但当你在使用@KafkaListener或Spring Integration的Apache Kafka支持时,在Spring消息层可能需要更多灵活性。为了让你容易地转换到和从org.springframework.messaging.Message,Spring for Apache Kafka提供了一个MessageConverter抽象,其MessagingMessageConverter实现和JsonMessageConverter(及其子类)的定制化。
你可以直接将MessageConverter注入到KafkaTemplate实例中,并通过使用AbstractKafkaListenerContainerFactory定义的@KafkaListener.containerFactory()属性进行注入。以下示例展示了如何操作:
@Bean
public KafkaListenerContainerFactory<?> kafkaJsonListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setMessageConverter(new JsonMessageConverter());
return factory;
}
...
@KafkaListener(topics = "jsonData",
containerFactory = "kafkaJsonListenerContainerFactory")
public void jsonListener(Cat cat) {
...
}
当使用 Spring Boot 时,只需将转换器定义为一个 @Bean,Spring Boot 自动配置就会将其连接到自动配置的模板和容器工厂。
当你使用一个 @KafkaListener,消息转换器会提供参数类型以协助进行转换。
|
这种类型推断只能在 |
|
在消费者侧,您可以配置一个 在生产者端,当你使用Spring Integration或方法
再次使用 为了方便,在2.3版本开始,框架还提供了一个 |
从 2.7.1 版本开始,消息有效载荷转换可以委托给一个 spring-messaging SmartMessageConverter; 这使得转换,例如,可以基于 MessageHeaders.CONTENT_TYPE 头部进行。
类似地,在 在任何情况下,如果 |
当使用默认的 converter 在 KafkaTemplate 和 listener 容器工厂中时,您通过在模板上调用 setMessagingConverter() 并通过在 @KafkaListener 的方法上的 contentMessageConverter 属性来配置 SmartMessageConverter。
示例:
template.setMessagingConverter(mySmartConverter);
@KafkaListener(id = "withSmartConverter", topics = "someTopic",
contentTypeConverter = "mySmartConverter")
public void smart(Thing thing) {
...
}
使用 Spring Data 投影接口
从 2.1.1 版本开始,您可以将 JSON 转换为 Spring Data Projection 接口,而不是具体的类型。 这允许非常选择性的、低耦合的数据绑定,包括在 JSON 文档中的多个位置查找值。 例如,可以定义以下接口作为消息负载类型:
interface SomeSample {
@JsonPath({ "$.username", "$.user.name" })
String getUsername();
}
@KafkaListener(id="projection.listener", topics = "projection")
public void projection(SomeSample in) {
String username = in.getUsername();
...
}
访问器方法将默认用于通过接收到的 JSON 文档中的字段来查找属性名称。@JsonPath 表达式允许自定义值查找方式,甚至可以定义多个 JSON 路径表达式,以从多个位置查找值,直到某个表达式返回实际值。
要启用此功能,请使用一个配置了适当委托转换器的ProjectingMessageConverter(用于出站转换和将非投影接口进行转换)。
您还必须将spring-data:spring-data-commons和com.jayway.jsonpath:json-path添加到类路径中。
当用作 @KafkaListener 方法的参数时,接口类型会像平常一样自动传递给转换器。
使用ErrorHandlingDeserializer
When 一个反序列化器无法反序列化一条消息时,Spring 没有办法处理这个问题,因为该问题发生在poll()返回之前。
为了解决这个问题,引入了ErrorHandlingDeserializer。
该反序列化器委托给实际的反序列化器(键或值)。
如果委托的反序列化器无法反序列化记录内容,ErrorHandlingDeserializer会返回一个null值,并在包含原因和原始字节的DeserializationException头中返回。
当你使用记录级的MessageListener时,如果ConsumerRecord包含针对键或值的DeserializationException头,容器的ErrorHandler将被调用,传入失败的ConsumerRecord。
该记录不会传递给监听器。
或者,你可以通过提供一个failedDeserializationFunction来配置ErrorHandlingDeserializer,从而创建一个自定义值。
该函数用于创建T的实例,该实例以 usual 的方式传递给监听器。
一个包含所有上下文信息的FailedDeserializationInfo类型的对象会提供给该函数。
你可以在头信息中找到DeserializationException(作为序列化的Java对象)。
有关ErrorHandlingDeserializer的更多信息,请参阅Javadoc。
您可以使用接受 key 和 value Deserializer 对象的 DefaultKafkaConsumerFactory 构造函数,并将通过使用适当的委托配置的 ErrorHandlingDeserializer 实例进行连接。
或者,您可以使用消费者配置属性(这些属性用于由 ErrorHandlingDeserializer 使用的实例化)来实例化委托。
属性名称为 ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS 和 ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS。
属性值可以是类或类名。
以下示例展示了如何设置这些属性:
... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey")
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue")
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example")
return new DefaultKafkaConsumerFactory<>(props);
以下示例使用了一个 failedDeserializationFunction。
public class BadFoo extends Foo {
private final FailedDeserializationInfo failedDeserializationInfo;
public BadFoo(FailedDeserializationInfo failedDeserializationInfo) {
this.failedDeserializationInfo = failedDeserializationInfo;
}
public FailedDeserializationInfo getFailedDeserializationInfo() {
return this.failedDeserializationInfo;
}
}
public class FailedFooProvider implements Function<FailedDeserializationInfo, Foo> {
@Override
public Foo apply(FailedDeserializationInfo info) {
return new BadFoo(info);
}
}
上述示例使用了以下配置:
...
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
consumerProps.put(ErrorHandlingDeserializer.VALUE_FUNCTION, FailedFooProvider.class);
...
如果消费者配置有 模板的泛型值类型应该是 一种技术是使用 |
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
MyNormalObject.class, new JsonSerializer<Object>())));
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
当使用批处理监听器的ErrorHandlingDeserializer时,必须在消息头中检查反序列化异常。
当与DefaultBatchErrorHandler一起使用时,可以使用该头确定异常失败的记录,并通过BatchListenerFailedException将错误传递给错误处理程序。
@KafkaListener(id = "test", topics = "test")
void listen(List<Thing> in, @Header(KafkaHeaders.BATCH_CONVERTED_HEADERS) List<Map<String, Object>> headers) {
for (int i = 0; i < in.size(); i++) {
Thing thing = in.get(i);
if (thing == null
&& headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER) != null) {
DeserializationException deserEx = ListenerUtils.byteArrayToDeserializationException(this.logger,
(byte[]) headers.get(i).get(SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER));
if (deserEx != null) {
logger.error(deserEx, "Record at index " + i + " could not be deserialized");
}
throw new BatchListenerFailedException("Deserialization", deserEx, i);
}
process(thing);
}
}
ListenerUtils.byteArrayToDeserializationException() 可用于将头信息转换为 DeserializationException。
当消费 List<ConsumerRecord<?, ?> 时,使用 ListenerUtils.getExceptionFromHeader() 代替:
@KafkaListener(id = "kgh2036", topics = "kgh2036")
void listen(List<ConsumerRecord<String, Thing>> in) {
for (int i = 0; i < in.size(); i++) {
ConsumerRecord<String, Thing> rec = in.get(i);
if (rec.value() == null) {
DeserializationException deserEx = ListenerUtils.getExceptionFromHeader(rec,
SerializationUtils.VALUE_DESERIALIZER_EXCEPTION_HEADER, this.logger);
if (deserEx != null) {
logger.error(deserEx, "Record at offset " + rec.offset() + " could not be deserialized");
throw new BatchListenerFailedException("Deserialization", deserEx, i);
}
}
process(rec.value());
}
}
如果同时使用一个DeadLetterPublishingRecoverer,则为1发布的记录将具有一个 |
负载转换与批处理监听器
You can also use a JsonMessageConverter within a BatchMessagingMessageConverter to convert batch messages when you use a batch listener container factory.
See Serialization, Deserialization, and Message Conversion and Spring Messaging Message Conversion for more information.
默认情况下,转换的类型是从监听器参数推断出来的。
如果你将 JsonMessageConverter 配置为具有其 TypePrecedence 设置为 TYPE_ID(而不是默认的 INFERRED)的 DefaultJackson2TypeMapper,转换器会使用标头中的类型信息(如果存在)。
这允许,例如,监听方法可以被声明为接口而不是具体的类。
此外,类型转换器支持映射,因此反序列化可以是与源不同的类型(只要数据兼容)。
当你使用 类级别的 @KafkaListener 实例时,这也非常有用,在这种情况下,必须已经将负载转换为确定要调用的方法。
下面的例子创建了使用此方法的bean:
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setMessageConverter(new BatchMessagingMessageConverter(converter()));
return factory;
}
@Bean
public JsonMessageConverter converter() {
return new JsonMessageConverter();
}
请注意,为此要生效,转换目标的方法签名必须是一个具有单个泛型参数类型的容器对象,例如以下所示:
@KafkaListener(topics = "blc1")
public void listen(List<Foo> foos, @Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
注意,您仍然可以访问批处理标题。
如果批处理转换器具有支持记录转换器,则也可以接收一个其payload根据泛型类型进行转换的消息列表。 以下示例展示了如何实现:
@KafkaListener(topics = "blc3", groupId = "blc3")
public void listen1(List<Message<Foo>> fooMessages) {
...
}
ConversionService自定义
从版本 2.1.1 开始,用于默认 o.s.messaging.handler.annotation.support.MessageHandlerMethodFactory 解析监听器方法参数调用的 org.springframework.core.convert.ConversionService,会在实现以下任一接口的所有 bean 上提供:
-
org.springframework.core.convert.converter.Converter -
org.springframework.core.convert.converter.GenericConverter -
org.springframework.format.Formatter
这允许你在不更改默认配置用于 0 和 1 的情况下,进一步自定义监听器反序列化。
通过KafkaListenerConfigurer beans为KafkaListenerEndpointRegistrar设置的自定义MessageHandlerMethodFactory将禁用此功能。 |
添加自定义HandlerMethodArgumentResolver to @KafkaListener
从 2.4.2 版本开始,您可以添加自己的 HandlerMethodArgumentResolver 并解析自定义方法参数。
您只需要实现 KafkaListenerConfigurer,并使用来自类 KafkaListenerEndpointRegistrar 的方法 setCustomMethodArgumentResolvers()。
@Configuration
class CustomKafkaConfig implements KafkaListenerConfigurer {
@Override
public void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {
registrar.setCustomMethodArgumentResolvers(
new HandlerMethodArgumentResolver() {
@Override
public boolean supportsParameter(MethodParameter parameter) {
return CustomMethodArgument.class.isAssignableFrom(parameter.getParameterType());
}
@Override
public Object resolveArgument(MethodParameter parameter, Message<?> message) {
return new CustomMethodArgument(
message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC, String.class)
);
}
}
);
}
}
You can also completely replace the framework’s argument resolution by adding a custom MessageHandlerMethodFactory to the KafkaListenerEndpointRegistrar bean.
If you do this, and your application needs to handle tombstone records, with a null value() (e.g. from a compacted topic), you should add a KafkaNullAwarePayloadArgumentResolver to the factory; it must be the last resolver because it supports all types and can match arguments without a @Payload annotation.
If you are using a DefaultMessageHandlerMethodFactory, set this resolver as the last custom resolver; the factory will ensure that this resolver will be used before the standard PayloadMethodArgumentResolver, which has no knowledge of KafkaNull payloads.
4.1.19. 消息头
The 0.11.0.0 客户端引入了对消息中头信息的支持。
从 2.0 版本开始,Spring for Apache Kafka 现在支持将这些头信息映射到和从 spring-messaging MessageHeaders 映射。
之前的版本将 ConsumerRecord 和 ProducerRecord 映射到 spring-messaging 的 Message<?>,其中 value 属性映射到 payload,以及其他属性(topic、partition 等)映射到头(headers)。
这仍然适用,但现在还可以映射额外的(任意的)头(headers)。 |
Apache Kafka 头部的 API 非常简单,如下面的接口定义所示:
public interface Header {
String key();
byte[] value();
}
The KafkaHeaderMapper strategy is provided to map header entries between Kafka Headers and MessageHeaders.
Its interface definition is as follows:
public interface KafkaHeaderMapper {
void fromHeaders(MessageHeaders headers, Headers target);
void toHeaders(Headers source, Map<String, Object> target);
}
The SimpleKafkaHeaderMapper 映射原始头为 byte[],并提供转换为 String 值的配置选项。
The DefaultKafkaHeaderMapper maps the key to the MessageHeaders header name and, in order to support rich header types for outbound messages, JSON conversion is performed.
A “special” header (with a key of spring_json_header_types) contains a JSON map of <key>:<type>.
This header is used on the inbound side to provide appropriate conversion of each header value to the original type.
在入站侧,所有 Kafka Header 实例都映射到 MessageHeaders。
在出站侧,默认情况下,所有 MessageHeaders 都已映射,除了 id、timestamp 以及映射到 ConsumerRecord 属性的头信息。
您可以使用模式将要映射的出站消息头进行指定,通过将模式提供给映射器。 以下列表展示了多个示例映射:
public DefaultKafkaHeaderMapper() { (1)
...
}
public DefaultKafkaHeaderMapper(ObjectMapper objectMapper) { (2)
...
}
public DefaultKafkaHeaderMapper(String... patterns) { (3)
...
}
public DefaultKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) { (4)
...
}
| 1 | 使用默认的 Jackson ObjectMapper,并映射大多数标头,如前所述在示例中讨论的那样。 |
| 2 | 使用提供的 Jackson ObjectMapper,并映射大多数头信息,正如在示例之前讨论的那样。 |
| 3 | 使用默认的 Jackson ObjectMapper,并根据提供的模式映射头。 |
| 4 | 使用提供的 Jackson ObjectMapper 并根据提供的模式映射标头。 |
模式相当简单,可以包含一个前导通配符(), a trailing wildcard, or both (for example,.cat.*). You can negate patterns with a leading!<p>第一个匹配头部名称的模式(无论正负)都会被采用。</p>
当您提供自己的模式时,我们建议包含!id和!timestamp,因为这些标头在入站侧是只读的。
默认情况下,mapper 仅反序列化编号为 java.lang 和 java.util 的类。
可以通过使用 addTrustedPackages 方法添加可信的包来信任其他(或所有)包。
如果收到来自不可信来源的消息,您可能只想添加您信任的那些包。
要信任所有包,可以使用 mapper.addTrustedPackages("*")。 |
将 String 个标头值以原始形式进行映射,在与不熟悉映射器 JSON 格式的系统通信时很有用。 |
从 2.2.5 版本开始,您可以指定某些字符串值的头不应使用 JSON 映射,但可/从原始 byte[] 映射。
AbstractKafkaHeaderMapper 有新属性;当设置为 true 时,所有字符串值的头将使用 charset 属性(默认 UTF-8)转换为 byte[]。
此外,还有一个属性 rawMappedHeaders,其值为 header name : boolean 的映射;如果该映射包含一个头名称,且该头包含 String 值,则将使用字符集直接映射为原始 byte[]。
该映射还用于在以下情况下将原始传入的 byte[] 头映射为 String:当且仅当映射值中的布尔值为 true。
如果布尔值为 false,或者头名称不在映射中且值为 true,则传入的头将直接作为原始未映射头处理。
以下测试用例说明了这种机制。
@Test
public void testSpecificStringConvert() {
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
Map<String, Boolean> rawMappedHeaders = new HashMap<>();
rawMappedHeaders.put("thisOnesAString", true);
rawMappedHeaders.put("thisOnesBytes", false);
mapper.setRawMappedHeaders(rawMappedHeaders);
Map<String, Object> headersMap = new HashMap<>();
headersMap.put("thisOnesAString", "thing1");
headersMap.put("thisOnesBytes", "thing2");
headersMap.put("alwaysRaw", "thing3".getBytes());
MessageHeaders headers = new MessageHeaders(headersMap);
Headers target = new RecordHeaders();
mapper.fromHeaders(headers, target);
assertThat(target).containsExactlyInAnyOrder(
new RecordHeader("thisOnesAString", "thing1".getBytes()),
new RecordHeader("thisOnesBytes", "thing2".getBytes()),
new RecordHeader("alwaysRaw", "thing3".getBytes()));
headersMap.clear();
mapper.toHeaders(target, headersMap);
assertThat(headersMap).contains(
entry("thisOnesAString", "thing1"),
entry("thisOnesBytes", "thing2".getBytes()),
entry("alwaysRaw", "thing3".getBytes()));
}
两者头映射器默认都会映射所有入站头。 自 2.8.8 版本起,模式也可以应用于入站映射。 要创建入站映射的映射器,请使用相应映射器上的静态方法:
public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}
public static DefaultKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper objectMapper, String... patterns) {
}
public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}
例如:
DefaultKafkaHeaderMapper inboundMapper = DefaultKafkaHeaderMapper.forInboundOnlyWithMatchers("!abc*", "*");
这将排除所有以 abc 开头的标题,并包含其他所有标题。
默认情况下,在MessagingMessageConverter和BatchMessagingMessageConverter中使用DefaultKafkaHeaderMapper,只要Jackson在类路径上。
使用批处理转换器时,转换后的标题在KafkaHeaders.BATCH_CONVERTED_HEADERS中作为List<Map<String, Object>>出现,其中列表中位置的映射对应于payload中数据的位置。
如果不存在转换器(要么是因为Jackson不存在,要么是明确设置为null),则从消费者记录中提供的标头在KafkaHeaders.NATIVE_HEADERS标头中未转换提供。此标头是一个Headers对象(或在批处理转换器的情况下是List<Headers>对象),其中列表中的位置对应于有效负载中的数据位置)。
某些类型不适合JSON序列化,对于这些类型,简单的toString()序列化可能更受青睐。
DefaultKafkaHeaderMapper具有一种名为addToStringClasses()的方法,该方法让您可以为出站映射提供应以这种方式处理的类的名称。
在入站映射期间,它们被映射为String。
默认情况下,仅org.springframework.util.MimeType和org.springframework.http.MediaType被这样映射。 |
从 2.3 版本开始,String 类型的头部处理被简化。
此类头部不再默认进行 JSON 编码(即不再添加包裹的 "…")。
类型仍然添加到 JSON_TYPES 头部,接收系统可以将其转换回字符串(从 byte[])。
映射器可以处理由旧版本产生的头部(它会检查是否有前置的 ");因此使用 2.3 的应用程序可以消费来自旧版本的记录。 |
为与早期版本兼容,将 encodeStrings 设置为 true,如果由使用 2.3 的版本产生的记录需要被使用早期版本的应用消费。
当所有应用都使用 2.3 或更高版本时,可以将该属性保留默认值 false。 |
@Bean
MessagingMessageConverter converter() {
MessagingMessageConverter converter = new MessagingMessageConverter();
DefaultKafkaHeaderMapper mapper = new DefaultKafkaHeaderMapper();
mapper.setEncodeStrings(true);
converter.setHeaderMapper(mapper);
return converter;
}
如果使用 Spring Boot,將會自動配置該使用者Converter這個轉換器 bean 到自動配置的 KafkaTemplate;否則您需要將該使用者Converter 添加到模板中。
4.1.20. Null Payloads and Log Compaction of \'Tombstone\' Records
使用日志压缩时,您可以发送和接收带有
你还可以因为其他原因接收到null值,例如当一个Deserializer无法反序列化某个值时可能会返回null。
要通过KafkaTemplate发送一个null载荷,您可以在send()方法的值参数中传递null。send(Message<?> message)变体是此规则的一个例外。由于spring-messagingMessage<?>不能有null载荷,因此可以使用一种称为KafkaNull的特殊载荷类型,框架会发送null。为方便起见,提供了静态KafkaNull.INSTANCE。
使用消息监听器容器时,接收到的 ConsumerRecord 具有 null value()。
To configure the @KafkaListener to handle null payloads, you must use the @Payload annotation with required = false.
If it is a tombstone message for a compacted log, you usually also need the key so that your application can determine which key was “deleted”.
The following example shows such a configuration:
@KafkaListener(id = "deletableListener", topics = "myTopic")
public void listen(@Payload(required = false) String value, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key) {
// value == null represents key deletion
}
当您在类级别使用@KafkaListener并具有多个@KafkaHandler方法时,需要一些额外的配置。具体来说,您需要一个带有KafkaNull负载的@KafkaHandler方法。下面的示例展示了如何进行配置:
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String cat) {
...
}
@KafkaHandler
public void listen(Integer hat) {
...
}
@KafkaHandler
public void delete(@Payload(required = false) KafkaNull nul, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int key) {
...
}
}
注意,参数是 null,而不是 KafkaNull。
见手动分配所有分区。 |
此功能需要使用 当使用自定义的 |
4.1.21. 处理异常
本部分描述了在使用 Spring for Apache Kafka 时如何处理各种可能出现的异常。
监听器错误处理程序
从 2.0 版本开始,@KafkaListener 注解新增了属性:errorHandler。
你可以使用 errorHandler 来提供一个 KafkaListenerErrorHandler 实现的 bean 名称。
此函数式接口有一个方法,如下列示的代码所示:
@FunctionalInterface
public interface KafkaListenerErrorHandler {
Object handleError(Message<?> message, ListenerExecutionFailedException exception) throws Exception;
}
你有权限访问由消息转换器产生的 spring-messaging Message<?> 对象,以及由监听器抛出的异常,该异常被包装在 ListenerExecutionFailedException 中。
错误处理器可以抛出原始或一个新的异常,该异常将被抛给容器。
错误处理器返回的任何内容都会被忽略。
从 2.7 版本开始,您可以将 rawRecordHeader 属性设置在 MessagingMessageConverter 和 BatchMessagingMessageConverter 上,这会使原始的 ConsumerRecord 添加到在 KafkaHeaders.RAW_DATA 头部转换的 Message<?> 中。
这在例如您希望在监听器错误处理程序中使用 DeadLetterPublishingRecoverer 时很有用。
它可能在请求/回复场景中使用,您希望在重试一定次数后捕获失败记录并将其发送到发送者,其中失败记录已放入死信主题中。
@Bean
KafkaListenerErrorHandler eh(DeadLetterPublishingRecoverer recoverer) {
return (msg, ex) -> {
if (msg.getHeaders().get(KafkaHeaders.DELIVERY_ATTEMPT, Integer.class) > 9) {
recoverer.accept(msg.getHeaders().get(KafkaHeaders.RAW_DATA, ConsumerRecord.class), ex);
return "FAILED";
}
throw ex;
};
}
它有一个子接口 (ConsumerAwareListenerErrorHandler),可以通过以下方法访问consumer对象:
Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer);
如果您错误处理程序实现这个接口,您可以,例如,相应地调整偏移量。 例如,要重置偏移量来重播失败的消息,你可以做一些像下面这样的事情:
@Bean
public ConsumerAwareListenerErrorHandler listen3ErrorHandler() {
return (m, e, c) -> {
this.listen3Exception = e;
MessageHeaders headers = m.getHeaders();
c.seek(new org.apache.kafka.common.TopicPartition(
headers.get(KafkaHeaders.RECEIVED_TOPIC, String.class),
headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, Integer.class)),
headers.get(KafkaHeaders.OFFSET, Long.class));
return null;
};
}
Similarly, you could do something like the following for a batch listener:
@Bean
public ConsumerAwareListenerErrorHandler listen10ErrorHandler() {
return (m, e, c) -> {
this.listen10Exception = e;
MessageHeaders headers = m.getHeaders();
List<String> topics = headers.get(KafkaHeaders.RECEIVED_TOPIC, List.class);
List<Integer> partitions = headers.get(KafkaHeaders.RECEIVED_PARTITION_ID, List.class);
List<Long> offsets = headers.get(KafkaHeaders.OFFSET, List.class);
Map<TopicPartition, Long> offsetsToReset = new HashMap<>();
for (int i = 0; i < topics.size(); i++) {
int index = i;
offsetsToReset.compute(new TopicPartition(topics.get(i), partitions.get(i)),
(k, v) -> v == null ? offsets.get(index) : Math.min(v, offsets.get(index)));
}
offsetsToReset.forEach((k, v) -> c.seek(k, v));
return null;
};
}
这将批次中的每个主题/分区重置为批次中的最低偏移量。
| 前面两个示例是简单的实现,你可能希望在错误处理器中进行更多的检查。 |
容器错误处理器
Starting with version 2.8, the legacy ErrorHandler and BatchErrorHandler interfaces have been superceded by a new CommonErrorHandler.
These error handlers can handle errors for both record and batch listeners, allowing a single listener container factory to create containers for both types of listener.
CommonErrorHandler implementations to replace most legacy framework error handler implementations are provided and the legacy error handlers deprecated.
The legacy interfaces are still supported by listener containers and listener container factories; they will be deprecated in a future release.
见 将自定义遗留错误处理程序迁移到 CommonErrorHandler 以获取将自定义错误处理程序迁移到 CommonErrorHandler 的信息。
当使用事务时,如果没有配置错误处理程序,则异常将回滚事务。
事务容器的错误处理由 AfterRollbackProcessor 处理。
如果你在使用事务时提供自定义错误处理程序,并且希望回滚事务,则该处理程序必须抛出异常。
此接口具有一个默认方法 isAckAfterHandle(),由容器在错误处理程序在不抛出异常的情况下返回时,用来确定是否应提交偏移量;默认返回 true。
通常,框架提供的错误处理程序在错误未被“处理”时(例如在执行seek操作后)将抛出异常。
默认情况下,容器会在ERROR级别记录此类异常。
所有框架错误处理程序都扩展KafkaExceptionLogLevelAware,这允许您控制这些异常的日志记录级别。
/**
* Set the level at which the exception thrown by this handler is logged.
* @param logLevel the level (default ERROR).
*/
public void setLogLevel(KafkaException.Level logLevel) {
...
}
您可以指定一个全局错误处理器,用于容器工厂中所有监听器的错误处理。 以下示例展示了如何实现:
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.setCommonErrorHandler(myErrorHandler);
...
return factory;
}
默认情况下,如果带有注解的监听器方法抛出异常,该异常将传递到容器,并根据容器的配置来处理消息。
容器会在调用错误处理程序之前先提交任何待处理的偏移提交。
如果您使用 Spring Boot,只需将错误处理程序作为 a @Bean 添加,Boot 将会将其添加到自动配置的工厂中。
默认错误处理器
这个新的错误处理器取代了SeekToCurrentErrorHandler和RecoveringBatchErrorHandler,它们在过去多个发布版本中都是默认的错误处理器。
一个不同之处是,当抛出除BatchListenerFailedException以外的异常时,批处理监听器的回退行为等同于重试完整批次。
出错处理程序可以恢复(跳过)一直失败的记录。
默认情况下,在发生十次失败后,失败的记录会被记录(记录级别为ERROR)。
你可以通过配置自定义的恢复器(BiConsumer)以及一个BackOff来控制重试次数和每次重试之间的延迟。
使用FixedBackOff配合FixedBackOff.UNLIMITED_ATTEMPTS将导致(实际上)无限重试。
以下示例配置了在尝试三次后进行恢复:
DefaultErrorHandler errorHandler =
new DefaultErrorHandler((record, exception) -> {
// recover after 3 failures, with no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));
为了使用自定义的此处理器实例配置监听器容器,请将其添加到容器工厂中。
例如,使用 @KafkaListener 容器工厂,你可以添加 DefaultErrorHandler 如下:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckMode(AckMode.RECORD);
factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 2L)));
return factory;
}
对于记录器监听器,这将使用1秒的退避策略重试最多2次(总共3次发送尝试),而不是默认配置(FixedBackOff(0L, 9))。
在重试耗尽后,失败将仅记录日志。
例如;如果 poll 返回六条记录(每分区两条,共三个分区),且侦听器在第四条记录上抛出异常,则容器会确认前三条消息并提交它们的偏移量。
DefaultErrorHandler 将分区 1 的偏移量设置为 1,并将分区 2 的偏移量设置为 0。
下一个 poll() 返回三条未处理的记录。
如果 AckMode 是 BATCH,则容器会在调用错误处理程序之前,为前两个分区提交偏移量。
批处理监听器必须抛出一个BatchListenerFailedException,表示批处理中哪些记录失败。
事件序列是:
-
提交索引前的记录的偏移量。
-
如果重试次数未耗尽,则执行seek操作,使所有剩余记录(包括失败的记录)都会被重新投递。
-
如果重试次数用尽,尝试恢复失败的记录(默认仅日志)并执行seek操作,以便剩余的记录(不包括失败的记录)将被重新传递。 恢复后的记录的偏移量将被提交
-
如果重试次数耗尽且恢复失败,那么将像重试次数未耗尽一样执行寻道操作。
The 默认 recoverer 在尝试次数耗尽后会记录失败的记录。
您可以使用自定义 recoverer,或使用框架提供的 recoverer,例如 DeadLetterPublishingRecoverer。
当使用POJO批处理监听器(例如:List<Thing>),且你没有完整的消费记录可添加到异常中时,你只需要添加失败记录的索引:
@KafkaListener(id = "recovering", topics = "someTopic")
public void listen(List<Thing> things) {
for (int i = 0; i < records.size(); i++) {
try {
process(things.get(i));
}
catch (Exception e) {
throw new BatchListenerFailedException("Failed to process", i);
}
}
}
当容器配置为使用 AckMode.MANUAL_IMMEDIATE 时,错误处理器可以配置在恢复记录后提交偏移量;将 commitRecovered 属性设置为 true。
参见 发布死信记录。
在使用事务时,提供了类似的 DefaultAfterRollbackProcessor 功能。
查看 回滚后处理器。
The DefaultErrorHandler 将某些异常视为致命,对于此类异常会跳过重试;在第一次失败时会调用 recoverer。
默认视为致命的异常包括:
-
DeserializationException -
MessageConversionException -
ConversionException -
MethodArgumentResolutionException -
NoSuchMethodException -
ClassCastException
由于这些异常在重试投递时不太可能得到解决。
您可以将更多异常类型添加到不可重试类别,或完全替换分类异常的映射。
请参阅DefaultErrorHandler.addNotRetryableException()和DefaultErrorHandler.setClassifications()的Javadocs以获取更多信息,以及spring-retry和BinaryExceptionClassifier的Javadocs。
这里是将 IllegalArgumentException 添加到不可重试异常的示例:
@Bean
public DefaultErrorHandler errorHandler(ConsumerRecordRecoverer recoverer) {
DefaultErrorHandler handler = new DefaultErrorHandler(recoverer);
handler.addNotRetryableExceptions(IllegalArgumentException.class);
return handler;
}
错误处理程序可以配置一个或多个RetryListener,接收重试和恢复进度的通知。
从2.8.10版本开始,增加了批量监听器的方法。
@FunctionalInterface
public interface RetryListener {
void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);
default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
}
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
}
default void failedDelivery(ConsumerRecords<?, ?> records, Exception ex, int deliveryAttempt) {
}
default void recovered(ConsumerRecords<?, ?> records, Exception ex) {
}
default void recoveryFailed(ConsumerRecords<?, ?> records, Exception original, Exception failure) {
}
}
查看更多Java文档信息。
如果恢复器失败(抛出异常),则失败的记录将包含在seeks中。
如果恢复器失败,默认会将BackOff重置,并会在再次尝试恢复前按照回退策略重新进行重试。
要跳过恢复失败后的重试,将错误处理程序的resetStateOnRecoveryFailure设置为false。 |
你可以通过提供一个BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>的错误处理程序来确定BackOff的使用,该BackOff将基于失败的记录和/或异常来决定:
handler.setBackOffFunction((record, ex) -> { ... });
如果函数返回 null,将使用处理器的默认 BackOff。
设置 resetStateOnExceptionChange 到 true,如果失败之间的异常类型发生变化,则重置重试序列(如果已配置,将选择一个新的BackOff)。默认情况下,不考虑异常类型。
也请参见 Delivery Attempts 头部。
批处理错误处理的转换错误
从版本 2.8 开始,当使用一个 MessageConverter 与一个 ByteArrayDeserializer、一个 BytesDeserializer 或一个 StringDeserializer,以及一个 DefaultErrorHandler 时,批处理监听器现在可以正确处理转换错误。
当发生转换错误时,payload 会被设置为 null,并会在记录头中添加一个反序列化异常,类似于 ErrorHandlingDeserializer。
监听器中可用 ConversionException 的列表,以便监听器可以抛出 BatchListenerFailedException,表示转换异常首次发生的位置索引。
示例:
@KafkaListener(id = "test", topics = "topic")
void listen(List<Thing> in, @Header(KafkaHeaders.CONVERSION_FAILURES) List<ConversionException> exceptions) {
for (int i = 0; i < in.size(); i++) {
Foo foo = in.get(i);
if (foo == null && exceptions.get(i) != null) {
throw new BatchListenerFailedException("Conversion error", exceptions.get(i), i);
}
process(foo);
}
}
重试完整批次
这是现在 批处理监听器 的回退行为,当监听器抛出除 BatchListenerFailedException 以外的异常时,DefaultErrorHandler 的行为将作为回退。
无法保证在重新发送批次时,批次具有相同数量的记录和/或重新发送的记录在同一顺序。
因此,很难轻松维护批次的重试状态。
FallbackBatchErrorHandler采取了以下方法。如果批次监听器抛出的异常不是BatchListenerFailedException,则从内存中的批次记录执行重试。
为了避免在长时间重试序列期间发生重新平衡,错误处理器会暂停消费者,在每次重试前睡眠之前进行轮询,并再次调用监听器。
如果重试耗尽,则为批次中的每个记录调用ConsumerRecordRecoverer。
如果恢复者抛出异常,或者线程在睡眠过程中被中断,则记录批次将在下一次轮询时重新发送。
无论结果如何,在退出之前,都会恢复消费者。
| 此机制无法与事务一起使用。 |
在等待 BackOff 间隔时,错误处理程序将通过短暂停留进行循环,直到达到期望的延迟,同时检查容器是否已停止,从而在 stop() 处尽快退出循环,而不是造成延迟。
容器停止错误处理程序
The CommonContainerStoppingErrorHandler stops the container if the listener throws an exception.
For record listeners, when the AckMode is RECORD, offsets for already processed records are committed.
For record listeners, when the AckMode is any manual value, offsets for already acknowledged records are committed.
For record listeners, wWhen the AckMode is BATCH, or for batch listeners, the entire batch is replayed when the container is restarted.
容器停止后,会抛出包装 ListenerExecutionFailedException 的异常。
这是为了在事务启用时导致事务回滚。
委托式错误处理器
The CommonDelegatingErrorHandler 可以根据异常类型委托到不同的错误处理器。
例如,您可以希望为大多数异常调用 DefaultErrorHandler,或者为其他异常调用 CommonContainerStoppingErrorHandler。
使用不同的常见错误处理器为记录器和批处理监听器
如果您希望为record和batch监听器使用不同的错误处理策略,提供了CommonMixedErrorHandler,允许为每种监听器类型配置特定的错误处理器。
<p>通用错误处理器摘要</p>
-
DefaultErrorHandler -
CommonContainerStoppingErrorHandler -
CommonDelegatingErrorHandler -
CommonLoggingErrorHandler -
CommonMixedErrorHandler
遗留错误处理器及其替换
| 遗留错误处理程序 | 替换 |
|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
无替换,使用 |
|
|
|
没有替换 - 使用 |
迁移自定义遗留错误处理实现到CommonErrorHandler
参考 CommonErrorHandler 中的 JavaDocs。
要替换一个 ErrorHandler 或 ConsumerAwareErrorHandler 实现,你应该实现 handleRecord() 并留下 remainingRecords() 来返回 false(默认值)。
你还应该实现 handleOtherException() - 处理发生在记录处理范围之外的异常(例如,消费者错误)。
为了替换一个 RemainingRecordsErrorHandler 实现,您应该实现 handleRemaining() 并重写 remainingRecords() 以返回 true。 您还应该实现 handleOtherException() - 用于处理记录处理范围之外发生的异常(例如,消费者错误)。
To replace any BatchErrorHandler 实现,你应该实现 handleBatch()
你也应该实现 handleOtherException() - 以处理超出记录处理范围的异常(例如消费者错误)。
后滚操作处理器
在使用事务时,如果监听器抛出异常(且如果存在错误处理程序也抛出异常),事务将被回滚。
默认情况下,任何未处理的记录(包括失败的记录)将在下次轮询时重新获取。
这通过在DefaultAfterRollbackProcessor中执行seek个操作实现。
对于批处理监听器,整个记录批次将被重新处理(容器不知道哪个记录在批次中失败)。
要修改此行为,可以使用自定义的AfterRollbackProcessor配置。
例如,对于基于记录的监听器,您可能需要跟踪失败的记录,并在尝试一定次数后放弃,或许将其发布到死信主题。
From version 2.2, the DefaultAfterRollbackProcessor can now recover (skip) a record that keeps failing.
By default, after ten failures, the failed record is logged (at the ERROR level).
You can configure the processor with a custom recoverer (BiConsumer) and maximum failures.
Setting the maxFailures property to a negative number causes infinite retries.
The following example configures recovery after three tries:
AfterRollbackProcessor<String, String> processor =
new DefaultAfterRollbackProcessor((record, exception) -> {
// recover after 3 failures, with no back off - e.g. send to a dead-letter topic
}, new FixedBackOff(0L, 2L));
当你不使用事务时,可以通过配置一个 DefaultErrorHandler。
查看 容器错误处理器。
| 恢复在批处理监听器中是不可能的,因为框架没有关于哪个批次中的记录持续失败的知识。在这种情况下,应用程序监听器必须处理一个持续失败的记录。 |
参见 发布死信记录。
从 2.2.5 版本开始,DefaultAfterRollbackProcessor 可以在新的事务中被调用(新事务在失败事务回滚后启动)。
然后,如果你使用 DeadLetterPublishingRecoverer 发布失败记录,处理器会将恢复记录的偏移发送到原始主题/分区的事务。
要启用此功能,请在 DefaultAfterRollbackProcessor 上设置 commitRecovered 和 kafkaTemplate 属性。
如果恢复器失败(抛出异常),则失败的记录将包含在seeks中。
从2.5.5版本开始,如果恢复器失败,默认会将BackOff重置,并在恢复尝试再次时按照退避策略重新投递。
在更早的版本中,BackOff不会被重置,恢复会在下一次失败时再次尝试。
要还原到以前的行为,请将处理器的resetStateOnRecoveryFailure属性设置为false。 |
从版本 2.6 开始,您可以向处理器提供一个BiFunction<ConsumerRecord<?, ?>, Exception, BackOff>来根据失败记录和/或异常确定要使用的BackOff:
handler.setBackOffFunction((record, ex) -> { ... });
如果该函数返回null,处理器的默认BackOff将被使用。
从版本 2.6.3 开始,如果在失败之间异常类型发生变化,则将resetStateOnExceptionChange设置为true并将重新启动重试序列(如果配置了选择新的BackOff)。默认情况下,不考虑异常类型。
从版本2.3.1开始,与DefaultErrorHandler类似,DefaultAfterRollbackProcessor将某些异常视为致命错误,在发生这些异常时跳过重试;在首次失败时调用recoverer。
默认情况下被视为致命的异常包括:
-
DeserializationException -
MessageConversionException -
ConversionException -
MethodArgumentResolutionException -
NoSuchMethodException -
ClassCastException
由于这些异常在重试投递时不太可能得到解决。
您可以在非重试类别中添加更多的异常类型,或者完全替换分类异常的映射。
请参阅DefaultAfterRollbackProcessor.setClassifications()的Javadoc,以及spring-retryBinaryExceptionClassifier的相关文档以获取更多信息。
这里是将 IllegalArgumentException 添加到不可重试异常的示例:
@Bean
public DefaultAfterRollbackProcessor errorHandler(BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer) {
DefaultAfterRollbackProcessor processor = new DefaultAfterRollbackProcessor(recoverer);
processor.addNotRetryableException(IllegalArgumentException.class);
return processor;
}
也请参见 Delivery Attempts 头部。
在当前的kafka-clients下,容器无法检测到ProducerFencedException是由再平衡引起的还是由于生产者的transactional.id因超时或到期而被撤销。
因为大多数情况下是由于再平衡引起的,所以容器不会调用AfterRollbackProcessor(因为不再分配给它们,因此不合适的重新定位分区)。
如果您确保超时时间足够长以处理每个事务,并定期执行“空”事务(例如通过ListenerContainerIdleEvent),则可以避免由于超时和到期导致的围栏问题。
或者,您可以将stopContainerWhenFenced容器属性设置为true,这样容器将会停止,从而避免记录丢失。
您可以消费一个ConsumerStoppedEvent并检查Reason属性是否为FENCED来检测此条件。
由于事件还包含对容器的引用,您可以使用此事件重启容器。 |
从版本 2.7 开始,在等待 BackOff 时间间隔期间,错误处理器将循环并短暂休眠,直到达到所需延迟,并同时检查容器是否已停止,以便在收到 stop() 后尽快退出休眠而不是造成延迟。
从版本 2.7 开始,处理器可以配置一个或多个 RetryListener,接收重试和恢复进度的通知。
@FunctionalInterface
public interface RetryListener {
void failedDelivery(ConsumerRecord<?, ?> record, Exception ex, int deliveryAttempt);
default void recovered(ConsumerRecord<?, ?> record, Exception ex) {
}
default void recoveryFailed(ConsumerRecord<?, ?> record, Exception original, Exception failure) {
}
}
查看更多Java文档信息。
配送尝试标题
以下规则仅适用于记录侦听器,不适用于批处理侦听器。
从版本 2.5 开始,当使用实现 DeliveryAttemptAware 的 ErrorHandler 或 AfterRollbackProcessor 时,可以启用向记录添加 KafkaHeaders.DELIVERY_ATTEMPT 头(kafka_deliveryAttempt)的功能。
此头的值是从 1 开始递增的整数。
在接收原始 ConsumerRecord<?, ?> 时,该整数位于 byte[4] 中。
int delivery = ByteBuffer.wrap(record.headers()
.lastHeader(KafkaHeaders.DELIVERY_ATTEMPT).value())
.getInt()
当使用@KafkaListener与DefaultKafkaHeaderMapper或SimpleKafkaHeaderMapper时,可以通过将@Header(KafkaHeaders.DELIVERY_ATTEMPT) int delivery作为参数添加到监听器方法中来获取。
要启用此标题的填充,请将容器属性 deliveryAttemptHeader 设置为 true。 默认情况下它是禁用的,以避免每次查找记录状态并添加标题时产生的(微小)开销。
数字 DefaultErrorHandler 和 DefaultAfterRollbackProcessor 支持此功能。
监听器信息标题
在某些情况下,能够知道监听器正在哪个容器中运行是有用的。
从版本 2.8.4 开始,现在可以在监听器容器上设置 listenerInfo 属性,或者在 info 注解上设置 @KafkaListener 属性。然后,容器将在所有传入消息的 KafkaListener.LISTENER_INFO 头中添加此属性;它可以用作记录拦截器、过滤器等中的条件判断,或在监听器本身中使用。
@KafkaListener(id = "something", topic = "topic", filter = "someFilter",
info = "this is the something listener")
public void listen2(@Payload Thing thing,
@Header(KafkaHeaders.LISTENER_INFO) String listenerInfo) {
...
}
在使用 RecordInterceptor 或 RecordFilterStrategy 实现时,标头作为字节数组存在于消费者记录中,并使用 KafkaListenerAnnotationBeanPostProcessor 的 charSet 属性进行转换。
标头映射器在从消费者记录创建1时也会转换为String,并且永远不会将此标头映射到传出记录上。
对于POJO批处理监听器,从版本2.8.6开始,标头被复制到每个批次成员中,并且在转换后也作为单个String参数可用。
@KafkaListener(id = "list2", topics = "someTopic", containerFactory = "batchFactory",
info = "info for batch")
public void listen(List<Thing> list,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets,
@Header(KafkaHeaders.LISTENER_INFO) String info) {
...
}
如果批处理监听器具有过滤器,并且该过滤器导致批次为空,您需要向参数@Header添加required = false,因为对于空批次而言这些信息不可用。 |
如果您收到List<Message<Thing>>,则信息位于每个Message<?>的KafkaHeaders.LISTENER_INFO标题中。
见批处理监听器获取更多关于消费批处理的信息。
发布死信记录
当记录的最大失败次数达到时,您可以使用记录恢复器来配置DefaultErrorHandler和DefaultAfterRollbackProcessor。 框架提供了DeadLetterPublishingRecoverer,它会将失败的消息发布到另一个主题。 恢复程序需要一个KafkaTemplate<Object, Object>,用于发送记录。 您还可以选择性地用BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition>进行配置,该代码用于解析目标主题和分区。
默认情况下,死信记录会被发送到一个名为 <originalTopic>.DLT 的主题(原始主题名称后缀为 .DLT)以及与原始记录相同的分区。因此,当您使用默认解析器时,死信主题必须至少具有与原始主题一样多的分区。 |
如果返回的 TopicPartition 具有负分区,则该分区在 ProducerRecord 中未被设置,因此由 Kafka 选择分区。
从版本 2.2.4 开始,任何 ListenerExecutionFailedException(例如,在检测到 @KafkaListener 方法中的异常时抛出)都会增强 groupId 属性。
这使得目标解析器可以使用此属性,结合 ConsumerRecord 中的信息来选择死信主题。
以下示例显示了如何绑定自定义目标解析器:
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template,
(r, e) -> {
if (e instanceof FooException) {
return new TopicPartition(r.topic() + ".Foo.failures", r.partition());
}
else {
return new TopicPartition(r.topic() + ".other.failures", r.partition());
}
});
CommonErrorHandler errorHandler = new DefaultErrorHandler(recoverer, new FixedBackOff(0L, 2L));
发送到死信主题的记录会附加以下标题:<br/>
-
KafkaHeaders.DLT_EXCEPTION_FQCN: 异常类名(通常为ListenerExecutionFailedException,但也可能是其他值)。 -
KafkaHeaders.DLT_EXCEPTION_CAUSE_FQCN: 异常原因类名(如果存在的话)(自版本 2.8 起)。 -
KafkaHeaders.DLT_EXCEPTION_STACKTRACE: 异常堆栈跟踪。 -
KafkaHeaders.DLT_EXCEPTION_MESSAGE: 异常消息。 -
KafkaHeaders.DLT_KEY_EXCEPTION_FQCN: 异常类名称(仅反序列化错误)。 -
KafkaHeaders.DLT_KEY_EXCEPTION_STACKTRACE: 异常堆栈跟踪(仅键反序列化错误)。 -
KafkaHeaders.DLT_KEY_EXCEPTION_MESSAGE: 异常消息(仅键反序列化错误)。 -
KafkaHeaders.DLT_ORIGINAL_TOPIC: 原始主题。 -
KafkaHeaders.DLT_ORIGINAL_PARTITION: 原始分区。 -
KafkaHeaders.DLT_ORIGINAL_OFFSET: 原始偏移量。 -
KafkaHeaders.DLT_ORIGINAL_TIMESTAMP: 原始时间戳。 -
KafkaHeaders.DLT_ORIGINAL_TIMESTAMP_TYPE: 原始时间戳类型。 -
KafkaHeaders.DLT_ORIGINAL_CONSUMER_GROUP: 消费者组在处理记录时失败(自版本 2.8 起)。
关键异常仅由 DeserializationException 引起,因此不存在 DLT_KEY_EXCEPTION_CAUSE_FQCN。
有两种机制可以添加更多标题。
-
继承恢复器并重写
createProducerRecord()- 调用super.createProducerRecord()并添加更多标题。 -
提供一个
BiFunction来接收消费者记录和异常,并返回一个Headers对象;该对象中的标题将被复制到最终的生产者记录中;另请参阅管理死信记录标题。使用setHeadersFunction()来设置BiFunction。
第二个实现起来更简单,但第一个有更多可用信息,包括已组装的标准标题。
从版本 2.3 开始,与 ErrorHandlingDeserializer 结合使用时,发布者将恢复死信生产记录中的记录 value(),使其回到原始值(即未能反序列化的失败值)。
此前,value() 是 null,用户代码必须从消息头中解码 DeserializationException。
此外,您可以向发布者提供多个 KafkaTemplate;例如,如果您想发布来自 DeserializationException 的 byte[],以及使用不同序列化器对成功反序列化的记录进行处理,则可能需要这样做。
以下是配置使用 String 和 byte[] 序列化器的 KafkaTemplate 的发布者的示例:
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaTemplate<?, ?> stringTemplate,
KafkaTemplate<?, ?> bytesTemplate) {
Map<Class<?>, KafkaTemplate<?, ?>> templates = new LinkedHashMap<>();
templates.put(String.class, stringTemplate);
templates.put(byte[].class, bytesTemplate);
return new DeadLetterPublishingRecoverer(templates);
}
发布者使用映射键来定位适合于即将发布的value()的模板。LinkedHashMap是推荐使用的,这样键就可以按顺序进行检查。
当发布 null 值时,如果有多个模板,恢复者会查找 Void 类型的模板;如果不存在,则使用第一个来自 values().iterator() 的模板。
从2.7版本开始,当消息发布失败时,您可以使用setFailIfSendResultIsError方法抛出异常。您还可以使用setWaitForSendResultTimeout为验证发送者成功设置超时时间。
| 如果恢复器失败(抛出异常),则失败的记录将包含在查找中。 从版本 2.5.5 开始,如果恢复器失败,默认情况下 对于较早的版本, 要恢复到以前的行为,请将错误处理器的 |
从版本 2.6.3 开始,如果在失败之间异常类型发生变化,则将resetStateOnExceptionChange设置为true并将重新启动重试序列(如果配置了选择新的BackOff)。默认情况下,不考虑异常类型。
从版本2.3开始,恢复器也可以与Kafka Streams一起使用 - 有关更多信息,请参阅有关反序列化异常的恢复。
在标题ErrorHandlingDeserializer.VALUE_DESERIALIZER_EXCEPTION_HEADER和ErrorHandlingDeserializer.KEY_DESERIALIZER_EXCEPTION_HEADER中添加序列化异常(使用Java序列化)。ErrorHandlingDeserializer。默认情况下,这些标题不会保留在发布到死信主题的消息中。从版本2.7开始,如果键和值都失败了反序列化,则原始值都会填充到发送到DLT的记录中。
如果传入的记录相互依赖,但可能以无序方式到达,则重新发布失败的记录到原始主题末尾(重复若干次)可能是有用的,而不是直接将其发送到死信主题。有关示例,请参阅此Stack Overflow问题。
以下错误处理程序配置将完全实现这一点:
@Bean
public ErrorHandler eh(KafkaOperations<String, String> template) {
return new DefaultErrorHandler(new DeadLetterPublishingRecoverer(template,
(rec, ex) -> {
org.apache.kafka.common.header.Header retries = rec.headers().lastHeader("retries");
if (retries == null) {
retries = new RecordHeader("retries", new byte[] { 1 });
rec.headers().add(retries);
}
else {
retries.value()[0]++;
}
return retries.value()[0] > 5
? new TopicPartition("topic.DLT", rec.partition())
: new TopicPartition("topic", rec.partition());
}), new FixedBackOff(0L, 0L));
}
从版本 2.7 开始,恢复程序会检查目标解析器选择的分区是否确实存在。如果该分区不存在,则将 ProducerRecord 的值设置为 null,以便 KafkaProducer 可以选择该分区。您可以通过将 verifyPartition 属性设置为 false 来禁用此检查。
处理死信记录标头
-
appendOriginalHeaders(默认值true) -
stripPreviousExceptionHeaders(默认值为true,自版本 2.8 起)
Apache Kafka 支持具有相同名称的多个标头;要获取最新的值,可以使用 headers.lastHeader(headerName);若要迭代多个标头,请使用 headers.headers(headerName).iterator()。
在反复重新发布失败的记录时,这些标头可能会增长(最终导致由于 RecordTooLargeException 而发布失败);对于异常标头以及特别是堆栈跟踪标头来说尤其如此。
设置两个属性的原因在于,虽然您可能只想保留最后一次异常信息,但您也可能想保留每次记录失败时经过的主题历史。
appendOriginalHeaders 应用于所有名为 ORIGINAL 的标题,而 stripPreviousExceptionHeaders 应用于所有名为 EXCEPTION 的标题。
从版本 2.8.4 开始,您可以控制哪些标准标头将被添加到输出记录中。
请参阅enum HeadersToAdd获取默认情况下(目前)由 10 个标准标头的通用名称(这些不是实际的标头名称,而只是抽象;实际的标头名称由子类可以覆盖的getHeaderNames()方法设置。
要排除标头,请使用excludeHeaders()方法;例如,要禁止在标头中添加异常堆栈跟踪,请使用:
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.excludeHeaders(HeaderNames.HeadersToAdd.EX_STACKTRACE);
此外,您可以通过添加一个 ExceptionHeadersCreator 来完全自定义异常头信息的添加;这也会禁用所有标准的异常头信息。
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(template);
recoverer.setExceptionHeadersCreator((kafkaHeaders, exception, isKey, headerNames) -> {
kafkaHeaders.add(new RecordHeader(..., ...));
});
同样,从版本2.8.4开始,您现在可以通过addHeadersFunction方法提供多个headers函数。 这允许应用额外的功能,即使另一个功能已被注册,例如,在使用非阻塞性重试时。
ExponentialBackOffWithMaxRetries实施
Spring 框架提供了多个BackOff实现。
默认情况下,ExponentialBackOff会无限重试;若要在一定次数的重试后放弃,则需要计算maxElapsedTime。
自版本 2.7.3 起,用于 Apache Kafka 的 Spring 提供了ExponentialBackOffWithMaxRetries,它是一个子类,接收maxRetries属性并自动计算maxElapsedTime,这稍微更方便一些。
@Bean
DefaultErrorHandler handler() {
ExponentialBackOffWithMaxRetries bo = new ExponentialBackOffWithMaxRetries(6);
bo.setInitialInterval(1_000L);
bo.setMultiplier(2.0);
bo.setMaxInterval(10_000L);
return new DefaultErrorHandler(myRecoverer, bo);
}
将在调用恢复程序之前重试1, 2, 4, 8, 10, 10秒。
4.1.22. JAAS 和 Kerberos
从 2.0 版本开始,新增了一个 KafkaJaasLoginModuleInitializer 类,用于协助进行 Kerberos 配置。
您可以将带有所需配置的此类 bean 添加到您的应用程序上下文中。
以下示例配置了此类 bean:
@Bean
public KafkaJaasLoginModuleInitializer jaasConfig() throws IOException {
KafkaJaasLoginModuleInitializer jaasConfig = new KafkaJaasLoginModuleInitializer();
jaasConfig.setControlFlag("REQUIRED");
Map<String, String> options = new HashMap<>();
options.put("useKeyTab", "true");
options.put("storeKey", "true");
options.put("keyTab", "/etc/security/keytabs/kafka_client.keytab");
options.put("principal", "[email protected]");
jaasConfig.setOptions(options);
return jaasConfig;
}
4.2. Apache Kafka Streams 支持
从版本 1.1.4 开始,Spring for Apache Kafka 提供了对 Kafka Streams 的全面支持。
要在 Spring 应用程序中使用它,kafka-streams jar 必须在 classpath 上。
它是 Spring for Apache Kafka 项目的可选依赖,并且不会通过传递依赖进行下载。
4.2.1. 基础
The reference Apache Kafka Streams documentation suggests the following way of using the API:
// Use the builders to define the actual processing topology, e.g. to specify
// from which input topics to read, which stream operations (filter, map, etc.)
// should be called, and so on.
StreamsBuilder builder = ...; // when using the Kafka Streams DSL
// Use the configuration to tell your application where the Kafka cluster is,
// which serializers/deserializers to use by default, to specify security settings,
// and so on.
StreamsConfig config = ...;
KafkaStreams streams = new KafkaStreams(builder, config);
// Start the Kafka Streams instance
streams.start();
// Stop the Kafka Streams instance
streams.close();
我们有两个主要组件:
-
StreamsBuilder: 通过API构建KStream(或KTable)实例。 -
KafkaStreams: 用于管理这些实例的生命周期。
所有由单个 KStream 通过 KafkaStreams 曝光给 StreamsBuilder 的 StreamsBuilder 实例都会在相同的时间启动和停止,即使它们有不同的逻辑。
换句话说,由一个 StreamsBuilder 定义的所有流都与单个生命周期控制绑定。
一旦 KafkaStreams 实例被 streams.close() 关闭,就无法重新启动。
相反,必须创建一个新的 KafkaStreams 实例以重新开始流处理。 |
4.2.2. Spring 管理
为了简化从 Spring 应用程序上下文角度使用 Kafka Streams 并通过容器进行生命周期管理,Spring for Apache Kafka 引入了 StreamsBuilderFactoryBean。 这是一个 AbstractFactoryBean 实现,用于将 StreamsBuilder 单例实例作为 bean 暴露出来。 以下示例创建了这样一个 bean:
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
从 2.2 版本开始,stream 配置现在以一个 KafkaStreamsConfiguration 对象提供,而不是一个 StreamsConfig 对象。 |
The StreamsBuilderFactoryBean 也实现了 SmartLifecycle 以管理内部 KafkaStreams 实例的生命周期。
类似 Kafka Streams API,你必须在启动 KafkaStreams 之前定义 KStream 实例。
同样适用于 Spring API for Kafka Streams。
因此,当你在 StreamsBuilderFactoryBean 上使用默认的 autoStartup = true 时,你必须在应用程序上下文刷新之前在 StreamsBuilder 上声明 KStream 实例。
例如,KStream 可以是一个常规的 bean 定义,而 Kafka Streams API 的使用不受影响。
以下示例展示了如何实现:
@Bean
public KStream<?, ?> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream(STREAMING_TOPIC1);
// Fluent KStream API
return stream;
}
如果您想手动控制生命周期(例如,根据某些条件停止和启动),您可以直接通过工厂 bean (&) 引用 StreamsBuilderFactoryBean bean,前缀。 由于 StreamsBuilderFactoryBean 使用其内部的 KafkaStreams 实例,因此可以安全地停止并重新启动它。 每次 start() 都会创建一个新的 KafkaStreams。 如果您想分别控制 KStream 实例的生命周期,您还可以考虑使用不同的 StreamsBuilderFactoryBean 实例。
您还可以在 StreamsBuilderFactoryBean 上指定 KafkaStreams.StateListener、Thread.UncaughtExceptionHandler 和 StateRestoreListener 选项,这些选项会被委托给内部的 KafkaStreams 实例。
此外,从 版本 2.1.5 开始,您还可以通过使用 KafkaStreamsCustomizer 回调接口来配置内部的 KafkaStreams 实例,而无需间接设置这些选项。注意,KafkaStreamsCustomizer 将覆盖由 StreamsBuilderFactoryBean 提供的选项。
如果您需要直接执行某些 KafkaStreams 操作,可以通过使用 StreamsBuilderFactoryBean.getKafkaStreams() 访问内部的 KafkaStreams 实例。
您可以按类型注入 StreamsBuilderFactoryBean bean,但请确保在 bean 定义中使用完整的类型,如下所示:
@Bean
public StreamsBuilderFactoryBean myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
或者,如果你使用接口类型的 bean 定义,可以添加 @Qualifier 以通过名称进行注入。
以下示例展示了如何操作:
@Bean
public FactoryBean<StreamsBuilder> myKStreamBuilder(KafkaStreamsConfiguration streamsConfig) {
return new StreamsBuilderFactoryBean(streamsConfig);
}
...
@Autowired
@Qualifier("&myKStreamBuilder")
private StreamsBuilderFactoryBean myKStreamBuilderFactoryBean;
从版本 2.4.1 开始,工厂 bean 新增了一个类型为 KafkaStreamsInfrastructureCustomizer 的 infrastructureCustomizer 属性;这允许自定义 StreamsBuilder(例如添加一个状态存储)和/或在创建流之前自定义 Topology。
public interface KafkaStreamsInfrastructureCustomizer {
void configureBuilder(StreamsBuilder builder);
void configureTopology(Topology topology);
}
默认提供无操作实现,以避免在不需要某方法时同时实现两个方法。
一个 CompositeKafkaStreamsInfrastructureCustomizer 提供,用于在需要应用多个自定义器时使用。
4.2.3. KafkaStreams Micrometer 支持
从2.5.3版本开始,你可以配置一个KafkaStreamsMicrometerListener,以自动为工厂bean管理的KafkaStreams对象注册micrometer meters:
streamsBuilderFactoryBean.addListener(new KafkaStreamsMicrometerListener(meterRegistry,
Collections.singletonList(new ImmutableTag("customTag", "customTagValue"))));
4.2.4. Streams JSON 序列化和反序列化
对于在读取或写入主题或状态存储时以 JSON 格式序列化和反序列化数据,Spring for Apache Kafka 提供了一个 JsonSerde 实现,该实现使用 JSON,并委托给 序列化、反序列化和消息转换 中描述的 JsonSerializer 和 JsonDeserializer。 JsonSerde 实现通过其构造函数(目标类型或 ObjectMapper)提供了相同的配置选项。 在以下示例中,我们使用 JsonSerde 来序列化和反序列化 Kafka 流的 Cat 负载(JsonSerde 可以在需要实例的任何地方以类似方式使用):
stream.through(Serdes.Integer(), new JsonSerde<>(Cat.class), "cats");
在使用2.3版本之后,当程序化地构建用于生产者/消费者的序列化器/反序列化器时,您可以使用流畅API(fluent API),这简化了配置。
stream.through(new JsonSerde<>(MyKeyType.class)
.forKeys()
.noTypeInfo(),
new JsonSerde<>(MyValueType.class)
.noTypeInfo(),
"myTypes");
4.2.5. 使用 KafkaStreamBrancher
The KafkaStreamBrancher类引入了一种在 KStream 之上构建更方便的条件分支的方式。
考虑以下不使用 KafkaStreamBrancher 的示例:
KStream<String, String>[] branches = builder.stream("source").branch(
(key, value) -> value.contains("A"),
(key, value) -> value.contains("B"),
(key, value) -> true
);
branches[0].to("A");
branches[1].to("B");
branches[2].to("C");
以下示例使用 KafkaStreamBrancher:
new KafkaStreamBrancher<String, String>()
.branch((key, value) -> value.contains("A"), ks -> ks.to("A"))
.branch((key, value) -> value.contains("B"), ks -> ks.to("B"))
//default branch should not necessarily be defined in the end of the chain!
.defaultBranch(ks -> ks.to("C"))
.onTopOf(builder.stream("source"));
//onTopOf method returns the provided stream so we can continue with method chaining
4.2.6. 配置
要配置Kafka Streams环境,需要一个StreamsBuilderFactoryBean实例。
查看Apache Kafka的文档以了解所有可能的选项。
从 2.2 版本开始,stream 配置现在以一个 KafkaStreamsConfiguration 对象提供,而不是以一个 StreamsConfig 提供。 |
为了避免在大多数情况下编写样板代码,特别是在开发微服务时,Spring for Apache Kafka 提供了 @EnableKafkaStreams 注解,你应该将其放在 @Configuration 类上。 你所需要做的就是声明一个名为 defaultKafkaStreamsConfig 的 KafkaStreamsConfiguration bean。 一个名为 defaultKafkaStreamsBuilder 的 StreamsBuilderFactoryBean bean 会自动在应用上下文中声明。 你也可以声明并使用任何额外的 StreamsBuilderFactoryBean bean。 你可以通过提供一个实现 StreamsBuilderFactoryBeanConfigurer 的 bean 来对该 bean 进行额外的自定义。 如果有多个这样的 bean,它们将根据其 Ordered.order 属性被应用。
默认情况下,当工厂bean被停止时,KafkaStreams.cleanUp()方法会被调用。
从2.1.2版本开始,工厂bean新增了带有一个CleanupConfig对象的构造函数,该对象具有属性,可让您在start()或stop()时调用cleanUp()方法,或两者皆不调用。
从2.7版本开始,默认情况下从不清理本地状态。
4.2.7. 标头增强器
版本 2.3 增加了 HeaderEnricher 的 Transformer 实现。 这可以用于在流处理中添加头部信息;头部值是 SpEL 表达式;表达式求值的根对象具有 3 个属性:
-
context- theProcessorContext, allowing access to the current record metadata -
key- 当前记录的键 -
value- 当前记录的值
这些表达式必须返回一个 byte[] 或一个 String(这将使用 UTF-8 转换为 byte[])。
使用 enricher 于流中:
.transform(() -> enricher)
转换器不会更改 key 或 value;它只是添加了头部信息。
| 如果您的流是多线程的,则需要为每条记录创建一个新实例。 |
.transform(() -> new HeaderEnricher<..., ...>(expressionMap))
这里是简单的示例,添加一个字面量标题和一个变量:
Map<String, Expression> headers = new HashMap<>();
headers.put("header1", new LiteralExpression("value1"));
SpelExpressionParser parser = new SpelExpressionParser();
headers.put("header2", parser.parseExpression("context.timestamp() + ' @' + context.offset()"));
HeaderEnricher<String, String> enricher = new HeaderEnricher<>(headers);
KStream<String, String> stream = builder.stream(INPUT);
stream
.transform(() -> enricher)
.to(OUTPUT);
4.2.8. MessagingTransformer
版本 2.3 增加了 MessagingTransformer,这使得 Kafka Streams 拓扑能够与 Spring Messaging 组件(例如 Spring Integration 流程)进行交互。 该转换器需要一个 MessagingFunction 的实现。
@FunctionalInterface
public interface MessagingFunction {
Message<?> exchange(Message<?> message);
}
Spring Integration 自动提供了一个实现,使用其 GatewayProxyFactoryBean。它还需要一个 MessagingMessageConverter 来将键、值和元数据(包括标头)转换为 Spring Messaging 的 Message<?>。更多详细信息,请参见 [从 KStream 调用 Spring Integration 流程].
4.2.9. 从反序列化异常中恢复
版本 2.3 引入了 RecoveringDeserializationExceptionHandler,它可以在反序列化异常发生时执行某些操作。 有关 DeserializationExceptionHandler 的信息,请参阅 Kafka 文档,其中 RecoveringDeserializationExceptionHandler 是一个实现。 RecoveringDeserializationExceptionHandler 配置了一个 ConsumerRecordRecoverer 实现。 框架提供了 DeadLetterPublishingRecoverer,它可以将失败的记录发送到死信主题。 更多关于此恢复器的信息,请参见 发布死信记录。
配置恢复器,请在流配置中添加以下属性:
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
...
props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
RecoveringDeserializationExceptionHandler.class);
props.put(RecoveringDeserializationExceptionHandler.KSTREAM_DESERIALIZATION_RECOVERER, recoverer());
...
return new KafkaStreamsConfiguration(props);
}
@Bean
public DeadLetterPublishingRecoverer recoverer() {
return new DeadLetterPublishingRecoverer(kafkaTemplate(),
(record, ex) -> new TopicPartition("recovererDLQ", -1));
}
当然,recoverer() 这个 bean 可以是 ConsumerRecordRecoverer 的自己实现。
4.2.10. Kafka Streams 示例
The following example combines all the topics we have covered in this chapter:
@Configuration
@EnableKafka
@EnableKafkaStreams
public static class KafkaStreamsConfig {
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
return new KafkaStreamsConfiguration(props);
}
@Bean
public StreamsBuilderFactoryBeanConfigurer configurer() {
return fb -> fb.setStateListener((newState, oldState) -> {
System.out.println("State transition from " + oldState + " to " + newState);
});
}
@Bean
public KStream<Integer, String> kStream(StreamsBuilder kStreamBuilder) {
KStream<Integer, String> stream = kStreamBuilder.stream("streamingTopic1");
stream
.mapValues((ValueMapper<String, String>) String::toUpperCase)
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMillis(1000)))
.reduce((String value1, String value2) -> value1 + value2,
Named.as("windowStore"))
.toStream()
.map((windowedId, value) -> new KeyValue<>(windowedId.key(), value))
.filter((i, s) -> s.length() > 40)
.to("streamingTopic2");
stream.print(Printed.toSysOut());
return stream;
}
}
4.3. 测试应用程序
The spring-kafka-test jar 包含一些有用的功能,用于帮助您测试您的应用程序。
4.3.1. KafkaTestUtils
o.s.kafka.test.utils.KafkaTestUtils 提供了许多静态辅助方法来消费记录、检索各种记录偏移量等。
参见其 Javadocs 以获取完整详情。
4.3.2. JUnit
o.s.kafka.test.utils.KafkaTestUtils 也提供了一些静态方法来设置生产者和消费者的属性。
以下列表显示了这些方法的签名:
/**
* Set up test properties for an {@code <Integer, String>} consumer.
* @param group the group id.
* @param autoCommit the auto commit.
* @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
* @return the properties.
*/
public static Map<String, Object> consumerProps(String group, String autoCommit,
EmbeddedKafkaBroker embeddedKafka) { ... }
/**
* Set up test properties for an {@code <Integer, String>} producer.
* @param embeddedKafka a {@link EmbeddedKafkaBroker} instance.
* @return the properties.
*/
public static Map<String, Object> producerProps(EmbeddedKafkaBroker embeddedKafka) { ... }
|
从 2.5 版本开始, 当使用嵌入式代理时,通常的最佳实践是为每个测试使用不同的主题,以防止交叉干扰。 如果由于某种原因无法实现这一点,请注意 |
JUnit 4 的 @Rule 提供了一个用于创建嵌入式 Kafka 和嵌入式 Zookeeper 服务器的包装器。
(参见 @EmbeddedKafka 注解,了解如何使用 @EmbeddedKafka 配合 JUnit 5)。
以下列表显示了这些方法的签名:
/**
* Create embedded Kafka brokers.
* @param count the number of brokers.
* @param controlledShutdown passed into TestUtils.createBrokerConfig.
* @param topics the topics to create (2 partitions per).
*/
public EmbeddedKafkaRule(int count, boolean controlledShutdown, String... topics) { ... }
/**
*
* Create embedded Kafka brokers.
* @param count the number of brokers.
* @param controlledShutdown passed into TestUtils.createBrokerConfig.
* @param partitions partitions per topic.
* @param topics the topics to create.
*/
public EmbeddedKafkaRule(int count, boolean controlledShutdown, int partitions, String... topics) { ... }
The EmbeddedKafkaBroker 类有一个实用方法,可以消费它所创建的所有主题。
以下示例展示了如何使用它:
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(
consumerProps);
Consumer<Integer, String> consumer = cf.createConsumer();
embeddedKafka.consumeFromAllEmbeddedTopics(consumer);
的 KafkaTestUtils 包含一些用于从消费者获取结果的实用方法。
以下列表显示了这些方法签名:
/**
* Poll the consumer, expecting a single record for the specified topic.
* @param consumer the consumer.
* @param topic the topic.
* @return the record.
* @throws org.junit.ComparisonFailure if exactly one record is not received.
*/
public static <K, V> ConsumerRecord<K, V> getSingleRecord(Consumer<K, V> consumer, String topic) { ... }
/**
* Poll the consumer for records.
* @param consumer the consumer.
* @return the records.
*/
public static <K, V> ConsumerRecords<K, V> getRecords(Consumer<K, V> consumer) { ... }
下面的例子展示了如何使用KafkaTestUtils:
...
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = KafkaTestUtils.getSingleRecord(consumer, "topic");
...
当通过EmbeddedKafkaBroker启动嵌入式 Kafka 和嵌入式 Zookeeper 服务器时,会设置一个系统属性 spring.embedded.kafka.brokers,该属性的值为 Kafka 协调器的地址,并设置另一个系统属性 spring.embedded.zookeeper.connect,其值为 Zookeeper 的地址。
提供了一些方便的常量 (EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS 和 EmbeddedKafkaBroker.SPRING_EMBEDDED_ZOOKEEPER_CONNECT) 用于这些属性。
使用 EmbeddedKafkaBroker.brokerProperties(Map<String, String>),您可以为 Kafka 服务器提供额外的属性。
见 Kafka 配置 以了解有关可能的 broker 属性的更多信息。
4.3.3. 配置主题
以下示例配置创建了名为cat和hat的主题,每个有五个分区,名为thing1的主题有10个分区,以及名为thing2的主题有15个分区:
public class MyTests {
@ClassRule
private static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, false, 5, "cat", "hat");
@Test
public void test() {
embeddedKafkaRule.getEmbeddedKafka()
.addTopics(new NewTopic("thing1", 10, (short) 1), new NewTopic("thing2", 15, (short) 1));
...
}
}
默认情况下,当出现问题(例如添加已存在的主题)时,addTopics 会抛出异常。
版本 2.6 增加了该方法的新版本,返回值为 Map<String, Exception>;键是主题名称,值为 null 表示成功,或 Exception 表示失败。
4.3.4. 在多个测试类中使用相同的代理
没有内置的支持来实现这一点,但您可以使用相同的代理来支持多个测试类,类似于以下内容:
public final class EmbeddedKafkaHolder {
private static EmbeddedKafkaBroker embeddedKafka = new EmbeddedKafkaBroker(1, false)
.brokerListProperty("spring.kafka.bootstrap-servers");
private static boolean started;
public static EmbeddedKafkaBroker getEmbeddedKafka() {
if (!started) {
try {
embeddedKafka.afterPropertiesSet();
}
catch (Exception e) {
throw new KafkaException("Embedded broker failed to start", e);
}
started = true;
}
return embeddedKafka;
}
private EmbeddedKafkaHolder() {
super();
}
}
这假设使用的是 Spring Boot 环境,并且嵌入式的消息中间件替换了 bootstrap.servers 属性。
然后,在每个测试类中,您可以使用类似以下内容:
static {
EmbeddedKafkaHolder.getEmbeddedKafka().addTopics("topic1", "topic2");
}
private static final EmbeddedKafkaBroker broker = EmbeddedKafkaHolder.getEmbeddedKafka();
如果未使用 Spring Boot,可以通过 broker.getBrokersAsString() 获取 bootstrap servers。
| 上面的例子在所有测试完成后没有提供关闭经纪人(broker)的机制。 这可能在所有测试完成后关闭经纪人(broker)时出现问题。 例如,如果你在使用 Gradle daemon 运行测试,这就会成为一个问题。 你不应该在这种情况下使用这种技术,或者你应该使用某种方式在测试完成后调用对 1 的 0 方法。 |
4.3.5. @EmbeddedKafka 注解
我们通常建议您将规则设置为@ClassRule以避免在测试之间启动和停止代理(并为每个测试使用不同的主题)。
自2.0版本起,如果您使用Spring的测试应用程序上下文缓存功能,还可以声明一个EmbeddedKafkaBroker bean,因此单个代理可以在多个测试类之间共享。
为了方便起见,我们提供了一个名为@EmbeddedKafka的测试类级别的注解来注册EmbeddedKafkaBroker bean。
以下示例展示了如何使用它:
@RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(partitions = 1,
topics = {
KafkaStreamsTests.STREAMING_TOPIC1,
KafkaStreamsTests.STREAMING_TOPIC2 })
public class KafkaStreamsTests {
@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Test
public void someTest() {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", this.embeddedKafka);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
ConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
Consumer<Integer, String> consumer = cf.createConsumer();
this.embeddedKafka.consumeFromAnEmbeddedTopic(consumer, KafkaStreamsTests.STREAMING_TOPIC2);
ConsumerRecords<Integer, String> replies = KafkaTestUtils.getRecords(consumer);
assertThat(replies.count()).isGreaterThanOrEqualTo(1);
}
@Configuration
@EnableKafkaStreams
public static class KafkaStreamsConfiguration {
@Value("${" + EmbeddedKafkaBroker.SPRING_EMBEDDED_KAFKA_BROKERS + "}")
private String brokerAddresses;
@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "testStreams");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, this.brokerAddresses);
return new KafkaStreamsConfiguration(props);
}
}
}
从 2.2.4 版本开始,你也可以使用 @EmbeddedKafka 注解来指定 Kafka 端口属性。
以下示例设置了对 @EmbeddedKafka 的属性占位解析的 topics、brokerProperties 和 brokerPropertiesLocation 属性:
@TestPropertySource(locations = "classpath:/test.properties")
@EmbeddedKafka(topics = { "any-topic", "${kafka.topics.another-topic}" },
brokerProperties = { "log.dir=${kafka.broker.logs-dir}",
"listeners=PLAINTEXT://localhost:${kafka.broker.port}",
"auto.create.topics.enable=${kafka.broker.topics-enable:true}" },
brokerPropertiesLocation = "classpath:/broker.properties")
在前面的例子中,属性占位符${kafka.topics.another-topic}、${kafka.broker.logs-dir}和${kafka.broker.port}从Spring Environment中解析。
此外,经纪人属性从由brokerPropertiesLocation指定的类路径资源broker.properties加载。
属性占位符对于brokerPropertiesLocationURL以及在该资源中发现的任何属性占位符进行解析。
由brokerProperties定义的属性会覆盖在brokerPropertiesLocation中找到的属性。
您可以使用 @EmbeddedKafka 注解与 JUnit 4 或 JUnit 5。
4.3.6. 使用 JUnit5 的 @EmbeddedKafka 注解
从2.3版本开始,使用@EmbeddedKafka注解与JUnit5结合有两条途径。
当与@SpringJunitConfig注解一起使用时,内嵌的代理会被添加到测试应用上下文中。
您可以在类级别或方法级别自动注入代理以获取代理地址列表。
当不使用 Spring 测试上下文时,EmbdeddedKafkaCondition 会创建一个代理;该条件包含一个参数解析器,因此您可以在测试方法中访问该代理…
@EmbeddedKafka
public class EmbeddedKafkaConditionTests {
@Test
public void test(EmbeddedKafkaBroker broker) {
String brokerList = broker.getBrokersAsString();
...
}
}
如果使用 @EmbeddedBroker 注解的类没有同时使用 ExtendedWith(SpringExtension.class) 注解(或元注解),将创建一个独立的(非 Spring 测试上下文)代理。 @SpringJunitConfig 和 @SpringBootTest 是元注解,当存在这些注解中的任何一个时,将使用基于上下文的代理。
| 当可用Spring测试应用上下文存在时,主题和经纪人属性可以包含属性占位符,只要某个属性在某处被定义,这些占位符将被解析。 如果不存在Spring上下文,这些占位符将不会被解析。 |
4.3.7. 嵌入式代理在 @SpringBootTest注解
Spring Initializr 现在会自动将 spring-kafka-test 依赖添加到项目配置的测试作用域中。
|
如果您的应用程序使用在
|
有几种方法可以在 Spring Boot 应用程序测试中使用嵌入式消息中间件。
它们包括:
JUnit4 类规则
The following example shows how to use a JUnit4 class rule to create an embedded broker:
@RunWith(SpringRunner.class)
@SpringBootTest
public class MyApplicationTests {
@ClassRule
public static EmbeddedKafkaRule broker = new EmbeddedKafkaRule(1,
false, "someTopic")
.brokerListProperty("spring.kafka.bootstrap-servers");
}
@Autowired
private KafkaTemplate<String, String> template;
@Test
public void test() {
...
}
}
请注意,由于这是一个 Spring Boot 应用程序,我们重写了 broker list 属性以设置 Boot 的属性。
@EmbeddedKafka注解或EmbeddedKafkaBrokerBean
以下示例展示了如何使用一个 @EmbeddedKafka 注解来创建一个内嵌的 broker:
@RunWith(SpringRunner.class)
@EmbeddedKafka(topics = "someTopic",
bootstrapServersProperty = "spring.kafka.bootstrap-servers")
public class MyApplicationTests {
@Autowired
private KafkaTemplate<String, String> template;
@Test
public void test() {
...
}
}
4.3.8. Hamcrest 匹配器
The o.s.kafka.test.hamcrest.KafkaMatchers 提供以下匹配器:
/**
* @param key the key
* @param <K> the type.
* @return a Matcher that matches the key in a consumer record.
*/
public static <K> Matcher<ConsumerRecord<K, ?>> hasKey(K key) { ... }
/**
* @param value the value.
* @param <V> the type.
* @return a Matcher that matches the value in a consumer record.
*/
public static <V> Matcher<ConsumerRecord<?, V>> hasValue(V value) { ... }
/**
* @param partition the partition.
* @return a Matcher that matches the partition in a consumer record.
*/
public static Matcher<ConsumerRecord<?, ?>> hasPartition(int partition) { ... }
/**
* Matcher testing the timestamp of a {@link ConsumerRecord} assuming the topic has been set with
* {@link org.apache.kafka.common.record.TimestampType#CREATE_TIME CreateTime}.
*
* @param ts timestamp of the consumer record.
* @return a Matcher that matches the timestamp in a consumer record.
*/
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(long ts) {
return hasTimestamp(TimestampType.CREATE_TIME, ts);
}
/**
* Matcher testing the timestamp of a {@link ConsumerRecord}
* @param type timestamp type of the record
* @param ts timestamp of the consumer record.
* @return a Matcher that matches the timestamp in a consumer record.
*/
public static Matcher<ConsumerRecord<?, ?>> hasTimestamp(TimestampType type, long ts) {
return new ConsumerRecordTimestampMatcher(type, ts);
}
4.3.9. AssertJ 条件
您可以使用以下 AssertJ 条件:
/**
* @param key the key
* @param <K> the type.
* @return a Condition that matches the key in a consumer record.
*/
public static <K> Condition<ConsumerRecord<K, ?>> key(K key) { ... }
/**
* @param value the value.
* @param <V> the type.
* @return a Condition that matches the value in a consumer record.
*/
public static <V> Condition<ConsumerRecord<?, V>> value(V value) { ... }
/**
* @param key the key.
* @param value the value.
* @param <K> the key type.
* @param <V> the value type.
* @return a Condition that matches the key in a consumer record.
* @since 2.2.12
*/
public static <K, V> Condition<ConsumerRecord<K, V>> keyValue(K key, V value) { ... }
/**
* @param partition the partition.
* @return a Condition that matches the partition in a consumer record.
*/
public static Condition<ConsumerRecord<?, ?>> partition(int partition) { ... }
/**
* @param value the timestamp.
* @return a Condition that matches the timestamp value in a consumer record.
*/
public static Condition<ConsumerRecord<?, ?>> timestamp(long value) {
return new ConsumerRecordTimestampCondition(TimestampType.CREATE_TIME, value);
}
/**
* @param type the type of timestamp
* @param value the timestamp.
* @return a Condition that matches the timestamp value in a consumer record.
*/
public static Condition<ConsumerRecord<?, ?>> timestamp(TimestampType type, long value) {
return new ConsumerRecordTimestampCondition(type, value);
}
4.3.10. 示例
以下示例将本章中讨论的大部分主题结合起来:
public class KafkaTemplateTests {
private static final String TEMPLATE_TOPIC = "templateTopic";
@ClassRule
public static EmbeddedKafkaRule embeddedKafka = new EmbeddedKafkaRule(1, true, TEMPLATE_TOPIC);
@Test
public void testTemplate() throws Exception {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testT", "false",
embeddedKafka.getEmbeddedKafka());
DefaultKafkaConsumerFactory<Integer, String> cf =
new DefaultKafkaConsumerFactory<Integer, String>(consumerProps);
ContainerProperties containerProperties = new ContainerProperties(TEMPLATE_TOPIC);
KafkaMessageListenerContainer<Integer, String> container =
new KafkaMessageListenerContainer<>(cf, containerProperties);
final BlockingQueue<ConsumerRecord<Integer, String>> records = new LinkedBlockingQueue<>();
container.setupMessageListener(new MessageListener<Integer, String>() {
@Override
public void onMessage(ConsumerRecord<Integer, String> record) {
System.out.println(record);
records.add(record);
}
});
container.setBeanName("templateTests");
container.start();
ContainerTestUtils.waitForAssignment(container,
embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic());
Map<String, Object> producerProps =
KafkaTestUtils.producerProps(embeddedKafka.getEmbeddedKafka());
ProducerFactory<Integer, String> pf =
new DefaultKafkaProducerFactory<Integer, String>(producerProps);
KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
template.setDefaultTopic(TEMPLATE_TOPIC);
template.sendDefault("foo");
assertThat(records.poll(10, TimeUnit.SECONDS), hasValue("foo"));
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
assertThat(received, hasKey(2));
assertThat(received, hasPartition(0));
assertThat(received, hasValue("bar"));
template.send(TEMPLATE_TOPIC, 0, 2, "baz");
received = records.poll(10, TimeUnit.SECONDS);
assertThat(received, hasKey(2));
assertThat(received, hasPartition(0));
assertThat(received, hasValue("baz"));
}
}
前面的例子使用了Hamcrest匹配器。
With AssertJ,最终部分看起来如下代码:
assertThat(records.poll(10, TimeUnit.SECONDS)).has(value("foo"));
template.sendDefault(0, 2, "bar");
ConsumerRecord<Integer, String> received = records.poll(10, TimeUnit.SECONDS);
// using individual assertions
assertThat(received).has(key(2));
assertThat(received).has(value("bar"));
assertThat(received).has(partition(0));
template.send(TEMPLATE_TOPIC, 0, 2, "baz");
received = records.poll(10, TimeUnit.SECONDS);
// using allOf()
assertThat(received).has(allOf(keyValue(2, "baz"), partition(0)));
4.4. 非阻塞重试
| <p>这是一个实验性功能,通常不破坏API更改规则并不适用于此功能,直到实验性标记被移除。</p> <p>鼓励用户尝试此功能并提供反馈,通过GitHub Issues或GitHub讨论。</p> <p>这仅针对API;该功能被认为已完成且稳定。</p> |
实现Kafka的非阻塞重试/死信功能通常需要设置额外的主题,并创建和配置相应的监听器。
自2.7版本起,Spring for Apache Kafka通过@RetryableTopic注解和RetryTopicConfiguration类简化了这一启动过程。
4.4.1. 模式的工作原理
If 消息处理失败,消息将转发到重试主题,并附带一个退避时间戳。 重试主题的消费者会检查该时间戳,如果不是时候,则暂停该主题分区的消息消费。 当时间已到时,分区消费会恢复,消息会被再次消费。 如果消息处理再次失败,消息将被转发到下一个重试主题,该模式会重复,直到处理成功,或尝试次数耗尽,此时如果已配置,消息将发送到死信主题。
为了说明,如果你有一个"main-topic"主题,并希望设置具有1000ms指数退避、乘数为2且最多尝试4次的非阻塞重试,它将创建main-topic-retry-1000、main-topic-retry-2000、main-topic-retry-4000和main-topic-dlt主题,并配置相应的消费者。 该框架还会负责创建主题并设置和配置监听器。
| 通过使用这种策略,你会失去该主题的Kafka顺序保证。 |
您可以设置您偏好的 AckMode 模式,但 RECORD 被建议。 |
此时,此功能不支持类级别的 @KafkaListener 注解 |
4.4.2. 回退延迟精度
概述与保证
所有消息处理和回退都由消费者线程处理,因此在尽力而为的基础上保证延迟精度。 如果一条消息的处理时间比该消费者下下条消息的回退间隔长,那么下条消息的延迟将高于预期。 此外,对于短延迟(约1秒或以下),线程需要进行的维护工作(如提交偏移量),可能会延迟消息处理的执行。 如果重试主题的消费者正在处理多个分区,延迟精度也会受到影响,因为我们依赖于从轮询中唤醒消费者并拥有完整的pollTimeouts来进行时间调整。
也就是说,对于处理单个分区的消费者来说,在大多数情况下,消息的处理应该在其确切的到期时间后的 100 毫秒内完成。
| 可以保证消息绝不会在到期时间之前被处理。 |
调整延迟精度
消息的处理延迟精度依赖于两个 ContainerProperties: ContainerProperties.pollTimeout 和 ContainerProperties.idlePartitionEventInterval。 这两个属性将自动设置为重试主题和dlt的 ListenerContainerFactory,为其最小延迟值的四分之一,最小值为250毫秒,最大值为5000毫秒。 只有当属性具有其默认值时才会设置这些值——如果您自己更改了任一值,则您的更改不会被覆盖。 通过这种方式,您可以根据需要调整重试主题的精度和性能。
您可以为主主题和重试主题分别设置 ListenerContainerFactory 个实例——通过这种方式,您可以根据需求使用不同的配置,例如为主主题设置较高的轮询超时时间,而为重试主题设置较低的轮询超时时间。 |
4.4.3. 配置
使用@RetryableTopic注解
为了配置一个标注为@KafkaListener的方法的重试主题和死信主题,你只需要在该方法上添加@RetryableTopic注解,Spring for Apache Kafka 将会使用默认配置自动创建所有必要的主题和消费者。
@RetryableTopic(kafkaTemplate = "myRetryableTopicKafkaTemplate")
@KafkaListener(topics = "my-annotated-topic", groupId = "myGroupId")
public void processMessage(MyPojo message) {
// ... message processing
}
您可以使用同一类中的方法来处理dlt消息,通过使用@DltHandler注解进行标注。
如果未提供DltHandler方法,则会创建一个默认的消费者,仅记录消费。
@DltHandler
public void processMessage(MyPojo message) {
// ... message processing, persistence, etc
}
如果未指定kafkaTemplate的名称,将查找名为retryTopicDefaultKafkaTemplate的bean。
如果未找到该bean将抛出异常。 |
使用RetryTopicConfigurationBean
您也可以通过在一个@Configuration注解类中创建RetryTopicConfiguration个beans来配置非阻塞重试支持。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.create(template);
}
这将为使用默认配置且在标注了 '@KafkaListener' 注解的方法中的所有主题创建重试主题、死信队列 (dlt) 以及相应的消费者。消息转发需要 KafkaTemplate 实例。
为更精细地控制每个主题的非阻塞重试处理,可以提供多个 RetryTopicConfiguration bean。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackoff(3000)
.maxAttempts(5)
.includeTopics("my-topic", "my-other-topic")
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.exponentialBackoff(1000, 2, 5000)
.maxAttempts(4)
.excludeTopics("my-topic", "my-other-topic")
.retryOn(MyException.class)
.create(template);
}
重试主题和dlt的消费者将被分配到一个消费者组中,该组ID是您在 |
如果消费者配置了 ErrorHandlingDeserializer 来处理反序列化异常,则务必要为 KafkaTemplate 及其生产者配置一个能够处理普通对象以及由反序列化异常导致的原始 byte[] 值的序列化器。 模板的通用值类型应为 Object。 一种技术是使用 DelegatingByTypeSerializer;以下是一个示例: |
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
MyNormalObject.class, new JsonSerializer<Object>())));
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
多个 @KafkaListener 注解可以用于同一主题,可与手动分区分配和非阻塞重试同时使用,但针对同一主题只会应用一个配置。
最好为此类主题使用一个单独的 RetryTopicConfiguration bean 进行配置;如果在同一主题上使用多个 @RetryableTopic 注解,所有注解的值必须相同,否则其中一个配置将应用于该主题所有监听器,其他注解的值会被忽略。 |
4.4.4. 特性
大多数功能对于 @RetryableTopic 注解和 RetryTopicConfiguration 的 beans 都是可用的。
退避策略配置
The BackOff 配置依赖于来自 Spring Retry 项目的 BackOffPolicy 接口。
它包含:
-
固定退避
-
指数退避
-
随机指数退避
-
均匀随机退避
-
没有退避
-
自定义退避
@RetryableTopic(attempts = 5,
backoff = @Backoff(delay = 1000, multiplier = 2, maxDelay = 5000))
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackoff(3000)
.maxAttempts(4)
.build();
}
你也可以提供 Spring Retry 的 SleepingBackOffPolicy 接口的自定义实现:
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.customBackOff(new MyCustomBackOffPolicy())
.maxAttempts(5)
.build();
}
默认退避策略为 FixedBackOffPolicy,最多尝试3次,间隔1000ms。 |
对于ExponentialBackOffPolicy,默认最大延迟为30秒。 如果您的退避策略需要更大的延迟值,请相应地调整maxDelay属性。 |
第一次尝试计入 maxAttempts,所以如果你提供一个 maxAttempts 值为 4,那么将包括原来的 1 次尝试加 3 次重试。 |
单主题固定延迟重试
如果你使用固定的延迟策略,如FixedBackOffPolicy或NoBackOffPolicy,你可以使用一个单一的主题来实现非阻塞重试。
该主题将加上提供的或默认后缀,且不会附加索引或延迟值。
@RetryableTopic(backoff = @Backoff(2000), fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackoff(3000)
.maxAttempts(5)
.useSingleTopicForFixedDelays()
.build();
}
| 默认行为是为每次尝试创建单独的重试主题,并在其后附加它们的索引值:retry-0, retry-1, …… |
全局超时
你可以为重试过程设置全局超时。 如果达到该时间,当消费者在下一次抛出异常时,消息会直接进入 DLT,或者在没有 DLT 可用时结束处理。
@RetryableTopic(backoff = @Backoff(2000), timeout = 5000)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackoff(2000)
.timeoutAfter(5000)
.build();
}
| 默认情况下是没有设置超时时间的,这也可以通过提供-1作为超时值来实现。 |
异常分类器
你可以指定希望重试的异常以及不希望重试的异常。 你还可以设置遍历原因以查找嵌套异常。
@RetryableTopic(include = {MyRetryException.class, MyOtherRetryException.class}, traversingCauses = true)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
throw new RuntimeException(new MyRetryException()); // Will retry
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.notRetryOn(MyDontRetryException.class)
.create(template);
}
| 默认行为是重试所有异常,并不遍历原因。 |
从2.8.3版本开始,有一个全局的致命异常列表,这些异常会导致记录被直接发送到DLT,而不会进行任何重试。 详见DefaultErrorHandler中的默认致命异常列表。 您可以使用以下方式向此列表添加或从中删除异常:
@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
public DefaultDestinationTopicResolver topicResolver(ApplicationContext applicationContext,
@Qualifier(RetryTopicInternalBeanNames
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(clock, applicationContext);
ddtr.addNotRetryableExceptions(MyFatalException.class);
ddtr.removeNotRetryableException(ConversionException.class);
return ddtr;
}
要禁用致命异常的分类,请使用 DefaultDestinationTopicResolver 中的 setClassifications 方法清除默认列表。 |
包含和排除主题
你可以通过 .includeTopic(String topic),.include topics(Collection<String> topics) ,.excludeTopic(String topic) 和 .exclude topics(Collection<String> topics) 方法决定哪些主题将由一个 RetryTopicConfiguration bean 处理。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.includeTopics(List.of("my-included-topic", "my-other-included-topic"))
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.excludeTopic("my-excluded-topic")
.create(template);
}
| 默认行为是包含所有主题。 |
话题自动创建
除非另有说明,框架将自动为由KafkaAdmin个bean消费的NewTopic个bean创建所需的主题。 您可以指定主题将被创建的分区数和复制因子,并可以关闭此功能。
| 注意:如果你没有使用 Spring Boot,你将需要提供一个 KafkaAdmin 组件 才能使用该功能。 |
@RetryableTopic(numPartitions = 2, replicationFactor = 3)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@RetryableTopic(autoCreateTopics = false)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.autoCreateTopicsWith(2, 3)
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.doNotAutoCreateRetryTopics()
.create(template);
}
| <p>默认情况下,主题会自动生成一个分区,并且复制因子为一。</p> |
失败标题管理
在考虑如何管理失败头(原始头和异常头)时,框架会将决定是否追加或替换头的职责委托给DeadLetterPublishingRecover。
默认情况下,它显式地将 appendOriginalHeaders 设置为 false,并将 stripPreviousExceptionHeaders 保留为由 DeadLetterPublishingRecover 使用的默认值。
这表示默认配置下,只会保留第一个“原始”和最后一个异常头。 这是为了避免在涉及许多重试步骤时产生过大的消息(例如由于堆栈跟踪头)。
要重新配置框架以使用这些属性的不同设置,请通过添加 recovererCustomizer 来替换标准的 DeadLetterPublishingRecovererFactory bean:
@Bean(RetryTopicInternalBeanNames.DEAD_LETTER_PUBLISHING_RECOVERER_FACTORY_BEAN_NAME)
DeadLetterPublishingRecovererFactory factory(DestinationTopicResolver resolver) {
DeadLetterPublishingRecovererFactory factory = new DeadLetterPublishingRecovererFactory(resolver);
factory.setDeadLetterPublishingRecovererCustomizer(dlpr -> {
dlpr.appendOriginalHeaders(true);
dlpr.setStripPreviousExceptionHeaders(false);
});
return factory;
}
从版本 2.8.4 开始,如果您希望添加自定义标头(除了工厂添加的重试信息标头之外),您可以向工厂添加一个 headersFunction - factory.setHeadersFunction((rec, ex) → { … })
4.4.5. 结合阻塞和非阻塞重试
在2.8.4版本中,您可以配置框架同时使用阻塞和非阻塞重试。
例如,您可以设置一组可能会在下一条记录上引发错误的异常,比如DatabaseAccessException,因此您可以在将该记录发送到重试主题或直接发送到死信队列前尝试重试几次相同的记录。
要配置阻塞重试,您只需通过 ListenerContainerFactoryConfigurer bean 中的 addRetryableExceptions 方法添加您希望重试的异常,如下所示。 默认策略是 FixedBackOff,包含九次重试,并且它们之间没有延迟。 您还可以选择提供自己的退避策略。
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
@Qualifier(RetryTopicInternalBeanNames
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
lcfc.setBlockingRetryableExceptions(MyBlockingRetryException.class, MyOtherBlockingRetryException.class);
lcfc.setBlockingRetriesBackOff(new FixedBackOff(500, 5)); // Optional
return lcfc;
}
如果需要进一步调整异常分类,可以通过ListenerContainerFactoryConfigurer.setErrorHandlerCustomizer()方法设置自己的Map分类,例如:
lcfc.setErrorHandlerCustomizer(ceh -> ((DefaultErrorHandler) ceh).setClassifications(myClassificationsMap, myDefaultValue));
| 在结合全局重试主题的致命异常分类后,您可以为框架配置任何行为,例如让某些异常触发阻塞和非阻塞重试、仅触发其中一种或另一种,或者直接进入DLT而不进行任何形式的重试。 |
以下是一个配置一起工作的示例:
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
@Qualifier(RetryTopicInternalBeanNames
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
lcfc.setBlockingRetryableExceptions(ShouldRetryOnlyBlockingException.class, ShouldRetryViaBothException.class);
return lcfc;
}
@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
public DefaultDestinationTopicResolver ddtr(ApplicationContext applicationContext,
@Qualifier(RetryTopicInternalBeanNames
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(clock, applicationContext);
ddtr.addNotRetryableExceptions(ShouldRetryOnlyBlockingException.class, ShouldSkipBothRetriesException.class);
return ddtr;
}
在此示例中:
-
ShouldRetryOnlyBlockingException.class会仅通过阻塞方式重试,如果所有重试都失败,则直接进入死信队列。 -
ShouldRetryViaBothException.class会通过阻塞方式重试,如果所有阻塞重试失败,则会被转发到下一个重试主题进行另一轮尝试。 -
ShouldSkipBothRetriesException.class会直接发送到 DLT,而不会以任何方式重新尝试,并且如果首次处理尝试失败。
| 注意,阻塞重试行为是白名单模式——您需要以这种方式添加您想要重试的异常;而非阻塞重试分类则侧重于致命异常,并且是黑名单模式——您添加的是不想进行非阻塞重试的异常,而是直接发送到死信队列(DLT)中。 |
| 非阻塞异常分类行为还取决于具体主题的配置。 |
4.4.6. 主题命名
重试主题和死信主题通过在主主题名称后添加提供的或默认值,再附加该主题的延迟或索引来命名。
示例:
"my-topic" → "my-topic-retry-0", "my-topic-retry-1",…,"my-topic-dlt"
\"my-other-topic\" → \"my-topic-myRetrySuffix-1000\", \"my-topic-myRetrySuffix-2000\", …, \"my-topic-myDltSuffix\".
重试主题和 DLT 后缀
您可以指定将由重试和 DLT 主题使用的后缀。
@RetryableTopic(retryTopicSuffix = "-my-retry-suffix", dltTopicSuffix = "-my-dlt-suffix")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.retryTopicSuffix("-my-retry-suffix")
.dltTopicSuffix("-my-dlt-suffix")
.create(template);
}
| 默认后缀为"-retry" 和 "-dlt",分别用于重试主题和死信队列。 |
追加主题索引或延迟
你可以将主题的索引或延迟值附加在后缀之后。
@RetryableTopic(topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.suffixTopicsWithIndexValues()
.create(template);
}
| 默认行为是对延迟值进行后缀处理,除了具有多个主题的固定延迟配置,在这种情况下,会使用主题的索引对主题进行后缀处理。 |
自定义命名策略
更复杂的命名策略可以通过注册一个实现RetryTopicNamesProviderFactory接口的bean来完成。默认实现是SuffixingRetryTopicNamesProviderFactory,可以通过以下方式注册不同的实现: \ "
}
@Bean
public RetryTopicNamesProviderFactory myRetryNamingProviderFactory() {
return new CustomRetryTopicNamesProviderFactory();
}
例如,以下实现除了标准后缀外,还为重试/死信主题名称添加了一个前缀:
public class CustomRetryTopicNamesProviderFactory implements RetryTopicNamesProviderFactory {
@Override
public RetryTopicNamesProvider createRetryTopicNamesProvider(
DestinationTopic.Properties properties) {
if(properties.isMainEndpoint()) {
return new SuffixingRetryTopicNamesProvider(properties);
}
else {
return new SuffixingRetryTopicNamesProvider(properties) {
@Override
public String getTopicName(String topic) {
return "my-prefix-" + super.getTopicName(topic);
}
};
}
}
}
4.4.7. 删除策略
<p>该框架为与分布式账本技术(DLT)交互提供了几种策略。您可以提供一种方法来处理DLT,使用默认的日志记录方法,或者根本不使用DLT。您还可以选择如果DLT处理失败会发生什么。</p>
数据链路处理方法
您可以指定用于处理该主题的DLT方法,以及在该处理失败时的行为。
要实现这一点,你可以在一个带有@DltHandler注解的方法中使用@RetryableTopic注解。注意,该方法将用于该类中所有带有@RetryableTopic注解的方法。
@RetryableTopic
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@DltHandler
public void processMessage(MyPojo message) {
// ... message processing, persistence, etc
}
DLT 处理方法也可以通过 RetryTopicConfigurationBuilder.dltHandlerMethod(String, String) 方法提供,该方法的参数是应处理 DLT 消息的 bean 名称和方法名称。
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.dltProcessor("myCustomDltProcessor", "processDltMessage")
.create(template);
}
@Component
public class MyCustomDltProcessor {
private final MyDependency myDependency;
public MyCustomDltProcessor(MyDependency myDependency) {
this.myDependency = myDependency;
}
public void processDltMessage(MyPojo message) {
// ... message processing, persistence, etc
}
}
| 如果未提供 DLT 处理程序,则使用默认的 RetryTopicConfigurer.LoggingDltListenerHandlerMethod。 |
从版本2.8开始,如果你不希望在该应用程序中消费DLT,包括默认处理程序(或希望延迟消费),你可以控制DLT容器是否启动,而与容器工厂的0属性无关。
当使用 @RetryableTopic 注解时,将 autoStartDltHandler 属性设置为 false;当使用配置构建器时,使用 .autoStartDltHandler(false) 。
你可以稍后通过 KafkaListenerEndpointRegistry 启动 DLT 处理器。
DLT 失败行为
如果DLT处理失败,将有两种可能的行为:ALWAYS_RETRY_ON_ERROR 和 FAIL_ON_ERROR。
在前者中,记录会被转发回DLT主题,因此不会阻塞其他DLT记录的处理。 在后者中,消费者会在不转发消息的情况下结束执行。
@RetryableTopic(dltProcessingFailureStrategy =
DltStrategy.FAIL_ON_ERROR)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.dltProcessor(MyCustomDltProcessor.class, "processDltMessage")
.doNotRetryOnDltFailure()
.create(template);
}
默认行为是将 ALWAYS_RETRY_ON_ERROR。 |
从版本 2.8.3 开始,如果记录导致致命异常抛出,ALWAYS_RETRY_ON_ERROR 将不会将记录重新路由回 DLT, 例如 DeserializationException,因为通常此类异常总是会被抛出。 |
被考虑为致命的异常有:
-
DeserializationException -
MessageConversionException -
ConversionException -
MethodArgumentResolutionException -
NoSuchMethodException -
ClassCastException
你可以使用在 DestinationTopicResolver 组件上的方法将其添加到或从此列表中移除异常。
查看异常分类器以获取更多信息。
配置 No DLT
框架还提供了不为该主题配置DLT的可能性。 在这种情况下,重试耗尽后处理将简单地结束。
@RetryableTopic(dltProcessingFailureStrategy =
DltStrategy.NO_DLT)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.doNotConfigureDlt()
.create(template);
}
4.4.8. 指定 ListenerContainerFactory
默认情况下,RetryTopic配置将使用从@KafkaListener注解提供的工厂,但您可以指定一个不同的工厂来用于创建重试主题和DLT监听器容器。
对于@RetryableTopic注解,您可以提供工厂的bean名称,而对于RetryTopicConfiguration bean,您可以提供bean名称或其实例本身。
@RetryableTopic(listenerContainerFactory = "my-retry-topic-factory")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<Integer, MyPojo> template,
ConcurrentKafkaListenerContainerFactory<Integer, MyPojo> factory) {
return RetryTopicConfigurationBuilder
.newInstance()
.listenerFactory(factory)
.create(template);
}
@Bean
public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.listenerFactory("my-retry-topic-factory")
.create(template);
}
| 自 2.8.3 版本起,您可以使用相同的工厂来创建可重试和不可重试主题。 |
如果需要将工厂配置行为恢复到2.8.3之前的版本,可以替换标准的RetryTopicConfigurer bean并将useLegacyFactoryConfigurer设置为true,例如:
@Bean(name = RetryTopicInternalBeanNames.RETRY_TOPIC_CONFIGURER)
public RetryTopicConfigurer retryTopicConfigurer(DestinationTopicProcessor destinationTopicProcessor,
ListenerContainerFactoryResolver containerFactoryResolver,
ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer,
BeanFactory beanFactory,
RetryTopicNamesProviderFactory retryTopicNamesProviderFactory) {
RetryTopicConfigurer retryTopicConfigurer = new RetryTopicConfigurer(destinationTopicProcessor, containerFactoryResolver, listenerContainerFactoryConfigurer, beanFactory, retryTopicNamesProviderFactory);
retryTopicConfigurer.useLegacyFactoryConfigurer(true);
return retryTopicConfigurer;
}
4.4.9. 更改 KafkaBackOffException 日志记录级别
当消息在重试主题中尚未到达消费时间时,会抛出一个 KafkaBackOffException。这些异常默认会在 DEBUG 级别日志记录,但您可以通过在 ListenerContainerFactoryConfigurer 中设置错误处理器定制器来改变这种行为,在 @Configuration 类中。
例如,要将日志级别更改为 WARN,您可以添加:
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
public ListenerContainerFactoryConfigurer listenerContainer(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
@Qualifier(RetryTopicInternalBeanNames
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
ListenerContainerFactoryConfigurer configurer = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
configurer.setErrorHandlerCustomizer(commonErrorHandler -> ((DefaultErrorHandler) commonErrorHandler).setLogLevel(KafkaException.Level.WARN));
return configurer;
}