SpringBoot集成kafka全面實(shí)戰(zhàn)

本文是SpringBoot+Kafka的實(shí)戰(zhàn)講解,如果對(duì)kafka的架構(gòu)原理還不了解的讀者,建議先看一下《大白話kafka架構(gòu)原理》、《秒懂kafka HA(高可用)》兩篇文章。

一、生產(chǎn)者實(shí)踐

  • 普通生產(chǎn)者

  • 帶回調(diào)的生產(chǎn)者

  • 自定義分區(qū)器

  • kafka事務(wù)提交

二、消費(fèi)者實(shí)踐

  • 簡(jiǎn)單消費(fèi)

  • 指定topic、partition、offset消費(fèi)

  • 批量消費(fèi)

  • 監(jiān)聽(tīng)異常處理器

  • 消息過(guò)濾器

  • 消息轉(zhuǎn)發(fā)

  • 定時(shí)啟動(dòng)/停止監(jiān)聽(tīng)器

一、前戲

1、在項(xiàng)目中連接kafka,因?yàn)槭峭饩W(wǎng),首先要開(kāi)放kafka配置文件中的如下配置(其中IP為公網(wǎng)IP),

advertised.listeners=PLAINTEXT://112.126.74.249:9092

2、在開(kāi)始前我們先創(chuàng)建兩個(gè)topic:topic1、topic2,其分區(qū)和副本數(shù)都設(shè)置為2,用來(lái)測(cè)試,

[root@iZ2zegzlkedbo3e64vkbefZ ~]#  cd /usr/local/kafka-cluster/kafka1/bin/
[root@iZ2zegzlkedbo3e64vkbefZ bin]# ./kafka-topics.sh --create --zookeeper 172.17.80.219:2181 --replication-factor 2 --partitions 2 --topic topic1
Created topic topic1.
[root@iZ2zegzlkedbo3e64vkbefZ bin]# ./kafka-topics.sh --create --zookeeper 172.17.80.219:2181 --replication-factor 2 --partitions 2 --topic topic2
Created topic topic2.

當(dāng)然我們也可以不手動(dòng)創(chuàng)建topic,在執(zhí)行代碼kafkaTemplate.send("topic1", normalMessage)發(fā)送消息時(shí),kafka會(huì)幫我們自動(dòng)完成topic的創(chuàng)建工作,但這種情況下創(chuàng)建的topic默認(rèn)只有一個(gè)分區(qū),分區(qū)也沒(méi)有副本。所以,我們可以在項(xiàng)目中新建一個(gè)配置類專門用來(lái)初始化topic,如下,

@Configuration
public class KafkaInitialConfiguration {
    // 創(chuàng)建一個(gè)名為testtopic的Topic并設(shè)置分區(qū)數(shù)為8,分區(qū)副本數(shù)為2
    @Bean
    public NewTopic initialTopic() {
        return new NewTopic("testtopic",8, (short) 2 );
    }

     // 如果要修改分區(qū)數(shù),只需修改配置值重啟項(xiàng)目即可
    // 修改分區(qū)數(shù)并不會(huì)導(dǎo)致數(shù)據(jù)的丟失,但是分區(qū)數(shù)只能增大不能減小
    @Bean
    public NewTopic updateTopic() {
        return new NewTopic("testtopic",10, (short) 2 );
    }
}

3、新建SpringBoot項(xiàng)目

① 引入pom依賴

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>

② application.propertise配置(本文用到的配置項(xiàng)這里全列了出來(lái))

###########【Kafka集群】###########
spring.kafka.bootstrap-servers=112.126.74.249:9092,112.126.74.249:9093
###########【初始化生產(chǎn)者配置】###########
# 重試次數(shù)
spring.kafka.producer.retries=0
# 應(yīng)答級(jí)別:多少個(gè)分區(qū)副本備份完成時(shí)向生產(chǎn)者發(fā)送ack確認(rèn)(可選0、1、all/-1)
spring.kafka.producer.acks=1
# 批量大小
spring.kafka.producer.batch-size=16384
# 提交延時(shí)
spring.kafka.producer.properties.linger.ms=0
# 當(dāng)生產(chǎn)端積累的消息達(dá)到batch-size或接收到消息linger.ms后,生產(chǎn)者就會(huì)將消息提交給kafka
# linger.ms為0表示每接收到一條消息就提交給kafka,這時(shí)候batch-size其實(shí)就沒(méi)用了

