|
对于最新稳定版本,请使用 Spring for Apache Kafka 4.0.0! |
重新平衡听众
容器属性有一个性质,称为consumerRebalanceListener,该实现取了 Kafka 客户端的消费者RebalanceListener接口。
如果没有提供该属性,容器会配置一个日志监听器,记录在信息水平。
该框架还增加了一个子接口消费者意识RebalanceListener.
以下列表显示了消费者意识RebalanceListener界面定义:
public interface ConsumerAwareRebalanceListener extends ConsumerRebalanceListener {
void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
void onPartitionsLost(Consumer<?, ?> consumer, Collection<TopicPartition> partitions);
}
注意,当分区被撤销时,有两次回调。 第一个人马上被叫来。 第二个是在提交任何待处理的抵消后调用。 如果你想在某个外部仓库中保持偏移量,这非常有用,如下示例所示:
containerProperties.setConsumerRebalanceListener(new ConsumerAwareRebalanceListener() {
@Override
public void onPartitionsRevokedBeforeCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// acknowledge any pending Acknowledgments (if using manual acks)
}
@Override
public void onPartitionsRevokedAfterCommit(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
// ...
store(consumer.position(partition));
// ...
}
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
// ...
consumer.seek(partition, offsetTracker.getOffset() + 1);
// ...
}
});
从2.4版本开始,采用了一种新方法onPartitionsLost()已被添加(类似于同名的方法,在ConsumerRebalanceLister).
默认实现于ConsumerRebalanceLister简单呼叫在被取消的分区上.
默认实现于消费者意识RebalanceListener什么都没做。
在为监听器容器提供自定义监听器(任一类型)时,重要的是你的实现不要调用在被取消的分区上从onPartitionsLost(分区丢失).
如果你实现消费者RebalanceListener你应该覆盖默认方法。
这是因为监听器容器会调用自己的在被取消的分区上从其实现onPartitionsLost(分区丢失)在调用实现中的方法后,
如果你的实现委托给默认行为,在被取消的分区上每次消费者调用容器监听器上的该方法。 |