Quellcode durchsuchen

set max times for waiting sync

sunyj vor 8 Jahren
Ursprung
Commit
53b746ddf5
1 geänderte Dateien mit 25 neuen und 6 gelöschten Zeilen
  1. 25 6
      src/main/java/com/uas/ps/product/sync/MessageConsumer.java

+ 25 - 6
src/main/java/com/uas/ps/product/sync/MessageConsumer.java

@@ -1,5 +1,6 @@
 package com.uas.ps.product.sync;
 
+import com.uas.ps.core.util.DateFormatUtils;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -13,10 +14,10 @@ import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.stereotype.Service;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.*;
 
 /**
  * @author sunyj
@@ -28,6 +29,13 @@ public class MessageConsumer {
 
     private static final List<String> topics = Collections.singletonList("RESPONSE");
 
+    private static final String LOG_DIR = System.getProperty("java.io.tmpdir");
+
+    /**
+     * 等待的最大次数
+     */
+    private final static int MAX_WAIT_TIMES = 10000;
+
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
     @Autowired
@@ -80,8 +88,10 @@ public class MessageConsumer {
      */
     public void waitResponse(String batchCode) {
         subscribe();
-        while (true) {
-            logger.info("waiting batchCode: " + batchCode);
+        int count = 0;
+        while (count < MAX_WAIT_TIMES) {
+            count++;
+            logger.info("waiting batchCode: " + batchCode + " for " + count + " times");
             if (batchCodes.contains(batchCode)) {
                 logger.info("waited batchCode: " + batchCode);
                 batchCodes.remove(batchCode);
@@ -93,6 +103,15 @@ public class MessageConsumer {
                 logger.error("sleep error", e);
             }
         }
+        // 超过最大次数后,记录 batchCode
+        String logFilePath = (LOG_DIR.endsWith(File.separator) ? LOG_DIR : LOG_DIR + "/") + "batch-code-timeout.log";
+        try (FileWriter fileWriter = new FileWriter(logFilePath, true)) {
+            fileWriter.write(DateFormatUtils.DATETIME_FORMAT.format(new Date()) + " " + batchCode + "\n");
+            fileWriter.flush();
+        } catch (IOException e) {
+            logger.error("日志写入失败", e);
+        }
+        throw new IllegalStateException("等待数据同步超时");
     }
 
     /**