浏览代码

发送给消息服务的内容发送给rabbitmq

liuam 7 年之前
父节点
当前提交
219a4c554b

+ 6 - 10
pom.xml

@@ -10,11 +10,6 @@
     <packaging>jar</packaging>
 
     <dependencies>
-        <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
-            <scope>test</scope>
-        </dependency>
 
         <dependency>
             <groupId>com.uas.ps</groupId>
@@ -121,11 +116,6 @@
             <groupId>com.alibaba</groupId>
             <artifactId>fastjson</artifactId>
         </dependency>
-        <dependency>
-            <groupId>org.junit.jupiter</groupId>
-            <artifactId>junit-jupiter-api</artifactId>
-            <version>RELEASE</version>
-        </dependency>
 
         <dependency>
             <groupId>org.apache.commons</groupId>
@@ -133,6 +123,12 @@
             <version>3.3.2</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+
 
     </dependencies>
 

+ 0 - 2
src/main/java/com/uas/ps/inquiry/SchedulingConfig.java

@@ -11,7 +11,6 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.scheduling.Trigger;
 import org.springframework.scheduling.TriggerContext;
-import org.springframework.scheduling.annotation.EnableScheduling;
 import org.springframework.scheduling.annotation.SchedulingConfigurer;
 import org.springframework.scheduling.config.ScheduledTaskRegistrar;
 import org.springframework.scheduling.support.CronTrigger;
@@ -26,7 +25,6 @@ import java.util.List;
  * created by shicr on 2018/6/8
  **/
 @Configuration
