Browse Source

feat: 增加rabbitMq 依赖

suntg 7 years ago
parent
commit
4af77f1a17

+ 7 - 0
pom.xml

@@ -360,6 +360,13 @@
             <artifactId>lombok</artifactId>
             <version>1.16.20</version>
         </dependency>
+
+        <!-- spring-rabbit -->
+        <dependency>
+            <groupId>org.springframework.amqp</groupId>
+            <artifactId>spring-rabbit</artifactId>
+            <version>1.7.7.RELEASE</version>
+        </dependency>
     </dependencies>
     <build>
         <finalName>platform-b2b</finalName>

+ 152 - 0
src/main/java/com/uas/platform/b2b/support/mq/MessageInfo.java

@@ -0,0 +1,152 @@
+package com.uas.platform.b2b.support.mq;
+
+/**
+ * 消息内容
+ *
+ * @author: wangcanyi
+ * @date: 2018-08-24 15:08
+ **/
+public class MessageInfo {
+	/**
+	 * 消息ID
+	 */
+	private String msgId;
+	/**
+	 * 用户ID
+	 */
+	private String userId;
+	/**
+	 * 应用号
+	 */
+	private String appId;
+	/**
+	 * 业务类型
+	 */
+	private String bizType;
+	/**
+	 * 业务号
+	 */
+	private String bizId;
+	/**
+	 * 时间戳
+	 */
+	private long timestamp;
+	/**
+	 * 重试次数
+	 */
+	private int retryCount;
+
+	/**
+	 * 消息ID
+	 *
+	 * @return
+	 */
+	public String getMsgId() {
+		return msgId;
+	}
+
+	/**
+	 * @param msgId 消息ID
+	 */
+	public void setMsgId(String msgId) {
+		this.msgId = msgId;
+	}
+
+	/**
+	 * 用户ID
+	 *
+	 * @return
+	 */
+	public String getUserId() {
+		return userId;
+	}
+
+	/**
+	 * @param userId 用户ID
+	 */
+	public void setUserId(String userId) {
+		this.userId = userId;
+	}
+
+	/**
+	 * 应用号
+	 *
+	 * @return
+	 */
+	public String getAppId() {
+		return appId;
+	}
+
+	/**
+	 * @param appId 应用号
+	 */
+	public void setAppId(String appId) {
+		this.appId = appId;
+	}
+
+	/**
+	 * 业务类型
+	 *
+	 * @return
+	 */
+	public String getBizType() {
+		return bizType;
+	}
+
+	/**
+	 * @param bizType 业务类型
+	 */
+	public void setBizType(String bizType) {
+		this.bizType = bizType;
+	}
+
+	/**
+	 * 业务号
+	 *
+	 * @return
+	 */
+	public String getBizId() {
+		return bizId;
+	}
+
+	/**
+	 * @param bizId 业务号
+	 */
+	public void setBizId(String bizId) {
+		this.bizId = bizId;
+	}
+
+	/**
+	 * 时间戳
+	 *
+	 * @return
+	 */
+	public long getTimestamp() {
+		return timestamp;
+	}
+
+	/**
+	 * @param timestamp 时间戳
+	 */
+	public void setTimestamp(long timestamp) {
+		this.timestamp = timestamp;
+	}
+
+	/**
+	 * 重试次数
+	 *
+	 * @return
+	 */
+	public int getRetryCount() {
+		return retryCount;
+	}
+
+	/**
+	 * 重试次数
+	 *
+	 * @param retryCount
+	 */
+	public void setRetryCount(int retryCount) {
+		this.retryCount = retryCount;
+	}
+}

+ 0 - 107
src/main/java/com/uas/platform/b2b/support/mq/PushUtils.java

