Selaa lähdekoodia

队列消息结构调整

guq 7 vuotta sitten
vanhempi
commit
9e12a37570

+ 25 - 0
applications/transfers/transfers-server/src/main/java/com/usoftchina/saas/transfers/config/ExpirationMessagePostProcessor.java

@@ -0,0 +1,25 @@
+package com.usoftchina.saas.transfers.config;
+
+import org.springframework.amqp.AmqpException;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.core.MessagePostProcessor;
+
+/**
+ * @author: guq
+ * @create: 2019-01-07 18:27
+ **/
+public class ExpirationMessagePostProcessor implements MessagePostProcessor {
+
+    private final Long ttl; // 毫秒
+
+    public ExpirationMessagePostProcessor(Long ttl) {
+        this.ttl = ttl;
+    }
+
+    @Override
+    public Message postProcessMessage(Message message) throws AmqpException {
+        message.getMessageProperties() .setExpiration(ttl.toString());
+        // 设置message的失效时间
+        return message;
+    }
+}

+ 44 - 32
applications/transfers/transfers-server/src/main/java/com/usoftchina/saas/transfers/config/RabbitConfig.java

@@ -1,6 +1,8 @@
 package com.usoftchina.saas.transfers.config;
 
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.context.annotation.Configuration;
 
 import java.io.Serializable;
 
@@ -9,57 +11,67 @@ import java.io.Serializable;
  * @Author chenwei
  * @Date 2018/12/28
  */
