|
对于最新稳定版本,请使用Spring for Apache Kafka 4.0.4! |
寻找特定偏移
为了搜索,您的监听器必须实现 ConsumerSeekAware,其包含以下方法:
void registerSeekCallback(ConsumerSeekCallback callback);
void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
void onPartitionsRevoked(Collection<TopicPartition> partitions)
void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);
The registerSeekCallback 会在容器启动时以及分区分配时被调用。
你应该在初始化后任意时间点使用此回调时调用它。
你应该保存对该回调的引用。
如果你在同一 ConcurrentMessageListenerContainer 中使用相同的监听器,你应该将回调保存在一个 ThreadLocal 或其他以监听器 Thread 为键的结构中。
When 使用 group 管理时, onPartitionsAssigned 是在分区分配时调用的。
你可以使用此方法,例如,通过调用回调来设置分区的初始偏移量。
你也可以使用此方法将当前线程的回调与分配到的分区进行关联(参见下面的示例)。
必须使用传入 registerSeekCallback 的 callback 参数,而不是该方法的参数。
从 2.5.5 版本开始,即使使用 手动分区分配,也会调用此方法。
onPartitionsRevoked 在容器停止或Kafka撤销分配时被调用。
你应该丢弃此线程的回调并移除与撤销分区的任何关联。
回调具有以下方法:
void seek(String topic, int partition, long offset);
void seekToBeginning(String topic, int partition);
void seekToBeginning(Collection<TopicPartitions> partitions);
void seekToEnd(String topic, int partition);
void seekToEnd(Collection<TopicPartitions> partitions);
void seekRelative(String topic, int partition, long offset, boolean toCurrent);
void seekToTimestamp(String topic, int partition, long timestamp);
void seekToTimestamp(Collection<TopicPartition> topicPartitions, long timestamp);
seekRelative 在 2.3 版本中添加,用于执行相对定位。
-
offset开始于分区末尾的负数和toCurrentfalse- 相对于分区末尾的查找。 -
offset正向且toCurrentfalse- 相对于分区的开始进行查找。 -
offset为负数且toCurrenttrue- 相对于当前位置进行查找(回退)。 -
offset正向且toCurrenttrue- 相对于当前位置(快进)。
The seekToTimestamp 方法在 2.3 版本中也已添加。
当在 onIdleContainer 或 onPartitionsAssigned 方法中为多个分区查找相同时间戳时,优先选择第二种方法,因为在一个调用到消费者 offsetsForTimes 方法中查找时间戳对应偏移量更为高效。
当从其他位置调用时,容器会收集所有时间戳查找请求,并在一个调用到 offsetsForTimes 上进行处理。 |
你也可以在检测到空闲容器时从onIdleContainer()执行seek操作。
见检测空闲和无响应消费者了解如何启用空闲容器检测。
The seekToBeginning 方法接受一个集合作为参数,在处理压缩主题时特别有用,例如每次应用程序启动时都希望定位到主题的开始位置: |
public class MyListener implements ConsumerSeekAware {
...
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
callback.seekToBeginning(assignments.keySet());
}
}
在运行时任意查找,请使用来自registerSeekCallback的适当线程的回调引用。
这里是使用回调的简单Spring Boot应用程序示例;它向主题发送10条记录;在控制台输入<Enter>会使所有分区跳转到开头。
@SpringBootApplication
public class SeekExampleApplication {
public static void main(String[] args) {
SpringApplication.run(SeekExampleApplication.class, args);
}
@Bean
public ApplicationRunner runner(Listener listener, KafkaTemplate<String, String> template) {
return args -> {
IntStream.range(0, 10).forEach(i -> template.send(
new ProducerRecord<>("seekExample", i % 3, "foo", "bar")));
while (true) {
System.in.read();
listener.seekToStart();
}
};
}
@Bean
public NewTopic topic() {
return new NewTopic("seekExample", 3, (short) 1);
}
}
@Component
class Listener implements ConsumerSeekAware {
private static final Logger logger = LoggerFactory.getLogger(Listener.class);
private final ThreadLocal<ConsumerSeekCallback> callbackForThread = new ThreadLocal<>();
private final Map<TopicPartition, ConsumerSeekCallback> callbacks = new ConcurrentHashMap<>();
@Override
public void registerSeekCallback(ConsumerSeekCallback callback) {
this.callbackForThread.set(callback);
}
@Override
public void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
assignments.keySet().forEach(tp -> this.callbacks.put(tp, this.callbackForThread.get()));
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
partitions.forEach(tp -> this.callbacks.remove(tp));
this.callbackForThread.remove();
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback) {
}
@KafkaListener(id = "seekExample", topics = "seekExample", concurrency = "3")
public void listen(ConsumerRecord<String, String> in) {
logger.info(in.toString());
}
public void seekToStart() {
this.callbacks.forEach((tp, callback) -> callback.seekToBeginning(tp.topic(), tp.partition()));
}
}
为使事情更简单,版本 2.3 增加了 AbstractConsumerSeekAware 类,用于跟踪某个主题/分区应使用的回调。
以下示例展示了如何在容器空闲时,将每个分区的游标seek到上次处理的最后一条记录。
它还提供了方法,允许任意外部调用来将分区回退一位记录。
public class SeekToLastOnIdleListener extends AbstractConsumerSeekAware {
@KafkaListener(id = "seekOnIdle", topics = "seekOnIdle")
public void listen(String in) {
...
}
@Override
public void onIdleContainer(Map<TopicPartition, Long> assignments,
ConsumerSeekCallback callback) {
assignments.keySet().forEach(tp -> callback.seekRelative(tp.topic(), tp.partition(), -1, true));
}
/**
* Rewind all partitions one record.
*/
public void rewindAllOneRecord() {
getSeekCallbacks()
.forEach((tp, callback) ->
callback.seekRelative(tp.topic(), tp.partition(), -1, true));
}
/**
* Rewind one partition one record.
*/
public void rewindOnePartitionOneRecord(String topic, int partition) {
getSeekCallbackFor(new TopicPartition(topic, partition))
.seekRelative(topic, partition, -1, true);
}
}
版本 2.6 为抽象类新增了便利方法:
-
seekToBeginning()- 将所有已分配的分区定位到开始位置。 -
seekToEnd()- 将所有已分配的分区移动到末尾。 -
seekToTimestamp(long timestamp)- 将所有已分配的分区定位到由该时间戳表示的偏移量。
示例:
public class MyListener extends AbstractConsumerSeekAware {
@KafkaListener(...)
void listn(...) {
...
}
}
public class SomeOtherBean {
MyListener listener;
...
void someMethod() {
this.listener.seekToTimestamp(System.currentTimeMillis() - 60_000);
}
}