过滤消息

在某些情况下,如重新平衡,已处理的消息可能会被重新投递。 框架无法知道此类消息是否已被处理。 这是一个应用层面的功能。 这被称为幂零接收机模式,Spring Integration 提供了该模式的实现spring-doc.cadn.net.cn

Spring for Apache Kafka 项目也通过以下方式提供一些支持FilteringMessageListenerAdapter类,可以包裹你的消息监听器. 该类采用RecordFilterStrategy其中你实现了Filter一种表示消息为重复且应丢弃的方法。 这还有一个额外的性质,称为ackDiscarded,这表示适配器是否应确认丢弃的记录。 是的false默认。spring-doc.cadn.net.cn

当你使用@KafkaListener,设RecordFilterStrategy(而且可选ackDiscarded)在容器工厂中,使监听器被相应的过滤适配器包裹。spring-doc.cadn.net.cn

此外,还有一个FilteringBatchMessageListenerAdapter是为使用批量消息监听器提供。spring-doc.cadn.net.cn

FilteringBatchMessageListenerAdapter如果你的话,会被忽略@KafkaListener获得消费者记录<?, ?>而不是List<ConsumerRecord<?, ?>>因为消费者唱片是不可改变的。

从2.8.4版本开始,你可以覆盖监听器容器工厂的默认设置RecordFilterStrategy通过使用Filter监听者注释上的属性。spring-doc.cadn.net.cn

@KafkaListener(id = "filtered", topics = "topic", filter = "differentFilter")
public void listen(Thing thing) {
    ...
}

从3.3版本开始,忽略因过滤而产生的空批次RecordFilterStrategy是有支持的。 实现时RecordFilterStrategy,可以通过以下方式配置ignoreEmptyBatch(). 默认设置是false指示卡夫卡听众即使所有消费者记录被过滤掉了。spring-doc.cadn.net.cn

如果true被返回,卡夫卡听众 所有消费者记录被过滤掉了。 然而,承诺经纪人仍然会执行。spring-doc.cadn.net.cn

如果false被返回,卡夫卡听众 当所有权力被调用消费者记录被过滤掉了。spring-doc.cadn.net.cn

这里有一些例子。spring-doc.cadn.net.cn

public class IgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
    ...
    @Override
    public List<ConsumerRecord<String, String>> filterBatch(
            List<ConsumerRecord<String, String>> consumerRecords) {
        return List.of();
    }

    @Override
    public boolean ignoreEmptyBatch() {
        return true;
    }
}

// NOTE: ignoreEmptyBatchRecordFilterStrategy is bean name of IgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "ignoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
    ...
}

在这种情况下,IgnoreEmptyBatchRecordFilterStrategy总是返回空列表和返回true作为ignoreEmptyBatch(). 因此KafkaListener#listen(...)永远不会被调用。spring-doc.cadn.net.cn

public class NotIgnoreEmptyBatchRecordFilterStrategy implements RecordFilterStrategy {
    ...
    @Override
    public List<ConsumerRecord<String, String>> filterBatch(
            List<ConsumerRecord<String, String>> consumerRecords) {
        return List.of();
    }

    @Override
    public boolean ignoreEmptyBatch() {
        return false;
    }
}

// NOTE: notIgnoreEmptyBatchRecordFilterStrategy is bean name of NotIgnoreEmptyBatchRecordFilterStrategy instance.
@KafkaListener(id = "filtered", topics = "topic", filter = "notIgnoreEmptyBatchRecordFilterStrategy")
public void listen(List<Thing> things) {
    ...
}

然而,在这种情况下,IgnoreEmptyBatchRecordFilterStrategy总是返回空列表和返回false作为ignoreEmptyBatch(). 因此KafkaListener#listen(...)永远都会被调用。spring-doc.cadn.net.cn