# 生產(chǎn)端緩沖區(qū)大小
spring.kafka.producer.buffer-memory = 33554432
# Kafka提供的序列化和反序列化類
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 自定義分區(qū)器
# spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner

###########【初始化消費(fèi)者配置】###########
# 默認(rèn)的消費(fèi)組ID
spring.kafka.consumer.properties.group.id=defaultConsumerGroup
# 是否自動(dòng)提交offset
spring.kafka.consumer.enable-auto-commit=true
# 提交offset延時(shí)(接收到消息后多久提交offset)
spring.kafka.consumer.auto.commit.interval.ms=1000
# 當(dāng)kafka中沒(méi)有初始o(jì)ffset或offset超出范圍時(shí)將自動(dòng)重置offset
# earliest:重置為分區(qū)中最小的offset;
# latest:重置為分區(qū)中最新的offset(消費(fèi)分區(qū)中新產(chǎn)生的數(shù)據(jù));
# none:只要有一個(gè)分區(qū)不存在已提交的offset,就拋出異常;
spring.kafka.consumer.auto-offset-reset=latest
# 消費(fèi)會(huì)話超時(shí)時(shí)間(超過(guò)這個(gè)時(shí)間consumer沒(méi)有發(fā)送心跳,就會(huì)觸發(fā)rebalance操作)
spring.kafka.consumer.properties.session.timeout.ms=120000
# 消費(fèi)請(qǐng)求超時(shí)時(shí)間
spring.kafka.consumer.properties.request.timeout.ms=180000
# Kafka提供的序列化和反序列化類
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# 消費(fèi)端監(jiān)聽(tīng)的topic不存在時(shí),項(xiàng)目啟動(dòng)會(huì)報(bào)錯(cuò)(關(guān)掉)
spring.kafka.listener.missing-topics-fatal=false
# 設(shè)置批量消費(fèi)
# spring.kafka.listener.type=batch
# 批量消費(fèi)每次最多消費(fèi)多少條消息
# spring.kafka.consumer.max-poll-records=50

二、Hello Kafka

1、簡(jiǎn)單生產(chǎn)者

@RestController
public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;

    // 發(fā)送消息
    @GetMapping("/kafka/normal/{message}")
    public void sendMessage1(@PathVariable("message") String normalMessage) {
        kafkaTemplate.send("topic1", normalMessage);
    }
}

2、簡(jiǎn)單消費(fèi)

@Component
public class KafkaConsumer {
 // 消費(fèi)監(jiān)聽(tīng)
 @KafkaListener(topics = {"topic1"})
 public void onMessage1(ConsumerRecord<?, ?> record){
     // 消費(fèi)的哪個(gè)topic、partition的消息,打印出消息內(nèi)容
     System.out.println("簡(jiǎn)單消費(fèi):"+record.topic()+"-"+record.partition()+"-"+record.value());
 }
}

上面示例創(chuàng)建了一個(gè)生產(chǎn)者,發(fā)送消息到topic1,消費(fèi)者監(jiān)聽(tīng)topic1消費(fèi)消息。監(jiān)聽(tīng)器用@KafkaListener注解,topics表示監(jiān)聽(tīng)的topic,支持同時(shí)監(jiān)聽(tīng)多個(gè),用英文逗號(hào)分隔。啟動(dòng)項(xiàng)目,postman調(diào)接口觸發(fā)生產(chǎn)者發(fā)送消息,

image

可以看到監(jiān)聽(tīng)器消費(fèi)成功,

image

三、生產(chǎn)者

1、帶回調(diào)的生產(chǎn)者

