应用事件

以下春季应用事件由听众容器及其消费者发布:spring-doc.cadn.net.cn

  • ConsumerStartingEvent:在消费者线程首次启动时发布,且在开始轮询之前。spring-doc.cadn.net.cn

  • ConsumerStartedEvent:当消费者准备开始投票时发布。spring-doc.cadn.net.cn

  • ConsumerFailedToStartEvent:如果没有ConsumerStartingEventconsumerStartTimeout容器属性。 此事件可能表明配置的任务执行器线程不足以支持其所使用的容器及其并发性。 当出现这种情况时,也会记录错误信息。spring-doc.cadn.net.cn

  • ListenerContainerIdleEvent:在未收到任何消息时发布idleEventInterval(如果已配置)。spring-doc.cadn.net.cn

  • ListenerContainerNoLongerIdleEvent:当记录在之前发布ListenerContainerIdleEvent.spring-doc.cadn.net.cn

  • ListenerContainerPartitionIdleEvent:当该分区没有收到任何消息时发布idlePartitionEventInterval(如果已配置)。spring-doc.cadn.net.cn

  • ListenerContainerPartitionNoLongerIdleEvent:当记录被从之前发布过的分区中消耗时发布ListenerContainerPartitionIdleEvent.spring-doc.cadn.net.cn

  • NonResponsiveConsumerEvent:当消费者似乎被阻断时发布民意调查方法。spring-doc.cadn.net.cn

  • ConsumerPartitionPausedEvent:每个消费者在分区暂停时发布。spring-doc.cadn.net.cn

  • ConsumerPartitionResumedEvent:每个消费者在分区恢复时发布。spring-doc.cadn.net.cn

  • ConsumerPausedEvent:由每个消费者在容器暂停时发布。spring-doc.cadn.net.cn

  • ConsumerResumedEvent:在容器恢复时由每个消费者发布。spring-doc.cadn.net.cn

  • ConsumerStoppingEvent:由每位用户在停车前发布。spring-doc.cadn.net.cn

  • ConsumerStoppedEvent:在消费者关闭后发布。 参见螺纹安全spring-doc.cadn.net.cn

  • ConsumerRetryAuthEvent:当消费者的身份验证或授权失败且正在重试时发布。spring-doc.cadn.net.cn

  • ConsumerRetryAuthSuccessfulEvent:当认证或授权成功重试时发布。只有在发生ConsumerRetryAuthEvent以前。spring-doc.cadn.net.cn

  • ContainerStoppedEvent:在所有消费者停止后发布。spring-doc.cadn.net.cn

  • ConcurrentContainerStoppedEvent:发布时ConcurrentMessageListenerContainer已经停止了。spring-doc.cadn.net.cn

默认情况下,应用上下文的事件多播器会调用调用线程上的事件监听器。 如果你把多播器改成使用异步执行器,就不能调用任何消费者当事件包含对消费者的引用时。

ListenerContainerIdleEvent具有以下性质:spring-doc.cadn.net.cn

ListenerContainerNoLongerIdleEvent具有相同的性质,但idleTime暂停.spring-doc.cadn.net.cn

ListenerContainerPartitionIdleEvent具有以下性质:spring-doc.cadn.net.cn

ListenerContainerPartitionNoLongerIdleEvent具有相同的性质,但idleTime暂停.spring-doc.cadn.net.cn

NonResponsiveConsumerEvent具有以下性质:spring-doc.cadn.net.cn

ConsumerPausedEvent,ConsumerResumedEvent消费者停止事件具有以下性质:spring-doc.cadn.net.cn

ConsumerPartitionPausedEvent,ConsumerPartitionResumedEvent事件具有以下性质:spring-doc.cadn.net.cn

ConsumerRetryAuthEvent事件具有以下性质:spring-doc.cadn.net.cn

ConsumerStartingEvent,ConsumerStartedEvent,ConsumerFailedToStartEvent,ConsumerStoppedEvent,ConsumerRetryAuthSuccessfulEventContainerStoppedEvent事件具有以下性质:spring-doc.cadn.net.cn

所有容器(无论是子容器还是父容器)都发布ContainerStoppedEvent. 对于父容器,源和容器属性是相同的。spring-doc.cadn.net.cn

此外,ConsumerStoppedEvent具有以下附加性质:spring-doc.cadn.net.cn

