Kafka 队列(共享消费者)

从4.0版本开始,Spring for Apache Kafka通过共享消费者支持Kafka队列,这些共享消费者是Apache Kafka 4.0.0的一部分,并实现了KIP-932(Kafka队列)。 该功能目前处于抢先体验阶段。spring-doc.cadn.net.cn

Kafka 队列实现了与传统消费群体不同的消费模式。 与基于分区的分配模型不同,共享消费者可以合作从相同的分区中消费,记录在共享组的消费者之间分配。spring-doc.cadn.net.cn

股份消费者工厂

ShareConsumerFactory负责创建共享消费者实例。 春季卡夫卡提供了默认共享消费者工厂实现。spring-doc.cadn.net.cn

配置

你可以配置一个默认共享消费者工厂类似于你配置普通机的方式消费者工厂:spring-doc.cadn.net.cn

@Bean
public ShareConsumerFactory<String, String> shareConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-share-group");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultShareConsumerFactory<>(props);
}

制造商选项

默认共享消费者工厂提供多种构造器选项:spring-doc.cadn.net.cn

// Basic configuration
new DefaultShareConsumerFactory<>(configs);

// With deserializer suppliers
new DefaultShareConsumerFactory<>(configs, keyDeserializerSupplier, valueDeserializerSupplier);

// With deserializer instances
new DefaultShareConsumerFactory<>(configs, keyDeserializer, valueDeserializer, configureDeserializers);

解串器配置

你可以用多种方式配置解串器:spring-doc.cadn.net.cn

  1. 通过配置属性(推荐用于简单情况):spring-doc.cadn.net.cn

    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  2. 通过赛特斯spring-doc.cadn.net.cn

    factory.setKeyDeserializer(new StringDeserializer());
    factory.setValueDeserializer(new StringDeserializer());
  3. 通过提供商(适用于需要为每个消费者创建反串化器的情况):spring-doc.cadn.net.cn

    factory.setKeyDeserializerSupplier(() -> new StringDeserializer());
    factory.setValueDeserializerSupplier(() -> new StringDeserializer());

设置配置解串器false如果你的反串化器已经完全配置好,工厂不应该重新配置。spring-doc.cadn.net.cn

生命周期听众

你可以添加监听者以监控共享消费者的生命周期:spring-doc.cadn.net.cn

factory.addListener(new ShareConsumerFactory.Listener<String, String>() {
    @Override
    public void consumerAdded(String id, ShareConsumer<String, String> consumer) {
        // Called when a new consumer is created
        System.out.println("Consumer added: " + id);
    }

    @Override
    public void consumerRemoved(String id, ShareConsumer<String, String> consumer) {
        // Called when a consumer is closed
        System.out.println("Consumer removed: " + id);
    }
});

分享消息监听器容器

ShareKafkaMessageListenerContainer

ShareKafkaMessageListenerContainer为共享消费者提供一个容器,支持并发处理:spring-doc.cadn.net.cn

@Bean
public ShareKafkaMessageListenerContainer<String, String> container(
        ShareConsumerFactory<String, String> shareConsumerFactory) {

    ContainerProperties containerProps = new ContainerProperties("my-topic");
    containerProps.setGroupId("my-share-group");

    ShareKafkaMessageListenerContainer<String, String> container =
        new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProps);

    container.setupMessageListener(new MessageListener<String, String>() {
        @Override
        public void onMessage(ConsumerRecord<String, String> record) {
            System.out.println("Received: " + record.value());
        }
    });

    return container;
}

容器属性

共享容器支持面向普通消费者的部分容器属性:spring-doc.cadn.net.cn

共享消费者不支持:spring-doc.cadn.net.cn

并发

ShareKafkaMessageListenerContainer支持通过在单一容器内创建多个消费者线程来并发处理。 每个线程都有自己的运行ShareConsumer参与同一个分享组的实例。spring-doc.cadn.net.cn

与传统消费者群体涉及分区分发不同,共享消费者在经纪人中利用Kafka的记录级分发。 这意味着同一容器中的多个消费者线程作为共享组的一部分协同工作,Kafka 代理负责在所有消费者实例间分发记录。spring-doc.cadn.net.cn

并发是跨应用实例的加法spring-doc.cadn.net.cn

从分享小组的角度来看,每一个ShareConsumer实例是独立成员,无论运行在哪里。 设置并发=3在一个容器中创建3个共享组成员。 如果你运行多个应用实例,使用相同的共享组ID,它们的所有消费者线程会合并成一个池。spring-doc.cadn.net.cn

