|
该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 Spring for Apache Kafka 4.0.0! |
@KafkaListener在一门课上
当你使用@KafkaListener在类别层面,你必须具体说明@KafkaHandler在方法层面。
如果没有@KafkaHandler对于该类或其子类的任何方法,框架会拒绝此类配置。
这@KafkaHandler对于方法的明确和简洁目的,需要注释。
否则,没有额外限制,很难做出这种或其他方法的决定。
当消息被传递时,转换后的消息有效载荷类型用于决定调用哪种方法。 以下示例展示了如何实现:
@KafkaListener(id = "multi", topics = "myTopic")
static class MultiListenerBean {
@KafkaHandler
public void listen(String foo) {
...
}
@KafkaHandler
public void listen(Integer bar) {
...
}
@KafkaHandler(isDefault = true)
public void listenDefault(Object object) {
...
}
}
从2.1.3版本开始,你可以指定一个@KafkaHandler方法作为默认方法,如果其他方法无匹配时调用。
最多只能用一种方法来指定。
使用@KafkaHandler方法中,有效载荷必须已经转换为域对象(以便执行匹配)。
使用自定义的反串化器,比如JacksonJsonDeserializer,或JacksonJsonMessageConverter其类型优先权设置为TYPE_ID.
更多信息请参见串行化、反串列化和消息转换。
由于 Spring 解析方法参数的方式存在一些限制,因此成为默认@KafkaHandler无法接收离散头部;它必须使用消费者记录元数据详见消费者记录元数据。 |
例如:
@KafkaHandler(isDefault = true)
public void listenDefault(Object object, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
...
}
如果对象是字符串;这主题参数还会获得一个引用对象.
如果你需要默认记录的元数据,可以使用以下方法:
@KafkaHandler(isDefault = true)
void listen(Object in, @Header(KafkaHeaders.RECORD_METADATA) ConsumerRecordMetadata meta) {
String topic = meta.topic();
...
}
而且,这种方法效果也不会那么好。
这主题被解析为有效载荷.
@KafkaHandler(isDefault = true)
public void listenDefault(String payload, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
// payload.equals(topic) is True.
...
}
如果某些用例中默认方法需要离散自定义头部,请使用以下方法:
@KafkaHandler(isDefault = true)
void listenDefault(String payload, @Headers Map<String, Object> headers) {
Object myValue = headers.get("MyCustomHeader");
...
}