-@ConfigurationProperties("spring.rabbitmq")
+@ConfigurationProperties(prefix = "rabbit.config")
 public class RabbitConfig implements Serializable {
 
-//    @Value("${spring.rabbitmq.host}")
-    private String host;
-//    @Value("${spring.rabbitmq.port}")
-    private int port;
-//    @Value("${spring.rabbitmq.username}")
-    private String username;
-//    @Value("${spring.rabbitmq.password}")
-    private String password;
-//    @Value("${spring.rabbitmq.virtual-host}")
-    private String virtualHost;
-
-    public String getHost() {
-        return host;
+    private String exchange;
+
+    private String dlexchange;
+
+    private String queue;
+
+    private String dlqueue;
+
+    private String routingkey;
+
+    private Long expired;
+
+    public String getExchange() {
+        return exchange;
     }
 
-    public void setHost(String host) {
-        this.host = host;
+    public void setExchange(String exchange) {
+        this.exchange = exchange;
     }
 
-    public int getPort() {
-        return port;
+    public String getDlexchange() {
+        return dlexchange;
     }
 
-    public void setPort(int port) {
-        this.port = port;
+    public void setDlexchange(String dlexchange) {
+        this.dlexchange = dlexchange;
     }
 
-    public String getUsername() {
-        return username;
+    public String getQueue() {
+        return queue;
     }
 
-    public void setUsername(String username) {
-        this.username = username;
+    public void setQueue(String queue) {
+        this.queue = queue;
     }
 
-    public String getPassword() {
-        return password;
+    public Long getExpired() {
+        return expired;
+    }
+
+    public void setExpired(Long expired) {
+        this.expired = expired;
+    }
+
+    public String getDlqueue() {
+
+        return dlqueue;
     }
 
-    public void setPassword(String password) {
-        this.password = password;
+    public void setDlqueue(String dlqueue) {
+        this.dlqueue = dlqueue;
     }
 
-    public String getVirtualHost() {
-        return virtualHost;
+    public String getRoutingkey() {
+        return routingkey;
     }
 
-    public void setVirtualHost(String virtualHost) {
-        this.virtualHost = virtualHost;
+    public void setRoutingkey(String routingkey) {
+        this.routingkey = routingkey;
     }
 }

+ 11 - 0
applications/transfers/transfers-server/src/main/java/com/usoftchina/saas/transfers/mapper/BrokerMessageLogMapper.java

@@ -0,0 +1,11 @@
+package com.usoftchina.saas.transfers.mapper;
+
+import com.usoftchina.saas.transfers.po.BrokerMessagelog;
+
+/**
+ * @author: guq
+ * @create: 2019-01-07 20:37
+ **/
+public interface BrokerMessageLogMapper {
+    BrokerMessagelog getMessage(String msgId);
+}

+ 53 - 25
applications/transfers/transfers-server/src/main/java/com/usoftchina/saas/transfers/service/BaseRabbitReceiveService.java

@@ -1,3 +1,4 @@
+/*
 package com.usoftchina.saas.transfers.service;
 
 import com.rabbitmq.client.AMQP;
@@ -24,11 +25,13 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
+*/
 /**
  * @Description RabbitMq 消息接收处理基类
  * @Author chenwei
  * @Date 2018/12/27
- */
+ *//*
+
 public abstract class BaseRabbitReceiveService implements ChannelAwareMessageListener {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(BaseRabbitReceiveService.class);
@@ -47,10 +50,12 @@ public abstract class BaseRabbitReceiveService implements ChannelAwareMessageLis
     protected int requestedHeartBeat = 30;
     protected String delayQueueName;
 
-    /**
+    */
+/**
      * 初始化连接
      * @return
-     */
+     *//*
+
     private ConnectionFactory initConnectionFactory(){
         if (StringUtils.isEmpty(host)) {
             throw new IllegalArgumentException("host为空");
@@ -77,10 +82,12 @@ public abstract class BaseRabbitReceiveService implements ChannelAwareMessageLis
         return connectionFactory;
     }
 
-    /**
+    */
+/**
      * 动态创建消息监听
      * @return
-     */
+     *//*
+
     @Bean
     protected SimpleMessageListenerContainer simpleMessageListenerContainer(){
         if (!isDynamicCreate){
@@ -110,24 +117,28 @@ public abstract class BaseRabbitReceiveService implements ChannelAwareMessageLis
         processMessage(messageJson, channel, tag, queueName);
     }
 
-    /**
+    */
+/**
      * 接收消息处理
      * @param messageBytes
      * @param channel
      * @param headers
-     */
+     *//*
+
     @RabbitHandler
     public void receiveMessage(byte[] messageBytes, Channel channel, @Headers Map<String, Object> headers) {
         receiveMessage(new String(messageBytes), channel, headers);
     }
 
-    /**
+    */
+/**
      * 接收消息处理
      *
      * @param messageJson
      * @param channel
      * @param headers
-     */
+     *//*
+
     @RabbitHandler
     public void receiveMessage(String messageJson, Channel channel, @Headers Map<String, Object> headers) {
         long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
@@ -135,13 +146,15 @@ public abstract class BaseRabbitReceiveService implements ChannelAwareMessageLis
         processMessage(messageJson, channel, tag, queueName);
     }
 
-    /**
+    */
+/**
      * 处理消息
      * @param messageJson
      * @param channel
      * @param tag
      * @param queueName
-     */
+     *//*
+
     private void processMessage(String messageJson, Channel channel, long tag, String queueName) {
         MessageInfo messageInfo = getMessageInfo(messageJson);
         //当消息不符合格式MessageInfo格式时,丢弃消息
@@ -164,11 +177,13 @@ public abstract class BaseRabbitReceiveService implements ChannelAwareMessageLis
         }
     }
 
-    /**
+    */
+/**
      * 获取消息内容
      * @param messageJson
      * @return
-     */
+     *//*
+
     private MessageInfo getMessageInfo(String messageJson) {
         MessageInfo messageInfo = null;
         try {
@@ -180,15 +195,18 @@ public abstract class BaseRabbitReceiveService implements ChannelAwareMessageLis
         return messageInfo;
     }
 
-    /**
+    */
+/**
      * 发送延时消息
      * @param messageInfo
      * @param channel
      * @param queueName
-     */
+     *//*
+
     private void sendDelayMessage(MessageInfo messageInfo, Channel channel, String queueName) {
         //重试次数+1
-        /*messageInfo.setRetryCount(messageInfo.getRetryCount() + 1);
+        */
+/*messageInfo.setRetryCount(messageInfo.getRetryCount() + 1);
         String messageJson = JsonUtils.toJsonString(messageInfo);
         try {
             String dQueueName = getDelayQueueName(channel, queueName);
@@ -199,16 +217,19 @@ public abstract class BaseRabbitReceiveService implements ChannelAwareMessageLis
             LOGGER.info("发送延时消息[BaseRabbitReceiveService.sendDelayMessage].正常,messageJson:{},queueName:{}", messageJson, queueName);
         } catch (IOException e) {
             LOGGER.error("发送延时消息[BaseRabbitReceiveService.sendDelayMessage].异常,messageJson:{},queueName:{}", messageJson, queueName, e);
-        }*/
+        }*//*
+
     }
 
-    /**
+    */
+/**
      * 获取延时队列
      * @param channel
      * @param queueName
      * @return
      * @throws IOException
-     */
+     *//*
+
     private String getDelayQueueName(Channel channel, String queueName) throws IOException {
         if (!StringUtils.isEmpty(delayQueueName)) {
             return delayQueueName;
@@ -223,11 +244,13 @@ public abstract class BaseRabbitReceiveService implements ChannelAwareMessageLis
         return dQueueName;
     }
 
-    /**
+    */
+/**
      * 消息应答No
      * @param channel
      * @param tag
-     */
+     *//*
+
     private void basicNack(Channel channel, long tag) {
         try {
             channel.basicNack(tag, false, false);
@@ -237,11 +260,13 @@ public abstract class BaseRabbitReceiveService implements ChannelAwareMessageLis
         }
     }
 
-    /**
+    */
+/**
      * 消息应答Yes
      * @param channel
      * @param tag
-     */
+     *//*
+
     private void basicAck(Channel channel, long tag) {
         try {
             channel.basicAck(tag, false);
@@ -251,11 +276,14 @@ public abstract class BaseRabbitReceiveService implements ChannelAwareMessageLis
         }
     }
 
-    /**
+    */
+/**
      * 处理消息
      * @param messageInfo
      * @throws Exception
-     */
+     *//*
+
     public abstract void processMessage(MessageInfo messageInfo) throws Exception;
 
 }
+*/

+ 14 - 4
applications/transfers/transfers-server/src/main/java/com/usoftchina/saas/transfers/service/RabbitReceiveService.java

@@ -1,3 +1,4 @@
+/*
 package com.usoftchina.saas.transfers.service;
 
 import com.usoftchina.saas.transfers.config.RabbitConfig;
@@ -10,11 +11,13 @@ import org.springframework.stereotype.Service;
 
 import javax.annotation.PostConstruct;
 
+*/
 /**
  * @Description 消息消费
  * @Author chenwei
  * @Date 2018/12/28
- */
+ *//*
+
 @Service
 @EnableConfigurationProperties(RabbitConfig.class)
 public class RabbitReceiveService extends BaseRabbitReceiveService {
@@ -47,14 +50,20 @@ public class RabbitReceiveService extends BaseRabbitReceiveService {
     }
 
     private void processMessageForReceivable(String bizId, String bizType) throws Exception {
-        /*  根据不同的bizType,方案:1.调用不同的方法.    2.处理自己需要变量操作,调用同一个方法  */
+        */
+/*  根据不同的bizType,方案:1.调用不同的方法.    2.处理自己需要变量操作,调用同一个方法  *//*
+
        switch (bizType){
            case "PURCHASE":
-               /* 执行必要的操作 */
+               */
+/* 执行必要的操作 *//*
+
                execute();
                break;
            case "PURCHASECHANGE":
-               /* 执行必要的操作 */
+               */
+/* 执行必要的操作 *//*
+
                break;
            default:
                throw new Exception();
@@ -66,3 +75,4 @@ public class RabbitReceiveService extends BaseRabbitReceiveService {
     }
 
 }
+*/

+ 13 - 5
applications/transfers/transfers-server/src/main/java/com/usoftchina/saas/transfers/service/RabbitSendService.java

@@ -1,3 +1,4 @@
+/*
 package com.usoftchina.saas.transfers.service;
 
 import com.usoftchina.saas.transfers.po.MessageInfo;
@@ -15,11 +16,13 @@ import org.springframework.stereotype.Service;
 import javax.annotation.PostConstruct;
 import java.util.UUID;
 
+*/
 /**
  * @Description RabbitMQ发送消息
  * @Author chenwei
  * @Date 2018/12/27
- */
+ *//*
+
 @Service
 public class RabbitSendService implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
 
@@ -38,7 +41,8 @@ public class RabbitSendService implements RabbitTemplate.ConfirmCallback, Rabbit
         rabbitTemplate.setReturnCallback(this);
     }
 
-    /**
+    */
+/**
      *
      * @param queueName 队列名
      * @param userId    用户ID
@@ -46,18 +50,21 @@ public class RabbitSendService implements RabbitTemplate.ConfirmCallback, Rabbit
      * @param bizType   业务类型
      * @param bizId     业务ID
      * @return  消息ID
-     */
+     *//*
+
     public String sendMessage(String queueName, String userId, String appId, String bizType, String bizId){
         MessageInfo messageInfo = new MessageInfo(userId, appId, bizType, bizId);
         return sendMessage(queueName, messageInfo);
     }
 
-    /**
+    */
+/**
      * 发送消息
      * @param queueName     队列名
      * @param messageInfo   消息体
      * @return
-     */
+     *//*
+
     public String sendMessage(String queueName, MessageInfo messageInfo){
         if (StringUtils.isEmpty(queueName)){
             throw new IllegalArgumentException("QueueName(队列名)不能为空!");
@@ -95,3 +102,4 @@ public class RabbitSendService implements RabbitTemplate.ConfirmCallback, Rabbit
                 message, replyCode, replyText, exchange, routingKey);
     }
 }
+*/

+ 15 - 6
applications/transfers/transfers-server/src/main/java/com/usoftchina/saas/transfers/service/ReceiveService.java

@@ -1,9 +1,13 @@
 package com.usoftchina.saas.transfers.service;
 
 import com.rabbitmq.client.Channel;
+import com.usoftchina.saas.transfers.config.RabbitConfig;
 import com.usoftchina.saas.transfers.po.MessageInfo;
 import org.springframework.amqp.rabbit.annotation.*;
 import org.springframework.amqp.support.AmqpHeaders;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.messaging.handler.annotation.Headers;
 import org.springframework.messaging.handler.annotation.Payload;
 import org.springframework.stereotype.Service;
@@ -17,31 +21,36 @@ import java.util.Map;
  **/
 @Service
 public class ReceiveService {
+
+    private final static String EXCHANGE = "saas_trade_dev_exchange";
+    private final static String DLEXCHANGE = "saas_trade_dev_dlexchange";
+    private final static String QUEUE = "saas_trade_dev_queue";
+    private final static String DLQUEUE = "saas_trade_dev_dlqueue";
+    private final static String ROUTINGKEY = "saas_trade_dev_dl.*";
+
     //配置监听的哪一个队列,同时在没有queue和exchange的情况下会去创建并建立绑定关系
-    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "test_dev",durable = "true"),
-            exchange = @Exchange(name="test",durable = "true",type = "topic"),
-            key = "abc.*"))
+    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = QUEUE,durable = "true"),
+            exchange = @Exchange(name= DLEXCHANGE ,durable = "true",type = "topic"),
+            key = ROUTINGKEY))
     @RabbitHandler
     public void onMessage(@Payload MessageInfo info, @Headers Map<String,Object> headers, Channel channel)
             throws IOException {
         //消费者操作
         System.out.println("---------收到消息,开始消费---------");
         System.out.println("订单ID:" + info.getBizId());
+        System.out.println("信息ID:" + info.getMsgId());
 
         /**
          * Delivery Tag 用来标识信道中投递的消息。RabbitMQ 推送消息给 Consumer 时,会附带一个 Delivery Tag,
          * 以便 Consumer 可以在消息确认时告诉 RabbitMQ 到底是哪条消息被确认了。
          * RabbitMQ 保证在每个信道中,每条消息的 Delivery Tag 从 1 开始递增。
          */
-
         Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
 
-
         /**
          *  multiple 取值为 false 时,表示通知 RabbitMQ 当前消息被确认
          *  如果为 true,则额外将比第一个参数指定的 delivery tag 小的消息一并确认
          */
-
         boolean multiple = false;
 
         //ACK,确认一条消息已经被消费

+ 24 - 5
applications/transfers/transfers-server/src/main/java/com/usoftchina/saas/transfers/service/SendService.java

@@ -1,9 +1,16 @@
 package com.usoftchina.saas.transfers.service;
 
+import com.usoftchina.saas.transfers.config.ExpirationMessagePostProcessor;
+import com.usoftchina.saas.transfers.config.RabbitConfig;
+import com.usoftchina.saas.transfers.mapper.BrokerMessageLogMapper;
+import com.usoftchina.saas.transfers.po.BrokerMessagelog;
 import com.usoftchina.saas.transfers.po.MessageInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import org.springframework.amqp.rabbit.support.CorrelationData;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.stereotype.Service;
 
 /**
@@ -11,11 +18,20 @@ import org.springframework.stereotype.Service;
  * @create: 2019-01-06 22:04
  **/
 @Service
+@EnableConfigurationProperties(RabbitConfig.class)
 public class SendService {
 
     @Autowired
     private RabbitTemplate rabbitTemplate;
 
+    @Autowired
+    private RabbitConfig rabbitConfig;
+
+    @Autowired
+    private BrokerMessageLogMapper brokerMessageLogMapper;
+
+    private Logger logger = LoggerFactory.getLogger(getClass());
+
 
     //发送消息方法调用: 构建自定义对象消息
     public void sendMessage(MessageInfo info) {
@@ -23,7 +39,7 @@ public class SendService {
         rabbitTemplate.setConfirmCallback(confirmCallback);
         //消息唯一ID
         CorrelationData correlationData = new CorrelationData(info.getMsgId());
-        rabbitTemplate.convertAndSend("test", "abc.eqw", info, correlationData);
+        rabbitTemplate.convertAndSend(rabbitConfig.getDlexchange(), rabbitConfig.getRoutingkey(), info, correlationData);
     }
 
     //发送消息方法调用: 构建自定义对象消息
@@ -32,22 +48,25 @@ public class SendService {
         rabbitTemplate.setConfirmCallback(delayConfirmCallback);
         //消息唯一ID
         CorrelationData correlationData = new CorrelationData(info.getMsgId());
-        rabbitTemplate.convertAndSend("test", "abc.eqw", info, correlationData);
+        rabbitTemplate.convertAndSend(rabbitConfig.getExchange(), rabbitConfig.getRoutingkey(), info,
+                new ExpirationMessagePostProcessor(rabbitConfig.getExpired()), correlationData);
     }
 
     //回调函数: confirm确认
     final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
         @Override
         public void confirm(CorrelationData correlationData, boolean confirm, String cause) {
-            System.err.println("correlationData: " + correlationData);
+            System.out.println("correlationData: " + correlationData);
             String messageId = correlationData.getId();
             if (confirm) {
-                System.out.println("发射成功");
+                logger.info("msgId={},已发送服务器", correlationData.getId());
                 //如果confirm返回成功 则进行更新
                 // brokerMessageLogMapper.changeBrokerMessageLogStatus(messageId, Constants.ORDER_SEND_SUCCESS, new Date());
             } else {
                 //失败则进行具体的后续操作:重试 或者补偿等手段
-                System.err.println("发送失败,原因:" + cause);
+                logger.error("msgId={},发送失败:{}", correlationData.getId(), cause);
+                BrokerMessagelog message = brokerMessageLogMapper.getMessage(correlationData.getId());
+               // sendDelayMessage((MessageInfo) message.getMessage());
             }
         }
     };

+ 11 - 1
applications/transfers/transfers-server/src/main/resources/application.yml

@@ -14,6 +14,8 @@ spring:
     virtual-host: dev
     username: saas
     password: select123***
+    publisher-returns: true
+    publisher-confirms: true
   zipkin:
     sender:
       type: rabbit
@@ -62,4 +64,12 @@ auth:
   public-key: auth/pub.key
 ribbon:
   ReadTimeout: 10000
-  ConnectTimeout: 10000
+  ConnectTimeout: 10000
+rabbit:
+  config:
+    exchange: saas_trade_dev_exchange
+    dlexchange: saas_trade_dev_dlexchange
+    queue: saas_trade_dev_queue
+    dlqueue: saas_trade_dev_dlqueue
+    routingkey: saas_trade_dev_dl.*
+    expired: 30000

+ 16 - 10
applications/transfers/transfers-server/src/test/java/com/usoftchina/saas/transfers/service/RabbitSendServiceTest.java

@@ -1,5 +1,6 @@
 package com.usoftchina.saas.transfers.service;
 
+import com.usoftchina.saas.transfers.po.MessageInfo;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -18,19 +19,24 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 public class RabbitSendServiceTest {
 
     @Autowired
-    private RabbitSendService rabbitSendService;
-
-    private final String QUEUENAME = "saas_trade";
-    private final String USERID = "1";
-    private final String APPID = "trade";
-    private final String BIZTYPE = "PURCHASE";
-    private final String BIZID = "1";
+    private SendService sendService;
 
+    @Test
+    public void TestA_Send() {
+        MessageInfo msg = new MessageInfo();
+        msg.setBizId("测试");
+        msg.setMsgId("测试0001");
+        sendService.sendMessage(msg);
+    }
 
     @Test
-    public void sendMessage() throws Exception {
-        String messageId = rabbitSendService.sendMessage(QUEUENAME, USERID, APPID, BIZTYPE, BIZID);
-        System.out.println(messageId);
+    public void TestB_SendDelay() {
+        MessageInfo msg = new MessageInfo();
+        msg.setBizId("测试");
+        msg.setMsgId("测试延迟0001");
+        sendService.sendDelayMessage(msg);
     }
 
+
+
 }