Browse Source

消息队列中发送完整数据,以提升索引更新速度

sunyj 8 years ago
parent
commit
acec65430b

+ 7 - 3
search-console-b2b/src/main/java/com/uas/search/console/b2b/controller/IndexController.java

@@ -115,9 +115,13 @@ public class IndexController {
 
 	@RequestMapping("/maintain")
 	@ResponseBody
-	public String maintainIndexes(@RequestParam(required = true) String tableName,
-			@RequestParam(required = true) String ids, @RequestParam(required = true) String method) {
-		return "已维护:" + indexService.maintainIndexes(tableName, ids, method);
+	public String maintainIndexes(@RequestParam(required = true) String tableName, String ids, String data,
+			@RequestParam(required = true) String method) {
+		if (!StringUtils.isEmpty(ids)) {
+			return "已维护:" + indexService.maintainIndexesByIds(tableName, ids, method);
+		} else {
+			return "已维护:" + indexService.maintainIndexesByData(tableName, data, method);
+		}
 	}
 
 	@RequestMapping("/dequeue")

+ 17 - 4
search-console-b2b/src/main/java/com/uas/search/console/b2b/jms/LuceneQueueMessage.java

@@ -25,6 +25,11 @@ public class LuceneQueueMessage {
 	 */
 	private String ids;
 
+	/**
+	 * 更改发生的数据
+	 */
+	private String data;
+
 	/**
 	 * 更改的类型,INSERT、UPDATE、DELETE
 	 */
@@ -59,6 +64,14 @@ public class LuceneQueueMessage {
 		this.ids = ids;
 	}
 
+	public String getData() {
+		return data;
+	}
+
+	public void setData(String data) {
+		this.data = data;
+	}
+
 	public String getMethod() {
 		return method;
 	}
@@ -85,14 +98,14 @@ public class LuceneQueueMessage {
 		}
 		LuceneQueueMessage other = (LuceneQueueMessage) obj;
 		return Objects.equals(messageId, other.getMessageId()) && Objects.equals(tableName, other.getTableName())
-				&& Objects.equals(ids, other.getIds()) && Objects.equals(method, other.getMethod())
-				&& Objects.equals(modifyTime, other.getModifyTime());
+				&& Objects.equals(ids, other.getIds()) && Objects.equals(data, other.getData())
+				&& Objects.equals(method, other.getMethod()) && Objects.equals(modifyTime, other.getModifyTime());
 	}
 
 	@Override
 	public String toString() {
-		return "LuceneQueueMessage [messageId=" + messageId + ", tableName=" + tableName + ", ids=" + ids + ", method="
-				+ method + ", modifyTime=" + modifyTime + "]";
+		return "LuceneQueueMessage [messageId=" + messageId + ", tableName=" + tableName + ", ids=" + ids + ", data="
+				+ data + ", method=" + method + ", modifyTime=" + modifyTime + "]";
 	}
 
 }

+ 1 - 0
search-console-b2b/src/main/java/com/uas/search/console/b2b/jms/LuceneQueueMessageDao.java

@@ -130,6 +130,7 @@ public class LuceneQueueMessageDao {
 				JSONObject jsonObject = JSONObject.parseObject(attributes[0].toString());
 				luceneQueueMessage.setTableName(jsonObject.getString("table"));
 				luceneQueueMessage.setIds(jsonObject.getString("ids"));
+				luceneQueueMessage.setData(jsonObject.getString("data"));
 				luceneQueueMessage.setMethod(jsonObject.getString("method"));
 				luceneQueueMessages.add(luceneQueueMessage);
 			}

+ 39 - 4
search-console-b2b/src/main/java/com/uas/search/console/b2b/jms/QueueMessageParser.java

@@ -36,12 +36,18 @@ public class QueueMessageParser {
 	 * @throws JSONException
 	 */
 	// {"method":"value1","table":"value2","ids":"[1,2,3]"}
+	// {"method":"value1","table":"value2","data":[{"pr_id":1,"pr_code":"ff"},{...}]}
 	public ParsedQueueMessage parse(String message) throws JSONException {
 		if (StringUtils.isEmpty(message) || message.equals("{}")) {
 			return null;
 		}
 		JSONObject jsonObject = JSONObject.parseObject(message);
-		if (!jsonObject.containsKey("method") || !jsonObject.containsKey("table") || !jsonObject.containsKey("ids")) {
+		if (!jsonObject.containsKey("method") || !jsonObject.containsKey("table")) {
+			return null;
+		}
+
+		// ids或data必须有一个存在
+		if (!jsonObject.containsKey("ids") && !jsonObject.containsKey("data")) {
 			return null;
 		}
 
@@ -64,8 +70,13 @@ public class QueueMessageParser {
 
 		// 解析哪个表有更改
 		String table = jsonObject.getString("table");
-		Object[] objects = parseToList(jsonObject,
-				ClassAndTableNameUtils.toClass(Table_name.valueOf(table.toUpperCase()))).toArray();
+		Class<?> clazz = ClassAndTableNameUtils.toClass(Table_name.valueOf(table.toUpperCase()));
+		Object[] objects = null;
+		if (jsonObject.containsKey("ids")) {
+			objects = parseToListByIds(jsonObject, clazz).toArray();
+		} else {
+			objects = parseToListByData(jsonObject, clazz).toArray();
+		}
 
 		parsedQueueMessage.setObjects(objects);
 		return parsedQueueMessage;
@@ -82,7 +93,31 @@ public class QueueMessageParser {
 	 *            实体类型
 	 * @return 实体对象列表
 	 */
-	private <T> List<T> parseToList(JSONObject jsonObject, Class<T> clazz) {
+	private <T> List<T> parseToListByData(JSONObject jsonObject, Class<T> clazz) {
+		List<T> list = new ArrayList<>();
+		JSONArray jsonArray = JSONObject.parseArray(jsonObject.getString("data"));
+		if (jsonArray == null || jsonArray.isEmpty()) {
+			return list;
+		}
+		for (Object obj : jsonArray) {
+			T object = JSONObject.parseObject(obj.toString(), clazz);
+			list.add(object);
+		}
+		return list;
+	}
+
+	/**
+	 * 根据消息队列的消息解析对象
+	 * 
+	 * @param <T>
+	 * 
+	 * @param jsonObject
+	 *            消息转为的json对象
+	 * @param clazz
+	 *            实体类型
+	 * @return 实体对象列表
+	 */
+	private <T> List<T> parseToListByIds(JSONObject jsonObject, Class<T> clazz) {
 		// 获取实体的dao
 		JpaRepository<T, Long> dao = ClassAndTableNameUtils.getDao(clazz);
 		Set<Long> ids = getIds(jsonObject.getJSONArray("ids"));

+ 14 - 1
search-console-b2b/src/main/java/com/uas/search/console/b2b/service/IndexService.java

@@ -85,7 +85,20 @@ public interface IndexService {
 	 *            需对索引做何种更改
 	 * @return 维护的对象
 	 */
-	public List<Object> maintainIndexes(String tableName, String ids, String method);
+	public List<Object> maintainIndexesByIds(String tableName, String ids, String method);
+
+	/**
+	 * 根据实体类型名维护出问题的表的索引
+	 * 
+	 * @param tableName
+	 *            需维护的实体类型名
+	 * @param data
+	 *            需维护的数据,JSONArray格式
+	 * @param method
+	 *            需对索引做何种更改
+	 * @return 维护的对象
+	 */
+	public List<Object> maintainIndexesByData(String tableName, String data, String method);
 
 	/**
 	 * 获取指定数据源实时更新情况的详细信息

+ 19 - 1
search-console-b2b/src/main/java/com/uas/search/console/b2b/service/impl/IndexServiceImpl.java

@@ -476,7 +476,7 @@ public class IndexServiceImpl implements IndexService {
 	}
 
 	@Override
-	public List<Object> maintainIndexes(String tableName, String ids, String method) {
+	public List<Object> maintainIndexesByIds(String tableName, String ids, String method) {
 		if (StringUtils.isEmpty(tableName) || StringUtils.isEmpty(ids) || StringUtils.isEmpty(method)) {
 			throw new SearchException("参数不能为空:tableName,ids,method");
 		}
@@ -498,6 +498,24 @@ public class IndexServiceImpl implements IndexService {
 		return maintainIndexes(parsedQueueMessage);
 	}
 
+	@Override
+	public List<Object> maintainIndexesByData(String tableName, String data, String method) {
+		if (StringUtils.isEmpty(tableName) || StringUtils.isEmpty(data) || StringUtils.isEmpty(method)) {
+			throw new SearchException("参数不能为空:tableName,data,method");
+		}
+		Map<String, Object> map = new HashMap<>();
+		map.put("table", tableName);
+		map.put("data", data);
+		map.put("method", method);
+		ParsedQueueMessage parsedQueueMessage = queueMessageParser.parse(JSONObject.toJSONString(map));
+
+		if (parsedQueueMessage == null) {
+			logger.error("message parsing failed!");
+			return null;
+		}
+		return maintainIndexes(parsedQueueMessage);
+	}
+
 	@Override
 	public SPage<LuceneQueueMessage> getListenDetails(DataSourceQualifier dataSourceQualifier, Integer page,
 			Integer size, String searchContent) {

+ 1 - 0
search-console-b2b/src/main/webapp/WEB-INF/views/console.html

@@ -44,6 +44,7 @@
 				<li><a target="_blank">listenDetails?dataSourceQualifier=platformmanager</a></li>
 				<li>index/downloadData</li>
 				<li><a target="_blank">index/downloadData?tableNames=PURC$ORDERS&startFileIndex=2</a></li>
+				<li>index/maintain?tableName=PURC$ORDERS&method=update&ids=1,2,3&data=[{"pu_id":1234,"pu_code":"adsf"},{...}]</li>
 				<li><a target="_blank">index/maintain?tableName=PURC$ORDERS&method=update&ids=1,2,3</a></li>
 				<li><a target="_blank">index/dequeue?dataSourceQualifier=platformmanager&messageId=42A34BADEF8A2EBDE050007F01001E6A</a></li>
 			</ol>

+ 1 - 1
search-console-b2b/src/main/webapp/WEB-INF/views/listenDetails.html

@@ -25,7 +25,7 @@
 				<span>表名</span>
 			</div>
 			<div class="listHeader maxWidthColumn floatLeft overflowHidden">
-				<span>id(表主键)</span>
+				<span>数据</span>
 			</div>
 			<div class="listHeader minWidthColumn floatLeft overflowHidden">
 				<span>更改类型</span>

+ 23 - 16
search-console-b2b/src/main/webapp/resources/js/listenDetails/app.js

@@ -87,8 +87,8 @@ function listenDetails(dataSourceQualifier, page, size, searchContent) {
 				numberOfCurrentPage = content.length;
 				for (var i = 0; i < numberOfCurrentPage; i++) {
 					addList(i, content[i].tableName, content[i].ids,
-							content[i].method, content[i].modifyTime,
-							content[i].messageId)
+							content[i].data, content[i].method,
+							content[i].modifyTime, content[i].messageId)
 				}
 			}
 			$("#totalElement").html(totalElement);
@@ -111,7 +111,9 @@ function listenDetails(dataSourceQualifier, page, size, searchContent) {
  * @param tableName
  *            表名
  * @param ids
- *            id
+ *            ids
+ * @param data
+ *            data
  * @param method
  *            更改类型,INSERT、UPDATE、DELETE
  * @param modifyTime
@@ -119,14 +121,14 @@ function listenDetails(dataSourceQualifier, page, size, searchContent) {
  * @param messageId
  *            消息的id
  */
-function addList(i, tableName, ids, method, modifyTime, messageId) {
+function addList(i, tableName, ids, data, method, modifyTime, messageId) {
 	var listItemDiv = $("<div class='listItem'></div>");
 	var noDiv = $("<div class='minWidthColumn floatLeft overflowHidden'><span>"
 			+ ((page - 1) * size + i + 1) + "</span></div>");
 	var tableNameDiv = $("<div class='mediumWidthColumn floatLeft overflowHidden'><span>"
 			+ tableName + "</span></div>");
-	var idsDiv = $("<div class='maxWidthColumn floatLeft overflowHidden'><span>"
-			+ ids + "</span></div>");
+	var dataDiv = $("<div class='maxWidthColumn floatLeft overflowHidden'><span>"
+			+ (ids || data) + "</span></div>");
 	var methodDiv = $("<div class='minWidthColumn floatLeft overflowHidden'><span>"
 			+ method + "</span></div>");
 	var modifyTimeDiv = $("<div class='mediumWidthColumn floatLeft overflowHidden'><span>"
@@ -142,15 +144,14 @@ function addList(i, tableName, ids, method, modifyTime, messageId) {
 
 	listItemDiv.append(noDiv);
 	listItemDiv.append(tableNameDiv);
-	listItemDiv.append(idsDiv);
+	listItemDiv.append(dataDiv);
 	listItemDiv.append(methodDiv);
 	listItemDiv.append(modifyTimeDiv);
 	listItemDiv.append(operationDiv);
 	$('#listContentContainer').append(listItemDiv);
 
 	updateButton.click(function() {
-		updateIndex(tableName, ids.substring(1, ids.length - 1), method,
-				messageId, listItemDiv);
+		updateIndex(tableName, ids, data, method, messageId, listItemDiv);
 	});
 }
 
@@ -160,7 +161,9 @@ function addList(i, tableName, ids, method, modifyTime, messageId) {
  * @param tableName
  *            表名
  * @param ids
- *            id
+ *            ids
+ * @param data
+ *            data
  * @param method
  *            需对索引做何种更改
  * @param messageId
@@ -168,14 +171,18 @@ function addList(i, tableName, ids, method, modifyTime, messageId) {
  * @param listItemDiv
  *            所在行
  */
-function updateIndex(tableName, ids, method, messageId, listItemDiv) {
+function updateIndex(tableName, ids, data, method, messageId, listItemDiv) {
 	spinner = showLoading(spinner, spinnerContainer);
-	var updateIndexUrl = "index/maintain?tableName=" + tableName + "&ids="
-			+ ids + "&method=" + method;
 	$.ajax({
-		type : "get",
-		url : updateIndexUrl,
-		success : function(data) {
+		type : "post",
+		url : "index/maintain",
+		data : {
+			"tableName" : tableName,
+			"method" : method,
+			"ids" : ids,
+			"data" : data
+		},
+		success : function(result) {
 			// 更新索引后,出队消息
 			dequeueMessage(dataSourceQualifier, messageId, listItemDiv);
 		},