|
该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 Spring for Apache Kafka 4.0.0! |
应用事件
以下春季应用事件由听众容器及其消费者发布:
-
ConsumerStartingEvent:在消费者线程首次启动时发布,且在开始轮询之前。 -
ConsumerStartedEvent:当消费者准备开始投票时发布。 -
ConsumerFailedToStartEvent:如果没有ConsumerStartingEvent在consumerStartTimeout容器属性。 此事件可能表明配置的任务执行器线程不足以支持其所使用的容器及其并发性。 当出现这种情况时,也会记录错误信息。 -
ListenerContainerIdleEvent:在未收到任何消息时发布idleEventInterval(如果已配置)。 -
ListenerContainerNoLongerIdleEvent:当记录在之前发布ListenerContainerIdleEvent. -
ListenerContainerPartitionIdleEvent:当该分区没有收到任何消息时发布idlePartitionEventInterval(如果已配置)。 -
ListenerContainerPartitionNoLongerIdleEvent:当记录被从之前发布过的分区中消耗时发布ListenerContainerPartitionIdleEvent. -
NonResponsiveConsumerEvent:当消费者似乎被阻断时发布民意调查方法。 -
ConsumerPartitionPausedEvent:每个消费者在分区暂停时发布。 -
ConsumerPartitionResumedEvent:每个消费者在分区恢复时发布。 -
ConsumerPausedEvent:由每个消费者在容器暂停时发布。 -
ConsumerResumedEvent:在容器恢复时由每个消费者发布。 -
ConsumerStoppingEvent:由每位用户在停车前发布。 -
ConsumerStoppedEvent:在消费者关闭后发布。 参见螺纹安全。 -
ConsumerRetryAuthEvent:当消费者的身份验证或授权失败且正在重试时发布。 -
ConsumerRetryAuthSuccessfulEvent:当认证或授权成功重试时发布。只有在发生ConsumerRetryAuthEvent以前。 -
ContainerStoppedEvent:在所有消费者停止后发布。 -
ConcurrentContainerStoppedEvent:发布时ConcurrentMessageListenerContainer已经停止了。
默认情况下,应用上下文的事件多播器会调用调用线程上的事件监听器。
如果你把多播器改成使用异步执行器,就不能调用任何消费者当事件包含对消费者的引用时。 |
这ListenerContainerIdleEvent具有以下性质:
-
源:发布该事件的监听器容器实例。 -
容器:监听器容器或父监听器容器,如果源容器是子节点。 -
身份证:监听器ID(或容器豆名)。 -
idleTime:集装箱闲置的时间,事件发布时。 -
主题分区:事件生成时容器分配的主题和分区。 -
消费者:对卡夫卡的引用消费者对象。 例如,如果消费者的暂停(pause)方法之前被调用过,它可以简历()当事件被接收时。 -
暂停: 容器是否当前暂停。 更多信息请参见暂停和继续听众容器。
这ListenerContainerNoLongerIdleEvent具有相同的性质,但idleTime和暂停.
这ListenerContainerPartitionIdleEvent具有以下性质:
-
源:发布该事件的监听器容器实例。 -
容器:监听器容器或父监听器容器,如果源容器是子节点。 -
身份证:监听器ID(或容器豆名)。 -
idleTime:事件发布时时间划分消耗处于空闲状态。 -
topicPartition:触发事件的主题和分区。 -
消费者:对卡夫卡的引用消费者对象。 例如,如果消费者的暂停(pause)方法之前被调用过,它可以简历()当事件被接收时。 -
暂停: 该分区消费是否当前暂停。 更多信息请参见暂停和继续听众容器。
这ListenerContainerPartitionNoLongerIdleEvent具有相同的性质,但idleTime和暂停.
这NonResponsiveConsumerEvent具有以下性质:
-
源:发布该事件的监听器容器实例。 -
容器:监听器容器或父监听器容器,如果源容器是子节点。 -
身份证:监听器ID(或容器豆名)。 -
时间自上次投票以来:集装箱最后一次打电话前的时间poll(). -
主题分区:事件生成时容器分配的主题和分区。 -
消费者:对卡夫卡的引用消费者对象。 例如,如果消费者的暂停(pause)方法之前被调用过,它可以简历()当事件被接收时。 -
暂停: 容器是否当前暂停。 更多信息请参见暂停和继续听众容器。
这ConsumerPausedEvent,ConsumerResumedEvent和消费者停止事件具有以下性质:
-
源:发布该事件的监听器容器实例。 -
容器:监听器容器或父监听器容器,如果源容器是子节点。 -
分区:这主题分区涉及的实例。
这ConsumerPartitionPausedEvent,ConsumerPartitionResumedEvent事件具有以下性质:
-
源:发布该事件的监听器容器实例。 -
容器:监听器容器或父监听器容器,如果源容器是子节点。 -
分区:这主题分区涉及实例。
这ConsumerRetryAuthEvent事件具有以下性质:
-
源:发布该事件的监听器容器实例。 -
容器:监听器容器或父监听器容器,如果源容器是子节点。 -
原因:-
认证- 事件发布是因为认证异常。 -
授权- 事件发布是因为授权例外。
-
这ConsumerStartingEvent,ConsumerStartedEvent,ConsumerFailedToStartEvent,ConsumerStoppedEvent,ConsumerRetryAuthSuccessfulEvent和ContainerStoppedEvent事件具有以下性质:
-
源:发布该事件的监听器容器实例。 -
容器:监听器容器或父监听器容器,如果源容器是子节点。
所有容器(无论是子容器还是父容器)都发布ContainerStoppedEvent.
对于父容器,源和容器属性是相同的。
此外,ConsumerStoppedEvent具有以下附加性质:
-
原因:-
正常- 消费者正常停车(容器停车)。 -
错误-一个java.lang.错误被扔了。 -
围栏- 交易生产者被围栏化,且停止容器围栏容器性质为true. -
认证-一认证异常或授权例外被扔出去,然后authExceptionRetryInterval未配置。 -
NO_OFFSET- 分区没有偏移量,且自动.offset.reset政策是没有.
-
在这种情况下,你可以用这个事件重启容器:
if (event.getReason().equals(Reason.FENCED)) {
event.getSource(MessageListenerContainer.class).start();
}
检测闲置和无响应的用户
虽然高效,异步消费者的一个问题是检测其空闲时间。 如果一段时间内没有收到消息,您可能需要采取一些措施。
你可以配置监听器容器来发布ListenerContainerIdleEvent当一段时间过去消息未送达时。
当容器处于空闲状态时,每个事件都会发布idleEventInterval毫秒。
要配置此功能,请设置idleEventInterval在集装箱上。
以下示例展示了如何实现:
@Bean
public KafkaMessageListenerContainer(ConsumerFactory<String, String> consumerFactory) {
ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
...
containerProps.setIdleEventInterval(60000L);
...
KafkaMessageListenerContainer<String, String> container = new KafKaMessageListenerContainer<>(consumerFactory, containerProps);
return container;
}
以下示例展示了如何设置idleEventInterval对于@KafkaListener:
@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.getContainerProperties().setIdleEventInterval(60000L);
...
return factory;
}
在这些情况下,容器闲置时每分钟发布一次事件。
如果,出于某种原因,消费者poll()方法不会退出,不会收到消息,也无法生成空闲事件(这是早期版本卡夫卡客户端当中介无法联系上时)。
在这种情况下,容器发布NonResponsiveConsumerEvent如果投票未返回3次这投票时间财产。
默认情况下,每个容器每30秒进行一次检查。
你可以通过设置monitorInterval(默认30秒)和noPollThreshold(默认3.0)属性容器属性在配置监听器容器时,
这noPollThreshold应大于1.0以避免因竞赛条件而出现虚假事件。
收到这样的事件可以让你停止容器,从而唤醒消费者,从而让它停止。
从版本 2.6.2 开始,如果容器发布了ListenerContainerIdleEvent,它将发布一个ListenerContainerNoLongerIdleEvent当记录随后被接收时。
事件消耗
你可以通过实施这些事件来捕捉ApplicationListener——无论是普通听众,还是被限定为只接收这一特定事件的人。
你也可以使用@EventListener, 在 Spring Framework 4.2 中引入。
下一个例子是组合@KafkaListener和@EventListener合并为一个职业。
你应该明白,应用程序监听器会接收所有容器的事件,所以如果你想根据哪个容器处于空闲状态采取具体作,可能需要检查监听器ID。
你也可以使用@EventListener的条件为此目的。
有关事件属性的信息,请参见应用事件。
该事件通常发布在消费者线程上,因此与消费者对象。
以下示例同时使用了这两种情况@KafkaListener和@EventListener:
public class Listener {
@KafkaListener(id = "qux", topics = "annotated")
public void listen4(@Payload String foo, Acknowledgment ack) {
...
}
@EventListener(condition = "event.listenerId.startsWith('qux-')")
public void eventHandler(ListenerContainerIdleEvent event) {
...
}
}
活动听众会看到所有容器的事件。
因此,在前面的例子中,我们根据听众ID缩小接收到的事件范围。
由于容器为@KafkaListener支持并发,实际的容器被命名ID-n其中n是每个实例的唯一值,以支持并发。
这就是为什么我们使用开始在那个状态下。 |
如果你想用空闲事件停止监听器容器,就不要呼叫container.stop()在呼叫听者的线程中。
这样做会导致延迟和不必要的日志消息。
相反,你应该把事件交给另一个线程,这样可以停止容器。
另外,你也不应该这样做停止()如果容器实例是子容器,则表示该实例。
你应该停止并发容器。 |
闲置时的当前职位
注意,当检测到空闲时,你可以通过实现实现来获得当前位置消费者寻求在你的听众里。
看onIdleContainer()在寻找中。