guq 7 лет назад
Родитель
Сommit
06da0eadf1

+ 35 - 0
applications/transfers/transfers-server/src/main/java/com/usoftchina/saas/transfers/TransfersController.java

@@ -0,0 +1,35 @@
+package com.usoftchina.saas.transfers;
+
+import com.usoftchina.saas.base.Result;
+import com.usoftchina.saas.transfers.po.MessageInfo;
+import com.usoftchina.saas.transfers.service.ReceiveService;
+import com.usoftchina.saas.transfers.service.SendService;
+import com.usoftchina.saas.utils.StringUtils;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.xml.ws.handler.MessageContext;
+
+/**
+ * @author: guq
+ * @create: 2019-01-06 22:15
+ **/
+@RestController
+public class TransfersController {
+
+    @Autowired
+    private SendService sendService;
+
+    @RequestMapping("/sendMsg")
+    public Result sendMsg(@RequestBody MessageInfo info) {
+        if (StringUtils.isEmpty(info)) {
+            return Result.error("信息为空");
+        }
+        sendService.sendMessage(info);
+        return Result.success();
+    }
+
+}

+ 51 - 0
applications/transfers/transfers-server/src/main/java/com/usoftchina/saas/transfers/service/ReceiveService.java

@@ -0,0 +1,51 @@
+package com.usoftchina.saas.transfers.service;
+
+import com.rabbitmq.client.Channel;
+import com.usoftchina.saas.transfers.po.MessageInfo;
+import org.springframework.amqp.rabbit.annotation.*;
+import org.springframework.amqp.support.AmqpHeaders;
+import org.springframework.messaging.handler.annotation.Headers;
+import org.springframework.messaging.handler.annotation.Payload;
+import org.springframework.stereotype.Service;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * @author: guq
+ * @create: 2019-01-06 22:04
+ **/
+@Service
+public class ReceiveService {
+    //配置监听的哪一个队列,同时在没有queue和exchange的情况下会去创建并建立绑定关系
+    @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "test_dev",durable = "true"),
+            exchange = @Exchange(name="test",durable = "true",type = "topic"),
+            key = "abc.*"))
+    @RabbitHandler
+    public void onMessage(@Payload MessageInfo info, @Headers Map<String,Object> headers, Channel channel)
+            throws IOException {
+        //消费者操作
+        System.out.println("---------收到消息,开始消费---------");
+        System.out.println("订单ID:" + info.getBizId());
+
+        /**
+         * Delivery Tag 用来标识信道中投递的消息。RabbitMQ 推送消息给 Consumer 时,会附带一个 Delivery Tag,
+         * 以便 Consumer 可以在消息确认时告诉 RabbitMQ 到底是哪条消息被确认了。
+         * RabbitMQ 保证在每个信道中,每条消息的 Delivery Tag 从 1 开始递增。
+         */
+
+        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
+
+
+        /**
+         *  multiple 取值为 false 时,表示通知 RabbitMQ 当前消息被确认
+         *  如果为 true,则额外将比第一个参数指定的 delivery tag 小的消息一并确认
+         */
+
+        boolean multiple = false;
+
+        //ACK,确认一条消息已经被消费
+        channel.basicAck(deliveryTag,multiple);
+        //channel.basicNack(deliveryTag,false,true);
+    }
+}

+ 72 - 0
applications/transfers/transfers-server/src/main/java/com/usoftchina/saas/transfers/service/SendService.java

@@ -0,0 +1,72 @@
+package com.usoftchina.saas.transfers.service;
+
+import com.usoftchina.saas.transfers.po.MessageInfo;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.amqp.rabbit.support.CorrelationData;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * @author: guq
+ * @create: 2019-01-06 22:04
+ **/
+@Service
+public class SendService {
+
+    @Autowired
+    private RabbitTemplate rabbitTemplate;
+
+
+    //发送消息方法调用: 构建自定义对象消息
+    public void sendMessage(MessageInfo info) {
+        // 通过实现 ConfirmCallback 接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中
+        rabbitTemplate.setConfirmCallback(confirmCallback);
+        //消息唯一ID
+        CorrelationData correlationData = new CorrelationData(info.getMsgId());
+        rabbitTemplate.convertAndSend("test", "abc.eqw", info, correlationData);
+    }
+
+    //发送消息方法调用: 构建自定义对象消息
+    public void sendDelayMessage(MessageInfo info) {
+        // 通过实现 ConfirmCallback 接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中
+        rabbitTemplate.setConfirmCallback(delayConfirmCallback);
+        //消息唯一ID
+        CorrelationData correlationData = new CorrelationData(info.getMsgId());
+        rabbitTemplate.convertAndSend("test", "abc.eqw", info, correlationData);
+    }
+
+    //回调函数: confirm确认
+    final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
+        @Override
+        public void confirm(CorrelationData correlationData, boolean confirm, String cause) {
+            System.err.println("correlationData: " + correlationData);
+            String messageId = correlationData.getId();
+            if (confirm) {
+                System.out.println("发射成功");
+                //如果confirm返回成功 则进行更新
+                // brokerMessageLogMapper.changeBrokerMessageLogStatus(messageId, Constants.ORDER_SEND_SUCCESS, new Date());
+            } else {
+                //失败则进行具体的后续操作:重试 或者补偿等手段
+                System.err.println("发送失败,原因:" + cause);
+            }
+        }
+    };
+
+    //回调函数: confirm确认
+    final RabbitTemplate.ConfirmCallback delayConfirmCallback = new RabbitTemplate.ConfirmCallback() {
+        @Override
+        public void confirm(CorrelationData correlationData, boolean confirm, String cause) {
+            System.err.println("correlationData: " + correlationData);
+            String messageId = correlationData.getId();
+            if (confirm) {
+                System.out.println("发射成功");
+                //如果confirm返回成功 则进行更新
+                // brokerMessageLogMapper.changeBrokerMessageLogStatus(messageId, Constants.ORDER_SEND_SUCCESS, new Date());
+            } else {
+                //失败则进行具体的后续操作:重试 或者补偿等手段
+                System.err.println("发送失败,原因:" + cause);
+            }
+        }
+    };
+
+}