对于最新稳定版本,请使用 Spring for Apache Kafka 4.0.0spring-doc.cadn.net.cn

发送消息

本节介绍如何发送消息。spring-doc.cadn.net.cn

卡夫卡模板

本节介绍如何使用卡夫卡模板传递信息。spring-doc.cadn.net.cn

概述

卡夫卡模板封装生产器,并提供便捷的方法将数据发送到Kafka主题。 以下列表展示了相关方法卡夫卡模板:spring-doc.cadn.net.cn

CompletableFuture<SendResult<K, V>> sendDefault(V data);

CompletableFuture<SendResult<K, V>> sendDefault(K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, V data);

CompletableFuture<SendResult<K, V>> send(String topic, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);

CompletableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);

CompletableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);

CompletableFuture<SendResult<K, V>> send(Message<?> message);

Map<MetricName, ? extends Metric> metrics();

List<PartitionInfo> partitionsFor(String topic);

<T> T execute(ProducerCallback<K, V, T> callback);

<T> T executeInTransaction(OperationsCallback<K, V, T> callback);

// Flush the producer.
void flush();

interface ProducerCallback<K, V, T> {

    T doInKafka(Producer<K, V> producer);

}

interface OperationsCallback<K, V, T> {

    T doInOperations(KafkaOperations<K, V> operations);

}

详情请参见Javadocspring-doc.cadn.net.cn

sendDefaultAPI 要求模板中必须提供默认主题。spring-doc.cadn.net.cn

API 接收时间戳作为参数,并将该时间戳存储在记录中。 用户提供的时间戳如何存储取决于卡夫卡主题中配置的时间戳类型。 如果主题配置为CREATE_TIME,用户指定的时间戳被记录(或未指定时生成)。 如果主题配置为LOG_APPEND_TIME忽略用户指定的时间戳,代理将本地代理时间相加。spring-doc.cadn.net.cn

指标分区用于方法在底层 上委派给相同的方法制作人. 这执行方法直接访问底层制作人.spring-doc.cadn.net.cn

要使用模板,你可以配置生产工厂,并在模板的构造函数中提供。 以下示例展示了如何实现:spring-doc.cadn.net.cn

@Bean
public ProducerFactory<Integer, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public Map<String, Object> producerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    // See https://kafka.apache.org/documentation/#producerconfigs for more properties
    return props;
}

@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
    return new KafkaTemplate<Integer, String>(producerFactory());
}

从2.5版本开始,你可以覆盖原厂的ProducerConfig属性用于创建来自同一工厂的不同生产者配置模板。spring-doc.cadn.net.cn

@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
    return new KafkaTemplate<>(pf);
}

@Bean
public KafkaTemplate<String, byte[]> bytesTemplate(ProducerFactory<String, byte[]> pf) {
    return new KafkaTemplate<>(pf,
            Collections.singletonMap(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class));
}

注意,一个类型的豆制作工厂<?, ?>(例如Spring Boot自动配置的那个)可以用不同的缩窄的通用类型来引用。spring-doc.cadn.net.cn

你也可以用标准配置模板<豆/>定义。spring-doc.cadn.net.cn

然后,使用模板时,你可以调用其中一种方法。spring-doc.cadn.net.cn

当你使用这些方法时,留言<?>参数,主题、分区、键和时间戳信息在包含以下项的消息头中提供:spring-doc.cadn.net.cn

消息有效载荷就是数据。spring-doc.cadn.net.cn

可选地,你可以配置卡夫卡模板其中制作人听众以便获得带有发送结果(成功或失败)的异步回调,而不是等待前途完成。 以下列表展示了制作人听众接口:spring-doc.cadn.net.cn

public interface ProducerListener<K, V> {

    default void onSuccess(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata) {
	}

    default void onError(ProducerRecord<K, V> producerRecord, RecordMetadata recordMetadata, Exception exception) {
	}

}

默认情况下,模板配置为LoggingProducerListener该系统会记录错误,发送成功时不做任何作。spring-doc.cadn.net.cn

