Browse Source

create goods' index from component's files

sunyj 8 years ago
parent
commit
8ed60c9bad

+ 9 - 3
mall-search/src/main/java/com/uas/search/model/ParsedQueueMessage.java

@@ -1,7 +1,5 @@
 package com.uas.search.model;
 
-import java.util.Arrays;
-
 /**
  * 对数据库队列里的消息进行解析后所得到的数据
  * 
@@ -35,7 +33,15 @@ public class ParsedQueueMessage {
 	 */
 	private Object object;
 
-	/**
+    public ParsedQueueMessage() {
+    }
+
+    public ParsedQueueMessage(int methodType, Object object) {
+        this.methodType = methodType;
+        this.object = object;
+    }
+
+    /**
 	 * 是否为insert类型
 	 * 
 	 * @return

+ 135 - 41
mall-search/src/main/java/com/uas/search/service/impl/IndexServiceImpl.java

@@ -18,8 +18,7 @@ import com.uas.search.support.DownloadHelper;
 import com.uas.search.support.DownloadService;
 import com.uas.search.support.IndexSearcherManager;
 import com.uas.search.support.IndexWriterManager;
-import com.uas.search.util.ObjectToDocumentUtils;
-import com.uas.search.util.SearchUtils;
+import com.uas.search.util.*;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.Term;
@@ -30,8 +29,6 @@ import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.data.domain.Page;
 import org.springframework.stereotype.Service;
-import com.uas.search.util.CollectionUtils;
-import com.uas.search.util.StringUtils;
 
 import java.io.*;
 import java.util.*;
@@ -311,51 +308,150 @@ public class IndexServiceImpl implements IndexService {
 	}
 
 	private Long createGoodsIndexesFromFiles() throws IOException {
-		logger.info("正在创建批次索引...");
-		Long size = 0L;
+		return createGoodsIndexesFromFiles(true);
+	}
+
+    /**
+     * 创建批次索引
+     *
+     * @param updateFromDbAfterCreating 创建索引后是否从批次表获取数据,更新本地索引(此种情况下,用于创建索引的数据来自器件,并不包括批次,因此需要单独进行更新)
+     * @return 写入的索引总数(不包括创建之后,所更新的数目)
+     * @throws IOException
+     */
+    private Long createGoodsIndexesFromFiles(boolean updateFromDbAfterCreating) throws IOException {
+        if(updateFromDbAfterCreating){
+            logger.info("正在创建批次索引...");
+        }else{
+            logger.info("正在更新批次索引...");
+        }
+        Long size = 0L;
         BufferedReader bufferedReader=null;
-		try {
-			// 从本地路径读取批次数据
-			File[] files = new File(SearchUtils.getDataPath(SearchConstants.GOODS_TABLE_NAME)).listFiles();
-			if (files == null || files.length == 0) {
-				logger.info("创建批次索引失败,原因:批次数据文件不存在!");
-				return 0L;
-			}
-			for (File file : files) {
-				logger.info("读取文件: " + file.getName());
-				bufferedReader = new BufferedReader(new FileReader(file));
-				String line;
-				while (!StringUtils.isEmpty(line = bufferedReader.readLine())) {
+        try {
+            // 如果创建后需要更新本地索引,说明还未创建索引,本次便是在进行创建索引,需要转换器件数据,用来建索引
+            if(updateFromDbAfterCreating){
+                convertGoodsFromComponent();
+            }
+            // 从本地路径读取批次数据
+            File[] files = new File(SearchUtils.getDataPath(SearchConstants.GOODS_TABLE_NAME)).listFiles();
+            if (files == null || files.length == 0) {
+                logger.info("创建批次索引失败,原因:批次数据文件不存在!");
+                return 0L;
+            }
+            for (File file : files) {
+                logger.info("读取文件: " + file.getName());
+                bufferedReader = new BufferedReader(new FileReader(file));
+                String line;
+                while (!StringUtils.isEmpty(line = bufferedReader.readLine())) {
                     Goods goods;
                     try {
                         goods = JSONObject.parseObject(line, Goods.class);
                     } catch (JSONException e) {
                         throw new IllegalArgumentException(line, e);
                     }
-                    Document document = ObjectToDocumentUtils.toDocument(goods);
-					if (document != null) {
-						size++;
-						// 每创建10000条,打印一次进度
-						if (size % 10000 == 0) {
-                            logger.info("Goods indexed..................." + size);
-						}
-						indexWriter.addDocument(document);
-					}
-				}
-				indexWriter.commit();
+                    // 如果创建后不需要更新本地索引,说明已经创建了索引,本次便是在进行更新索引
+                    if (!updateFromDbAfterCreating) {
+                        indexWriterManager.release(SearchConstants.GOODS_TABLE_NAME);
+                        maintainIndexes(new ParsedQueueMessage(ParsedQueueMessage.UPDATE, goods));
+                        try {
+                            indexWriter = indexWriterManager.get(SearchConstants.GOODS_TABLE_NAME);
+                        } catch (InterruptedException e) {
+                            logger.error("", e);
+                        }
+                    } else {
+                        Document document = ObjectToDocumentUtils.toDocument(goods);
+                        if (document != null) {
+                            size++;
+                            // 每创建10000条,打印一次进度
+                            if (size % 10000 == 0) {
+                                logger.info("Goods indexed..................." + size);
+                            }
+                            indexWriter.addDocument(document);
+                        }
+                    }
+                }
+                indexWriter.commit();
                 bufferedReader.close();
-			}
-		} catch (FileNotFoundException e) {
-			logger.error("创建批次索引失败,原因:批次数据文件不存在!");
-			return 0L;
-		} finally {
+            }
+        } catch (FileNotFoundException e) {
+            logger.error("创建批次索引失败,原因:批次数据文件不存在!");
+            return 0L;
+        } finally {
             indexWriter.commit();
             if (bufferedReader != null) {
                 bufferedReader.close();
             }
         }
-		return size;
-	}
+        // 如果需要从批次表获取数据,更新本地索引
+        if(updateFromDbAfterCreating){
+            try {
+                multiDownloadGoods(null, null, null, null);
+                createGoodsIndexesFromFiles(false);
+            } catch (Throwable e) {
+                throw new IllegalStateException("批次索引建立完成后,获取批次表数据用来更新索引时出错", e);
+            }
+        }
+        return size;
+    }
+
+    /**
+     * 根据本地器件的数据文件,转为批次数据
+     *
+     * @return 花费总时间 ms
+     */
+    private Long convertGoodsFromComponent() {
+        Long startTime = new Date().getTime();
+        Long size = 0L;
+        logger.info("转换批次... ");
+        BufferedReader bufferedReader = null;
+        try {
+            // 从本地路径读取器件数据
+            String componentDataPath = SearchUtils.getDataPath(COMPONENT_TABLE_NAME);
+            String goodsDataPath = SearchUtils.getDataPath(SearchConstants.GOODS_TABLE_NAME);
+            File[] files = new File(componentDataPath).listFiles();
+            if (files == null || files.length == 0) {
+                logger.info("转换批次失败,原因:器件数据文件不存在!");
+                return 0L;
+            }
+            FileUtils.deleteSubFiles(new File(goodsDataPath));
+            int fileIndex = 1;
+            for (File file : files) {
+                logger.info("读取器件文件: " + file.getName());
+                bufferedReader = new BufferedReader(new FileReader(file));
+                String goodsFileName = String.format("%010d", fileIndex) + ".txt";
+                PrintWriter printWriter = new PrintWriter(goodsDataPath + "/" + goodsFileName);
+                String line;
+                while (!StringUtils.isEmpty(line = bufferedReader.readLine())) {
+                    Component component;
+                    try {
+                        component = JSONObject.parseObject(line, Component.class);
+                    } catch (JSONException e) {
+                        throw new IllegalArgumentException(line, e);
+                    }
+                    // 器件作为主体,得到批次
+                    printWriter.println(JSONObject.toJSONString(new Goods(null, null, component, null)));
+                    size++;
+                }
+                logger.info(goodsFileName + " - Converted..................." + size);
+                printWriter.flush();
+                printWriter.close();
+                fileIndex++;
+                bufferedReader.close();
+            }
+            long endStartTime = new Date().getTime();
+            logger.info(String.format("转换完成,耗时%.2fs\n ", (endStartTime - startTime) / 1000.0));
+            return endStartTime - startTime;
+        } catch (Throwable e) {
+            throw new IllegalStateException("批次转换失败", e);
+        }finally{
+            if(bufferedReader!=null){
+                try {
+                    bufferedReader.close();
+                } catch (IOException e) {
+                    throw new IllegalStateException("批次转换失败", e);
+                }
+            }
+        }
+    }
 
 	private Long createOrderIndexes() {
 		logger.info("正在创建销售单索引...");
@@ -458,7 +554,6 @@ public class IndexServiceImpl implements IndexService {
         }
     }
 
-
 	@Override
 	public Object save(Object obj) {
 		if (obj != null) {
@@ -563,9 +658,7 @@ public class IndexServiceImpl implements IndexService {
 				}
 				indexSearcherManager.flushCache(tableName, indexWriter, null);
 				return obj;
-			} catch (IOException e) {
-				logger.error("", e);
-			} catch (InterruptedException e) {
+			} catch (IOException | InterruptedException e) {
 				logger.error("", e);
 			} finally {
 				indexWriterManager.release(tableName);
@@ -599,8 +692,9 @@ public class IndexServiceImpl implements IndexService {
         // 新增、更新索引
         if (parsedQueueMessage.isInsert() || parsedQueueMessage.isUpdate()) {
             if (object instanceof Goods) {
-                List<Goods> goodsesList = goodsDao.find((Goods) object);
+                // 先删除相关批次和器件,再重新写入
                 delete(object);
+                List<Goods> goodsesList = goodsDao.find((Goods) object);
                 for (Goods goods : goodsesList) {
                     Object maintainedObject = save(goods);
                     if (maintainedObject != null) {