该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 Spring for Apache Kafka 4.0.0spring-doc.cadn.net.cn

交易

本节介绍了 Spring for Apache Kafka 如何支持事务。spring-doc.cadn.net.cn

概述

0.11.0.0 客户端库增加了对事务的支持。Spring for Apache Kafka 通过以下方式增加了支持:spring-doc.cadn.net.cn

通过提供默认KafkaProducerFactory其中transactionId前缀. 在这种情况下,与其管理单一共享制作人工厂维护着一批交易生产者。 当用户呼叫时接近()在生产者中,它会被返回缓存进行重复利用,而不是被关闭。 这transactional.id每个生产者的属性为transactionId前缀 + n哪里n开头为0并且会根据每新增一个生产者而增加。 在之前版本的 Apache Kafka Spring 中,transactional.id为由带有记录监听器的监听器容器启动的交易,以支持围栏僵尸,但现在已不再必要,且EOSMode.V2从3.0开始,是唯一的选择。 对于运行多个实例的应用程序,transactionId前缀每个实例必须是唯一的。spring-doc.cadn.net.cn

使用 Spring Boot 时,只需设置spring.kafka.producer.transaction-id-prefix属性 - Spring Boot 会自动配置KafkaTransactionManager然后接线到监听器容器里。spring-doc.cadn.net.cn

从2.5.8版本开始,你现在可以配置最大年龄生产工厂的财产。 这在使用可能为经纪人闲置的交易型生产者时非常有用transactional.id.expiration.ms. 与当前的卡夫卡客户端,这可能导致制片人受限例外没有重新平衡。 通过设置最大年龄减少到transactional.id.expiration.ms工厂会刷新生产者,如果超过其最大寿命。

KafkaTransactionManager

KafkaTransactionManager是 Spring Framework 的实现PlatformTransactionManager. 其构建器中附有生产工厂的引用。 如果你提供定制生产工厂,它必须支持交易。 看ProducerFactory.transactionCapable().spring-doc.cadn.net.cn

