Browse Source

队列消息结构调整

guq 7 years ago
parent
commit
58afc0606b

+ 4 - 0
applications/transfers/transfers-server/pom.xml

@@ -48,6 +48,10 @@
             <groupId>org.springframework.cloud</groupId>
             <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.usoftchina.saas</groupId>
+            <artifactId>commons-dto</artifactId>
+        </dependency>
     </dependencies>
 
     <build>

+ 20 - 0
applications/transfers/transfers-server/src/main/java/com/usoftchina/saas/transfers/mapper/BrokerMessageLogMapper.java

@@ -8,4 +8,24 @@ import com.usoftchina.saas.transfers.po.BrokerMessagelog;
  **/
 public interface BrokerMessageLogMapper {
     BrokerMessagelog getMessage(String msgId);
+
+    BrokerMessagelog getLiveMessage(String msgId);
+
+    BrokerMessagelog getDeadMessage(String msgId);
+
+    int deleteByPrimaryKey(Integer id);
+
+    int insert(BrokerMessagelog record);
+
+    int insertSelective(BrokerMessagelog record);
+
+    BrokerMessagelog selectByPrimaryKey(Integer id);
+
+    int updateByPrimaryKeySelective(BrokerMessagelog record);
+
+    int updateByPrimaryKey(BrokerMessagelog record);
+
+    int updateMessageLogandAddRetry(BrokerMessagelog record);
+
+    void updateSuccess(String msgId);
 }

+ 29 - 15
applications/transfers/transfers-server/src/main/java/com/usoftchina/saas/transfers/po/BrokerMessagelog.java

@@ -1,19 +1,41 @@
 package com.usoftchina.saas.transfers.po;
 
+import java.io.Serializable;
 import java.util.Date;
 
 /**
  * @author: guq
  * @create: 2018-12-29 09:46
  **/