@@ -1,107 +0,0 @@
-package com.uas.platform.b2b.support.mq;
-
-import com.uas.platform.b2b.core.util.ContextUtils;
-import com.uas.platform.b2b.dao.UserBaseInfoDao;
-import com.uas.platform.b2b.manage.model.AccountInfo;
-import com.uas.platform.b2b.model.AccessToken;
-import com.uas.platform.b2b.model.UserBaseInfo;
-import com.uas.platform.b2b.service.AccessTokenService;
-import com.uas.platform.b2b.support.SysConf;
-import com.uas.platform.core.util.HttpUtil;
-import com.uas.platform.core.util.HttpUtil.Response;
-import com.uas.platform.core.util.serializer.FlexJsonUtils;
-import org.springframework.http.HttpStatus;
-
-import java.io.*;
-import java.net.*;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class PushUtils {
-
-	private static UserBaseInfoDao userRepository;
-	private static AccessTokenService accessTokenService;
-	private static SysConf sysConf;
-
-	private static UserBaseInfoDao getUserRepository() {
-		if (userRepository == null) {
-            userRepository = ContextUtils.getBean(UserBaseInfoDao.class);
-        }
-		return userRepository;
-	}
-
-	private static AccessTokenService getAccessTokenService() {
-		if (accessTokenService == null) {
-            accessTokenService = ContextUtils.getBean(AccessTokenService.class);
-        }
-		return accessTokenService;
-	}
-
-	private static SysConf getSysConf() {
-		if (sysConf == null) {
-            sysConf = ContextUtils.getBean(SysConf.class);
-        }
-		return sysConf;
-	}
-
-	/**
-	 * 调用接口注册IM
-	 * 
-	 * @param user
-	 * @param enUU
-	 * @return
-	 * @throws Exception
-	 */
-	private static Map<String, Object> sendUserToManage(UserBaseInfo user, long enUU) throws Exception {
-		Map<String, String> params = new HashMap<String, String>();
-		params.put("data", FlexJsonUtils.toJson(new AccountInfo(user, enUU)));
-		Response response = HttpUtil.sendPostRequest(getSysConf().getManageInner() + "/public/account", params, true);
-		if (response.getStatusCode() == HttpStatus.OK.value()) {
-			List<Map<String, Object>> resultMap = FlexJsonUtils.fromJsonArray(response.getResponseText(), HashMap.class);
-			if (resultMap.size() > 0) {
-				if ("true".equals(String.valueOf(resultMap.get(0).get("ok")))) {
-					return resultMap.get(0);
-				}
-				throw new Exception(String.valueOf(resultMap.get(0).get("error")));
-			}
-		}
-		throw new Exception(response.getResponseText());
-	}
-
-	public static void push(long enUU, long userUU, String title, String content, String url, String pageTitle) {
-		UserBaseInfo user = getUserRepository().findOne(userUU);
-		if (user != null) {
-			// 没有IM账号的,需要先注册
-			if (user.getUserIMId() == null) {
-				try {
-					Map<String, Object> res = sendUserToManage(user, enUU);
-					Object imid = res.get("userImid");
-					if (imid != null) {
-                        user.setUserIMId(Long.parseLong(imid.toString()));
-                    }
-					user = getUserRepository().save(user);
-				} catch (Exception e) {
-				}
-			}
-			if (user.getUserIMId() != null && user.getUserIMId() > 0) {
-				Map<String, Object> params = new HashMap<String, Object>();
-				params.put("userid", user.getUserIMId());
-				params.put("enUU", enUU);
-				params.put("platform", "B2B");
-				params.put("title", title);
-				params.put("content", content);
-				AccessToken token = getAccessTokenService().createNew(user, enUU, null, 259200);// 有效期:三天
-				// ex: http://www.usoftchina.com/openapi/webpage?access_token=282a8ba2fa1f4fdc9e025c1eea2ad7e8&redirect_page=http%3A%2F%2Fwww.usoftchina.com%2F%23sale%2Forder%2F123
-				try {
-					params.put("url", String.format("%s/openapi/webpage?access_token=%s&redirect_page=%s", getSysConf().getB2b(),
-							token.getId(), URLEncoder.encode(getSysConf().getB2b() + "/" + url, "utf-8")));
-					// 调用推送接口
-					Pusher.push(params);
-				} catch (UnsupportedEncodingException e) {
-				}
-			}
-		}
-	}
-
-}

+ 0 - 29
src/main/java/com/uas/platform/b2b/support/mq/Pusher.java

@@ -1,29 +0,0 @@
-package com.uas.platform.b2b.support.mq;
-
-import java.util.Map;
-
-import com.uas.platform.b2b.core.util.ContextUtils;
-import com.uas.platform.b2b.support.SysConf;
-import com.uas.platform.core.util.HttpUtil;
-
-public class Pusher {
-
-	private static SysConf sysConf;
-
-	private static SysConf getSysConf() {
-		if (sysConf == null) {
-            sysConf = ContextUtils.getBean(SysConf.class);
-        }
-		return sysConf;
-	}
-
-	private final static String api = getSysConf().getIm() + "/tigase/baiduPush";
-
-	public static void push(Map<String, Object> params) {
-		try {
-			HttpUtil.sendPostRequest(api, params);
-		} catch (Exception e) {
-		}
-	}
-
-}

