对于最新稳定版本,请使用 Spring for Apache Kafka 4.0.0spring-doc.cadn.net.cn

@KafkaListener生命周期管理

为 创建的监听器容器@KafkaListener在应用语境中,注释不是豆子。 相反,它们会注册在某种基础设施机构KafkaListenerEndpointRegistry. 该 bean 由框架自动声明,并管理容器的生命周期;它会自动启动所有自动启动设置为true. 所有集装箱工厂制造的集装箱必须在同一集装箱中阶段. 更多信息请参见“监听器容器自动启动”。 你可以用注册表程序管理生命周期。 启动或停止注册表会启动或停止所有注册容器。 或者,你可以通过使用其身份证属性。 你可以设置自动启动在注释上,它覆盖了容器工厂中配置的默认设置。 你可以从应用上下文中获取豆子的引用,比如自动接线,以管理其注册容器。 以下示例展示了如何实现:spring-doc.cadn.net.cn

@KafkaListener(id = "myContainer", topics = "myTopic", autoStartup = "false")
public void listen(...) { ... }
@Autowired
private KafkaListenerEndpointRegistry registry;

...

    this.registry.getListenerContainer("myContainer").start();

...

注册表只维护其所管理容器的生命周期;被声明为 Beans 的容器不由注册表管理,可以从应用上下文中获取。 可以通过调用注册表获得一组受管理容器getListenerContainers()方法。 2.2.5 版本增加了一种便捷方法getAllListenerContainers()返回所有容器的集合,包括注册管理的容器和被声明为豆子的容器。 返回的集合会包含已初始化的原型豆,但不会初始化任何懒惰豆声明。spring-doc.cadn.net.cn

在应用上下文刷新后注册的端点将立即启动,无论它们的终端如何自动启动财产,以符合SmartLifecycle合同,其中自动启动仅在应用上下文初始化时考虑。 迟注册的一个例子是具有@KafkaListener在原型作用域中,实例是在上下文初始化后创建的。 从2.8.7版本开始,你可以设置注册表alwaysStartAfterRefresh属性到false然后容器自动启动属性将决定容器是否被启动。

Retrieving MessageListenerContainers from KafkaListenerEndpointRegistry

KafkaListenerEndpointRegistry提供了检索的方法MessageListenerContainer为适应多种管理场景而设的实例:spring-doc.cadn.net.cn

所有容器:对于覆盖所有监听器容器的作,使用getListenerContainers()以获取一个全面的收藏。spring-doc.cadn.net.cn

Collection<MessageListenerContainer> allContainers = registry.getListenerContainers();

按ID指定容器:要管理单个容器,getListenerContainer(String id)通过其ID实现检索。spring-doc.cadn.net.cn

MessageListenerContainer specificContainer = registry.getListenerContainer("myContainerId");

动态容器过滤:在3.2版本引入,涉及两个超载getListenerContainersMatching方法使容器的精细选择成为可能。 一种方法是谓词<字符串>对于基于ID的过滤作为参数,而另一个则取BiPredicate<String, MessageListenerContainer>对于更高级的条件,可能包括容器属性或状态作为参数。spring-doc.cadn.net.cn

// Prefix matching (Predicate<String>)
Collection<MessageListenerContainer> filteredContainers =
    registry.getListenerContainersMatching(id -> id.startsWith("productListener-retry-"));

// Regex matching (Predicate<String>)
Collection<MessageListenerContainer> regexFilteredContainers =
    registry.getListenerContainersMatching(myPattern::matches);

// Pre-built Set of IDs (Predicate<String>)
Collection<MessageListenerContainer> setFilteredContainers =
    registry.getListenerContainersMatching(myIdSet::contains);

// Advanced Filtering: ID prefix and running state (BiPredicate<String, MessageListenerContainer>)
Collection<MessageListenerContainer> advancedFilteredContainers =
    registry.getListenerContainersMatching(
        (id, container) -> id.startsWith("specificPrefix-") && container.isRunning()
    );

利用这些方法高效管理和查询MessageListenerContainer你申请中的实例。spring-doc.cadn.net.cn