kafkaTemplate提供了一個(gè)回調(diào)方法addCallback,我們可以在回調(diào)方法中監(jiān)控消息是否發(fā)送成功 或 失敗時(shí)做補(bǔ)償處理,有兩種寫(xiě)法,

@GetMapping("/kafka/callbackOne/{message}")
public void sendMessage2(@PathVariable("message") String callbackMessage) {
  kafkaTemplate.send("topic1", callbackMessage).addCallback(success -> {
      // 消息發(fā)送到的topic
      String topic = success.getRecordMetadata().topic();
      // 消息發(fā)送到的分區(qū)
      int partition = success.getRecordMetadata().partition();
      // 消息在分區(qū)內(nèi)的offset
      long offset = success.getRecordMetadata().offset();
      System.out.println("發(fā)送消息成功:" + topic + "-" + partition + "-" + offset);
  }, failure -> {
      System.out.println("發(fā)送消息失敗:" + failure.getMessage());
  });
}

@GetMapping("/kafka/callbackTwo/{message}")
public void sendMessage3(@PathVariable("message") String callbackMessage) {
    kafkaTemplate.send("topic1", callbackMessage).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
        @Override
        public void onFailure(Throwable ex) {
            System.out.println("發(fā)送消息失?。?+ex.getMessage());
        }
 
        @Override
        public void onSuccess(SendResult<String, Object> result) {
            System.out.println("發(fā)送消息成功:" + result.getRecordMetadata().topic() + "-"
                    + result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
        }
    });
}

2、自定義分區(qū)器

我們知道,kafka中每個(gè)topic被劃分為多個(gè)分區(qū),那么生產(chǎn)者將消息發(fā)送到topic時(shí),具體追加到哪個(gè)分區(qū)呢?這就是所謂的分區(qū)策略,Kafka 為我們提供了默認(rèn)的分區(qū)策略,同時(shí)它也支持自定義分區(qū)策略。其路由機(jī)制為:

① 若發(fā)送消息時(shí)指定了分區(qū)(即自定義分區(qū)策略),則直接將消息append到指定分區(qū);

② 若發(fā)送消息時(shí)未指定 patition,但指定了 key(kafka允許為每條消息設(shè)置一個(gè)key),則對(duì)key值進(jìn)行hash計(jì)算,根據(jù)計(jì)算結(jié)果路由到指定分區(qū),這種情況下可以保證同一個(gè) Key 的所有消息都進(jìn)入到相同的分區(qū);

③ patition 和 key 都未指定,則使用kafka默認(rèn)的分區(qū)策略,輪詢選出一個(gè) patition;

※ 我們來(lái)自定義一個(gè)分區(qū)策略,將消息發(fā)送到我們指定的partition,首先新建一個(gè)分區(qū)器類實(shí)現(xiàn)Partitioner接口,重寫(xiě)方法,其中partition方法的返回值就表示將消息發(fā)送到幾號(hào)分區(qū),

public class CustomizePartitioner implements Partitioner {
   @Override
   public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
       // 自定義分區(qū)規(guī)則(這里假設(shè)全部發(fā)到0號(hào)分區(qū))
       // ......
       return 0;
   }

   @Override
   public void close() {

   }

   @Override
   public void configure(Map<String, ?> configs) {

   }
}

在application.propertise中配置自定義分區(qū)器,配置的值就是分區(qū)器類的全路徑名,

# 自定義分區(qū)器
spring.kafka.producer.properties.partitioner.class=com.felix.kafka.producer.CustomizePartitioner

3、kafka事務(wù)提交

如果在發(fā)送消息時(shí)需要?jiǎng)?chuàng)建事務(wù),可以使用 KafkaTemplate 的 executeInTransaction 方法來(lái)聲明事務(wù),

@GetMapping("/kafka/transaction")
public void sendMessage7(){
  // 聲明事務(wù):后面報(bào)錯(cuò)消息不會(huì)發(fā)出去
  kafkaTemplate.executeInTransaction(operations -> {
      operations.send("topic1","test executeInTransaction");
      throw new RuntimeException("fail");
  });

  // 不聲明事務(wù):后面報(bào)錯(cuò)但前面消息已經(jīng)發(fā)送成功了
 kafkaTemplate.send("topic1","test executeInTransaction");
 throw new RuntimeException("fail");
}

