Browse Source

引入rabbitmq

liuam 7 years ago
parent
commit
b2860fa3a8

+ 11 - 0
pom.xml

@@ -112,6 +112,11 @@
             <version>3.2.0</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-amqp</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>com.alibaba</groupId>
             <artifactId>fastjson</artifactId>
@@ -122,6 +127,12 @@
             <version>RELEASE</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.3.2</version>
+        </dependency>
+
 
     </dependencies>
 

+ 46 - 0
src/main/java/com/uas/ps/inquiry/rabbit/RabbitConfig.java

@@ -0,0 +1,46 @@
+package com.uas.ps.inquiry.rabbit;
+
+import org.springframework.amqp.core.AmqpAdmin;
+import org.springframework.amqp.core.Binding;
+import org.springframework.amqp.core.DirectExchange;
+import org.springframework.amqp.core.Queue;
+import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import javax.annotation.PostConstruct;
+
+/**
+ * @author liuam
+ * @since 2018/9/11 0011 下午 16:30
+ */
+@Configuration
+public class RabbitConfig {
+
+    private final String AMQ_DIRECT = "amq.direct";
+    private final String INQUIRY_MESSAGE = "inquiry_message";
+    private final String INQUIRY_MESSAGE_ROUTINGKEY = "message";
+    private final String INQUIRY_PRODUCT_ROUTINGKEY = "product";
+    private final String INQUIRY_PRODUCT = "inquiry_product";
+
+    @Autowired
+    private AmqpAdmin amqpAdmin;
+
+    @PostConstruct
+    public void init() {
+
+        amqpAdmin.declareExchange(new DirectExchange(AMQ_DIRECT, true, false, null));
+        amqpAdmin.declareQueue(new Queue(INQUIRY_MESSAGE));
+        amqpAdmin.declareQueue(new Queue(INQUIRY_PRODUCT));
+
+        amqpAdmin.declareBinding(new Binding(INQUIRY_MESSAGE, Binding.DestinationType.QUEUE, AMQ_DIRECT, INQUIRY_MESSAGE_ROUTINGKEY, null));
+        amqpAdmin.declareBinding(new Binding(INQUIRY_PRODUCT, Binding.DestinationType.QUEUE, AMQ_DIRECT, INQUIRY_PRODUCT_ROUTINGKEY, null));
+    }
+
+    @Bean
+    public Jackson2JsonMessageConverter messageConverter() {
+        return new Jackson2JsonMessageConverter();
+    }
+
+}

+ 79 - 0
src/main/java/com/uas/ps/inquiry/rabbit/RabbitSendService.java

@@ -0,0 +1,79 @@
+package com.uas.ps.inquiry.rabbit;
+
+import com.alibaba.fastjson.JSON;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.amqp.rabbit.support.CorrelationData;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import java.util.UUID;
+
+/**
+ * RabbitMQ 发送消息
+ *
+ * @author: wangcanyi
+ * @date: 2018-08-30 15:11
+ **/
+@Service
+public class RabbitSendService implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
+	private static final Logger LOGGER = LoggerFactory.getLogger(RabbitSendService.class);
+	@Autowired
+	private RabbitTemplate rabbitTemplate;
+
+	@PostConstruct
+	public void init() {
+//		如果消息没有到exchange,则confirm回调,ack=false
+//		如果消息到达exchange,则confirm回调,ack=true
+//		exchange到queue成功,则不回调return
+//		exchange到queue失败,则回调return(否则不回回调,消息就丢了)
+		rabbitTemplate.setConfirmCallback(this);
+		rabbitTemplate.setReturnCallback(this);
+	}
+
+
+	/**
+	 * 发送消息
+	 *
+	 * @param queueName   队列名
+	 * @param object 消息体
+	 * @return 消息ID
+	 */
+	public String sendMessage(String queueName, Object object) {
+		if (StringUtils.isBlank(queueName)) {
+			throw new IllegalArgumentException("queueName不能为空");
+		}
+		if (object == null) {
+			throw new IllegalArgumentException("messageInfo不能为空");
+		}
+		//设置消息ID
+		messageInfo.setMsgId(UUID.randomUUID().toString());
+		//设置时间戳
+		messageInfo.setTimestamp(System.currentTimeMillis());
+		CorrelationData correlationData = new CorrelationData(messageInfo.getMsgId());
+		String messageJson = JSON.toJSONString(messageInfo);
+		rabbitTemplate.convertAndSend(queueName, (Object) messageJson, correlationData);
+		LOGGER.info("发送消息[RabbitSendService.sendMessage].正常,queueName:{},messageInfo:{},correlationData:{}",
+				queueName, messageJson, correlationData.getId());
+		return messageInfo.getMsgId();
+	}
+
+	@Override
+	public void confirm(CorrelationData correlationData, boolean ack, String cause) {
+		if (ack) {
+			LOGGER.info("发送消息确认[RabbitSendService.confirm].正常,correlationData:{}", correlationData.getId());
+		} else {
+			LOGGER.error("发送消息确认[RabbitSendService.confirm].异常,correlationData:{},cause:{}", correlationData.getId(), cause);
+		}
+	}
+
+	@Override
+	public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
+		LOGGER.error("发送消息确认到达队列[RabbitSendService.returnedMessage].异常,message:{},replyCode:{},replyText:{},exchange:{},routingKey:{}",
+				message, replyCode, replyText, exchange, routingKey);
+	}
+}

+ 7 - 0
src/main/resources/config/application-dev.properties

@@ -11,6 +11,13 @@ spring.datasource.hikari.pool-name=DatebookHikariCP
 spring.datasource.hikari.max-lifetime=1800000
 spring.datasource.hikari.connection-timeout=600000
 spring.datasource.hikari.connection-test-query=SELECT 1
+
+#================ rabbit ====================#
+spring.rabbitmq.host=134.175.196.24
+spring.rabbitmq.port=5672
+spring.rabbitmq.username=finance
+spring.rabbitmq.password=finance8888
+spring.rabbitmq.virtual-host=finance
 # Access path
 #ps.product.url=http://192.168.253.12:24000/
 #dong localhost

+ 7 - 0
src/main/resources/config/application-test.properties

@@ -12,6 +12,13 @@ spring.datasource.hikari.max-lifetime=1800000
 spring.datasource.hikari.connection-timeout=600000
 spring.datasource.hikari.connection-test-query=SELECT 1
 
+#================ rabbit ====================#
+spring.rabbitmq.host=134.175.196.24
+spring.rabbitmq.port=5672
+spring.rabbitmq.username=finance
+spring.rabbitmq.password=finance8888
+spring.rabbitmq.virtual-host=ps-inquiry
+
 # Access path
 ps.product.url=http://218.17.158.219:24000/
 ps.message.url=http://218.17.158.219:24000/message/