|
|
@@ -1,281 +1,281 @@
|
|
|
-package com.uas.search.console.b2b.jms;
|
|
|
-
|
|
|
-import com.alibaba.druid.pool.DruidDataSource;
|
|
|
-import com.alibaba.fastjson.JSONException;
|
|
|
-import com.uas.search.console.b2b.DruidDBConfiguration;
|
|
|
-import com.uas.search.console.b2b.model.ParsedQueueMessage;
|
|
|
-import com.uas.search.console.b2b.service.IndexService;
|
|
|
-import com.uas.search.console.b2b.util.SearchConstants;
|
|
|
-import com.uas.search.console.b2b.util.SearchConstants.DataSourceQualifier;
|
|
|
-import com.uas.search.util.CollectionUtils;
|
|
|
-import oracle.jms.AQjmsAdtMessage;
|
|
|
-import oracle.jms.AQjmsFactory;
|
|
|
-import oracle.jms.AQjmsSession;
|
|
|
-import org.slf4j.Logger;
|
|
|
-import org.slf4j.LoggerFactory;
|
|
|
-import org.springframework.beans.factory.InitializingBean;
|
|
|
-import org.springframework.beans.factory.annotation.Autowired;
|
|
|
-import org.springframework.stereotype.Service;
|
|
|
-
|
|
|
-import javax.jms.*;
|
|
|
-import java.util.HashSet;
|
|
|
-import java.util.Map;
|
|
|
-import java.util.Properties;
|
|
|
-import java.util.concurrent.ConcurrentHashMap;
|
|
|
-
|
|
|
-/**
|
|
|
- * 对数据库的消息队列进行实时监听
|
|
|
- *
|
|
|
- * @author sunyj
|
|
|
- * @since 2016年11月9日 上午11:29:33
|
|
|
- */
|
|
|
-@Service
|
|
|
-public class AQListener implements InitializingBean {
|
|
|
-
|
|
|
- @Autowired
|
|
|
- private IndexService indexService;
|
|
|
-
|
|
|
- @Autowired
|
|
|
- private QueueMessageParser queueMessageParser;
|
|
|
-
|
|
|
- /**
|
|
|
- * 数据源
|
|
|
- */
|
|
|
- private Map<DataSourceQualifier, DruidDataSource> dataSources = new ConcurrentHashMap<>();
|
|
|
-
|
|
|
- /**
|
|
|
- * 消息队列的消费者
|
|
|
- */
|
|
|
- private Map<DataSourceQualifier, MessageConsumer> consumers = new ConcurrentHashMap<>();
|
|
|
-
|
|
|
- private Logger logger = LoggerFactory.getLogger(getClass());
|
|
|
-
|
|
|
- @Override
|
|
|
- public void afterPropertiesSet() throws Exception {
|
|
|
- for (DataSourceQualifier dataSourceQualifier : DataSourceQualifier.values()) {
|
|
|
- dataSources.put(dataSourceQualifier, DruidDBConfiguration.getDataSource(dataSourceQualifier));
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 开启实时更新索引
|
|
|
- *
|
|
|
- * @param waitInterval
|
|
|
- * 每次接收到jms消息后等待的时间(秒)
|
|
|
- * @return 开启成功与否的提示信息
|
|
|
- */
|
|
|
- public String start(HashSet<DataSourceQualifier> dataSourceQualifiers, Long waitInterval) {
|
|
|
- // 不指定数据源标识时,开启所有监听器
|
|
|
- if (CollectionUtils.isEmpty(dataSourceQualifiers)) {
|
|
|
- dataSourceQualifiers = allDataSourceTypes();
|
|
|
- }
|
|
|
- String message = "";
|
|
|
- try {
|
|
|
- for (DataSourceQualifier qualifier : dataSourceQualifiers) {
|
|
|
- message += "\n" + startListener(qualifier, waitInterval);
|
|
|
- }
|
|
|
- } catch (Throwable e) {
|
|
|
- logger.error(e.getMessage(), e);
|
|
|
- }
|
|
|
- return message;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 添加监听器
|
|
|
- *
|
|
|
- * @param dataSourceQualifier
|
|
|
- * @param waitInterval
|
|
|
- * @throws JMSException
|
|
|
- */
|
|
|
- private String startListener(DataSourceQualifier dataSourceQualifier, Long waitInterval) {
|
|
|
- String message = "";
|
|
|
- if (isRunning(dataSourceQualifier)) {
|
|
|
- message = "已存在运行的索引实时更新服务:" + dataSourceQualifier.getQualifier();
|
|
|
- logger.warn(message);
|
|
|
- return message;
|
|
|
- }
|
|
|
- DruidDataSource dataSource = dataSources.get(dataSourceQualifier);
|
|
|
- try {
|
|
|
- QueueConnection connection = getQueueConnection(dataSource);
|
|
|
- MessageConsumer consumer = getMessageConsumer(connection, dataSource.getUsername());
|
|
|
- consumers.put(dataSourceQualifier, consumer);
|
|
|
- // 添加监听器,队列中一旦有消息入队,就会接受该消息(并不是真的实时,一般会有10秒以内的延迟)
|
|
|
- consumer.setMessageListener(new JmsMessageListener(waitInterval));
|
|
|
- connection.start();
|
|
|
- message = "索引实时更新服务开启成功:" + dataSourceQualifier.getQualifier();
|
|
|
- logger.info(message);
|
|
|
- } catch (Throwable e) {
|
|
|
- message = "索引实时更新服务开启失败:" + dataSourceQualifier.getQualifier();
|
|
|
- logger.error(message, e);
|
|
|
- try {
|
|
|
- closeConsumer(dataSourceQualifier);
|
|
|
- } catch (JMSException e1) {
|
|
|
- logger.error("", e1);
|
|
|
- }
|
|
|
- }
|
|
|
- return message;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 根据数据源获取QueueConnection
|
|
|
- *
|
|
|
- * @param dataSource
|
|
|
- * @return
|
|
|
- * @throws JMSException
|
|
|
- */
|
|
|
- private QueueConnection getQueueConnection(DruidDataSource dataSource) throws JMSException {
|
|
|
- QueueConnectionFactory queueConnectionFactory = AQjmsFactory.getQueueConnectionFactory(dataSource.getUrl(),
|
|
|
- new Properties());
|
|
|
- return queueConnectionFactory.createQueueConnection(dataSource.getUsername(), dataSource.getPassword());
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 根据QueueConnection和数据库用户名获取MessageConsumer
|
|
|
- *
|
|
|
- * @param connection
|
|
|
- * @return
|
|
|
- * @throws JMSException
|
|
|
- */
|
|
|
- private MessageConsumer getMessageConsumer(QueueConnection connection, String userName) throws JMSException {
|
|
|
- AQjmsSession session = (AQjmsSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
|
|
- Queue queue = session.getQueue(userName, SearchConstants.LUCENE_QUEUE_NAME);
|
|
|
- return session.createConsumer(queue, null, QueueMessageTypeFactory.getFactory(), null, false);
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 关闭实时更新索引服务
|
|
|
- *
|
|
|
- * @param dataSourceQualifiers
|
|
|
- * @return 关闭成功与否的提示信息
|
|
|
- */
|
|
|
- public String stop(HashSet<DataSourceQualifier> dataSourceQualifiers) {
|
|
|
- // 不指定数据源标识时,关闭所有监听器
|
|
|
- if (CollectionUtils.isEmpty(dataSourceQualifiers)) {
|
|
|
- dataSourceQualifiers = allDataSourceTypes();
|
|
|
- }
|
|
|
- String message = "";
|
|
|
- try {
|
|
|
- for (DataSourceQualifier qualifier : dataSourceQualifiers) {
|
|
|
- message += "\n" + stopListener(qualifier);
|
|
|
- }
|
|
|
- } catch (Throwable e) {
|
|
|
- logger.error(e.getMessage(), e);
|
|
|
- }
|
|
|
- return message;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 关闭监听器
|
|
|
- *
|
|
|
- * @param dataSourceQualifier
|
|
|
- * @return
|
|
|
- */
|
|
|
- private String stopListener(DataSourceQualifier dataSourceQualifier) {
|
|
|
- String message = "";
|
|
|
- if (!isRunning(dataSourceQualifier)) {
|
|
|
- message = "索引实时更新服务未开启或已关闭:" + dataSourceQualifier.getQualifier();
|
|
|
- logger.warn(message);
|
|
|
- } else {
|
|
|
- try {
|
|
|
- closeConsumer(dataSourceQualifier);
|
|
|
- message = "索引实时更新服务关闭成功:" + dataSourceQualifier.getQualifier();
|
|
|
- logger.info(message);
|
|
|
- } catch (Throwable e) {
|
|
|
- message = "索引实时更新服务关闭失败:" + dataSourceQualifier.getQualifier();
|
|
|
- logger.error(message, e);
|
|
|
- }
|
|
|
- }
|
|
|
- return message;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 关闭消息队列的消费者
|
|
|
- *
|
|
|
- * @param dataSourceQualifier
|
|
|
- * @throws JMSException
|
|
|
- */
|
|
|
- private void closeConsumer(DataSourceQualifier dataSourceQualifier) throws JMSException {
|
|
|
- MessageConsumer consumer = consumers.get(dataSourceQualifier);
|
|
|
- if (consumer != null) {
|
|
|
- consumer.close();
|
|
|
- consumers.remove(dataSourceQualifier);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * @param dataSourceQualifier
|
|
|
- * @return 索引实时更新服务是否正在运行
|
|
|
- */
|
|
|
- public boolean isRunning(DataSourceQualifier dataSourceQualifier) {
|
|
|
- if (dataSourceQualifier == null) {
|
|
|
- throw new IllegalArgumentException("dataSourceQualifier 为空");
|
|
|
- }
|
|
|
- MessageConsumer consumer = consumers.get(dataSourceQualifier);
|
|
|
- return consumer != null;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * @return 所有数据源标识
|
|
|
- */
|
|
|
- private HashSet<DataSourceQualifier> allDataSourceTypes() {
|
|
|
- HashSet<DataSourceQualifier> dataSourceQualifiers = new HashSet<>();
|
|
|
- DataSourceQualifier[] types = DataSourceQualifier.values();
|
|
|
- for (DataSourceQualifier qualifier : types) {
|
|
|
- dataSourceQualifiers.add(qualifier);
|
|
|
- }
|
|
|
- return dataSourceQualifiers;
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 对得到的队列消息进行解析,之后根据解析出来的对象,对lucene索引进行添加、更新或删除操作
|
|
|
- *
|
|
|
- * @param message
|
|
|
- */
|
|
|
- private void process(String message) {
|
|
|
- ParsedQueueMessage parsedQueueMessage = null;
|
|
|
- logger.info(message);
|
|
|
- try {
|
|
|
- parsedQueueMessage = queueMessageParser.parse(message);
|
|
|
- } catch (JSONException e) {
|
|
|
- logger.error("", e);
|
|
|
- }
|
|
|
-
|
|
|
- if (parsedQueueMessage == null) {
|
|
|
- logger.error("message parsing failed!");
|
|
|
- return;
|
|
|
- }
|
|
|
- indexService.maintainIndexes(parsedQueueMessage);
|
|
|
- }
|
|
|
-
|
|
|
- class JmsMessageListener implements MessageListener {
|
|
|
-
|
|
|
- private Long waitInterval;
|
|
|
-
|
|
|
- public JmsMessageListener(Long waitInterval) {
|
|
|
- this.waitInterval = waitInterval;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void onMessage(Message message) {
|
|
|
- try {
|
|
|
- if (waitInterval != null) {
|
|
|
- // 等待waitInterval秒,为了等待数据表变动的事务提交
|
|
|
- // 如果时间过短,事务可能还未提交
|
|
|
- // 如果时间过长,实时更新速度很慢
|
|
|
- Thread.sleep(waitInterval * 1000);
|
|
|
- }
|
|
|
- } catch (InterruptedException e) {
|
|
|
- logger.error("", e);
|
|
|
- }
|
|
|
- AQjmsAdtMessage adtMessage = (AQjmsAdtMessage) message;
|
|
|
- try {
|
|
|
- QueueMessageTypeFactory payload = (QueueMessageTypeFactory) adtMessage.getAdtPayload();
|
|
|
- // 对出队的消息进行解析、处理
|
|
|
- process(payload.getMessage());
|
|
|
- } catch (Throwable e) {
|
|
|
- logger.error("", e);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-}
|
|
|
+package com.uas.search.console.b2b.jms;
|
|
|
+
|
|
|
+import com.alibaba.druid.pool.DruidDataSource;
|
|
|
+import com.alibaba.fastjson.JSONException;
|
|
|
+import com.uas.search.console.b2b.DruidDBConfiguration;
|
|
|
+import com.uas.search.console.b2b.model.ParsedQueueMessage;
|
|
|
+import com.uas.search.console.b2b.service.IndexService;
|
|
|
+import com.uas.search.console.b2b.util.SearchConstants;
|
|
|
+import com.uas.search.console.b2b.util.SearchConstants.DataSourceQualifier;
|
|
|
+import com.uas.search.util.CollectionUtils;
|
|
|
+import oracle.jms.AQjmsAdtMessage;
|
|
|
+import oracle.jms.AQjmsFactory;
|
|
|
+import oracle.jms.AQjmsSession;
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+import org.springframework.beans.factory.InitializingBean;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import javax.jms.*;
|
|
|
+import java.util.HashSet;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.Properties;
|
|
|
+import java.util.concurrent.ConcurrentHashMap;
|
|
|
+
|
|
|
+/**
|
|
|
+ * 对数据库的消息队列进行实时监听
|
|
|
+ *
|
|
|
+ * @author sunyj
|
|
|
+ * @since 2016年11月9日 上午11:29:33
|
|
|
+ */
|
|
|
+@Service
|
|
|
+public class JmsListener implements InitializingBean {
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private IndexService indexService;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private QueueMessageParser queueMessageParser;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 数据源
|
|
|
+ */
|
|
|
+ private Map<DataSourceQualifier, DruidDataSource> dataSources = new ConcurrentHashMap<>();
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 消息队列的消费者
|
|
|
+ */
|
|
|
+ private Map<DataSourceQualifier, MessageConsumer> consumers = new ConcurrentHashMap<>();
|
|
|
+
|
|
|
+ private Logger logger = LoggerFactory.getLogger(getClass());
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void afterPropertiesSet() throws Exception {
|
|
|
+ for (DataSourceQualifier dataSourceQualifier : DataSourceQualifier.values()) {
|
|
|
+ dataSources.put(dataSourceQualifier, DruidDBConfiguration.getDataSource(dataSourceQualifier));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 开启实时更新索引
|
|
|
+ *
|
|
|
+ * @param waitInterval
|
|
|
+ * 每次接收到jms消息后等待的时间(秒)
|
|
|
+ * @return 开启成功与否的提示信息
|
|
|
+ */
|
|
|
+ public String start(HashSet<DataSourceQualifier> dataSourceQualifiers, Long waitInterval) {
|
|
|
+ // 不指定数据源标识时,开启所有监听器
|
|
|
+ if (CollectionUtils.isEmpty(dataSourceQualifiers)) {
|
|
|
+ dataSourceQualifiers = allDataSourceTypes();
|
|
|
+ }
|
|
|
+ String message = "";
|
|
|
+ try {
|
|
|
+ for (DataSourceQualifier qualifier : dataSourceQualifiers) {
|
|
|
+ message += "\n" + startListener(qualifier, waitInterval);
|
|
|
+ }
|
|
|
+ } catch (Throwable e) {
|
|
|
+ logger.error(e.getMessage(), e);
|
|
|
+ }
|
|
|
+ return message;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 添加监听器
|
|
|
+ *
|
|
|
+ * @param dataSourceQualifier
|
|
|
+ * @param waitInterval
|
|
|
+ * @throws JMSException
|
|
|
+ */
|
|
|
+ private String startListener(DataSourceQualifier dataSourceQualifier, Long waitInterval) {
|
|
|
+ String message = "";
|
|
|
+ if (isRunning(dataSourceQualifier)) {
|
|
|
+ message = "已存在运行的索引实时更新服务:" + dataSourceQualifier.getQualifier();
|
|
|
+ logger.warn(message);
|
|
|
+ return message;
|
|
|
+ }
|
|
|
+ DruidDataSource dataSource = dataSources.get(dataSourceQualifier);
|
|
|
+ try {
|
|
|
+ QueueConnection connection = getQueueConnection(dataSource);
|
|
|
+ MessageConsumer consumer = getMessageConsumer(connection, dataSource.getUsername());
|
|
|
+ consumers.put(dataSourceQualifier, consumer);
|
|
|
+ // 添加监听器,队列中一旦有消息入队,就会接受该消息(并不是真的实时,一般会有10秒以内的延迟)
|
|
|
+ consumer.setMessageListener(new JmsMessageListener(waitInterval));
|
|
|
+ connection.start();
|
|
|
+ message = "索引实时更新服务开启成功:" + dataSourceQualifier.getQualifier();
|
|
|
+ logger.info(message);
|
|
|
+ } catch (Throwable e) {
|
|
|
+ message = "索引实时更新服务开启失败:" + dataSourceQualifier.getQualifier();
|
|
|
+ logger.error(message, e);
|
|
|
+ try {
|
|
|
+ closeConsumer(dataSourceQualifier);
|
|
|
+ } catch (JMSException e1) {
|
|
|
+ logger.error("", e1);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return message;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 根据数据源获取QueueConnection
|
|
|
+ *
|
|
|
+ * @param dataSource
|
|
|
+ * @return
|
|
|
+ * @throws JMSException
|
|
|
+ */
|
|
|
+ private QueueConnection getQueueConnection(DruidDataSource dataSource) throws JMSException {
|
|
|
+ QueueConnectionFactory queueConnectionFactory = AQjmsFactory.getQueueConnectionFactory(dataSource.getUrl(),
|
|
|
+ new Properties());
|
|
|
+ return queueConnectionFactory.createQueueConnection(dataSource.getUsername(), dataSource.getPassword());
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 根据QueueConnection和数据库用户名获取MessageConsumer
|
|
|
+ *
|
|
|
+ * @param connection
|
|
|
+ * @return
|
|
|
+ * @throws JMSException
|
|
|
+ */
|
|
|
+ private MessageConsumer getMessageConsumer(QueueConnection connection, String userName) throws JMSException {
|
|
|
+ AQjmsSession session = (AQjmsSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
|
|
+ Queue queue = session.getQueue(userName, SearchConstants.LUCENE_QUEUE_NAME);
|
|
|
+ return session.createConsumer(queue, null, QueueMessageTypeFactory.getFactory(), null, false);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 关闭实时更新索引服务
|
|
|
+ *
|
|
|
+ * @param dataSourceQualifiers
|
|
|
+ * @return 关闭成功与否的提示信息
|
|
|
+ */
|
|
|
+ public String stop(HashSet<DataSourceQualifier> dataSourceQualifiers) {
|
|
|
+ // 不指定数据源标识时,关闭所有监听器
|
|
|
+ if (CollectionUtils.isEmpty(dataSourceQualifiers)) {
|
|
|
+ dataSourceQualifiers = allDataSourceTypes();
|
|
|
+ }
|
|
|
+ String message = "";
|
|
|
+ try {
|
|
|
+ for (DataSourceQualifier qualifier : dataSourceQualifiers) {
|
|
|
+ message += "\n" + stopListener(qualifier);
|
|
|
+ }
|
|
|
+ } catch (Throwable e) {
|
|
|
+ logger.error(e.getMessage(), e);
|
|
|
+ }
|
|
|
+ return message;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 关闭监听器
|
|
|
+ *
|
|
|
+ * @param dataSourceQualifier
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ private String stopListener(DataSourceQualifier dataSourceQualifier) {
|
|
|
+ String message = "";
|
|
|
+ if (!isRunning(dataSourceQualifier)) {
|
|
|
+ message = "索引实时更新服务未开启或已关闭:" + dataSourceQualifier.getQualifier();
|
|
|
+ logger.warn(message);
|
|
|
+ } else {
|
|
|
+ try {
|
|
|
+ closeConsumer(dataSourceQualifier);
|
|
|
+ message = "索引实时更新服务关闭成功:" + dataSourceQualifier.getQualifier();
|
|
|
+ logger.info(message);
|
|
|
+ } catch (Throwable e) {
|
|
|
+ message = "索引实时更新服务关闭失败:" + dataSourceQualifier.getQualifier();
|
|
|
+ logger.error(message, e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return message;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 关闭消息队列的消费者
|
|
|
+ *
|
|
|
+ * @param dataSourceQualifier
|
|
|
+ * @throws JMSException
|
|
|
+ */
|
|
|
+ private void closeConsumer(DataSourceQualifier dataSourceQualifier) throws JMSException {
|
|
|
+ MessageConsumer consumer = consumers.get(dataSourceQualifier);
|
|
|
+ if (consumer != null) {
|
|
|
+ consumer.close();
|
|
|
+ consumers.remove(dataSourceQualifier);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @param dataSourceQualifier
|
|
|
+ * @return 索引实时更新服务是否正在运行
|
|
|
+ */
|
|
|
+ public boolean isRunning(DataSourceQualifier dataSourceQualifier) {
|
|
|
+ if (dataSourceQualifier == null) {
|
|
|
+ throw new IllegalArgumentException("dataSourceQualifier 为空");
|
|
|
+ }
|
|
|
+ MessageConsumer consumer = consumers.get(dataSourceQualifier);
|
|
|
+ return consumer != null;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @return 所有数据源标识
|
|
|
+ */
|
|
|
+ private HashSet<DataSourceQualifier> allDataSourceTypes() {
|
|
|
+ HashSet<DataSourceQualifier> dataSourceQualifiers = new HashSet<>();
|
|
|
+ DataSourceQualifier[] types = DataSourceQualifier.values();
|
|
|
+ for (DataSourceQualifier qualifier : types) {
|
|
|
+ dataSourceQualifiers.add(qualifier);
|
|
|
+ }
|
|
|
+ return dataSourceQualifiers;
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 对得到的队列消息进行解析,之后根据解析出来的对象,对lucene索引进行添加、更新或删除操作
|
|
|
+ *
|
|
|
+ * @param message
|
|
|
+ */
|
|
|
+ private void process(String message) {
|
|
|
+ ParsedQueueMessage parsedQueueMessage = null;
|
|
|
+ logger.info(message);
|
|
|
+ try {
|
|
|
+ parsedQueueMessage = queueMessageParser.parse(message);
|
|
|
+ } catch (JSONException e) {
|
|
|
+ logger.error("", e);
|
|
|
+ }
|
|
|
+
|
|
|
+ if (parsedQueueMessage == null) {
|
|
|
+ logger.error("message parsing failed!");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ indexService.maintainIndexes(parsedQueueMessage);
|
|
|
+ }
|
|
|
+
|
|
|
+ class JmsMessageListener implements MessageListener {
|
|
|
+
|
|
|
+ private Long waitInterval;
|
|
|
+
|
|
|
+ public JmsMessageListener(Long waitInterval) {
|
|
|
+ this.waitInterval = waitInterval;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onMessage(Message message) {
|
|
|
+ try {
|
|
|
+ if (waitInterval != null) {
|
|
|
+ // 等待waitInterval秒,为了等待数据表变动的事务提交
|
|
|
+ // 如果时间过短,事务可能还未提交
|
|
|
+ // 如果时间过长,实时更新速度很慢
|
|
|
+ Thread.sleep(waitInterval * 1000);
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ logger.error("", e);
|
|
|
+ }
|
|
|
+ AQjmsAdtMessage adtMessage = (AQjmsAdtMessage) message;
|
|
|
+ try {
|
|
|
+ QueueMessageTypeFactory payload = (QueueMessageTypeFactory) adtMessage.getAdtPayload();
|
|
|
+ // 对出队的消息进行解析、处理
|
|
|
+ process(payload.getMessage());
|
|
|
+ } catch (Throwable e) {
|
|
|
+ logger.error("", e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+}
|