|
|
@@ -0,0 +1,114 @@
|
|
|
+package com.uas.ps.product;
|
|
|
+
|
|
|
+import org.apache.kafka.clients.consumer.ConsumerConfig;
|
|
|
+import org.apache.kafka.clients.consumer.ConsumerRecord;
|
|
|
+import org.apache.kafka.clients.consumer.ConsumerRecords;
|
|
|
+import org.apache.kafka.clients.consumer.KafkaConsumer;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
|
|
|
+import org.springframework.context.annotation.Bean;
|
|
|
+import org.springframework.context.annotation.Configuration;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Properties;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author sunyj
|
|
|
+ * @since 2018/1/17 19:27
|
|
|
+ */
|
|
|
+@Service
|
|
|
+@Configuration
|
|
|
+public class MessageConsumer {
|
|
|
+
|
|
|
+ private static final List<String> topics = Collections.singletonList("RESPONSE");
|
|
|
+
|
|
|
+ private final Logger logger = LoggerFactory.getLogger(getClass());
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private KafkaProperties kafkaProperties;
|
|
|
+
|
|
|
+ @Value("${kafka.consumer.max-poll-interval-millis}")
|
|
|
+ private Long maxPollIntervalMillis;
|
|
|
+
|
|
|
+ private List<String> batchCodes = new ArrayList<>();
|
|
|
+
|
|
|
+ @Bean
|
|
|
+ public Properties kafkaConfig() {
|
|
|
+ Properties config = new Properties();
|
|
|
+ // 配置kafka集群机器
|
|
|
+ setProperty(config, ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
|
|
|
+ KafkaProperties.Consumer kafkaPropertiesConsumer = kafkaProperties.getConsumer();
|
|
|
+ // 消费者分组
|
|
|
+ setProperty(config, ConsumerConfig.GROUP_ID_CONFIG, kafkaPropertiesConsumer.getGroupId());
|
|
|
+ setProperty(config, ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaPropertiesConsumer.getKeyDeserializer());
|
|
|
+ setProperty(config, ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaPropertiesConsumer.getValueDeserializer());
|
|
|
+
|
|
|
+ // 消费者自动提交已消费消息的 offset
|
|
|
+ setProperty(config, ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaPropertiesConsumer.getEnableAutoCommit());
|
|
|
+ // 自动提交的时间间隔
|
|
|
+ setProperty(config, ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaPropertiesConsumer.getAutoCommitInterval());
|
|
|
+ // 每次 poll 的最大数据个数
|
|
|
+ setProperty(config, ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaPropertiesConsumer.getMaxPollRecords());
|
|
|
+ // 设置使用最开始的 offset 偏移量为该 group.id的 最早
|
|
|
+ // 如果不设置,则会是 latest 即该 topic 最新一个消息的 offset
|
|
|
+ // 如果采用 latest,消费者只能得到其启动后,生产者生产的消息
|
|
|
+ // 一般配置 earliest 或者 latest 值
|
|
|
+ setProperty(config, ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaPropertiesConsumer.getAutoOffsetReset());
|
|
|
+ return config;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void setProperty(Properties properties, Object key, Object value) {
|
|
|
+ if (value != null) {
|
|
|
+ properties.put(key, value);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 如果groupId之前存在 , 则从之前提交的最后消费数据的offset处继续开始消费数据
|
|
|
+ * 如果groupId之前不存在,则从当前分区的最后位置开始消费
|
|
|
+ * <p>
|
|
|
+ * 注意如果enable.auto.commit 设置为false,如果消费完数据没有提交已消费数据的offset,
|
|
|
+ * 则会出现重复消费数据的情况
|
|
|
+ */
|
|
|
+ public void waitResponse(String batchCode) {
|
|
|
+ logger.info("waiting batchCode: " + batchCode);
|
|
|
+ if(batchCodes.contains(batchCode)){
|
|
|
+ batchCodes.remove(batchCode);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConfig());
|
|
|
+ // 订阅topic,并实现 ConsumerRebalanceListener
|
|
|
+ consumer.subscribe(topics);
|
|
|
+ boolean processed = false;
|
|
|
+ while (!processed) {
|
|
|
+ try {
|
|
|
+ ConsumerRecords<String, String> records = consumer.poll(maxPollIntervalMillis);
|
|
|
+ logger.info("topic: " + topics + " pool return records size: " + records.count());
|
|
|
+ for (ConsumerRecord<String, String> record : records) {
|
|
|
+ logger.info("receiving batchCode... " + batchCode);
|
|
|
+ String value = record.value();
|
|
|
+ if(value.equals(batchCode)){
|
|
|
+ processed = true;
|
|
|
+ } else{
|
|
|
+ batchCodes.add(batchCode);
|
|
|
+ }
|
|
|
+ // 手动提交已消费数据的 offset
|
|
|
+ Boolean enableAutoCommit = kafkaProperties.getConsumer().getEnableAutoCommit();
|
|
|
+ if (enableAutoCommit != null && !enableAutoCommit) {
|
|
|
+ consumer.commitSync();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ logger.error("消息处理出错", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ consumer.unsubscribe();
|
|
|
+ consumer.close();
|
|
|
+ }
|
|
|
+}
|