Эх сурвалжийг харах

增加开启、关闭实时更新的接口;jms接收到消息,等待30秒;

sunyj 9 жил өмнө
parent
commit
1418f8a37f

+ 19 - 20
search-console/src/main/java/com/uas/search/console/controller/IndexController.java

@@ -10,42 +10,41 @@ import com.uas.search.console.service.IndexService;
 
 
 /**
 /**
  * 索引创建相关请求
  * 索引创建相关请求
+ * 
  * @author sunyj
  * @author sunyj
  * @since 2016年8月5日 上午11:42:54
  * @since 2016年8月5日 上午11:42:54
  */
  */
 @Controller
 @Controller
 @RequestMapping("/index")
 @RequestMapping("/index")
 public class IndexController {
 public class IndexController {
-	
+
 	@Autowired
 	@Autowired
 	private IndexService indexService;
 	private IndexService indexService;
-	
+
 	@Autowired
 	@Autowired
 	private AQListener aqListener;
 	private AQListener aqListener;
-	
-	/**
-	 * 初始化创建索引文件
-	 * 
-	 * @return 所用时间 ms
-	 */
+
 	@RequestMapping("/create")
 	@RequestMapping("/create")
 	@ResponseBody
 	@ResponseBody
 	public String initIndexes() {
 	public String initIndexes() {
 		return "Indexes created success in " + indexService.createIndexs() + " ms.";
 		return "Indexes created success in " + indexService.createIndexs() + " ms.";
 	}
 	}
-	
-	/**
-	 * 实时更新索引
-	 * @return
-	 */
+
 	@RequestMapping("/listen")
 	@RequestMapping("/listen")
 	@ResponseBody
 	@ResponseBody
-	public String listen(){
-		new Thread(){
-			public void run() {
-				aqListener.execute();
-			};
-		}.start();
-		return "Listen...";
+	public String listen() {
+		return aqListener.start();
+	}
+
+	@RequestMapping("/listen/start")
+	@ResponseBody
+	public String startListen() {
+		return aqListener.start();
+	}
+
+	@RequestMapping("/listen/stop")
+	@ResponseBody
+	public String stopListen() {
+		return aqListener.stop();
 	}
 	}
 }
 }

+ 50 - 12
search-console/src/main/java/com/uas/search/console/jms/AQListener.java

@@ -42,21 +42,24 @@ public class AQListener {
 	@Autowired
 	@Autowired
 	private QueueMessageParser queueMessageParser;
 	private QueueMessageParser queueMessageParser;
 
 
-	/**
-	 * 标志,判断是否已存在监听线程,防止重复开启监听线程
-	 */
-	private boolean listening = false;
+	// 消息队列的消费者
+	private MessageConsumer consumer;
 
 
 	Logger logger = Logger.getLogger(AQListener.class);
 	Logger logger = Logger.getLogger(AQListener.class);
 
 
-	public void execute() {
-		if (listening) {
-			logger.warn("已存在线程正在实时监听!");
-			return;
+	/**
+	 * 开启实时更新索引
+	 * 
+	 * @return 开启成功与否的提示信息
+	 */
+	public String start() {
+		String message = "";
+		if (consumer != null) {
+			message = "已存在运行的实时更新索引服务";
+			logger.warn(message);
+			return message;
 		}
 		}
-		listening = true;
 
 
-		logger.info("AQListener started...\n");
 		BasicDataSource dataSource = ContextUtils.getApplicationContext().getBean("dataSource",
 		BasicDataSource dataSource = ContextUtils.getApplicationContext().getBean("dataSource",
 				org.apache.commons.dbcp.BasicDataSource.class);
 				org.apache.commons.dbcp.BasicDataSource.class);
 
 
@@ -67,8 +70,7 @@ public class AQListener {
 					dataSource.getPassword());
 					dataSource.getPassword());
 			AQjmsSession session = (AQjmsSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 			AQjmsSession session = (AQjmsSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 			Queue queue = session.getQueue(dataSource.getUsername(), SearchConstants.LUCENE_QUEUE_NAME);
 			Queue queue = session.getQueue(dataSource.getUsername(), SearchConstants.LUCENE_QUEUE_NAME);
-			MessageConsumer consumer = session.createConsumer(queue, null, QueueMessageTypeFactory.getFactory(), null,
-					false);
+			consumer = session.createConsumer(queue, null, QueueMessageTypeFactory.getFactory(), null, false);
 
 
 			// 添加监听器,队列中一旦有消息入队,就会接受该消息(并不是真的实时,一般会有10秒以内的延迟)
 			// 添加监听器,队列中一旦有消息入队,就会接受该消息(并不是真的实时,一般会有10秒以内的延迟)
 			consumer.setMessageListener(new MessageListener() {
 			consumer.setMessageListener(new MessageListener() {
@@ -93,9 +95,45 @@ public class AQListener {
 				}
 				}
 			});
 			});
 			connection.start();
 			connection.start();
+			message = "索引实时更新服务成功开启";
+			logger.info(message);
 		} catch (JMSException e) {
 		} catch (JMSException e) {
+			message = "索引实时更新服务开启失败";
+			logger.error(message);
 			e.printStackTrace();
 			e.printStackTrace();
+			try {
+				consumer.close();
+			} catch (JMSException e1) {
+				e1.printStackTrace();
+			}
+			consumer = null;
+		}
+		return message;
+	}
+
+	/**
+	 * 关闭实时更新索引服务
+	 * 
+	 * @return 关闭成功与否的提示信息
+	 */
+	public String stop() {
+		String message = "";
+		if (consumer == null) {
+			message = "实时更新索引服务未开启或已关闭";
+			logger.info(message);
+		} else {
+			try {
+				consumer.close();
+				message = "实时更新索引服务成功关闭";
+				logger.info(message);
+			} catch (JMSException e) {
+				message = "实时更新索引服务关闭失败";
+				logger.error(message);
+				e.printStackTrace();
+			}
+			consumer = null;
 		}
 		}
+		return message;
 	}
 	}
 
 
 	/**
 	/**