为了方便,默认方法实现是为你只想实现其中一种方法时提供。spring-doc.cadn.net.cn

注意发送方法返回CompletableFuture<SendResult>. 你可以向监听者注册回调,以异步接收发送结果。 以下示例展示了如何实现:spring-doc.cadn.net.cn

CompletableFuture<SendResult<Integer, String>> future = template.send("myTopic", "something");
future.whenComplete((result, ex) -> {
    ...
});

发送结果具有两个性质,a制作人唱片记录元数据. 有关这些对象的信息,请参见 Kafka API 文档。spring-doc.cadn.net.cn

可投掷可以被铸造为KafkaProducerException;其制作人唱片财产包含失败记录。spring-doc.cadn.net.cn

如果你想屏蔽发送线程以等待结果,可以调用未来的get()方法;建议使用带有超时的方法。 如果你设置了linger.ms,你可能想要调用同花()在等待之前,或者为方便起见,模板中有一个构造函数,其中自动冲洗参数,使模板同花()每次发送。 只有在你设置了linger.ms生产者财产,想立即发送部分批次。spring-doc.cadn.net.cn

例子

本节展示了向卡夫卡发送消息的示例:spring-doc.cadn.net.cn

例子1。非阻塞(异步)
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    CompletableFuture<SendResult<String, String>> future = template.send(record);
    future.whenComplete((result, ex) -> {
        if (ex == null) {
            handleSuccess(data);
        }
        else {
            handleFailure(data, record, ex);
        }
    });
}
定格(同步)
public void sendToKafka(final MyOutputData data) {
    final ProducerRecord<String, String> record = createRecord(data);

    try {
        template.send(record).get(10, TimeUnit.SECONDS);
        handleSuccess(data);
    }
    catch (ExecutionException e) {
        handleFailure(data, record, e.getCause());
    }
    catch (TimeoutException | InterruptedException e) {
        handleFailure(data, record, e);
    }
}

注意执行异常KafkaProducerException其中制作人唱片财产。spring-doc.cadn.net.cn

RoutingKafkaTemplate

从2.5版本开始,你可以使用RoutingKafkaTemplate在运行时根据目的地选择生产者主题名字。spring-doc.cadn.net.cn

路由模板支持交易,执行,冲洗指标因为该主题不为这些作所知。

该模板需要一个映射java.util.regex.PatternProducerFactory<Object, Object>实例。 该映射应有序排列(例如 aLinkedHashMap)因为它是按顺序遍历的;你应该在一开始添加更具体的图案。spring-doc.cadn.net.cn

以下简单的 Spring Boot 应用程序示例,展示了如何使用同一模板发送到不同主题,每个主题使用不同的值串行器。spring-doc.cadn.net.cn

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public RoutingKafkaTemplate routingTemplate(GenericApplicationContext context,
            ProducerFactory<Object, Object> pf) {

        // Clone the PF with a different Serializer, register with Spring for shutdown
        Map<String, Object> configs = new HashMap<>(pf.getConfigurationProperties());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        DefaultKafkaProducerFactory<Object, Object> bytesPF = new DefaultKafkaProducerFactory<>(configs);
        context.registerBean("bytesPF", DefaultKafkaProducerFactory.class, () -> bytesPF);

        Map<Pattern, ProducerFactory<Object, Object>> map = new LinkedHashMap<>();
        map.put(Pattern.compile("two"), bytesPF);
        map.put(Pattern.compile(".+"), pf); // Default PF with StringSerializer
        return new RoutingKafkaTemplate(map);
    }

    @Bean
    public ApplicationRunner runner(RoutingKafkaTemplate routingTemplate) {
        return args -> {
            routingTemplate.send("one", "thing1");
            routingTemplate.send("two", "thing2".getBytes());
        };
    }

}

对应的@KafkaListener本例中 s 已在注释性质中展示。spring-doc.cadn.net.cn

关于另一种实现类似结果但具备将不同类型发送到同一主题的技术,请参见“分派串行器和解串”spring-doc.cadn.net.cn

