@KafkaListener注解
这@KafkaListener注释用于将 BEAN 方法指定为监听器容器的监听者。
豆子被包裹在MessagingMessageListenerAdapter配置多种功能,如必要时可转换数据以匹配方法参数的转换器。
你可以用 SpEL 配置注释上的大多数属性,方法是#{…}或属性占位符(${…}).
更多信息请参见 Javadoc。
唱片听众
这@KafkaListener注释为简单的POJO监听器提供了一种机制。
以下示例展示了如何使用它:
public class Listener {
@KafkaListener(id = "foo", topics = "myTopic", clientIdPrefix = "myClientId")
public void listen(String data) {
...
}
}
该机制需要@EnableKafka在你的其中一个注释上@Configuration类和监听器容器工厂,用于配置底层ConcurrentMessageListenerContainer.
默认情况下,是带名字的豆子kafkaListenerContainerFactory这是预期中的。
以下示例展示了如何使用ConcurrentMessageListenerContainer:
@Configuration
@EnableKafka
public class KafkaConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
...
return props;
}
}
注意,要设置容器属性,必须使用getContainerProperties()工厂的方法。
它被用作注入容器实际属性的模板。
从2.1.1版本开始,你现在可以设置client.id由注释创建的消费者财产。
这clientIdPrefix后缀为-n哪里n是表示并发时容器编号的整数。
从2.2版本开始,你可以覆盖容器工厂的并发和自动启动通过使用注释本身的属性来实现属性。
属性可以是简单的值、属性占位符或 SpEL 表达式。
以下示例展示了如何实现:
@KafkaListener(id = "myListener", topics = "myTopic",
autoStartup = "${listen.auto.start:true}", concurrency = "${listen.concurrency:3}")
public void listen(String data) {
...
}
显式分区分配
你还可以用显式主题和分区配置 POJO 监听器(可选地设置初始偏移量)。 以下示例展示了如何实现:
@KafkaListener(id = "thing2", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
你可以在分区或partitionOffsets属性,但不能两者兼有。
和大多数注释属性一样,你可以使用 SpEL 表达式;关于如何生成大量分区列表的示例,请参见“手动分配所有分区”。
从版本 2.5.5 开始,你可以对所有分配的分区应用初始偏移:
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" },
partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
万用符表示所有分区*分区属性。
必须只有一个@PartitionOffset每个都包含万用卡@TopicPartition.
此外,当监听者实现消费者寻求,onPartitionsAssigned现在即使使用手动分配,也被调用。
例如,这允许在该时刻任意进行寻道作。
从版本 2.6.4 开始,你可以指定一个逗号分隔的分区列表,或分区范围:
@KafkaListener(id = "pp", autoStartup = "false",
topicPartitions = @TopicPartition(topic = "topic1",
partitions = "0-5, 7, 10-15"))
public void process(String in) {
...
}
该系列是包容性的;上述示例将分配分区0, 1, 2, 3, 4, 5, 7, 10, 11, 12, 13, 14, 15.
在指定初始偏移量时,也可以采用相同的技术:
@KafkaListener(id = "thing3", topicPartitions =
{ @TopicPartition(topic = "topic1",
partitionOffsets = @PartitionOffset(partition = "0-5", initialOffset = "0"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
初始偏移量将应用到所有6个分区。
自3.2版本起,@PartitionOffset支持寻找定位。结束,寻找位置。开始,SeekPosition.TIMESTAMP,寻觅位置火柴寻定位列举名:
@KafkaListener(id = "seekPositionTime", topicPartitions = {
@TopicPartition(topic = TOPIC_SEEK_POSITION, partitionOffsets = {
@PartitionOffset(partition = "0", initialOffset = "723916800000", seekPosition = "TIMESTAMP"),
@PartitionOffset(partition = "1", initialOffset = "0", seekPosition = "BEGINNING"),
@PartitionOffset(partition = "2", initialOffset = "0", seekPosition = "END")
})
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
如果 seekPosition set结束或开始会忽略初始偏移和亲属至现任.
如果 seekPosition set时间戳,初始偏移表示时间戳。
手动确认
使用手动挡时AckMode你也可以向听者提供确认.
启动手册AckMode你需要在 中设置 ack-mode容器属性切换到相应的手动模式。
以下示例还展示了如何使用不同的集装箱工厂。
这个自定义容器工厂必须设置AckMode通过调用getContainerProperties()然后呼唤setAckMode马上去。
否则,确认对象将为空。
@KafkaListener(id = "cat", topics = "myTopic",
containerFactory = "kafkaManualAckListenerContainerFactory")
public void listen(String data, Acknowledgment ack) {
...
ack.acknowledge();
}
消费者记录元数据
最后,关于记录的元数据可通过消息头获取。 您可以使用以下头部名称来获取消息的头部:
-
KafkaHeaders.OFFSET -
KafkaHeaders.RECEIVED_KEY -
KafkaHeaders.RECEIVED_TOPIC -
KafkaHeaders.RECEIVED_PARTITION -
KafkaHeaders.RECEIVED_TIMESTAMP -
KafkaHeaders.TIMESTAMP_TYPE
从2.5版本开始RECEIVED_KEY如果输入记录具有零钥匙;此前,头部填充的是零价值。
这一变化是为了使框架与春季消息约定零值头不存在。
以下示例展示了如何使用头部:
@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
@Header(name = KafkaHeaders.RECEIVED_KEY, required = false) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
) {
...
}
参数注释(@Payload,@Header) 必须在监听器方法的具体实现上指定;如果它们在接口上定义,则不会被检测到。 |
从版本2.5开始,你可以在消费者记录元数据参数。
@KafkaListener(...)
public void listen(String str, ConsumerRecordMetadata meta) {
...
}
这包含了所有来自消费者记录除了关键和价值。
批次听众
从1.1版本开始,你可以配置@KafkaListener接收来自消费者投票的全部消费者记录。
| 批处理监听器不支持非阻塞重试。 |
要配置监听器容器工厂创建批量监听器,你可以设置批处理听器财产。
以下示例展示了如何实现:
@Bean
public KafkaListenerContainerFactory<?> batchFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
return factory;
}
从2.8版本开始,你可以覆盖原厂的批处理听器利用Batch属性@KafkaListener注解。
这加上对容器错误处理程序的改进,使得同一工厂可以同时用于记录监听器和批处理监听器。 |
从2.9.6版本开始,容器工厂为记录消息转换器和批量消息转换器性能。
此前,只有一处房产消息转换器该规则适用于录音和批量听众。 |
以下示例展示了如何接收有效载荷列表:
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list) {
...
}
主题、分区、偏移等内容均可作为与有效载荷平行的头部提供。 以下示例展示了如何使用头部:
@KafkaListener(id = "list", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<String> list,
@Header(KafkaHeaders.RECEIVED_KEY) List<Integer> keys,
@Header(KafkaHeaders.RECEIVED_PARTITION) List<Integer> partitions,
@Header(KafkaHeaders.RECEIVED_TOPIC) List<String> topics,
@Header(KafkaHeaders.OFFSET) List<Long> offsets) {
...
}
或者,你也可以获得列表之留言<?>每个消息中每个偏移量和其他细节的对象,但必须是唯一的参数(除可选确认,在使用手动提交时,和/或消费者<?, ?>参数)定义在方法上。
以下示例展示了如何实现:
@KafkaListener(id = "listMsg", topics = "myTopic", containerFactory = "batchFactory")
public void listen1(List<Message<?>> list) {
...
}
@KafkaListener(id = "listMsgAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen2(List<Message<?>> list, Acknowledgment ack) {
...
}
@KafkaListener(id = "listMsgAckConsumer", topics = "myTopic", containerFactory = "batchFactory")
public void listen3(List<Message<?>> list, Acknowledgment ack, Consumer<?, ?> consumer) {
...
}
在这种情况下,有效载荷不进行转换。
如果批处理消息传递转换器配置为记录消息转换器你也可以在消息参数 和 有效载荷 会被转换。
更多信息请参见“使用批处理监听器的有效载荷转换”。
你还可以收到一份清单消费者记录<?, ?>但必须是唯一的参数(可选确认, 在使用手动提交和消费者<?, ?>参数)定义在方法上。
以下示例展示了如何实现:
@KafkaListener(id = "listCRs", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list) {
...
}
@KafkaListener(id = "listCRsAck", topics = "myTopic", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<Integer, String>> list, Acknowledgment ack) {
...
}
从2.2版本开始,听者可以接收完整的消费者记录<?, ?>对象由poll()方法,允许监听者访问其他方法,例如分区()(返回主题分区列表中的实例)以及记录(TopicPartition)(该机构获得选择性记录)。
同样,这必须是唯一的参数(除了可选参数确认, 在使用手动提交或消费者<?, ?>参数)在方法上。
以下示例展示了如何实现:
@KafkaListener(id = "pollResults", topics = "myTopic", containerFactory = "batchFactory")
public void pollResults(ConsumerRecords<?, ?> records) {
...
}
如果集装箱工厂有RecordFilterStrategy配置后,忽略了消费者记录<?, ?>听众,带有警告日志消息已发出。
只有当名单<?>采用了听者的形式。
默认情况下,记录是逐条过滤;从2.8版本开始,你可以覆盖filterBatch在一次通话中过滤整个批次。 |
注释性质
从2.0版本开始,身份证属性(如果存在)被用作卡夫卡消费者group.id如果存在,则覆盖消费工厂中的配置属性。
你也可以设置组ID显式或设定idIsGroup(同名集团)为 false 以恢复之前使用消费者工厂的行为group.id.
你可以在大多数注释属性中使用属性占位符或 SpEL 表达式,如下示例所示:
@KafkaListener(topics = "${some.property}")
@KafkaListener(topics = "#{someBean.someProperty}",
groupId = "#{someBean.someProperty}.group")
从2.1.2版本开始,SpEL表达式支持一个特殊的Tokens:__听者.
它是一个伪豆名,代表该注释存在的当前豆实例。
请考虑以下例子:
@Bean
public Listener listener1() {
return new Listener("topic1");
}
@Bean
public Listener listener2() {
return new Listener("topic2");
}
给定前一个例子中的豆子,我们可以使用以下方法:
public class Listener {
private final String topic;
public Listener(String topic) {
this.topic = topic;
}
@KafkaListener(topics = "#{__listener.topic}",
groupId = "#{__listener.topic}.group")
public void listen(...) {
...
}
public String getTopic() {
return this.topic;
}
}
如果,在极不可能的情况下,你真的有个叫做的豆子__听者,你可以通过使用豆评属性。
以下示例展示了如何实现:
@KafkaListener(beanRef = "__x", topics = "#{__x.topic}", groupId = "#{__x.topic}.group")
从2.2.4版本开始,你可以直接在注释上指定Kafka的消费者属性,这些属性会覆盖消费者工厂中配置的同名属性。你无法指定group.id和client.id性质如下;他们会被忽视;使用以下组ID和clientIdPrefix这些的注释属性。
属性以普通 Java 语言的单个字符串形式指定性能文件格式:FOO:酒吧,foo=bar或福巴尔,如下示例所示:
@KafkaListener(topics = "myTopic", groupId = "group", properties = {
"max.poll.interval.ms:60000",
ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
})
以下是对应的监听者示例用RoutingKafkaTemplate.
@KafkaListener(id = "one", topics = "one")
public void listen1(String in) {
System.out.println("1: " + in);
}
@KafkaListener(id = "two", topics = "two",
properties = "value.deserializer:org.apache.kafka.common.serialization.ByteArrayDeserializer")
public void listen2(byte[] in) {
System.out.println("2: " + new String(in));
}