Browse Source

feat(消息队列): B2B消息队列生产者配置

hejq 7 years ago
parent
commit
12e7e724bc

+ 11 - 0
pom.xml

@@ -367,6 +367,17 @@
             <artifactId>spring-rabbit</artifactId>
             <version>1.7.7.RELEASE</version>
         </dependency>
+
+        <dependency>
+            <groupId>com.rabbitmq</groupId>
+            <artifactId>amqp-client</artifactId>
+            <version>4.2.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-core</artifactId>
+            <version>4.1.6.RELEASE</version>
+        </dependency>
     </dependencies>
     <build>
         <finalName>platform-b2b</finalName>

+ 304 - 0
src/main/java/com/uas/platform/b2b/support/mq/BaseRabbitReceiveService.java

@@ -0,0 +1,304 @@
+package com.uas.platform.b2b.support.mq;
+
+import com.alibaba.fastjson.JSON;
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.Channel;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.amqp.core.AcknowledgeMode;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.annotation.RabbitHandler;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
+import org.springframework.amqp.rabbit.connection.ConnectionFactory;
+import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
+import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
+import org.springframework.amqp.support.AmqpHeaders;
+import org.springframework.beans.factory.InitializingBean;
+import org.springframework.messaging.handler.annotation.Headers;
+
+import java.io.*;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * RabbitMq 消息接收处理基类
+ *
+ * @author: wangcanyi
+ * @date: 2018-08-30 17:41
+ **/
+public abstract class BaseRabbitReceiveService implements ChannelAwareMessageListener, InitializingBean {
+	private static final Logger LOGGER = LoggerFactory.getLogger(BaseRabbitReceiveService.class);
+	private static final String DELAY_QUEUE_NAME_SUFFIX = "_delay";
+	/**
+	 * 是否动态创建消息监听
+	 */
+	protected boolean isDynamicCreate = false;
+	/**
+	 * IP地址
+	 */
+	protected String host;
+	/**
+	 * 端口
+	 */
+	protected int port;
+	/**
+	 * Virtual Host
+	 */
+	protected String virtualHost;
+	/**
+	 * 用户名
+	 */
+	protected String username;
+	/**
+	 * 密码
+	 */
+	protected String password;
+	/**
+	 * 队列名
+	 */
+	protected String queueName;
+	/**
+	 * 延时时间,单位:毫秒
+	 */
+	protected String delayTime = "60000";
+	/**
+	 * 心跳时间,单位秒
+	 */
+	protected int requestedHeartBeat = 30;
+	/**
+	 * 延时队列名
+	 */
+	private String delayQueueName;
+
+	/**
+	 * 初始化连接工厂
+	 *
+	 * @return
+	 */
+	private ConnectionFactory initConnectionFactory() {
+		if (StringUtils.isBlank(host)) {
+			throw new IllegalArgumentException("host为空");
+		}
+		if (port <= 0) {
+			throw new IllegalArgumentException("port小于等于0");
+		}
+		if (StringUtils.isBlank(virtualHost)) {
+			throw new IllegalArgumentException("virtualHost为空");
+		}
+		if (StringUtils.isBlank(username)) {
+			throw new IllegalArgumentException("username为空");
+		}
+		if (StringUtils.isBlank(password)) {
+			throw new IllegalArgumentException("password为空");
+		}
+		CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
+		connectionFactory.setHost(host);
+		connectionFactory.setPort(port);
+		connectionFactory.setVirtualHost(virtualHost);
+		connectionFactory.setUsername(username);
+		connectionFactory.setPassword(password);
+		connectionFactory.setRequestedHeartBeat(requestedHeartBeat);
+		return connectionFactory;
+	}
+
+	/**
+	 * 动态创建消息监听
+	 *
+	 * @return
+	 */
+	private SimpleMessageListenerContainer simpleMessageListenerContainer() {
+		if (!isDynamicCreate) {
+			return null;
+		}
+		if (this.getClass().isAnnotationPresent(RabbitListener.class)) {
+			throw new IllegalArgumentException("动态创建时,不能配置@RabbitListener注解");
+		}
+		if (StringUtils.isBlank(queueName)) {
+			throw new IllegalArgumentException("queueName为空");
+		}
+		ConnectionFactory connectionFactory = initConnectionFactory();
+		SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
+		container.setQueueNames(queueName);
+		//设置手动应答
+		container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
+		//使用ChannelAwareMessageListener接口,必须实现OnMessage方法
+		container.setMessageListener(this);
+		return container;
+	}
+
+	/**
+	 * 实现InitializingBean的afterPropertiesSet方法,是为了在子类PostConstruct后,才执行该方法
+	 */
+	@Override
+	public void afterPropertiesSet() {
+		SimpleMessageListenerContainer container = simpleMessageListenerContainer();
+		if (container != null) {
+			container.start();
+		}
+	}
+
+	@Override
+	public void onMessage(Message message, Channel channel) {
+		String messageJson = new String(message.getBody());
+		long tag = message.getMessageProperties().getDeliveryTag();
+		String queueName = message.getMessageProperties().getConsumerQueue();
+		processMessage(messageJson, channel, tag, queueName);
+	}
+
+	/**
+	 * 接收消息处理
+	 *
+	 * @param messageBytes
+	 * @param channel
+	 * @param headers
+	 */
+	@RabbitHandler
+	public void receiveMessage(byte[] messageBytes, Channel channel, @Headers Map<String, Object> headers) {
+		receiveMessage(new String(messageBytes), channel, headers);
+	}
+
+	/**
+	 * 接收消息处理
+	 *
+	 * @param messageJson
+	 * @param channel
+	 * @param headers
+	 */
+	@RabbitHandler
+	public void receiveMessage(String messageJson, Channel channel, @Headers Map<String, Object> headers) {
+		long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
+		String queueName = (String) headers.get(AmqpHeaders.CONSUMER_QUEUE);
+		processMessage(messageJson, channel, tag, queueName);
+	}
+
+	/**
+	 * 处理消息
+	 *
+	 * @param messageJson 消息Json内容
+	 * @param channel     管道
+	 * @param tag         Delivery Tag
+	 * @param queueName   队列名
+	 */
+	private void processMessage(String messageJson, Channel channel, long tag, String queueName) {
+		MessageInfo messageInfo = getMessageInfo(messageJson);
+		//当消息不符合格式MessageInfo格式时,丢弃消息
+		if (messageInfo == null) {
+			LOGGER.warn("接收消息处理.消息格式.异常,messageJson:{},tag:{}", messageJson, tag);
+			basicNack(channel, tag);
+			return;
+		}
+		try {
+			LOGGER.info("接收消息处理[BaseRabbitReceiveService.receiveMessage].处理开始,messageJson:{},tag:{}", messageJson, tag);
+			processMessage(messageInfo);
+			LOGGER.info("接收消息处理[BaseRabbitReceiveService.receiveMessage].处理结束,messageJson:{},tag:{}", messageJson, tag);
+		} catch (Exception e) {
+			LOGGER.error("接收消息处理[BaseRabbitReceiveService.receiveMessage].异常,messageJson:{},tag:{}", messageJson, tag, e);
+			//出现异常时,消息转发为延时消息
+			sendDelayMessage(messageInfo, channel, queueName);
+		} finally {
+			basicAck(channel, tag);
+		}
+	}
+
+	/**
+	 * 获取消息内容实体
+	 *
+	 * @param messageJson
+	 * @return
+	 */
+	private MessageInfo getMessageInfo(String messageJson) {
+		MessageInfo messageInfo = null;
+		try {
+			messageInfo = JSON.parseObject(messageJson, MessageInfo.class);
+		} catch (Exception e) {
+			LOGGER.error("获取消息内容实体[BaseRabbitReceiveService.getMessageInfo].异常,messageJson:{}", messageJson, e);
+
+		}
+		return messageInfo;
+	}
+
+	/**
+	 * 发送延时消息
+	 *
+	 * @param messageInfo
+	 * @param channel
+	 * @param queueName
+	 */
+	private void sendDelayMessage(MessageInfo messageInfo, Channel channel, String queueName) {
+		//重试次数+1
+		messageInfo.setRetryCount(messageInfo.getRetryCount() + 1);
+		String messageJson = JSON.toJSONString(messageInfo);
+		try {
+			String dQueueName = getDelayQueueName(channel, queueName);
+			AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
+			//设置延时时间
+			builder.expiration(delayTime);
+			channel.basicPublish("", dQueueName, builder.build(), messageJson.getBytes());
+			LOGGER.info("发送延时消息[BaseRabbitReceiveService.sendDelayMessage].正常,messageJson:{},queueName:{}", messageJson, queueName);
+		} catch (IOException e) {
+			LOGGER.error("发送延时消息[BaseRabbitReceiveService.sendDelayMessage].异常,messageJson:{},queueName:{}", messageJson, queueName, e);
+		}
+	}
+
+	/**
+	 * 获取延时队列名
+	 *
+	 * @param channel
+	 * @param queueName
+	 * @return
+	 * @throws IOException
+	 */
+	private String getDelayQueueName(Channel channel, String queueName) throws IOException {
+		if (StringUtils.isNotBlank(delayQueueName)) {
+			return delayQueueName;
+		}
+		//初始化延时队列
+		String dQueueName = queueName + DELAY_QUEUE_NAME_SUFFIX;
+		Map<String, Object> arguments = new HashMap<>(2);
+		arguments.put("x-dead-letter-exchange", "");
+		arguments.put("x-dead-letter-routing-key", queueName);
+		channel.queueDeclare(dQueueName, true, false, false, arguments);
+		delayQueueName = dQueueName;
+		return dQueueName;
+	}
+
+	/**
+	 * 消息应答No
+	 *
+	 * @param channel
+	 * @param tag
+	 */
+	private void basicNack(Channel channel, long tag) {
+		try {
+			channel.basicNack(tag, false, false);
+			LOGGER.info("接收消息处理.消息应答No[BaseRabbitReceiveService.receiveMessage.basicNack].正常,tag:{}", tag);
+		} catch (IOException e) {
+			LOGGER.error("接收消息处理.消息应答No[BaseRabbitReceiveService.receiveMessage.basicNack].异常,tag:{}", tag, e);
+		}
+	}
+
+	/**
+	 * 消息应答Yes
+	 *
+	 * @param channel
+	 * @param tag
+	 */
+	private void basicAck(Channel channel, long tag) {
+		try {
+			channel.basicAck(tag, false);
+			LOGGER.info("接收消息处理.消息应答Yes[BaseRabbitReceiveService.receiveMessage.basicAck].正常,tag:{}", tag);
+		} catch (IOException e) {
+			LOGGER.error("接收消息处理.消息应答Yes[BaseRabbitReceiveService.receiveMessage.basicAck].异常,tag:{}", tag, e);
+		}
+	}
+
+	/**
+	 * 处理消息
+	 *
+	 * @param messageInfo 消息体
+	 * @throws Exception 处理失败时,抛出异常
+	 */
+	public abstract void processMessage(MessageInfo messageInfo) throws Exception;
+}