四、消費(fèi)者

1、指定topic、partition、offset消費(fèi)

前面我們?cè)诒O(jiān)聽(tīng)消費(fèi)topic1的時(shí)候,監(jiān)聽(tīng)的是topic1上所有的消息,如果我們想指定topic、指定partition、指定offset來(lái)消費(fèi)呢?也很簡(jiǎn)單,@KafkaListener注解已全部為我們提供,

/**
* @Title 指定topic、partition、offset消費(fèi)
* @Description 同時(shí)監(jiān)聽(tīng)topic1和topic2,監(jiān)聽(tīng)topic1的0號(hào)分區(qū)、topic2的 "0號(hào)和1號(hào)" 分區(qū),指向1號(hào)分區(qū)的offset初始值為8
* @Author long.yuan
* @Date 2020/3/22 13:38
* @Param [record]
* @return void
**/
@KafkaListener(id = "consumer1",groupId = "felix-group",topicPartitions = {
     @TopicPartition(topic = "topic1", partitions = { "0" }),
     @TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "8"))
})
public void onMessage2(ConsumerRecord<?, ?> record) {
 System.out.println("topic:"+record.topic()+"|partition:"+record.partition()+"|offset:"+record.offset()+"|value:"+record.value());
}

屬性解釋:

① id:消費(fèi)者ID;

② groupId:消費(fèi)組ID;

③ topics:監(jiān)聽(tīng)的topic,可監(jiān)聽(tīng)多個(gè);

④ topicPartitions:可配置更加詳細(xì)的監(jiān)聽(tīng)信息,可指定topic、parition、offset監(jiān)聽(tīng)。

上面onMessage2監(jiān)聽(tīng)的含義:監(jiān)聽(tīng)topic1的0號(hào)分區(qū),同時(shí)監(jiān)聽(tīng)topic2的0號(hào)分區(qū)和topic2的1號(hào)分區(qū)里面offset從8開(kāi)始的消息。

注意:topics和topicPartitions不能同時(shí)使用;

2、批量消費(fèi)

設(shè)置application.prpertise開(kāi)啟批量消費(fèi)即可,

    # 設(shè)置批量消費(fèi)
spring.kafka.listener.type=batch
# 批量消費(fèi)每次最多消費(fèi)多少條消息
spring.kafka.consumer.max-poll-records=50

接收消息時(shí)用List來(lái)接收,監(jiān)聽(tīng)代碼如下,

@KafkaListener(id = "consumer2",groupId = "felix-group", topics = "topic1")
public void onMessage3(List<ConsumerRecord<?, ?>> records) {
    System.out.println(">>>批量消費(fèi)一次,records.size()="+records.size());
    for (ConsumerRecord<?, ?> record : records) {
        System.out.println(record.value());
    }
}

3、ConsumerAwareListenerErrorHandler 異常處理器

通過(guò)異常處理器,我們可以處理consumer在消費(fèi)時(shí)發(fā)生的異常。

新建一個(gè) ConsumerAwareListenerErrorHandler 類型的異常處理方法,用@Bean注入,BeanName默認(rèn)就是方法名,然后我們將這個(gè)異常處理器的BeanName放到@KafkaListener注解的errorHandler屬性里面,當(dāng)監(jiān)聽(tīng)拋出異常的時(shí)候,則會(huì)自動(dòng)調(diào)用異常處理器,

// 新建一個(gè)異常處理器,用@Bean注入
@Bean
public ConsumerAwareListenerErrorHandler consumerAwareErrorHandler() {
 return (message, exception, consumer) -> {
     System.out.println("消費(fèi)異常:"+message.getPayload());
     return null;
 };
}

