|
|
@@ -4,8 +4,16 @@ import com.uas.ps.core.util.Assert;
|
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.dao.DataAccessException;
|
|
|
+import org.springframework.jdbc.core.CallableStatementCallback;
|
|
|
+import org.springframework.jdbc.core.CallableStatementCreator;
|
|
|
+import org.springframework.jdbc.core.JdbcTemplate;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
+import java.sql.CallableStatement;
|
|
|
+import java.sql.Connection;
|
|
|
+import java.sql.SQLException;
|
|
|
+import java.sql.Types;
|
|
|
import java.util.UUID;
|
|
|
|
|
|
/**
|
|
|
@@ -21,6 +29,7 @@ public class WaitSyncHelper {
|
|
|
private final MessageConsumer messageConsumer;
|
|
|
private Logger logger = LoggerFactory.getLogger(getClass());
|
|
|
private ThreadLocal<String> batchCodes = new ThreadLocal<>();
|
|
|
+ private ThreadLocal<JdbcTemplate> jdbcTemplates = new ThreadLocal<>();
|
|
|
|
|
|
@Autowired
|
|
|
public WaitSyncHelper(SyncRepository syncRepository, MessageConsumer messageConsumer) {
|
|
|
@@ -30,25 +39,69 @@ public class WaitSyncHelper {
|
|
|
|
|
|
/**
|
|
|
* 预等待同步,进行相关配置(在修改数据之前调用)
|
|
|
+ * 如果修改数据利用的是 jpa 而非 jdbc,则需使用该方法进行预等待
|
|
|
*
|
|
|
* @param sourceApp 发起等待的应用
|
|
|
*/
|
|
|
public void preWait(String sourceApp) {
|
|
|
Assert.notEmpty(sourceApp, "sourceApp is empty");
|
|
|
- String batchCode = UUID.randomUUID().toString().replace("-", "");
|
|
|
+ String batchCode = initBatchCode();
|
|
|
syncRepository.setSessionVariable(sourceApp, batchCode);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 预等待同步,进行相关配置(在修改数据之前调用)
|
|
|
+ * 如果修改数据利用的是 jdbc 而非 jpa,则需使用该方法进行预等待
|
|
|
+ *
|
|
|
+ * @param sourceApp 发起等待的应用
|
|
|
+ * @param jdbcTemplate jdbcTemplate
|
|
|
+ */
|
|
|
+ public void preWait(String sourceApp, JdbcTemplate jdbcTemplate) {
|
|
|
+ Assert.notEmpty(sourceApp, "sourceApp is empty");
|
|
|
+ Assert.notNull(jdbcTemplate, "jdbcTemplate is null");
|
|
|
+ String batchCode = initBatchCode();
|
|
|
+ jdbcTemplates.set(jdbcTemplate);
|
|
|
+ jdbcTemplate.update("call sync$set_session_variable('" + sourceApp + "','" + batchCode + "');");
|
|
|
+ }
|
|
|
+
|
|
|
+ private String initBatchCode() {
|
|
|
+ String batchCode = UUID.randomUUID().toString().replace("-", "");
|
|
|
batchCodes.set(batchCode);
|
|
|
+ return batchCode;
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
* 等待同步完成(在修改数据之后调用)
|
|
|
*/
|
|
|
public void waitResponse() {
|
|
|
- Integer batchSize = syncRepository.getBatchSize();
|
|
|
- syncRepository.unsetSessionVariable();
|
|
|
String batchCode = batchCodes.get();
|
|
|
Assert.notEmpty(batchCode, "batchCode is empty, please call preWait() first");
|
|
|
batchCodes.remove();
|
|
|
+
|
|
|
+ // 根据修改方式是 jdbc 还是 jpa,采用不同的方式获取 batchSize
|
|
|
+ JdbcTemplate jdbcTemplate = jdbcTemplates.get();
|
|
|
+ Integer batchSize;
|
|
|
+ if (jdbcTemplate == null) {
|
|
|
+ batchSize = syncRepository.getBatchSize();
|
|
|
+ syncRepository.unsetSessionVariable();
|
|
|
+ } else {
|
|
|
+ batchSize = jdbcTemplate.execute(new CallableStatementCreator() {
|
|
|
+ @Override
|
|
|
+ public CallableStatement createCallableStatement(Connection connection) throws SQLException {
|
|
|
+ CallableStatement callableStatement = connection.prepareCall("{call sync$get_batch_size(?)}");
|
|
|
+ callableStatement.registerOutParameter(1, Types.INTEGER);
|
|
|
+ return callableStatement;
|
|
|
+ }
|
|
|
+ }, new CallableStatementCallback<Integer>() {
|
|
|
+ @Override
|
|
|
+ public Integer doInCallableStatement(CallableStatement callableStatement) throws SQLException, DataAccessException {
|
|
|
+ callableStatement.execute();
|
|
|
+ return callableStatement.getInt(1);
|
|
|
+ }
|
|
|
+ });
|
|
|
+ jdbcTemplate.execute("call sync$unset_session_variable();");
|
|
|
+ }
|
|
|
+
|
|
|
logger.info("batchCode: " + batchCode + ", batchSize: " + batchSize);
|
|
|
if (batchSize == null || batchSize < 1) {
|
|
|
logger.info("无数据修改,不必等待同步完成: " + batchCode);
|