Browse Source

optimize the multiDownload

sunyj 8 years ago
parent
commit
19f87c8ea9

+ 16 - 6
search-console-b2b/src/main/java/com/uas/search/console/b2b/controller/IndexController.java

@@ -5,6 +5,7 @@ import com.uas.search.b2b.service.SearchService.Table_name;
 import com.uas.search.console.b2b.jms.AQListener;
 import com.uas.search.console.b2b.jms.LuceneQueueMessage;
 import com.uas.search.console.b2b.service.IndexService;
+import com.uas.search.console.b2b.support.DownloadHelper;
 import com.uas.search.console.b2b.util.SearchConstants.DataSourceQualifier;
 import com.uas.search.util.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -58,16 +59,25 @@ public class IndexController {
 
     @RequestMapping("/multiDownloadData")
     @ResponseBody
-    public String multiDownloadData(String tableNames, Integer threads) {
-        indexService.multiDownloadData(getTableNames(tableNames), threads);
-        return "Start download...";
+    public String multiDownloadData(String tableNames, Integer threads, String validateResult) {
+        long start = System.currentTimeMillis();
+        String message= "Downloaded: "+ indexService.multiDownloadData(getTableNames(tableNames), threads, parseValidateResult(validateResult));
+        message += String.format(", Time: %.2fs", (System.currentTimeMillis()-start)/1000.0);
+        return message;
     }
 
     @RequestMapping("/multiDownloadData/{tableName}")
     @ResponseBody
-    public String multiDownloadData(@PathVariable String tableName, Integer threads, Integer startFileIndex, Integer endFileIndex) {
-        indexService.multiDownloadData(Table_name.valueOf(tableName.toUpperCase()), threads, startFileIndex, endFileIndex);
-        return "Start download...";
+    public String multiDownloadData(@PathVariable String tableName, Integer threads, Integer startFileIndex, Integer endFileIndex, String validateResult) {
+        long start = System.currentTimeMillis();
+        String message= "Downloaded: "+ indexService.multiDownloadData(Table_name.valueOf(tableName.toUpperCase()),
+                threads, startFileIndex, endFileIndex, parseValidateResult(validateResult));
+        message += String.format(", Time: %.2fs", (System.currentTimeMillis()-start)/1000.0);
+        return message;
+    }
+
+    private DownloadHelper.ValidateResult parseValidateResult(String validateResult){
+        return StringUtils.isEmpty(validateResult) ? DownloadHelper.ValidateResult.CURRENT : DownloadHelper.ValidateResult.valueOf(validateResult.toUpperCase());
     }
 
     private List<Table_name> getTableNames(String tableNames){

+ 13 - 10
search-console-b2b/src/main/java/com/uas/search/console/b2b/service/IndexService.java

@@ -4,6 +4,7 @@ import com.uas.search.b2b.model.SPage;
 import com.uas.search.b2b.service.SearchService.Table_name;
 import com.uas.search.console.b2b.jms.LuceneQueueMessage;
 import com.uas.search.console.b2b.model.ParsedQueueMessage;
+import com.uas.search.console.b2b.support.DownloadHelper;
 import com.uas.search.console.b2b.util.SearchConstants.DataSourceQualifier;
 
 import java.util.List;
@@ -51,21 +52,23 @@ public interface IndexService {
 
 	/**
 	 * 多线程下载指定表的数据至本地文件中,以供建索引用
-	 *
-	 * @param tableNames     可为空,指定的表,默认下载所有表的数据
+	 *  @param tableNames     可为空,指定的表,默认下载所有表的数据
 	 * @param threads        线程数量,默认为 1
-	 */
-	public void multiDownloadData(List<Table_name> tableNames, Integer threads);
+     * @param validateResult 下载完成后,是否对结果进行校验
+     * @return 下载的数据条数
+     */
+	public long multiDownloadData(List<Table_name> tableNames, Integer threads, DownloadHelper.ValidateResult validateResult);
 
 	/**
 	 * 多线程下载指定表的数据至本地文件中,以供建索引用
-	 *
-	 * @param tableName      指定的表
+	 *  @param tableName      指定的表
 	 * @param threads        线程数量,默认为 1
-	 * @param startFileIndex 开始的文件,默认为 1
-	 * @param endFileIndex   结束的文件,默认为 1024 * 1024 * 1024
-	 */
-	public void multiDownloadData(Table_name tableName, Integer threads, Integer startFileIndex, Integer endFileIndex);
+     * @param startFileIndex 开始的文件,默认为 1
+     * @param endFileIndex   结束的文件,默认为 1024 * 1024 * 1024
+     * @param validateResult 下载完成后,是否对结果进行校验
+     * @return 下载的数据条数
+     */
+	public long multiDownloadData(Table_name tableName, Integer threads, Integer startFileIndex, Integer endFileIndex, DownloadHelper.ValidateResult validateResult);
 
 	/**
 	 * 将新对象添加在lucene索引中

+ 13 - 111
search-console-b2b/src/main/java/com/uas/search/console/b2b/service/impl/IndexServiceImpl.java

@@ -15,6 +15,8 @@ import com.uas.search.console.b2b.jms.QueueMessageParser;
 import com.uas.search.console.b2b.model.PageInfo;
 import com.uas.search.console.b2b.model.ParsedQueueMessage;
 import com.uas.search.console.b2b.service.IndexService;
+import com.uas.search.console.b2b.support.DownloadHelper;
+import com.uas.search.console.b2b.support.DownloadService;
 import com.uas.search.console.b2b.support.IndexSearcherManager;
 import com.uas.search.console.b2b.support.IndexWriterManager;
 import com.uas.search.console.b2b.util.ClassAndTableNameUtils;
@@ -32,7 +34,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.data.domain.Page;
-import org.springframework.data.domain.Sort;
 import org.springframework.data.jpa.repository.JpaRepository;
 import org.springframework.stereotype.Service;
 
@@ -68,12 +69,12 @@ public class IndexServiceImpl implements IndexService {
 	/**
 	 * 从数据库获取数据时的分页大小
 	 */
-	private static final int PAGE_SIZE = 1000;
+	public static final int PAGE_SIZE = 1000;
 
 	/**
 	 * 单个文件存储的最大数据数目,需是PAGE_SIZE的整数倍
 	 */
-	public static final int SINGLE_FILE_MAX_SIZE = 100000;
+	public static final int SINGLE_FILE_MAX_SIZE = PAGE_SIZE;
 
 	/**
 	 * 是否正在创建索引
@@ -359,131 +360,32 @@ public class IndexServiceImpl implements IndexService {
 	}
 
     @Override
-    public void multiDownloadData(List<Table_name> tableNames, Integer threads) {
+    public long multiDownloadData(List<Table_name> tableNames, Integer threads, DownloadHelper.ValidateResult validateResult) {
         // 参数为空,则下载所有需要建立索引的表的数据
         if (CollectionUtils.isEmpty(tableNames)) {
             tableNames = ClassAndTableNameUtils.getTableNames();
             SearchUtils.deleteDir(new File(luceneProperties.getDataDir()));
         }
+        long result = 0L;
         for (Table_name tableName : tableNames) {
             // 指定的表数目为1时才起作用
-            multiDownloadData(tableName,
-                    threads, null, null);
+            result += multiDownloadData(tableName, threads, null, null, validateResult);
         }
+        return result;
     }
 
     @Override
-    public void multiDownloadData(Table_name tableName, Integer threads, Integer startFileIndex, Integer endFileIndex) {
-        multiDownloadData(ClassAndTableNameUtils.toClass(tableName), threads, startFileIndex, endFileIndex);
+    public long multiDownloadData(Table_name tableName, Integer threads, Integer startFileIndex, Integer endFileIndex, DownloadHelper.ValidateResult validateResult) {
+        return multiDownloadData(ClassAndTableNameUtils.toClass(tableName), threads, startFileIndex, endFileIndex, validateResult);
     }
 
-    private <T> void multiDownloadData(Class<T> clazz, Integer threads, Integer startFileIndex, Integer endFileIndex) {
+    private <T> long multiDownloadData(Class<T> clazz, Integer threads, Integer startFileIndex, Integer endFileIndex, DownloadHelper.ValidateResult validateResult) {
         threads = threads == null || threads < 1 ? 1 : threads;
         if (threads > druidDBConfiguration.getMaxActive()) {
             throw new IllegalArgumentException("线程数量不可超过 " + druidDBConfiguration.getMaxActive());
         }
-        startFileIndex = startFileIndex == null || startFileIndex < 1 ? 1 : startFileIndex;
-        if (startFileIndex == 1 && endFileIndex == null) {
-            // 删除旧的文件
-            Table_name tableName = ClassAndTableNameUtils.toTableName(clazz);
-            FileUtils.deleteSubFiles(new File(SearchUtils.getDataPath(tableName)));
-        }
-        endFileIndex = endFileIndex == null || endFileIndex < 1 ? 1024 * 1024 * 1024 : endFileIndex;
-        for (int i = 1; i <= threads; i++) {
-            new Thread(new DownloadTread<>(clazz, "Thread-" + i, threads, startFileIndex + i - 1, endFileIndex)).start();
-        }
-    }
-
-    private class DownloadTread<T> implements Runnable {
-
-        /**
-         * 表名
-         */
-        private Class<T> clazz;
-
-        /**
-         * 线程名称
-         */
-        private String name;
-
-        /**
-         * 新增文件时,文件 id 的自增步长,(即线程数量)
-         */
-        private int step;
-
-        /**
-         * 开始的文件
-         */
-        private int startFileIndex;
-
-        /**
-         * 开始的文件
-         */
-        private int endFileIndex;
-
-        public DownloadTread(Class<T> clazz, String name, int step, int startFileIndex, int endFileIndex) {
-            this.clazz = clazz;
-            this.name = name;
-            this.step = step;
-            this.startFileIndex = startFileIndex;
-            this.endFileIndex = endFileIndex;
-        }
-
-        @Override
-        public void run() {
-            Table_name tableName = ClassAndTableNameUtils.toTableName(clazz);
-            try {
-                if (endFileIndex < startFileIndex) {
-                    logger.error(name + " fileIndex 不可超过 : endFileIndex=" + endFileIndex);
-                    return;
-                }
-                Long startTime = new Date().getTime();
-                logger.info("Download... " + tableName.value());
-
-                Sort sort = new Sort(Sort.Direction.ASC, "id");
-                // 分页获取数据
-                PageParams pageParams = new PageParams();
-                pageParams.setPage(startFileIndex);
-                pageParams.setSize(PAGE_SIZE);
-                PageInfo pageInfo = new PageInfo(pageParams, sort);
-                JpaRepository<T, Long> dao = ClassAndTableNameUtils.getDao(clazz);
-                Page<T> pageResult = dao.findAll(pageInfo);
-
-                int totalPages = pageResult.getTotalPages();
-                if (totalPages < startFileIndex) {
-                    logger.error(name + " fileIndex 不可超过 : totalPages=" + totalPages);
-                    return;
-                }
-                // 已翻页的数据数目
-                Long size = 0L;
-                String dataPath = SearchUtils.getDataPath(tableName);
-                File file = new File(dataPath);
-                if (!file.exists()) {
-                    file.mkdirs();
-                }
-                while (totalPages >= startFileIndex && endFileIndex >= startFileIndex) {
-                    String fileName = String.format("%010d", startFileIndex) + ".txt";
-                    PrintWriter printWriter = new PrintWriter(dataPath + "/" + fileName);
-                    List<T> content = pageResult.getContent();
-                    for (T element : content) {
-                        printWriter.println(JSONObject.toJSONString(element));
-                    }
-                    size += content.size();
-                    logger.info(tableName.value() + " " + name + " " + fileName + " - Downloaded..................." + size);
-
-                    printWriter.flush();
-                    printWriter.close();
-                    startFileIndex += step;
-                    pageParams.setPage(startFileIndex);
-                    pageInfo = new PageInfo(pageParams, sort);
-                    pageResult = dao.findAll(pageInfo);
-                }
-
-                logger.info(String.format("%s %s 下载完成,耗时%.2fs\n ", tableName.value(), name, (new Date().getTime() - startTime) / 1000.0));
-            } catch (Throwable e) {
-                logger.error(tableName.value() + " " + name + " 下载失败", e);
-            }
-        }
+        DownloadHelper<T> downloadHelper = new DownloadHelper<>(threads, startFileIndex, endFileIndex, clazz, "id", ClassAndTableNameUtils.getDao(clazz), new DownloadService<T>(), validateResult);
+        return downloadHelper.getResult();
     }
 
 	@Override

+ 393 - 0
search-console-b2b/src/main/java/com/uas/search/console/b2b/support/DownloadHelper.java

@@ -0,0 +1,393 @@
+package com.uas.search.console.b2b.support;
+
+import com.uas.search.b2b.model.PageParams;
+import com.uas.search.b2b.service.SearchService;
+import com.uas.search.console.b2b.model.PageInfo;
+import com.uas.search.console.b2b.util.ClassAndTableNameUtils;
+import com.uas.search.console.b2b.util.FileUtils;
+import com.uas.search.console.b2b.util.SearchUtils;
+import com.uas.search.util.ArrayUtils;
+import com.uas.search.util.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.data.jpa.repository.JpaRepository;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.*;
+
+import static com.uas.search.console.b2b.service.impl.IndexServiceImpl.PAGE_SIZE;
+import static com.uas.search.console.b2b.service.impl.IndexServiceImpl.SINGLE_FILE_MAX_SIZE;
+
+/**
+ * 下载数据的辅助类
+ *
+ * @author sunyj
+ * @since 2017/11/25 17:30
+ */
+public class DownloadHelper<T> {
+
+    /**
+     * 线程最小数量
+     */
+    private final int MIN_THREAD_SIZE = 1;
+
+    /**
+     * 线程最大数量
+     */
+    private final int MAX_THREAD_SIZE = 10000;
+
+    /**
+     * 默认开始的文件
+     */
+    private final int DEFAULT_START_FILE_INDEX = 1;
+
+    /**
+     * 默认结束的文件
+     */
+    private final int DEFAULT_END_FILE_INDEX = 1024 * 1024 * 1024;
+
+    /**
+     * 线程数量
+     */
+    private Integer threadSize;
+
+    /**
+     * 开始的文件
+     */
+    private Integer startFileIndex;
+
+    /**
+     * 结束的文件
+     */
+    private Integer endFileIndex;
+
+    /**
+     * 要下载的实体类
+     */
+    private Class<T> clazz;
+
+    /**
+     * 排序字段
+     */
+    private String sortField;
+
+    /**
+     * dao
+     */
+    private JpaRepository<T, Long> dao;
+
+    /**
+     * 下载的实现
+     */
+    private DownloadService<T> downloadService;
+
+    /**
+     * 下载完成后,是否对结果进行校验
+     */
+    private ValidateResult validateResult;
+
+    /**
+     * 线程管理
+     */
+    private ExecutorService executorService;
+
+    /**
+     * 收集执行结果
+     */
+    private CompletionService<Long> completionService;
+
+    /**
+     * 要下载表的数据总行数
+     */
+    private Long totalElements;
+
+    /**
+     * 执行结果
+     */
+    private Long result;
+
+    private Logger logger = LoggerFactory.getLogger(getClass());
+
+    /**
+     * @param threadSize      线程数量
+     * @param startFileIndex  开始的文件
+     * @param endFileIndex    结束的文件
+     * @param clazz           要下载的实体类
+     * @param sortField       排序字段
+     * @param dao             dao
+     * @param downloadService 下载的实现
+     */
+    public DownloadHelper(Integer threadSize, Integer startFileIndex, Integer endFileIndex, Class<T> clazz, String sortField, JpaRepository<T, Long> dao, DownloadService<T> downloadService) {
+        this(threadSize, startFileIndex, endFileIndex, clazz, sortField, dao, downloadService, ValidateResult.CURRENT);
+    }
+
+    /**
+     * @param threadSize      线程数量
+     * @param startFileIndex  开始的文件
+     * @param endFileIndex    结束的文件
+     * @param clazz           要下载的实体类
+     * @param sortField       排序字段
+     * @param dao             dao
+     * @param downloadService 下载的实现
+     * @param validateResult  下载完成后,是否对结果进行校验
+     */
+    public DownloadHelper(Integer threadSize, Integer startFileIndex, Integer endFileIndex, Class<T> clazz, String sortField, JpaRepository<T, Long> dao, DownloadService<T> downloadService, ValidateResult validateResult) {
+        if (threadSize == null || threadSize < MIN_THREAD_SIZE || threadSize > MAX_THREAD_SIZE) {
+            throw new IllegalArgumentException("threadSize is between " + MIN_THREAD_SIZE + " and " + MAX_THREAD_SIZE);
+        }
+        if (downloadService == null) {
+            throw new IllegalArgumentException("runnable is null");
+        }
+        this.threadSize = threadSize;
+        this.downloadService = downloadService;
+        this.startFileIndex = startFileIndex == null || startFileIndex < DEFAULT_START_FILE_INDEX ? DEFAULT_START_FILE_INDEX : startFileIndex;
+        this.endFileIndex = endFileIndex == null || endFileIndex < DEFAULT_START_FILE_INDEX ? DEFAULT_END_FILE_INDEX : endFileIndex;
+        this.clazz = clazz;
+        this.sortField = sortField;
+        this.dao = dao;
+        this.validateResult = validateResult;
+        start();
+    }
+
+    /**
+     * 开始下载
+     */
+    private void start() {
+        SearchService.Table_name tableName = ClassAndTableNameUtils.toTableName(clazz);
+        executorService = Executors.newCachedThreadPool();
+        completionService = new ExecutorCompletionService<>(executorService);
+        if (startFileIndex == DEFAULT_START_FILE_INDEX && endFileIndex == DEFAULT_END_FILE_INDEX) {
+            // 删除旧的文件
+            FileUtils.deleteSubFiles(new File(SearchUtils.getDataPath(tableName)));
+        }
+        getTotalElements();
+        logger.info(tableName + " 发现数据 " + totalElements + " 条");
+        // 线程数量不可高于下载的文件数量
+        threadSize = threadSize <= endFileIndex - startFileIndex + 1 ? threadSize : endFileIndex - startFileIndex + 1;
+        for (int i = 0; i < threadSize; i++) {
+            completionService.submit(getTask(i, threadSize, startFileIndex + i, endFileIndex, tableName, sortField, dao));
+        }
+        waitResult();
+        if (validateResult != null && validateResult != ValidateResult.NONE) {
+            // 对结果进行校验,只校验一定的次数,防止因特殊原因,某些数据始终无法成功下载,陷入死循环
+            int retry = 1;
+            List<Integer> missingFiles;
+            logger.info("校验下载结果");
+            while (retry <= 5 && !CollectionUtils.isEmpty(missingFiles = validateResult())) {
+                logger.error("第 " + retry + " 次校验,下载遗失的文件:" + missingFiles);
+                List<DownloadHelper<T>> downloadHelpers = new ArrayList<>();
+                for (Integer missingFile : missingFiles) {
+                    downloadHelpers.add(new DownloadHelper<>(1, missingFile, missingFile, clazz, sortField, dao, downloadService, ValidateResult.NONE));
+                }
+                for (DownloadHelper downloadHelper : downloadHelpers) {
+                    downloadHelper.getResult();
+                }
+                retry++;
+            }
+            if (!CollectionUtils.isEmpty(missingFiles = validateResult())) {
+                throw new IllegalStateException("部分数据下载失败:" + missingFiles);
+            } else{
+                logger.info(tableName.value() + " 数据下载成功:" + result + " 条");
+            }
+        }
+    }
+
+    /**
+     * 获取要下载表的数据总行数
+     */
+    private void getTotalElements() {
+        PageParams pageParams = new PageParams();
+        pageParams.setPage(startFileIndex);
+        pageParams.setSize(PAGE_SIZE);
+        PageInfo pageInfo = new PageInfo(pageParams);
+        totalElements = dao.findAll(pageInfo).getTotalElements();
+    }
+
+    /**
+     * 获取任务
+     *
+     * @param id             线程 id
+     * @param step           新增文件时,文件 id 的自增步长,(即线程数量)
+     * @param startFileIndex 开始的文件
+     * @param endFileIndex   结束的文件
+     * @param tableName      要下载的表
+     * @param sortField      排序字段
+     * @param dao            dao
+     * @return 任务
+     */
+    private Callable<Long> getTask(final int id, final int step, final int startFileIndex, final int endFileIndex, final SearchService.Table_name tableName, final String sortField, final JpaRepository<T, Long> dao) {
+        return new Callable<Long>() {
+            @Override
+            public Long call() throws Exception {
+                return downloadService.download(id, step, startFileIndex, endFileIndex, tableName, sortField, dao);
+            }
+        };
+    }
+
+    /**
+     * 等待执行结果
+     */
+    private void waitResult() {
+        if (executorService.isShutdown() || executorService.isTerminated()) {
+            throw new IllegalStateException("结果已返回,不可再次获取");
+        }
+        result = 0L;
+        for (int i = 0; i < threadSize; i++) {
+            try {
+                Future<Long> future = completionService.take();
+                Long count = future.get();
+                result += (count == null ? 0L : count);
+            } catch (InterruptedException | ExecutionException e) {
+                throw new IllegalStateException("获取下载结果失败", e);
+            }
+        }
+        executorService.shutdown();
+    }
+
+    /**
+     * 获取执行结果
+     *
+     * @return 下载的总数量
+     */
+    public long getResult() {
+        if (result == null) {
+            waitResult();
+        }
+        return result;
+    }
+
+    /**
+     * 校验结果
+     *
+     * @return 未下载成功的文件
+     */
+    private List<Integer> validateResult() {
+        SearchService.Table_name tableName = ClassAndTableNameUtils.toTableName(clazz);
+        int totalFiles = (int) Math.ceil(totalElements / (1.0 * SINGLE_FILE_MAX_SIZE));
+        // 期待的起始、结束文件
+        final int expectStartFileIndex;
+        final int expectEndFileIndex;
+        if (validateResult == ValidateResult.CURRENT) {
+            // 校验本次下载的文件
+            expectStartFileIndex = startFileIndex;
+            expectEndFileIndex = endFileIndex < totalFiles ? endFileIndex : totalFiles;
+        } else if (validateResult == ValidateResult.ALL) {
+            // 校验所有文件
+            expectStartFileIndex = DEFAULT_START_FILE_INDEX;
+            expectEndFileIndex = totalFiles;
+        } else {
+            return null;
+        }
+        List<Integer> missingFiles = new ArrayList<>();
+
+        File[] files = new File(SearchUtils.getDataPath(tableName)).listFiles(new FileFilter() {
+            @Override
+            public boolean accept(File pathname) {
+                // 只保留指定格式的文件,类似 0000000323.txt
+                if (!pathname.isFile() || !pathname.getName().matches("[0]*?[\\d]+?.txt")) {
+                    return false;
+                }
+                int id = parseName(pathname);
+                return id >= expectStartFileIndex && id <= expectEndFileIndex;
+            }
+        });
+        if (ArrayUtils.isEmpty(files)) {
+            return missingFiles;
+        }
+        // 将文件按名称排序
+        sort(files);
+        // 默认把所有文件视为未下载成功
+        for (int i = expectStartFileIndex; i <= expectEndFileIndex; i++) {
+            missingFiles.add(i);
+        }
+        result = 0L;
+        for (File file : files) {
+            // 读取文件,如果文件行数正常(如果不是最后一个文件,行数必须超过限定值),根据文件 id 将其从 missingFiles 中移出
+            int id = parseName(file);
+            int readLineCount = readLineCount(file);
+            if (readLineCount >= SINGLE_FILE_MAX_SIZE || (id == totalFiles && readLineCount > 0)) {
+                result += readLineCount;
+                missingFiles.remove(new Integer(id));
+            }
+        }
+        return missingFiles;
+    }
+
+    /**
+     * 获取文件行数
+     *
+     * @param file 文件
+     * @return 行数
+     */
+    private int readLineCount(File file) {
+        try (BufferedReader bufferedReader = new BufferedReader(new FileReader(file))) {
+            int count = 0;
+            while (bufferedReader.readLine() != null) {
+                count++;
+            }
+            return count;
+        } catch (IOException e) {
+            throw new IllegalStateException("校验下载的文件时,读取文件出错", e);
+        }
+    }
+
+    /**
+     * 根据文件的名称进行排序(增序)
+     *
+     * @param files
+     */
+    public void sort(File[] files) {
+        int N = files.length;
+        int h = 1;
+        while (h < N / 3) {
+            h = h * 3 + 1;
+        }
+        while (h >= 1) {
+            for (int i = h; i < N; i++) {
+                for (int j = i; j >= h && less(files[j], files[j - h]); j -= h) {
+                    exchange(files, j, j - h);
+                }
+            }
+            h /= 3;
+        }
+    }
+
+    private boolean less(File f1, File f2) {
+        return parseName(f1) - parseName(f2) < 0;
+    }
+
+    /**
+     * 解析文件名称为数字
+     */
+    private int parseName(File f) {
+        return Integer.parseInt(f.getName().replaceAll(".txt", ""));
+    }
+
+    /**
+     * 交换位置
+     */
+    private void exchange(File[] files, int i, int j) {
+        File temp = files[i];
+        files[i] = files[j];
+        files[j] = temp;
+    }
+
+    public enum ValidateResult {
+        /**
+         * 不校验
+         */
+        NONE,
+        /**
+         * 校验所有文件
+         */
+        ALL,
+
+        /**
+         * 只校验本次下载的文件
+         */
+        CURRENT
+    }
+
+}

+ 144 - 0
search-console-b2b/src/main/java/com/uas/search/console/b2b/support/DownloadService.java

@@ -0,0 +1,144 @@
+package com.uas.search.console.b2b.support;
+
+import com.alibaba.fastjson.JSONObject;
+import com.uas.search.b2b.model.PageParams;
+import com.uas.search.b2b.service.SearchService;
+import com.uas.search.console.b2b.model.PageInfo;
+import com.uas.search.console.b2b.util.SearchUtils;
+import com.uas.search.util.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.data.domain.Page;
+import org.springframework.data.domain.Sort;
+import org.springframework.data.jpa.repository.JpaRepository;
+
+import java.io.File;
+import java.io.PrintWriter;
+import java.util.Date;
+import java.util.List;
+
+import static com.uas.search.console.b2b.service.impl.IndexServiceImpl.PAGE_SIZE;
+import static com.uas.search.console.b2b.service.impl.IndexServiceImpl.SINGLE_FILE_MAX_SIZE;
+
+/**
+ * 下载线程
+ *
+ * @author sunyj
+ * @since 2017/11/25 18:24
+ */
+public class DownloadService<T> {
+
+    /**
+     * @param id             线程 id
+     * @param step           新增文件时,文件 id 的自增步长,(即线程数量)
+     * @param startFileIndex 开始的文件
+     * @param endFileIndex   结束的文件
+     * @param tableName      要下载的表
+     * @param sortField      排序字段
+     * @param dao            dao
+     * @return 下载的数量
+     */
+    public long download(int id, int step, int startFileIndex, int endFileIndex, SearchService.Table_name tableName, String sortField, JpaRepository<T, Long> dao) {
+        long size = 0L;
+        Logger logger = LoggerFactory.getLogger(getClass());
+        String name = "Thread_" + id + "_" + startFileIndex + "-" + endFileIndex;
+        File file = null;
+        PrintWriter printWriter = null;
+        try {
+            if (SINGLE_FILE_MAX_SIZE % PAGE_SIZE != 0) {
+                throw new IllegalArgumentException("SINGLE_FILE_MAX_SIZE 并非 PAGE_SIZE 的整数倍:" + SINGLE_FILE_MAX_SIZE + "/" + PAGE_SIZE);
+            }
+            if (startFileIndex > endFileIndex) {
+                logger.info(name + " startFileIndex 超过 " + endFileIndex);
+                return size;
+            }
+            Long startTime = new Date().getTime();
+            logger.info(name + " 下载 " + tableName.value() + "...");
+
+            Sort sort = new Sort(Sort.Direction.ASC, sortField);
+            // 分页获取数据
+            PageParams pageParams = new PageParams();
+            pageParams.setPage((startFileIndex - 1) * SINGLE_FILE_MAX_SIZE / PAGE_SIZE + 1);
+            pageParams.setSize(PAGE_SIZE);
+            PageInfo pageInfo = new PageInfo(pageParams, sort);
+            Page<T> pageResult = dao.findAll(pageInfo);
+
+            int totalFiles = (int) Math.ceil(pageResult.getTotalElements() / (1.0 * SINGLE_FILE_MAX_SIZE));
+            if (startFileIndex > totalFiles) {
+                logger.info(name + " startFileIndex 超过 " + totalFiles);
+                return size;
+            }
+            // 实际的最后一个文件
+            endFileIndex = endFileIndex < totalFiles ? endFileIndex : totalFiles;
+
+            File dataDir = new File(SearchUtils.getDataPath(tableName));
+            if (!dataDir.exists()) {
+                if (!dataDir.mkdirs()) {
+                    logger.error("创建文件夹失败:" + dataDir.getPath());
+                }
+            }
+            int fileIndex = startFileIndex;
+            String fileName = String.format("%010d", fileIndex) + ".txt";
+            file = new File(dataDir, fileName);
+            printWriter = new PrintWriter(file);
+            int count = 0;
+            while (endFileIndex >= fileIndex) {
+                // 数据以 JSON 格式写入文件
+                List<T> content = pageResult.getContent();
+                for (T element : content) {
+                    println(printWriter, element);
+                }
+                count += content.size();
+                size += content.size();
+                logger.info(name + " " + fileName + " - Downloaded..................." + size);
+
+                // 一个文件的函数达到指定值,写入新文件
+                if (count == SINGLE_FILE_MAX_SIZE) {
+                    count = 0;
+                    close(printWriter);
+                    fileIndex += step;
+                    pageParams.setPage(pageParams.getPage() + step * SINGLE_FILE_MAX_SIZE / PAGE_SIZE);
+                    // 不创建多余文件
+                    if (endFileIndex >= fileIndex) {
+                        fileName = String.format("%010d", fileIndex) + ".txt";
+                        file = new File(dataDir, fileName);
+                        printWriter = new PrintWriter(file);
+                    }
+                } else {
+                    pageParams.setPage(pageParams.getPage() + 1);
+                }
+                pageInfo = new PageInfo(pageParams, sort);
+                pageResult = dao.findAll(pageInfo);
+                // 已经下载到最后一页
+                if (CollectionUtils.isEmpty(pageResult.getContent())) {
+                    break;
+                }
+            }
+            logger.info(String.format("%s 下载完成,耗时%.2fs\n ", name, (new Date().getTime() - startTime) / 1000.0));
+        } catch (Throwable e) {
+            close(printWriter);
+            // 出现错误时,删除最近出错的文件
+            if (file != null) {
+                if (!file.delete()) {
+                    logger.error("出错时,清理出错文件失败:" + file.getPath());
+                }
+            }
+            logger.error(name + " " + tableName + " 下载失败", e);
+        } finally {
+            close(printWriter);
+        }
+        return size;
+    }
+
+    protected void println(PrintWriter printWriter, T element) {
+        printWriter.println(JSONObject.toJSONString(element));
+    }
+
+    private void close(PrintWriter printWriter) {
+        if (printWriter != null) {
+            printWriter.flush();
+            printWriter.close();
+        }
+    }
+
+}

+ 3 - 3
search-console-b2b/src/main/webapp/WEB-INF/views/console.html

@@ -46,9 +46,9 @@
 				<li>index/downloadData</li>
 				<li><a target="_blank">index/downloadData?tableNames=PURC$ORDERS,PURC$RETURN</a></li>
                 <li><a target="_blank">index/downloadData/PURC$ORDERS?startFileIndex=2</a></li>
-				<li><a target="_blank">index/multiDownloadData?tableNames=PURC$ORDERS,PURC$RETURN&threads=3</a></li>
-				<li>index/multiDownloadData/PURC$ORDERS?threads=200&startFileIndex=0&endFileIndex=10000000</li>
-				<li><a target="_blank">index/multiDownloadData/PURC$ORDERS?threads=20</a></li>
+				<li><a target="_blank">index/multiDownloadData?tableNames=PURC$ORDERS,PURC$RETURN&threads=3&validateResult=current</a></li>
+				<li>index/multiDownloadData/PURC$ORDERS?threads=200&startFileIndex=0&endFileIndex=10000000&validateResult=all</li>
+				<li><a target="_blank">index/multiDownloadData/PURC$ORDERS?threads=20&validateResult=current</a></li>
 				<li>index/maintain?tableName=PURC$ORDERS&method=update&ids=1,2,3&data=[{"pu_id":1234,"pu_code":"adsf"},{...}]</li>
 				<li><a target="_blank">index/maintain?tableName=PURC$ORDERS&method=update&ids=1,2,3</a></li>
 				<li><a target="_blank">index/dequeue?dataSourceQualifier=platformmanager&messageId=42A34BADEF8A2EBDE050007F01001E6A</a></li>