默认KafkaProducerFactory

如同卡夫卡模板一个生产工厂用于创建生产者。spring-doc.cadn.net.cn

当不使用交易时,默认情况下,默认KafkaProducerFactory创建一个单例生产器,所有客户端都使用,正如卡夫卡制片人JavaDocs。 然而,如果你打电话同花()在模板上,这可能会导致使用同一生产者的其他线程延迟。 从2.3版本开始,默认KafkaProducerFactory拥有一个新房产制作者PerThread. 当设置为true工厂会为每个线程创建(并缓存)一个独立的生产者,以避免此问题。spring-doc.cadn.net.cn

什么时候制作者PerThreadtrue,用户代码必须调用closeThreadBoundProducer()当生产者不再需要时,工厂才会被使用。 这会物理关闭生产者并从中移除ThreadLocal. 叫重置()摧毁()不会清理这些生产者。

当创建默认KafkaProducerFactory, 键和/或值串行器类可以通过调用仅取属性映射的构造函数(参见卡夫卡模板),或串行器实例可以传递给默认KafkaProducerFactory构造子(此时所有制作人s 共享相同的实例)。 或者你也可以提供提供商<序列号生成器>s(从版本2.3开始)将用于获得单独的串行器每个实例制作人:spring-doc.cadn.net.cn

@Bean
public ProducerFactory<Integer, CustomValue> producerFactory() {
    return new DefaultKafkaProducerFactory<>(producerConfigs(), null, () -> new CustomValueSerializer());
}

@Bean
public KafkaTemplate<Integer, CustomValue> kafkaTemplate() {
    return new KafkaTemplate<Integer, CustomValue>(producerFactory());
}

从2.5.10版本开始,工厂创建后你可以更新生产者属性。 比如,如果你在凭证更改后需要更新 SSL 密钥/信任存储位置,这可能很有用。 这些变更不会影响现有的生产者实例;叫重置()关闭现有生产者,以便利用新物业创造新的生产者。spring-doc.cadn.net.cn

你不能把交易型生产者工厂改成非交易型,反之亦然。

现在新增了两种方法:spring-doc.cadn.net.cn

void updateConfigs(Map<String, Object> updates);

void removeConfig(String configKey);

从版本 2.8 开始,如果你作为对象(在构造函数中或通过 setter)提供序列化器,工厂会调用configure()用配置属性配置它们的方法。spring-doc.cadn.net.cn

回复Kafka模板

2.1.3版本引入了一个子类卡夫卡模板提供请求/回复语义。 该级被命名为回复Kafka模板并且还有两种额外的方法;以下展示了方法签名:spring-doc.cadn.net.cn

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record);

RequestReplyFuture<K, V, R> sendAndReceive(ProducerRecord<K, V> record,
    Duration replyTimeout);

结果是完成未来该异常被异步填充结果(或超时例外)。 结果还有sendFuture性质,即调用的结果KafkaTemplate.send(). 你可以利用这个未来来确定发送作的结果。spring-doc.cadn.net.cn

如果使用第一种方法,或者回复Timeout论元为模板的默认回复超时使用属性(默认为5秒)。spring-doc.cadn.net.cn

从2.8.8版本开始,模板新增了方法等待任务. 如果回复容器配置为auto.offset.reset=latest以避免在容器初始化前发送请求和回复。spring-doc.cadn.net.cn

使用手动分区分配(无组管理)时,等待时间必须长于容器的等待时间投票时间因为通知要在第一次轮询完成后才会发送。

以下 Spring Boot 应用程序展示了如何使用该功能的示例:spring-doc.cadn.net.cn

@SpringBootApplication
public class KRequestingApplication {

    public static void main(String[] args) {
        SpringApplication.run(KRequestingApplication.class, args).close();
    }