// 將這個(gè)異常處理器的BeanName放到@KafkaListener注解的errorHandler屬性里面
@KafkaListener(topics = {"topic1"},errorHandler = "consumerAwareErrorHandler")
public void onMessage4(ConsumerRecord<?, ?> record) throws Exception {
 throw new Exception("簡(jiǎn)單消費(fèi)-模擬異常");
}

// 批量消費(fèi)也一樣,異常處理器的message.getPayload()也可以拿到各條消息的信息
@KafkaListener(topics = "topic1",errorHandler="consumerAwareErrorHandler")
public void onMessage5(List<ConsumerRecord<?, ?>> records) throws Exception {
 System.out.println("批量消費(fèi)一次...");
 throw new Exception("批量消費(fèi)-模擬異常");
}

執(zhí)行看一下效果,

image

4、消息過(guò)濾器

消息過(guò)濾器可以在消息抵達(dá)consumer之前被攔截,在實(shí)際應(yīng)用中,我們可以根據(jù)自己的業(yè)務(wù)邏輯,篩選出需要的信息再交由KafkaListener處理,不需要的消息則過(guò)濾掉。

配置消息過(guò)濾只需要為 監(jiān)聽(tīng)器工廠 配置一個(gè)RecordFilterStrategy(消息過(guò)濾策略),返回true的時(shí)候消息將會(huì)被拋棄,返回false時(shí),消息能正常抵達(dá)監(jiān)聽(tīng)容器。

@Component
public class KafkaConsumer {
    @Autowired
    ConsumerFactory consumerFactory;

    // 消息過(guò)濾器
    @Bean
    public ConcurrentKafkaListenerContainerFactory filterContainerFactory() {
        ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
        factory.setConsumerFactory(consumerFactory);
        // 被過(guò)濾的消息將被丟棄
        factory.setAckDiscarded(true);
        // 消息過(guò)濾策略
        factory.setRecordFilterStrategy(consumerRecord -> {
            if (Integer.parseInt(consumerRecord.value().toString()) % 2 == 0) {
                return false;
            }
            //返回true消息則被過(guò)濾
            return true;
        });
        return factory;
    }

    // 消息過(guò)濾監(jiān)聽(tīng)
    @KafkaListener(topics = {"topic1"},containerFactory = "filterContainerFactory")
    public void onMessage6(ConsumerRecord<?, ?> record) {
        System.out.println(record.value());
    }
}

上面實(shí)現(xiàn)了一個(gè)"過(guò)濾奇數(shù)、接收偶數(shù)"的過(guò)濾策略,我們向topic1發(fā)送0-99總共100條消息,看一下監(jiān)聽(tīng)器的消費(fèi)情況,可以看到監(jiān)聽(tīng)器只消費(fèi)了偶數(shù),

image

5、消息轉(zhuǎn)發(fā)

在實(shí)際開(kāi)發(fā)中,我們可能有這樣的需求,應(yīng)用A從TopicA獲取到消息,經(jīng)過(guò)處理后轉(zhuǎn)發(fā)到TopicB,再由應(yīng)用B監(jiān)聽(tīng)處理消息,即一個(gè)應(yīng)用處理完成后將該消息轉(zhuǎn)發(fā)至其他應(yīng)用,完成消息的轉(zhuǎn)發(fā)。

在SpringBoot集成Kafka實(shí)現(xiàn)消息的轉(zhuǎn)發(fā)也很簡(jiǎn)單,只需要通過(guò)一個(gè)@SendTo注解,被注解方法的return值即轉(zhuǎn)發(fā)的消息內(nèi)容,如下,

/**
* @Title 消息轉(zhuǎn)發(fā)
* @Description 從topic1接收到的消息經(jīng)過(guò)處理后轉(zhuǎn)發(fā)到topic2
* @Author long.yuan
* @Date 2020/3/23 22:15
* @Param [record]
* @return void
**/
@KafkaListener(topics = {"topic1"})
@SendTo("topic2")
public String onMessage7(ConsumerRecord<?, ?> record) {
 return record.value()+"-forward message";
}

6、定時(shí)啟動(dòng)、停止監(jiān)聽(tīng)器

