|
|
@@ -0,0 +1,452 @@
|
|
|
+package com.uas.platform.oos.migration.service.impl;
|
|
|
+
|
|
|
+import static com.uas.platform.oos.core.validate.ArgumentValidate.assertParameterNotNull;
|
|
|
+import static com.uas.platform.oos.core.validate.ArgumentValidate.assertStringNotEmpty;
|
|
|
+
|
|
|
+import com.fasterxml.jackson.annotation.JsonInclude;
|
|
|
+import com.fasterxml.jackson.annotation.JsonInclude.Include;
|
|
|
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
|
+import com.uas.platform.oos.core.oos.client.OosClientHelper;
|
|
|
+import com.uas.platform.oos.core.util.JacksonUtils;
|
|
|
+import com.uas.platform.oos.migration.dao.MigrationFileRepository;
|
|
|
+import com.uas.platform.oos.migration.dao.ResourceFieldsRepository;
|
|
|
+import com.uas.platform.oos.migration.entity.MigrationFile;
|
|
|
+import com.uas.platform.oos.migration.entity.MigrationFile.FileStatus;
|
|
|
+import com.uas.platform.oos.migration.entity.OperationResult;
|
|
|
+import com.uas.platform.oos.migration.entity.ResourceFields;
|
|
|
+import com.uas.platform.oos.migration.service.MigrationFileService;
|
|
|
+import java.time.Duration;
|
|
|
+import java.time.LocalDateTime;
|
|
|
+import java.util.Date;
|
|
|
+import java.util.List;
|
|
|
+import java.util.concurrent.CountDownLatch;
|
|
|
+import java.util.concurrent.ExecutorService;
|
|
|
+import java.util.concurrent.LinkedBlockingDeque;
|
|
|
+import java.util.concurrent.ThreadFactory;
|
|
|
+import java.util.concurrent.ThreadPoolExecutor;
|
|
|
+import java.util.concurrent.ThreadPoolExecutor.AbortPolicy;
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+import org.apache.log4j.Logger;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.data.domain.Page;
|
|
|
+import org.springframework.data.domain.PageRequest;
|
|
|
+import org.springframework.data.domain.Pageable;
|
|
|
+import org.springframework.jdbc.core.JdbcTemplate;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+import org.springframework.transaction.annotation.Transactional;
|
|
|
+import org.springframework.util.CollectionUtils;
|
|
|
+import org.springframework.util.StringUtils;
|
|
|
+
|
|
|
+@Service
|
|
|
+public class MigrationFileServiceImpl implements MigrationFileService {
|
|
|
+
|
|
|
+ private static final String BASE_URL = "https://oos-gz.ctyunapi.cn";
|
|
|
+
|
|
|
+ private static final String QUERY_RESOURCE_URL_STATEMENT = "SELECT COUNT(DISTINCT %s) "
|
|
|
+ + "FROM %s WHERE %s NOT LIKE ?";
|
|
|
+
|
|
|
+ private static final String BUCKET_NAME = "b2c.test";
|
|
|
+
|
|
|
+ private final Logger logger = Logger.getLogger(getClass());
|
|
|
+
|
|
|
+ private final MigrationFileRepository migrationRepository;
|
|
|
+
|
|
|
+ private final ResourceFieldsRepository fieldsRepository;
|
|
|
+
|
|
|
+ private final JdbcTemplate jdbcTemplate;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Constructor.
|
|
|
+ */
|
|
|
+ @Autowired
|
|
|
+ public MigrationFileServiceImpl(
|
|
|
+ MigrationFileRepository migrationRepository,
|
|
|
+ ResourceFieldsRepository fieldsRepository,
|
|
|
+ JdbcTemplate jdbcTemplate) {
|
|
|
+ this.migrationRepository = migrationRepository;
|
|
|
+ this.fieldsRepository = fieldsRepository;
|
|
|
+ this.jdbcTemplate = jdbcTemplate;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Long createMigrationFileLog(final String tableName, final String fieldName) {
|
|
|
+ assertStringNotEmpty(tableName, "The name of table to be migrating must be non-empty");
|
|
|
+ assertStringNotEmpty(tableName, "The name of table field to be migrating must be non-empty");
|
|
|
+
|
|
|
+ final LocalDateTime startTime = LocalDateTime.now();
|
|
|
+
|
|
|
+ // Count total number of records to be handled
|
|
|
+ Object[] parameters = new Object[]{BASE_URL + "%"};
|
|
|
+ String countSql = String.format(QUERY_RESOURCE_URL_STATEMENT, fieldName, tableName, fieldName);
|
|
|
+ Long count = jdbcTemplate.queryForObject(countSql, parameters, Long.class);
|
|
|
+
|
|
|
+ if (logger.isDebugEnabled()) {
|
|
|
+ logger.info(String.format("%s.%s has %d records to be handled", tableName, fieldName, count));
|
|
|
+ }
|
|
|
+
|
|
|
+ // return if the total number of records to be handled is 0
|
|
|
+ if (count == null || count <= 0) {
|
|
|
+ return (long) 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ // calculate page information of data set.
|
|
|
+ final int pageSize = 500;
|
|
|
+ int totalPage = calculateTotalPage(count, pageSize);
|
|
|
+
|
|
|
+ CountDownLatch latch = new CountDownLatch(totalPage);
|
|
|
+
|
|
|
+ ExecutorService executorService = getExecutorService();
|
|
|
+
|
|
|
+ for (int i = 0; i < totalPage; i++) {
|
|
|
+ final int minIndex = i * pageSize;
|
|
|
+ executorService.execute(() -> {
|
|
|
+ batchCreateMigrateRecords(tableName, fieldName, minIndex, pageSize, parameters);
|
|
|
+ latch.countDown();
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ latch.await(60, TimeUnit.SECONDS);
|
|
|
+ executorService.shutdown();
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ logger.error("Concurrent tasks occurs error", e);
|
|
|
+ }
|
|
|
+
|
|
|
+ LocalDateTime endTime = LocalDateTime.now();
|
|
|
+ Duration between = Duration.between(startTime, endTime);
|
|
|
+ return between.toMillis();
|
|
|
+ }
|
|
|
+
|
|
|
+ private ExecutorService getExecutorService() {
|
|
|
+ ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("migration-pool-%d")
|
|
|
+ .build();
|
|
|
+ return new ThreadPoolExecutor(5, 200, 0L, TimeUnit.MILLISECONDS,
|
|
|
+ new LinkedBlockingDeque<>(), threadFactory, new AbortPolicy());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Calculate total page according to total number and page size.
|
|
|
+ *
|
|
|
+ * @param count the total number of all records
|
|
|
+ * @param pageSize the size of page
|
|
|
+ */
|
|
|
+ private int calculateTotalPage(Long count, int pageSize) {
|
|
|
+ int totalPage = (int) ((count - 1) / pageSize) + 1;
|
|
|
+
|
|
|
+ if (logger.isDebugEnabled()) {
|
|
|
+ logger.debug(String.format("Total: %d, PageSize: %d, TotalPage: %d",
|
|
|
+ count, pageSize, totalPage));
|
|
|
+ }
|
|
|
+ return totalPage;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * Create a batch of migrate records.
|
|
|
+ *
|
|
|
+ * @param tableName the name of table migrated
|
|
|
+ * @param fieldName the name of field migrated
|
|
|
+ * @param minIndex the first index of current page
|
|
|
+ * @param pageSize the size of per page
|
|
|
+ * @param parameters sql parameters
|
|
|
+ */
|
|
|
+ @Transactional(rollbackFor = RuntimeException.class)
|
|
|
+ public void batchCreateMigrateRecords(String tableName, String fieldName, int minIndex,
|
|
|
+ int pageSize, Object... parameters) {
|
|
|
+ String sql = String.format("SELECT DISTINCT %s FROM %s WHERE %s NOT LIKE ? limit %d, %d",
|
|
|
+ fieldName, tableName, fieldName, minIndex, pageSize);
|
|
|
+ if (logger.isDebugEnabled()) {
|
|
|
+ logger.debug("Dynamic SQL: " + sql);
|
|
|
+ }
|
|
|
+
|
|
|
+ List<FileUrl> fileUrls = jdbcTemplate.query(sql, parameters, (rs, rowNum) -> {
|
|
|
+ FileUrl fileUrl = new FileUrl();
|
|
|
+ fileUrl.setUrl(rs.getString(fieldName));
|
|
|
+ return fileUrl;
|
|
|
+ });
|
|
|
+ logger.info("The number of current page records: " + fileUrls.size());
|
|
|
+
|
|
|
+ for (FileUrl fileUrl : fileUrls) {
|
|
|
+ MigrationFile file = saveMigrationFile(fileUrl.getUrl());
|
|
|
+
|
|
|
+ String fieldFullName = tableName.toLowerCase() + "." + fieldName.toUpperCase();
|
|
|
+ ResourceFields exist = fieldsRepository.findByFieldNameAndMigration(fieldFullName, file);
|
|
|
+ if (exist == null) {
|
|
|
+ ResourceFields fields = new ResourceFields();
|
|
|
+ fields.setFieldName(fieldFullName);
|
|
|
+ fields.setMigration(file);
|
|
|
+ fields.setCreateTime(new Date());
|
|
|
+ fields.setStatus(file.getStatus());
|
|
|
+ fieldsRepository.save(fields);
|
|
|
+ } else {
|
|
|
+ if (logger.isDebugEnabled()) {
|
|
|
+ logger.info("One resource related to this field exists");
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public MigrationFile saveMigrationFile(String url) {
|
|
|
+ final Date date = new Date();
|
|
|
+ MigrationFile migrationFile = migrationRepository.findByOldUrl(url);
|
|
|
+ if (migrationFile != null) {
|
|
|
+ return migrationFile;
|
|
|
+ }
|
|
|
+
|
|
|
+ MigrationFile file = new MigrationFile();
|
|
|
+ file.setOldUrl(url);
|
|
|
+ file.setStatus(FileStatus.CREATED);
|
|
|
+ file.setCreateTime(new Date());
|
|
|
+ migrationRepository.save(file);
|
|
|
+ return file;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public OperationResult gainStatisticsForMigration(String tableName, String fieldName) {
|
|
|
+ assertStringNotEmpty(tableName, "The name of table to be migrating must be non-empty");
|
|
|
+ assertStringNotEmpty(tableName, "The name of table field to be migrating must be non-empty");
|
|
|
+
|
|
|
+ // Count total number of records to be handled
|
|
|
+ Object[] parameters = new Object[]{BASE_URL + "%"};
|
|
|
+ String countSql = String.format("SELECT COUNT(DISTINCT %s) FROM %s WHERE %s NOT LIKE ?",
|
|
|
+ fieldName, tableName, fieldName);
|
|
|
+ Long count = jdbcTemplate.queryForObject(countSql, parameters, Long.class);
|
|
|
+
|
|
|
+ String fieldFullName = tableName.toLowerCase() + "." + fieldName.toUpperCase();
|
|
|
+ Long resourceCount = fieldsRepository.countByFieldName(fieldFullName);
|
|
|
+
|
|
|
+ OperationResult result = new OperationResult();
|
|
|
+ result.setTotal(count == null || count <= 0 ? 0 : count);
|
|
|
+ result.setTodo(resourceCount);
|
|
|
+ return result;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Long migrateFileToOos(Pageable pageable) {
|
|
|
+ assertParameterNotNull(pageable, "Page parameter must be non-empty");
|
|
|
+ final LocalDateTime startTime = LocalDateTime.now();
|
|
|
+
|
|
|
+ Page<MigrationFile> filePage = migrationRepository.readByStatus(FileStatus.CREATED, pageable);
|
|
|
+
|
|
|
+ if (filePage == null || CollectionUtils.isEmpty(filePage.getContent())) {
|
|
|
+ return (long) 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Use concurrent package to reduce time migrate files
|
|
|
+ CountDownLatch latch = new CountDownLatch(filePage.getContent().size());
|
|
|
+ ExecutorService executor = getExecutorService();
|
|
|
+ for (MigrationFile file : filePage.getContent()) {
|
|
|
+ executor.execute(() -> {
|
|
|
+ try {
|
|
|
+ migrateFileToOos(file);
|
|
|
+ } finally {
|
|
|
+ latch.countDown();
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ }
|
|
|
+ executor.shutdown();
|
|
|
+
|
|
|
+ try {
|
|
|
+ latch.await(3, TimeUnit.MINUTES);
|
|
|
+ logger.info("The Count of file: " + filePage.getContent().size());
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ logger.error("Task interrupt", e);
|
|
|
+ }
|
|
|
+
|
|
|
+ LocalDateTime endTime = LocalDateTime.now();
|
|
|
+ return Duration.between(startTime, endTime).toMillis();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Transactional(rollbackFor = RuntimeException.class)
|
|
|
+ public void migrateFileToOos(MigrationFile file) {
|
|
|
+ if (logger.isDebugEnabled()) {
|
|
|
+ logger.debug(String.format("Upload file[%s]", file.getOldUrl()));
|
|
|
+ }
|
|
|
+
|
|
|
+ // upload file
|
|
|
+ OosClientHelper helper = new OosClientHelper();
|
|
|
+ String url = helper.upload(BUCKET_NAME, file.getOldUrl());
|
|
|
+ if (logger.isDebugEnabled()) {
|
|
|
+ if (!StringUtils.isEmpty(url)) {
|
|
|
+ logger.debug(String.format("File[%s] upload successfully, new url[%s]", file.getOldUrl(),
|
|
|
+ url));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ file.setNewUrl(url);
|
|
|
+ file.setMigrateTime(new Date());
|
|
|
+ file.setStatus(FileStatus.MIGRATED);
|
|
|
+ migrationRepository.save(file);
|
|
|
+
|
|
|
+ // Set Mapping Status
|
|
|
+ List<ResourceFields> fieldsList = fieldsRepository.findByMigration(file);
|
|
|
+ if (!CollectionUtils.isEmpty(fieldsList)) {
|
|
|
+ for (ResourceFields fields : fieldsList) {
|
|
|
+ fields.setStatus(FileStatus.MIGRATED);
|
|
|
+ }
|
|
|
+ fieldsRepository.save(fieldsList);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public List<MigrationStatistic> gainStatisticsForMovingFile() {
|
|
|
+ String mappingTableName = "a_resource_to_fields";
|
|
|
+ String sql = "SELECT field_name, mapping_status, count(*) as total_number FROM "
|
|
|
+ + mappingTableName + " GROUP BY field_name, mapping_status";
|
|
|
+
|
|
|
+ return jdbcTemplate.query(sql,
|
|
|
+ (rs, rowNum) -> {
|
|
|
+ MigrationStatistic statistic = new MigrationStatistic();
|
|
|
+ statistic.setFieldName(rs.getString("field_name"));
|
|
|
+ statistic.setStatus(rs.getString("mapping_status"));
|
|
|
+ statistic.setTotal(rs.getLong("total_number"));
|
|
|
+ return statistic;
|
|
|
+ });
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public List<String> gainFieldsWillMigrate() {
|
|
|
+ String mappingTableName = "a_resource_to_fields";
|
|
|
+ String sql = "SELECT DISTINCT field_name FROM " + mappingTableName
|
|
|
+ + " WHERE mapping_status = 'MIGRATED'";
|
|
|
+
|
|
|
+ return jdbcTemplate.query(sql, (rs, rowNum) -> rs.getString("field_name"));
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Long syncResourceLink(String tableName, String fieldName) {
|
|
|
+ assertStringNotEmpty(tableName, "The name of table to be migrating must be non-empty");
|
|
|
+ assertStringNotEmpty(tableName, "The name of table field to be migrating must be non-empty");
|
|
|
+
|
|
|
+ if (logger.isDebugEnabled()) {
|
|
|
+ logger.debug(String
|
|
|
+ .format("Sync links of field: %s.%s", tableName.toLowerCase(), fieldName.toUpperCase()));
|
|
|
+ }
|
|
|
+
|
|
|
+ final LocalDateTime startTime = LocalDateTime.now();
|
|
|
+
|
|
|
+ // Count total number of records to be handled
|
|
|
+ final String fieldFullName = tableName + "." + fieldName;
|
|
|
+ Long count = fieldsRepository.countByFieldNameAndStatus(fieldFullName, FileStatus.MIGRATED);
|
|
|
+ if (logger.isDebugEnabled()) {
|
|
|
+ logger.debug(String.format("%s.%s has %d records migrated", tableName, fieldName, count));
|
|
|
+ }
|
|
|
+
|
|
|
+ // return if the total number of records to be handled is 0
|
|
|
+ if (count == null || count <= 0) {
|
|
|
+ return 0L;
|
|
|
+ }
|
|
|
+
|
|
|
+ // calculate page information of data set.
|
|
|
+ final int pageSize = 500;
|
|
|
+ int totalPage = calculateTotalPage(count, pageSize);
|
|
|
+ if (logger.isDebugEnabled()) {
|
|
|
+ logger.debug("Print total page: " + totalPage);
|
|
|
+ }
|
|
|
+
|
|
|
+ CountDownLatch latch = new CountDownLatch(totalPage);
|
|
|
+ ExecutorService executorService = getExecutorService();
|
|
|
+ for (int i = 0; i < totalPage; i++) {
|
|
|
+ final int index = i;
|
|
|
+ executorService.execute(() -> {
|
|
|
+ try {
|
|
|
+ Pageable pageInfo = new PageRequest(index, pageSize);
|
|
|
+
|
|
|
+ Page<ResourceFields> fieldsPage = fieldsRepository
|
|
|
+ .readByFieldNameAndStatus(fieldFullName, FileStatus.MIGRATED, pageInfo);
|
|
|
+
|
|
|
+ if (!CollectionUtils.isEmpty(fieldsPage.getContent())) {
|
|
|
+ for (ResourceFields fields : fieldsPage.getContent()) {
|
|
|
+ syncResourceLinkOfField(fields, tableName, fieldName);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ latch.countDown();
|
|
|
+ }
|
|
|
+ });
|
|
|
+ }
|
|
|
+ executorService.shutdown();
|
|
|
+
|
|
|
+ try {
|
|
|
+ latch.await(3, TimeUnit.MINUTES);
|
|
|
+ logger.info("Task finished");
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ logger.error("Concurrent tasks occurs error", e);
|
|
|
+ }
|
|
|
+
|
|
|
+ LocalDateTime endTime = LocalDateTime.now();
|
|
|
+ return Duration.between(startTime, endTime).toMillis();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Transactional(rollbackFor = RuntimeException.class)
|
|
|
+ public void syncResourceLinkOfField(ResourceFields fields, String tableName, String fieldName) {
|
|
|
+ String oldUrl = fields.getMigration().getOldUrl();
|
|
|
+ String newUrl = fields.getMigration().getNewUrl();
|
|
|
+
|
|
|
+ Object[] parameters = new Object[]{newUrl, oldUrl};
|
|
|
+ String sql = String
|
|
|
+ .format("UPDATE %s SET %s = ? WHERE %s = ?", tableName, fieldName, fieldName);
|
|
|
+ int result = jdbcTemplate.update(sql, parameters);
|
|
|
+ logger.info("Update " + result + " rows");
|
|
|
+
|
|
|
+ fields.setStatus(FileStatus.SYNCHRONIZED);
|
|
|
+ fieldsRepository.save(fields);
|
|
|
+ }
|
|
|
+
|
|
|
+ @JsonInclude(Include.NON_EMPTY)
|
|
|
+ public class MigrationStatistic {
|
|
|
+
|
|
|
+ private String fieldName;
|
|
|
+
|
|
|
+ private String status;
|
|
|
+
|
|
|
+ private Long total;
|
|
|
+
|
|
|
+ public MigrationStatistic() {
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getFieldName() {
|
|
|
+ return fieldName;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setFieldName(String fieldName) {
|
|
|
+ this.fieldName = fieldName;
|
|
|
+ }
|
|
|
+
|
|
|
+ public String getStatus() {
|
|
|
+ return status;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setStatus(String status) {
|
|
|
+ this.status = status;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Long getTotal() {
|
|
|
+ return total;
|
|
|
+ }
|
|
|
+
|
|
|
+ public void setTotal(Long total) {
|
|
|
+ this.total = total;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String toString() {
|
|
|
+ return JacksonUtils.toJson(this);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private class FileUrl {
|
|
|
+
|
|
|
+ private String url;
|
|
|
+
|
|
|
+ String getUrl() {
|
|
|
+ return url;
|
|
|
+ }
|
|
|
+
|
|
|
+ void setUrl(String url) {
|
|
|
+ this.url = url;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+}
|