对于最新稳定版本,请使用Spring for Apache Kafka 4.0.4spring-doc.cadn.net.cn

连接到Kafka

从 2.5 版本开始,每个类都扩展了 KafkaResourceFactory。 这允许在运行时通过向其配置添加 Supplier<String> 来更改引导服务器: setBootstrapServersSupplier(() -> …​)。 这将对所有新连接调用以获取服务器列表。 生产者和消费者通常具有长生命周期。 要关闭现有生产者,请在 DefaultKafkaProducerFactory 上调用 reset()。 要关闭现有消费者,请在 KafkaListenerEndpointRegistry 上调用 stop()(然后 start()),并在/或在任何其他监听器容器 bean 上的 stop()start() 调用。spring-doc.cadn.net.cn

为了方便,框架还提供了一个ABSwitchCluster,它支持两组引导服务器;任何时候其中一组处于活动状态。
配置ABSwitchCluster并将它添加到生产者和消费者工厂中,以及调用setBootstrapServersSupplier()KafkaAdmin
当您想切换时,请调用primary()secondary()并调用reset()以建立新的连接(对于生产者);对于消费者,请调用stop()start()所有监听器容器。
在使用@KafkaListener时,请将stop()start()作为KafkaListenerEndpointRegistry bean。spring-doc.cadn.net.cn

查看更多Java文档信息。spring-doc.cadn.net.cn

工厂监听器

从版本 2.5 开始,DefaultKafkaProducerFactoryDefaultKafkaConsumerFactory 可以通过配置 Listener 来接收生产者或消费者创建或关闭的通知。spring-doc.cadn.net.cn

生产者工厂监听器
interface Listener<K, V> {

    default void producerAdded(String id, Producer<K, V> producer) {
    }

    default void producerRemoved(String id, Producer<K, V> producer) {
    }

}
消费者工厂监听器
interface Listener<K, V> {

    default void consumerAdded(String id, Consumer<K, V> consumer) {
    }

    default void consumerRemoved(String id, Consumer<K, V> consumer) {
    }

}

在每种情况下,id都是通过将创建后从metrics()获得的client-id属性附加到工厂beanName属性来创建的,并用.分隔。
spring-doc.cadn.net.cn

这些侦听器可以用于创建和绑定一个 Micrometer KafkaClientMetrics 实例,当新客户端被创建时(并在关闭客户端时将其关闭)。spring-doc.cadn.net.cn

框架提供了监听器来实现完全相同的功能;参见Micrometer原生指标spring-doc.cadn.net.cn