+ 2 - 2
src/main/java/com/uas/platform/b2b/support/mq/RabbitSendService.java

@@ -31,8 +31,8 @@ public class RabbitSendService implements RabbitTemplate.ConfirmCallback, Rabbit
 //		如果消息到达exchange,则confirm回调,ack=true
 //		exchange到queue成功,则不回调return
 //		exchange到queue失败,则回调return(否则不回回调,消息就丢了)
-		rabbitTemplate.setConfirmCallback(this);
-		rabbitTemplate.setReturnCallback(this);
+//		rabbitTemplate.setConfirmCallback(this);
+//		rabbitTemplate.setReturnCallback(this);
 	}
 
 	/**

+ 23 - 0
src/main/java/com/uas/platform/b2b/support/mq/config/RabbitMqConfig.java

@@ -0,0 +1,23 @@
+package com.uas.platform.b2b.support.mq.config;
+
+import org.springframework.amqp.core.Queue;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * RabbitMq配置
+ * @author suntg
+ * @date 2019-1-9 17:33:21
+ */
+@Configuration
+public class RabbitMqConfig {
+
+    public static final String QUEUE_B2B_PURCHASE_APCHECK_NAME = "b2b_purchase_apcheck";
+
+
+    @Bean(name = QUEUE_B2B_PURCHASE_APCHECK_NAME)
+    public Queue getSassPurchaseApcheck() {
+        return new Queue(QUEUE_B2B_PURCHASE_APCHECK_NAME, true);
+    }
+
+}

