该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 Spring for Apache Kafka 4.0.0spring-doc.cadn.net.cn

暂停与恢复监听器容器

新增版本 2.1.3暂停(pause)简历()监听器容器的方法。 以前,你可以暂停用户在消费者意识信息听众然后通过聆听ListenerContainerIdleEvent,提供访问消费者对象。 虽然你可以通过使用事件监听器暂停闲置容器中的消费者,但在某些情况下,这并不安全,因为无法保证事件监听器会被调用到消费线程。 为了安全地暂停并恢复消费者,你应该使用暂停恢复监听器容器上的方法。 一个暂停(pause)生效时间紧接在下一个poll();一个简历()在电流之后生效poll()返回。 当容器暂停时,它会继续poll()消费者,如果使用组管理,则避免重新平衡,但不会检索任何记录。 更多信息请参见卡夫卡文档。spring-doc.cadn.net.cn

从2.1.5版本开始,你可以调用isPauseRequested()看看暂停(pause)已经被叫到了。 然而,消费者可能还没有真正暂停。isConsumerPaused()如果所有消费者实际上,实例已经暂停了。spring-doc.cadn.net.cn

此外(自2.1.5版本起),ConsumerPausedEventConsumerResumedEvent实例以容器的形式发布财产和主题分区涉及的实例分区财产。spring-doc.cadn.net.cn

从版本 2.9 开始,新增了一个容器属性暂停立刻当设置为 true时,暂停在当前记录处理后生效。 默认情况下,暂停生效时是因为之前投票的所有记录都已处理完毕。 参见pauseImmediate。spring-doc.cadn.net.cn

以下简单的 Spring Boot 应用演示了如何使用容器注册表获取对@KafkaListener方法的容器,暂停或恢复其使用者,同时接收相应的事件:spring-doc.cadn.net.cn

@SpringBootApplication
public class Application implements ApplicationListener<KafkaEvent> {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args).close();
    }

    @Override
    public void onApplicationEvent(KafkaEvent event) {
        System.out.println(event);
    }

    @Bean
    public ApplicationRunner runner(KafkaListenerEndpointRegistry registry,
            KafkaTemplate<String, String> template) {
        return args -> {
            template.send("pause.resume.topic", "thing1");
            Thread.sleep(10_000);
            System.out.println("pausing");
            registry.getListenerContainer("pause.resume").pause();
            Thread.sleep(10_000);
            template.send("pause.resume.topic", "thing2");
            Thread.sleep(10_000);
            System.out.println("resuming");
            registry.getListenerContainer("pause.resume").resume();
            Thread.sleep(10_000);
        };
    }

    @KafkaListener(id = "pause.resume", topics = "pause.resume.topic")
    public void listen(String in) {
        System.out.println(in);
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("pause.resume.topic")
            .partitions(2)
            .replicas(1)
            .build();
    }

}

以下列表展示了上述示例的结果:spring-doc.cadn.net.cn

partitions assigned: [pause.resume.topic-1, pause.resume.topic-0]
thing1
pausing
ConsumerPausedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
resuming
ConsumerResumedEvent [partitions=[pause.resume.topic-1, pause.resume.topic-0]]
thing2