Browse Source

实现多线程下载批次数据

sunyj 8 years ago
parent
commit
2634e52cae

+ 5 - 0
src/main/java/com/uas/search/constant/model/PageInfo.java

@@ -24,6 +24,11 @@ public class PageInfo implements Pageable {
 		getPageInfo(params);
 	}
 
+	public PageInfo(PageParams params,Sort sort) {
+		getPageInfo(params);
+		this.sort = sort;
+	}
+
 	private void getPageInfo(PageParams params) {
 		this.pageSize = params.getSize();
 		if (this.pageSize == 0) {

+ 82 - 74
src/main/java/com/uas/search/controller/IndexController.java

@@ -2,12 +2,11 @@ package com.uas.search.controller;
 
 import com.uas.search.annotation.NotEmpty;
 import com.uas.search.constant.model.SPage;
-import com.uas.search.jms.LuceneMessageDao;
-import com.uas.search.exception.SearchException;
 import com.uas.search.jms.JmsListener;
 import com.uas.search.jms.LuceneMessage;
-import com.uas.search.service.IndexService;
+import com.uas.search.jms.LuceneMessageDao;
 import com.uas.search.jms.LuceneMessageService;
+import com.uas.search.service.IndexService;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Controller;
 import org.springframework.util.StringUtils;
@@ -31,82 +30,91 @@ public class IndexController {
     @Autowired
     private LuceneMessageDao luceneMessageDao;
 
-	@Autowired
-	private IndexService indexService;
-
-	@Autowired
-	private LuceneMessageService luceneMessageService;
-
-	@Autowired
-	private JmsListener jmsListener;
-
-	@RequestMapping("/create")
-	@ResponseBody
-	public String initIndexes(String tableNames, HttpServletRequest request) {
-		List<String> tableNameList = new ArrayList<>();
-		if (!StringUtils.isEmpty(tableNames)) {
-			String[] strs = tableNames.split(",");
-			for (String str : strs) {
-				tableNameList.add(str.toLowerCase());
-			}
-		}
-		return String.format("Indexes created success in %.2fs",
-				indexService.createIndexes(tableNameList, true) / 1000.0);
-	}
-
-	@RequestMapping("/downloadComponentData")
-	@ResponseBody
-	public String downloadComponentDataFromDatabase(Integer startFileIndex, HttpServletRequest request) {
-		try {
-			return "Data downloaded success in " + indexService.downloadComponentDataFromDatabase(startFileIndex)
-					+ " ms.";
-		} catch (Exception e) {
-			// 防止SQLRecoverableException导致应用终止
-			throw new SearchException(e).setDetailedMessage(e);
-		}
-	}
-
-	@RequestMapping("/listen/start")
-	@ResponseBody
-	public String startListen(Long interval, HttpServletRequest request) {
-		jmsListener.start(interval);
+    @Autowired
+    private IndexService indexService;
+
+    @Autowired
+    private LuceneMessageService luceneMessageService;
+
+    @Autowired
+    private JmsListener jmsListener;
+
+    @RequestMapping("/create")
+    @ResponseBody
+    public String initIndexes(String tableNames, HttpServletRequest request) {
+        List<String> tableNameList = new ArrayList<>();
+        if (!StringUtils.isEmpty(tableNames)) {
+            String[] strs = tableNames.split(",");
+            for (String str : strs) {
+                tableNameList.add(str.toLowerCase());
+            }
+        }
+        return String.format("Indexes created success in %.2fs",
+                indexService.createIndexes(tableNameList, true) / 1000.0);
+    }
+
+    @RequestMapping("/downloadComponentData")
+    @ResponseBody
+    public String downloadComponentDataFromDatabase(Integer startFileIndex, HttpServletRequest request) {
+        return "Data downloaded success in " + indexService.downloadComponentDataFromDatabase(startFileIndex)
+                + " ms.";
+    }
+
+    @RequestMapping("/downloadGoods")
+    @ResponseBody
+    public String downloadGoods(Integer startFileIndex, HttpServletRequest request) {
+        return "Data downloaded success in " + indexService.downloadGoods(startFileIndex)
+                + " ms.";
+    }
+
+    @RequestMapping("/multiDownloadGoods")
+    @ResponseBody
+    public String multiDownloadGoods(Integer number, Integer startFileIndex, Integer endFileIndex, HttpServletRequest request) {
+        indexService.multiDownloadGoods(number, startFileIndex, endFileIndex);
+        return "start download";
+    }
+
+    @RequestMapping("/listen/start")
+    @ResponseBody
+    public String startListen(Long interval, HttpServletRequest request) {
+        jmsListener.start(interval);
         return "开启成功";
-	}
+    }
 
-	@RequestMapping("/listen/stop")
-	@ResponseBody
-	public String stopListen(HttpServletRequest request) {
-		jmsListener.stop();
+    @RequestMapping("/listen/stop")
+    @ResponseBody
+    public String stopListen(HttpServletRequest request) {
+        jmsListener.stop();
         return "关闭成功";
-	}
-
-	@RequestMapping("/listen/restart")
-	@ResponseBody
-	public String restartListen(Long interval, HttpServletRequest request) {
-		if (jmsListener.isRunning()) {
-			jmsListener.stop();
-		}
-		jmsListener.start(interval);
+    }
+
+    @RequestMapping("/listen/restart")
+    @ResponseBody
+    public String restartListen(Long interval, HttpServletRequest request) {
+        if (jmsListener.isRunning()) {
+            jmsListener.stop();
+        }
+        jmsListener.start(interval);
         return "重启成功";
-	}
+    }
 
-	@RequestMapping("/listen/details")
-	@ResponseBody
-	public SPage<LuceneMessage> listenDetails(Integer page, Integer size, HttpServletRequest request) {
-		return luceneMessageService.findAll(page, size);
-	}
+    @RequestMapping("/listen/details")
+    @ResponseBody
+    public SPage<LuceneMessage> listenDetails(Integer page, Integer size, HttpServletRequest request) {
+        return luceneMessageService.findAll(page, size);
+    }
 
-	@RequestMapping("/maintain")
-	@ResponseBody
-	public String maintainIndexes(@NotEmpty("tableName") String tableName, @NotEmpty("dataId") Long dataId, @NotEmpty("methodType") String methodType, String data,
+    @RequestMapping("/maintain")
+    @ResponseBody
+    public String maintainIndexes(@NotEmpty("tableName") String tableName, @NotEmpty("dataId") Long dataId, @NotEmpty("methodType") String methodType, String data,
                                   HttpServletRequest request) {
-		return "已维护:" + indexService.maintainIndexes(tableName.toLowerCase(), dataId, methodType, data);
-	}
-
-	@RequestMapping("/dequeue")
-	@ResponseBody
-	public boolean dequeueLuceneQueueMessage(Long id, HttpServletRequest request) {
-		luceneMessageDao.dequeueLuceneMessage(id);
-		return true;
-	}
+        return "已维护:" + indexService.maintainIndexes(tableName.toLowerCase(), dataId, methodType, data);
+    }
+
+    @RequestMapping("/dequeue")
+    @ResponseBody
+    public boolean dequeueLuceneQueueMessage(Long id, HttpServletRequest request) {
+        luceneMessageDao.dequeueLuceneMessage(id);
+        return true;
+    }
 }

+ 23 - 11
src/main/java/com/uas/search/dao/GoodsDao.java

@@ -1,16 +1,15 @@
 package com.uas.search.dao;
 
-import java.util.ArrayList;
-import java.util.List;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Repository;
-import org.springframework.util.CollectionUtils;
-
 import com.uas.search.model.Component;
 import com.uas.search.model.Goods;
 import com.uas.search.model.Store;
 import com.uas.search.model.TradeGoods;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Repository;
+import org.springframework.util.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.List;
 
 /**
  * @author sunyj
@@ -30,7 +29,7 @@ public class GoodsDao {
 
 	/**
 	 * 根据批次id或者器件id获取批次
-	 * 
+	 *
 	 * @param goods
 	 * @return
 	 */
@@ -49,7 +48,7 @@ public class GoodsDao {
 
 	/**
 	 * 根据批次id获取批次信息
-	 * 
+	 *
 	 * @param goId
 	 * @return
 	 */
@@ -65,15 +64,28 @@ public class GoodsDao {
 
 	/**
 	 * 根据器件id获取批次信息
-	 * 
+	 *
 	 * @param cmpId
 	 * @return
 	 */
-	private List<Goods> findByCmpId(Long cmpId) {
+	public List<Goods> findByCmpId(Long cmpId) {
 		Component component = componentDao.findOne(cmpId);
 		if(component == null){
 			throw new IllegalStateException("器件不存在:" + cmpId);
 		}
+		return findByComponent(component);
+	}
+
+	/**
+	 * 根据器件获取批次信息
+	 *
+	 * @param component
+	 * @return
+	 */
+	public List<Goods> findByComponent(Component component) {
+		if(component == null){
+			return null;
+		}
 		List<TradeGoods> tradeGoodsesList = tradeGoodsDao.findByCmpUuid(component.getUuid());
 		List<Goods> goodsesList = new ArrayList<>();
 		if (!CollectionUtils.isEmpty(tradeGoodsesList)) {

+ 12 - 0
src/main/java/com/uas/search/service/IndexService.java

@@ -34,6 +34,18 @@ public interface IndexService {
 	 */
 	public Long downloadComponentDataFromDatabase(Integer startFileIndex);
 
+	/**
+	 * 下载批次的数据至本地文件中,以供建索引用
+	 *
+	 * @param startFileIndex
+	 *            可为空,从第startFileIndex个文件开始下载
+	 *
+	 * @return 花费总时间 ms
+	 */
+	public Long downloadGoods(Integer startFileIndex);
+
+	public void multiDownloadGoods(Integer number, Integer startFileIndex, Integer endFileIndex);
+
 	/**
 	 * 将新对象添加在lucene索引中
 	 * 

+ 157 - 0
src/main/java/com/uas/search/service/impl/IndexServiceImpl.java

@@ -22,6 +22,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.data.domain.Page;
+import org.springframework.data.domain.Sort;
 import org.springframework.stereotype.Service;
 import org.springframework.util.CollectionUtils;
 import org.springframework.util.StringUtils;
@@ -458,6 +459,162 @@ public class IndexServiceImpl implements IndexService {
 		return endTime - startTime;
 	}
 
+    @Override
+    public Long downloadGoods(Integer startFileIndex) {
+        int fileIndex = 1;
+        if (startFileIndex != null) {
+            if (startFileIndex < 1) {
+                throw new SearchException("startFileIndex不小于1");
+            } else {
+                fileIndex = startFileIndex;
+            }
+        }
+
+        Long startTime = new Date().getTime();
+        logger.info("下载批次... ");
+
+        Sort sort = new Sort(Sort.Direction.ASC, "id");
+        // 分页获取数据
+        PageParams pageParams = new PageParams();
+        pageParams.setPage((fileIndex - 1) * SINGLE_FILE_MAX_SIZE / PAGE_SIZE + 1);
+        pageParams.setSize(PAGE_SIZE);
+        PageInfo pageInfo = new PageInfo(pageParams, sort);
+        Page<Component> pageResult = componentDao.findAll(pageInfo);
+
+        // 数据库中数据的总数目
+        if (pageResult.getTotalElements() < (fileIndex - 1) * SINGLE_FILE_MAX_SIZE) {
+            throw new SearchException("startFileIndex不超过"
+                    + ((int) Math.ceil(pageResult.getTotalElements() / (1.0 * SINGLE_FILE_MAX_SIZE))));
+        }
+        long totalElements = pageResult.getTotalElements() - (fileIndex - 1) * SINGLE_FILE_MAX_SIZE;
+        logger.info("发现数据:" + totalElements + "条");
+        // 已翻页的数据数目
+        Long size = 0L;
+        PrintWriter printWriter = null;
+        int count = 0;
+        try {
+            String goodsDataPath = SearchUtils.getDataPath(SearchConstants.GOODS_TABLE_NAME);
+            File file = new File(goodsDataPath);
+            if (!file.exists()) {
+                file.mkdirs();
+            }
+            printWriter = new PrintWriter(goodsDataPath + "/" + fileIndex + ".txt");
+            while (totalElements > size) {
+                // 一个文件存放100000条数据,一旦超过,写入新的文件
+                if (count == SINGLE_FILE_MAX_SIZE) {
+                    count = 0;
+                    printWriter.flush();
+                    printWriter.close();
+                    fileIndex++;
+                    printWriter = new PrintWriter(goodsDataPath + "/" + fileIndex + ".txt");
+                }
+                List<Component> content = pageResult.getContent();
+                for (Component element : content) {
+                    List<Goods> goodses = goodsDao.findByComponent(element);
+                    for (Goods goods : goodses) {
+                        printWriter.println(JSONObject.toJSONString(goods));
+                    }
+                    count++;
+                }
+                size += content.size();
+                logger.info(String.format("Downloaded...................%.2f%%", size * 100.0 / totalElements));
+
+                pageParams.setPage(pageParams.getPage() + 1);
+                pageInfo = new PageInfo(pageParams, sort);
+                pageResult = componentDao.findAll(pageInfo);
+            }
+            printWriter.flush();
+            printWriter.close();
+        } catch (FileNotFoundException e) {
+            logger.error("", e);
+        }
+
+        Long endTime = new Date().getTime();
+        logger.info(String.format("下载数据%s条,耗时%.2fs\n ", totalElements, (endTime - startTime) / 1000.0));
+        return endTime - startTime;
+    }
+
+    @Override
+    public void multiDownloadGoods(Integer number, Integer startFileIndex, Integer endFileIndex) {
+        number = number == null || number < 1 ? 1 : number;
+        startFileIndex = startFileIndex == null || startFileIndex < 1 ? 1 : startFileIndex;
+        endFileIndex = endFileIndex == null || endFileIndex < 1 ? 1024 * 1024 * 1024 : endFileIndex;
+        for (int i = 1; i <= number; i++) {
+            new Thread(new DownloadTread("Thread-" + i, number, startFileIndex + i - 1, endFileIndex)).start();
+        }
+    }
+
+    class DownloadTread implements Runnable {
+        private String name;
+        private int step;
+        private int startFileIndex;
+        private int endFileIndex;
+
+        public DownloadTread(String name, int step, int startFileIndex, int endFileIndex) {
+            this.name = name;
+            this.step = step;
+            this.startFileIndex = startFileIndex;
+            this.endFileIndex = endFileIndex;
+        }
+
+        @Override
+        public void run() {
+            try {
+                if (endFileIndex < startFileIndex) {
+                    logger.error(name + " fileIndex 不可超过 : endFileIndex=" + endFileIndex);
+                    return;
+                }
+                Long startTime = new Date().getTime();
+                logger.info(name + " 下载批次... ");
+
+                Sort sort = new Sort(Sort.Direction.ASC, "id");
+                // 分页获取数据
+                PageParams pageParams = new PageParams();
+                pageParams.setPage(startFileIndex);
+                pageParams.setSize(PAGE_SIZE);
+                PageInfo pageInfo = new PageInfo(pageParams, sort);
+                Page<Component> pageResult = componentDao.findAll(pageInfo);
+
+                int totalPages = pageResult.getTotalPages();
+                if (totalPages < startFileIndex) {
+                    logger.error(name + " fileIndex 不可超过 : totalPages=" + totalPages);
+                    return;
+                }
+                // 已翻页的数据数目
+                Long size = 0L;
+                String goodsDataPath = SearchUtils.getDataPath(SearchConstants.GOODS_TABLE_NAME);
+                File file = new File(goodsDataPath);
+                if (!file.exists()) {
+                    file.mkdirs();
+                }
+                while (totalPages >= startFileIndex && endFileIndex >= startFileIndex) {
+                    PrintWriter printWriter = new PrintWriter(goodsDataPath + "/" + String.format("%05d", startFileIndex) + ".txt");
+                    List<Component> content = pageResult.getContent();
+                    for (Component element : content) {
+                        List<Goods> goodses = goodsDao.findByComponent(element);
+                        for (Goods goods : goodses) {
+                            printWriter.println(JSONObject.toJSONString(goods));
+                        }
+                    }
+                    size += content.size();
+                    logger.info(name + " " + startFileIndex + ".txt" + " - Downloaded..................." + size);
+
+                    printWriter.flush();
+                    printWriter.close();
+                    startFileIndex += step;
+                    pageParams.setPage(startFileIndex);
+                    pageInfo = new PageInfo(pageParams, sort);
+                    pageResult = componentDao.findAll(pageInfo);
+                }
+
+                Long endTime = new Date().getTime();
+                logger.info(String.format("%s 下载完成,耗时%.2fs\n ", name, (endTime - startTime) / 1000.0));
+            } catch (Throwable e) {
+                logger.error(name, e);
+            }
+        }
+    }
+
 	@Override
 	public Object save(Object obj) {
 		if (obj != null) {

+ 2 - 0
src/main/webapp/WEB-INF/views/console.html

@@ -87,6 +87,8 @@
 				<li><a target="_blank">index/create</a></li>
 				<li><a target="_blank">index/create?tableNames=product$brand,trade$order</a></li>
 				<li><a target="_blank">index/downloadComponentData?startFileIndex=20</a></li>
+				<li><a target="_blank">index/downloadGoods?startFileIndex=20</a></li>
+				<li><a target="_blank">index/multiDownloadGoods?number=2&startFileIndex=0&endFileIndex=10000000</a></li>
 				<li><a target="_blank">index/listen/start?interval=10</a></li>
 				<li><a target="_blank">index/listen/stop</a></li>
 				<li><a target="_blank">index/listen/restart</a></li>