此版本仍在开发中,尚未被视为稳定版本。对于最新的稳定版本,请使用 Spring for Apache Kafka 3.3.6spring-doc.cadn.net.cn

重新平衡侦听器

ContainerProperties具有一个名为consumerRebalanceListener,它采用 Kafka 客户端的ConsumerRebalanceListener接口。 如果未提供此属性,则容器会配置一个日志记录侦听器,该侦听器在INFO水平。 框架还添加了一个子接口ConsumerAwareRebalanceListener. 下面的清单显示了ConsumerAwareRebalanceListener接口定义:spring-doc.cadn.net.cn

public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {

    void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

    void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);

}

请注意,当 partitions 被撤销时,有两个回调。 第一个会立即调用。 第二个 is 在提交任何待处理的偏移量后调用。 如果您希望在某些外部存储库中维护偏移量,这非常有用,如下例所示:spring-doc.cadn.net.cn

containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {

    @Override
    public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // acknowledge any pending Acknowledgments (if using manual acks)
    }

    @Override
    public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
        // ...
        store(consumer.position(partition));
        // ...
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // ...
        consumer.seek(partition, offsetTracker.getOffset() + 1);
        // ...
    }
});
从版本 2.4 开始,新方法onPartitionsLost()已添加(类似于ConsumerRebalanceLister). 默认实现ConsumerRebalanceLister只需调用onPartitionsRevoked. 默认实现ConsumerAwareRebalanceListener什么都不做。 当为侦听器容器提供自定义侦听器(任一类型)时,您的实现必须不要调用onPartitionsRevokedonPartitionsLost. 如果您实现ConsumerRebalanceListener您应该覆盖 default 方法。 这是因为侦听器容器将调用自己的onPartitionsRevoked从其onPartitionsLost在对实现调用 方法后。 如果 implementation delegate to default 行为,onPartitionsRevoked将调用两次,每次Consumer在容器的侦听器上调用该方法。

Kafka 4.0 消费者再平衡协议

Spring for Apache Kafka 4.0 支持 Apache Kafka 4.0 的新消费者再平衡协议 (KIP-848),该协议通过服务器驱动的增量分区分配来提高性能。 这减少了使用者组的再平衡停机时间。spring-doc.cadn.net.cn

要启用新协议,请配置group.protocol财产:spring-doc.cadn.net.cn

spring.kafka.consumer.properties.group.protocol=consumer

请记住,上述属性是 Spring Boot 属性。 如果您不使用 Spring Boot,则可能需要手动设置它,如下所示。spring-doc.cadn.net.cn

或者,以编程方式设置它:spring-doc.cadn.net.cn

Map<String, Object> props = new HashMap<>();
props.put("group.protocol", "consumer");
ConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(props);

新协议与ConsumerAwareRebalanceListener. 由于增量再平衡,onPartitionsAssigned可以使用较小的分区集多次调用,这与传统协议的典型单个回调不同。spring-doc.cadn.net.cn

新协议使用服务器端分区分配,忽略通过spring.kafka.consumer.partition-assignment-strategy. 如果检测到自定义分配器,则会记录警告。 要使用自定义分配器,请将group.protocol=classic(如果未为group.protocol).spring-doc.cadn.net.cn