    @Bean
    public ApplicationRunner runner(ReplyingKafkaTemplate<String, String, String> template) {
        return args -> {
            if (!template.waitForAssignment(Duration.ofSeconds(10))) {
                throw new IllegalStateException("Reply container did not initialize");
            }
            ProducerRecord<String, String> record = new ProducerRecord<>("kRequests", "foo");
            RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record);
            SendResult<String, String> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
            System.out.println("Sent ok: " + sendResult.getRecordMetadata());
            ConsumerRecord<String, String> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);
            System.out.println("Return value: " + consumerRecord.value());
        };
    }

    @Bean
    public ReplyingKafkaTemplate<String, String, String> replyingTemplate(
            ProducerFactory<String, String> pf,
            ConcurrentMessageListenerContainer<String, String> repliesContainer) {

        return new ReplyingKafkaTemplate<>(pf, repliesContainer);
    }

    @Bean
    public ConcurrentMessageListenerContainer<String, String> repliesContainer(
            ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

        ConcurrentMessageListenerContainer<String, String> repliesContainer =
                containerFactory.createContainer("kReplies");
        repliesContainer.getContainerProperties().setGroupId("repliesGroup");
        repliesContainer.setAutoStartup(false);
        return repliesContainer;
    }

    @Bean
    public NewTopic kRequests() {
        return TopicBuilder.name("kRequests")
            .partitions(10)
            .replicas(2)
            .build();
    }

    @Bean
    public NewTopic kReplies() {
        return TopicBuilder.name("kReplies")
            .partitions(10)
            .replicas(2)
            .build();
    }

}

注意,我们可以使用 Boot 的自动配置容器工厂来创建回复容器。spring-doc.cadn.net.cn

如果回复中使用非平凡的反串化器,考虑使用ErrorHandlingDeserializer这会委派给你配置好的反串行器。 当配置如此时,请求回复未来将会非常出色地完成,你可以抓住执行异常,其中反序列化例外在其原因财产。spring-doc.cadn.net.cn

从2.6.7版本开始,除了检测反序列化例外s,模板将调用回复错误检查器功能,如果提供了。 如果返回异常,未来将被特殊完成。spring-doc.cadn.net.cn

这里有一个例子:spring-doc.cadn.net.cn

template.setReplyErrorChecker(record -> {
    Header error = record.headers().lastHeader("serverSentAnError");
    if (error != null) {
        return new MyException(new String(error.value()));
    }
    else {
        return null;
    }
});

...

RequestReplyFuture<Integer, String, String> future = template.sendAndReceive(record);
try {
    future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
    ConsumerRecord<Integer, String> consumerRecord = future.get(10, TimeUnit.SECONDS);
    ...
}
catch (InterruptedException e) {
    ...
}
catch (ExecutionException e) {
    if (e.getCause() instanceof MyException) {
        ...
    }
}
catch (TimeoutException e) {
    ...
}

模板设置一个头部(名为KafkaHeaders.CORRELATION_ID默认情况下),服务器端必须回应。spring-doc.cadn.net.cn

在这种情况下,如下@KafkaListener申请回复:spring-doc.cadn.net.cn

@SpringBootApplication
public class KReplyingApplication {

    public static void main(String[] args) {
        SpringApplication.run(KReplyingApplication.class, args);
    }

    @KafkaListener(id="server", topics = "kRequests")
    @SendTo // use default replyTo expression
    public String listen(String in) {
        System.out.println("Server received: " + in);
        return in.toUpperCase();
    }

    @Bean
    public NewTopic kRequests() {
        return TopicBuilder.name("kRequests")
            .partitions(10)
            .replicas(2)
            .build();
    }

    @Bean // not required if Jackson is on the classpath
    public MessagingMessageConverter simpleMapperConverter() {
        MessagingMessageConverter messagingMessageConverter = new MessagingMessageConverter();
        messagingMessageConverter.setHeaderMapper(new SimpleKafkaHeaderMapper());
        return messagingMessageConverter;
    }

}

@KafkaListener基础设施响应相关ID,并决定回复主题。spring-doc.cadn.net.cn

使用@SendTo关于发送回复的更多信息。 模板使用默认的报头KafKaHeaders.REPLY_TOPIC用来表明回复所涉及的主题。spring-doc.cadn.net.cn