例如: * 应用实例1:并发=3→ 3 共享组成员 * 应用实例2:并发=3→ 3 共享组成员 * 总计:经纪人可分发记录的6个股权组成员spring-doc.cadn.net.cn

这意味着设置并发=5在单个容器中运行5个独立应用实例,作上等同于并发=1每个(都使用相同的group.id). Kafka 代理对所有消费者实例一视同仁,并将记录分发到整个池中。spring-doc.cadn.net.cn

程序化配置并发

@Bean
public ShareKafkaMessageListenerContainer<String, String> concurrentContainer(
        ShareConsumerFactory<String, String> shareConsumerFactory) {

    ContainerProperties containerProps = new ContainerProperties("my-topic");
    containerProps.setGroupId("my-share-group");

    ShareKafkaMessageListenerContainer<String, String> container =
        new ShareKafkaMessageListenerContainer<>(shareConsumerFactory, containerProps);

    // Set concurrency to create 5 consumer threads
    container.setConcurrency(5);

    container.setupMessageListener(new MessageListener<String, String>() {
        @Override
        public void onMessage(ConsumerRecord<String, String> record) {
            System.out.println("Received on " + Thread.currentThread().getName() + ": " + record.value());
        }
    });

    return container;
}

通过工厂配置并发

你可以在工厂层面设置默认并发,这适用于该工厂创建的所有容器:spring-doc.cadn.net.cn

@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
        ShareConsumerFactory<String, String> shareConsumerFactory) {

    ShareKafkaListenerContainerFactory<String, String> factory =
        new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);

    // Set default concurrency for all containers created by this factory
    factory.setConcurrency(3);

    return factory;
}

每监听器并发

并发设置可以通过以下方式为每个监听者覆盖并发属性:spring-doc.cadn.net.cn

@Component
public class ConcurrentShareListener {

    @KafkaListener(
        topics = "high-throughput-topic",
        containerFactory = "shareKafkaListenerContainerFactory",
        groupId = "my-share-group",
        concurrency = "10"  // Override factory default
    )
    public void listen(ConsumerRecord<String, String> record) {
        // This listener will use 10 consumer threads
        System.out.println("Processing: " + record.value());
    }
}

并发考虑

  • 线程安全:每个消费者线程都有自己的ShareConsumer实例并独立管理自己的确认spring-doc.cadn.net.cn

  • 客户端ID:每个消费者线程都会获得一个带有数字后缀的唯一客户端ID(例如,我的容器-0,我的容器-1,等等)spring-doc.cadn.net.cn

  • 指标:所有消费者线程的指标汇总并可通过以下方式访问container.metrics()spring-doc.cadn.net.cn

  • 生命周期:所有消费者线程作为一个整体一起开始和终止spring-doc.cadn.net.cn

  • 工作分发:Kafka 代理处理共享组中所有消费者实例的记录分发spring-doc.cadn.net.cn

  • 显式确认:每个线程独立管理其记录的确认;一个线程中未确认的记录不会阻挡其他线程spring-doc.cadn.net.cn

与显式确认的并发

并发与显式确认模式无缝协作。每个消费者线程独立跟踪并确认自身记录:spring-doc.cadn.net.cn

@KafkaListener(
    topics = "order-queue",
    containerFactory = "explicitShareKafkaListenerContainerFactory",
    groupId = "order-processors",
    concurrency = "5"
)
public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
    try {
        // Process the order
        processOrderLogic(record.value());
        acknowledgment.acknowledge(); // ACCEPT
    }
    catch (RetryableException e) {
        acknowledgment.release(); // Will be redelivered
    }
    catch (Exception e) {
        acknowledgment.reject(); // Permanent failure
    }
}

记录获取与分发行为:spring-doc.cadn.net.cn

共享消费者采用拉取模型,每个消费者线程调用poll()以获取经纪人的记录。当消费者轮询时,经纪商的份额分区领导者:spring-doc.cadn.net.cn

  • 选择处于“可用”状态的记录spring-doc.cadn.net.cn

  • 将他们移至“已获得”状态,并设置限时的获取锁定(默认为30秒,可通过以下方式配置)group.share.record.lock.duration.ms)spring-doc.cadn.net.cn

  • 为了效率,倾向于返回完整的记录批次spring-doc.cadn.net.cn

  • 适用Max.poll.records作为软限制,意味着即使超过该值,也会获得完整的记录批次spring-doc.cadn.net.cn

虽然记录被一位消费者获取,但其他消费者无法访问。当获取锁到期时,未确认记录会自动恢复为“可用”状态,并可交付给另一位消费者。spring-doc.cadn.net.cn

