Browse Source

调整批量成功失败消息处理

wangyc 7 years ago
parent
commit
51e2143d44

+ 53 - 19
mall-search/src/main/java/com/uas/search/jms/JmsListener.java

@@ -12,7 +12,10 @@ import com.uas.search.schedule.TaskInformation;
 import com.uas.search.schedule.TaskService;
 import com.uas.search.schedule.TaskService;
 import com.uas.search.service.IndexService;
 import com.uas.search.service.IndexService;
 import com.uas.search.util.CollectionUtils;
 import com.uas.search.util.CollectionUtils;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 import org.slf4j.Logger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -161,14 +164,36 @@ public class JmsListener {
                 if (CollectionUtils.isEmpty(luceneMessages)) {
                 if (CollectionUtils.isEmpty(luceneMessages)) {
                     return "无消息";
                     return "无消息";
                 }
                 }
-                logger.info("获取luceneMessages" + luceneMessages.size() + "条,耗时:" + (System.currentTimeMillis() - startTime));
-                for (LuceneMessage luceneMessage : luceneMessages){
-                    try {
-                        process(luceneMessage);
-                    } catch (Throwable e) {
-                        logger.error("", e);
-                    }
+                logger.info(title + "获取luceneMessages" + luceneMessages.size() + "条,耗时:" + (System.currentTimeMillis() - startTime));
+                // 按操作类型分组
+                long indexTime = System.currentTimeMillis();
+                Map<String, List<LuceneMessage>> luceneMessageMap = luceneMessages.stream().collect(Collectors.groupingBy(LuceneMessage :: getMethodType));
+                List<LuceneMessage> insertMessage = luceneMessageMap.get("insert");
+                List<LuceneMessage> updateMessage = luceneMessageMap.get("update");
+                List<LuceneMessage> deleteMessage = luceneMessageMap.get("delete");
+
+                List<Long> failedIds = new ArrayList<>();
+                if (!CollectionUtils.isEmpty(insertMessage)) {
+                    failedIds = process(insertMessage);
+                    logger.info("insert索引" + insertMessage.size() + "条,失败" + failedIds.size() + "耗时" + (System.currentTimeMillis() - indexTime));
+                }
+
+                if (!CollectionUtils.isEmpty(updateMessage)) {
+                    failedIds = process(updateMessage);
+                    logger.info("update索引" + updateMessage.size() + "条,失败" + failedIds.size() + "耗时" + (System.currentTimeMillis() - indexTime));
                 }
                 }
+
+                if (!CollectionUtils.isEmpty(deleteMessage)) {
+                    failedIds = process(deleteMessage);
+                    logger.info("delete索引" + deleteMessage.size() + "条,失败" + failedIds.size() + "耗时" + (System.currentTimeMillis() - indexTime));
+                }
+
+                if (!CollectionUtils.isEmpty(failedIds)) {
+                    long retryTime = System.currentTimeMillis();
+                    luceneMessageDao.updateReTryCount(failedIds);
+                    logger.info("处理失败:update retrycount" + failedIds.size() + "条,耗时" + (System.currentTimeMillis() - retryTime));
+                }
+
                 logger.info("处理luceneMessages" + luceneMessages.size() + "条,耗时:" + (System.currentTimeMillis() - startTime));
                 logger.info("处理luceneMessages" + luceneMessages.size() + "条,耗时:" + (System.currentTimeMillis() - startTime));
                 // 如果消息不止一页,立即消费下一页
                 // 如果消息不止一页,立即消费下一页
                 if (sPage.getTotalPage() > 1){
                 if (sPage.getTotalPage() > 1){
@@ -218,19 +243,28 @@ public class JmsListener {
     /**
     /**
      * 对得到的队列消息进行解析,之后根据解析出来的对象,对lucene索引进行添加、更新或删除操作
      * 对得到的队列消息进行解析,之后根据解析出来的对象,对lucene索引进行添加、更新或删除操作
      *
      *
-     * @param luceneMessage 消息
+     * @param luceneMessages 消息
      */
      */
-    private void process(LuceneMessage luceneMessage) {
+    private List<Long> process(List<LuceneMessage> luceneMessages) {
         long startTime = System.currentTimeMillis();
         long startTime = System.currentTimeMillis();
-        logger.info(luceneMessage.toString());
-        Long id = luceneMessage.getId();
-        luceneMessage.setRetryCount(luceneMessage.getRetryCount() + 1);
-        luceneMessageDao.save(luceneMessage);
-        ParsedQueueMessage parsedQueueMessage = queueMessageParser.parse(luceneMessage.getTableName(), luceneMessage.getDataId(), luceneMessage.getMethodType(), luceneMessage.getData());
-        logger.info("获取源数据耗时:" + (System.currentTimeMillis() - startTime));
-        List<Object> maintainedObjects = indexService.maintainIndexes(parsedQueueMessage);
-        logger.info("Maintained... " + maintainedObjects);
-        luceneMessageDao.dequeueLuceneMessage(id);
-        logger.info("单条索引处理耗时:" + (System.currentTimeMillis() - startTime));
+        List<Long> successIds = new ArrayList<>();
+        List<Long> failedIds = new ArrayList<>();
+        for (LuceneMessage luceneMessage : luceneMessages) {
+            try {
+                ParsedQueueMessage parsedQueueMessage = queueMessageParser.parse(luceneMessage.getTableName(), luceneMessage.getDataId(), luceneMessage.getMethodType(), luceneMessage.getData());
+                List<Object> maintainedObjects = indexService.maintainIndexes(parsedQueueMessage);
+                logger.info("单条耗时" + (System.currentTimeMillis() - startTime) + ",Maintained... " + maintainedObjects);
+                successIds.add(luceneMessage.getId());
+            } catch (Exception e) {
+                failedIds.add(luceneMessage.getId());
+            }
+        }
+        if (!CollectionUtils.isEmpty(successIds)) {
+            long successTime = System.currentTimeMillis();
+            luceneMessageDao.dequeueLuceneMessages(successIds);
+            luceneMessageDao.deleteByIds(successIds);
+            logger.info("处理成功" + successIds.size() + "条,耗时" + (System.currentTimeMillis() - successTime));
+        }
+        return failedIds;
     }
     }
 }
 }

+ 28 - 0
mall-search/src/main/java/com/uas/search/jms/LuceneMessageDao.java

@@ -3,6 +3,7 @@ package com.uas.search.jms;
 import com.uas.search.annotation.NotEmpty;
 import com.uas.search.annotation.NotEmpty;
 import org.springframework.data.jpa.repository.JpaRepository;
 import org.springframework.data.jpa.repository.JpaRepository;
 import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
 import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
+import org.springframework.data.jpa.repository.Modifying;
 import org.springframework.data.jpa.repository.Query;
 import org.springframework.data.jpa.repository.Query;
 import org.springframework.data.jpa.repository.query.Procedure;
 import org.springframework.data.jpa.repository.query.Procedure;
 import org.springframework.data.repository.query.Param;
 import org.springframework.data.repository.query.Param;
@@ -62,4 +63,31 @@ public interface LuceneMessageDao
     @Transactional
     @Transactional
     @Procedure(procedureName = "dequeue_lucene_message")
     @Procedure(procedureName = "dequeue_lucene_message")
     void dequeueLuceneMessage(@NotEmpty("id") Long id);
     void dequeueLuceneMessage(@NotEmpty("id") Long id);
+
+    /**
+     * 批量将已处理消息插入消息历史
+     * @param ids
+     */
+    @Modifying
+    @Transactional
+    @Query(value = "insert into lucene$message_history (mh_dequeue_time, me_id, me_table_name, me_method_type, me_data_id, me_data, me_priority, me_create_time) select now(), me_id, me_table_name, me_method_type, me_data_id, me_data, me_priority, me_create_time from lucene$message where me_id in :ids", nativeQuery = true)
+    void dequeueLuceneMessages(@Param("ids") List<Long> ids);
+
+    /**
+     * 批量删除已处理消息
+     * @param ids
+     */
+    @Modifying
+    @Transactional
+    @Query(value = "delete from lucene$message where me_id in :ids", nativeQuery = true)
+    void deleteByIds(@Param("ids") List<Long> ids);
+
+    /**
+     * 批量更新处理失败消息重试次数+1
+     * @param ids
+     */
+    @Modifying
+    @Transactional
+    @Query(value = "update lucene$message set me_retry_count = me_retry_count+1 where me_id in :ids", nativeQuery = true)
+    void updateReTryCount(@Param("ids") List<Long> ids);
 }
 }

+ 6 - 8
mall-search/src/main/java/com/uas/search/schedule/TaskServiceImpl.java

@@ -1,26 +1,24 @@
 package com.uas.search.schedule;
 package com.uas.search.schedule;
 
 
 import com.uas.search.annotation.NotEmpty;
 import com.uas.search.annotation.NotEmpty;
-import com.uas.search.util.ExceptionUtils;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Service;
 import com.uas.search.util.CollectionUtils;
 import com.uas.search.util.CollectionUtils;
 import com.uas.search.util.StringUtils;
 import com.uas.search.util.StringUtils;
-
 import java.io.File;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.List;
 import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Objects;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
 
 
 /**
 /**
  * 管理定时任务
  * 管理定时任务