|
对于最新稳定版本,请使用 Spring for Apache Kafka 4.0.0! |
使用@SendTo
从2.0版本开始,如果你还注释了@KafkaListener其中@SendTo注释和方法调用返回结果,结果会被转发到由@SendTo.
这@SendTo价值可以有多种形式:
-
@SendTo(“某话题”)通往字面主题的路径。 -
@SendTo(“#{someExpression}”)通过在应用上下文初始化时对表达式进行一次评估确定的主题路径。 -
@SendTo(“!{某些表达}”)通过运行时对表达式进行评估确定的主题路径。 这#root评估对象具有三个性质:-
请求:进站消费者记录(或消费者唱片批处理监听器的对象)。 -
源:这org.springframework.messaging.Message<?>从请求. -
结果: 方法返回结果。
-
-
@SendTo(无性质):此处理为! {source.headers['kafka_replyTopic']}(自版本 2.1.3 起)
从2.1.11和2.2.1版本开始,属性占位符在内部解析@SendTo值。
表达式评估的结果必须是字符串代表主题名称。以下示例展示了各种使用方法@SendTo:
@KafkaListener(topics = "annotated21")
@SendTo("!{request.value()}") // runtime SpEL
public String replyingListener(String in) {
...
}
@KafkaListener(topics = "${some.property:annotated22}")
@SendTo("#{myBean.replyTopic}") // config time SpEL
public Collection<String> replyingBatchListener(List<String> in) {
...
}
@KafkaListener(topics = "annotated23", errorHandler = "replyErrorHandler")
@SendTo("annotated23reply") // static reply topic definition
public String replyingListenerWithErrorHandler(String in) {
...
}
...
@KafkaListener(topics = "annotated25")
@SendTo("annotated25reply1")
public class MultiListenerSendTo {
@KafkaHandler
public String foo(String in) {
...
}
@KafkaHandler
@SendTo("!{'annotated25reply2'}")
public String bar(@Payload(required = false) KafkaNull nul,
@Header(KafkaHeaders.RECEIVED_KEY) int key) {
...
}
}
为了支持@SendTo,监听器容器工厂必须配备卡夫卡模板(在其回复模板属性),用于发送回复。这应该是卡夫卡模板而且不是回复Kafka模板该模板用于客户端的请求/回复处理。使用 Spring Boot 时,它会自动将模板配置到工厂;在配置自己的工厂时,必须按照下面的示例设置。 |
从2.2版本开始,你可以添加一个回复标题Configurer到监听器容器工厂。通过查看以确定你希望在回复消息中设置哪些头部。以下示例展示了如何添加回复标题Configurer:
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(cf());
factory.setReplyTemplate(template());
factory.setReplyHeadersConfigurer((k, v) -> k.equals("cat"));
return factory;
}
如果你愿意,也可以添加更多头部。以下示例展示了如何实现:
@Bean
public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(cf());
factory.setReplyTemplate(template());
factory.setReplyHeadersConfigurer(new ReplyHeadersConfigurer() {
@Override
public boolean shouldCopy(String headerName, Object headerValue) {
return false;
}
@Override
public Map<String, Object> additionalHeaders() {
return Collections.singletonMap("qux", "fiz");
}
});
return factory;
}
当你使用@SendTo,你必须配置ConcurrentKafkaListenerContainerFactory其中卡夫卡模板在其回复模板属性以执行发送。Spring Boot 会自动插入其自动配置的模板(或如果存在单个实例则任意配置)。
除非你用请求/回复语义,否则只能用简单的发送(主题,值)采用方法,因此你可能想创建一个子类来生成分区或键。以下示例展示了如何实现: |
@Bean
public KafkaTemplate<String, String> myReplyingTemplate() {
return new KafkaTemplate<Integer, String>(producerFactory()) {
@Override
public CompletableFuture<SendResult<String, String>> send(String topic, String data) {
return super.send(topic, partitionForData(data), keyForData(data), data);
}
...
};
}
|
如果监听器方法返回
|
使用请求/回复语义时,发送方可以请求目标分区。
|
你可以注释
更多信息请参见处理例外。 |
如果监听器方法返回可迭代默认情况下,每个元素在发送值时都记录一个记录。从版本 2.3.5 开始,设置分割可计算在@KafkaListener自false整个结果将以单一值的形式发送制作人唱片. 这需要在回复模板的生产者配置中加入合适的串行化器。然而,如果回复是可迭代<消息吗<?>>该属性被忽略,每个消息单独发送。 |