IndexServiceImpl.java 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791
  1. package com.uas.search.service.impl;
  2. import com.alibaba.fastjson.JSONObject;
  3. import com.uas.search.annotation.NotEmpty;
  4. import com.uas.search.constant.SearchConstants;
  5. import com.uas.search.constant.model.PageInfo;
  6. import com.uas.search.constant.model.PageParams;
  7. import com.uas.search.dao.*;
  8. import com.uas.search.exception.SearchException;
  9. import com.uas.search.jms.JmsListener;
  10. import com.uas.search.jms.QueueMessageParser;
  11. import com.uas.search.model.*;
  12. import com.uas.search.service.IndexService;
  13. import com.uas.search.support.IndexSearcherManager;
  14. import com.uas.search.support.IndexWriterManager;
  15. import com.uas.search.util.ObjectToDocumentUtils;
  16. import com.uas.search.util.SearchUtils;
  17. import org.apache.lucene.document.Document;
  18. import org.apache.lucene.index.IndexWriter;
  19. import org.apache.lucene.index.Term;
  20. import org.slf4j.Logger;
  21. import org.slf4j.LoggerFactory;
  22. import org.springframework.beans.factory.annotation.Autowired;
  23. import org.springframework.data.domain.Page;
  24. import org.springframework.data.domain.Sort;
  25. import org.springframework.stereotype.Service;
  26. import org.springframework.util.CollectionUtils;
  27. import org.springframework.util.StringUtils;
  28. import java.io.*;
  29. import java.util.ArrayList;
  30. import java.util.Date;
  31. import java.util.List;
  32. /**
  33. * 创建索引
  34. *
  35. * @author sunyj
  36. * @since 2016年8月5日 下午2:23:22
  37. */
  38. @Service
  39. public class IndexServiceImpl implements IndexService {
  40. @Autowired
  41. private QueueMessageParser queueMessageParser;
  42. @Autowired
  43. private KindDao kindDao;
  44. @Autowired
  45. private BrandDao brandDao;
  46. @Autowired
  47. private ComponentDao componentDao;
  48. @Autowired
  49. private GoodsDao goodsDao;
  50. @Autowired
  51. private OrderDao orderDao;
  52. @Autowired
  53. private OrderInvoiceDao orderInvoiceDao;
  54. @Autowired
  55. private PurchaseDao purchaseDao;
  56. @Autowired
  57. private PurchaseInvoiceDao purchaseInvoiceDao;
  58. private IndexWriter indexWriter;
  59. private static IndexWriterManager indexWriterManager = new IndexWriterManager();
  60. @Autowired
  61. private JmsListener jmsListener;
  62. private IndexSearcherManager indexSearcherManager = IndexSearcherManager.getInstance();
  63. /**
  64. * 是否正在创建索引
  65. */
  66. private boolean creatingIndex = false;
  67. /**
  68. * 从数据库获取数据时的分页大小
  69. */
  70. private static final int PAGE_SIZE = 1000;
  71. /**
  72. * 单个文件存储的最大数据数目,需是PAGE_SIZE的整数倍
  73. */
  74. public static final int SINGLE_FILE_MAX_SIZE = 100000;
  75. private static Logger logger = LoggerFactory.getLogger(IndexServiceImpl.class);
  76. @Override
  77. public Long createIndexes(List<String> tableNames, Boolean componentFromFiles) {
  78. if (creatingIndex) {
  79. throw new SearchException("已存在线程在创建索引,不可重复请求");
  80. }
  81. creatingIndex = true;
  82. // 如果索引实时更新处于开启状态,需要关闭(以免两者同时操作索引出现问题)
  83. if (jmsListener.isRunning()) {
  84. logger.info("索引实时更新服务正在运行,尝试关闭索引实时更新服务...");
  85. jmsListener.stop();
  86. }
  87. if (CollectionUtils.isEmpty(tableNames)) {
  88. tableNames = SearchUtils.getTableNames();
  89. }
  90. try {
  91. Long startTime = new Date().getTime();
  92. for (String tableName : tableNames) {
  93. Long start = new Date().getTime();
  94. Long size = 0L;
  95. deleteIndexes(tableName);
  96. if (tableName.equals(SearchConstants.KIND_TABLE_NAME)) {
  97. size = createKindIndexes();
  98. } else if (tableName.equals(SearchConstants.BRAND_TABLE_NAME)) {
  99. size = createBrandIndexes();
  100. } else if (tableName.equals(SearchConstants.COMPONENT_TABLE_NAME)) {
  101. // 只有明确指定不从本地文件读取数据时,才从数据库获取数据建立索引
  102. if (componentFromFiles != null && !componentFromFiles) {
  103. size = createComponentIndexes();
  104. } else {
  105. size = createComponentIndexesFromFiles();
  106. }
  107. } else if (tableName.equals(SearchConstants.GOODS_TABLE_NAME)) {
  108. size = createGoodsIndexesFromFiles();
  109. } else if (tableName.equals(SearchConstants.ORDER_TABLE_NAME)) {
  110. size = createOrderIndexes();
  111. } else if (tableName.equals(SearchConstants.ORDER_INVOICE_TABLE_NAME)) {
  112. size = createOrderInvoiceIndexes();
  113. } else if (tableName.equals(SearchConstants.PURCHASE_TABLE_NAME)) {
  114. size = createPurchaseIndexes();
  115. } else if (tableName.equals(SearchConstants.PURCHASE_INVOICE_TABLE_NAME)) {
  116. size = createPurchaseInvoiceIndexes();
  117. } else {
  118. logger.error("不支持创建该表索引:" + tableName);
  119. continue;
  120. }
  121. indexWriterManager.release(tableName);
  122. indexSearcherManager.flushCache(tableName, indexWriter, null);
  123. Long end = new Date().getTime();
  124. logger.info(String.format("创建%s索引: %s条,耗时%.2fs\n ", tableName, size, (end - start) / 1000.0));
  125. }
  126. Long endTime = new Date().getTime();
  127. logger.info(String.format("索引创建成功, 共用时间%.2fs\n", (endTime - startTime) / 1000.0));
  128. return endTime - startTime;
  129. } catch (Exception e) {
  130. // 防止SQLRecoverableException导致应用终止
  131. throw new IllegalStateException("索引创建失败", e);
  132. } finally {
  133. creatingIndex = false;
  134. }
  135. }
  136. /**
  137. * 删除指定表的索引
  138. *
  139. * @param tableName
  140. * 表名
  141. * @throws InterruptedException
  142. * @throws IOException
  143. */
  144. private void deleteIndexes(String tableName) throws InterruptedException, IOException {
  145. indexWriter = indexWriterManager.get(tableName);
  146. logger.info("正在清理旧索引..." + tableName);
  147. indexWriter.deleteAll();
  148. logger.info("旧索引清理完毕..." + tableName);
  149. }
  150. /**
  151. * 创建类目索引
  152. *
  153. * @return 写入的类目索引数
  154. * @throws IOException
  155. */
  156. private Long createKindIndexes() throws IOException {
  157. logger.info("正在创建类目索引...");
  158. List<Kind> kinds = kindDao.findAll();
  159. logger.info("发现数据:" + kinds.size() + "条");
  160. return createIndexesWithObjects(kinds.toArray());
  161. }
  162. /**
  163. * 创建品牌索引
  164. *
  165. * @return 写入的品牌索引数
  166. * @throws IOException
  167. */
  168. private Long createBrandIndexes() throws IOException {
  169. logger.info("正在创建品牌索引...");
  170. List<Brand> brands = brandDao.findAll();
  171. logger.info("发现数据:" + brands.size() + "条");
  172. return createIndexesWithObjects(brands.toArray());
  173. }
  174. private Long createComponentIndexesFromFiles() {
  175. logger.info("正在创建器件索引...");
  176. Long size = 0L;
  177. try {
  178. // 从本地路径读取器件数据
  179. String componentDataPath = SearchUtils.getDataPath(SearchConstants.COMPONENT_TABLE_NAME);
  180. File[] files = new File(componentDataPath).listFiles();
  181. if (files == null || files.length == 0) {
  182. logger.info("创建器件索引失败,原因:器件数据文件不存在!");
  183. return 0L;
  184. }
  185. // 将要创建的索引总数目约为:文件数目*单个文件的行数
  186. long totalSize = files.length * SINGLE_FILE_MAX_SIZE;
  187. for (File file : files) {
  188. logger.info("读取文件: " + file.getName());
  189. BufferedReader bufferedReader = new BufferedReader(new FileReader(file));
  190. String line = null;
  191. while (!StringUtils.isEmpty(line = bufferedReader.readLine())) {
  192. Component component = JSONObject.parseObject(line, Component.class);
  193. Document document = ObjectToDocumentUtils.toDocument(component);
  194. if (document != null) {
  195. size++;
  196. // 每创建10000条,打印一次进度
  197. if (size % 10000 == 0) {
  198. logger.info(String.format("Component indexed...................%.2f%%",
  199. size * 100.0 / totalSize));
  200. }
  201. indexWriter.addDocument(document);
  202. }
  203. }
  204. indexWriter.commit();
  205. bufferedReader.close();
  206. }
  207. } catch (FileNotFoundException e) {
  208. logger.error("创建器件索引失败,原因:器件数据文件不存在!");
  209. return 0L;
  210. } catch (IOException e) {
  211. logger.error("", e);
  212. }
  213. return size;
  214. }
  215. /**
  216. * 创建器件索引,从数据库取数据
  217. *
  218. * @return 写入的器件索引数
  219. * @throws IOException
  220. */
  221. public Long createComponentIndexes() throws IOException {
  222. logger.info("正在创建器件索引...");
  223. Long size = 0L;
  224. PageParams params = new PageParams();
  225. int page = 1;
  226. params.setSize(PAGE_SIZE);
  227. params.setPage(page);
  228. PageInfo info = new PageInfo(params);
  229. Page<Component> pageResult = componentDao.findAll(info);
  230. long totalElements = pageResult.getTotalElements();
  231. logger.info("Number of components: " + totalElements);
  232. // 用于记录上次提交索引时的创建进度
  233. double recordProgress = 0;
  234. while (totalElements > size) {
  235. List<Component> components = pageResult.getContent();
  236. for (Component component : components) {
  237. Document document = ObjectToDocumentUtils.toDocument(component);
  238. if (document != null) {
  239. indexWriter.addDocument(document);
  240. }
  241. }
  242. size += components.size();
  243. // 器件索引的创建进度(百分比)
  244. double indexProgress = size * 100.0 / totalElements;
  245. logger.info(String.format("Component indexed...................%.2f%%", indexProgress));
  246. // 每创建5%,提交一次,避免内存耗尽,发生OutOfMemoryError
  247. if (indexProgress - recordProgress > 5) {
  248. indexWriter.commit();
  249. recordProgress = indexProgress;
  250. }
  251. page++;
  252. params.setPage(page);
  253. info = new PageInfo(params);
  254. pageResult = componentDao.findAll(info);
  255. }
  256. indexWriter.commit();
  257. return size;
  258. }
  259. private Long createGoodsIndexesFromFiles() {
  260. logger.info("正在创建批次索引...");
  261. Long size = 0L;
  262. try {
  263. // 从本地路径读取批次数据
  264. File[] files = new File(SearchUtils.getDataPath(SearchConstants.GOODS_TABLE_NAME)).listFiles();
  265. if (files == null || files.length == 0) {
  266. logger.info("创建批次索引失败,原因:批次数据文件不存在!");
  267. return 0L;
  268. }
  269. // 将要创建的索引总数目约为:文件数目*单个文件的行数
  270. long totalSize = files.length * SINGLE_FILE_MAX_SIZE;
  271. for (File file : files) {
  272. logger.info("读取文件: " + file.getName());
  273. BufferedReader bufferedReader = new BufferedReader(new FileReader(file));
  274. String line = null;
  275. while (!StringUtils.isEmpty(line = bufferedReader.readLine())) {
  276. Goods goods = JSONObject.parseObject(line, Goods.class);
  277. Document document = ObjectToDocumentUtils.toDocument(goods);
  278. if (document != null) {
  279. size++;
  280. // 每创建10000条,打印一次进度
  281. if (size % 10000 == 0) {
  282. logger.info(
  283. String.format("Goods indexed...................%.2f%%", size * 100.0 / totalSize));
  284. }
  285. indexWriter.addDocument(document);
  286. }
  287. }
  288. indexWriter.commit();
  289. bufferedReader.close();
  290. }
  291. } catch (FileNotFoundException e) {
  292. logger.error("创建批次索引失败,原因:批次数据文件不存在!");
  293. return 0L;
  294. } catch (IOException e) {
  295. logger.error("", e);
  296. }
  297. return size;
  298. }
  299. private Long createOrderIndexes() {
  300. logger.info("正在创建销售单索引...");
  301. List<Order> orders = orderDao.findAll();
  302. logger.info("发现数据:" + orders.size() + "条");
  303. return createIndexesWithObjects(orders.toArray());
  304. }
  305. private Long createOrderInvoiceIndexes() {
  306. logger.info("正在创建销售发货单索引...");
  307. List<OrderInvoice> orderInvoices = orderInvoiceDao.findAll();
  308. logger.info("发现数据:" + orderInvoices.size() + "条");
  309. return createIndexesWithObjects(orderInvoices.toArray());
  310. }
  311. private Long createPurchaseIndexes() {
  312. logger.info("正在创建采购单索引...");
  313. List<Purchase> purchases = purchaseDao.findAll();
  314. logger.info("发现数据:" + purchases.size() + "条");
  315. return createIndexesWithObjects(purchases.toArray());
  316. }
  317. private Long createPurchaseInvoiceIndexes() {
  318. logger.info("正在创建采购发货单索引...");
  319. List<PurchaseInvoice> purchaseInvoices = purchaseInvoiceDao.findAll();
  320. logger.info("发现数据:" + purchaseInvoices.size() + "条");
  321. return createIndexesWithObjects(purchaseInvoices.toArray());
  322. }
  323. /**
  324. * 利用对象数组创建索引
  325. *
  326. * @param objects
  327. * 对象数组,可为Kind、Brand、Order、
  328. * OrderInvoice、Purchase、
  329. * PurchaseInvoice
  330. * @return 对象数组的数量
  331. */
  332. private Long createIndexesWithObjects(Object[] objects) {
  333. if (objects == null || objects.length < 1)
  334. return 0L;
  335. long count = 0;
  336. try {
  337. for (Object object : objects) {
  338. Document document = ObjectToDocumentUtils.toDocument(object);
  339. if (document != null) {
  340. indexWriter.addDocument(document);
  341. count++;
  342. }
  343. }
  344. indexWriter.commit();
  345. } catch (IOException e) {
  346. logger.error("", e);
  347. }
  348. return count;
  349. }
  350. @Override
  351. public Long downloadComponentDataFromDatabase(Integer startFileIndex) {
  352. int fileIndex = 1;
  353. if (startFileIndex != null) {
  354. if (startFileIndex < 1) {
  355. throw new SearchException("startFileIndex不小于1");
  356. } else {
  357. fileIndex = startFileIndex;
  358. }
  359. }
  360. Long startTime = new Date().getTime();
  361. logger.info("下载器件... ");
  362. // 分页获取数据
  363. PageParams pageParams = new PageParams();
  364. pageParams.setPage((fileIndex - 1) * SINGLE_FILE_MAX_SIZE / PAGE_SIZE + 1);
  365. pageParams.setSize(PAGE_SIZE);
  366. PageInfo pageInfo = new PageInfo(pageParams);
  367. Page<Component> pageResult = componentDao.findAll(pageInfo);
  368. // 数据库中数据的总数目
  369. if (pageResult.getTotalElements() < (fileIndex - 1) * SINGLE_FILE_MAX_SIZE) {
  370. throw new SearchException("startFileIndex不超过"
  371. + ((int) Math.ceil(pageResult.getTotalElements() / (1.0 * SINGLE_FILE_MAX_SIZE))));
  372. }
  373. long totalElements = pageResult.getTotalElements() - (fileIndex - 1) * SINGLE_FILE_MAX_SIZE;
  374. logger.info("发现数据:" + totalElements + "条");
  375. // 已翻页的数据数目
  376. Long size = 0L;
  377. PrintWriter printWriter = null;
  378. int count = 0;
  379. try {
  380. String componentDataPath = SearchUtils.getDataPath(SearchConstants.COMPONENT_TABLE_NAME);
  381. File file = new File(componentDataPath);
  382. if (!file.exists()) {
  383. file.mkdirs();
  384. }
  385. printWriter = new PrintWriter(componentDataPath + "/" + fileIndex + ".txt");
  386. while (totalElements > size) {
  387. // 一个文件存放100000条数据,一旦超过,写入新的文件
  388. if (count == SINGLE_FILE_MAX_SIZE) {
  389. count = 0;
  390. printWriter.flush();
  391. printWriter.close();
  392. fileIndex++;
  393. printWriter = new PrintWriter(componentDataPath + "/" + fileIndex + ".txt");
  394. }
  395. List<Component> content = pageResult.getContent();
  396. for (Component element : content) {
  397. printWriter.println(JSONObject.toJSONString(element));
  398. count++;
  399. }
  400. size += content.size();
  401. logger.info(String.format("Downloaded...................%.2f%%", size * 100.0 / totalElements));
  402. pageParams.setPage(pageParams.getPage() + 1);
  403. pageInfo = new PageInfo(pageParams);
  404. pageResult = componentDao.findAll(pageInfo);
  405. }
  406. printWriter.flush();
  407. printWriter.close();
  408. } catch (FileNotFoundException e) {
  409. logger.error("", e);
  410. }
  411. Long endTime = new Date().getTime();
  412. logger.info(String.format("下载数据%s条,耗时%.2fs\n ", totalElements, (endTime - startTime) / 1000.0));
  413. return endTime - startTime;
  414. }
  415. @Override
  416. public Long downloadGoods(Integer startFileIndex) {
  417. int fileIndex = 1;
  418. if (startFileIndex != null) {
  419. if (startFileIndex < 1) {
  420. throw new SearchException("startFileIndex不小于1");
  421. } else {
  422. fileIndex = startFileIndex;
  423. }
  424. }
  425. Long startTime = new Date().getTime();
  426. logger.info("下载批次... ");
  427. Sort sort = new Sort(Sort.Direction.ASC, "id");
  428. // 分页获取数据
  429. PageParams pageParams = new PageParams();
  430. pageParams.setPage((fileIndex - 1) * SINGLE_FILE_MAX_SIZE / PAGE_SIZE + 1);
  431. pageParams.setSize(PAGE_SIZE);
  432. PageInfo pageInfo = new PageInfo(pageParams, sort);
  433. Page<Component> pageResult = componentDao.findAll(pageInfo);
  434. // 数据库中数据的总数目
  435. if (pageResult.getTotalElements() < (fileIndex - 1) * SINGLE_FILE_MAX_SIZE) {
  436. throw new SearchException("startFileIndex不超过"
  437. + ((int) Math.ceil(pageResult.getTotalElements() / (1.0 * SINGLE_FILE_MAX_SIZE))));
  438. }
  439. long totalElements = pageResult.getTotalElements() - (fileIndex - 1) * SINGLE_FILE_MAX_SIZE;
  440. logger.info("发现数据:" + totalElements + "条");
  441. // 已翻页的数据数目
  442. Long size = 0L;
  443. PrintWriter printWriter = null;
  444. int count = 0;
  445. try {
  446. String goodsDataPath = SearchUtils.getDataPath(SearchConstants.GOODS_TABLE_NAME);
  447. File file = new File(goodsDataPath);
  448. if (!file.exists()) {
  449. file.mkdirs();
  450. }
  451. printWriter = new PrintWriter(goodsDataPath + "/" + fileIndex + ".txt");
  452. while (totalElements > size) {
  453. // 一个文件存放100000条数据,一旦超过,写入新的文件
  454. if (count == SINGLE_FILE_MAX_SIZE) {
  455. count = 0;
  456. printWriter.flush();
  457. printWriter.close();
  458. fileIndex++;
  459. printWriter = new PrintWriter(goodsDataPath + "/" + fileIndex + ".txt");
  460. }
  461. List<Component> content = pageResult.getContent();
  462. for (Component element : content) {
  463. List<Goods> goodses = goodsDao.findByComponent(element);
  464. for (Goods goods : goodses) {
  465. printWriter.println(JSONObject.toJSONString(goods));
  466. }
  467. count++;
  468. }
  469. size += content.size();
  470. logger.info(String.format("Downloaded...................%.2f%%", size * 100.0 / totalElements));
  471. pageParams.setPage(pageParams.getPage() + 1);
  472. pageInfo = new PageInfo(pageParams, sort);
  473. pageResult = componentDao.findAll(pageInfo);
  474. }
  475. printWriter.flush();
  476. printWriter.close();
  477. } catch (FileNotFoundException e) {
  478. logger.error("", e);
  479. }
  480. Long endTime = new Date().getTime();
  481. logger.info(String.format("下载数据%s条,耗时%.2fs\n ", totalElements, (endTime - startTime) / 1000.0));
  482. return endTime - startTime;
  483. }
  484. @Override
  485. public void multiDownloadGoods(Integer number, Integer startFileIndex, Integer endFileIndex) {
  486. number = number == null || number < 1 ? 1 : number;
  487. startFileIndex = startFileIndex == null || startFileIndex < 1 ? 1 : startFileIndex;
  488. endFileIndex = endFileIndex == null || endFileIndex < 1 ? 1024 * 1024 * 1024 : endFileIndex;
  489. for (int i = 1; i <= number; i++) {
  490. new Thread(new DownloadTread("Thread-" + i, number, startFileIndex + i - 1, endFileIndex)).start();
  491. }
  492. }
  493. class DownloadTread implements Runnable {
  494. private String name;
  495. private int step;
  496. private int startFileIndex;
  497. private int endFileIndex;
  498. public DownloadTread(String name, int step, int startFileIndex, int endFileIndex) {
  499. this.name = name;
  500. this.step = step;
  501. this.startFileIndex = startFileIndex;
  502. this.endFileIndex = endFileIndex;
  503. }
  504. @Override
  505. public void run() {
  506. try {
  507. if (endFileIndex < startFileIndex) {
  508. logger.error(name + " fileIndex 不可超过 : endFileIndex=" + endFileIndex);
  509. return;
  510. }
  511. Long startTime = new Date().getTime();
  512. logger.info(name + " 下载批次... ");
  513. Sort sort = new Sort(Sort.Direction.ASC, "id");
  514. // 分页获取数据
  515. PageParams pageParams = new PageParams();
  516. pageParams.setPage(startFileIndex);
  517. pageParams.setSize(PAGE_SIZE);
  518. PageInfo pageInfo = new PageInfo(pageParams, sort);
  519. Page<Component> pageResult = componentDao.findAll(pageInfo);
  520. int totalPages = pageResult.getTotalPages();
  521. if (totalPages < startFileIndex) {
  522. logger.error(name + " fileIndex 不可超过 : totalPages=" + totalPages);
  523. return;
  524. }
  525. // 已翻页的数据数目
  526. Long size = 0L;
  527. String goodsDataPath = SearchUtils.getDataPath(SearchConstants.GOODS_TABLE_NAME);
  528. File file = new File(goodsDataPath);
  529. if (!file.exists()) {
  530. file.mkdirs();
  531. }
  532. while (totalPages >= startFileIndex && endFileIndex >= startFileIndex) {
  533. PrintWriter printWriter = new PrintWriter(goodsDataPath + "/" + String.format("%05d", startFileIndex) + ".txt");
  534. List<Component> content = pageResult.getContent();
  535. for (Component element : content) {
  536. List<Goods> goodses = goodsDao.findByComponent(element);
  537. for (Goods goods : goodses) {
  538. printWriter.println(JSONObject.toJSONString(goods));
  539. }
  540. }
  541. size += content.size();
  542. logger.info(name + " " + startFileIndex + ".txt" + " - Downloaded..................." + size);
  543. printWriter.flush();
  544. printWriter.close();
  545. startFileIndex += step;
  546. pageParams.setPage(startFileIndex);
  547. pageInfo = new PageInfo(pageParams, sort);
  548. pageResult = componentDao.findAll(pageInfo);
  549. }
  550. Long endTime = new Date().getTime();
  551. logger.info(String.format("%s 下载完成,耗时%.2fs\n ", name, (endTime - startTime) / 1000.0));
  552. } catch (Throwable e) {
  553. logger.error(name, e);
  554. }
  555. }
  556. }
  557. @Override
  558. public Object save(Object obj) {
  559. if (obj != null) {
  560. String tableName = SearchUtils.getTableName(obj.getClass());
  561. Document document = ObjectToDocumentUtils.toDocument(obj);
  562. if (document != null) {
  563. try {
  564. indexWriter = indexWriterManager.get(tableName);
  565. indexWriter.addDocument(document);
  566. indexSearcherManager.flushCache(tableName, indexWriter, null);
  567. return obj;
  568. } catch (IOException | InterruptedException e) {
  569. logger.error("", e);
  570. } finally {
  571. indexWriterManager.release(tableName);
  572. }
  573. } else {
  574. logger.info("对象转为Document时为null:" + obj);
  575. }
  576. }
  577. return null;
  578. }
  /**
   * Replaces the indexed document of the given object, matched by its id field, and flushes the
   * searcher cache of its table.
   *
   * <p>Supported types are Kind, Brand, Component, Order, OrderInvoice, Purchase and
   * PurchaseInvoice.
   *
   * @param obj object to re-index; may be {@code null}
   * @return {@code obj} when it was updated; {@code null} when {@code obj} is {@code null},
   *     cannot be converted to a document, or updating failed (the failure is logged)
   * @throws SearchException if the type of {@code obj} is not supported
   */
  @Override
  public Object update(Object obj) {
    if (obj == null) {
      return null;
    }
    String tableName = SearchUtils.getTableName(obj.getClass());
    Document document = ObjectToDocumentUtils.toDocument(obj);
    if (document == null) {
      logger.info("对象转为Document时为null:" + obj);
      return null;
    }
    try {
      // Use a local writer: the shared field belongs to a running rebuild and must not be
      // redirected to another table's writer by concurrent maintenance calls.
      IndexWriter writer = indexWriterManager.get(tableName);
      Term idTerm;
      if (obj instanceof Kind) {
        idTerm = new Term(SearchConstants.KIND_ID_FIELD, String.valueOf(((Kind) obj).getId()));
      } else if (obj instanceof Brand) {
        idTerm = new Term(SearchConstants.BRAND_ID_FIELD, String.valueOf(((Brand) obj).getId()));
      } else if (obj instanceof Component) {
        idTerm = new Term(SearchConstants.COMPONENT_ID_FIELD, String.valueOf(((Component) obj).getId()));
      } else if (obj instanceof Order) {
        idTerm = new Term(SearchConstants.ORDER_ID_FIELD, String.valueOf(((Order) obj).getId()));
      } else if (obj instanceof OrderInvoice) {
        idTerm = new Term(SearchConstants.ORDER_INVOICE_ID_FIELD,
            String.valueOf(((OrderInvoice) obj).getId()));
      } else if (obj instanceof Purchase) {
        idTerm = new Term(SearchConstants.PURCHASE_ID_FIELD, String.valueOf(((Purchase) obj).getId()));
      } else if (obj instanceof PurchaseInvoice) {
        idTerm = new Term(SearchConstants.PURCHASE_INVOICE_ID_FIELD,
            String.valueOf(((PurchaseInvoice) obj).getId()));
      } else {
        throw new SearchException("Message parsing failed!");
      }
      writer.updateDocument(idTerm, document);
      indexSearcherManager.flushCache(tableName, writer, null);
      return obj;
    } catch (IOException e) {
      logger.error("", e);
    } catch (InterruptedException e) {
      // Restore the interrupt flag so callers further up can still observe it.
      Thread.currentThread().interrupt();
      logger.error("", e);
    } finally {
      indexWriterManager.release(tableName);
    }
    return null;
  }
  625. @Override
  626. public Object delete(Object obj) {
  627. if (obj != null) {
  628. String tableName = SearchUtils.getTableName(obj.getClass());
  629. try {
  630. indexWriter = indexWriterManager.get(tableName);
  631. if (obj instanceof Kind) {
  632. indexWriter.deleteDocuments(
  633. new Term(SearchConstants.KIND_ID_FIELD, String.valueOf(((Kind) obj).getId())));
  634. } else if (obj instanceof Brand) {
  635. indexWriter.deleteDocuments(
  636. new Term(SearchConstants.BRAND_ID_FIELD, String.valueOf(((Brand) obj).getId())));
  637. } else if (obj instanceof Component) {
  638. indexWriter.deleteDocuments(new Term(SearchConstants.COMPONENT_ID_FIELD,
  639. String.valueOf(((Component) obj).getId())));
  640. } else if (obj instanceof Goods) {
  641. indexWriter.deleteDocuments(toTerm((Goods) obj));
  642. } else if (obj instanceof Order) {
  643. indexWriter.deleteDocuments(
  644. new Term(SearchConstants.ORDER_ID_FIELD, String.valueOf(((Order) obj).getId())));
  645. } else if (obj instanceof OrderInvoice) {
  646. indexWriter.deleteDocuments(new Term(SearchConstants.ORDER_INVOICE_ID_FIELD,
  647. String.valueOf(((OrderInvoice) obj).getId())));
  648. } else if (obj instanceof Purchase) {
  649. indexWriter.deleteDocuments(new Term(SearchConstants.PURCHASE_ID_FIELD,
  650. String.valueOf(((Purchase) obj).getId())));
  651. } else if (obj instanceof PurchaseInvoice) {
  652. indexWriter.deleteDocuments(new Term(SearchConstants.PURCHASE_INVOICE_ID_FIELD,
  653. String.valueOf(((PurchaseInvoice) obj).getId())));
  654. } else {
  655. throw new SearchException("Message parsing failed!");
  656. }
  657. indexSearcherManager.flushCache(tableName, indexWriter, null);
  658. return obj;
  659. } catch (IOException e) {
  660. logger.error("", e);
  661. } catch (InterruptedException e) {
  662. logger.error("", e);
  663. } finally {
  664. indexWriterManager.release(tableName);
  665. }
  666. }
  667. return null;
  668. }
  669. /**
  670. * 根据goods构造Term
  671. *
  672. * @param goods
  673. * @return
  674. */
  675. private Term toTerm(Goods goods) {
  676. if (goods.getGoId() != null) {
  677. return new Term(SearchConstants.GOODS_GO_ID_FIELD, String.valueOf(goods.getGoId()));
  678. } else if (goods.getCmpId() != null) {
  679. return new Term(SearchConstants.GOODS_CMP_ID_FIELD, String.valueOf(goods.getCmpId()));
  680. }
  681. return null;
  682. }
  683. @Override
  684. public List<Object> maintainIndexes(@NotEmpty("parsedQueueMessage") ParsedQueueMessage parsedQueueMessage) {
  685. Object object = parsedQueueMessage.getObject();
  686. if (object == null) {
  687. return null;
  688. }
  689. List<Object> maintainedObjects = new ArrayList<>();
  690. // 新增、更新索引
  691. if (parsedQueueMessage.isInsert() || parsedQueueMessage.isUpdate()) {
  692. if (object instanceof Goods) {
  693. List<Goods> goodsesList = goodsDao.find((Goods) object);
  694. delete(object);
  695. for (Goods goods : goodsesList) {
  696. Object maintainedObject = save(goods);
  697. if (maintainedObject != null) {
  698. maintainedObjects.add(maintainedObject);
  699. }
  700. }
  701. } else {
  702. Object maintainedObject = update(object);
  703. if (maintainedObject != null) {
  704. maintainedObjects.add(maintainedObject);
  705. }
  706. }
  707. }
  708. // 删除索引
  709. else if (parsedQueueMessage.isDelete()) {
  710. Object maintainedObject = delete(object);
  711. if (maintainedObject != null) {
  712. maintainedObjects.add(maintainedObject);
  713. }
  714. } else {
  715. throw new IllegalStateException("message parsing failed!");
  716. }
  717. return maintainedObjects;
  718. }
  719. @Override
  720. public List<Object> maintainIndexes(@NotEmpty("tableName") String tableName, @NotEmpty("dataId") Long dataId, @NotEmpty("methodType") String methodType, String data) {
  721. ParsedQueueMessage parsedQueueMessage = queueMessageParser.parse(tableName, dataId, methodType, data);
  722. return maintainIndexes(parsedQueueMessage);
  723. }
  724. }