+ 10 - 0
src/main/java/com/uas/platform/b2b/support/mq/service/RabbitApCheckService.java

@@ -0,0 +1,10 @@
+package com.uas.platform.b2b.support.mq.service;
+
+/**
+ * 对账单消息队列方法
+ *
+ * @author hejq
+ * @date 2019-01-15 16:31
+ */
+public interface RabbitApCheckService {
+}

+ 30 - 0
src/main/java/com/uas/platform/b2b/support/mq/service/impl/RabbitApCheckServiceImpl.java

@@ -0,0 +1,30 @@
+package com.uas.platform.b2b.support.mq.service.impl;
+
+import com.uas.platform.b2b.support.mq.BaseRabbitReceiveService;
+import com.uas.platform.b2b.support.mq.MessageInfo;
+import com.uas.platform.b2b.support.mq.config.RabbitMqConfig;
+import com.uas.platform.b2b.support.mq.service.RabbitApCheckService;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.stereotype.Service;
+
+/**
+ * @author hejq
+ * @date 2019-01-15 16:31
+ */
+@Service
+@RabbitListener(queues = RabbitMqConfig.QUEUE_B2B_PURCHASE_APCHECK_NAME)
+public class RabbitApCheckServiceImpl extends BaseRabbitReceiveService implements RabbitApCheckService {
+
+    /**
+     * 处理消息
+     *
+     * @param messageInfo 消息体
+     * @throws Exception 处理失败时,抛出异常
+     */
+    @Override
+    public void processMessage(MessageInfo messageInfo) throws Exception {
+        System.out.println("bizType: " + messageInfo.getBizType());
+        System.out.println("bizId: " + messageInfo.getBizId());
+        System.out.println("正常消费完成!");
+    }
+}

