Browse Source

rabbit 配置

liuam 7 years ago
parent
commit
07f4842321

+ 14 - 6
src/main/java/com/uas/ps/inquiry/controller/DeadlineTask.java

@@ -10,6 +10,7 @@ import com.uas.ps.inquiry.model.Enterprise;
 import com.uas.ps.inquiry.model.InquiryData;
 import com.uas.ps.inquiry.model.InquiryRemind;
 import com.uas.ps.inquiry.model.PublicInquiryItem;
+import com.uas.ps.inquiry.rabbit.MessageInfo;
 import com.uas.ps.inquiry.rabbit.RabbitConstants;
 import com.uas.ps.inquiry.service.InquiryService;
 import com.uas.ps.inquiry.util.FlexJsonUtils;
@@ -17,16 +18,14 @@ import com.uas.ps.inquiry.util.ThreadUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.amqp.rabbit.support.CorrelationData;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.jdbc.core.JdbcTemplate;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 import org.springframework.util.CollectionUtils;
 
-import java.util.ArrayList;
-import java.util.Calendar;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 /**
  * 求购截止,每日晚上轮询一次
@@ -157,8 +156,17 @@ public class DeadlineTask {
                 try {
                     long start = System.currentTimeMillis();
                     log.info("发送消息");
-                    rabbitTemplate.convertAndSend(RabbitConstants.AMQ_DIRECT, RabbitConstants.INQUIRY_MESSAGE_ROUTINGKEY, models);
-                    log.info("公共询价, 定时发动消息: {}, 耗时: {}", FlexJsonUtils.toJsonDeep(models), (System.currentTimeMillis() - start));
+
+                    //设置消息ID
+                    MessageInfo messageInfo = new MessageInfo();
+                    messageInfo.setMsgId(UUID.randomUUID().toString());
+                    //设置时间戳
+                    messageInfo.setTimestamp(System.currentTimeMillis());
+                    messageInfo.setData(models);
+                    CorrelationData correlationData = new CorrelationData(messageInfo.getMsgId());
+                    rabbitTemplate.convertAndSend(RabbitConstants.AMQ_DIRECT, RabbitConstants.INQUIRY_MESSAGE_ROUTINGKEY, messageInfo, correlationData);
+
+                    log.info("公共询价, 定时发动消息: {}, 耗时: {}", models.size(), (System.currentTimeMillis() - start));
                 } catch (Exception e) {
                     e.printStackTrace();
                 }

+ 62 - 0
src/main/java/com/uas/ps/inquiry/rabbit/MessageInfo.java

@@ -0,0 +1,62 @@
+package com.uas.ps.inquiry.rabbit;
+
+/**
+ * 消息实体
+ * @author liuam
+ * @since 2018/9/13 0013 下午 14:44
+ */
+public class MessageInfo<T> {
+
+    /**
+     * 消息ID
+     */
+    private String msgId;
+
+    /**
+     * 时间戳
+     */
+    private long timestamp;
+
+    /**
+     * 重试次数
+     */
+    private int retryCount;
+
+    /**
+     * 数据
+     */
+    private T data;
+
+    public String getMsgId() {
+        return msgId;
+    }
+
+    public void setMsgId(String msgId) {
+        this.msgId = msgId;
+    }
+
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    public int getRetryCount() {
+        return retryCount;
+    }
+
+    public void setRetryCount(int retryCount) {
+        this.retryCount = retryCount;
+    }
+
+    public T getData() {
+        return data;
+    }
+
+    public void setData(T data) {
+        this.data = data;
+    }
+
+}

+ 1 - 0
src/main/java/com/uas/ps/inquiry/rabbit/RabbitConstants.java

@@ -1,6 +1,7 @@
 package com.uas.ps.inquiry.rabbit;
 
 /**
+ * 消息常量类
  * @author liuam
  * @since 2018/9/12 0012 上午 10:10
  */

+ 36 - 3
src/main/java/com/uas/ps/inquiry/service/impl/InquiryServiceImpl.java

@@ -16,6 +16,7 @@ import com.uas.ps.inquiry.page.criteria.LogicalExpression;
 import com.uas.ps.inquiry.page.criteria.PredicateUtils;
 import com.uas.ps.inquiry.page.criteria.SimpleExpression;
 import com.uas.ps.inquiry.page.exception.IllegalOperatorException;