默認(rèn)情況下,當(dāng)消費(fèi)者項(xiàng)目啟動(dòng)的時(shí)候,監(jiān)聽(tīng)器就開(kāi)始工作,監(jiān)聽(tīng)消費(fèi)發(fā)送到指定topic的消息,那如果我們不想讓監(jiān)聽(tīng)器立即工作,想讓它在我們指定的時(shí)間點(diǎn)開(kāi)始工作,或者在我們指定的時(shí)間點(diǎn)停止工作,該怎么處理呢——使用KafkaListenerEndpointRegistry,下面我們就來(lái)實(shí)現(xiàn):

① 禁止監(jiān)聽(tīng)器自啟動(dòng);

② 創(chuàng)建兩個(gè)定時(shí)任務(wù),一個(gè)用來(lái)在指定時(shí)間點(diǎn)啟動(dòng)定時(shí)器,另一個(gè)在指定時(shí)間點(diǎn)停止定時(shí)器;

新建一個(gè)定時(shí)任務(wù)類,用注解@EnableScheduling聲明,KafkaListenerEndpointRegistry 在SpringIO中已經(jīng)被注冊(cè)為Bean,直接注入,設(shè)置禁止KafkaListener自啟動(dòng),

@EnableScheduling
@Component
public class CronTimer {

 /**
  * @KafkaListener注解所標(biāo)注的方法并不會(huì)在IOC容器中被注冊(cè)為Bean,
  * 而是會(huì)被注冊(cè)在KafkaListenerEndpointRegistry中,
  * 而KafkaListenerEndpointRegistry在SpringIOC中已經(jīng)被注冊(cè)為Bean
  **/
 @Autowired
 private KafkaListenerEndpointRegistry registry;

 @Autowired
 private ConsumerFactory consumerFactory;

 // 監(jiān)聽(tīng)器容器工廠(設(shè)置禁止KafkaListener自啟動(dòng))
 @Bean
 public ConcurrentKafkaListenerContainerFactory delayContainerFactory() {
     ConcurrentKafkaListenerContainerFactory container = new ConcurrentKafkaListenerContainerFactory();
     container.setConsumerFactory(consumerFactory);
     //禁止KafkaListener自啟動(dòng)
     container.setAutoStartup(false);
     return container;
 }

 // 監(jiān)聽(tīng)器
 @KafkaListener(id="timingConsumer",topics = "topic1",containerFactory = "delayContainerFactory")
 public void onMessage1(ConsumerRecord<?, ?> record){
     System.out.println("消費(fèi)成功:"+record.topic()+"-"+record.partition()+"-"+record.value());
 }

 // 定時(shí)啟動(dòng)監(jiān)聽(tīng)器
 @Scheduled(cron = "0 42 11 * * ? ")
 public void startListener() {
     System.out.println("啟動(dòng)監(jiān)聽(tīng)器...");
     // "timingConsumer"是@KafkaListener注解后面設(shè)置的監(jiān)聽(tīng)器ID,標(biāo)識(shí)這個(gè)監(jiān)聽(tīng)器
     if (!registry.getListenerContainer("timingConsumer").isRunning()) {
         registry.getListenerContainer("timingConsumer").start();
     }
     //registry.getListenerContainer("timingConsumer").resume();
 }

 // 定時(shí)停止監(jiān)聽(tīng)器
 @Scheduled(cron = "0 45 11 * * ? ")
 public void shutDownListener() {
     System.out.println("關(guān)閉監(jiān)聽(tīng)器...");
     registry.getListenerContainer("timingConsumer").pause();
 }
}

啟動(dòng)項(xiàng)目,觸發(fā)生產(chǎn)者向topic1發(fā)送消息,可以看到consumer沒(méi)有消費(fèi),因?yàn)檫@時(shí)監(jiān)聽(tīng)器還沒(méi)有開(kāi)始工作,

image

11:42分監(jiān)聽(tīng)器啟動(dòng)開(kāi)始工作,消費(fèi)消息,

image
image

11:45分監(jiān)聽(tīng)器停止工作,

image
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容