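5. Tips, Tricks and Examples
5.1. Manually Assigning All Partitions
If you always need to read all records from all partitions (for example, when using a compacted topic to load a distributed cache), it can be useful to assign the partitions manually rather than use Kafka's group management. Doing so becomes unwieldy when there are many partitions, because you have to list each one. It is also a problem if the number of partitions changes over time, because you would have to recompile your application each time the partition count changes.
The following example shows how to use a SpEL expression to build the partition list dynamically when the application starts: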
@KafkaListener(topicPartitions = @TopicPartition(topic = "compacted",
        partitions = "#{@finder.partitions('compacted')}",
        partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0")))
public void listen(@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key, String payload) {
    ...
}
@Bean
public PartitionFinder finder(ConsumerFactory<String, String> consumerFactory) {
    return new PartitionFinder(consumerFactory);
}

public static class PartitionFinder {

    private final ConsumerFactory<String, String> consumerFactory;

    public PartitionFinder(ConsumerFactory<String, String> consumerFactory) {
        this.consumerFactory = consumerFactory;
    }

    public String[] partitions(String topic) {
        // A short-lived consumer is used only to look up the topic's partitions.
        try (Consumer<String, String> consumer = consumerFactory.createConsumer()) {
            return consumer.partitionsFor(topic).stream()
                .map(pi -> "" + pi.partition())
                .toArray(String[]::new);
        }
    }

}
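Using this in conjunction with ConsumerConfig.AUTO_OFFSET_RESET_CONFIG=earliest loads all records each time the application is started.
You should also set the container's AckMode to MANUAL to prevent the container from committing offsets for a null consumer group.
However, starting with version 2.5.5, as shown above, you can apply an initial offset to all partitions; see Explicit Partition Assignment for more information.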
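As a rough illustration of what the SpEL expression above hands to the partitions attribute, the following plain-Java sketch reproduces only the mapping step of PartitionFinder.partitions(): it turns partition numbers into a String[]. The class name PartitionListSketch and the method toPartitionStrings are hypothetical, and the hard-coded partition ids stand in for the result of consumer.partitionsFor(topic); no broker is contacted.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the mapping step in PartitionFinder.partitions().
class PartitionListSketch {

    // Converts partition ids into the String[] form used by the partitions attribute.
    static String[] toPartitionStrings(List<Integer> partitionIds) {
        return partitionIds.stream()
                .map(String::valueOf)
                .toArray(String[]::new);
    }

    public static void main(String[] args) {
        // Assumed partition ids; a real application would obtain these from the broker.
        String[] partitions = toPartitionStrings(List.of(0, 1, 2));
        System.out.println(Arrays.toString(partitions)); // prints [0, 1, 2]
    }
}
```

Because the array is computed at startup, a topic that later gains partitions is picked up on the next restart without recompiling.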
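5.2. Examples of Kafka Transactions with Other Transaction Managers
The following Spring Boot application is an example of chaining database and Kafka transactions.
The listener container starts the Kafka transaction and the @Transactional annotation starts the DB transaction.
The DB transaction is committed first; if the Kafka transaction fails to commit, the record is redelivered, so the DB update should be idempotent.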
@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> template.executeInTransaction(t -> t.send("topic1", "test"));
    }

    @Bean
    public DataSourceTransactionManager dstm(DataSource dataSource) {
        return new DataSourceTransactionManager(dataSource);
    }

    @Component
    public static class Listener {

        private final JdbcTemplate jdbcTemplate;

        private final KafkaTemplate<String, String> kafkaTemplate;

        public Listener(JdbcTemplate jdbcTemplate, KafkaTemplate<String, String> kafkaTemplate) {
            this.jdbcTemplate = jdbcTemplate;
            this.kafkaTemplate = kafkaTemplate;
        }

        @KafkaListener(id = "group1", topics = "topic1")
        @Transactional("dstm")
        public void listen1(String in) {
            this.kafkaTemplate.send("topic2", in.toUpperCase());
            // Bind the payload as a parameter instead of concatenating it into the SQL text.
            this.jdbcTemplate.update("insert into mytable (data) values (?)", in);
        }

        @KafkaListener(id = "group2", topics = "topic2")
        public void listen2(String in) {
            System.out.println(in);
        }

    }

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("topic1").build();
    }

    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("topic2").build();
    }

}
spring.datasource.url=jdbc:mysql://localhost/integration?serverTimezone=UTC
spring.datasource.username=root
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.properties.isolation.level=read_committed
spring.kafka.producer.transaction-id-prefix=tx-
#logging.level.org.springframework.transaction=trace
#logging.level.org.springframework.kafka.transaction=debug
#logging.level.org.springframework.jdbc=debug
create table mytable (data varchar(20));
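Because the DB transaction commits before the Kafka transaction, a failed Kafka commit leads to redelivery of a record whose DB update has already been applied. One possible way to keep the update idempotent is to key each write by something unique to the record and ignore repeats. The sketch below models that idea with an in-memory map in place of the database; the class IdempotentStore, its method names, and the choice of topic, partition, and offset as the key are all assumptions made for illustration and are not part of the sample application.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for a table with one unique key per consumed record.
class IdempotentStore {

    private final Map<String, String> rows = new ConcurrentHashMap<>();

    // Applies the write only if this record has not been stored before.
    // Returns true when a new row was written, false for a redelivered record.
    boolean insertOnce(String topic, int partition, long offset, String data) {
        String recordKey = topic + "-" + partition + "-" + offset;
        return rows.putIfAbsent(recordKey, data) == null;
    }

    int rowCount() {
        return rows.size();
    }

    public static void main(String[] args) {
        IdempotentStore store = new IdempotentStore();
        boolean first = store.insertOnce("topic1", 0, 42L, "test");
        // Simulated redelivery of the same record after a failed Kafka commit.
        boolean second = store.insertOnce("topic1", 0, 42L, "test");
        System.out.println(first + " " + second + " " + store.rowCount()); // prints true false 1
    }
}
```

In a relational database, a similar effect is typically obtained with a unique constraint on the key column, so that a repeated insert is rejected or ignored.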
For producer-only transactions, transaction synchronization works:
@Transactional("dstm")
public void someMethod(String in) {
    this.kafkaTemplate.send("topic2", in.toUpperCase());
    // Bind the payload as a parameter instead of concatenating it into the SQL text.
    this.jdbcTemplate.update("insert into mytable (data) values (?)", in);
}
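The KafkaTemplate synchronizes its transaction with the DB transaction, and its commit or rollback occurs after the database's.
If you wish to commit the Kafka transaction first, and commit the DB transaction only if the Kafka transaction succeeds, use nested @Transactional methods: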
@Transactional("dstm")
public void someMethod(String in) {
    // Bind the payload as a parameter instead of concatenating it into the SQL text.
    this.jdbcTemplate.update("insert into mytable (data) values (?)", in);
    sendToKafka(in);
}

@Transactional("kafkaTransactionManager")
public void sendToKafka(String in) {
    this.kafkaTemplate.send("topic2", in.toUpperCase());
}
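The ordering produced by the nested arrangement can be traced with a small simulation. The sketch below uses neither Spring nor Kafka; the class CommitOrderSketch and its methods are hypothetical and only record the order in which the two resources would commit. It assumes the inner (Kafka) step commits when its method returns and that the outer (DB) step commits only if the inner step did not throw.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of nested transactional methods; no real transactions are involved.
class CommitOrderSketch {

    // Outer step: stands in for the method bound to the DB transaction manager.
    static List<String> runNested(boolean kafkaCommitFails) {
        List<String> events = new ArrayList<>();
        try {
            events.add("db: insert");
            sendToKafka(events, kafkaCommitFails);
            // Reached only when the inner step completed without an exception.
            events.add("db: commit");
        } catch (IllegalStateException e) {
            events.add("db: rollback");
        }
        return events;
    }

    // Inner step: stands in for the method bound to the Kafka transaction manager.
    static void sendToKafka(List<String> events, boolean commitFails) {
        events.add("kafka: send");
        if (commitFails) {
            events.add("kafka: commit failed");
            throw new IllegalStateException("simulated Kafka commit failure");
        }
        events.add("kafka: commit");
    }

    public static void main(String[] args) {
        System.out.println(runNested(false));
        System.out.println(runNested(true));
    }
}
```

In the successful run the Kafka commit is recorded before the DB commit; in the failing run the DB step rolls back, so no row is stored for a record that was never published.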
5.3. Customizing the JsonSerializer and JsonDeserializer
The serializer and deserializer support a number of customizations using properties; see JSON for more information.
The kafka-clients code, not Spring, instantiates these objects, unless you inject them directly into the consumer and producer factories.
If you wish to configure the (de)serializer using properties, but wish to use, say, a custom ObjectMapper, simply create a subclass and pass the custom mapper into the super constructor. For example:
public class CustomJsonSerializer extends JsonSerializer<Object> {

    public CustomJsonSerializer() {
        super(customizedObjectMapper());
    }

    private static ObjectMapper customizedObjectMapper() {
        ObjectMapper mapper = JacksonUtils.enhancedObjectMapper();
        mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
        return mapper;
    }

}