|
对于最新稳定版本,请使用 Spring for Apache Kafka 4.0.0! |
开始@KafkaListeners 按顺序排列
一个常见的用例是在另一个监听者用完主题中的所有记录后启动一个监听器。
例如,你可能想先加载一个或多个压缩主题的内容到内存,再处理其他主题的记录。
从2.7.3版本开始,新增了一个组件ContainerGroupSequencer已经被引入。
它使用@KafkaListener的containerGroup(容器集团)属性 将容器分组在一起,并在当前组中的所有容器都空闲后,将容器分组到下一组。
用一个例子来说明。
@KafkaListener(id = "listen1", topics = "topic1", containerGroup = "g1", concurrency = "2")
public void listen1(String in) {
}
@KafkaListener(id = "listen2", topics = "topic2", containerGroup = "g1", concurrency = "2")
public void listen2(String in) {
}
@KafkaListener(id = "listen3", topics = "topic3", containerGroup = "g2", concurrency = "2")
public void listen3(String in) {
}
@KafkaListener(id = "listen4", topics = "topic4", containerGroup = "g2", concurrency = "2")
public void listen4(String in) {
}
@Bean
ContainerGroupSequencer sequencer(KafkaListenerEndpointRegistry registry) {
return new ContainerGroupSequencer(registry, 5000, "g1", "g2");
}
这里,我们有4名听众,分为两组,G1和G2.
在应用上下文初始化过程中,序列器设置自动启动所有容器在所提供组中的属性为false.
它还设定了idleEventInterval对于任何容器(尚未设置的)都必须以提供的值(此处为5000毫秒)。
然后,当应用上下文启动序列器时,第一组的容器也被启动。
如ListenerContainerIdleEvent收到 s,每个容器中的子容器都会被停止。
当所有子容器都处于ConcurrentMessageListenerContainer停止了,父容器也被停止了。
当一组中的所有容器都停止后,下一组的容器开始运行。
组内的组数或容器数量没有限制。
默认情况下,最后一组的容器(G2上述)在空闲时不会停止。
要修改该行为,设置stopLastGroupWhenIdle自true在音序器上。
顺便说一句,之前每组的容器会被添加到一颗类型的豆子里Collection<MessageListenerContainer>豆子的名字是containerGroup(容器集团).
这些收藏现已被弃用,取而代之的是字型豆集装箱集团以豆名(称为乐队名)为乐队名称,后缀为。群;在上面的例子中,会有2颗豆子G1.集团和g2.group.
这收集豆子将在未来的版本中被移除。