+import com.uas.ps.inquiry.rabbit.MessageInfo;
 import com.uas.ps.inquiry.rabbit.RabbitConstants;
 import com.uas.ps.inquiry.service.InquiryService;
 import com.uas.ps.inquiry.service.PublicInquiryService;
@@ -27,6 +28,7 @@ import javassist.NotFoundException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.amqp.rabbit.support.CorrelationData;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.data.domain.Page;
 import org.springframework.data.domain.Sort;
@@ -878,7 +880,17 @@ public class InquiryServiceImpl implements InquiryService {
                     model.setSmTemplate(SMS_TEMP_ID);
                     models.add(model);
                     long start = System.currentTimeMillis();
-                    rabbitTemplate.convertAndSend(RabbitConstants.AMQ_DIRECT, RabbitConstants.INQUIRY_MESSAGE_ROUTINGKEY, models);
+
+                    //设置消息ID
+                    MessageInfo messageInfo = new MessageInfo();
+                    messageInfo.setMsgId(UUID.randomUUID().toString());
+                    //设置时间戳
+                    messageInfo.setTimestamp(System.currentTimeMillis());
+                    messageInfo.setData(models);
+                    CorrelationData correlationData = new CorrelationData(messageInfo.getMsgId());
+                    rabbitTemplate.convertAndSend(RabbitConstants.AMQ_DIRECT, RabbitConstants.INQUIRY_MESSAGE_ROUTINGKEY, messageInfo, correlationData);
+
+
                     log.info("/message 耗时:{},消息数:{}", (System.currentTimeMillis() - start), models.size());
                     log.info("公共询价", "此次{}公司新增{}张公共询价", company, count);
                 } catch (Exception e) {
@@ -928,7 +940,18 @@ public class InquiryServiceImpl implements InquiryService {
                         }
                         if (models.size() >= 500) {
                             long start = System.currentTimeMillis();
-                            rabbitTemplate.convertAndSend(RabbitConstants.AMQ_DIRECT, RabbitConstants.INQUIRY_MESSAGE_ROUTINGKEY, models);
+
+
+                            //设置消息ID
+                            MessageInfo messageInfo = new MessageInfo();
+                            messageInfo.setMsgId(UUID.randomUUID().toString());
+                            //设置时间戳
+                            messageInfo.setTimestamp(System.currentTimeMillis());
+                            messageInfo.setData(models);
+                            CorrelationData correlationData = new CorrelationData(messageInfo.getMsgId());
+                            rabbitTemplate.convertAndSend(RabbitConstants.AMQ_DIRECT, RabbitConstants.INQUIRY_MESSAGE_ROUTINGKEY, messageInfo, correlationData);
+
+
                             log.info("消息中心生成消息");
                             log.info("发送消息{},耗时:{}", models.size(), (System.currentTimeMillis() - start));
                             models = new ArrayList<>();
@@ -936,7 +959,17 @@ public class InquiryServiceImpl implements InquiryService {
                     }
                     if (!CollectionUtils.isEmpty(models)) {
                         long start = System.currentTimeMillis();
-                        rabbitTemplate.convertAndSend(RabbitConstants.AMQ_DIRECT, RabbitConstants.INQUIRY_MESSAGE_ROUTINGKEY, models);
+
+                        //设置消息ID
+                        MessageInfo messageInfo = new MessageInfo();
+                        messageInfo.setMsgId(UUID.randomUUID().toString());
+                        //设置时间戳
+                        messageInfo.setTimestamp(System.currentTimeMillis());
+                        messageInfo.setData(models);
+                        CorrelationData correlationData = new CorrelationData(messageInfo.getMsgId());
+                        rabbitTemplate.convertAndSend(RabbitConstants.AMQ_DIRECT, RabbitConstants.INQUIRY_MESSAGE_ROUTINGKEY, messageInfo, correlationData);
+
+
                         log.info("消息中心生成消息");
                         log.info("发送消息{},耗时:{}", models.size(), (System.currentTimeMillis() - start));
                     }

+ 16 - 2
src/main/java/com/uas/ps/inquiry/service/impl/PublicInquiryServiceImpl.java

@@ -15,12 +15,17 @@ import com.uas.ps.inquiry.page.PageInfo;
 import com.uas.ps.inquiry.page.SearchFilter;
 import com.uas.ps.inquiry.page.criteria.*;
 import com.uas.ps.inquiry.page.exception.IllegalOperatorException;
+import com.uas.ps.inquiry.rabbit.MessageInfo;
 import com.uas.ps.inquiry.rabbit.RabbitConstants;
 import com.uas.ps.inquiry.service.PublicInquiryService;
-import com.uas.ps.inquiry.util.*;
+import com.uas.ps.inquiry.util.FlexJsonUtils;
+import com.uas.ps.inquiry.util.HttpUtil;
+import com.uas.ps.inquiry.util.IPageUtils;
+import com.uas.ps.inquiry.util.ThreadUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.amqp.rabbit.support.CorrelationData;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.dao.DataAccessException;
 import org.springframework.data.domain.Page;
@@ -532,7 +537,16 @@ public class PublicInquiryServiceImpl implements PublicInquiryService {
                 try {
                     long start = System.currentTimeMillis();
                     log.info("发送消息");
-                    rabbitTemplate.convertAndSend(RabbitConstants.AMQ_DIRECT, RabbitConstants.INQUIRY_MESSAGE_ROUTINGKEY, models);
+
+                    //设置消息ID
+                    MessageInfo messageInfo = new MessageInfo();
+                    messageInfo.setMsgId(UUID.randomUUID().toString());
+                    //设置时间戳
+                    messageInfo.setTimestamp(System.currentTimeMillis());
+                    messageInfo.setData(models);
+                    CorrelationData correlationData = new CorrelationData(messageInfo.getMsgId());
+                    rabbitTemplate.convertAndSend(RabbitConstants.AMQ_DIRECT, RabbitConstants.INQUIRY_MESSAGE_ROUTINGKEY, messageInfo, correlationData);
+
                     log.info("消息请求发送");
                     log.info("/messages , 条数:{},耗时:{}", models.size(), (System.currentTimeMillis() - start));
                 } catch (Exception e) {

+ 1 - 1
src/main/java/com/uas/ps/inquiry/util/ThreadUtils.java

@@ -223,7 +223,7 @@ public class ThreadUtils {
 		}
 
 		private Executor() {
-			this.threadPool = Executors.newFixedThreadPool(1000);
+			this.threadPool = Executors.newFixedThreadPool(100);
 		}
 
 		public Executor setTimeout(int timeout) {

+ 1 - 0
src/main/resources/config/application-cloud.properties

@@ -24,6 +24,7 @@ spring.rabbitmq.listener.simple.retry.enabled=true
 spring.rabbitmq.listener.simple.retry.max-attempts=5
 spring.rabbitmq.cache.channel.size=100
 spring.rabbitmq.cache.channel.checkout-timeout=10000
+spring.rabbitmq.requested-heartbeat=30
 # Access path
 ps.product.url=https://api-product.usoftmall.com/
 ps.message.url=http://api-message.ubtob.com/

+ 1 - 0
src/main/resources/config/application-dev.properties

@@ -24,6 +24,7 @@ spring.rabbitmq.listener.simple.retry.enabled=true
 spring.rabbitmq.listener.simple.retry.max-attempts=5
 spring.rabbitmq.cache.channel.size=100
 spring.rabbitmq.cache.channel.checkout-timeout=10000
+spring.rabbitmq.requested-heartbeat=30
 # Access path
 #ps.product.url=http://192.168.253.12:24000/
 #dong localhost

+ 1 - 0
src/main/resources/config/application-test.properties

@@ -24,6 +24,7 @@ spring.rabbitmq.listener.simple.retry.enabled=true
 spring.rabbitmq.listener.simple.retry.max-attempts=5
 spring.rabbitmq.cache.channel.size=100
 spring.rabbitmq.cache.channel.checkout-timeout=10000
+spring.rabbitmq.requested-heartbeat=30
 # Access path
 ps.product.url=http://218.17.158.219:24000/
 ps.message.url=http://218.17.158.219:24000/message/

+ 1 - 0
src/main/resources/config/application-txcloud.properties

@@ -24,6 +24,7 @@ spring.rabbitmq.listener.simple.retry.enabled=true
 spring.rabbitmq.listener.simple.retry.max-attempts=5
 spring.rabbitmq.cache.channel.size=100
 spring.rabbitmq.cache.channel.checkout-timeout=10000
+spring.rabbitmq.requested-heartbeat=30
 # Access path
 ps.product.url=https://172.21.0.47:8080/
 ps.message.url=http://api-message.ubtob.com/