Эх сурвалжийг харах

jms接收到消息时,等待60秒;实时更新索引增加对事务回退情况下的处理

sunyj 9 жил өмнө
parent
commit
02c2be5bb6

+ 8 - 2
search-console/src/main/java/com/uas/search/console/jms/AQListener.java

@@ -66,8 +66,6 @@ public class AQListener {
 			QueueConnection connection = queueConnectionFactory.createQueueConnection(dataSource.getUsername(),
 					dataSource.getPassword());
 			AQjmsSession session = (AQjmsSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-			connection.start();
-
 			Queue queue = session.getQueue(dataSource.getUsername(), SearchConstants.LUCENE_QUEUE_NAME);
 			MessageConsumer consumer = session.createConsumer(queue, null, QueueMessageTypeFactory.getFactory(), null,
 					false);
@@ -76,6 +74,12 @@ public class AQListener {
 			consumer.setMessageListener(new MessageListener() {
 				@Override
 				public void onMessage(Message message) {
+					try {
+						// 等待60秒,为了等待数据表变动的事务提交
+						Thread.sleep(60000);
+					} catch (InterruptedException e1) {
+						e1.printStackTrace();
+					}
 					AQjmsAdtMessage adtMessage = (AQjmsAdtMessage) message;
 					try {
 						QueueMessageTypeFactory payload = (QueueMessageTypeFactory) adtMessage.getAdtPayload();
@@ -88,6 +92,7 @@ public class AQListener {
 					}
 				}
 			});
+			connection.start();
 		} catch (JMSException e) {
 			e.printStackTrace();
 		}
@@ -102,6 +107,7 @@ public class AQListener {
 		ParsedQueueMessage parsedQueueMessage = null;
 		try {
 			parsedQueueMessage = queueMessageParser.parse(message);
+			logger.info(parsedQueueMessage);
 		} catch (JSONException e) {
 			e.printStackTrace();
 		}

+ 28 - 23
search-console/src/main/java/com/uas/search/console/jms/QueueMessageParser.java

@@ -77,10 +77,6 @@ public class QueueMessageParser {
 			return null;
 		}
 
-		if (object == null) {
-			return null;
-		}
-
 		parsedQueueMessage.setObject(object);
 		return parsedQueueMessage;
 	}
@@ -94,14 +90,19 @@ public class QueueMessageParser {
 	 */
 	// {"method":"value1","table":"product$kind","ki_id":5}
 	private KindSimpleInfo parseKind(JSONObject jsonObject) throws JSONException {
-		KindSimpleInfo kind = new KindSimpleInfo();
 		Long kindid = jsonObject.getLong("ki_id");
-		kind.setId(kindid);
-
-		KindSimpleInfo temp = kindDao.findById(kindid);
-		// 如果更改是删除的话,根据id获取到的对象为null
-		if (temp != null) {
-			kind = temp;
+		KindSimpleInfo kind = kindDao.findById(kindid);
+		// 对删除操作的事务回退进行处理
+		// (新增操作回退的话,返回的是null,并不影响索引;
+		// 更新回退的话,数据与先前一样,这时更新索引,索引数据并不变)
+		if (jsonObject.getString("method").equals("delete")) {
+			// 数据库中该类目仍存在,认为事务回退,不删除索引
+			if (kind != null) {
+				return null;
+			} else {
+				kind = new KindSimpleInfo();
+				kind.setId(kindid);
+			}
 		}
 		return kind;
 	}
@@ -115,13 +116,15 @@ public class QueueMessageParser {
 	 */
 	// {"method":"value1","table":"product$brand","br_id":60}
 	private BrandSimpleInfo parseBrand(JSONObject jsonObject) throws JSONException {
-		BrandSimpleInfo brand = new BrandSimpleInfo();
 		Long brandid = jsonObject.getLong("br_id");
-		brand.setId(brandid);
-
-		BrandSimpleInfo temp = brandDao.findById(brandid);
-		if (temp != null) {
-			brand = temp;
+		BrandSimpleInfo brand = brandDao.findById(brandid);
+		if (jsonObject.getString("method").equals("delete")) {
+			if (brand != null) {
+				return null;
+			} else {
+				brand = new BrandSimpleInfo();
+				brand.setId(brandid);
+			}
 		}
 		return brand;
 	}
@@ -135,13 +138,15 @@ public class QueueMessageParser {
 	 */
 	// {"method":"value1","table":"product$component","cmp_id":2029}
 	private ComponentSimpleInfo parseComponent(JSONObject jsonObject) throws JSONException {
-		ComponentSimpleInfo component = new ComponentSimpleInfo();
 		Long componentid = jsonObject.getLong("cmp_id");
-		component.setId(componentid);
-
-		ComponentSimpleInfo temp = componentDao.findById(componentid);
-		if (temp != null) {
-			component = temp;
+		ComponentSimpleInfo component = componentDao.findById(componentid);
+		if (jsonObject.getString("method").equals("delete")) {
+			if (component != null) {
+				return null;
+			} else {
+				component = new ComponentSimpleInfo();
+				component.setId(componentid);
+			}
 		}
 		return component;
 	}