Browse Source

重新实现索引实时更新

sunyj 8 years ago
parent
commit
688242ebfe

+ 9 - 14
src/main/java/com/uas/search/Application.java

@@ -1,18 +1,17 @@
 package com.uas.search;
 
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.PrintStream;
-
+import com.uas.search.jms.JmsListener;
+import com.uas.search.service.RealTimeUpdateMonitorService;
+import com.uas.search.util.ContextUtils;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.transaction.annotation.EnableTransactionManagement;
 import org.springframework.web.servlet.config.annotation.EnableWebMvc;
 
-import com.uas.search.schedule.service.TaskService;
-import com.uas.search.service.RealTimeUpdateMonitorService;
-import com.uas.search.util.ContextUtils;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
 
 @EnableTransactionManagement
 @EnableWebMvc
@@ -43,12 +42,8 @@ public class Application {
 					.getBean(RealTimeUpdateMonitorService.class);
 			realTimeUpdateMonitorService.newTask();
 
-			// 开启定时任务
-			TaskService taskService = ContextUtils.getBean(TaskService.class);
-			if (!taskService.isStopped()) {
-				taskService.stop();
-			}
-			taskService.start();
+			JmsListener jmsListener = ContextUtils.getBean(JmsListener.class);
+			jmsListener.start(null);
 		}
 	}
 }

+ 4 - 4
src/main/java/com/uas/search/controller/IndexController.java

