对于最新稳定版本,请使用Spring for Apache Kafka 4.0.4spring-doc.cadn.net.cn

暂停与恢复监听器容器

版本 2.1.3 为监听器容器新增了 pause()resume() 方法。 此前,您可以在 ConsumerAwareMessageListener 中暂停消费者,并通过监听 ListenerContainerIdleEvent 事件来恢复,这提供了对 Consumer 对象的访问。 虽然可以通过事件监听器在空闲容器中暂停消费者,但在某些情况下这并非线程安全,因为无法保证事件监听器在消费者线程中被调用。 为了安全地暂停和恢复消费者,应使用监听器容器上的 pauseresume 方法。 一个 pause() 在下一次 poll() 之前生效;一个 resume() 在当前 poll() 返回后生效。 当容器被暂停时,它会继续 poll() 消费者,避免在使用组管理时发生重新平衡,但不会检索任何记录。 有关更多信息,请参阅 Kafka 文档。spring-doc.cadn.net.cn

从版本 2.1.5 开始,您可以调用 isPauseRequested() 来检查 pause() 是否已经调用。 然而,消费者可能尚未实际暂停。 isConsumerPaused() 返回 true 当所有 Consumer 实例实际上已暂停。spring-doc.cadn.net.cn

此外(自 2.1.5 起也包括),ConsumerPausedEventConsumerResumedEvent 实例会以 source 属性作为容器发布,而涉及在 partitions 属性中的 TopicPartition 实例也会被发布。spring-doc.cadn.net.cn

Starting with version 2.9, a new container property pauseImmediate, when set to true, causes the pause to take effect after the current record is processed. By default, the pause takes effect when all of the records from the previous poll have been processed. See [pauseImmediate].spring-doc.cadn.net.cn

以下这个简单的Spring Boot应用程序演示了如何使用容器注册表获取对某个方法的容器的引用,并暂停或恢复其消费者,以及接收相应的事件:spring-doc.cadn.net.cn

@SpringBootApplication
public class Application implements ApplicationListener<KafkaEvent> {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args).close();
    }

    @Override
    public void onApplicationEvent(KafkaEvent event) {
        System.out.println(event);
    }

    @Bean
    public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
            KafkaTemplate<String, String> template) {
        return args -> {
            template.send("pause.resume.topic", "thing1");
            Thread.sleep(10_000);
            System.out.println("pausing");
            registry.getListenerContainer("pause.resume").pause();
            Thread.sleep(10_000);
            template.send("pause.resume.topic", "thing2");
            Thread.sleep(10_000);
            System.out.println("resuming");
            registry.getListenerContainer("pause.resume").resume();
            Thread.sleep(10_000);
        };
    }

    @KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("pause.resume.topic")
            .partitions(2)
            .replicas(1)
            .build();
    }

}

以下代码列表显示了前述示例的结果:spring-doc.cadn.net.cn

partitions assigned: [pause.resume.topic-1, pause.resume.topic-0]
thing1
pausing
ConsumerPausedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
resuming
ConsumerResumedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
thing2