Selaa lähdekoodia

增加分组构建索引

wangyc 7 vuotta sitten
vanhempi
commit
1a0b2a30ab

+ 2 - 2
mall-search/src/main/java/com/uas/search/controller/IndexController.java

@@ -135,7 +135,7 @@ public class IndexController {
     @RequestMapping("/listen/stop")
     @ResponseBody
     public String stopListen(HttpServletRequest request) {
-        jmsListener.stop();
+        jmsListener.stop(null);
         return "关闭成功";
     }
 
@@ -143,7 +143,7 @@ public class IndexController {
     @ResponseBody
     public String restartListen(Long interval, HttpServletRequest request) {
         if (jmsListener.isRunning(null)) {
-            jmsListener.stop();
+            jmsListener.stop(null);
         }
         jmsListener.start(interval, null);
         return "重启成功";

+ 2 - 2
mall-search/src/main/java/com/uas/search/controller/ScheduleController.java

@@ -28,7 +28,7 @@ public class ScheduleController {
 	@RequestMapping("/start")
 	@ResponseBody
 	public String start(HttpServletRequest request) {
-		return taskService.start();
+		return taskService.start(null);
 	}
 
 	@RequestMapping("/stop")
@@ -43,7 +43,7 @@ public class ScheduleController {
 		if (!taskService.isStopped()) {
 			taskService.stop();
 		}
-		return taskService.start();
+		return taskService.start(null);
 	}
 
 	@RequestMapping("/isStopped")

+ 33 - 23
mall-search/src/main/java/com/uas/search/jms/JmsListener.java

@@ -1,21 +1,22 @@
 package com.uas.search.jms;
 
+import static com.uas.search.constant.SearchConstants.GOODS_INDEX_TASK;
+import static com.uas.search.constant.SearchConstants.ORDERS_INDEX_TASK;
+import static com.uas.search.constant.SearchConstants.PRODUCTS_INDEX_TASK;
+import static com.uas.search.constant.SearchConstants.STANDARD_INDEX_TASK;
+
 import com.uas.search.constant.SearchConstants;
 import com.uas.search.constant.model.SPage;
-import com.uas.search.schedule.TaskInformation;
 import com.uas.search.schedule.Executable;
+import com.uas.search.schedule.TaskInformation;
 import com.uas.search.schedule.TaskService;
 import com.uas.search.service.IndexService;
 import com.uas.search.util.CollectionUtils;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.stream.Collectors;
+import java.util.List;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
-
-import java.util.List;
 import org.springframework.util.StringUtils;
 
 /**对数据库的消息队列进行实时监听
@@ -42,8 +43,8 @@ public class JmsListener {
      * PRODUCTS_INDEX_TASK 物料索引任务,
      * ORDERS_INDEX_TASK 订单类索引任务
      */
-    private static final String[] BASKTASKS= {SearchConstants.STANDARD_INDEX_TASK, SearchConstants.GOODS_INDEX_TASK,
-        SearchConstants.PRODUCTS_INDEX_TASK, SearchConstants.ORDERS_INDEX_TASK};
+    private static final String[] BASKTASKS= {STANDARD_INDEX_TASK, GOODS_INDEX_TASK,
+        PRODUCTS_INDEX_TASK, ORDERS_INDEX_TASK};
 
     // 物料类包含的索引名称
     private static final String[] STANDARD_INDEX_TABLES = {SearchConstants.KIND_TABLE_NAME, SearchConstants.BRAND_TABLE_NAME, SearchConstants.COMPONENT_TABLE_NAME};
@@ -97,10 +98,14 @@ public class JmsListener {
             throw new IllegalArgumentException("interval 不合法:" + interval);
         }
 
-        if (isRunning(null)) {
+        if (isRunning(taskName)) {
             throw new IllegalStateException("已存在运行的索引" + taskInformation.getTitle() + "服务");
         }
 
+        if (!taskService.isIdle()) {
+            throw new IllegalStateException("任务池已满");
+        }
+
         // 启动默认索引更新任务
         if (StringUtils.isEmpty(taskName)) {
             for (String baseTask : BASKTASKS) {
@@ -137,16 +142,16 @@ public class JmsListener {
                 long startTime = System.currentTimeMillis();
                 String[] tablenames = null;
                 switch (title) {
-                    case SearchConstants.STANDARD_INDEX_TASK:
+                    case STANDARD_INDEX_TASK:
                         tablenames = STANDARD_INDEX_TABLES;
                         break;
-                    case SearchConstants.GOODS_INDEX_TASK:
+                    case GOODS_INDEX_TASK:
                         tablenames = GOODS_INDEX_TABLES;
                         break;
-                    case SearchConstants.PRODUCTS_INDEX_TASK:
+                    case PRODUCTS_INDEX_TASK:
                         tablenames = PRODUCTS_INDEX_TABLES;
                         break;
-                    case SearchConstants.ORDERS_INDEX_TASK:
+                    case ORDERS_INDEX_TASK:
                         tablenames = ORDERS_INDEX_TABLES;
                         break;
                     default: break;
@@ -174,26 +179,31 @@ public class JmsListener {
         };
         taskInformation = new TaskInformation(title, command, INITIAL_DELAY, interval, TaskInformation.ScheduleType.FixedDelay);
         taskService.newTask(taskInformation);
-//        if (!taskService.isStopped()) {
-//            taskService.stop();
-//        }
-        taskService.start();
+        if (!taskService.isStopped(taskInformation)) {
+            taskService.stop(taskInformation);
+        }
+        taskService.start(taskInformation);
     }
     /**
      * 关闭实时更新索引服务
      *
      * @return 关闭成功与否的提示信息
      */
-    public void stop() {
-        if (!isRunning(null)) {
+    public void stop(String title) {
+        String[] defaultTiltes = {STANDARD_INDEX_TASK, GOODS_INDEX_TASK, PRODUCTS_INDEX_TASK, ORDERS_INDEX_TASK};
+        if (StringUtils.isEmpty(title)) {
+            if (!taskService.isStopped()) {
+                taskService.stop();
+            }
+        } else if (!isRunning(title)) {
             throw new IllegalStateException("索引实时更新服务未开启或已关闭");
         } else {
             taskService.remove(taskInformation.getCode());
             taskInformation = null;
-            if (!taskService.isStopped()) {
-                taskService.stop();
+            if (!taskService.isStopped(taskInformation)) {
+                taskService.stop(taskInformation);
             }
-            taskService.start();
+            taskService.start(null);
         }
     }
 
@@ -202,7 +212,7 @@ public class JmsListener {
      * @return 索引实时更新服务是否正在运行
      */
     public boolean isRunning(String title) {
-        return taskInformation != null && taskService.exist(StringUtils.isEmpty(title) ? taskInformation.getCode() : title);
+        return taskInformation != null && taskService.exist(StringUtils.isEmpty(title) ? taskInformation.getTitle() : title);
     }
 
     /**

+ 26 - 8
mall-search/src/main/java/com/uas/search/schedule/TaskService.java

@@ -1,7 +1,6 @@
 package com.uas.search.schedule;
 
 import com.uas.search.annotation.NotEmpty;
-
 import java.util.List;
 
 /**
@@ -29,10 +28,10 @@ public interface TaskService {
 	/**
 	 * 是否存在定时任务
 	 *
-	 * @param code 任务的 code
+	 * @param code 任务的title
 	 * @return true 则存在
 	 */
-	boolean exist(@NotEmpty("code") String code);
+	boolean exist(@NotEmpty("title") String title);
 
 	/**
 	 * 获取定时任务信息
@@ -43,23 +42,42 @@ public interface TaskService {
 
 	/**
 	 * 开启定时任务
+	 * @param taskInformation 任务名称
 	 * 
 	 * @return 返回的结果
 	 */
-	public String start();
+	public String start(TaskInformation taskInformation);
+
+	/**
+	 * 关闭定时全部任务
+	 * @return
+	 */
+	String stop();
 
 	/**
 	 * 关闭定时任务
 	 * 
 	 * @return 返回的结果
 	 */
-	public String stop();
+	String stop(TaskInformation taskInformation);
 
 	/**
-	 * 定时任务是否停止
-	 * 
+	 * 指定任务是否停止
+	 * @param taskInformation 指定任务
+	 * @return
+	 */
+	boolean isStopped(TaskInformation taskInformation);
+
+	/**
+	 * 所有任务是否停止
+	 * @return
+	 */
+	boolean isStopped();
+
+	/**
+	 * 任务池是否空闲
 	 * @return
 	 */
-	public boolean isStopped();
+	boolean isIdle();
 
 }

+ 93 - 30
mall-search/src/main/java/com/uas/search/schedule/TaskServiceImpl.java

@@ -2,7 +2,9 @@ package com.uas.search.schedule;
 
 import com.uas.search.annotation.NotEmpty;
 import com.uas.search.util.ExceptionUtils;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
@@ -70,9 +72,9 @@ public class TaskServiceImpl implements TaskService {
 	}
 
 	@Override
-	public boolean exist(@NotEmpty("code") String code) {
+	public boolean exist(@NotEmpty("title") String title) {
 		for (TaskInformation d : taskInformations) {
-			if (Objects.equals(code, d.getCode())) {
+			if (Objects.equals(title, d.getTitle())) {
 				return true;
 			}
 		}
@@ -131,37 +133,55 @@ public class TaskServiceImpl implements TaskService {
 	}
 
 	@Override
-	public String start() {
+	public String start(TaskInformation taskInformation) {
 		String message = "";
-//		if (!isStopped()) {
-//			message = "已存在运行的定时任务";
-//			logger.error(message);
-//			return message;
-//		}
-		if (!CollectionUtils.isEmpty(taskInformations)) {
-			// 线程数为1,以免多个创建索引的任务互相影响,导致索引创建失败
-			scheduledExecutorService = Executors.newScheduledThreadPool(4);
-			for (TaskInformation taskInformation : taskInformations) {
-				logger.info("New task: " + taskInformation);
-				switch (taskInformation.getScheduleType()) {
-                    case FixedRate:
-                        scheduledExecutorService.scheduleAtFixedRate(getCommand(taskInformation),
-                                taskInformation.getInitialDelay(), taskInformation.getInterval(), TimeUnit.MILLISECONDS);
-                        break;
-                    case FixedDelay:
-                        scheduledExecutorService.scheduleWithFixedDelay(getCommand(taskInformation),
-                                taskInformation.getInitialDelay(), taskInformation.getInterval(), TimeUnit.MILLISECONDS);
-                        break;
-                }
-			}
-			message = "已开启定时任务:" + taskInformations;
-			logger.info(message + "\n");
+
+		if (!isIdle()) {
+			message = "任务池已满";
 			return message;
+		}
+
+		if (taskInformation != null) {
+			scheduledTask(taskInformation);
+		} else if (!CollectionUtils.isEmpty(taskInformations)) {
+			for (TaskInformation task : taskInformations) {
+				scheduledTask(task);
+			}
 		} else {
 			message = "定时任务为空";
 			logger.error(message + "\n");
 			return message;
 		}
+
+		return message;
+	}
+
+	private String scheduledTask(TaskInformation taskInformation) {
+		String message = "";
+		ScheduledExecutorService scheduledExecutorService = scheduledExecutorMap.get(taskInformation.getTitle());
+		if (scheduledExecutorService == null) {
+			// 线程数为1,以免同一索引多个的任务互相影响,导致索引创建失败
+			// TODO Products需要以企业多线程
+			scheduledExecutorService = Executors.newScheduledThreadPool(1);
+			logger.info("New task: " + taskInformation);
+			switch (taskInformation.getScheduleType()) {
+				case FixedRate:
+					scheduledExecutorService.scheduleAtFixedRate(getCommand(taskInformation),
+						taskInformation.getInitialDelay(), taskInformation.getInterval(), TimeUnit.MILLISECONDS);
+					break;
+				case FixedDelay:
+					scheduledExecutorService.scheduleWithFixedDelay(getCommand(taskInformation),
+						taskInformation.getInitialDelay(), taskInformation.getInterval(), TimeUnit.MILLISECONDS);
+					break;
+			}
+			scheduledExecutorMap.put(taskInformation.getTitle(), scheduledExecutorService);
+			message = "已开启定时任务:" + taskInformation;
+			logger.info(message + "\n");
+			return message;
+		} else {
+			message = "已存在运行的定时任务";
+		}
+		return message;
 	}
 
 	/**
@@ -190,22 +210,65 @@ public class TaskServiceImpl implements TaskService {
 	public String stop() {
 		String message = "";
 		if (isStopped()) {
-			message = "定时任务已经停止或者未开启过";
+			message = "全部定时任务已经停止或者未开启过";
+			logger.error(message);
+			return message;
+		}
+		logger.info("Remove old tasks...");
+
+		for (Entry entry : scheduledExecutorMap.entrySet()) {
+			ScheduledExecutorService scheduledExecutorService = (ScheduledExecutorService) entry.getValue();
+			scheduledExecutorService.shutdownNow();
+		}
+		taskInformations = new ArrayList<>();
+		scheduledExecutorMap = new HashMap<>();
+		message = "已关闭定时任务";
+		logger.info(message + "\n");
+		return message;
+	}
+
+	@Override
+	public String stop(TaskInformation taskInformation) {
+		String message = "";
+		if (isStopped(taskInformation)) {
+			message = "定时任务" + taskInformation.getTitle() + "已经停止或者未开启过";
 			logger.error(message);
 			return message;
 		}
 		logger.info("Remove old tasks...");
+
+		ScheduledExecutorService scheduledExecutorService = scheduledExecutorMap.get(taskInformation.getTitle());
 		scheduledExecutorService.shutdownNow();
+
 		message = "已关闭定时任务";
 		logger.info(message + "\n");
 		return message;
 	}
 
+	@Override
+	public boolean isStopped(TaskInformation taskInformation) {
+		ScheduledExecutorService scheduledExecutorService = scheduledExecutorMap.get(taskInformation.getTitle());
+		return scheduledExecutorService == null || scheduledExecutorService.isTerminated() || scheduledExecutorService.isShutdown();
+	}
+
 	@Override
 	public boolean isStopped() {
-		if (scheduledExecutorService == null) {
-			return true;
+		if (scheduledExecutorMap.size() > 0) {
+			for (Entry entry : scheduledExecutorMap.entrySet()) {
+				ScheduledExecutorService scheduledExecutorService = (ScheduledExecutorService) entry.getValue();
+				if (scheduledExecutorService != null && (!scheduledExecutorService.isShutdown() || !scheduledExecutorService.isTerminated())) {
+					return false;
+				}
+			}
+		}
+		return true;
+	}
+
+	@Override
+	public boolean isIdle() {
+		if (scheduledExecutorMap == null) {
+			scheduledExecutorMap = new HashMap<>();
 		}
-		return scheduledExecutorService.isShutdown() || scheduledExecutorService.isTerminated();
+		return scheduledExecutorMap.size() < 4;
 	}
 }

+ 1 - 7
mall-search/src/main/java/com/uas/search/service/impl/IndexServiceImpl.java

@@ -5,7 +5,6 @@ import static com.uas.search.constant.SearchConstants.GOODS_TABLE_NAME;
 import static com.uas.search.constant.SearchConstants.PCB_GOODS_TABLE_NAME;
 import static com.uas.search.constant.SearchConstants.PCB_TABLE_NAME;
 import static com.uas.search.constant.SearchConstants.PRODUCTS_PRIVATE_TABLE_NAME;
-import static com.uas.search.constant.SearchConstants.TRADE_GOODS_TABLE_NAME;
 
 import com.alibaba.fastjson.JSONException;
 import com.alibaba.fastjson.JSONObject;
@@ -62,16 +61,11 @@ import java.io.FileNotFoundException;
 import java.io.FileReader;
 import java.io.IOException;
 import java.io.PrintWriter;
-import java.lang.annotation.Annotation;
-import java.lang.reflect.Field;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import javax.persistence.Column;
-import javax.persistence.MapKeyColumn;
 import org.apache.lucene.document.Document;
 import org.apache.lucene.index.IndexWriter;
 import org.apache.lucene.index.Term;
@@ -157,7 +151,7 @@ public class IndexServiceImpl implements IndexService {
         // 如果索引实时更新处于开启状态,需要关闭(以免两者同时操作索引出现问题)
         if (jmsListener.isRunning(null)) {
             logger.info("索引实时更新服务正在运行,尝试关闭索引实时更新服务...");
-            jmsListener.stop();
+            jmsListener.stop(null);
         }
 
         if (CollectionUtils.isEmpty(tableNames)) {