+ 10 - 1
src/main/resources/dev/sys.properties

@@ -23,4 +23,13 @@ messageServiceIp=http://message.ubtob.com/
 searchUrl=http://188.131.128.107:24005
 
 #dfs file url
-dfsFileUrl=http://10.10.100.200:9999
+dfsFileUrl=http://10.10.100.200:9999
+
+#rabbmitmq 配置
+rabbitmq.virtual-host=b2b
+rabbitmq.host=rabbitmq.uuzcc.cn
+rabbitmq.port=5672
+rabbitmq.username=b2b_platform
+rabbitmq.password=platform8888
+rabbitmq.publisher-confirms=true
+rabbitmq.publisher-returns=true

+ 10 - 6
src/main/resources/spring/context.xml

@@ -1,15 +1,16 @@
 <beans xmlns="http://www.springframework.org/schema/beans"
-	   xmlns:context="http://www.springframework.org/schema/context"
-	   xmlns:aop="http://www.springframework.org/schema/aop" xmlns:task="http://www.springframework.org/schema/task"
-	   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"
-	   xmlns:cache="http://www.springframework.org/schema/cache" xmlns:util="http://www.springframework.org/schema/util"
-	   xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
+       xmlns:context="http://www.springframework.org/schema/context"
+       xmlns:aop="http://www.springframework.org/schema/aop" xmlns:task="http://www.springframework.org/schema/task"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:tx="http://www.springframework.org/schema/tx"
+       xmlns:cache="http://www.springframework.org/schema/cache" xmlns:util="http://www.springframework.org/schema/util"
+       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
+       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
 	http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.1.xsd
 	http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.1.xsd
 	http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-4.1.xsd
 	http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.1.xsd
 	http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache-3.1.xsd
