|
|
@@ -30,16 +30,28 @@ public class ReceiveService {
|
|
|
|
|
|
@Autowired
|
|
|
private SendService sendService;
|
|
|
+ @Autowired
|
|
|
+ private SendProductTask sendProductTask;
|
|
|
+ @Autowired
|
|
|
+ private SendPurchaseEndTask sendPurchaseEndTask;
|
|
|
+ @Autowired
|
|
|
+ private SendPurchaseOutResTask sendPurchaseOutResTask;
|
|
|
+ @Autowired
|
|
|
+ private SendPurchaseOutTask sendPurchaseOutTask;
|
|
|
+ @Autowired
|
|
|
+ private SendPurchaseInResTask sendPurchaseInResTask;
|
|
|
+ @Autowired
|
|
|
+ private SendPurchaseInTask sendPurchaseInTask;
|
|
|
+ @Autowired
|
|
|
+ private SendPurchaseTask sendPurchaseTask;
|
|
|
|
|
|
@Autowired
|
|
|
private BrokerMessageLogMapper brokerMessageLogMapper;
|
|
|
|
|
|
private Logger logger = LoggerFactory.getLogger(getClass());
|
|
|
|
|
|
- private final static String EXCHANGE = "saas_trade_dev_exchange";
|
|
|
private final static String DLEXCHANGE = "saas_trade_dev_dlexchange";
|
|
|
private final static String QUEUE = "saas_trade_dev_queue";
|
|
|
- private final static String DLQUEUE = "saas_trade_dev_dlqueue";
|
|
|
private final static String ROUTINGKEY = "saas_trade_dev_dl.*";
|
|
|
|
|
|
|
|
|
@@ -52,30 +64,39 @@ public class ReceiveService {
|
|
|
public void onMessage(@Payload MessageInfo info, @Headers Map<String,Object> headers, Channel channel)
|
|
|
throws IOException {
|
|
|
//消费者操作
|
|
|
- logger.info("---------收到消息,消息id={},开始消费---------", info.getMsgId());
|
|
|
+ logger.info("---------收到消息,消息id={},开始消费---------", info.getMsgId());
|
|
|
Executable task = null;
|
|
|
- try {
|
|
|
+ try {
|
|
|
+ BillCodeSeq.valueOf(info.getBizType());
|
|
|
+ }catch (Exception e){
|
|
|
+ logger.info("没有此类型消息,直接丢弃");
|
|
|
+ Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
|
|
|
+ boolean multiple = false;
|
|
|
+ channel.basicAck(deliveryTag,multiple);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ try {
|
|
|
switch(BillCodeSeq.valueOf(info.getBizType())) {
|
|
|
case PURCHASE: //采购单上传
|
|
|
- task = new SendPurchaseTask();
|
|
|
+ task = sendPurchaseTask;
|
|
|
break;
|
|
|
case PURCHASEIN: //采购验收单过账
|
|
|
- task = new SendPurchaseInTask();
|
|
|
+ task = sendPurchaseInTask;
|
|
|
break;
|
|
|
case PURCHASEINRES: //采购验收单反过账
|
|
|
- task = new SendPurchaseInResTask();
|
|
|
+ task = sendPurchaseInResTask;
|
|
|
break;
|
|
|
case PURCHASEOUT: //采购延退单过账
|
|
|
- task = new SendPurchaseOutTask();
|
|
|
+ task = sendPurchaseOutTask;
|
|
|
break;
|
|
|
case PURCHASEOUTRES: //采购验收单反过账
|
|
|
- task = new SendPurchaseOutResTask();
|
|
|
+ task = sendPurchaseOutResTask;
|
|
|
break;
|
|
|
case PRODUCT: //物料库上传
|
|
|
- task = new SendProductTask();
|
|
|
+ task = sendProductTask;
|
|
|
break;
|
|
|
case PURCHASEEND: //采购单结案
|
|
|
- task = new SendPurchaseEndTask();
|
|
|
+ task = sendPurchaseEndTask;
|
|
|
break;
|
|
|
default:
|
|
|
break;
|
|
|
@@ -106,6 +127,7 @@ public class ReceiveService {
|
|
|
|
|
|
//ACK,确认一条消息已经被消费
|
|
|
channel.basicAck(deliveryTag,multiple);
|
|
|
+ logger.info("消息成功消费,MessageInfo={}",info);
|
|
|
//重新返回队列 重新消费
|
|
|
//channel.basicNack(deliveryTag,false,true);
|
|
|
}
|