@@ -64,8 +64,8 @@ public class IndexController {
 
 	@RequestMapping("/listen/start")
 	@ResponseBody
-	public String startListen(Long waitInterval, HttpServletRequest request) {
-		return jmsListener.start(waitInterval);
+	public String startListen(Long period, HttpServletRequest request) {
+		return jmsListener.start(period);
 	}
 
 	@RequestMapping("/listen/stop")
@@ -76,11 +76,11 @@ public class IndexController {
 
 	@RequestMapping("/listen/restart")
 	@ResponseBody
-	public String restartListen(Long waitInterval, HttpServletRequest request) {
+	public String restartListen(Long period, HttpServletRequest request) {
 		if (jmsListener.isRunning()) {
 			jmsListener.stop();
 		}
-		return jmsListener.start(waitInterval);
+		return jmsListener.start(period);
 	}
 
 	@RequestMapping("/listen/details")

+ 0 - 169
src/main/java/com/uas/search/jms/AQListener.java

@@ -1,169 +0,0 @@
-package com.uas.search.jms;
-
-import java.util.Properties;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Queue;
-import javax.jms.QueueConnection;
-import javax.jms.QueueConnectionFactory;
-import javax.jms.Session;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import com.alibaba.druid.pool.DruidDataSource;
-import com.alibaba.fastjson.JSONException;
-import com.uas.search.constant.SearchConstants;
-import com.uas.search.model.ParsedQueueMessage;
-import com.uas.search.service.IndexService;
-
-import oracle.jms.AQjmsAdtMessage;
-import oracle.jms.AQjmsFactory;
-import oracle.jms.AQjmsSession;
-
-/**
- * 对数据库的消息队列进行实时监听
- * 
- * @author sunyj
- * @since 2016年7月7日 下午8:49:26
- */
-@Service
-public class AQListener {
-
-	@Autowired
-	private IndexService indexService;
-
-	@Autowired
-	private QueueMessageParser queueMessageParser;
-
-	@Autowired
-	private DruidDataSource dataSource;
-
-	// 消息队列的消费者
-	private MessageConsumer consumer;
-
-	private static Logger logger = LoggerFactory.getLogger(AQListener.class);
-
-	/**
-	 * 开启实时更新索引
-	 * 
-	 * @param waitInterval
-	 *            每次接收到jms消息后等待的时间(秒)
-	 * @return 开启成功与否的提示信息
-	 */
-	public String start(final Long waitInterval) {
-		String message = "";
-		if (isRunning()) {
-			message = "已存在运行的索引实时更新服务";
-			logger.warn(message);
-			return message;
-		}
-
-		try {
-			QueueConnectionFactory queueConnectionFactory = AQjmsFactory.getQueueConnectionFactory(dataSource.getUrl(),
-					new Properties());
-			QueueConnection connection = queueConnectionFactory.createQueueConnection(dataSource.getUsername(),
-					dataSource.getPassword());
-			AQjmsSession session = (AQjmsSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-			Queue queue = session.getQueue(dataSource.getUsername(), SearchConstants.LUCENE_QUEUE_NAME);
-			consumer = session.createConsumer(queue, null, QueueMessageTypeFactory.getFactory(), null, false);
-
-			// 添加监听器,队列中一旦有消息入队,就会接受该消息(并不是真的实时,一般会有10秒以内的延迟)
-			consumer.setMessageListener(new MessageListener() {
-				@Override
-				public void onMessage(Message message) {
-					try {
-						if (waitInterval != null) {
-							// 等待waitInterval秒,为了等待数据表变动的事务提交
-							// 如果时间过短,事务可能还未提交
-							// 如果时间过长,实时更新速度很慢
-							Thread.sleep(waitInterval * 1000);
-						}
-					} catch (InterruptedException e) {
-						logger.error("", e);
-					}
-					AQjmsAdtMessage adtMessage = (AQjmsAdtMessage) message;
-					try {
-						QueueMessageTypeFactory payload = (QueueMessageTypeFactory) adtMessage.getAdtPayload();
-						// 对出队的消息进行解析、处理
-						process(payload.getMessage());
-					} catch (Throwable e) {
-						logger.error("", e);
-					}
-				}
-			});
-			connection.start();
-			message = "索引实时更新服务成功开启";
-			logger.info(message);
-		} catch (Throwable e) {
-			message = "索引实时更新服务开启失败";
-			logger.error(message, e);
-			if (consumer != null) {
-				try {
-					consumer.close();
-				} catch (JMSException e1) {
-					logger.error("", e1);
-				}
-				consumer = null;
-			}
-		}
-		return message;
-	}
-
-	/**
-	 * 关闭实时更新索引服务
-	 * 
-	 * @return 关闭成功与否的提示信息
-	 */
-	public String stop() {
-		String message = "";
-		if (!isRunning()) {
-			message = "索引实时更新服务未开启或已关闭";
-			logger.warn(message);
-		} else {
-			try {
-				consumer.close();
-				message = "索引实时更新服务成功关闭";
-				logger.info(message);
-			} catch (JMSException e) {
-				message = "索引实时更新服务关闭失败";
-				logger.error(message, e);
-			}
-			consumer = null;
-		}
-		return message;
-	}
-
-	/**
-	 * @return 索引实时更新服务是否正在运行
-	 */
-	public boolean isRunning() {
-		return consumer != null;
-	}
-
-	/**
-	 * 对得到的队列消息进行解析,之后根据解析出来的对象,对lucene索引进行添加、更新或删除操作
-	 * 
-	 * @param message
-	 */
-	private void process(String message) {
-		ParsedQueueMessage parsedQueueMessage = null;
-		logger.info(message);
-		try {
-			parsedQueueMessage = queueMessageParser.parse(message);
-		} catch (JSONException e) {
-			logger.error("", e);
-		}
-
-		if (parsedQueueMessage == null) {
-			logger.error("message parsing failed!");
-			return;
-		}
-		indexService.maintainIndexes(parsedQueueMessage);
-	}
-}

+ 203 - 0
src/main/java/com/uas/search/jms/JmsListener.java

@@ -0,0 +1,203 @@
+package com.uas.search.jms;
+
+import com.uas.search.constant.model.SPage;
+import com.uas.search.model.LuceneMessage;
+import com.uas.search.model.ParsedQueueMessage;
+import com.uas.search.schedule.model.TaskInformation;
+import com.uas.search.schedule.service.Executable;
+import com.uas.search.schedule.service.TaskService;
+import com.uas.search.service.IndexService;
+import com.uas.search.service.LuceneMessageService;
+import com.uas.search.util.CollectionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+/**对数据库的消息队列进行实时监听
+ * @author sunyj
+ * @since 2017/10/11 16:20
+ */
+@Service
+public class JmsListener {
+
+    /**
+     * 第一次执行的延迟时间间隔为1秒
+     */
+    private static final long INITIAL_DELAY = 1000;
+    /**
+     * 两次任务之间的等待时间间隔为 1 秒钟
+     */
+    private static final long PERIOD = 1000;
+
+    @Autowired
+    private IndexService indexService;
+
+    @Autowired
+    private TaskService taskService;
+
+    @Autowired
+    private LuceneMessageService luceneMessageService;
+
+    @Autowired
+    private QueueMessageParser queueMessageParser;
+
+    private TaskInformation taskInformation;
+
+    private Logger logger = LoggerFactory.getLogger(getClass());
+
+    /**
+     * 开启实时更新索引
+     *
+     * @param period
+     *            每次接收到jms消息后等待的时间(秒)
+     * @return 开启成功与否的提示信息
+     */
+    public String start(Long period) {
+        if(period == null){
+            period = PERIOD;
+        }else{
+            period *= 1000;
+        }
+        if(period <= 0){
+            throw new IllegalArgumentException("period 不合法:" + period);
+        }
+
+        String message = "";
+        if (isRunning()) {
+            message = "已存在运行的索引实时更新服务";
+            logger.warn(message);
+            return message;
+        }
+
+        try {
+//            QueueConnectionFactory queueConnectionFactory = AQjmsFactory.getQueueConnectionFactory(dataSource.getUrl(),
+//                    new Properties());
+//            QueueConnection connection = queueConnectionFactory.createQueueConnection(dataSource.getUsername(),
+//                    dataSource.getPassword());
+//            AQjmsSession session = (AQjmsSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//            Queue queue = session.getQueue(dataSource.getUsername(), SearchConstants.LUCENE_QUEUE_NAME);
+//            consumer = session.createConsumer(queue, null, QueueMessageTypeFactory.getFactory(), null, false);
+            String title = "实时更新";
+            Executable command = new Executable() {
+                @Override
+                public String execute() {
+                    SPage<LuceneMessage> sPage = luceneMessageService.findAll(1, 100);
+                    List<LuceneMessage> luceneMessages = sPage.getContent();
+                    if (CollectionUtils.isEmpty(luceneMessages)) {
+                        return "无消息";
+                    }
+                    for(LuceneMessage luceneMessage : luceneMessages){
+                        try {
+                            process(luceneMessage);
+                        } catch (Throwable e) {
+                            logger.error("", e);
+                            continue;
+                        }
+                    }
+                    // 如果消息不止一页,立即消费下一页
+                    if(sPage.getPage() > 1){
+                        execute();
+                    }
+                    return "正常";
+                }
+            };
+            taskInformation = new TaskInformation(title, command, INITIAL_DELAY, period);
+            taskService.newTask(taskInformation);
+            if (!taskService.isStopped()) {
+                taskService.stop();
+            }
+            taskService.start();
+//
+//            // 添加监听器,队列中一旦有消息入队,就会接受该消息(并不是真的实时,一般会有10秒以内的延迟)
+//            consumer.setMessageListener(new MessageListener() {
+//                @Override
+//                public void onMessage(Message message) {
+//                    try {
+//                        if (period != null) {
+//                            // 等待waitInterval秒,为了等待数据表变动的事务提交
+//                            // 如果时间过短,事务可能还未提交
+//                            // 如果时间过长,实时更新速度很慢
+//                            Thread.sleep(period * 1000);
+//                        }
+//                    } catch (InterruptedException e) {
+//                        logger.error("", e);
+//                    }
+//                    AQjmsAdtMessage adtMessage = (AQjmsAdtMessage) message;
+//                    try {
+//                        QueueMessageTypeFactory payload = (QueueMessageTypeFactory) adtMessage.getAdtPayload();
+//                        // 对出队的消息进行解析、处理
+//                        process(payload.getMessage());
+//                    } catch (Throwable e) {
+//                        logger.error("", e);
+//                    }
+//                }
+//            });
+//            connection.start();
+            message = "索引实时更新服务成功开启";
+            logger.info(message);
+        } catch (Throwable e) {
+            message = "索引实时更新服务开启失败";
+            logger.error(message, e);
+            if (taskInformation != null) {
+//                try {
+//                    consumer.close();
+//                } catch (JMSException e1) {
+//                    logger.error("", e1);
+//                }
+                taskInformation = null;
+            }
+        }
+        return message;
+    }
+
+    /**
+     * 关闭实时更新索引服务
+     *
+     * @return 关闭成功与否的提示信息
+     */
+    public String stop() {
+        String message = "";
+        if (!isRunning()) {
+            message = "索引实时更新服务未开启或已关闭";
+            logger.warn(message);
+        } else {
+//            try {
+//                consumer.close();
+//                message = "索引实时更新服务成功关闭";
+//                logger.info(message);
+//            } catch (JMSException e) {
+//                message = "索引实时更新服务关闭失败";
+//                logger.error(message, e);
+//            }
+            taskService.remove(taskInformation.getCode());
+            taskInformation = null;
+            if (!taskService.isStopped()) {
+                taskService.stop();
+            }
+            taskService.start();
+        }
+        return message;
+    }
+
+    /**
+     * @return 索引实时更新服务是否正在运行
+     */
+    public boolean isRunning() {
+        return taskInformation != null && taskService.exist(taskInformation.getCode());
+    }
+
+    /**
+     * 对得到的队列消息进行解析,之后根据解析出来的对象,对lucene索引进行添加、更新或删除操作
+     *
+     * @param luceneMessage
+     */
+    private void process(LuceneMessage luceneMessage) {
+        logger.info(luceneMessage.toString());
+        ParsedQueueMessage parsedQueueMessage = queueMessageParser.parse(luceneMessage.getTableName(), luceneMessage.getDataId(), luceneMessage.getMethodType(), luceneMessage.getData());
+        indexService.maintainIndexes(parsedQueueMessage);
+        luceneMessageService.dequeueLuceneMessage(luceneMessage.getId());
+    }
+}

+ 22 - 3
src/main/java/com/uas/search/schedule/model/TaskInformation.java

@@ -3,6 +3,7 @@ package com.uas.search.schedule.model;
 import java.util.Objects;
 
 import com.uas.search.schedule.service.Executable;
+import com.uas.search.util.NumberGenerator;
 
 /**
  * 定时任务信息
@@ -11,6 +12,12 @@ import com.uas.search.schedule.service.Executable;
  * @since 2016年12月19日 上午10:21:28
  */
 public class TaskInformation {
+
+	/**
+	 * 任务 code
+	 */
+	private String code;
+
 	/**
 	 * 任务标题
 	 */
@@ -36,13 +43,25 @@ public class TaskInformation {
 	}
 
 	public TaskInformation(String title, Executable command, long initialDelay, long period) {
-		super();
+		init();
 		this.title = title;
 		this.command = command;
 		this.initialDelay = initialDelay;
 		this.period = period;
 	}
 
+	public void init(){
+		code = NumberGenerator.generateId();
+	}
+
+	public String getCode() {
+		return code;
+	}
+
+	public void setCode(String code) {
+		this.code = code;
+	}
+
 	public String getTitle() {
 		return title;
 	}
@@ -84,14 +103,14 @@ public class TaskInformation {
 			return false;
 		}
 		TaskInformation other = (TaskInformation) obj;
-		// command不好比较,不进行比较
+		// command不好比较,不进行比较,也不比较 code
 		return Objects.equals(title, other.getTitle()) && initialDelay == other.getInitialDelay()
 				&& period == other.getPeriod();
 	}
 
 	@Override
 	public String toString() {
-		return "TaskInformation [title=" + title + ", initialDelay=" + initialDelay + ", period=" + period + "]";
+		return "TaskInformation [code=" + code + ", title=" + title + ", initialDelay=" + initialDelay + ", period=" + period + "]";
 	}
 
 }

+ 16 - 0
src/main/java/com/uas/search/schedule/service/TaskService.java

@@ -2,6 +2,7 @@ package com.uas.search.schedule.service;
 
 import java.util.List;
 
+import com.uas.search.annotation.NotEmpty;
 import com.uas.search.schedule.model.TaskInformation;
 
 /**
@@ -19,6 +20,21 @@ public interface TaskService {
 	 */
 	public void newTask(TaskInformation taskInformation);
 
+	/**
+	 * 根据指定的 code,移除定时任务
+	 *
+	 * @param code 任务的 code
+	 */
+	void remove(@NotEmpty("code") String code);
+
+	/**
+	 * 是否存在定时任务
+	 *
+	 * @param code 任务的 code
+	 * @return true 则存在
+	 */
+	boolean exist(@NotEmpty("code") String code);
+
 	/**
 	 * 获取定时任务信息
 	 * 

+ 27 - 4
src/main/java/com/uas/search/schedule/service/impl/TaskServiceImpl.java

@@ -6,10 +6,12 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import com.uas.search.annotation.NotEmpty;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.stereotype.Service;
@@ -52,21 +54,42 @@ public class TaskServiceImpl implements TaskService {
 	public void newTask(TaskInformation taskInformation) {
 		if (taskInformation == null || StringUtils.isEmpty(taskInformation.getTitle())
 				|| taskInformation.getCommand() == null) {
-			throw new NullPointerException();
+			throw new IllegalArgumentException("定时任务的 title 和 command 不可为空");
 		}
-		if (containsTask(taskInformation)) {
-			throw new SearchException("任务已存在:" + taskInformation);
+		if (contain(taskInformation)) {
+			throw new IllegalStateException("任务已存在:" + taskInformation);
 		}
 		taskInformations.add(taskInformation);
 	}
 
+	@Override
+	public void remove(@NotEmpty("code") String code) {
+		for (TaskInformation d : taskInformations) {
+			if (Objects.equals(code, d.getCode())) {
+				taskInformations.remove(d);
+				return;
+			}
+		}
+		throw new IllegalArgumentException("定时任务不存在:" + code);
+	}
+
+	@Override
+	public boolean exist(@NotEmpty("code") String code) {
+		for (TaskInformation d : taskInformations) {
+			if (Objects.equals(code, d.getCode())) {
+				return true;
+			}
+		}
+		return false;
+	}
+
 	/**
 	 * 判断定时任务是否已经存在
 	 * 
 	 * @param taskInformation
 	 * @return
 	 */
-	private boolean containsTask(TaskInformation taskInformation) {
+	private boolean contain(TaskInformation taskInformation) {
 		if (CollectionUtils.isEmpty(taskInformations)) {
 			return false;
 		}

+ 36 - 0
src/main/java/com/uas/search/util/NumberGenerator.java

@@ -0,0 +1,36 @@
+package com.uas.search.util;
+
+import java.util.Calendar;
+import java.util.Date;
+
+/**
+ * 数字生成工具类
+ *
+ * @author sunyj
+ * @since 2017年8月22日 上午11:34:44
+ */
+public class NumberGenerator {
+
+    /**
+     * 生成id时的偏移量
+     */
+    private static int offset = 0;
+
+    /**
+     * @return 产生唯一的id(一毫秒内不超过100个则不重复)
+     */
+    public static String generateId() {
+        // 偏移量,不超过100个
+        offset = (offset + 1) % 100;
+        // 当前毫秒数
+        long now = new Date().getTime();
+        // 起始时间 2017-01-01 00:00:00:000
+        Calendar startCalendar = Calendar.getInstance();
+        startCalendar.set(2017, 0, 1, 0, 0, 0);
+        startCalendar.set(Calendar.MILLISECOND, 0);
+        // 偏移量加16,保证得到的是两位16进制字符
+        String hex = Long.toHexString(now - startCalendar.getTimeInMillis()) + Integer.toHexString(offset + 16);
+        // 最后反转并转为大写
+        return hex.toUpperCase();
+    }
+}

+ 1 - 1
src/main/resources/application.yml

@@ -24,4 +24,4 @@ security:
  ignored: false
   
 schedule:
- auto-start: false
+ auto-start: true

+ 1 - 1
src/main/webapp/WEB-INF/views/console.html

@@ -84,7 +84,7 @@
 				<li><a target="_blank">index/create</a></li>
 				<li><a target="_blank">index/create?tableNames=product$brand,trade$order</a></li>
 				<li><a target="_blank">index/downloadComponentData?startFileIndex=20</a></li>
-				<li><a target="_blank">index/listen/start?waitInterval=10</a></li>
+				<li><a target="_blank">index/listen/start?period=10</a></li>
 				<li><a target="_blank">index/listen/stop</a></li>
 				<li><a target="_blank">index/listen/restart</a></li>
 				<li><a target="_blank">index/listen/details?page=1&size=10</a></li>