|
对于最新稳定版本,请使用Spring for Apache Kafka 4.0.4! |
过滤消息
在某些场景中,例如重新平衡,一个已经处理过的消息可能会被重新投递。 框架无法知道该消息是否已被处理。 这是应用程序级别的功能。 这被称为 幂等接收器 模式,Spring Integration 提供了它的 实现。
The Spring for Apache Kafka 项目也通过 FilteringMessageListenerAdapter 类提供了一些帮助,该类可以包装你的 MessageListener。
此类接受一个 RecordFilterStrategy 的实现,其中你实现 filter 方法以指示消息是重复的并应被丢弃。
此类还有一个名为 ackDiscarded 的额外属性,用于指示适配器是否应确认丢弃的记录。
它默认值为 false。
当您使用 @KafkaListener 时,请在容器工厂上设置 RecordFilterStrategy(可选地设置 ackDiscarded),以便将监听器包装在适当的过滤适配器中。
此外,还提供一个 FilteringBatchMessageListenerAdapter,用于当您使用一批量 消息监听器 时。
当 @KafkaListener 接收到一个 List<ConsumerRecord<?, ?>> 而不是 ConsumerRecords 时,FilteringBatchMessageListenerAdapter 会被忽略,因为 ConsumerRecords 是不可变的。 |
从 2.8.4 版本开始,您可以使用在监听器注解上的 filter 属性来覆盖监听器容器工厂的默认 RecordFilterStrategy。
@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
public void listen(Thing thing) {
...
}