-public class BrokerMessagelog {
+public class BrokerMessagelog implements Serializable{
 
     private Long id;
-    private String messageId;
+    private String msgId;
     private String message;
-    private Integer tryCount;
     private String status;
+
     private Date createTime;
+    private int retry;
+    /**
+     * 失败原因
+     */
+    private String reason;
+    public String getReason() {
+        return reason;
+    }
+
+    public void setReason(String reason) {
+        this.reason = reason;
+    }
+
+
+    public int getRetry() {
+        return retry;
+    }
+
+    public void setRetry(int retry) {
+        this.retry = retry;
+    }
 
     public Long getId() {
         return id;
@@ -23,12 +45,12 @@ public class BrokerMessagelog {
         this.id = id;
     }
 
-    public String getMessageId() {
-        return messageId;
+    public String getMsgId() {
+        return msgId;
     }
 
-    public void setMessageId(String messageId) {
-        this.messageId = messageId;
+    public void setMsgId(String msgId) {
+        this.msgId = msgId;
     }
 
     public String getMessage() {
@@ -39,14 +61,6 @@ public class BrokerMessagelog {
         this.message = message;
     }
 
-    public Integer getTryCount() {
-        return tryCount;
-    }
-
-    public void setTryCount(Integer tryCount) {
-        this.tryCount = tryCount;
-    }
-
     public String getStatus() {
         return status;
     }

+ 2 - 13
applications/transfers/transfers-server/src/main/java/com/usoftchina/saas/transfers/po/MessageInfo.java

@@ -1,6 +1,7 @@
 package com.usoftchina.saas.transfers.po;
 
 import java.io.Serializable;
+import java.util.Date;
 
 /**
  * @Description 消息内容
@@ -49,19 +50,6 @@ public class MessageInfo implements Serializable {
         this.timestamp = timestamp;
     }
 
-    public int getRetryCount() {
-        return retryCount;
-    }
-
-    public void setRetryCount(int retryCount) {
-        this.retryCount = retryCount;
-    }
-
-    /**
-     * 重试次数
-     */
-    private int retryCount;
-
     public String getMsgId() {
         return msgId;
     }
@@ -107,6 +95,7 @@ public class MessageInfo implements Serializable {
         this.appId = appId;
         this.bizType = bizType;
         this.bizId = bizId;
+        this.timestamp = new Date().getTime();
     }
 
     public MessageInfo() {

+ 26 - 0
applications/transfers/transfers-server/src/main/java/com/usoftchina/saas/transfers/po/Status.java

@@ -0,0 +1,26 @@
+package com.usoftchina.saas.transfers.po;
+
+public enum Status {
+
+    SEND("已发送"),
+
+    RETRY("重新发送"),
+
+    FAILURE("投递失败"),
+
+    SUCCESS("投递成功");
+
+    private String display;
+
+    public String getDisplay() {
+        return display;
+    }
+
+    public void setDisplay(String display) {
+        this.display = display;
+    }
+
+    Status(String display) {
+        this.display = display;
+    }
+}

+ 45 - 4
applications/transfers/transfers-server/src/main/java/com/usoftchina/saas/transfers/service/ReceiveService.java

@@ -1,8 +1,16 @@
 package com.usoftchina.saas.transfers.service;
 
 import com.rabbitmq.client.Channel;
+import com.usoftchina.saas.commons.po.BillCodeSeq;
 import com.usoftchina.saas.transfers.config.RabbitConfig;
+import com.usoftchina.saas.transfers.mapper.BrokerMessageLogMapper;
+import com.usoftchina.saas.transfers.po.BrokerMessagelog;
 import com.usoftchina.saas.transfers.po.MessageInfo;
+import com.usoftchina.saas.transfers.task.*;
+import com.usoftchina.saas.utils.JsonUtils;
+import com.usoftchina.saas.utils.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.amqp.rabbit.annotation.*;
 import org.springframework.amqp.support.AmqpHeaders;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -11,6 +19,7 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
 import org.springframework.messaging.handler.annotation.Headers;
 import org.springframework.messaging.handler.annotation.Payload;
 import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
 
 import java.io.IOException;
 import java.util.Map;
@@ -22,31 +31,62 @@ import java.util.Map;
 @Service
 public class ReceiveService {
 
+    @Autowired
+    private SendService sendService;
+
+    @Autowired
+    private BrokerMessageLogMapper brokerMessageLogMapper;
+
+    private Logger logger = LoggerFactory.getLogger(getClass());
+
     private final static String EXCHANGE = "saas_trade_dev_exchange";
     private final static String DLEXCHANGE = "saas_trade_dev_dlexchange";
     private final static String QUEUE = "saas_trade_dev_queue";
     private final static String DLQUEUE = "saas_trade_dev_dlqueue";
     private final static String ROUTINGKEY = "saas_trade_dev_dl.*";
 
+
     //配置监听的哪一个队列,同时在没有queue和exchange的情况下会去创建并建立绑定关系
     @RabbitListener(bindings = @QueueBinding(value = @Queue(value = QUEUE,durable = "true"),
             exchange = @Exchange(name= DLEXCHANGE ,durable = "true",type = "topic"),
             key = ROUTINGKEY))
     @RabbitHandler
+    @Transactional
     public void onMessage(@Payload MessageInfo info, @Headers Map<String,Object> headers, Channel channel)
             throws IOException {
         //消费者操作
-        System.out.println("---------收到消息,开始消费---------");
-        System.out.println("订单ID:" + info.getBizId());
-        System.out.println("信息ID:" + info.getMsgId());
+       logger.info("---------收到消息,消息id={},开始消费---------", info.getMsgId());
+        Executable task = null;
+       try {
+           switch(BillCodeSeq.valueOf(info.getBizType())){
+               case PURCHASE:
+                   task = new SendPurchaseTask();
+                   break;
+               case PURCHASEIN:
+                   task = new SendPurchaseInTask();
+                   break;
+               case PURCHASEOUT:
+                   task = new SendPurchaseOutTask();
+                   break;
+           }
+
+           task.execute();
 
+           //更新状态
+           brokerMessageLogMapper.updateSuccess(info.getMsgId());
+        } catch (Exception e) {
+            BrokerMessagelog message = brokerMessageLogMapper.getLiveMessage(info.getMsgId());
+            if (null != message && !StringUtils.isEmpty(message.getMessage())) {
+                MessageInfo messageInfo = JsonUtils.fromJsonString(message.getMessage(), MessageInfo.class);
+                sendService.sendDelayMessage(messageInfo, e.getMessage());
+            }
+        }
         /**
          * Delivery Tag 用来标识信道中投递的消息。RabbitMQ 推送消息给 Consumer 时,会附带一个 Delivery Tag,
          * 以便 Consumer 可以在消息确认时告诉 RabbitMQ 到底是哪条消息被确认了。
          * RabbitMQ 保证在每个信道中,每条消息的 Delivery Tag 从 1 开始递增。
          */
         Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
-
         /**
          *  multiple 取值为 false 时,表示通知 RabbitMQ 当前消息被确认
          *  如果为 true,则额外将比第一个参数指定的 delivery tag 小的消息一并确认
@@ -55,6 +95,7 @@ public class ReceiveService {
 
         //ACK,确认一条消息已经被消费
         channel.basicAck(deliveryTag,multiple);
+        //重新返回队列 重新消费
         //channel.basicNack(deliveryTag,false,true);
     }
 }

+ 49 - 24
applications/transfers/transfers-server/src/main/java/com/usoftchina/saas/transfers/service/SendService.java

@@ -5,6 +5,9 @@ import com.usoftchina.saas.transfers.config.RabbitConfig;
 import com.usoftchina.saas.transfers.mapper.BrokerMessageLogMapper;
 import com.usoftchina.saas.transfers.po.BrokerMessagelog;
 import com.usoftchina.saas.transfers.po.MessageInfo;
+import com.usoftchina.saas.transfers.po.Status;
+import com.usoftchina.saas.utils.JsonUtils;
+import com.usoftchina.saas.utils.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.amqp.rabbit.core.RabbitTemplate;
@@ -13,6 +16,8 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.stereotype.Service;
 
+import java.util.Date;
+
 /**
  * @author: guq
  * @create: 2019-01-06 22:04
@@ -35,56 +40,76 @@ public class SendService {
 
     //发送消息方法调用: 构建自定义对象消息
     public void sendMessage(MessageInfo info) {
+        String messageId = info.getMsgId();
+        //消息入库
+        BrokerMessagelog messagelog = new BrokerMessagelog();
+        messagelog.setCreateTime(new Date());
+        messagelog.setMessage(JsonUtils.toJsonString(info));
+        messagelog.setMsgId(messageId);
+        messagelog.setStatus(Status.SEND.getDisplay());
+        messagelog.setRetry(0);
+        brokerMessageLogMapper.insertSelective(messagelog);
+
         // 通过实现 ConfirmCallback 接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中
         rabbitTemplate.setConfirmCallback(confirmCallback);
         //消息唯一ID
-        CorrelationData correlationData = new CorrelationData(info.getMsgId());
+        CorrelationData correlationData = new CorrelationData(messageId);
         rabbitTemplate.convertAndSend(rabbitConfig.getDlexchange(), rabbitConfig.getRoutingkey(), info, correlationData);
-    }
-
-    //发送消息方法调用: 构建自定义对象消息
-    public void sendDelayMessage(MessageInfo info) {
-        // 通过实现 ConfirmCallback 接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中
-        rabbitTemplate.setConfirmCallback(delayConfirmCallback);
-        //消息唯一ID
-        CorrelationData correlationData = new CorrelationData(info.getMsgId());
-        rabbitTemplate.convertAndSend(rabbitConfig.getExchange(), rabbitConfig.getRoutingkey(), info,
-                new ExpirationMessagePostProcessor(rabbitConfig.getExpired()), correlationData);
+        logger.info("msgId={}, 消息发送", messageId);
     }
 
     //回调函数: confirm确认
     final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
         @Override
         public void confirm(CorrelationData correlationData, boolean confirm, String cause) {
-            System.out.println("correlationData: " + correlationData);
             String messageId = correlationData.getId();
             if (confirm) {
-                logger.info("msgId={},已发送服务器", correlationData.getId());
-                //如果confirm返回成功 则进行更新
-                // brokerMessageLogMapper.changeBrokerMessageLogStatus(messageId, Constants.ORDER_SEND_SUCCESS, new Date());
+                logger.info("msgId={},已发送服务器", messageId);
             } else {
-                //失败则进行具体的后续操作:重试 或者补偿等手段
-                BrokerMessagelog message = brokerMessageLogMapper.getMessage(correlationData.getId());
-                // sendDelayMessage((MessageInfo) message.getMessage());
+                //失败则进行具体的后续操作:进入延迟队列
+                BrokerMessagelog message = brokerMessageLogMapper.getMessage(messageId);
+                String info = message.getMessage();
+                if (!StringUtils.isEmpty(info)) {
+                    MessageInfo messageInfo = JsonUtils.fromJsonString(info, MessageInfo.class);
+                    sendDelayMessage(messageInfo, cause);
+                }
                 logger.error("msgId={},发送失败:{}", correlationData.getId(), cause);
-
             }
         }
     };
 
+    //发送消息方法调用: 构建自定义对象消息
+    public void sendDelayMessage(MessageInfo info, String error) {
+        if (StringUtils.isEmpty(info) || StringUtils.isEmpty(info.getMsgId())) {
+            logger.error("消息为空");
+            return;
+        }
+        String messageId = info.getMsgId();
+        //更新状态且失败次数加1
+        BrokerMessagelog messagelog = new BrokerMessagelog();
+        messagelog.setMsgId(messageId);
+        messagelog.setStatus(Status.RETRY.getDisplay());
+        messagelog.setReason(error);
+        brokerMessageLogMapper.updateMessageLogandAddRetry(messagelog);
+
+        //发送至延迟队列
+        rabbitTemplate.setConfirmCallback(delayConfirmCallback);
+        //消息唯一ID
+        CorrelationData correlationData = new CorrelationData(info.getMsgId());
+        rabbitTemplate.convertAndSend(rabbitConfig.getExchange(), rabbitConfig.getRoutingkey(), info,
+                new ExpirationMessagePostProcessor(rabbitConfig.getExpired()), correlationData);
+    }
+
     //回调函数: confirm确认
     final RabbitTemplate.ConfirmCallback delayConfirmCallback = new RabbitTemplate.ConfirmCallback() {
         @Override
         public void confirm(CorrelationData correlationData, boolean confirm, String cause) {
-            System.err.println("correlationData: " + correlationData);
             String messageId = correlationData.getId();
             if (confirm) {
-                System.out.println("发射成功");
-                //如果confirm返回成功 则进行更新
-                // brokerMessageLogMapper.changeBrokerMessageLogStatus(messageId, Constants.ORDER_SEND_SUCCESS, new Date());
+                logger.info("msgId={},已发送延迟队列", messageId);
             } else {
                 //失败则进行具体的后续操作:重试 或者补偿等手段
-                System.err.println("发送失败,原因:" + cause);
+                logger.info("msgId={},发送延迟队列失败:{}", messageId, cause);
             }
         }
     };

+ 22 - 0
applications/transfers/transfers-server/src/main/java/com/usoftchina/saas/transfers/task/Executable.java

@@ -0,0 +1,22 @@
+package com.usoftchina.saas.transfers.task;
+
+/**
+ * @author: guq
+ * @create: 2019-01-08 13:38
+ **/
+public abstract class Executable {
+
+
+
+    /**
+     * @return 任务名称
+     */
+    protected String getTaskName() {
+        return this.getClass().getSimpleName();
+    }
+
+    /**
+     * 执行任务
+     */
+    public abstract void execute();
+}

+ 12 - 0
applications/transfers/transfers-server/src/main/java/com/usoftchina/saas/transfers/task/SendPurchaseInTask.java

@@ -0,0 +1,12 @@
+package com.usoftchina.saas.transfers.task;
+
+/**
+ * @author: guq
+ * @create: 2019-01-08 13:53
+ **/
+public class SendPurchaseInTask extends Executable{
+    @Override
+    public void execute() {
+        //TODO
+    }
+}

+ 12 - 0
applications/transfers/transfers-server/src/main/java/com/usoftchina/saas/transfers/task/SendPurchaseOutTask.java

@@ -0,0 +1,12 @@
+package com.usoftchina.saas.transfers.task;
+
+/**
+ * @author: guq
+ * @create: 2019-01-08 13:53
+ **/
+public class SendPurchaseOutTask extends Executable{
+    @Override
+    public void execute() {
+        //TODO
+    }
+}

+ 12 - 0
applications/transfers/transfers-server/src/main/java/com/usoftchina/saas/transfers/task/SendPurchaseTask.java

@@ -0,0 +1,12 @@
+package com.usoftchina.saas.transfers.task;
+
+/**
+ * @author: guq
+ * @create: 2019-01-08 13:45
+ **/
+public class SendPurchaseTask extends Executable {
+    @Override
+    public void execute() {
+        //TODO
+    }
+}

+ 122 - 0
applications/transfers/transfers-server/src/main/resources/mapper/BrokermessagelogMapper.xml

@@ -0,0 +1,122 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
+<mapper namespace="com.usoftchina.saas.transfers.mapper.BrokerMessageLogMapper" >
+  <resultMap id="BaseResultMap" type="com.usoftchina.saas.transfers.po.BrokerMessagelog" >
+    <id column="id" property="id" jdbcType="INTEGER" />
+    <result column="msgId" property="msgId" jdbcType="VARCHAR" />
+    <result column="message" property="message" jdbcType="VARCHAR" />
+    <result column="status" property="status" jdbcType="VARCHAR" />
+    <result column="createTime" property="createTime" jdbcType="TIMESTAMP" />
+    <result column="reason" property="reason" jdbcType="VARCHAR" />
+    <result column="retry" property="retry" jdbcType="INTEGER" />
+  </resultMap>
+  <sql id="Base_Column_List" >
+    id, msgId, message, status, createTime, reason, retry
+  </sql>
+  <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.Integer" >
+    select 
+    <include refid="Base_Column_List" />
+    from brokermessagelog
+    where id = #{id,jdbcType=INTEGER}
+  </select>
+
+  <delete id="deleteByPrimaryKey" parameterType="java.lang.Integer" >
+    delete from brokermessagelog
+    where id = #{id,jdbcType=INTEGER}
+  </delete>
+  <insert id="insert" parameterType="com.usoftchina.saas.transfers.po.BrokerMessagelog" >
+    insert into brokermessagelog (msgId, message,
+      status, createTime, reason
+      )
+    values ( #{msgId,jdbcType=VARCHAR}, #{message,jdbcType=VARCHAR},
+      #{status,jdbcType=VARCHAR}, #{createTime,jdbcType=TIMESTAMP}, #{reason,jdbcType=VARCHAR}
+      )
+  </insert>
+  <insert id="insertSelective" parameterType="com.usoftchina.saas.transfers.po.BrokerMessagelog" >
+    insert into brokermessagelog
+    <trim prefix="(" suffix=")" suffixOverrides="," >
+      <if test="msgId != null" >
+        msgId,
+      </if>
+      <if test="message != null" >
+        message,
+      </if>
+      <if test="status != null" >
+        status,
+      </if>
+      <if test="createTime != null" >
+        createTime,
+      </if>
+      <if test="reason != null" >
+        reason,
+      </if>
+    </trim>
+    <trim prefix="values (" suffix=")" suffixOverrides="," >
+      <if test="msgId != null" >
+        #{msgId,jdbcType=VARCHAR},
+      </if>
+      <if test="message != null" >
+        #{message,jdbcType=VARCHAR},
+      </if>
+      <if test="status != null" >
+        #{status,jdbcType=VARCHAR},
+      </if>
+      <if test="createTime != null" >
+        #{createTime,jdbcType=TIMESTAMP},
+      </if>
+      <if test="reason != null" >
+        #{reason,jdbcType=VARCHAR},
+      </if>
+    </trim>
+  </insert>
+  <update id="updateByPrimaryKeySelective" parameterType="com.usoftchina.saas.transfers.po.BrokerMessagelog" >
+    update brokermessagelog
+    <set >
+      <if test="message != null" >
+        message = #{message,jdbcType=VARCHAR},
+      </if>
+      <if test="status != null" >
+        status = #{status,jdbcType=VARCHAR},
+      </if>
+      <if test="createTime != null" >
+        createTime = #{createTime,jdbcType=TIMESTAMP},
+      </if>
+      <if test="reason != null" >
+        reason = #{reason,jdbcType=VARCHAR},
+      </if>
+    </set>
+    where msgId = #{msgId,jdbcType=VARCHAR}
+  </update>
+
+  <update id="updateMessageLogandAddRetry" parameterType="com.usoftchina.saas.transfers.po.BrokerMessagelog" >
+    update brokermessagelog
+    set  status = #{status}, reason = #{reason}, retry = retry + 1
+    where msgId = #{msgId,jdbcType=VARCHAR}
+  </update>
+
+  <update id="updateByPrimaryKey" parameterType="com.usoftchina.saas.transfers.po.BrokerMessagelog" >
+    update brokermessagelog
+    set msgId = #{msgId,jdbcType=VARCHAR},
+      message = #{message,jdbcType=VARCHAR},
+      status = #{status,jdbcType=VARCHAR},
+      createTime = #{createTime,jdbcType=TIMESTAMP},
+      reason = #{reason,jdbcType=VARCHAR}
+    where id = #{id,jdbcType=INTEGER}
+  </update>
+
+  <select id="getMessage" parameterType="string" resultMap="BaseResultMap">
+    select * from BrokerMessagelog where msgId = #{msgId}
+  </select>
+
+  <select id="getLiveMessage" parameterType="string" resultMap="BaseResultMap">
+    select * from BrokerMessagelog where msgId = #{msgId} and retry &lt;= 3
+  </select>
+
+  <select id="getDeadMessage" parameterType="string" resultMap="BaseResultMap">
+    select * from BrokerMessagelog where msgId = #{msgId} and retry &gt; 3
+  </select>
+
+  <update id="updateSuccess" parameterType="string">
+    update BrokerMessagelog set status='投递成功',reason='' where msgid = #{msgId}
+  </update>
+</mapper>