代理通过以下方式限制每个分区可获取的记录数量group.share.partition.max.record.locks. 一旦达到该限制,后续轮询暂时不会返回任何记录,直到锁定到期。spring-doc.cadn.net.cn

并发的启示:spring-doc.cadn.net.cn

注释驱动的听众

与共享消费者的@KafkaListener

你可以使用@KafkaListener通过配置共享消费者ShareKafkaListenerContainerFactory:spring-doc.cadn.net.cn

@Configuration
@EnableKafka
public class ShareConsumerConfig {

    @Bean
    public ShareConsumerFactory<String, String> shareConsumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultShareConsumerFactory<>(props);
    }

    @Bean
    public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
            ShareConsumerFactory<String, String> shareConsumerFactory) {
        return new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
    }
}

然后用它来做你的听众:spring-doc.cadn.net.cn

@Component
public class ShareMessageListener {

    @KafkaListener(
        topics = "my-queue-topic",
        containerFactory = "shareKafkaListenerContainerFactory",
        groupId = "my-share-group"
    )
    public void listen(ConsumerRecord<String, String> record) {
        System.out.println("Received from queue: " + record.value());
        // Record is automatically acknowledged with ACCEPT
    }
}

共享组偏移重置

与普通消费者组不同,共享组在偏移重置行为上使用不同的配置。你可以通过程序方式配置:spring-doc.cadn.net.cn

private void configureShareGroup(String bootstrapServers, String groupId) throws Exception {
    Map<String, Object> adminProps = new HashMap<>();
    adminProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

    try (Admin admin = Admin.create(adminProps)) {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId);
        ConfigEntry configEntry = new ConfigEntry("share.auto.offset.reset", "earliest");

        Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
            configResource, List.of(new AlterConfigOp(configEntry, AlterConfigOp.OpType.SET))
        );

        admin.incrementalAlterConfigs(configs).all().get();
    }
}

记录致谢

共享消费者支持两种确认模式,分别控制记录处理后如何确认。spring-doc.cadn.net.cn

隐式确认(默认)

在隐式模式下,记录会根据处理结果自动确认:spring-doc.cadn.net.cn

成功处理:记录被确认为接受处理错误:记录被确认为拒绝spring-doc.cadn.net.cn

@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
    ShareConsumerFactory<String, String> shareConsumerFactory) {
    // Implicit mode is the default - no additional configuration needed
    return new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);
}

明确确认

在显式模式下,应用程序必须使用提供的 ShareAcrevelgment 手动确认每个记录。spring-doc.cadn.net.cn

有两种方式可以配置显式确认模式:spring-doc.cadn.net.cn

选项1:使用 Kafka 客户端配置

@Bean
public ShareConsumerFactory<String, String> explicitShareConsumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG, "explicit"); // Official Kafka client config
    return new DefaultShareConsumerFactory<>(props);
}

选项二:使用 Spring 容器配置

@Bean
public ShareKafkaListenerContainerFactory<String, String> explicitShareKafkaListenerContainerFactory(
    ShareConsumerFactory<String, String> shareConsumerFactory) {

    ShareKafkaListenerContainerFactory<String, String> factory =
        new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);

    // Configure acknowledgment mode at container factory level
    // true means explicit acknowledgment is required
    factory.getContainerProperties().setExplicitShareAcknowledgment(true);

    return factory;
}

配置优先级

当两种配置方法同时使用时,Spring Kafka 遵循以下优先顺序(从高到低):spring-doc.cadn.net.cn

  1. 容器属性containerProperties.setExplicitShareAcknowledgegment(true/false)spring-doc.cadn.net.cn

  2. 消费者配置ConsumerConfig.SHARE_ACKNOWLEDGEMENT_MODE_CONFIG(“隐含”或“显式”)spring-doc.cadn.net.cn

  3. 默认:false(隐含的承认)spring-doc.cadn.net.cn

确认类型

共享用户支持三种确认类型:spring-doc.cadn.net.cn

ACCEPT: Record processed successfully, mark as completed
RELEASE: Temporary failure, make record available for redelivery
REJECT: Permanent failure, do not retry

ShareAcknowledgegment API

共享确认界面提供了显式确认的方法:spring-doc.cadn.net.cn

public interface ShareAcknowledgment {
    void acknowledge();
    void release();
    void reject();
}

监听器接口

共享消费者支持针对不同用例的专用监听器接口:spring-doc.cadn.net.cn

基础消息监听器

对于简单情况,使用标准的MessageListener:spring-doc.cadn.net.cn

