Browse Source

调整更新任务按索引关联关系分组

wangyc 7 years ago
parent
commit
fe9c8fe598

+ 2 - 2
mall-search/pom.xml

@@ -114,8 +114,8 @@
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-compiler-plugin</artifactId>
 				<configuration>
-					<source>1.7</source>
-					<target>1.7</target>
+					<source>1.8</source>
+					<target>1.8</target>
 				</configuration>
 			</plugin>
 			<plugin>

+ 1 - 1
mall-search/src/main/java/com/uas/search/Application.java

@@ -37,7 +37,7 @@ public class Application {
 		SystemProperties systemProperties = ContextUtils.getBean(SystemProperties.class);
 		if (systemProperties.isTaskAutoStart()) {
 			JmsListener jmsListener = ContextUtils.getBean(JmsListener.class);
-			jmsListener.start(null);
+			jmsListener.start(null, null);
 		}
 	}
 }

+ 20 - 0
mall-search/src/main/java/com/uas/search/constant/SearchConstants.java

@@ -13,6 +13,26 @@ public class SearchConstants {
 	 */
 	public static final int TOP_NUM = 1024 * 1024 * 1024;
 
+	/**
+	 * 标准器件类索引任务(包含类目(kind)、品牌(brand)、器件(component))
+	 */
+	public static final String STANDARD_INDEX_TASK = "standard_index_task";
+
+	/**
+	 * 在售商品类索引任务(包含goods,pcb_goods)
+	 */
+	public static final String GOODS_INDEX_TASK = "goods_index_task";
+
+	/**
+	 * 物料索引任务(包含v$product$private)
+	 */
+	public static final String PRODUCTS_INDEX_TASK = "products_index_task";
+
+	/**
+	 * 订单类索引任务(包含orders,purchases,invoice_fmor,invoice_fmpu)
+	 */
+	public static final String ORDERS_INDEX_TASK = "orders_index_task";
+
 	/**
 	 * 类目表名
 	 */

+ 4 - 4
mall-search/src/main/java/com/uas/search/controller/IndexController.java

@@ -128,7 +128,7 @@ public class IndexController {
     @RequestMapping("/listen/start")
     @ResponseBody
     public String startListen(Long interval, HttpServletRequest request) {
-        jmsListener.start(interval);
+        jmsListener.start(interval, null);
         return "开启成功";
     }
 
@@ -142,17 +142,17 @@ public class IndexController {
     @RequestMapping("/listen/restart")
     @ResponseBody
     public String restartListen(Long interval, HttpServletRequest request) {
-        if (jmsListener.isRunning()) {
+        if (jmsListener.isRunning(null)) {
             jmsListener.stop();
         }
-        jmsListener.start(interval);
+        jmsListener.start(interval, null);
         return "重启成功";
     }
 
     @RequestMapping("/listen/details")
     @ResponseBody
     public SPage<LuceneMessage> listenDetails(Integer page, Integer size, HttpServletRequest request) {
-        return luceneMessageService.findAll(page, size);
+        return luceneMessageService.findAll(page, size, null);
     }
 
     @RequestMapping("/maintain")

+ 111 - 42
mall-search/src/main/java/com/uas/search/jms/JmsListener.java

@@ -1,17 +1,22 @@
 package com.uas.search.jms;
 
+import com.uas.search.constant.SearchConstants;
 import com.uas.search.constant.model.SPage;
 import com.uas.search.schedule.TaskInformation;
 import com.uas.search.schedule.Executable;
 import com.uas.search.schedule.TaskService;
 import com.uas.search.service.IndexService;
 import com.uas.search.util.CollectionUtils;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.stream.Collectors;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import java.util.List;
+import org.springframework.util.StringUtils;
 
 /**对数据库的消息队列进行实时监听
  *
@@ -30,6 +35,29 @@ public class JmsListener {
      */
     private static final long INTERVAL = 1000;
 
+    /**
+     * 基础索引任务
+     * STANDARD_INDEX_TASK 标准器件类索引任务,
+     * GOODS_INDEX_TASK 在售商品类索引任务,
+     * PRODUCTS_INDEX_TASK 物料索引任务,
+     * ORDERS_INDEX_TASK 订单类索引任务
+     */
+    private static final String[] BASKTASKS= {SearchConstants.STANDARD_INDEX_TASK, SearchConstants.GOODS_INDEX_TASK,
+        SearchConstants.PRODUCTS_INDEX_TASK, SearchConstants.ORDERS_INDEX_TASK};
+
+    // 物料类包含的索引名称
+    private static final String[] STANDARD_INDEX_TABLES = {SearchConstants.KIND_TABLE_NAME, SearchConstants.BRAND_TABLE_NAME, SearchConstants.COMPONENT_TABLE_NAME};
+
+    // 在售商品类包含的索引名称
+    private static final String[] GOODS_INDEX_TABLES = {SearchConstants.GOODS_TABLE_NAME, SearchConstants.PCB_GOODS_TABLE_NAME};
+
+    // 物料类包含的索引名称
+    private static final String[] PRODUCTS_INDEX_TABLES = {SearchConstants.PRODUCTS_PRIVATE_TABLE_NAME};
+
+    // 物料类包含的索引名称
+    private static final String[] ORDERS_INDEX_TABLES = {SearchConstants.ORDER_TABLE_NAME, SearchConstants.ORDER_INVOICE_TABLE_NAME,
+                        SearchConstants.PURCHASE_TABLE_NAME, SearchConstants.PURCHASE_INVOICE_TABLE_NAME};
+
     @Autowired
     private LuceneMessageDao luceneMessageDao;
 
@@ -49,14 +77,17 @@ public class JmsListener {
 
     private Logger logger = LoggerFactory.getLogger(getClass());
 
+
     /**
      * 开启实时更新索引
      *
      * @param interval
      *            每次接收到jms消息后等待的时间(秒)
+     *
+     * @param taskName 启动任务名称
      * @return 开启成功与否的提示信息
      */
-    public void start(Long interval) {
+    public void start(Long interval, String taskName) {
         if (interval == null){
             interval = INTERVAL;
         } else {
@@ -66,58 +97,95 @@ public class JmsListener {
             throw new IllegalArgumentException("interval 不合法:" + interval);
         }
 
-        if (isRunning()) {
-            throw new IllegalStateException("已存在运行的索引实时更新服务");
+        if (isRunning(null)) {
+            throw new IllegalStateException("已存在运行的索引" + taskInformation.getTitle() + "服务");
         }
 
-        try {
-            String title = "实时更新";
-            Executable command = new Executable() {
-                @Override
-                public String execute() {
-                    long startTime = System.currentTimeMillis();
-                    SPage<LuceneMessage> sPage = luceneMessageService.findAll(1, 100);
-                    List<LuceneMessage> luceneMessages = sPage.getContent();
-                    if (CollectionUtils.isEmpty(luceneMessages)) {
-                        return "无消息";
-                    }
-                    logger.info("获取luceneMessages" + luceneMessages.size() + "条,耗时:" + (System.currentTimeMillis() - startTime));
-                    for (LuceneMessage luceneMessage : luceneMessages){
-                        try {
-                            process(luceneMessage);
-                        } catch (Throwable e) {
-                            logger.error("", e);
-                        }
-                    }
-                    logger.info("处理luceneMessages" + luceneMessages.size() + "条,耗时:" + (System.currentTimeMillis() - startTime));
-                    // 如果消息不止一页,立即消费下一页
-                    if (sPage.getTotalPage() > 1){
-                        execute();
-                    }
-                    return "正常";
+        // 启动默认索引更新任务
+        if (StringUtils.isEmpty(taskName)) {
+            for (String baseTask : BASKTASKS) {
+                if (isRunning(baseTask)) {
+                    throw new IllegalStateException("已存在运行的" + baseTask + "索引实时更新服务");
+                } else {
+                    createTask(interval, baseTask);
                 }
-            };
-            taskInformation = new TaskInformation(title, command, INITIAL_DELAY, interval, TaskInformation.ScheduleType.FixedDelay);
-            taskService.newTask(taskInformation);
-            if (!taskService.isStopped()) {
-                taskService.stop();
             }
-            taskService.start();
-        } catch (Throwable e) {
-            if (taskInformation != null) {
-                taskInformation = null;
-            }
-            throw new IllegalStateException("开启失败", e);
+        // 启动指定索引更新任务
+        } else {
+
         }
+
+//        try {
+//            createTask(interval, "实时更新");
+//        } catch (Throwable e) {
+//            if (taskInformation != null) {
+//                taskInformation = null;
+//            }
+//            throw new IllegalStateException("开启失败", e);
+//        }
     }
 
+    /**
+     * 创建索引更新任务
+     * @param interval 任务间隔时间
+     * @param title 任务名称
+     */
+    private void createTask(Long interval, String title) {
+        Executable command = new Executable() {
+            @Override
+            public String execute() {
+                long startTime = System.currentTimeMillis();
+                String[] tablenames = null;
+                switch (title) {
+                    case SearchConstants.STANDARD_INDEX_TASK:
+                        tablenames = STANDARD_INDEX_TABLES;
+                        break;
+                    case SearchConstants.GOODS_INDEX_TASK:
+                        tablenames = GOODS_INDEX_TABLES;
+                        break;
+                    case SearchConstants.PRODUCTS_INDEX_TASK:
+                        tablenames = PRODUCTS_INDEX_TABLES;
+                        break;
+                    case SearchConstants.ORDERS_INDEX_TASK:
+                        tablenames = ORDERS_INDEX_TABLES;
+                        break;
+                    default: break;
+                }
+                SPage<LuceneMessage> sPage = luceneMessageService.findAll(1, 100, tablenames);
+                List<LuceneMessage> luceneMessages = sPage.getContent();
+                if (CollectionUtils.isEmpty(luceneMessages)) {
+                    return "无消息";
+                }
+                logger.info("获取luceneMessages" + luceneMessages.size() + "条,耗时:" + (System.currentTimeMillis() - startTime));
+                for (LuceneMessage luceneMessage : luceneMessages){
+                    try {
+                        process(luceneMessage);
+                    } catch (Throwable e) {
+                        logger.error("", e);
+                    }
+                }
+                logger.info("处理luceneMessages" + luceneMessages.size() + "条,耗时:" + (System.currentTimeMillis() - startTime));
+                // 如果消息不止一页,立即消费下一页
+                if (sPage.getTotalPage() > 1){
+                    execute();
+                }
+                return "正常";
+            }
+        };
+        taskInformation = new TaskInformation(title, command, INITIAL_DELAY, interval, TaskInformation.ScheduleType.FixedDelay);
+        taskService.newTask(taskInformation);
+//        if (!taskService.isStopped()) {
+//            taskService.stop();
+//        }
+        taskService.start();
+    }
     /**
      * 关闭实时更新索引服务
      *
      * @return 关闭成功与否的提示信息
      */
     public void stop() {
-        if (!isRunning()) {
+        if (!isRunning(null)) {
             throw new IllegalStateException("索引实时更新服务未开启或已关闭");
         } else {
             taskService.remove(taskInformation.getCode());
@@ -130,10 +198,11 @@ public class JmsListener {
     }
 
     /**
+     * @param title 索引名称
      * @return 索引实时更新服务是否正在运行
      */
-    public boolean isRunning() {
-        return taskInformation != null && taskService.exist(taskInformation.getCode());
+    public boolean isRunning(String title) {
+        return taskInformation != null && taskService.exist(StringUtils.isEmpty(title) ? taskInformation.getCode() : title);
     }
 
     /**

+ 20 - 0
mall-search/src/main/java/com/uas/search/jms/LuceneMessageDao.java

@@ -5,6 +5,7 @@ import org.springframework.data.jpa.repository.JpaRepository;
 import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
 import org.springframework.data.jpa.repository.Query;
 import org.springframework.data.jpa.repository.query.Procedure;
+import org.springframework.data.repository.query.Param;
 import org.springframework.stereotype.Repository;
 import org.springframework.transaction.annotation.Transactional;
 
@@ -28,6 +29,17 @@ public interface LuceneMessageDao
     @Query(value = "select * from lucene$message where me_retry_count < 5 order by me_priority desc, me_id limit ?1, ?2", nativeQuery = true)
     List<LuceneMessage> findList(@NotEmpty("start") Integer start, @NotEmpty("size") Integer size);
 
+    /**
+     * 获取指定表名、数目的数据
+     *
+     * @param start 开始的记录
+     * @param size 指定数目
+     * @param tableNames 表名
+     * @return 数据
+     */
+    @Query(value = "select * from lucene$message where me_retry_count < 10 and me_table_name in :tableNames order by me_priority desc, me_id limit :start, :size", nativeQuery = true)
+    List<LuceneMessage> findListByTableNames(@Param("start") Integer start, @Param("size") Integer size, @Param("tableNames") String[] tableNames);
+
     /**
      * 统计行数
      * @return 行数
@@ -35,6 +47,14 @@ public interface LuceneMessageDao
     @Query(value = "select count(1) from lucene$message where me_retry_count < 5", nativeQuery = true)
     long count();
 
+    /**
+     * 统计指定表名的行数
+     * @param tableNames
+     * @return
+     */
+    @Query(value = "select count(1) from lucene$message where me_retry_count < 10 and me_table_name in :tableNames", nativeQuery = true)
+    long countByTableNames(@Param("tableNames") String[] tableNames);
+
     /**
      * 出队消息
      * @param id 消息 id

+ 3 - 1
mall-search/src/main/java/com/uas/search/jms/LuceneMessageService.java

@@ -17,7 +17,9 @@ public interface LuceneMessageService {
      *            页码
      * @param size
      *            页大小
+     * @param tableNames
+     *            表名
      * @return 消息(实时更新情况的详细信息)
      */
-    SPage<LuceneMessage> findAll(Integer page, Integer size);
+    SPage<LuceneMessage> findAll(Integer page, Integer size, String[] tableNames);
 }

+ 20 - 4
mall-search/src/main/java/com/uas/search/jms/LuceneMessageServiceImpl.java

@@ -1,6 +1,7 @@
 package com.uas.search.jms;
 
 import com.uas.search.constant.model.SPage;
+import java.util.ArrayList;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
@@ -29,7 +30,7 @@ public class LuceneMessageServiceImpl implements LuceneMessageService {
     private LuceneMessageDao luceneMessageDao;
 
     @Override
-    public SPage<LuceneMessage> findAll(Integer page, Integer size) {
+    public SPage<LuceneMessage> findAll(Integer page, Integer size, String[] tableNames) {
         // 处理分页信息
         if (page == null || page <= 0) {
             page = PAGE_INDEX;
@@ -37,9 +38,14 @@ public class LuceneMessageServiceImpl implements LuceneMessageService {
         if (size == null || size <= 0) {
             size = PAGE_SIZE;
         }
-
+        long totalElement = 0L;
         SPage<LuceneMessage> sPage = new SPage<>();
-        long totalElement = luceneMessageDao.count();
+        if (tableNames != null && tableNames.length > 1) {
+            totalElement = luceneMessageDao.countByTableNames(tableNames);
+        } else {
+            totalElement = luceneMessageDao.count();
+        }
+
         sPage.setTotalElement(totalElement);
         // 总数目为0,返回
         if (totalElement == 0) {
@@ -61,7 +67,17 @@ public class LuceneMessageServiceImpl implements LuceneMessageService {
         if (page == totalPage) {
             sPage.setLast(true);
         }
-        List<LuceneMessage> messages = luceneMessageDao.findList(start, size);
+
+        List<LuceneMessage> messages = new ArrayList<>();
+        if (tableNames != null && tableNames.length > 1) {
+            messages = luceneMessageDao.findListByTableNames(start, size, tableNames);
+        } else {
+            messages = luceneMessageDao.findList(start, size);
+        }
+
+        if (tableNames.length == 1) {
+            System.out.println(messages);
+        }
         sPage.setContent(messages);
         return sPage;
     }

+ 10 - 9
mall-search/src/main/java/com/uas/search/schedule/TaskServiceImpl.java

@@ -2,6 +2,7 @@ package com.uas.search.schedule;
 
 import com.uas.search.annotation.NotEmpty;
 import com.uas.search.util.ExceptionUtils;
+import java.util.Map;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
@@ -34,9 +35,9 @@ public class TaskServiceImpl implements TaskService {
 	private List<TaskInformation> taskInformations = new ArrayList<>();
 
 	/**
-	 * 定时任务调度
+	 * 定时任务调度
 	 */
-	private ScheduledExecutorService scheduledExecutorService;
+	private Map<String, ScheduledExecutorService> scheduledExecutorMap;
 
 	/**
 	 * 任务执行日志文件的路径
@@ -132,17 +133,17 @@ public class TaskServiceImpl implements TaskService {
 	@Override
 	public String start() {
 		String message = "";
-		if (!isStopped()) {
-			message = "已存在运行的定时任务";
-			logger.error(message);
-			return message;
-		}
+//		if (!isStopped()) {
+//			message = "已存在运行的定时任务";
+//			logger.error(message);
+//			return message;
+//		}
 		if (!CollectionUtils.isEmpty(taskInformations)) {
 			// 线程数为1,以免多个创建索引的任务互相影响,导致索引创建失败
-			scheduledExecutorService = Executors.newScheduledThreadPool(1);
+			scheduledExecutorService = Executors.newScheduledThreadPool(4);
 			for (TaskInformation taskInformation : taskInformations) {
 				logger.info("New task: " + taskInformation);
-				switch (taskInformation.getScheduleType()){
+				switch (taskInformation.getScheduleType()) {
                     case FixedRate:
                         scheduledExecutorService.scheduleAtFixedRate(getCommand(taskInformation),
                                 taskInformation.getInitialDelay(), taskInformation.getInterval(), TimeUnit.MILLISECONDS);

+ 1 - 1
mall-search/src/main/java/com/uas/search/service/impl/IndexServiceImpl.java

@@ -155,7 +155,7 @@ public class IndexServiceImpl implements IndexService {
         }
         creatingIndex = true;
         // 如果索引实时更新处于开启状态,需要关闭(以免两者同时操作索引出现问题)
-        if (jmsListener.isRunning()) {
+        if (jmsListener.isRunning(null)) {
             logger.info("索引实时更新服务正在运行,尝试关闭索引实时更新服务...");
             jmsListener.stop();
         }