从版本 2.2 开始,模板尝试检测配置后的回复容器中的回复主题或分区。 如果容器配置为监听单一主题或单一主题主题分区偏移,它用于设置回复头。 如果容器配置不同,用户必须设置回复头。 在这种情况下,一个信息日志消息是在初始化时写入的。 以下示例使用KafkaHeaders.REPLY_TOPIC:spring-doc.cadn.net.cn

record.headers().add(new RecordHeader(KafkaHeaders.REPLY_TOPIC, "kReplies".getBytes()));

当你用单一回复配置时主题分区偏移,只要每个实例在不同的分区监听,你可以用同一个回复主题来处理多个模板。 在配置单一回复主题时,每个实例必须使用不同的group.id. 在这种情况下,所有实例都会收到每个回复,但只有发送请求的实例能找到相关ID。 这对自动扩展可能有用,但会带来额外的网络流量开销,并且丢弃每个不需要的回复成本很小。 使用此设置时,建议您设置模板共享回复主题true,降低了对DEBUG的意外回复记录水平,而非默认的ERROR。spring-doc.cadn.net.cn

以下是配置回复容器以使用相同共享回复主题的示例:spring-doc.cadn.net.cn

@Bean
public ConcurrentMessageListenerContainer<String, String> replyContainer(
        ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) {

    ConcurrentMessageListenerContainer<String, String> container = containerFactory.createContainer("topic2");
    container.getContainerProperties().setGroupId(UUID.randomUUID().toString()); // unique
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // so the new group doesn't get old replies
    container.getContainerProperties().setKafkaConsumerProperties(props);
    return container;
}
如果你有多个客户端实例,但没有按照前一段讨论的配置,每个实例都需要一个专门的回复主题。 另一种方法是将KafkaHeaders.REPLY_PARTITION并为每个实例使用专用分区。 这页眉包含一个四字节的整数(大延序)。 服务器必须使用该头将回复路由到正确的分区(@KafkaListener这样做)。 但在这种情况下,回复容器必须不使用 Kafka 的组管理功能,必须配置为在固定分区监听(通过使用主题分区偏移在其容器属性构造者)。
DefaultKafkaHeaderMapper需要 Jackson 在类路径上(对于@KafkaListener). 如果无法使用,消息转换器没有报头映射器,因此必须配置消息信息转换器其中SimpleKafkaHeaderMapper,如前所述。

默认情况下,使用3个头部:spring-doc.cadn.net.cn

这些头部名称被以下@KafkaListener基础设施用于路由回复。spring-doc.cadn.net.cn

从版本 2.3 开始,你可以自定义头部名称——模板有三个属性关联头名称,回复主题标题名称replyPartitionHeaderName. 如果你的服务器不是 Spring 应用程序(或者不使用@KafkaListener).spring-doc.cadn.net.cn

相反,如果请求的应用程序不是 Spring 应用,并且从 3.0 版本开始将相关信息放在不同的头部,你可以自定义配置关联头名称在监听器容器工厂中,该首部会被回码回来。 此前,监听者需要重复自定义的相关头。

请求/回复留言<?>s

2.7 版本增加了回复Kafka模板发送和接收春季消息留言<?>抽象化:spring-doc.cadn.net.cn

RequestReplyMessageFuture<K, V> sendAndReceive(Message<?> message);

<P> RequestReplyTypedMessageFuture<K, V, P> sendAndReceive(Message<?> message,
        ParameterizedTypeReference<P> returnType);

这些都会使用模板的默认模式回复Timeout,也存在超载版本,可以在方法调用中超时。spring-doc.cadn.net.cn

如果消费者的反串化器或者模板的消息转换器可以通过配置或回复消息中的类型元数据,无需额外信息即可转换有效载荷。spring-doc.cadn.net.cn

如果你需要为返回类型提供类型信息,以帮助消息转换器,可以使用第二种方法。 这也允许同一模板接收不同类型,即使回复中没有类型元数据,例如服务器端不是 Spring 应用。 以下是后者的一个例子:spring-doc.cadn.net.cn