-	http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd">
+	http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
 
 	<context:property-placeholder location="classpath*:${profile}/*.properties" />
 
@@ -223,4 +224,7 @@
 		  init-method="init">
 		<property name="configPath" value="classpath:${profile}/account.properties" />
 	</bean>
+
+    <!--rabbit配置-->
+    <import resource="rabbitmq-producer.xml"/>
 </beans>

+ 56 - 0
src/main/resources/spring/rabbitmq-producer.xml

@@ -0,0 +1,56 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<beans xmlns="http://www.springframework.org/schema/beans"
+       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
+       xsi:schemaLocation="http://www.springframework.org/schema/beans
+       http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
+       http://www.springframework.org/schema/rabbit
+       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
+
+    <!--配置connection-factory,指定连接rabbit server参数 -->
+    <bean id="rabbitConnectionFactory"
+          class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
+        <constructor-arg value="${rabbitmq.host}"/>
+        <property name="username" value="${rabbitmq.username}"/>
+        <property name="password" value="${rabbitmq.password}"/>
+        <!--<property name="channelCacheSize" value="${rabbit.channelCacheSize}"/>-->
+        <property name="port" value="${rabbitmq.port}"/>
+        <property name="virtualHost" value="${rabbitmq.virtual-host}"/>
+    </bean>
+    <rabbit:admin connection-factory="rabbitConnectionFactory"/>
+
+    <!-- autoDelete:是否自动删除 durable:持久化  -->
+    <rabbit:queue name="test_liam_queue" durable="true"/>
+    <!--<rabbit:queue name="${rabbit.queue}" durable="true"/>-->
+    <!-- topic主题 -->
+    <rabbit:topic-exchange name="test.liam.topic.exchange" xmlns="http://www.springframework.org/schema/rabbit" durable="true">
+        <bindings>
+            <binding queue="test_liam_queue" pattern="*.liam.queue" />
+        </bindings>
+    </rabbit:topic-exchange>
+    <!-- 绑定交换机和队列 -->
+   <!-- <rabbit:direct-exchange name="${rabbit.exchange}" xmlns="http://www.springframework.org/schema/rabbit" durable="true" >
+        <rabbit:bindings>
+            <rabbit:binding queue="${rabbit.queue}" key="${rabbit.routingKey}" />
+        </rabbit:bindings>
+    </rabbit:direct-exchange>-->
+
+    <!-- 创建rabbitTemplate 消息模板类 -->
+    <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
+        <constructor-arg ref="rabbitConnectionFactory"/>
+        <!--消息确认回调 -->
+        <property name="confirmCallback" ref="producer"/>
+    </bean>
+
+    <bean name="producer" class="com.uas.platform.b2b.support.mq.RabbitSendService"/>
+
+    <bean id="messageContainer" class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
+        <!--<property name="queues" ref="${rabbit.queue}"/>-->
+        <property name="exposeListenerChannel" value="true"/>
+        <property name="maxConcurrentConsumers" value="10"/>
+        <property name="concurrentConsumers" value="2"/>
+        <property name="connectionFactory" ref="rabbitConnectionFactory"/>
+        <property name="acknowledgeMode" value="MANUAL"/>
+    </bean>
+
+</beans>

+ 24 - 0
src/test/java/com/uas/platform/b2b/mq/RabbitServiceTest.java

@@ -0,0 +1,24 @@
+package com.uas.platform.b2b.mq;
+
+import com.uas.platform.b2b.BaseJunitTest;
+import com.uas.platform.b2b.support.mq.RabbitSendService;
+import org.junit.Test;
+
+import javax.annotation.Resource;
+
+/**
+ * RabbitSendService 测试
+ *
+ * @author hejq
+ * @date 2019-01-15 16:26
+ */
+public class RabbitServiceTest extends BaseJunitTest {
+
+    @Resource
+    private RabbitSendService rabbitSendService;
+
+    @Test
+    public void testSendApCheck() {
+        rabbitSendService.sendMessage("b2b_purchase_apcheck","platform_b2b", "B2B", "POST_SALE_APCHECK", "4643");
+    }
+}