-@EnableScheduling
 public class SchedulingConfig implements SchedulingConfigurer {
 
     private final Logger logger = LoggerFactory.getLogger(SchedulingConfig.class);

+ 6 - 9
src/main/java/com/uas/ps/inquiry/controller/DeadlineTask.java

@@ -10,15 +10,15 @@ import com.uas.ps.inquiry.model.Enterprise;
 import com.uas.ps.inquiry.model.InquiryData;
 import com.uas.ps.inquiry.model.InquiryRemind;
 import com.uas.ps.inquiry.model.PublicInquiryItem;
+import com.uas.ps.inquiry.rabbit.RabbitConstants;
 import com.uas.ps.inquiry.service.InquiryService;
 import com.uas.ps.inquiry.util.FlexJsonUtils;
-import com.uas.ps.inquiry.util.HttpUtil;
 import com.uas.ps.inquiry.util.ThreadUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.jdbc.core.JdbcTemplate;
-import org.springframework.scheduling.annotation.EnableAsync;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 import org.springframework.util.CollectionUtils;
@@ -34,7 +34,6 @@ import java.util.Map;
  * @version 2018年6月12日 15:00  询价短信定时发送的任务移到这里  dongbw
  */
 @Component
-@EnableAsync
 public class DeadlineTask {
     @Autowired
     private PurcInquiryItemDao purcInquiryItemDao;
@@ -51,6 +50,9 @@ public class DeadlineTask {
     @Autowired
     private InquiryDataDao inquiryDataDao;
 
+    @Autowired
+    private RabbitTemplate rabbitTemplate;
+
     private static final Logger log = LoggerFactory.getLogger(DeadlineTask.class);
 
     /**
@@ -98,11 +100,6 @@ public class DeadlineTask {
      */
     private final String INQUIRY_TYPE_SELLER_MALL = "MALL跳转卖家待报价页面";
 
-    /**
-     * 公共消息访问地址
-     */
-    private final String PS_MESSAGE_URL = ContextUtils.getBean(AccessConfiguration.class).getPsMessageUrl();
-
     /**
      * 询价统计通知发送人UU
      */
@@ -160,7 +157,7 @@ public class DeadlineTask {
                 try {
                     long start = System.currentTimeMillis();
                     log.info("发送消息");
-                    String res = HttpUtil.doPost(PS_MESSAGE_URL + "/messages", FlexJsonUtils.toJsonDeep(models));
+                    rabbitTemplate.convertAndSend(RabbitConstants.AMQ_DIRECT, RabbitConstants.INQUIRY_MESSAGE_ROUTINGKEY, models);
                     log.info("公共询价, 定时发动消息: {}, 耗时: {}", FlexJsonUtils.toJsonDeep(models), (System.currentTimeMillis() - start));
                 } catch (Exception e) {
                     e.printStackTrace();

+ 22 - 17
src/main/java/com/uas/ps/inquiry/rabbit/RabbitConfig.java

@@ -4,43 +4,48 @@ import org.springframework.amqp.core.AmqpAdmin;
 import org.springframework.amqp.core.Binding;
 import org.springframework.amqp.core.DirectExchange;
 import org.springframework.amqp.core.Queue;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
 import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
 import javax.annotation.PostConstruct;
 
 /**
+ * rabbit 配置
  * @author liuam
  * @since 2018/9/11 0011 下午 16:30
  */
 @Configuration
 public class RabbitConfig {
 
-    private final String AMQ_DIRECT = "amq.direct";
-    private final String INQUIRY_MESSAGE = "inquiry_message";
-    private final String INQUIRY_MESSAGE_ROUTINGKEY = "message";
-    private final String INQUIRY_PRODUCT_ROUTINGKEY = "product";
-    private final String INQUIRY_PRODUCT = "inquiry_product";
+
+    @Autowired
+    private RabbitTemplate rabbitTemplate;
+
+    @Autowired
+    private RabbitConfirmCallback rabbitConfirmCallback;
 
     @Autowired
     private AmqpAdmin amqpAdmin;
 
     @PostConstruct
     public void init() {
-
-        amqpAdmin.declareExchange(new DirectExchange(AMQ_DIRECT, true, false, null));
-        amqpAdmin.declareQueue(new Queue(INQUIRY_MESSAGE));
-        amqpAdmin.declareQueue(new Queue(INQUIRY_PRODUCT));
-
-        amqpAdmin.declareBinding(new Binding(INQUIRY_MESSAGE, Binding.DestinationType.QUEUE, AMQ_DIRECT, INQUIRY_MESSAGE_ROUTINGKEY, null));
-        amqpAdmin.declareBinding(new Binding(INQUIRY_PRODUCT, Binding.DestinationType.QUEUE, AMQ_DIRECT, INQUIRY_PRODUCT_ROUTINGKEY, null));
+//      如果消息没有到exchange,则confirm回调,ack=false
+//		如果消息到达exchange,则confirm回调,ack=true
+//		exchange到queue成功,则不回调return
+//		exchange到queue失败,则回调return(否则不回回调,消息就丢了)
+        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
+        rabbitTemplate.setConfirmCallback(rabbitConfirmCallback);
+        rabbitTemplate.setReturnCallback(rabbitConfirmCallback);
+
+        amqpAdmin.declareExchange(new DirectExchange(RabbitConstants.AMQ_DIRECT, true, false, null));
+        amqpAdmin.declareQueue(new Queue(RabbitConstants.INQUIRY_MESSAGE));
+        amqpAdmin.declareQueue(new Queue(RabbitConstants.INQUIRY_PRODUCT));
+
+        amqpAdmin.declareBinding(new Binding(RabbitConstants.INQUIRY_MESSAGE, Binding.DestinationType.QUEUE, RabbitConstants.AMQ_DIRECT, RabbitConstants.INQUIRY_MESSAGE_ROUTINGKEY, null));
+        amqpAdmin.declareBinding(new Binding(RabbitConstants.INQUIRY_PRODUCT, Binding.DestinationType.QUEUE, RabbitConstants.AMQ_DIRECT, RabbitConstants.INQUIRY_PRODUCT_ROUTINGKEY, null));
     }
 
-    @Bean
-    public Jackson2JsonMessageConverter messageConverter() {
-        return new Jackson2JsonMessageConverter();
-    }
 
 }

+ 35 - 0
src/main/java/com/uas/ps/inquiry/rabbit/RabbitConfirmCallback.java

@@ -0,0 +1,35 @@
+package com.uas.ps.inquiry.rabbit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.amqp.rabbit.support.CorrelationData;
+import org.springframework.stereotype.Component;
+
+/**
+ * rabbit 生产者确认回调
+ * @author liuam
+ * @since 2018/9/12 0012 上午 9:53
+ */
+@Component
+public class RabbitConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitConfirmCallback.class);
+
+    @Override
+    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
+        if (ack) {
+            LOGGER.info("发送消息确认[RabbitConfirmCallback.confirm].正常,correlationData:{}", correlationData.getId());
+        } else {
+            LOGGER.error("发送消息确认[RabbitConfirmCallback.confirm].异常,correlationData:{},cause:{}", correlationData.getId(), cause);
+        }
+    }
+
+    @Override
+    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
+        LOGGER.error("发送消息确认到达队列[RabbitConfirmCallback.returnedMessage].异常,message:{},replyCode:{},replyText:{},exchange:{},routingKey:{}",
+                message, replyCode, replyText, exchange, routingKey);
+    }
+
+}

+ 34 - 0
src/main/java/com/uas/ps/inquiry/rabbit/RabbitConstants.java

@@ -0,0 +1,34 @@
+package com.uas.ps.inquiry.rabbit;
+
+/**
+ * @author liuam
+ * @since 2018/9/12 0012 上午 10:10
+ */
+public class RabbitConstants {
+
+    /**
+     * exchange name
+     */
+    public final static String AMQ_DIRECT = "amq.direct";
+
+    /**
+     * message queue name
+     */
+    public final static String INQUIRY_MESSAGE = "inquiry_message";
+
+    /**
+     * message routingKey
+     */
+    public final static String INQUIRY_MESSAGE_ROUTINGKEY = "message";
+
+    /**
+     * product routingKey
+     */
+    public final static String INQUIRY_PRODUCT_ROUTINGKEY = "product";
+
+    /**
+     * product routingKey
+     */
+    public final static String INQUIRY_PRODUCT = "inquiry_product";
+
+}

+ 0 - 79
src/main/java/com/uas/ps/inquiry/rabbit/RabbitSendService.java

@@ -1,79 +0,0 @@
-package com.uas.ps.inquiry.rabbit;
-
-import com.alibaba.fastjson.JSON;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.amqp.core.Message;
-import org.springframework.amqp.rabbit.core.RabbitTemplate;
-import org.springframework.amqp.rabbit.support.CorrelationData;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import javax.annotation.PostConstruct;
-import java.util.UUID;
-
-/**
- * RabbitMQ 发送消息
- *
- * @author: wangcanyi
- * @date: 2018-08-30 15:11
- **/
-@Service
-public class RabbitSendService implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
-	private static final Logger LOGGER = LoggerFactory.getLogger(RabbitSendService.class);
-	@Autowired
-	private RabbitTemplate rabbitTemplate;
-
-	@PostConstruct
-	public void init() {
-//		如果消息没有到exchange,则confirm回调,ack=false
-//		如果消息到达exchange,则confirm回调,ack=true
-//		exchange到queue成功,则不回调return
-//		exchange到queue失败,则回调return(否则不回回调,消息就丢了)
-		rabbitTemplate.setConfirmCallback(this);
-		rabbitTemplate.setReturnCallback(this);
-	}
-
-
-	/**
-	 * 发送消息
-	 *
-	 * @param queueName   队列名
-	 * @param object 消息体
-	 * @return 消息ID
-	 */
-	public String sendMessage(String queueName, Object object) {
-		if (StringUtils.isBlank(queueName)) {
-			throw new IllegalArgumentException("queueName不能为空");
-		}
-		if (object == null) {
-			throw new IllegalArgumentException("messageInfo不能为空");
-		}
-		//设置消息ID
-		messageInfo.setMsgId(UUID.randomUUID().toString());
-		//设置时间戳
-		messageInfo.setTimestamp(System.currentTimeMillis());
-		CorrelationData correlationData = new CorrelationData(messageInfo.getMsgId());
-		String messageJson = JSON.toJSONString(messageInfo);
-		rabbitTemplate.convertAndSend(queueName, (Object) messageJson, correlationData);
-		LOGGER.info("发送消息[RabbitSendService.sendMessage].正常,queueName:{},messageInfo:{},correlationData:{}",
-				queueName, messageJson, correlationData.getId());
-		return messageInfo.getMsgId();
-	}
-
-	@Override
-	public void confirm(CorrelationData correlationData, boolean ack, String cause) {
-		if (ack) {
-			LOGGER.info("发送消息确认[RabbitSendService.confirm].正常,correlationData:{}", correlationData.getId());
-		} else {
-			LOGGER.error("发送消息确认[RabbitSendService.confirm].异常,correlationData:{},cause:{}", correlationData.getId(), cause);
-		}
-	}
-
-	@Override
-	public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
-		LOGGER.error("发送消息确认到达队列[RabbitSendService.returnedMessage].异常,message:{},replyCode:{},replyText:{},exchange:{},routingKey:{}",
-				message, replyCode, replyText, exchange, routingKey);
-	}
-}