模板豆
@Bean
ReplyingKafkaTemplate<String, String, String> template(
        ProducerFactory<String, String> pf,
        ConcurrentKafkaListenerContainerFactory<String, String> factory) {

    ConcurrentMessageListenerContainer<String, String> replyContainer =
            factory.createContainer("replies");
    replyContainer.getContainerProperties().setGroupId("request.replies");
    ReplyingKafkaTemplate<String, String, String> template =
            new ReplyingKafkaTemplate<>(pf, replyContainer);
    template.setMessageConverter(new ByteArrayJsonMessageConverter());
    template.setDefaultTopic("requests");
    return template;
}
@Bean
fun template(
    pf: ProducerFactory<String?, String>?,
    factory: ConcurrentKafkaListenerContainerFactory<String?, String?>
): ReplyingKafkaTemplate<String?, String, String?> {
    val replyContainer = factory.createContainer("replies")
    replyContainer.containerProperties.groupId = "request.replies"
    val template = ReplyingKafkaTemplate(pf, replyContainer)
    template.messageConverter = ByteArrayJsonMessageConverter()
    template.defaultTopic = "requests"
    return template
}
使用模板
RequestReplyTypedMessageFuture<String, String, Thing> future1 =
        template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
                new ParameterizedTypeReference<Thing>() { });
log.info(future1.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
Thing thing = future1.get(10, TimeUnit.SECONDS).getPayload();
log.info(thing.toString());

RequestReplyTypedMessageFuture<String, String, List<Thing>> future2 =
        template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
                new ParameterizedTypeReference<List<Thing>>() { });
log.info(future2.getSendFuture().get(10, TimeUnit.SECONDS).getRecordMetadata().toString());
List<Thing> things = future2.get(10, TimeUnit.SECONDS).getPayload();
things.forEach(thing1 -> log.info(thing1.toString()));
val future1: RequestReplyTypedMessageFuture<String?, String?, Thing?>? =
    template.sendAndReceive(MessageBuilder.withPayload("getAThing").build(),
        object : ParameterizedTypeReference<Thing?>() {})
log.info(future1?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata?.toString())
val thing = future1?.get(10, TimeUnit.SECONDS)?.payload
log.info(thing.toString())

val future2: RequestReplyTypedMessageFuture<String?, String?, List<Thing?>?>? =
    template.sendAndReceive(MessageBuilder.withPayload("getThings").build(),
        object : ParameterizedTypeReference<List<Thing?>?>() {})
log.info(future2?.sendFuture?.get(10, TimeUnit.SECONDS)?.recordMetadata.toString())
val things = future2?.get(10, TimeUnit.SECONDS)?.payload
things?.forEach(Consumer { thing1: Thing? -> log.info(thing1.toString()) })

回复类型消息<?>

@KafkaListener返回 a留言<?>在2.5之前的版本中,必须填充回复主题和相关ID头。 在这个例子中,我们使用请求中的回复主题头:spring-doc.cadn.net.cn

@KafkaListener(id = "requestor", topics = "request")
@SendTo
public Message<?> messageReturn(String in) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.TOPIC, replyTo)
            .setHeader(KafkaHeaders.KEY, 42)
            .setHeader(KafkaHeaders.CORRELATION_ID, correlation)
            .build();
}

这也展示了如何在回复记录中设置键。spring-doc.cadn.net.cn

从版本 2.5 开始,框架会检测这些头部是否缺失,并填充主题——要么是从@SendTo值或入场KafkaHeaders.REPLY_TOPIC如果有,则为报头。 它也会回应即将到来的信号KafkaHeaders.CORRELATION_IDKafkaHeaders.REPLY_PARTITION,如果存在。spring-doc.cadn.net.cn

@KafkaListener(id = "requestor", topics = "request")
@SendTo  // default REPLY_TOPIC header
public Message<?> messageReturn(String in) {
    return MessageBuilder.withPayload(in.toUpperCase())
            .setHeader(KafkaHeaders.KEY, 42)
            .build();
}

