liuam 7 жил өмнө
parent
commit
43e954ebbd

+ 5 - 0
pom.xml

@@ -109,6 +109,11 @@
             <artifactId>sso-common</artifactId>
             <version>0.0.1-SNAPSHOT</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-amqp</artifactId>
+        </dependency>
     </dependencies>
 
     <build>

+ 2 - 0
src/main/java/com/uas/ps/message/Application.java

@@ -2,6 +2,7 @@ package com.uas.ps.message;
 
 import com.uas.ps.core.util.ContextUtils;
 import com.uas.sso.web.AccountConfigurer;
+import org.springframework.amqp.rabbit.annotation.EnableRabbit;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.context.event.ApplicationPreparedEvent;
@@ -23,6 +24,7 @@ import java.util.logging.Logger;
  */
 @SpringBootApplication
 @EnableWebMvc
+@EnableRabbit
 //@EnableSSO
 public class Application {
 

+ 133 - 0
src/main/java/com/uas/ps/message/rabbit/BaseMessageListener.java

@@ -0,0 +1,133 @@
+package com.uas.ps.message.rabbit;
+
+import com.alibaba.fastjson.JSON;
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
+
+import java.io.IOException;
+
+/**
+ * base MessageListener, 实现了消息确认,消息失败延迟机制
+ * @author liuam
+ * @since 2018/9/13 0013 下午 14:37
+ */
+public abstract class BaseMessageListener implements ChannelAwareMessageListener {
+
+    private final static Logger logger = LoggerFactory.getLogger(BaseMessageListener.class);
+
+
+    @Override
+    public void onMessage(Message message, Channel channel) throws Exception {
+        String messageJson = new String(message.getBody());
+        long tag = message.getMessageProperties().getDeliveryTag();
+        String queueName = message.getMessageProperties().getConsumerQueue();
+        processMessage(messageJson, channel, tag, queueName);
+    }
+
+    /**
+     * 处理消息
+     *
+     * @param messageJson 消息Json内容
+     * @param channel     管道
+     * @param tag         Delivery Tag
+     * @param queueName   队列名
+     */
+    private void processMessage(String messageJson, Channel channel, long tag, String queueName) {
+        MessageInfo messageInfo = getMessageInfo(messageJson);
+        //当消息不符合格式MessageInfo格式时,丢弃消息
+        if (messageInfo == null) {
+            logger.warn("接收消息处理.消息格式.异常,messageJson:{},tag:{}", messageJson, tag);
+            basicNack(channel, tag);
+            return;
+        }
+        try {
+            logger.info("接收消息处理[RabbitMessageListener.receiveMessage].处理开始,messageJson:{},tag:{}", messageJson, tag);
+            processMessage(messageInfo);
+            logger.info("接收消息处理[RabbitMessageListener.receiveMessage].处理结束,messageJson:{},tag:{}", messageJson, tag);
+        } catch (Exception e) {
+            logger.error("接收消息处理[RabbitMessageListener.receiveMessage].异常,messageJson:{},tag:{}", messageJson, tag, e);
+            //出现异常时,消息转发为延时消息
+            sendDelayMessage(messageInfo, channel, queueName);
+        } finally {
+            basicAck(channel, tag);
+        }
+    }
+
+    /**
+     * 发送延时消息
+     *
+     * @param messageInfo
+     * @param channel
+     * @param queueName
+     */
+    private void sendDelayMessage(MessageInfo messageInfo, Channel channel, String queueName) {
+        //重试次数+1
+        messageInfo.setRetryCount(messageInfo.getRetryCount() + 1);
+        String messageJson = JSON.toJSONString(messageInfo);
+
+        try {
+            String dalayQueueName = queueName + RabbitConstants.DELAY_QUEUE_NAME_SUFFIX;
+            AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
+            builder.expiration(RabbitConstants.DELAY_TIME);
+
+            channel.basicPublish(RabbitConstants.AMQ_DIRECT, dalayQueueName, builder.build(), messageJson.getBytes());
+            logger.info("发送延时消息[RabbitMessageListener.sendDelayMessage].正常,messageJson:{},queueName:{}", messageJson, queueName);
+        } catch (IOException e) {
+            logger.error("发送延时消息[RabbitMessageListener.sendDelayMessage].异常,messageJson:{},queueName:{}", messageJson, queueName, e);
+        }
+    }
+
+    /**
+     * 处理消息
+     *
+     * @param messageInfo 消息体
+     * @throws Exception 处理失败时,抛出异常
+     */
+    public abstract void processMessage(MessageInfo messageInfo) throws Exception;
+
+
+    private MessageInfo getMessageInfo(String messageJson) {
+        MessageInfo messageInfo = null;
+        try {
+            messageInfo = JSON.parseObject(messageJson, MessageInfo.class);
+        } catch (Exception e) {
+            logger.error("获取消息内容实体[RabbitMessageListener.getMessageInfo].异常,messageJson:{}", messageJson, e);
+        }
+        return messageInfo;
+    }
+
+    /**
+     * 消息应答No
+     *
+     * @param channel
+     * @param tag
+     */
+    private void basicNack(Channel channel, long tag) {
+        try {
+            channel.basicNack(tag, false, false);
+            logger.info("接收消息处理.消息应答No[RabbitMessageListener.receiveMessage.basicNack].正常,tag:{}", tag);
+        } catch (IOException e) {
+            logger.error("接收消息处理.消息应答No[RabbitMessageListener.receiveMessage.basicNack].异常,tag:{}", tag, e);
+        }
+    }
+
+    /**
+     * 消息应答Yes
+     *
+     * @param channel
+     * @param tag
+     */
+    private void basicAck(Channel channel, long tag) {
+        try {
+            channel.basicAck(tag, false);
+            logger.info("接收消息处理.消息应答Yes[RabbitMessageListener.receiveMessage.basicAck].正常,tag:{}", tag);
+        } catch (IOException e) {
+            logger.error("接收消息处理.消息应答Yes[RabbitMessageListener.receiveMessage.basicAck].异常,tag:{}", tag, e);
+        }
+    }
+
+}

+ 28 - 0
src/main/java/com/uas/ps/message/rabbit/InquiryMessageListener.java

@@ -0,0 +1,28 @@
+package com.uas.ps.message.rabbit;
+
+import com.uas.ps.message.service.MessageService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+/**
+ * 对询价发送的消息,处理的类
+ * @author liuam
+ * @since 2018/9/13 0013 下午 15:45
+ */
+@Component
+public class InquiryMessageListener extends BaseMessageListener {
+
+    @Autowired
+    private MessageService messageService;
+
+    /**
+     * 处理消息
+     *
+     * @param messageInfo 消息体
+     * @throws Exception 处理失败时,抛出异常
+     */
+    @Override
+    public void processMessage(MessageInfo messageInfo) throws Exception {
+        messageService.saveMessages(messageInfo.getData().toString());
+    }
+}

+ 62 - 0
src/main/java/com/uas/ps/message/rabbit/MessageInfo.java

@@ -0,0 +1,62 @@
+package com.uas.ps.message.rabbit;
+
+/**
+ * 消息实体
+ * @author liuam
+ * @since 2018/9/13 0013 下午 14:44
+ */
+public class MessageInfo<T> {
+
+    /**
+     * 消息ID
+     */
+    private String msgId;
+
+    /**
+     * 时间戳
+     */
+    private long timestamp;
+
+    /**
+     * 重试次数
+     */
+    private int retryCount;
+
+    /**
+     * 数据
+     */
+    private T data;
+
+    public String getMsgId() {
+        return msgId;
+    }
+
+    public void setMsgId(String msgId) {
+        this.msgId = msgId;
+    }
+
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    public int getRetryCount() {
+        return retryCount;
+    }
+
+    public void setRetryCount(int retryCount) {
+        this.retryCount = retryCount;
+    }
+
+    public T getData() {
+        return data;
+    }
+
+    public void setData(T data) {
+        this.data = data;
+    }
+
+}

+ 57 - 0
src/main/java/com/uas/ps/message/rabbit/RabbitConfig.java

@@ -0,0 +1,57 @@
+package com.uas.ps.message.rabbit;
+
+import org.springframework.amqp.core.*;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
+import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import javax.annotation.PostConstruct;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * rabbit 配置
+ * @author liuam
+ * @since 2018/9/13 0013 下午 14:22
+ */
+@Configuration
+public class RabbitConfig {
+
+    @Autowired
+    private AmqpAdmin amqpAdmin;
+
+    @Autowired
+    private InquiryMessageListener inquiryMessageListener;
+
+    @PostConstruct
+    public void init() {
+        amqpAdmin.declareExchange(new DirectExchange(RabbitConstants.AMQ_DIRECT, true, false, null));
+
+        Map<String, Object> args = new HashMap<String, Object>();
+        args.put("x-dead-letter-exchange", RabbitConstants.AMQ_DIRECT);
+        args.put("x-dead-letter-routing-key", RabbitConstants.DEAD_LETTER_ROUTING_KEY);
+        amqpAdmin.declareQueue(new Queue(RabbitConstants.INQUIRY_MESSAGE + RabbitConstants.DELAY_QUEUE_NAME_SUFFIX, true, false, false, args));
+        amqpAdmin.declareBinding(new Binding(RabbitConstants.INQUIRY_MESSAGE + RabbitConstants.DELAY_QUEUE_NAME_SUFFIX, Binding.DestinationType.QUEUE, RabbitConstants.AMQ_DIRECT, RabbitConstants.DEAD_LETTER_ROUTING_KEY, null));
+    }
+
+
+    @Bean
+    public SimpleMessageListenerContainer messageListenerContainer(RabbitTemplate rabbitTemplate) {
+        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
+        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(rabbitTemplate.getConnectionFactory());
+
+        container.setQueues(new Queue(RabbitConstants.INQUIRY_MESSAGE));
+        container.setExposeListenerChannel(true);
+        container.setMaxConcurrentConsumers(RabbitConstants.MAXCONCURRENTCONSUMERS);
+        container.setConcurrentConsumers(RabbitConstants.CONCURRENTCONSUMERS);
+        //设置手动应答
+        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
+        container.setMessageListener(inquiryMessageListener);
+        container.setPrefetchCount(RabbitConstants.PREFETCH_COUNT);
+        return container;
+    }
+
+}

+ 47 - 0
src/main/java/com/uas/ps/message/rabbit/RabbitConstants.java

@@ -0,0 +1,47 @@
+package com.uas.ps.message.rabbit;
+
+/**
+ * rabbit 常量类
+ * @author liuam
+ * @since 2018/9/13 0013 下午 15:06
+ */
+public class RabbitConstants {
+
+    /**
+     * 延迟队列后缀名
+     */
+    public static final String DELAY_QUEUE_NAME_SUFFIX = "_delay";
+
+    /**
+     * 询价发送到消息中心消息队列名
+     */
+    public static final String INQUIRY_MESSAGE = "inquiry_message";
+
+    /**
+     * direct exchange name
+     */
+    public static final String AMQ_DIRECT = "amq.direct";
+
+    /**
+     * 死信队列路由key
+     */
+    public static final String DEAD_LETTER_ROUTING_KEY = "message_delay";
+
+    /**
+     * 延时时间,单位:毫秒
+     */
+    public static final String DELAY_TIME = "60000";
+
+    /**
+     * 没有确认消息的消息总数,当达到这个数值后,rabbitMQ 不再推送消息
+     */
+    public static final int PREFETCH_COUNT = 200;
+
+    /**
+     * 最大消费者
+     */
+    public static final int MAXCONCURRENTCONSUMERS = 3;
+
+    public static final int CONCURRENTCONSUMERS = 1;
+
+}

+ 15 - 1
src/main/resources/config/application-dev.properties

@@ -10,4 +10,18 @@ spring.datasource.hikari.idle-timeout=30000
 spring.datasource.hikari.pool-name=DatebookHikariCP
 spring.datasource.hikari.max-lifetime=1800000
 spring.datasource.hikari.connection-timeout=120000
-spring.datasource.hikari.connection-test-query=SELECT 1
+spring.datasource.hikari.connection-test-query=SELECT 1
+
+#================ rabbit ====================#
+spring.rabbitmq.host=134.175.196.24
+spring.rabbitmq.port=5672
+spring.rabbitmq.username=finance
+spring.rabbitmq.password=finance8888
+spring.rabbitmq.virtual-host=finance
+spring.rabbitmq.publisher-confirms=true
+spring.rabbitmq.publisher-returns=true
+spring.rabbitmq.listener.simple.retry.enabled=true
+spring.rabbitmq.listener.simple.retry.max-attempts=5
+spring.rabbitmq.cache.channel.size=100
+spring.rabbitmq.cache.channel.checkout-timeout=10000
+spring.rabbitmq.requested-heartbeat=30

+ 15 - 1
src/main/resources/config/application-prod.properties

@@ -10,4 +10,18 @@ spring.datasource.hikari.idle-timeout=30000
 spring.datasource.hikari.pool-name=DatebookHikariCP
 spring.datasource.hikari.max-lifetime=1800000
 spring.datasource.hikari.connection-timeout=120000
-spring.datasource.hikari.connection-test-query=SELECT 1
+spring.datasource.hikari.connection-test-query=SELECT 1
+
+#================ rabbit ====================#
+spring.rabbitmq.host=134.175.196.24
+spring.rabbitmq.port=5672
+spring.rabbitmq.username=finance
+spring.rabbitmq.password=finance8888
+spring.rabbitmq.virtual-host=finance
+spring.rabbitmq.publisher-confirms=true
+spring.rabbitmq.publisher-returns=true
+spring.rabbitmq.listener.simple.retry.enabled=true
+spring.rabbitmq.listener.simple.retry.max-attempts=5
+spring.rabbitmq.cache.channel.size=100
+spring.rabbitmq.cache.channel.checkout-timeout=10000
+spring.rabbitmq.requested-heartbeat=30

+ 15 - 1
src/main/resources/config/application-test.properties

@@ -10,4 +10,18 @@ spring.datasource.hikari.idle-timeout=30000
 spring.datasource.hikari.pool-name=DatebookHikariCP
 spring.datasource.hikari.max-lifetime=1800000
 spring.datasource.hikari.connection-timeout=120000
-spring.datasource.hikari.connection-test-query=SELECT 1
+spring.datasource.hikari.connection-test-query=SELECT 1
+
+#================ rabbit ====================#
+spring.rabbitmq.host=134.175.196.24
+spring.rabbitmq.port=5672
+spring.rabbitmq.username=finance
+spring.rabbitmq.password=finance8888
+spring.rabbitmq.virtual-host=finance
+spring.rabbitmq.publisher-confirms=true
+spring.rabbitmq.publisher-returns=true
+spring.rabbitmq.listener.simple.retry.enabled=true
+spring.rabbitmq.listener.simple.retry.max-attempts=5
+spring.rabbitmq.cache.channel.size=100
+spring.rabbitmq.cache.channel.checkout-timeout=10000
+spring.rabbitmq.requested-heartbeat=30