|
对于最新稳定版本,请使用Spring for Apache Kafka 4.0.4! |
主题命名
重试主题和死信主题通过在主主题名称后添加提供的或默认值,再附加该主题的延迟或索引来命名。
示例:
"my-topic" → "my-topic-retry-0", "my-topic-retry-1",…,"my-topic-dlt"
"my-other-topic" → "my-topic-我的重试后缀-1000", "my-topic-我的重试后缀-2000", …, "my-topic-我的DLT后缀"
默认行为是为每个重试尝试创建单独的重试主题,并附加索引值:retry-0, retry-1, …, retry-n。
因此,默认情况下,重试主题的数量是配置的maxAttempts减去1。 |
You can 配置后缀, 选择是否追加 尝试索引或延迟, 使用 固定退避时的单个重试主题, 并在使用指数退避时为具有 maxInterval 的尝试使用 单个重试主题.
重试主题和 DLT 后缀
您可以指定将由重试和 DLT 主题使用的后缀。
@RetryableTopic(retryTopicSuffix = "-my-retry-suffix", dltTopicSuffix = "-my-dlt-suffix")
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyOtherPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.retryTopicSuffix("-my-retry-suffix")
.dltTopicSuffix("-my-dlt-suffix")
.create(template);
}
| 默认后缀为"-retry" 和 "-dlt",分别用于重试主题和死信队列。 |
追加主题索引或延迟
你可以将主题的索引或延迟值附加在后缀之后。
@RetryableTopic(topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.suffixTopicsWithIndexValues()
.create(template);
}
| 默认行为是对延迟值进行后缀处理,除了具有多个主题的固定延迟配置,在这种情况下,会使用主题的索引对主题进行后缀处理。 |
单个固定延迟重试主题
如果你使用固定的延迟策略,如FixedBackOffPolicy或NoBackOffPolicy,你可以使用一个单一的主题来实现非阻塞重试。
该主题将加上提供的或默认后缀,且不会附加索引或延迟值。
之前的 FixedDelayStrategy 已不再推荐使用,可以替换为 SameIntervalTopicReuseStrategy。 |
@RetryableTopic(backoff = @Backoff(2_000), fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.fixedBackoff(3_000)
.maxAttempts(5)
.useSingleTopicForFixedDelays()
.create(template);
}
| 默认行为是为每次尝试创建单独的重试主题,并在其后面附加其索引值:retry-0,retry-1,... |
单一主题用于maxInterval指数延迟
如果你使用指数退避策略(0),可以使用一个重试主题来实现对那些延迟为配置的1的尝试的非阻塞重试。
这个以"final"结尾的重试主题将附加提供的或默认后缀,并会附加索引或maxInterval值。
通过选择使用延迟为maxInterval的单个主题进行重试,可能会更可行地配置指数重试策略,该策略会在较长时间内持续重试,因为在这种方法中不需要大量的主题。 |
The default behavior is to work with the number of retry topics equal to the configured maxAttempts minus 1 and, when using exponential backoff, the retry topics are suffixed with the delay values, with the last retry topic (corresponding to the maxInterval delay) being suffixed with an additional index.
例如,当使用 initialInterval=1_000、multiplier=2 和 maxInterval=16_000 配置指数退避时,为了在1小时内持续尝试,需要将 maxAttempts 配置为229,且默认需要的重试主题为:
-
-retry-1000
-
-retry-2000
-
-retry-4000
-
-retry-8000
-
-retry-16000-0
-
-retry-16000-1
-
-retry-16000-2
-
…
-
-retry-16000-224
当使用重用相同的间隔来重新使用的重试主题的策略时,在上述相同配置中所需的重试主题为:
-
-retry-1000
-
-retry-2000
-
-retry-4000
-
-retry-8000
-
-retry-16000
这将在未来的版本中默认使用。
@RetryableTopic(attempts = 230,
backoff = @Backoff(delay = 1_000, multiplier = 2, maxDelay = 16_000),
sameIntervalTopicReuseStrategy = SameIntervalTopicReuseStrategy.SINGLE_TOPIC)
@KafkaListener(topics = "my-annotated-topic")
public void processMessage(MyPojo message) {
// ... message processing
}
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, MyPojo> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.exponentialBackoff(1_000, 2, 16_000)
.maxAttempts(230)
.useSingleTopicForSameIntervals()
.create(template);
}
自定义命名策略
更复杂的命名策略可以通过注册一个实现 RetryTopicNamesProviderFactory 的 bean 来完成。
默认实现是 SuffixingRetryTopicNamesProviderFactory,可以通过以下方式注册不同的实现:
@Override
protected RetryTopicComponentFactory createComponentFactory() {
return new RetryTopicComponentFactory() {
@Override
public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() {
return new CustomRetryTopicNamesProviderFactory();
}
};
}
作为一个示例,以下实现除标准后缀外,还为 retry/dlt 主题名称添加前缀:
public class CustomRetryTopicNamesProviderFactory implements RetryTopicNamesProviderFactory {
@Override
public RetryTopicNamesProvider createRetryTopicNamesProvider(
DestinationTopic.Properties properties) {
if (properties.isMainEndpoint()) {
return new SuffixingRetryTopicNamesProvider(properties);
}
else {
return new SuffixingRetryTopicNamesProvider(properties) {
@Override
public String getTopicName(String topic) {
return "my-prefix-" + super.getTopicName(topic);
}
};
}
}
}