主题配置

如果你定义一个卡夫卡管理员在你的应用上下文中,它可以自动向经纪人添加主题。 为此,你可以添加一个新主题 @Bean针对每个主题,应用上下文。 2.3 版本引入了新类别主题构建器以便更方便地制作此类Beans。 以下示例展示了如何实现:spring-doc.cadn.net.cn

@Bean
public KafkaAdmin admin() {
    Map<String, Object> configs = new HashMap<>();
    configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    return new KafkaAdmin(configs);
}

@Bean
public NewTopic topic1() {
    return TopicBuilder.name("thing1")
            .partitions(10)
            .replicas(3)
            .compact()
            .build();
}

@Bean
public NewTopic topic2() {
    return TopicBuilder.name("thing2")
            .partitions(10)
            .replicas(3)
            .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
            .build();
}

@Bean
public NewTopic topic3() {
    return TopicBuilder.name("thing3")
            .assignReplicas(0, List.of(0, 1))
            .assignReplicas(1, List.of(1, 2))
            .assignReplicas(2, List.of(2, 0))
            .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
            .build();
}
@Bean
fun admin() = KafkaAdmin(mapOf(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092"))

@Bean
fun topic1() =
    TopicBuilder.name("thing1")
        .partitions(10)
        .replicas(3)
        .compact()
        .build()

@Bean
fun topic2() =
    TopicBuilder.name("thing2")
        .partitions(10)
        .replicas(3)
        .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
        .build()

@Bean
fun topic3() =
    TopicBuilder.name("thing3")
        .assignReplicas(0, Arrays.asList(0, 1))
        .assignReplicas(1, Arrays.asList(1, 2))
        .assignReplicas(2, Arrays.asList(2, 0))
        .config(TopicConfig.COMPRESSION_TYPE_CONFIG, "zstd")
        .build()

从2.6版本开始,你可以省略分区()和/或复制品()经纪人违约将应用于这些房产。 代理版本至少为2.4.0才能支持此功能——参见KIP-464spring-doc.cadn.net.cn

@Bean
public NewTopic topic4() {
    return TopicBuilder.name("defaultBoth")
            .build();
}

@Bean
public NewTopic topic5() {
    return TopicBuilder.name("defaultPart")
            .replicas(1)
            .build();
}

@Bean
public NewTopic topic6() {
    return TopicBuilder.name("defaultRepl")
            .partitions(3)
            .build();
}
@Bean
fun topic4() = TopicBuilder.name("defaultBoth").build()

@Bean
fun topic5() = TopicBuilder.name("defaultPart").replicas(1).build()

@Bean
fun topic6() = TopicBuilder.name("defaultRepl").partitions(3).build()

从2.7版本开始,你可以声明多个新主题单一卡夫卡管理员。新话题豆子定义:spring-doc.cadn.net.cn

@Bean
public KafkaAdmin.NewTopics topics456() {
    return new NewTopics(
            TopicBuilder.name("defaultBoth")
                .build(),
            TopicBuilder.name("defaultPart")
                .replicas(1)
                .build(),
            TopicBuilder.name("defaultRepl")
                .partitions(3)
                .build());
}
@Bean
fun topics456() = KafkaAdmin.NewTopics(
    TopicBuilder.name("defaultBoth")
        .build(),
    TopicBuilder.name("defaultPart")
        .replicas(1)
        .build(),
    TopicBuilder.name("defaultRepl")
        .partitions(3)
        .build()
)
使用Spring Boot时,一个卡夫卡管理员BEAN 是自动注册的,所以你只需要新主题(和/或新话题) @Beans.

默认情况下,如果代理不可用,消息会被记录,但上下文会继续加载。 你可以程序化调用管理员的initialize()方法可以稍后再试。 如果你希望此状况被视为致命,请设置管理员的fatalIfBrokerNotAvailable属性到true. 上下文因此无法初始化。spring-doc.cadn.net.cn

如果代理支持(1.0.0或更高版本),管理员会增加分区数量,且发现已有主题的分区数少于NewTopic.numPartitions.

从2.7版本开始,卡夫卡管理员提供在运行时创建和检查主题的方法。 从4.0版本开始,它还提供了删除主题的方法。spring-doc.cadn.net.cn

对于更高级的管理功能,您可以使用AdminClient径直。 以下示例展示了如何实现:spring-doc.cadn.net.cn

@Autowired
private KafkaAdmin admin;

...

    AdminClient client = AdminClient.create(admin.getConfigurationProperties());
    ...
    client.close();

从2.9.10、3.0.9版本开始,你可以提供谓词<NewTopic>可以用来确定新主题豆子应考虑用于创造或改造。 例如,如果你有多条,这非常有用卡夫卡管理员实例指向不同的集群,你希望选择每个管理员应创建或修改的主题。spring-doc.cadn.net.cn

admin.setCreateOrModifyTopic(nt -> !nt.name().equals("dontCreateThisOne"));