+ 7 - 8
src/main/java/com/uas/ps/inquiry/service/impl/InquiryServiceImpl.java

@@ -16,6 +16,7 @@ import com.uas.ps.inquiry.page.criteria.LogicalExpression;
 import com.uas.ps.inquiry.page.criteria.PredicateUtils;
 import com.uas.ps.inquiry.page.criteria.SimpleExpression;
 import com.uas.ps.inquiry.page.exception.IllegalOperatorException;
+import com.uas.ps.inquiry.rabbit.RabbitConstants;
 import com.uas.ps.inquiry.service.InquiryService;
 import com.uas.ps.inquiry.service.PublicInquiryService;
 import com.uas.ps.inquiry.util.FlexJsonUtils;
@@ -25,6 +26,7 @@ import com.uas.ps.inquiry.util.ThreadUtils;
 import javassist.NotFoundException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.data.domain.Page;
 import org.springframework.data.domain.Sort;
@@ -88,10 +90,6 @@ public class InquiryServiceImpl implements InquiryService {
      */
     private final String INQUIRY_TYPE_SELLER_MALL = "MALL跳转卖家待报价页面";
 
-    /**
-     * 公共消息访问地址
-     */
-    private final String PS_MESSAGE_URL = ContextUtils.getBean(AccessConfiguration.class).getPsMessageUrl();
 
     /**
      * 询价统计通知发送人UU
@@ -139,7 +137,8 @@ public class InquiryServiceImpl implements InquiryService {
     @Autowired
     private InquiryEnRemindDao inquiryEnRemindDao;
 
-
+    @Autowired
+    private RabbitTemplate rabbitTemplate;
     /**
      * 公共物料访问地址
      */
@@ -874,7 +873,7 @@ public class InquiryServiceImpl implements InquiryService {
                     model.setSmTemplate(SMS_TEMP_ID);
                     models.add(model);
                     long start = System.currentTimeMillis();
-                    HttpUtil.doPost(PS_MESSAGE_URL + "/messages", FlexJsonUtils.toJsonDeep(models));
+                    rabbitTemplate.convertAndSend(RabbitConstants.AMQ_DIRECT, RabbitConstants.INQUIRY_MESSAGE_ROUTINGKEY, models);
                     log.info("/message 耗时:{},消息数:{}", (System.currentTimeMillis() - start), models.size());
                     log.info("公共询价", "此次{}公司新增{}张公共询价", company, count);
                 } catch (Exception e) {
@@ -924,7 +923,7 @@ public class InquiryServiceImpl implements InquiryService {
                         }
                         if (models.size() >= 500) {
                             long start = System.currentTimeMillis();
-                            String res = HttpUtil.doPost(PS_MESSAGE_URL + "/messages", FlexJsonUtils.toJsonDeep(models));
+                            rabbitTemplate.convertAndSend(RabbitConstants.AMQ_DIRECT, RabbitConstants.INQUIRY_MESSAGE_ROUTINGKEY, models);
                             log.info("消息中心生成消息");
                             log.info("发送消息{},耗时:{}", models.size(), (System.currentTimeMillis() - start));
                             models = new ArrayList<>();
@@ -932,7 +931,7 @@ public class InquiryServiceImpl implements InquiryService {
                     }
                     if (!CollectionUtils.isEmpty(models)) {
                         long start = System.currentTimeMillis();
-                        String res = HttpUtil.doPost(PS_MESSAGE_URL + "/messages", FlexJsonUtils.toJsonDeep(models));
+                        rabbitTemplate.convertAndSend(RabbitConstants.AMQ_DIRECT, RabbitConstants.INQUIRY_MESSAGE_ROUTINGKEY, models);
                         log.info("消息中心生成消息");
                         log.info("发送消息{},耗时:{}", models.size(), (System.currentTimeMillis() - start));
                     }

+ 5 - 7
src/main/java/com/uas/ps/inquiry/service/impl/PublicInquiryServiceImpl.java

@@ -15,10 +15,12 @@ import com.uas.ps.inquiry.page.PageInfo;
 import com.uas.ps.inquiry.page.SearchFilter;
 import com.uas.ps.inquiry.page.criteria.*;
 import com.uas.ps.inquiry.page.exception.IllegalOperatorException;
+import com.uas.ps.inquiry.rabbit.RabbitConstants;
 import com.uas.ps.inquiry.service.PublicInquiryService;
 import com.uas.ps.inquiry.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.dao.DataAccessException;
 import org.springframework.data.domain.Page;
@@ -98,7 +100,8 @@ public class PublicInquiryServiceImpl implements PublicInquiryService {
     @Autowired
     private InquiryDataDao inquiryDataDao;
 
-
+    @Autowired
+    private RabbitTemplate rabbitTemplate;
     /**
      * 应用来源
      */
@@ -170,11 +173,6 @@ public class PublicInquiryServiceImpl implements PublicInquiryService {
      */
     private final String PS_PRODUCT_URL = ContextUtils.getBean(AccessConfiguration.class).getPsProductUrl();
 
-    /**
-     * 公共消息访问地址
-     */
-    private final String PS_MESSAGE_URL = ContextUtils.getBean(AccessConfiguration.class).getPsMessageUrl();
-
     /**
      * 商城链接
      */
@@ -534,7 +532,7 @@ public class PublicInquiryServiceImpl implements PublicInquiryService {
                 try {
                     long start = System.currentTimeMillis();
                     log.info("发送消息");
-                    String res = HttpUtil.doPost(PS_MESSAGE_URL + "/messages", FlexJsonUtils.toJsonDeep(models));
+                    rabbitTemplate.convertAndSend(RabbitConstants.AMQ_DIRECT, RabbitConstants.INQUIRY_MESSAGE_ROUTINGKEY, models);
                     log.info("消息请求发送");
                     log.info("/messages , 条数:{},耗时:{}", models.size(), (System.currentTimeMillis() - start));
                 } catch (Exception e) {

+ 2 - 1
src/main/resources/application.yml

@@ -7,8 +7,9 @@ spring:
     dialect: org.hibernate.dialect.MySQL5Dialect
     hbm2ddl:
      auto: update
+ profiles:
+   active: dev
 
 sso:
  secretKey: 0taQcW073Z7G628g5H
 
-

+ 29 - 0
src/main/test/java/com/uas/ps/inquiry/ApplicationContextRegister.java

@@ -0,0 +1,29 @@
+package com.uas.ps.inquiry;
+
+import com.uas.ps.core.util.ContextUtils;
+import org.apache.log4j.Logger;
+import org.springframework.beans.BeansException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.stereotype.Component;
+
+/**
+ * 注册Spring上下文对象
+ * @author liuam
+ * @since 2018/6/25 0025 下午 15:09
+ */
+@Component
+public class ApplicationContextRegister implements ApplicationContextAware {
+
+    private static Logger logger = Logger.getLogger(ApplicationContextRegister.class);
+
+    /**
+     * 将上下文对象保存到ContextUtils中
+     * @param applicationContext
+     * @throws BeansException
+     */
+    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
+        ContextUtils.setApplicationContext(applicationContext);
+        logger.debug("ApplicationContext registed");
+    }
+}

+ 30 - 0
src/main/test/java/com/uas/ps/inquiry/TestInquiry.java

@@ -0,0 +1,30 @@
+package com.uas.ps.inquiry;
+
+import com.uas.ps.inquiry.rabbit.RabbitConstants;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.context.annotation.Import;
+import org.springframework.test.context.junit4.SpringRunner;
+
+/**
+ * @author liuam
+ * @since 2018/9/12 0012 上午 11:24
+ */
+@RunWith(SpringRunner.class)
+@SpringBootTest(classes = Application.class)
+@Import(ApplicationContextRegister.class)
+public class TestInquiry {
+
+    @Autowired
+    private RabbitTemplate rabbitTemplate;
+
+    @Test
+    public void testRabbitReceive() {
+        Object o = rabbitTemplate.receiveAndConvert(RabbitConstants.INQUIRY_MESSAGE);
+        System.out.println(o);
+    }
+
+}