@KafkaListener(topics = "my-topic", containerFactory = "shareKafkaListenerContainerFactory")
public void listen(ConsumerRecord<String, String> record) {
    System.out.println("Received: " + record.value());
    // Automatically acknowledged in implicit mode
}

AcknowledgeledgingShareConsumerAwareMessageListener

该接口提供访问ShareConsumer实例,支持可选的确认。确认参数可空,且依赖于容器的确认模式:spring-doc.cadn.net.cn

隐式模式示例(确认为空)
@KafkaListener(
    topics = "my-topic",
    containerFactory = "shareKafkaListenerContainerFactory"  // Implicit mode by default
)
public void listen(ConsumerRecord<String, String> record,
                  @Nullable ShareAcknowledgment acknowledgment,
                  ShareConsumer<?, ?> consumer) {

    // In implicit mode, acknowledgment is null
    System.out.println("Received: " + record.value());

    // Access consumer metrics if needed
    Map<MetricName, ? extends Metric> metrics = consumer.metrics();

    // Record is auto-acknowledged as ACCEPT on success, REJECT on error
}
显式模式示例(确认非空)
@Component
public class ExplicitAckListener {
    @KafkaListener(
        topics = "my-topic",
        containerFactory = "explicitShareKafkaListenerContainerFactory"
    )
    public void listen(ConsumerRecord<String, String> record,
                      @Nullable ShareAcknowledgment acknowledgment,
                      ShareConsumer<?, ?> consumer) {

        // In explicit mode, acknowledgment is non-null
        try {
            processRecord(record);
            acknowledgment.acknowledge(); // ACCEPT
        }
		catch (RetryableException e) {
            acknowledgment.release(); // Will be redelivered
        }
		catch (Exception e) {
            acknowledgment.reject(); // Permanent failure
        }
    }

    private void processRecord(ConsumerRecord<String, String> record) {
        // Business logic here
    }
}

确认约束

在显式确认模式下,容器会强制执行重要的约束:spring-doc.cadn.net.cn

Poll Blocking: Subsequent polls are blocked until all records from the previous poll are acknowledged.
One-time Acknowledgment: Each record can only be acknowledged once.
Error Handling: If processing throws an exception, the record is automatically acknowledged as `REJECT`.
在显式模式下,未确认记录将阻止后续消息处理。 务必确保所有代码路径中的记录都被确认。

确认超时检测

为了帮助识别缺失的确认,Spring Kafka 提供了可配置的超时检测。 当记录在指定的超时内未被确认时,会记录一个警告,并记录该未确认记录的详细信息。spring-doc.cadn.net.cn

@Bean
public ShareKafkaListenerContainerFactory<String, String> shareKafkaListenerContainerFactory(
    ShareConsumerFactory<String, String> shareConsumerFactory) {
    ShareKafkaListenerContainerFactory<String, String> factory =
        new ShareKafkaListenerContainerFactory<>(shareConsumerFactory);

    // Set acknowledgment timeout (default is 30 seconds)
    factory.getContainerProperties().setShareAcknowledgmentTimeout(Duration.ofSeconds(30));

    return factory;
}

当记录超过超时时间时,你会看到类似的警告:spring-doc.cadn.net.cn

WARN: Record not acknowledged within timeout (30 seconds).
In explicit acknowledgment mode, you must call ack.acknowledge(), ack.release(),
or ack.reject() for every record.

此功能帮助开发者快速识别代码中缺失确认调用的情况,避免了因忘记确认而出现“Spring Kafka 不再消耗新记录”的常见问题。spring-doc.cadn.net.cn

致谢示例

混合确认模式

@KafkaListener(topics = "order-processing", containerFactory = "explicitShareKafkaListenerContainerFactory")
    public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
        String orderId = record.key();
        String orderData = record.value();
        try {
            if (isValidOrder(orderData)) {
                if (processOrder(orderData)) {
                    acknowledgment.acknowledge(); // Success - ACCEPT
                }
                else {
                    acknowledgment.release(); // Temporary failure - retry later
                }
            }
            else {
                acknowledgment.reject(); // Invalid order - don't retry
            }
        }
        catch (Exception e) {
            // Exception automatically triggers REJECT
            throw e;
        }
}

条件确认

@KafkaListener(topics = "data-validation", containerFactory = "explicitShareKafkaListenerContainerFactory")
public void validateData(ConsumerRecord<String, String> record, ShareAcknowledgment acknowledgment) {
    ValidationResult result = validator.validate(record.value());
    switch (result.getStatus()) {
        case VALID:
            acknowledgment.acknowledge(AcknowledgeType.ACCEPT);
            break;
        case INVALID_RETRYABLE:
            acknowledgment.acknowledge(AcknowledgeType.RELEASE);
            break;
        case INVALID_PERMANENT:
            acknowledgment.acknowledge(AcknowledgeType.REJECT);
            break;
    }
}

