主题配置
如果你定义一个卡夫卡管理员在你的应用上下文中,它可以自动向经纪人添加主题。
为此,你可以添加一个新主题 @Bean针对每个主题,应用上下文。
2.3 版本引入了新类别主题构建器以便更方便地制作此类Beans。
以下示例展示了如何实现:
-
Java
-
Kotlin
@Bean
public KafkaAdmin admin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
return new KafkaAdmin(configs);
}
@Bean
public NewTopic topic1() {
return TopicBuilder.name("thing1")
.partitions(10)
.replicas(3)
.compact()
.build();
}
@Bean
public NewTopic topic2() {
return TopicBuilder.name("thing2")
.partitions(10)
.replicas(3)
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}
@Bean
public NewTopic topic3() {
return TopicBuilder.name("thing3")
.assignReplicas(0, List.of(0, 1))
.assignReplicas(1, List.of(1, 2))
.assignReplicas(2, List.of(2, 0))
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build();
}
@Bean
fun admin() = KafkaAdmin(mapOf(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092"))
@Bean
fun topic1() =
TopicBuilder.name("thing1")
.partitions(10)
.replicas(3)
.compact()
.build()
@Bean
fun topic2() =
TopicBuilder.name("thing2")
.partitions(10)
.replicas(3)
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build()
@Bean
fun topic3() =
TopicBuilder.name("thing3")
.assignReplicas(0, Arrays.asList(0, 1))
.assignReplicas(1, Arrays.asList(1, 2))
.assignReplicas(2, Arrays.asList(2, 0))
.config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
.build()
从2.6版本开始,你可以省略分区()和/或复制品()经纪人违约将应用于这些房产。
代理版本至少为2.4.0才能支持此功能——参见KIP-464。
-
Java
-
Kotlin
@Bean
public NewTopic topic4() {
return TopicBuilder.name("defaultBoth")
.build();
}
@Bean
public NewTopic topic5() {
return TopicBuilder.name("defaultPart")
.replicas(1)
.build();
}
@Bean
public NewTopic topic6() {
return TopicBuilder.name("defaultRepl")
.partitions(3)
.build();
}
@Bean
fun topic4() = TopicBuilder.name("defaultBoth").build()
@Bean
fun topic5() = TopicBuilder.name("defaultPart").replicas(1).build()
@Bean
fun topic6() = TopicBuilder.name("defaultRepl").partitions(3).build()
从2.7版本开始,你可以声明多个新主题单一卡夫卡管理员。新话题豆子定义:
-
Java
-
Kotlin
@Bean
public KafkaAdmin.NewTopics topics456() {
return new NewTopics(
TopicBuilder.name("defaultBoth")
.build(),
TopicBuilder.name("defaultPart")
.replicas(1)
.build(),
TopicBuilder.name("defaultRepl")
.partitions(3)
.build());
}
@Bean
fun topics456() = KafkaAdmin.NewTopics(
TopicBuilder.name("defaultBoth")
.build(),
TopicBuilder.name("defaultPart")
.replicas(1)
.build(),
TopicBuilder.name("defaultRepl")
.partitions(3)
.build()
)
使用Spring Boot时,一个卡夫卡管理员BEAN 是自动注册的,所以你只需要新主题(和/或新话题) @Beans. |
默认情况下,如果代理不可用,消息会被记录,但上下文会继续加载。
你可以程序化调用管理员的initialize()方法可以稍后再试。
如果你希望此状况被视为致命,请设置管理员的fatalIfBrokerNotAvailable属性到true.
上下文因此无法初始化。
如果代理支持(1.0.0或更高版本),管理员会增加分区数量,且发现已有主题的分区数少于NewTopic.numPartitions. |
从2.7版本开始,卡夫卡管理员提供在运行时创建和检查主题的方法。
从4.0版本开始,它还提供了删除主题的方法。
-
创建或修改主题 -
描述主题 -
deleteTopics(自4.0起)
对于更高级的管理功能,您可以使用AdminClient径直。
以下示例展示了如何实现:
@Autowired
private KafkaAdmin admin;
...
AdminClient client = AdminClient.create(admin.getConfigurationProperties());
...
client.close();
从2.9.10、3.0.9版本开始,你可以提供谓词<NewTopic>可以用来确定新主题豆子应考虑用于创造或改造。
例如,如果你有多条,这非常有用卡夫卡管理员实例指向不同的集群,你希望选择每个管理员应创建或修改的主题。
admin.setCreateOrModifyTopic(nt -> !nt.name().equals("dontCreateThisOne"));