+ 96 - 0
src/main/java/com/uas/platform/b2b/support/mq/RabbitSendService.java

@@ -0,0 +1,96 @@
+package com.uas.platform.b2b.support.mq;
+
+import com.alibaba.fastjson.JSON;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.amqp.rabbit.support.CorrelationData;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import java.util.UUID;
+
+/**
+ * RabbitMQ 发送消息
+ *
+ * @author: wangcanyi
+ * @date: 2018-08-30 15:11
+ **/
+@Service
+public class RabbitSendService implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
+	private static final Logger LOGGER = LoggerFactory.getLogger(RabbitSendService.class);
+	@Autowired
+	private RabbitTemplate rabbitTemplate;
+
+	@PostConstruct
+	public void init() {
+//		如果消息没有到exchange,则confirm回调,ack=false
+//		如果消息到达exchange,则confirm回调,ack=true
+//		exchange到queue成功,则不回调return
+//		exchange到queue失败,则回调return(否则不回回调,消息就丢了)
+		rabbitTemplate.setConfirmCallback(this);
+		rabbitTemplate.setReturnCallback(this);
+	}
+
+	/**
+	 * @param queueName 队列名
+	 * @param userId    用户ID
+	 * @param appId     应用号
+	 * @param bizType   业务类型
+	 * @param bizId     业务ID
+	 * @return 消息ID
+	 */
+	public String sendMessage(String queueName, String userId, String appId, String bizType, String bizId) {
+		MessageInfo messageInfo = new MessageInfo();
+		messageInfo.setUserId(userId);
+		messageInfo.setAppId(appId);
+		messageInfo.setBizType(bizType);
+		messageInfo.setBizId(bizId);
+		return sendMessage(queueName, messageInfo);
+	}
+
+
+	/**
+	 * 发送消息
+	 *
+	 * @param queueName   队列名
+	 * @param messageInfo 消息体
+	 * @return 消息ID
+	 */
+	public String sendMessage(String queueName, MessageInfo messageInfo) {
+		if (StringUtils.isBlank(queueName)) {
+			throw new IllegalArgumentException("queueName不能为空");
+		}
+		if (messageInfo == null) {
+			throw new IllegalArgumentException("messageInfo不能为空");
+		}
+		//设置消息ID
+		messageInfo.setMsgId(UUID.randomUUID().toString());
+		//设置时间戳
+		messageInfo.setTimestamp(System.currentTimeMillis());
+		CorrelationData correlationData = new CorrelationData(messageInfo.getMsgId());
+		String messageJson = JSON.toJSONString(messageInfo);
+		rabbitTemplate.convertAndSend(queueName, (Object) messageJson, correlationData);
+		LOGGER.info("发送消息[RabbitSendService.sendMessage].正常,queueName:{},messageInfo:{},correlationData:{}",
+				queueName, messageJson, correlationData.getId());
+		return messageInfo.getMsgId();
+	}
+
+	@Override
+	public void confirm(CorrelationData correlationData, boolean ack, String cause) {
+		if (ack) {
+			LOGGER.info("发送消息确认[RabbitSendService.confirm].正常,correlationData:{}", correlationData.getId());
+		} else {
+			LOGGER.error("发送消息确认[RabbitSendService.confirm].异常,correlationData:{},cause:{}", correlationData.getId(), cause);
+		}
+	}
+
+	@Override
+	public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
+		LOGGER.error("发送消息确认到达队列[RabbitSendService.returnedMessage].异常,message:{},replyCode:{},replyText:{},exchange:{},routingKey:{}",
+				message, replyCode, replyText, exchange, routingKey);
+	}
+}

+ 19 - 0
src/main/java/com/uas/platform/b2b/support/mq/RabbitTemplateConfiguration.java

@@ -0,0 +1,19 @@
+package com.uas.platform.b2b.support.mq;
+
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * RabbitTemplateConfiguration
+ * @author suntg
+ * @date 2019-1-15 16:11:42
+ */
+@Configuration
+public class RabbitTemplateConfiguration {
+
+    @Bean
+    public RabbitTemplate rabbitTemplate() {
+        return new RabbitTemplate();
+    }
+}