在这种情况下,你可以用这个事件重启容器:spring-doc.cadn.net.cn

if (event.getReason().equals(Reason.FENCED)) {
    event.getSource(MessageListenerContainer.class).start();
}

检测闲置和无响应的用户

虽然高效,异步消费者的一个问题是检测其空闲时间。 如果一段时间内没有收到消息,您可能需要采取一些措施。spring-doc.cadn.net.cn

你可以配置监听器容器来发布ListenerContainerIdleEvent当一段时间过去消息未送达时。 当容器处于空闲状态时,每个事件都会发布idleEventInterval毫秒。spring-doc.cadn.net.cn

要配置此功能,请设置idleEventInterval在集装箱上。 以下示例展示了如何实现:spring-doc.cadn.net.cn

@Bean
public KafkaMessageListenerContainer(ConsumerFactory<String, String> consumerFactory) {
    ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
    ...
    containerProps.setIdleEventInterval(60000L);
    ...
    KafkaMessageListenerContainer<String, String> container = new KafKaMessageListenerContainer<>(consumerFactory, containerProps);
    return container;
}

以下示例展示了如何设置idleEventInterval对于@KafkaListener:spring-doc.cadn.net.cn

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
    ...
    factory.getContainerProperties().setIdleEventInterval(60000L);
    ...
    return factory;
}

在这些情况下,容器闲置时每分钟发布一次事件。spring-doc.cadn.net.cn

如果,出于某种原因,消费者poll()方法不会退出,不会收到消息,也无法生成空闲事件(这是早期版本卡夫卡客户端当中介无法联系上时)。 在这种情况下,容器发布NonResponsiveConsumerEvent如果投票未返回3次投票时间财产。 默认情况下,每个容器每30秒进行一次检查。 你可以通过设置monitorInterval(默认30秒)和noPollThreshold(默认3.0)属性容器属性在配置监听器容器时, 这noPollThreshold应大于1.0以避免因竞赛条件而出现虚假事件。 收到这样的事件可以让你停止容器,从而唤醒消费者,从而让它停止。spring-doc.cadn.net.cn

从版本 2.6.2 开始,如果容器发布了ListenerContainerIdleEvent,它将发布一个ListenerContainerNoLongerIdleEvent当记录随后被接收时。spring-doc.cadn.net.cn

事件消耗

你可以通过实施这些事件来捕捉ApplicationListener——无论是普通听众,还是被限定为只接收这一特定事件的人。 你也可以使用@EventListener, 在 Spring Framework 4.2 中引入。spring-doc.cadn.net.cn

下一个例子是组合@KafkaListener@EventListener合并为一个职业。 你应该明白,应用程序监听器会接收所有容器的事件,所以如果你想根据哪个容器处于空闲状态采取具体作,可能需要检查监听器ID。 你也可以使用@EventListener条件为此目的。spring-doc.cadn.net.cn

有关事件属性的信息,请参见应用事件。spring-doc.cadn.net.cn

该事件通常发布在消费者线程上,因此与消费者对象。spring-doc.cadn.net.cn

以下示例同时使用了这两种情况@KafkaListener@EventListener:spring-doc.cadn.net.cn

public class Listener {

    @KafkaListener(id = "qux", topics = "annotated")
    public void listen4(@Payload String foo, Acknowledgment ack) {
        ...
    }

    @EventListener(condition = "event.listenerId.startsWith('qux-')")
    public void eventHandler(ListenerContainerIdleEvent event) {
        ...
    }

}
活动听众会看到所有容器的事件。 因此,在前面的例子中,我们根据听众ID缩小接收到的事件范围。 由于容器为@KafkaListener支持并发,实际的容器被命名ID-n其中n是每个实例的唯一值,以支持并发。 这就是为什么我们使用开始在那个状态下。
如果你想用空闲事件停止监听器容器,就不要呼叫container.stop()在呼叫听者的线程中。 这样做会导致延迟和不必要的日志消息。 相反,你应该把事件交给另一个线程,这样可以停止容器。 另外,你也不应该这样做停止()如果容器实例是子容器,则表示该实例。 你应该停止并发容器。

闲置时的当前职位

注意,当检测到空闲时,你可以通过实现实现来获得当前位置消费者寻求在你的听众里。 看onIdleContainer()寻找中。spring-doc.cadn.net.cn