19 changed files with 361 additions and 22 deletions
@ -0,0 +1,113 @@
|
||||
package com.logpm.patch.config; |
||||
|
||||
import com.alibaba.nacos.shaded.com.google.common.collect.Maps; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
import org.springblade.common.constant.RabbitConstant; |
||||
import org.springframework.amqp.core.*; |
||||
import org.springframework.amqp.rabbit.connection.ConnectionFactory; |
||||
import org.springframework.amqp.rabbit.connection.CorrelationData; |
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate; |
||||
import org.springframework.context.annotation.Bean; |
||||
import org.springframework.context.annotation.Configuration; |
||||
|
||||
import java.util.Map; |
||||
|
||||
/** |
||||
* RabbitMQ配置,主要是配置队列,如果提前存在该队列,可以省略本配置类 |
||||
* |
||||
* @author yangkai.shen |
||||
*/ |
||||
@Slf4j |
||||
@Configuration |
||||
public class RabbitMqConfiguration { |
||||
|
||||
@Bean |
||||
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) { |
||||
RabbitTemplate template = new RabbitTemplate(); |
||||
template.setConnectionFactory(connectionFactory); |
||||
template.setMandatory(true); |
||||
template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { |
||||
@Override |
||||
public void confirm(CorrelationData correlationData, boolean b, String s) { |
||||
System.out.println("确认回调-相关数据:" + correlationData); |
||||
System.out.println("确认回调-确认情况:" + b); |
||||
System.out.println("确认回调-原因:" + s); |
||||
// Long id = Long.parseLong(correlationData.getId());
|
||||
// UpdateWrapper<PanOrderStatusLog> updateWrapper = new UpdateWrapper<>();
|
||||
// if(b){
|
||||
// //修改处理日志为已处理
|
||||
// updateWrapper.set("status",0)
|
||||
// .eq("id",id);
|
||||
//
|
||||
// }else{
|
||||
// //修改处理日志为已处理
|
||||
// updateWrapper.set("status",2)
|
||||
// .eq("id",id);
|
||||
// }
|
||||
// panOrderStatusLogService.update(updateWrapper);
|
||||
} |
||||
}); |
||||
|
||||
template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { |
||||
@Override |
||||
public void returnedMessage(ReturnedMessage returnedMessage) { |
||||
|
||||
|
||||
if ("mt_business_data_2_factory_exchange".equals(returnedMessage.getExchange()) || "mt_business_data_clerk_check_2_factory_exchange".equals(returnedMessage.getExchange())) { |
||||
return; |
||||
} |
||||
|
||||
System.out.println("返回回调-消息:" + returnedMessage.getMessage()); |
||||
System.out.println("返回回调-回应码:" + returnedMessage.getReplyCode()); |
||||
System.out.println("返回回调-回应信息:" + returnedMessage.getReplyText()); |
||||
System.out.println("返回回调-交换机:" + returnedMessage.getExchange()); |
||||
System.out.println("返回回调-路由键:" + returnedMessage.getRoutingKey()); |
||||
} |
||||
}); |
||||
return template; |
||||
} |
||||
|
||||
// @Bean
|
||||
// public DirectExchange errorMessageExchange() {
|
||||
// return new DirectExchange(RabbitConstant.ERROR_EXCHANGE);
|
||||
// }
|
||||
//
|
||||
// @Bean
|
||||
// public Queue errorQueue() {
|
||||
// return new Queue(RabbitConstant.ERROR_QUEUE, true);
|
||||
// }
|
||||
//
|
||||
// @Bean
|
||||
// public Binding errorBinding(Queue errorQueue, DirectExchange errorMessageExchange) {
|
||||
// return BindingBuilder.bind(errorQueue).to(errorMessageExchange).with(RabbitConstant.ERROR_ROUTING);
|
||||
// }
|
||||
//
|
||||
// /**
|
||||
// * 消费失败队列
|
||||
// *
|
||||
// * @param rabbitTemplate
|
||||
// * @return
|
||||
// */
|
||||
// @Bean
|
||||
// public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate) {
|
||||
// return new RepublishMessageRecoverer(rabbitTemplate, RabbitConstant.ERROR_EXCHANGE, RabbitConstant.ERROR_ROUTING);
|
||||
// }
|
||||
|
||||
|
||||
@Bean |
||||
public Queue syncOldUpdownDataQueue() { |
||||
return new Queue(RabbitConstant.SYNC_OLD_UPDOWN_DATA_QUEUE, true); |
||||
} |
||||
|
||||
@Bean |
||||
public CustomExchange syncOldUpdownDataExchange() { |
||||
Map<String, Object> args = Maps.newHashMap(); |
||||
args.put("x-delayed-type", "direct"); |
||||
return new CustomExchange(RabbitConstant.SYNC_OLD_UPDOWN_DATA_EXCHANGE, "x-delayed-message", true, false, args); |
||||
} |
||||
|
||||
@Bean |
||||
public Binding syncOldUpdownDataBinding(Queue syncOldUpdownDataQueue, CustomExchange syncOldUpdownDataExchange) { |
||||
return BindingBuilder.bind(syncOldUpdownDataQueue).to(syncOldUpdownDataExchange).with(RabbitConstant.SYNC_OLD_UPDOWN_DATA_ROUTING).noargs(); |
||||
} |
||||
} |
@ -0,0 +1,60 @@
|
||||
package com.logpm.patch.receiver; |
||||
|
||||
import com.logpm.warehouse.feign.IWarehouseUpdownTypeClient; |
||||
import com.rabbitmq.client.Channel; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
import org.springblade.common.constant.RabbitConstant; |
||||
import org.springframework.amqp.core.Message; |
||||
import org.springframework.amqp.rabbit.annotation.RabbitHandler; |
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener; |
||||
import org.springframework.beans.factory.annotation.Autowired; |
||||
import org.springframework.stereotype.Component; |
||||
|
||||
import java.io.IOException; |
||||
import java.util.Map; |
||||
|
||||
|
||||
/** |
||||
* 获取订单数据 处理器 |
||||
* |
||||
* @author yangkai.shen |
||||
*/ |
||||
@Slf4j |
||||
@RabbitListener(queues = RabbitConstant.SYNC_OLD_UPDOWN_DATA_QUEUE) |
||||
@Component |
||||
public class SyncUpdownDataHandler { |
||||
|
||||
@Autowired |
||||
private IWarehouseUpdownTypeClient warehouseUpdownTypeClient; |
||||
|
||||
@RabbitHandler |
||||
public void dealWithDataHandler(Map map, Message message, Channel channel) throws IOException { |
||||
// 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉
|
||||
log.info("##################dealWithDataHandler: 处理在库订单数据到新系统"); |
||||
String code = (String) map.get("code"); |
||||
Long locationId = (Long) map.get("locationId"); |
||||
Long warehouseId = (Long) map.get("warehouseId"); |
||||
Integer type = (Integer) map.get("type"); |
||||
|
||||
if(type == 4){ |
||||
//整托上架
|
||||
boolean a=warehouseUpdownTypeClient.upShelfTray(code,locationId,warehouseId); |
||||
if(a){ |
||||
log.info("#############syncUpdownData: 上架托盘成功 trayCode={} locationId={}",code,locationId); |
||||
}else{ |
||||
log.error("#############syncUpdownData: XXXX上架托盘失败 trayCode={} locationId={}",code,locationId); |
||||
} |
||||
}else{ |
||||
boolean a=warehouseUpdownTypeClient.upShelfPackage(code,locationId,warehouseId); |
||||
if(a){ |
||||
log.info("#############syncUpdownData: 上架包件成功 orderPackageCode={} locationId={}",code,locationId); |
||||
}else{ |
||||
log.error("#############syncUpdownData: XXXX上架包件失败 orderPackageCode={} locationId={}",code,locationId); |
||||
} |
||||
} |
||||
} |
||||
|
||||
|
||||
|
||||
|
||||
} |
Loading…
Reference in new issue