毒消息保护与投递计数

KIP-932包含经纪人端毒讯保护,防止不可处理的记录被无限期重投。spring-doc.cadn.net.cn

工作原理

每当股票组中的消费者获取记录时,经纪人会增加内部交付次数。 第一次收购将交付次数设为1,之后每次收购都会递增。 当投递计数达到配置上限(默认:5)时,记录会进入归档状态,不再有资格进行额外的投递尝试。spring-doc.cadn.net.cn

配置

可通过管理员API配置每个共享组的最大投递尝试次数:spring-doc.cadn.net.cn

private void configureMaxDeliveryAttempts(String bootstrapServers, String groupId) throws Exception {
    Map<String, Object> adminProps = new HashMap<>();
    adminProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

    try (Admin admin = Admin.create(adminProps)) {
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.GROUP, groupId);

        // Default is 5, adjust based on your retry tolerance
        ConfigEntry maxAttempts = new ConfigEntry("group.share.delivery.attempt.limit", "10");

        Map<ConfigResource, Collection<AlterConfigOp>> configs = Map.of(
            configResource, List.of(new AlterConfigOp(maxAttempts, AlterConfigOp.OpType.SET))
        );

        admin.incrementalAlterConfigs(configs).all().get();
    }
}

交付次数不会暴露于应用程序spring-doc.cadn.net.cn

配送数量由经纪人内部管理,不对消费者应用开放。 这是KIP-932中有意设计的决定。 投递计数是近似值,作为毒报保护机制,而非精确的重投计数器。 应用程序无法通过任何API查询或访问该值。spring-doc.cadn.net.cn

对于应用级重试逻辑,请使用以下确认类型:spring-doc.cadn.net.cn

经纪人会自动防止一次无休止的重新投递group.share.delivery.try.limit被访问,记录被移至归档状态。spring-doc.cadn.net.cn

重试策略建议

@KafkaListener(topics = "orders", containerFactory = "explicitShareKafkaListenerContainerFactory")
public void processOrder(ConsumerRecord<String, String> record, ShareAcknowledgment ack) {
    try {
        // Attempt to process the order
        orderService.process(record.value());
        ack.acknowledge(); // ACCEPT - successfully processed
    }
    catch (TransientException e) {
        // Temporary failure (network issue, service unavailable, etc.)
        // Release the record for redelivery
        // Broker will retry up to group.share.delivery.attempt.limit times
        logger.warn("Transient error processing order, will retry: {}", e.getMessage());
        ack.release(); // RELEASE - make available for retry
    }
    catch (ValidationException e) {
        // Permanent semantic error (invalid data format, business rule violation, etc.)
        // Do not retry - this record will never succeed
        logger.error("Invalid order data, rejecting: {}", e.getMessage());
        ack.reject(); // REJECT - permanent failure, do not retry
    }
    catch (Exception e) {
        // Unknown error - typically safer to reject to avoid infinite loops
        // But could also release if you suspect it might be transient
        logger.error("Unexpected error processing order, rejecting: {}", e.getMessage());
        ack.reject(); // REJECT - avoid poison message loops
    }
}

经纪人的毒消息保护确保即使你总是使用也没问题释放对于错误,记录不会被无限次重试。 当发送尝试次数超过后,这些邮件将自动归档。spring-doc.cadn.net.cn

与普通消费者的区别

股份消费者与普通消费者在几个关键方面有所不同:spring-doc.cadn.net.cn

  1. 无分区分配:共享消费者不能被分配特定分区spring-doc.cadn.net.cn

  2. 无主题模式:分享消费者不支持订阅主题模式spring-doc.cadn.net.cn

  3. 协作式消费:同一共享组中的多个消费者可以同时从同一分区消费spring-doc.cadn.net.cn

  4. 记录级确认:支持明确确认接受,释放拒绝类型spring-doc.cadn.net.cn

  5. 不同的组管理:共享组使用不同的协调器协议spring-doc.cadn.net.cn

  6. 无批量处理:共享消费者的记录单独处理,而非批量处理spring-doc.cadn.net.cn

  7. 经纪人端重试管理:送货次数跟踪和毒讯保护由经纪人管理,不暴露给应用spring-doc.cadn.net.cn

局限性与注意事项

当前局限