Browse Source

Merge remote-tracking branch 'origin/dev' into dev

dongbw 7 years ago
parent
commit
6e08764c5d

+ 2 - 2
src/main/java/com/uas/ps/product/sync/MessageConsumer.java

@@ -30,6 +30,7 @@ public class MessageConsumer {
     private static final List<String> topics = Collections.singletonList("RESPONSE");
 
     private static final String LOG_DIR = System.getProperty("java.io.tmpdir");
+    private static final String LOG_FILE_PATH = (LOG_DIR.endsWith(File.separator) ? LOG_DIR : LOG_DIR + "/") + "batch-code-timeout.log";
 
     /**
      * 等待的最大次数
@@ -104,8 +105,7 @@ public class MessageConsumer {
             }
         }
         // 超过最大次数后,记录 batchCode
-        String logFilePath = (LOG_DIR.endsWith(File.separator) ? LOG_DIR : LOG_DIR + "/") + "batch-code-timeout.log";
-        try (FileWriter fileWriter = new FileWriter(logFilePath, true)) {
+        try (FileWriter fileWriter = new FileWriter(LOG_FILE_PATH, true)) {
             fileWriter.write(DateFormatUtils.DATETIME_FORMAT.format(new Date()) + " " + batchCode + "\n");
             fileWriter.flush();
         } catch (IOException e) {

+ 39 - 22
src/main/java/com/uas/ps/product/sync/WaitSyncHelper.java

@@ -45,6 +45,7 @@ public class WaitSyncHelper {
      */
     public void preWait(String sourceApp) {
         Assert.notEmpty(sourceApp, "sourceApp is empty");
+        reset();
         String batchCode = initBatchCode();
         syncRepository.setSessionVariable(sourceApp, batchCode);
     }
@@ -59,6 +60,7 @@ public class WaitSyncHelper {
     public void preWait(String sourceApp, JdbcTemplate jdbcTemplate) {
         Assert.notEmpty(sourceApp, "sourceApp is empty");
         Assert.notNull(jdbcTemplate, "jdbcTemplate is null");
+        reset();
         String batchCode = initBatchCode();
         jdbcTemplates.set(jdbcTemplate);
         jdbcTemplate.update("call sync$set_session_variable('" + sourceApp + "','" + batchCode + "');");
@@ -76,30 +78,31 @@ public class WaitSyncHelper {
     public void waitResponse() {
         String batchCode = batchCodes.get();
         Assert.notEmpty(batchCode, "batchCode is empty, please call preWait() first");
-        batchCodes.remove();
 
-        // 根据修改方式是 jdbc 还是 jpa,采用不同的方式获取 batchSize
-        JdbcTemplate jdbcTemplate = jdbcTemplates.get();
         Integer batchSize;
-        if (jdbcTemplate == null) {
-            batchSize = syncRepository.getBatchSize();
-            syncRepository.unsetSessionVariable();
-        } else {
-            batchSize = jdbcTemplate.execute(new CallableStatementCreator() {
-                @Override
-                public CallableStatement createCallableStatement(Connection connection) throws SQLException {
-                    CallableStatement callableStatement = connection.prepareCall("{call sync$get_batch_size(?)}");
-                    callableStatement.registerOutParameter(1, Types.INTEGER);
-                    return callableStatement;
-                }
-            }, new CallableStatementCallback<Integer>() {
-                @Override
-                public Integer doInCallableStatement(CallableStatement callableStatement) throws SQLException, DataAccessException {
-                    callableStatement.execute();
-                    return callableStatement.getInt(1);
-                }
-            });
-            jdbcTemplate.execute("call sync$unset_session_variable();");
+        try {
+            // 根据修改方式是 jdbc 还是 jpa,采用不同的方式获取 batchSize
+            JdbcTemplate jdbcTemplate = jdbcTemplates.get();
+            if (jdbcTemplate == null) {
+                batchSize = syncRepository.getBatchSize();
+            } else {
+                batchSize = jdbcTemplate.execute(new CallableStatementCreator() {
+                    @Override
+                    public CallableStatement createCallableStatement(Connection connection) throws SQLException {
+                        CallableStatement callableStatement = connection.prepareCall("{call sync$get_batch_size(?)}");
+                        callableStatement.registerOutParameter(1, Types.INTEGER);
+                        return callableStatement;
+                    }
+                }, new CallableStatementCallback<Integer>() {
+                    @Override
+                    public Integer doInCallableStatement(CallableStatement callableStatement) throws SQLException, DataAccessException {
+                        callableStatement.execute();
+                        return callableStatement.getInt(1);
+                    }
+                });
+            }
+        } finally {
+            reset();
         }
 
         logger.info("batchCode: " + batchCode + ", batchSize: " + batchSize);
@@ -109,4 +112,18 @@ public class WaitSyncHelper {
             messageConsumer.waitResponse(batchCode);
         }
     }
+
+    /**
+     * 重置,清除旧的 session 变量等
+     */
+    private void reset() {
+        batchCodes.remove();
+        JdbcTemplate jdbcTemplate = jdbcTemplates.get();
+        if (jdbcTemplate == null) {
+            syncRepository.unsetSessionVariable();
+        } else {
+            jdbcTemplates.remove();
+            jdbcTemplate.execute("call sync$unset_session_variable();");
+        }
+    }
 }