集装箱工厂
如同相关内容@KafkaListener注解一个ConcurrentKafkaListenerContainerFactory用于创建注释方法的容器。
从2.2版本开始,你可以用同一个工厂来创建任何ConcurrentMessageListenerContainer.
如果你想创建多个具有相似属性的容器,或者想使用外部配置的工厂,比如 Spring Boot 自动配置提供的,这可能很有用。
一旦容器创建,你可以进一步修改它的属性,其中许多属性是通过使用container.getContainerProperties().
以下示例配置为ConcurrentMessageListenerContainer:
@Bean
public ConcurrentMessageListenerContainer<String, String>(
ConcurrentKafkaListenerContainerFactory<String, String> factory) {
ConcurrentMessageListenerContainer<String, String> container =
factory.createContainer("topic1", "topic2");
container.setMessageListener(m -> { ... } );
return container;
}
以这种方式创建的容器不会被添加到端点注册表。
它们应被创建为@Bean定义以使其在应用上下文中被注册。 |
从2.3.4版本开始,你可以添加一个容器定制器在容器创建和配置完成后,再送往工厂进一步配置。
@Bean
public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
...
factory.setContainerCustomizer(container -> { /* customize the container */ });
return factory;
}
从3.1版本开始,也可以通过在KafkaListener注释中指定“容器PostProcessor”的豆名,对单个监听器进行类似的自定义。
@Bean
public ContainerPostProcessor<String, String, AbstractMessageListenerContainer<String, String>> customContainerPostProcessor() {
return container -> { /* customize the container */ };
}
...
@KafkaListener(..., containerPostProcessor="customContainerPostProcessor", ...)