回复中原始记录密钥

从3.3版本开始,来自接收请求的Kafka记录密钥(如果存在)将保留在回复记录中。 这只适用于单记录请求/回复场景。 当监听器是批量处理或返回类型为集合时,应用程序需通过将回复记录包裹为消息类型。spring-doc.cadn.net.cn

汇总多条回复

模板在回复Kafka模板仅限于单一请求/回复场景。 对于多个接收者同时回复一条消息的情况,你可以使用AggregatingReplyingKafkaTemplate. 这是散点-收集企业集成模式客户端的一个实现。spring-doc.cadn.net.cn

就像回复Kafka模板AggregatingReplyingKafkaTemplateConstructor 会拿一个生产工厂和一个监听器容器来接收回复;它还有第三个参数双谓词<列表<消费者记录<K, R>>,布尔>发布策略每次收到回复时都会参考该信息;当谓词返回时true,集合消费者记录s 用于完成前途发送与接收方法。spring-doc.cadn.net.cn

还有一个额外的属性returnPartialOnTimeout(默认 false)。 当此设定为true,而不是用Kafka回复超时例外,部分结果通常会补全未来(只要至少收到一条回复记录)。spring-doc.cadn.net.cn

从2.3.5版本开始,谓词也会在超时后调用(如果returnPartialOnTimeouttrue). 第一个论点是当前记录列表;第二种是true如果这次判罚是因为暂停。 谓词可以修改记录列表。spring-doc.cadn.net.cn

AggregatingReplyingKafkaTemplate<Integer, String, String> template =
        new AggregatingReplyingKafkaTemplate<>(producerFactory, container,
                        coll -> coll.size() == releaseSize);
...
RequestReplyFuture<Integer, String, Collection<ConsumerRecord<Integer, String>>> future =
        template.sendAndReceive(record);
future.getSendFuture().get(10, TimeUnit.SECONDS); // send ok
ConsumerRecord<Integer, Collection<ConsumerRecord<Integer, String>>> consumerRecord =
        future.get(30, TimeUnit.SECONDS);

注意返回类型是消费者记录其中一个值是消费者记录s. “外层”消费者记录不是“真实”记录,它由模板合成,作为实际请求回复记录的存储器。 当正常释放发生(释放策略返回为真)时,主题设置为汇总结果;如果returnPartialOnTimeout为真,且发生超时(且至少收到一条回复记录),话题设置为部分结果暂停后. 该模板为这些“主题”名称提供了恒定的静态变量:spring-doc.cadn.net.cn

/**
 * Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
 * results in its value after a normal release by the release strategy.
 */
public static final String AGGREGATED_RESULTS_TOPIC = "aggregatedResults";

/**
 * Pseudo topic name for the "outer" {@link ConsumerRecords} that has the aggregated
 * results in its value after a timeout.
 */
public static final String PARTIAL_RESULTS_AFTER_TIMEOUT_TOPIC = "partialResultsAfterTimeout";

真实消费者记录s 在收集包含回复来源的实际主题。spring-doc.cadn.net.cn

回复的监听器容器必须配置为AckMode.MANUALAckMode.MANUAL_IMMEDIATE;消费者财产enable.auto.commit一定是false(自2.3版本起为默认)。 为避免丢失消息,模板仅在未完成请求为零时提交偏移量,即发布策略释放最后一个未完成请求时。 在重新平衡后,可能会出现重复回复交付;机上请求将被忽略;当收到已发布回复的重复回复时,你可能会看到错误日志提示。
如果你使用一个ErrorHandlingDeserializer使用该聚合模板,框架不会自动检测反序列化例外s. 相反,记录(带有value)将完整返回,反序列化例外则在头部中返回。 建议应用程序调用 utility 方法ReplyingKafkaTemplate.checkDeserialization()判断是否发生反序列化异常的方法。 更多信息请参见其JavaDocs。 这回复错误检查器也不需要用于该聚合模板;你应该对回复中的每个部分进行检查。