对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.6

动态创建容器

有几种技术可用于在运行时创建侦听器容器。 本节探讨了其中的一些技术。

MessageListener 实现

如果你直接实现自己的侦听器,你可以简单地使用容器工厂为该侦听器创建一个原始容器:

用户侦听器
public class MyListener implements MessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> data) {
        // ...
    }

}

private ConcurrentMessageListenerContainer<String, String> createContainer(
        ConcurrentKafkaListenerContainerFactory<String, String> factory, String topic, String group) {

    ConcurrentMessageListenerContainer<String, String> container = factory.createContainer(topic);
    container.getContainerProperties().setMessageListener(new MyListener());
    container.getContainerProperties().setGroupId(group);
    container.setBeanName(group);
    container.start();
    return container;
}

原型 Bean

带有 Comments 的方法的容器@KafkaListener可以通过将 bean 声明为 prototype 来动态创建:

原型
public class MyPojo {

    private final String id;

    private final String topic;

    public MyPojo(String id, String topic) {
        this.id = id;
        this.topic = topic;
    }

    public String getId() {
        return this.id;
    }

    public String getTopic() {
        return this.topic;
    }

    @KafkaListener(id = "#{__listener.id}", topics = "#{__listener.topic}")
    public void listen(String in) {
        System.out.println(in);
    }

}

@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
MyPojo pojo(String id, String topic) {
    return new MyPojo(id, topic);
}

applicationContext.getBean(MyPojo.class, "one", "topic2");
applicationContext.getBean(MyPojo.class, "two", "topic3");
侦听器必须具有唯一的 ID。 从版本 2.8.9 开始,KafkaListenerEndpointRegistry有新方法unregisterListenerContainer(String id)以允许您重复使用 ID。 取消注册容器不会stop()容器,您必须自己执行此作。