|
该版本仍在开发中,尚未被视为稳定。对于最新稳定版本,请使用 Spring for Apache Kafka 4.0.0! |
消息头
0.11.0.0客户端引入了对消息头部的支持。
从2.0版本起,Apache Kafka的Spring现在支持将这些头部映射到和映射到两个春季消息 消息头.
之前的版本映射消费者记录和制作人唱片到春季消息留言<?>,其中价值性质映射到 和有效载荷以及其他性质(主题,分区,依此类推)被映射到头部。
情况依然如此,但现在可以映射额外的(任意)头部。 |
Apache Kafka 头部有一个简单的 API,如下接口定义所示:
public interface Header {
String key();
byte[] value();
}
这KafkaHeaderMapper提供了策略用于映射Kafka之间的头部条目头和消息头.
其接口定义如下:
public interface KafkaHeaderMapper {
void fromHeaders(MessageHeaders headers, Headers target);
void toHeaders(Headers source, Map<String, Object> target);
}
这SimpleKafkaHeaderMapper原始头部映射为字节[],并配有转换为的配置选项字符串值。
这JsonKafkaHeaderMapper将密钥映射到消息头为支持发件消息的丰富头部类型,执行了JSON转换。
A "特殊“ 头部(带有spring_json_header_types)包含一个 的 JSON 映射<key>:<type>.
该头部用于入站端,以适当地将每个头值转换为原始类型。
进站一侧,全是卡夫卡页眉实例映射到消息头.
在出站端,默认情况下,所有消息头映射,除了身份证,时间戳,以及映射到 的报头消费者记录性能。
你可以通过向映射器提供模式来指定哪些头部要映射到外发消息。 以下列表展示了若干示例映射:
public JsonKafkaHeaderMapper() { (1)
...
}
public JsonKafkaHeaderMapper(ObjectMapper objectMapper) { (2)
...
}
public JsonKafkaHeaderMapper(String... patterns) { (3)
...
}
public JsonKafkaHeaderMapper(ObjectMapper objectMapper, String... patterns) { (4)
...
}
| 1 | 使用默认的Jackson音对象映射器并且映射大多数头部,如前所述。 |
| 2 | 使用提供的Jackson对象映射器并且映射大多数头部,如前所述。 |
| 3 | 使用默认的Jackson音对象映射器并根据提供的模式映射页眉。 |
| 4 | 使用提供的Jackson对象映射器并根据提供的模式映射页眉。 |
模式相当简单,可以包含前置万用符()、后面的万用符,或两者兼有(例如,**。猫。*).
你可以用引导来抵消模式!.
第一个匹配头部名称(无论正负)的模式获胜。
当你提供自己的图样时,我们建议包含!id和!时间戳,因为这些头部在入站端是只读的。
默认情况下,映射器只反序列化java.lang和java.util.
你可以通过添加可信包来信任其他(或全部)包addTrustedPackages方法。
如果你收到来自不受信任来源的消息,可能只想添加你信任的包裹。
要信任所有包裹,你可以使用mapper.addTrustedPackages(“*”). |
映射字符串原始格式的头部值在与不了解映射器JSON格式的系统通信时非常有用。 |
从2.2.5版本开始,你可以指定某些字符串值的头部不应用JSON映射,而是映射到/从原始文件字节[].
这AbstractKafkaHeaderMapper拥有新的属性;mapAllStringsOut当设置为 true 时,所有字符串值的头都会被转换为字节[]使用字符集属性(默认)UTF-8).
此外,还有一个属性rawMappedHeaders,是 的映射头部名称:布尔;如果映射包含一个头部名称,且头部包含字符串值,它将被映射为原始文件字节[]使用字符集。
该映射也用于映射原始输入字节[]头部 到字符串使用字元集当且仅当映射值中的布尔值为true.
如果布尔值为false,或者首部名称不在带有true值,接收的头部被简单映射为原始未映射的头部。
以下测试用例展示了这一机制。
@Test
public void testSpecificStringConvert() {
JsonKafkaHeaderMapper mapper = new JsonKafkaHeaderMapper();
Map<String, Boolean> rawMappedHeaders = new HashMap<>();
rawMappedHeaders.put("thisOnesAString", true);
rawMappedHeaders.put("thisOnesBytes", false);
mapper.setRawMappedHeaders(rawMappedHeaders);
Map<String, Object> headersMap = new HashMap<>();
headersMap.put("thisOnesAString", "thing1");
headersMap.put("thisOnesBytes", "thing2");
headersMap.put("alwaysRaw", "thing3".getBytes());
MessageHeaders headers = new MessageHeaders(headersMap);
Headers target = new RecordHeaders();
mapper.fromHeaders(headers, target);
assertThat(target).containsExactlyInAnyOrder(
new RecordHeader("thisOnesAString", "thing1".getBytes()),
new RecordHeader("thisOnesBytes", "thing2".getBytes()),
new RecordHeader("alwaysRaw", "thing3".getBytes()));
headersMap.clear();
mapper.toHeaders(target, headersMap);
assertThat(headersMap).contains(
entry("thisOnesAString", "thing1"),
entry("thisOnesBytes", "thing2".getBytes()),
entry("alwaysRaw", "thing3".getBytes()));
}
默认情况下,两个头部映射器都映射所有入站头。 从2.8.8版本开始,这些模式也可以应用于入站映射。 要创建入站映射器的映射器,请使用相应映射器上的静态方法之一:
public static JsonKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}
public static JsonKafkaHeaderMapper forInboundOnlyWithMatchers(ObjectMapper objectMapper, String... patterns) {
}
public static SimpleKafkaHeaderMapper forInboundOnlyWithMatchers(String... patterns) {
}
例如:
JsonKafkaHeaderMapper inboundMapper = JsonKafkaHeaderMapper.forInboundOnlyWithMatchers("!abc*", "*");
这将排除所有以ABC并包括所有其他。
默认情况下,JsonKafkaHeaderMapper用于消息信息转换器和批处理消息传递转换器只要Jackson走在阶级道路上。
使用批量转换器,转换后的头部可以在KafkaHeaders.BATCH_CONVERTED_HEADERS作为List<Map<String, Object>>其中列表中某一位置的映射对应于有效载荷中的数据位置。
如果没有转换器(要么是因为没有Jackson,要么是明确设置为零),消费者记录中的头部在KafkaHeaders.NATIVE_HEADERS页眉。 这个头部是头对象(或列表<头>在批处理转换器的情况下,列表中的位置对应于有效载荷中的数据位置。
某些类型不适合JSON序列化,简单toString()对于这些类型,序列化可能更为受青睐。 这JsonKafkaHeaderMapper有一个方法称为addToStringClasses()这样你就可以提供应当这样处理的类名称,用于出站映射。在入站映射中,它们被映射为字符串. 默认情况下,只有org.springframework.util.MimeType和org.springframework.http.MediaType以这种方式映射。 |
从版本 2.3 开始,字符串值头部的处理变得更为简化。这类头部默认不再使用 JSON 编码(即没有包围)"..."添加)。类型仍被添加到JSON_TYPES头部,以便接收系统能够转换回字符串(来自字节[]). 映射器可以处理(解码)旧版本产生的头部(检查前导词);通过这种方式,使用 2.3 版本的应用程序可以调用旧版本的记录。" |
为了与早期版本兼容,设encodeStrings自true如果使用2.3版本产生的记录可能会被使用更早版本的应用程序消耗。当所有应用程序都使用2.3或更高版本时,你可以将属性保持为默认值false. |
@Bean
MessagingMessageConverter converter() {
MessagingMessageConverter converter = new MessagingMessageConverter();
JsonKafkaHeaderMapper mapper = new JsonKafkaHeaderMapper();
mapper.setEncodeStrings(true);
converter.setHeaderMapper(mapper);
return converter;
}
如果使用 Spring Boot,它会自动将这个转换器 bean 配置成自动配置卡夫卡模板; 否则你应该把这个转换器添加到模板里。
支持多值头部映射
从 4.0 版本开始,支持多值头部映射,即同一逻辑头键在 Kafka 记录中出现多次。
默认情况下,头部映射器不会创建多个同名的Kafka头。相反,当它遇到集合值(例如,列表<字节[]>),它将整个集合序列化为一个 Kafka 头部,其值为 JSON 数组。
-
制作人方面:
JsonKafkaHeaderMapper写入 JSON 字节,而SimpleKafkaHeaderMapper忽略它。 -
消费者端:映射器将头部暴露为单一值——最后一次出现者获胜;早期的重复被悄然丢弃。
要保留每个单独的头部,需要明确注册将该头标记为多值的模式。
JsonKafkaHeaderMapper#setMultiValueHeaderPatterns(String...模式)接受一组模式,这些模式可以是万用字表达式或精确的头部名称。
JsonKafkaHeaderMapper mapper = new JsonKafkaHeaderMapper();
// Explicit header names
mapper.setMultiValueHeaderPatterns("test-multi-value1", "test-multi-value2");
// Wildcard patterns for test-multi-value1, test-multi-value2
mapper.setMultiValueHeaderPatterns("test-multi-*");
任何名称与所提供模式之一匹配的头部是
-
制作方:以独立的Kafka头写成,每个元素一个。
-
消费者端:汇集成
名单<?>包含各个头部值;每个元素返回给应用程序在配置后执行了通常的反序列化或类型转换头部映射器.
| 不支持正则表达式;在简单模式中仅允许*万用符——支持直接等式和诸如:xxx*、*xxx、*xxx*、xxx*yyy。 |
|
在制作方方面,当 |