Browse Source

only keep one consumer to wait RESPONSE

sunyj 7 years ago
parent
commit
e8f7d3c816
1 changed files with 39 additions and 28 deletions
  1. 39 28
      src/main/java/com/uas/ps/product/MessageConsumer.java

+ 39 - 28
src/main/java/com/uas/ps/product/MessageConsumer.java

@@ -38,6 +38,8 @@ public class MessageConsumer {
 
     private List<String> batchCodes = new ArrayList<>();
 
+    private KafkaConsumer<String, String> consumer;
+
     @Bean
     public Properties kafkaConfig() {
         Properties config = new Properties();
@@ -78,37 +80,46 @@ public class MessageConsumer {
      */
     public void waitResponse(String batchCode) {
         logger.info("waiting batchCode: " + batchCode);
-        if(batchCodes.contains(batchCode)){
-            batchCodes.remove(batchCode);
-            return;
+        subscribe();
+        while (true) {
+            if (batchCodes.contains(batchCode)) {
+                batchCodes.remove(batchCode);
+                return;
+            }
         }
-        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaConfig());
-        // 订阅topic,并实现 ConsumerRebalanceListener
-        consumer.subscribe(topics);
-        boolean processed = false;
-        while (!processed) {
-            try {
-                ConsumerRecords<String, String> records = consumer.poll(maxPollIntervalMillis);
-                logger.info("topic: " + topics + " pool return records size: " + records.count());
-                for (ConsumerRecord<String, String> record : records) {
-                    String value = record.value();
-                    logger.info("receiving batchCode... " + value);
-                    if(value.equals(batchCode)){
-                        processed = true;
-                    } else{
-                        batchCodes.add(batchCode);
-                    }
-                    // 手动提交已消费数据的 offset
-                    Boolean enableAutoCommit = kafkaProperties.getConsumer().getEnableAutoCommit();
-                    if (enableAutoCommit != null && !enableAutoCommit) {
-                        consumer.commitSync();
+    }
+
+    /**
+     * 订阅消息,存储收到回复的 batch code
+     */
+    private void subscribe() {
+        if (consumer == null) {
+            consumer = new KafkaConsumer<>(kafkaConfig());
+            // 订阅topic,并实现 ConsumerRebalanceListener
+            consumer.subscribe(topics);
+            new Thread(new Runnable() {
+                @Override
+                public void run() {
+                    while (true) {
+                        try {
+                            ConsumerRecords<String, String> records = consumer.poll(maxPollIntervalMillis);
+                            logger.info("topic: " + topics + " pool return records size: " + records.count());
+                            for (ConsumerRecord<String, String> record : records) {
+                                String value = record.value();
+                                logger.info("receiving batchCode... " + value);
+                                batchCodes.add(value);
+                                // 手动提交已消费数据的 offset
+                                Boolean enableAutoCommit = kafkaProperties.getConsumer().getEnableAutoCommit();
+                                if (enableAutoCommit != null && !enableAutoCommit) {
+                                    consumer.commitSync();
+                                }
+                            }
+                        } catch (Exception e) {
+                            logger.error("消息处理出错", e);
+                        }
                     }
                 }
-            } catch (Exception e) {
-                logger.error("消息处理出错", e);
-            }
+            }).start();
         }
-        consumer.unsubscribe();
-        consumer.close();
     }
 }