你可以使用KafkaTransactionManager支持正常的 Spring 事务 (@Transactional,交易模板,以及其他。 如果交易是活跃的,任何卡夫卡模板在事务范围内执行的作使用事务的制作人. 管理者根据成功或失败来提交或回滚该事务。 你必须配置卡夫卡模板用同样的生产工厂作为交易管理器。spring-doc.cadn.net.cn

事务同步

本节涉及仅生产者事务(未由监听器容器发起的交易);有关容器开始交易时串联交易的信息,请参见“使用消费者发起的交易”。spring-doc.cadn.net.cn

如果你想把记录发送到 Kafka 并进行一些数据库更新,你可以用普通的 Spring 事务管理,比如DataSourceTransactionManager.spring-doc.cadn.net.cn

@Transactional
public void process(List<Thing> things) {
    things.forEach(thing -> this.kafkaTemplate.send("topic", thing));
    updateDb(things);
}

拦截机用于@Transactional注释启动事务,且卡夫卡模板将与该事务管理器同步交易;每个发送者都会参与该交易。 当方法退出时,数据库事务将提交,随后是 Kafka 事务。 如果你希望提交顺序相反(先卡夫卡),可以使用嵌套@Transactional方法,外层方法配置为使用DataSourceTransactionManager,以及配置为使用KafkaTransactionManager.spring-doc.cadn.net.cn

参见“与其他事务管理器的 Kafka 事务示例”,了解一个应用程序在 Kafka 优先或数据库优先配置下同步 JDBC 和 Kafka 事务的示例。spring-doc.cadn.net.cn

从2.5.17、2.6.12、2.7.9和2.8.0版本开始,如果主事务提交后同步事务提交失败,异常将抛给调用者。 此前,这些信息被默许(在调试级别记录)。 申请应采取补救措施(如有必要),以补偿已承诺的主要交易。

使用消费者发起的交易

ChainedKafkaTransactionManager自2.7版本起已被弃用;参见 JavaDocs 的超级类ChainedTransactionManager更多信息请见。 相反,使用一个KafkaTransactionManager在容器中启动Kafka事务,并对监听器方法进行注释@Transactional开始另一笔交易。spring-doc.cadn.net.cn

请参阅“与其他事务管理器的 Kafka 交易示例”,这是一个串联 JDBC 和 Kafka 事务的示例应用。spring-doc.cadn.net.cn

非阻塞重试不能与容器交易合并。 当监听器代码抛出异常时,容器事务提交成功,记录被发送到可重试主题。

卡夫卡模板本地交易

你可以使用卡夫卡模板在本地事务中执行一系列作。 以下示例展示了如何实现:spring-doc.cadn.net.cn

boolean result = template.executeInTransaction(t -> {
    t.sendDefault("thing1", "thing2");
    t.sendDefault("cat", "hat");
    return true;
});

回调中的参数就是模板本身 (). 如果回调正常退出,则该事务被提交。 如果抛出异常,交易会被回滚。spring-doc.cadn.net.cn

如果存在KafkaTransactionManager(或同步)事务正在进行中,但不被使用。 取而代之的是使用新的“嵌套”事务。

TransactionIdPrefix

EOSMode.V2(又名试用版),这是唯一支持的模式,现在不再需要使用相同的模式transactional.id即使是消费者发起的交易;事实上,每个实例上必须与生产者发起的交易一样唯一。 该属性在每个应用实例上必须有不同的值。spring-doc.cadn.net.cn

TransactionIdSuffix 已修复

自3.2版本起,新的交易Id后缀策略引入接口以管理transactional.id后缀。 默认实现为DefaultTransactionIdSuffixStrategy设置时maxCache大于 0 可以重复使用transactional.id在特定范围内,否则通过递增计数器即时生成后缀。 当请求交易生产者时transactional.id全部使用,投掷a无生产者可用例外. 用户随后可以使用重试模板配置为重新尝试该异常,并设置相应的退回。spring-doc.cadn.net.cn

public static class Config {

    @Bean
    public ProducerFactory<String, String> myProducerFactory() {
        Map<String, Object> configs = producerConfigs();
        configs.put(ProducerConfig.CLIENT_ID_CONFIG, "myClientId");
        ...
        DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(configs);
        ...
        TransactionIdSuffixStrategy ss = new DefaultTransactionIdSuffixStrategy(5);
        pf.setTransactionIdSuffixStrategy(ss);
        return pf;
    }

}

设置时maxCache到5,transactional.id我的.txid。+'{0-4}'。spring-doc.cadn.net.cn

使用KafkaTransactionManager其中ConcurrentMessageListenerContainer以及赋能maxCache,必须设maxCache到一个大于或等于 的值并发. 如果MessageListenerContainer无法获得transactional.id后缀,它会抛出无生产者可用例外. 在使用 嵌套事务时ConcurrentMessageListenerContainer需要调整maxCache设置以应对嵌套事务数量的增加。

卡夫卡模板交易式与非事务式出版

通常,当卡夫卡模板是交易型(配置为具备交易能力的生产者工厂),则需要交易。 交易可以由交易模板一个@Transactional方法,调用执行交易,或通过监听器容器,配置为KafkaTransactionManager. 任何试图在交易范围之外使用该模板的行为,都会导致模板抛出非法州例外. 从2.4.3版本开始,你可以设置模板允许非交易属性到true. 在这种情况下,模板将允许作无需事务即可运行,方法是调用生产工厂createNonTransactionalProducer()方法;生产者将像往常一样被缓存或线程绑定以便重用。 看默认KafkaProducerFactory.spring-doc.cadn.net.cn

与批处理监听器的事务

当监听器在交易使用时失败,AfterRollback处理器在回滚发生后被调用以采取某些作。 使用默认时AfterRollback处理器在唱片听众中,会进行寻曲,以便将失败的唱片重新交付。 然而,使用批处理监听器时,整个批次会被重新交付,因为框架不知道批次中哪个记录失败了。 更多信息请参见后回滚处理器spring-doc.cadn.net.cn

使用批处理监听器时,2.4.2 版本引入了处理批处理失败的替代机制:批处理记录适配器. 当集装箱工厂批处理听器设置为 true,配置为批处理记录适配器,听者一次调用一条记录。 这允许批处理错误,同时根据异常类型停止整个批处理。 默认批处理记录适配器提供的,可以配置为标准消费者记录恢复器例如:死信出版恢复者. 以下测试用例配置摘要展示了如何使用此功能:spring-doc.cadn.net.cn

public static class TestListener {

    final List<String> values = new ArrayList<>();

    @KafkaListener(id = "batchRecordAdapter", topics = "test")
    public void listen(String data) {
        values.add(data);
        if ("bar".equals(data)) {
            throw new RuntimeException("reject partial");
        }
    }

}

@Configuration
@EnableKafka
public static class Config {

    ConsumerRecord<?, ?> failed;

    @Bean
    public TestListener test() {
        return new TestListener();
    }

    @Bean
    public ConsumerFactory<?, ?> consumerFactory() {
        return mock(ConsumerFactory.class);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory());
        factory.setBatchListener(true);
        factory.setBatchToRecordAdapter(new DefaultBatchToRecordAdapter<>((record, ex) ->  {
            this.failed = record;
        }));
        return factory;
    }

}