|
@@ -5,6 +5,9 @@ import com.usoftchina.saas.transfers.config.RabbitConfig;
|
|
|
import com.usoftchina.saas.transfers.mapper.BrokerMessageLogMapper;
|
|
import com.usoftchina.saas.transfers.mapper.BrokerMessageLogMapper;
|
|
|
import com.usoftchina.saas.transfers.po.BrokerMessagelog;
|
|
import com.usoftchina.saas.transfers.po.BrokerMessagelog;
|
|
|
import com.usoftchina.saas.transfers.po.MessageInfo;
|
|
import com.usoftchina.saas.transfers.po.MessageInfo;
|
|
|
|
|
+import com.usoftchina.saas.transfers.po.Status;
|
|
|
|
|
+import com.usoftchina.saas.utils.JsonUtils;
|
|
|
|
|
+import com.usoftchina.saas.utils.StringUtils;
|
|
|
import org.slf4j.Logger;
|
|
import org.slf4j.Logger;
|
|
|
import org.slf4j.LoggerFactory;
|
|
import org.slf4j.LoggerFactory;
|
|
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
|
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
|
@@ -13,6 +16,8 @@ import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
|
import org.springframework.boot.context.properties.EnableConfigurationProperties;
|
|
|
import org.springframework.stereotype.Service;
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
|
|
|
|
+import java.util.Date;
|
|
|
|
|
+
|
|
|
/**
|
|
/**
|
|
|
* @author: guq
|
|
* @author: guq
|
|
|
* @create: 2019-01-06 22:04
|
|
* @create: 2019-01-06 22:04
|
|
@@ -35,58 +40,78 @@ public class SendService {
|
|
|
|
|
|
|
|
//发送消息方法调用: 构建自定义对象消息
|
|
//发送消息方法调用: 构建自定义对象消息
|
|
|
public void sendMessage(MessageInfo info) {
|
|
public void sendMessage(MessageInfo info) {
|
|
|
|
|
+ String messageId = info.getMsgId();
|
|
|
|
|
+ //消息入库
|
|
|
|
|
+ BrokerMessagelog messagelog = new BrokerMessagelog();
|
|
|
|
|
+ messagelog.setCreateTime(new Date());
|
|
|
|
|
+ messagelog.setMessage(JsonUtils.toJsonString(info));
|
|
|
|
|
+ messagelog.setMsgId(messageId);
|
|
|
|
|
+ messagelog.setStatus(Status.SEND.getDisplay());
|
|
|
|
|
+ messagelog.setRetry(0);
|
|
|
|
|
+ brokerMessageLogMapper.insertSelective(messagelog);
|
|
|
|
|
+
|
|
|
// 通过实现 ConfirmCallback 接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中
|
|
// 通过实现 ConfirmCallback 接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中
|
|
|
rabbitTemplate.setConfirmCallback(confirmCallback);
|
|
rabbitTemplate.setConfirmCallback(confirmCallback);
|
|
|
//消息唯一ID
|
|
//消息唯一ID
|
|
|
- CorrelationData correlationData = new CorrelationData(info.getMsgId());
|
|
|
|
|
|
|
+ CorrelationData correlationData = new CorrelationData(messageId);
|
|
|
rabbitTemplate.convertAndSend(rabbitConfig.getDlexchange(), rabbitConfig.getRoutingkey(), info, correlationData);
|
|
rabbitTemplate.convertAndSend(rabbitConfig.getDlexchange(), rabbitConfig.getRoutingkey(), info, correlationData);
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- //发送消息方法调用: 构建自定义对象消息
|
|
|
|
|
- public void sendDelayMessage(MessageInfo info) {
|
|
|
|
|
- // 通过实现 ConfirmCallback 接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中
|
|
|
|
|
- rabbitTemplate.setConfirmCallback(delayConfirmCallback);
|
|
|
|
|
- //消息唯一ID
|
|
|
|
|
- CorrelationData correlationData = new CorrelationData(info.getMsgId());
|
|
|
|
|
- rabbitTemplate.convertAndSend(rabbitConfig.getExchange(), rabbitConfig.getRoutingkey(), info,
|
|
|
|
|
- new ExpirationMessagePostProcessor(rabbitConfig.getExpired()), correlationData);
|
|
|
|
|
|
|
+ logger.info("msgId={}, 消息发送", messageId);
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
//回调函数: confirm确认
|
|
//回调函数: confirm确认
|
|
|
final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
|
|
final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
|
|
|
@Override
|
|
@Override
|
|
|
public void confirm(CorrelationData correlationData, boolean confirm, String cause) {
|
|
public void confirm(CorrelationData correlationData, boolean confirm, String cause) {
|
|
|
- System.out.println("correlationData: " + correlationData);
|
|
|
|
|
String messageId = correlationData.getId();
|
|
String messageId = correlationData.getId();
|
|
|
if (confirm) {
|
|
if (confirm) {
|
|
|
- logger.info("msgId={},已发送服务器", correlationData.getId());
|
|
|
|
|
- //如果confirm返回成功 则进行更新
|
|
|
|
|
- // brokerMessageLogMapper.changeBrokerMessageLogStatus(messageId, Constants.ORDER_SEND_SUCCESS, new Date());
|
|
|
|
|
|
|
+ logger.info("msgId={},已发送服务器", messageId);
|
|
|
} else {
|
|
} else {
|
|
|
- //失败则进行具体的后续操作:重试 或者补偿等手段
|
|
|
|
|
- BrokerMessagelog message = brokerMessageLogMapper.getMessage(correlationData.getId());
|
|
|
|
|
- // sendDelayMessage((MessageInfo) message.getMessage());
|
|
|
|
|
|
|
+ //失败则进行具体的后续操作:进入延迟队列
|
|
|
|
|
+ BrokerMessagelog message = brokerMessageLogMapper.getLiveMessage(messageId);
|
|
|
|
|
+ String info = message.getMessage();
|
|
|
|
|
+ if (!StringUtils.isEmpty(info)) {
|
|
|
|
|
+ MessageInfo messageInfo = JsonUtils.fromJsonString(info, MessageInfo.class);
|
|
|
|
|
+ sendDelayMessage(messageInfo, cause);
|
|
|
|
|
+ }
|
|
|
logger.error("msgId={},发送失败:{}", correlationData.getId(), cause);
|
|
logger.error("msgId={},发送失败:{}", correlationData.getId(), cause);
|
|
|
-
|
|
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
};
|
|
};
|
|
|
|
|
|
|
|
- //回调函数: confirm确认
|
|
|
|
|
|
|
+ //发送消息方法调用: 构建自定义对象消息
|
|
|
|
|
+ public void sendDelayMessage(MessageInfo info, String error) {
|
|
|
|
|
+ if (StringUtils.isEmpty(info) || StringUtils.isEmpty(info.getMsgId())) {
|
|
|
|
|
+ logger.error("消息为空");
|
|
|
|
|
+ return;
|
|
|
|
|
+ }
|
|
|
|
|
+ String messageId = info.getMsgId();
|
|
|
|
|
+ //更新状态且失败次数加1
|
|
|
|
|
+ BrokerMessagelog messagelog = new BrokerMessagelog();
|
|
|
|
|
+ messagelog.setMsgId(messageId);
|
|
|
|
|
+ messagelog.setStatus(Status.RETRY.getDisplay());
|
|
|
|
|
+ messagelog.setReason(error);
|
|
|
|
|
+ brokerMessageLogMapper.updateMessageLogandAddRetry(messagelog);
|
|
|
|
|
+
|
|
|
|
|
+ //发送至延迟队列
|
|
|
|
|
+ rabbitTemplate.setConfirmCallback(confirmCallback);
|
|
|
|
|
+ //消息唯一ID
|
|
|
|
|
+ CorrelationData correlationData = new CorrelationData(info.getMsgId());
|
|
|
|
|
+ rabbitTemplate.convertAndSend(rabbitConfig.getExchange(), rabbitConfig.getRoutingkey(), info,
|
|
|
|
|
+ new ExpirationMessagePostProcessor(rabbitConfig.getExpired()), correlationData);
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ /*//回调函数: confirm确认
|
|
|
final RabbitTemplate.ConfirmCallback delayConfirmCallback = new RabbitTemplate.ConfirmCallback() {
|
|
final RabbitTemplate.ConfirmCallback delayConfirmCallback = new RabbitTemplate.ConfirmCallback() {
|
|
|
@Override
|
|
@Override
|
|
|
public void confirm(CorrelationData correlationData, boolean confirm, String cause) {
|
|
public void confirm(CorrelationData correlationData, boolean confirm, String cause) {
|
|
|
- System.err.println("correlationData: " + correlationData);
|
|
|
|
|
String messageId = correlationData.getId();
|
|
String messageId = correlationData.getId();
|
|
|
if (confirm) {
|
|
if (confirm) {
|
|
|
- System.out.println("发射成功");
|
|
|
|
|
- //如果confirm返回成功 则进行更新
|
|
|
|
|
- // brokerMessageLogMapper.changeBrokerMessageLogStatus(messageId, Constants.ORDER_SEND_SUCCESS, new Date());
|
|
|
|
|
|
|
+ logger.info("msgId={},已发送延迟队列", messageId);
|
|
|
} else {
|
|
} else {
|
|
|
//失败则进行具体的后续操作:重试 或者补偿等手段
|
|
//失败则进行具体的后续操作:重试 或者补偿等手段
|
|
|
- System.err.println("发送失败,原因:" + cause);
|
|
|
|
|
|
|
+ logger.info("msgId={},发送延迟队列失败:{}", messageId, cause);
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
- };
|
|
|
|
|
|
|
+ };*/
|
|
|
|
|
|
|
|
}
|
|
}
|