package com.uas.search.service.impl; import com.alibaba.fastjson.JSONObject; import com.uas.search.annotation.NotEmpty; import com.uas.search.constant.SearchConstants; import com.uas.search.constant.model.PageInfo; import com.uas.search.constant.model.PageParams; import com.uas.search.dao.*; import com.uas.search.exception.SearchException; import com.uas.search.jms.JmsListener; import com.uas.search.jms.QueueMessageParser; import com.uas.search.model.*; import com.uas.search.service.IndexService; import com.uas.search.support.IndexSearcherManager; import com.uas.search.support.IndexWriterManager; import com.uas.search.util.ObjectToDocumentUtils; import com.uas.search.util.SearchUtils; import org.apache.lucene.document.Document; import org.apache.lucene.index.IndexWriter; import org.apache.lucene.index.Term; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.domain.Page; import org.springframework.data.domain.Sort; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; import java.io.*; import java.util.ArrayList; import java.util.Date; import java.util.List; /** * 创建索引 * * @author sunyj * @since 2016年8月5日 下午2:23:22 */ @Service public class IndexServiceImpl implements IndexService { @Autowired private QueueMessageParser queueMessageParser; @Autowired private KindDao kindDao; @Autowired private BrandDao brandDao; @Autowired private ComponentDao componentDao; @Autowired private GoodsDao goodsDao; @Autowired private OrderDao orderDao; @Autowired private OrderInvoiceDao orderInvoiceDao; @Autowired private PurchaseDao purchaseDao; @Autowired private PurchaseInvoiceDao purchaseInvoiceDao; private IndexWriter indexWriter; private static IndexWriterManager indexWriterManager = new IndexWriterManager(); @Autowired private JmsListener jmsListener; private IndexSearcherManager indexSearcherManager = IndexSearcherManager.getInstance(); /** * 是否正在创建索引 */ private boolean creatingIndex = false; /** * 从数据库获取数据时的分页大小 */ private static final int PAGE_SIZE = 1000; /** * 单个文件存储的最大数据数目,需是PAGE_SIZE的整数倍 */ public static final int SINGLE_FILE_MAX_SIZE = 100000; private static Logger logger = LoggerFactory.getLogger(IndexServiceImpl.class); @Override public Long createIndexes(List tableNames, Boolean componentFromFiles) { if (creatingIndex) { throw new SearchException("已存在线程在创建索引,不可重复请求"); } creatingIndex = true; // 如果索引实时更新处于开启状态,需要关闭(以免两者同时操作索引出现问题) if (jmsListener.isRunning()) { logger.info("索引实时更新服务正在运行,尝试关闭索引实时更新服务..."); jmsListener.stop(); } if (CollectionUtils.isEmpty(tableNames)) { tableNames = SearchUtils.getTableNames(); } try { Long startTime = new Date().getTime(); for (String tableName : tableNames) { Long start = new Date().getTime(); Long size = 0L; deleteIndexes(tableName); if (tableName.equals(SearchConstants.KIND_TABLE_NAME)) { size = createKindIndexes(); } else if (tableName.equals(SearchConstants.BRAND_TABLE_NAME)) { size = createBrandIndexes(); } else if (tableName.equals(SearchConstants.COMPONENT_TABLE_NAME)) { // 只有明确指定不从本地文件读取数据时,才从数据库获取数据建立索引 if (componentFromFiles != null && !componentFromFiles) { size = createComponentIndexes(); } else { size = createComponentIndexesFromFiles(); } } else if (tableName.equals(SearchConstants.GOODS_TABLE_NAME)) { size = createGoodsIndexesFromFiles(); } else if (tableName.equals(SearchConstants.ORDER_TABLE_NAME)) { size = createOrderIndexes(); } else if (tableName.equals(SearchConstants.ORDER_INVOICE_TABLE_NAME)) { size = createOrderInvoiceIndexes(); } else if (tableName.equals(SearchConstants.PURCHASE_TABLE_NAME)) { size = createPurchaseIndexes(); } else if (tableName.equals(SearchConstants.PURCHASE_INVOICE_TABLE_NAME)) { size = createPurchaseInvoiceIndexes(); } else { logger.error("不支持创建该表索引:" + tableName); continue; } indexWriterManager.release(tableName); indexSearcherManager.flushCache(tableName, indexWriter, null); Long end = new Date().getTime(); logger.info(String.format("创建%s索引: %s条,耗时%.2fs\n ", tableName, size, (end - start) / 1000.0)); } Long endTime = new Date().getTime(); logger.info(String.format("索引创建成功, 共用时间%.2fs\n", (endTime - startTime) / 1000.0)); return endTime - startTime; } catch (Exception e) { // 防止SQLRecoverableException导致应用终止 throw new IllegalStateException("索引创建失败", e); } finally { creatingIndex = false; } } /** * 删除指定表的索引 * * @param tableName * 表名 * @throws InterruptedException * @throws IOException */ private void deleteIndexes(String tableName) throws InterruptedException, IOException { indexWriter = indexWriterManager.get(tableName); logger.info("正在清理旧索引..." + tableName); indexWriter.deleteAll(); logger.info("旧索引清理完毕..." + tableName); } /** * 创建类目索引 * * @return 写入的类目索引数 * @throws IOException */ private Long createKindIndexes() throws IOException { logger.info("正在创建类目索引..."); List kinds = kindDao.findAll(); logger.info("发现数据:" + kinds.size() + "条"); return createIndexesWithObjects(kinds.toArray()); } /** * 创建品牌索引 * * @return 写入的品牌索引数 * @throws IOException */ private Long createBrandIndexes() throws IOException { logger.info("正在创建品牌索引..."); List brands = brandDao.findAll(); logger.info("发现数据:" + brands.size() + "条"); return createIndexesWithObjects(brands.toArray()); } private Long createComponentIndexesFromFiles() { logger.info("正在创建器件索引..."); Long size = 0L; try { // 从本地路径读取器件数据 String componentDataPath = SearchUtils.getDataPath(SearchConstants.COMPONENT_TABLE_NAME); File[] files = new File(componentDataPath).listFiles(); if (files == null || files.length == 0) { logger.info("创建器件索引失败,原因:器件数据文件不存在!"); return 0L; } // 将要创建的索引总数目约为:文件数目*单个文件的行数 long totalSize = files.length * SINGLE_FILE_MAX_SIZE; for (File file : files) { logger.info("读取文件: " + file.getName()); BufferedReader bufferedReader = new BufferedReader(new FileReader(file)); String line = null; while (!StringUtils.isEmpty(line = bufferedReader.readLine())) { Component component = JSONObject.parseObject(line, Component.class); Document document = ObjectToDocumentUtils.toDocument(component); if (document != null) { size++; // 每创建10000条,打印一次进度 if (size % 10000 == 0) { logger.info(String.format("Component indexed...................%.2f%%", size * 100.0 / totalSize)); } indexWriter.addDocument(document); } } indexWriter.commit(); bufferedReader.close(); } } catch (FileNotFoundException e) { logger.error("创建器件索引失败,原因:器件数据文件不存在!"); return 0L; } catch (IOException e) { logger.error("", e); } return size; } /** * 创建器件索引,从数据库取数据 * * @return 写入的器件索引数 * @throws IOException */ public Long createComponentIndexes() throws IOException { logger.info("正在创建器件索引..."); Long size = 0L; PageParams params = new PageParams(); int page = 1; params.setSize(PAGE_SIZE); params.setPage(page); PageInfo info = new PageInfo(params); Page pageResult = componentDao.findAll(info); long totalElements = pageResult.getTotalElements(); logger.info("Number of components: " + totalElements); // 用于记录上次提交索引时的创建进度 double recordProgress = 0; while (totalElements > size) { List components = pageResult.getContent(); for (Component component : components) { Document document = ObjectToDocumentUtils.toDocument(component); if (document != null) { indexWriter.addDocument(document); } } size += components.size(); // 器件索引的创建进度(百分比) double indexProgress = size * 100.0 / totalElements; logger.info(String.format("Component indexed...................%.2f%%", indexProgress)); // 每创建5%,提交一次,避免内存耗尽,发生OutOfMemoryError if (indexProgress - recordProgress > 5) { indexWriter.commit(); recordProgress = indexProgress; } page++; params.setPage(page); info = new PageInfo(params); pageResult = componentDao.findAll(info); } indexWriter.commit(); return size; } private Long createGoodsIndexesFromFiles() { logger.info("正在创建批次索引..."); Long size = 0L; try { // 从本地路径读取批次数据 File[] files = new File(SearchUtils.getDataPath(SearchConstants.GOODS_TABLE_NAME)).listFiles(); if (files == null || files.length == 0) { logger.info("创建批次索引失败,原因:批次数据文件不存在!"); return 0L; } // 将要创建的索引总数目约为:文件数目*单个文件的行数 long totalSize = files.length * SINGLE_FILE_MAX_SIZE; for (File file : files) { logger.info("读取文件: " + file.getName()); BufferedReader bufferedReader = new BufferedReader(new FileReader(file)); String line = null; while (!StringUtils.isEmpty(line = bufferedReader.readLine())) { Goods goods = JSONObject.parseObject(line, Goods.class); Document document = ObjectToDocumentUtils.toDocument(goods); if (document != null) { size++; // 每创建10000条,打印一次进度 if (size % 10000 == 0) { logger.info( String.format("Goods indexed...................%.2f%%", size * 100.0 / totalSize)); } indexWriter.addDocument(document); } } indexWriter.commit(); bufferedReader.close(); } } catch (FileNotFoundException e) { logger.error("创建批次索引失败,原因:批次数据文件不存在!"); return 0L; } catch (IOException e) { logger.error("", e); } return size; } private Long createOrderIndexes() { logger.info("正在创建销售单索引..."); List orders = orderDao.findAll(); logger.info("发现数据:" + orders.size() + "条"); return createIndexesWithObjects(orders.toArray()); } private Long createOrderInvoiceIndexes() { logger.info("正在创建销售发货单索引..."); List orderInvoices = orderInvoiceDao.findAll(); logger.info("发现数据:" + orderInvoices.size() + "条"); return createIndexesWithObjects(orderInvoices.toArray()); } private Long createPurchaseIndexes() { logger.info("正在创建采购单索引..."); List purchases = purchaseDao.findAll(); logger.info("发现数据:" + purchases.size() + "条"); return createIndexesWithObjects(purchases.toArray()); } private Long createPurchaseInvoiceIndexes() { logger.info("正在创建采购发货单索引..."); List purchaseInvoices = purchaseInvoiceDao.findAll(); logger.info("发现数据:" + purchaseInvoices.size() + "条"); return createIndexesWithObjects(purchaseInvoices.toArray()); } /** * 利用对象数组创建索引 * * @param objects * 对象数组,可为Kind、Brand、Order、 * OrderInvoice、Purchase、 * PurchaseInvoice * @return 对象数组的数量 */ private Long createIndexesWithObjects(Object[] objects) { if (objects == null || objects.length < 1) return 0L; long count = 0; try { for (Object object : objects) { Document document = ObjectToDocumentUtils.toDocument(object); if (document != null) { indexWriter.addDocument(document); count++; } } indexWriter.commit(); } catch (IOException e) { logger.error("", e); } return count; } @Override public Long downloadComponentDataFromDatabase(Integer startFileIndex) { int fileIndex = 1; if (startFileIndex != null) { if (startFileIndex < 1) { throw new SearchException("startFileIndex不小于1"); } else { fileIndex = startFileIndex; } } Long startTime = new Date().getTime(); logger.info("下载器件... "); // 分页获取数据 PageParams pageParams = new PageParams(); pageParams.setPage((fileIndex - 1) * SINGLE_FILE_MAX_SIZE / PAGE_SIZE + 1); pageParams.setSize(PAGE_SIZE); PageInfo pageInfo = new PageInfo(pageParams); Page pageResult = componentDao.findAll(pageInfo); // 数据库中数据的总数目 if (pageResult.getTotalElements() < (fileIndex - 1) * SINGLE_FILE_MAX_SIZE) { throw new SearchException("startFileIndex不超过" + ((int) Math.ceil(pageResult.getTotalElements() / (1.0 * SINGLE_FILE_MAX_SIZE)))); } long totalElements = pageResult.getTotalElements() - (fileIndex - 1) * SINGLE_FILE_MAX_SIZE; logger.info("发现数据:" + totalElements + "条"); // 已翻页的数据数目 Long size = 0L; PrintWriter printWriter = null; int count = 0; try { String componentDataPath = SearchUtils.getDataPath(SearchConstants.COMPONENT_TABLE_NAME); File file = new File(componentDataPath); if (!file.exists()) { file.mkdirs(); } printWriter = new PrintWriter(componentDataPath + "/" + fileIndex + ".txt"); while (totalElements > size) { // 一个文件存放100000条数据,一旦超过,写入新的文件 if (count == SINGLE_FILE_MAX_SIZE) { count = 0; printWriter.flush(); printWriter.close(); fileIndex++; printWriter = new PrintWriter(componentDataPath + "/" + fileIndex + ".txt"); } List content = pageResult.getContent(); for (Component element : content) { printWriter.println(JSONObject.toJSONString(element)); count++; } size += content.size(); logger.info(String.format("Downloaded...................%.2f%%", size * 100.0 / totalElements)); pageParams.setPage(pageParams.getPage() + 1); pageInfo = new PageInfo(pageParams); pageResult = componentDao.findAll(pageInfo); } printWriter.flush(); printWriter.close(); } catch (FileNotFoundException e) { logger.error("", e); } Long endTime = new Date().getTime(); logger.info(String.format("下载数据%s条,耗时%.2fs\n ", totalElements, (endTime - startTime) / 1000.0)); return endTime - startTime; } @Override public Long downloadGoods(Integer startFileIndex) { int fileIndex = 1; if (startFileIndex != null) { if (startFileIndex < 1) { throw new SearchException("startFileIndex不小于1"); } else { fileIndex = startFileIndex; } } Long startTime = new Date().getTime(); logger.info("下载批次... "); Sort sort = new Sort(Sort.Direction.ASC, "id"); // 分页获取数据 PageParams pageParams = new PageParams(); pageParams.setPage((fileIndex - 1) * SINGLE_FILE_MAX_SIZE / PAGE_SIZE + 1); pageParams.setSize(PAGE_SIZE); PageInfo pageInfo = new PageInfo(pageParams, sort); Page pageResult = componentDao.findAll(pageInfo); // 数据库中数据的总数目 if (pageResult.getTotalElements() < (fileIndex - 1) * SINGLE_FILE_MAX_SIZE) { throw new SearchException("startFileIndex不超过" + ((int) Math.ceil(pageResult.getTotalElements() / (1.0 * SINGLE_FILE_MAX_SIZE)))); } long totalElements = pageResult.getTotalElements() - (fileIndex - 1) * SINGLE_FILE_MAX_SIZE; logger.info("发现数据:" + totalElements + "条"); // 已翻页的数据数目 Long size = 0L; PrintWriter printWriter = null; int count = 0; try { String goodsDataPath = SearchUtils.getDataPath(SearchConstants.GOODS_TABLE_NAME); File file = new File(goodsDataPath); if (!file.exists()) { file.mkdirs(); } printWriter = new PrintWriter(goodsDataPath + "/" + fileIndex + ".txt"); while (totalElements > size) { // 一个文件存放100000条数据,一旦超过,写入新的文件 if (count == SINGLE_FILE_MAX_SIZE) { count = 0; printWriter.flush(); printWriter.close(); fileIndex++; printWriter = new PrintWriter(goodsDataPath + "/" + fileIndex + ".txt"); } List content = pageResult.getContent(); for (Component element : content) { List goodses = goodsDao.findByComponent(element); for (Goods goods : goodses) { printWriter.println(JSONObject.toJSONString(goods)); } count++; } size += content.size(); logger.info(String.format("Downloaded...................%.2f%%", size * 100.0 / totalElements)); pageParams.setPage(pageParams.getPage() + 1); pageInfo = new PageInfo(pageParams, sort); pageResult = componentDao.findAll(pageInfo); } printWriter.flush(); printWriter.close(); } catch (FileNotFoundException e) { logger.error("", e); } Long endTime = new Date().getTime(); logger.info(String.format("下载数据%s条,耗时%.2fs\n ", totalElements, (endTime - startTime) / 1000.0)); return endTime - startTime; } @Override public void multiDownloadGoods(Integer number, Integer startFileIndex, Integer endFileIndex) { number = number == null || number < 1 ? 1 : number; startFileIndex = startFileIndex == null || startFileIndex < 1 ? 1 : startFileIndex; endFileIndex = endFileIndex == null || endFileIndex < 1 ? 1024 * 1024 * 1024 : endFileIndex; for (int i = 1; i <= number; i++) { new Thread(new DownloadTread("Thread-" + i, number, startFileIndex + i - 1, endFileIndex)).start(); } } class DownloadTread implements Runnable { private String name; private int step; private int startFileIndex; private int endFileIndex; public DownloadTread(String name, int step, int startFileIndex, int endFileIndex) { this.name = name; this.step = step; this.startFileIndex = startFileIndex; this.endFileIndex = endFileIndex; } @Override public void run() { try { if (endFileIndex < startFileIndex) { logger.error(name + " fileIndex 不可超过 : endFileIndex=" + endFileIndex); return; } Long startTime = new Date().getTime(); logger.info(name + " 下载批次... "); Sort sort = new Sort(Sort.Direction.ASC, "id"); // 分页获取数据 PageParams pageParams = new PageParams(); pageParams.setPage(startFileIndex); pageParams.setSize(PAGE_SIZE); PageInfo pageInfo = new PageInfo(pageParams, sort); Page pageResult = componentDao.findAll(pageInfo); int totalPages = pageResult.getTotalPages(); if (totalPages < startFileIndex) { logger.error(name + " fileIndex 不可超过 : totalPages=" + totalPages); return; } // 已翻页的数据数目 Long size = 0L; String goodsDataPath = SearchUtils.getDataPath(SearchConstants.GOODS_TABLE_NAME); File file = new File(goodsDataPath); if (!file.exists()) { file.mkdirs(); } while (totalPages >= startFileIndex && endFileIndex >= startFileIndex) { PrintWriter printWriter = new PrintWriter(goodsDataPath + "/" + String.format("%05d", startFileIndex) + ".txt"); List content = pageResult.getContent(); for (Component element : content) { List goodses = goodsDao.findByComponent(element); for (Goods goods : goodses) { printWriter.println(JSONObject.toJSONString(goods)); } } size += content.size(); logger.info(name + " " + startFileIndex + ".txt" + " - Downloaded..................." + size); printWriter.flush(); printWriter.close(); startFileIndex += step; pageParams.setPage(startFileIndex); pageInfo = new PageInfo(pageParams, sort); pageResult = componentDao.findAll(pageInfo); } Long endTime = new Date().getTime(); logger.info(String.format("%s 下载完成,耗时%.2fs\n ", name, (endTime - startTime) / 1000.0)); } catch (Throwable e) { logger.error(name, e); } } } @Override public Object save(Object obj) { if (obj != null) { String tableName = SearchUtils.getTableName(obj.getClass()); Document document = ObjectToDocumentUtils.toDocument(obj); if (document != null) { try { indexWriter = indexWriterManager.get(tableName); indexWriter.addDocument(document); indexSearcherManager.flushCache(tableName, indexWriter, null); return obj; } catch (IOException | InterruptedException e) { logger.error("", e); } finally { indexWriterManager.release(tableName); } } else { logger.info("对象转为Document时为null:" + obj); } } return null; } @Override public Object update(Object obj) { if (obj != null) { String tableName = SearchUtils.getTableName(obj.getClass()); Document document = ObjectToDocumentUtils.toDocument(obj); if (document != null) { try { indexWriter = indexWriterManager.get(tableName); if (obj instanceof Kind) { indexWriter.updateDocument( new Term(SearchConstants.KIND_ID_FIELD, String.valueOf(((Kind) obj).getId())), document); } else if (obj instanceof Brand) { indexWriter.updateDocument(new Term(SearchConstants.BRAND_ID_FIELD, String.valueOf(((Brand) obj).getId())), document); } else if (obj instanceof Component) { indexWriter.updateDocument(new Term(SearchConstants.COMPONENT_ID_FIELD, String.valueOf(((Component) obj).getId())), document); } else if (obj instanceof Order) { indexWriter.updateDocument(new Term(SearchConstants.ORDER_ID_FIELD, String.valueOf(((Order) obj).getId())), document); } else if (obj instanceof OrderInvoice) { indexWriter.updateDocument(new Term(SearchConstants.ORDER_INVOICE_ID_FIELD, String.valueOf(((OrderInvoice) obj).getId())), document); } else if (obj instanceof Purchase) { indexWriter.updateDocument(new Term(SearchConstants.PURCHASE_ID_FIELD, String.valueOf(((Purchase) obj).getId())), document); } else if (obj instanceof PurchaseInvoice) { indexWriter.updateDocument(new Term(SearchConstants.PURCHASE_INVOICE_ID_FIELD, String.valueOf(((PurchaseInvoice) obj).getId())), document); } else { throw new SearchException("Message parsing failed!"); } indexSearcherManager.flushCache(tableName, indexWriter, null); return obj; } catch (IOException | InterruptedException e) { logger.error("", e); } finally { indexWriterManager.release(tableName); } } else { logger.info("对象转为Document时为null:" + obj); } } return null; } @Override public Object delete(Object obj) { if (obj != null) { String tableName = SearchUtils.getTableName(obj.getClass()); try { indexWriter = indexWriterManager.get(tableName); if (obj instanceof Kind) { indexWriter.deleteDocuments( new Term(SearchConstants.KIND_ID_FIELD, String.valueOf(((Kind) obj).getId()))); } else if (obj instanceof Brand) { indexWriter.deleteDocuments( new Term(SearchConstants.BRAND_ID_FIELD, String.valueOf(((Brand) obj).getId()))); } else if (obj instanceof Component) { indexWriter.deleteDocuments(new Term(SearchConstants.COMPONENT_ID_FIELD, String.valueOf(((Component) obj).getId()))); } else if (obj instanceof Goods) { indexWriter.deleteDocuments(toTerm((Goods) obj)); } else if (obj instanceof Order) { indexWriter.deleteDocuments( new Term(SearchConstants.ORDER_ID_FIELD, String.valueOf(((Order) obj).getId()))); } else if (obj instanceof OrderInvoice) { indexWriter.deleteDocuments(new Term(SearchConstants.ORDER_INVOICE_ID_FIELD, String.valueOf(((OrderInvoice) obj).getId()))); } else if (obj instanceof Purchase) { indexWriter.deleteDocuments(new Term(SearchConstants.PURCHASE_ID_FIELD, String.valueOf(((Purchase) obj).getId()))); } else if (obj instanceof PurchaseInvoice) { indexWriter.deleteDocuments(new Term(SearchConstants.PURCHASE_INVOICE_ID_FIELD, String.valueOf(((PurchaseInvoice) obj).getId()))); } else { throw new SearchException("Message parsing failed!"); } indexSearcherManager.flushCache(tableName, indexWriter, null); return obj; } catch (IOException e) { logger.error("", e); } catch (InterruptedException e) { logger.error("", e); } finally { indexWriterManager.release(tableName); } } return null; } /** * 根据goods构造Term * * @param goods * @return */ private Term toTerm(Goods goods) { if (goods.getGoId() != null) { return new Term(SearchConstants.GOODS_GO_ID_FIELD, String.valueOf(goods.getGoId())); } else if (goods.getCmpId() != null) { return new Term(SearchConstants.GOODS_CMP_ID_FIELD, String.valueOf(goods.getCmpId())); } return null; } @Override public List maintainIndexes(@NotEmpty("parsedQueueMessage") ParsedQueueMessage parsedQueueMessage) { Object object = parsedQueueMessage.getObject(); if (object == null) { return null; } List maintainedObjects = new ArrayList<>(); // 新增、更新索引 if (parsedQueueMessage.isInsert() || parsedQueueMessage.isUpdate()) { if (object instanceof Goods) { List goodsesList = goodsDao.find((Goods) object); delete(object); for (Goods goods : goodsesList) { Object maintainedObject = save(goods); if (maintainedObject != null) { maintainedObjects.add(maintainedObject); } } } else { Object maintainedObject = update(object); if (maintainedObject != null) { maintainedObjects.add(maintainedObject); } } } // 删除索引 else if (parsedQueueMessage.isDelete()) { Object maintainedObject = delete(object); if (maintainedObject != null) { maintainedObjects.add(maintainedObject); } } else { throw new IllegalStateException("message parsing failed!"); } return maintainedObjects; } @Override public List maintainIndexes(@NotEmpty("tableName") String tableName, @NotEmpty("dataId") Long dataId, @NotEmpty("methodType") String methodType, String data) { ParsedQueueMessage parsedQueueMessage = queueMessageParser.parse(tableName, dataId, methodType, data); return maintainIndexes(parsedQueueMessage); } }