19 changed files with 643 additions and 22 deletions
@ -0,0 +1,155 @@ |
|||||||
|
package com.logpm.business.config; |
||||||
|
|
||||||
|
import cn.hutool.core.util.StrUtil; |
||||||
|
import cn.hutool.json.JSONUtil; |
||||||
|
import org.springblade.core.secure.BladeUser; |
||||||
|
import org.springblade.core.secure.utils.AuthUtil; |
||||||
|
import org.springblade.core.tool.utils.ThreadLocalUtil; |
||||||
|
import org.springframework.amqp.core.Message; |
||||||
|
import org.springframework.amqp.core.MessageProperties; |
||||||
|
import org.springframework.amqp.support.converter.AllowedListDeserializingMessageConverter; |
||||||
|
import org.springframework.amqp.support.converter.MessageConversionException; |
||||||
|
import org.springframework.amqp.utils.SerializationUtils; |
||||||
|
import org.springframework.beans.factory.BeanClassLoaderAware; |
||||||
|
import org.springframework.http.HttpHeaders; |
||||||
|
import org.springframework.mock.web.MockHttpServletRequest; |
||||||
|
import org.springframework.remoting.rmi.CodebaseAwareObjectInputStream; |
||||||
|
import org.springframework.stereotype.Component; |
||||||
|
import org.springframework.util.ClassUtils; |
||||||
|
import org.springframework.web.context.request.RequestContextHolder; |
||||||
|
import org.springframework.web.context.request.ServletRequestAttributes; |
||||||
|
|
||||||
|
import java.io.*; |
||||||
|
import java.util.List; |
||||||
|
import java.util.Map; |
||||||
|
|
||||||
|
/** |
||||||
|
* @author zhaoqiaobo |
||||||
|
* @create 2024-05-08 |
||||||
|
*/ |
||||||
|
@Component |
||||||
|
public class CustomMessageConverter extends AllowedListDeserializingMessageConverter implements BeanClassLoaderAware { |
||||||
|
|
||||||
|
private final String defaultCharset = "UTF-8"; |
||||||
|
private ClassLoader beanClassLoader = ClassUtils.getDefaultClassLoader(); |
||||||
|
private String codebaseUrl; |
||||||
|
|
||||||
|
@Deprecated |
||||||
|
public void setCodebaseUrl(String codebaseUrl) { |
||||||
|
this.codebaseUrl = codebaseUrl; |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public Object fromMessage(Message message) throws MessageConversionException { |
||||||
|
Object content = null; |
||||||
|
MessageProperties properties = message.getMessageProperties(); |
||||||
|
if (properties != null) { |
||||||
|
String contentType = properties.getContentType(); |
||||||
|
if (contentType != null && contentType.startsWith("text")) { |
||||||
|
String encoding = properties.getContentEncoding(); |
||||||
|
if (encoding == null) { |
||||||
|
encoding = "UTF-8"; |
||||||
|
} |
||||||
|
|
||||||
|
try { |
||||||
|
content = new String(message.getBody(), encoding); |
||||||
|
} catch (UnsupportedEncodingException var8) { |
||||||
|
throw new MessageConversionException("failed to convert text-based Message content", var8); |
||||||
|
} |
||||||
|
} else if (contentType != null && contentType.equals("application/x-java-serialized-object")) { |
||||||
|
try { |
||||||
|
content = SerializationUtils.deserialize(this.createObjectInputStream(new ByteArrayInputStream(message.getBody()), this.codebaseUrl)); |
||||||
|
} catch (IllegalArgumentException | IllegalStateException | IOException var7) { |
||||||
|
throw new MessageConversionException("failed to convert serialized Message content", var7); |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
Map<String, Object> headers = properties.getHeaders(); |
||||||
|
HttpHeaders httpHeaders = new HttpHeaders(); |
||||||
|
for (Map.Entry<String, Object> entry : headers.entrySet()) { |
||||||
|
if (StrUtil.equals("Blade-Auth", entry.getKey()) |
||||||
|
|| StrUtil.equals("Authorization", entry.getKey()) |
||||||
|
|| StrUtil.equals("blade-auth", entry.getKey()) |
||||||
|
|| StrUtil.equals("authorization", entry.getKey())) { |
||||||
|
List value = (List) entry.getValue(); |
||||||
|
for (Object o : value) { |
||||||
|
httpHeaders.add(entry.getKey(), String.valueOf(o)); |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
|
ThreadLocalUtil.put("bladeContext", httpHeaders); |
||||||
|
Object bladeUser = headers.get("bladeUser"); |
||||||
|
MockHttpServletRequest request = new MockHttpServletRequest(); |
||||||
|
BladeUser bladeUser1 = JSONUtil.toBean(bladeUser.toString(), BladeUser.class); |
||||||
|
request.setAttribute("_BLADE_USER_REQUEST_ATTR_", bladeUser1); |
||||||
|
RequestContextHolder.setRequestAttributes(new ServletRequestAttributes(request)); |
||||||
|
if (content == null) { |
||||||
|
content = message.getBody(); |
||||||
|
} |
||||||
|
return content; |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { |
||||||
|
byte[] bytes = null; |
||||||
|
if (object instanceof byte[]) { |
||||||
|
bytes = (byte[]) object; |
||||||
|
messageProperties.setContentType("application/octet-stream"); |
||||||
|
} else if (object instanceof String) { |
||||||
|
try { |
||||||
|
bytes = ((String) object).getBytes(this.defaultCharset); |
||||||
|
} catch (UnsupportedEncodingException var6) { |
||||||
|
throw new MessageConversionException("failed to convert to Message content", var6); |
||||||
|
} |
||||||
|
|
||||||
|
messageProperties.setContentType("text/plain"); |
||||||
|
messageProperties.setContentEncoding(this.defaultCharset); |
||||||
|
} else if (object instanceof Serializable) { |
||||||
|
try { |
||||||
|
bytes = SerializationUtils.serialize(object); |
||||||
|
} catch (IllegalArgumentException var5) { |
||||||
|
throw new MessageConversionException("failed to convert to serialized Message content", var5); |
||||||
|
} |
||||||
|
|
||||||
|
messageProperties.setContentType("application/x-java-serialized-object"); |
||||||
|
} |
||||||
|
HttpHeaders headers = ThreadLocalUtil.get("bladeContext"); |
||||||
|
if (headers != null && !headers.isEmpty()) { |
||||||
|
headers.forEach((key, values) -> { |
||||||
|
values.forEach((value) -> { |
||||||
|
messageProperties.setHeader(key, new String[]{value}); |
||||||
|
}); |
||||||
|
}); |
||||||
|
} |
||||||
|
BladeUser user = AuthUtil.getUser(); |
||||||
|
BladeUser bladeUser = new BladeUser(); |
||||||
|
bladeUser.setTenantId(user.getTenantId()); |
||||||
|
bladeUser.setUserId(user.getUserId()); |
||||||
|
bladeUser.setAccount(user.getAccount()); |
||||||
|
bladeUser.setRoleId(user.getRoleId()); |
||||||
|
messageProperties.setHeader("bladeUser", JSONUtil.toJsonStr(bladeUser)); |
||||||
|
|
||||||
|
if (bytes != null) { |
||||||
|
messageProperties.setContentLength(bytes.length); |
||||||
|
return new Message(bytes, messageProperties); |
||||||
|
} else { |
||||||
|
throw new IllegalArgumentException(this.getClass().getSimpleName() + " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName()); |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
@Override |
||||||
|
public void setBeanClassLoader(ClassLoader classLoader) { |
||||||
|
this.beanClassLoader = beanClassLoader; |
||||||
|
} |
||||||
|
|
||||||
|
protected ObjectInputStream createObjectInputStream(InputStream is, String codebaseUrl) throws IOException { |
||||||
|
return new CodebaseAwareObjectInputStream(is, this.beanClassLoader, codebaseUrl) { |
||||||
|
@Override |
||||||
|
protected Class<?> resolveClass(ObjectStreamClass classDesc) throws IOException, ClassNotFoundException { |
||||||
|
Class<?> clazz = super.resolveClass(classDesc); |
||||||
|
CustomMessageConverter.this.checkAllowedList(clazz); |
||||||
|
return clazz; |
||||||
|
} |
||||||
|
}; |
||||||
|
} |
||||||
|
} |
@ -0,0 +1,208 @@ |
|||||||
|
package com.logpm.business.config; |
||||||
|
|
||||||
|
import com.alibaba.nacos.shaded.com.google.common.collect.Maps; |
||||||
|
import lombok.extern.slf4j.Slf4j; |
||||||
|
import org.springblade.common.constant.RabbitConstant; |
||||||
|
import org.springframework.amqp.core.*; |
||||||
|
import org.springframework.amqp.rabbit.connection.ConnectionFactory; |
||||||
|
import org.springframework.amqp.rabbit.connection.CorrelationData; |
||||||
|
import org.springframework.amqp.rabbit.core.RabbitTemplate; |
||||||
|
import org.springframework.amqp.rabbit.retry.MessageRecoverer; |
||||||
|
import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer; |
||||||
|
import org.springframework.context.annotation.Bean; |
||||||
|
import org.springframework.context.annotation.Configuration; |
||||||
|
|
||||||
|
import java.util.Map; |
||||||
|
|
||||||
|
/** |
||||||
|
* RabbitMQ配置,主要是配置队列,如果提前存在该队列,可以省略本配置类 |
||||||
|
* |
||||||
|
* @author yangkai.shen |
||||||
|
*/ |
||||||
|
@Slf4j |
||||||
|
@Configuration |
||||||
|
public class RabbitMqConfiguration { |
||||||
|
|
||||||
|
@Bean |
||||||
|
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){ |
||||||
|
RabbitTemplate template = new RabbitTemplate(); |
||||||
|
template.setConnectionFactory(connectionFactory); |
||||||
|
template.setMandatory(true); |
||||||
|
template.setMessageConverter(new CustomMessageConverter()); |
||||||
|
template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { |
||||||
|
@Override |
||||||
|
public void confirm(CorrelationData correlationData, boolean b, String s) { |
||||||
|
System.out.println("确认回调-相关数据:"+correlationData); |
||||||
|
System.out.println("确认回调-确认情况:"+b); |
||||||
|
System.out.println("确认回调-原因:"+s); |
||||||
|
} |
||||||
|
}); |
||||||
|
|
||||||
|
template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { |
||||||
|
@Override |
||||||
|
public void returnedMessage(ReturnedMessage returnedMessage) { |
||||||
|
System.out.println("返回回调-消息:"+returnedMessage.getMessage()); |
||||||
|
System.out.println("返回回调-回应码:"+returnedMessage.getReplyCode()); |
||||||
|
System.out.println("返回回调-回应信息:"+returnedMessage.getReplyText()); |
||||||
|
System.out.println("返回回调-交换机:"+returnedMessage.getExchange()); |
||||||
|
System.out.println("返回回调-路由键:"+returnedMessage.getRoutingKey()); |
||||||
|
} |
||||||
|
}); |
||||||
|
return template; |
||||||
|
} |
||||||
|
|
||||||
|
@Bean |
||||||
|
public DirectExchange distributionErrorMessageExchange(){ |
||||||
|
return new DirectExchange(RabbitConstant.DISTRIBUTION_ERROR_EXCHANGE); |
||||||
|
} |
||||||
|
@Bean |
||||||
|
public Queue distributionErrorQueue(){ |
||||||
|
return new Queue(RabbitConstant.DISTRIBUTION_ERROR_QUEUE, true); |
||||||
|
} |
||||||
|
@Bean |
||||||
|
public Binding distributionErrorBinding(Queue distributionErrorQueue, DirectExchange distributionErrorMessageExchange){ |
||||||
|
return BindingBuilder.bind(distributionErrorQueue).to(distributionErrorMessageExchange).with(RabbitConstant.DISTRIBUTION_ERROR_ROUTING); |
||||||
|
} |
||||||
|
|
||||||
|
/** |
||||||
|
* 消费失败队列 |
||||||
|
* @param rabbitTemplate |
||||||
|
* @return |
||||||
|
*/ |
||||||
|
@Bean |
||||||
|
public MessageRecoverer republishMessageRecoverer(RabbitTemplate rabbitTemplate){ |
||||||
|
return new RepublishMessageRecoverer(rabbitTemplate, RabbitConstant.DISTRIBUTION_ERROR_EXCHANGE, RabbitConstant.DISTRIBUTION_ERROR_ROUTING); |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
/** |
||||||
|
* 备货扫码任务队列 |
||||||
|
*/ |
||||||
|
@Bean |
||||||
|
public Queue stockupScanQueue() { |
||||||
|
return new Queue(RabbitConstant.STOCKUP_SCAN_QUEUE, true); |
||||||
|
} |
||||||
|
@Bean |
||||||
|
public CustomExchange stockupScanExchange() { |
||||||
|
Map<String, Object> args = Maps.newHashMap(); |
||||||
|
args.put("x-delayed-type", "direct"); |
||||||
|
return new CustomExchange(RabbitConstant.STOCKUP_SCAN_EXCHANGE, "x-delayed-message", true, false, args); |
||||||
|
} |
||||||
|
@Bean |
||||||
|
public Binding stockupScanBinding(Queue stockupScanQueue, CustomExchange stockupScanExchange) { |
||||||
|
return BindingBuilder.bind(stockupScanQueue).to(stockupScanExchange).with(RabbitConstant.STOCKUP_SCAN_ROUTING).noargs(); |
||||||
|
} |
||||||
|
|
||||||
|
/** |
||||||
|
* 备货判断是否下架任务队列 |
||||||
|
*/ |
||||||
|
@Bean |
||||||
|
public Queue stockupStateUpdateQueue() { |
||||||
|
return new Queue(RabbitConstant.STOCKUP_STATE_UPDATE_QUEUE, true); |
||||||
|
} |
||||||
|
@Bean |
||||||
|
public CustomExchange stockupStateUpdateExchange() { |
||||||
|
Map<String, Object> args = Maps.newHashMap(); |
||||||
|
args.put("x-delayed-type", "direct"); |
||||||
|
return new CustomExchange(RabbitConstant.STOCKUP_STATE_UPDATE_EXCHANGE, "x-delayed-message", true, false, args); |
||||||
|
} |
||||||
|
@Bean |
||||||
|
public Binding stockupStateUpdateBinding(Queue stockupStateUpdateQueue, CustomExchange stockupStateUpdateExchange) { |
||||||
|
return BindingBuilder.bind(stockupStateUpdateQueue).to(stockupStateUpdateExchange).with(RabbitConstant.STOCKUP_STATE_UPDATE_ROUTING).noargs(); |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
/** |
||||||
|
* 文员签收复合推送老系统队列 |
||||||
|
* @return |
||||||
|
*/ |
||||||
|
@Bean |
||||||
|
public Queue clerkCheckPushDataQueue() { |
||||||
|
return new Queue(RabbitConstant.CLERK_CHECK_PUSH_DATA_QUEUE, true); |
||||||
|
} |
||||||
|
@Bean |
||||||
|
public CustomExchange clerkCheckPushDataExchange() { |
||||||
|
Map<String, Object> args = Maps.newHashMap(); |
||||||
|
args.put("x-delayed-type", "direct"); |
||||||
|
return new CustomExchange(RabbitConstant.CLERK_CHECK_PUSH_DATA_EXCHANGE, "x-delayed-message", true, false, args); |
||||||
|
} |
||||||
|
@Bean |
||||||
|
public Binding clerkCheckPushDataBinding(Queue clerkCheckPushDataQueue, CustomExchange clerkCheckPushDataExchange) { |
||||||
|
return BindingBuilder.bind(clerkCheckPushDataQueue).to(clerkCheckPushDataExchange).with(RabbitConstant.CLERK_CHECK_PUSH_DATA_ROUTING).noargs(); |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@Bean |
||||||
|
public Queue orderPackageStatusInfoQueue() { |
||||||
|
return new Queue(RabbitConstant.ORDER_PACKAGE_STATUS_INFO_QUEUE, true); |
||||||
|
} |
||||||
|
@Bean |
||||||
|
public CustomExchange orderPackageStatusInfoExchange() { |
||||||
|
Map<String, Object> args = Maps.newHashMap(); |
||||||
|
args.put("x-delayed-type", "direct"); |
||||||
|
return new CustomExchange(RabbitConstant.ORDER_PACKAGE_STATUS_INFO_EXCHANGE, "x-delayed-message", true, false, args); |
||||||
|
} |
||||||
|
@Bean |
||||||
|
public Binding orderPackageStatusInfoBinding(Queue orderPackageStatusInfoQueue, CustomExchange orderPackageStatusInfoExchange) { |
||||||
|
return BindingBuilder.bind(orderPackageStatusInfoQueue).to(orderPackageStatusInfoExchange).with(RabbitConstant.ORDER_PACKAGE_STATUS_INFO_ROUTING).noargs(); |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
/** |
||||||
|
* 文员签收复合推送老系统队列 |
||||||
|
* @return |
||||||
|
*/ |
||||||
|
@Bean |
||||||
|
public Queue billClerkCheckPushDataQueue() { |
||||||
|
return new Queue(RabbitConstant.BILL_CLERK_CHECK_PUSH_DATA_QUEUE, true); |
||||||
|
} |
||||||
|
@Bean |
||||||
|
public CustomExchange billClerkCheckPushDataExchange() { |
||||||
|
Map<String, Object> args = Maps.newHashMap(); |
||||||
|
args.put("x-delayed-type", "direct"); |
||||||
|
return new CustomExchange(RabbitConstant.BILL_CLERK_CHECK_PUSH_DATA_EXCHANGE, "x-delayed-message", true, false, args); |
||||||
|
} |
||||||
|
@Bean |
||||||
|
public Binding billCclerkCheckPushDataBinding(Queue billClerkCheckPushDataQueue, CustomExchange billClerkCheckPushDataExchange) { |
||||||
|
return BindingBuilder.bind(billClerkCheckPushDataQueue).to(billClerkCheckPushDataExchange).with(RabbitConstant.BILL_CLERK_CHECK_PUSH_DATA_ROUTING).noargs(); |
||||||
|
} |
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@Bean |
||||||
|
public Queue businessInConversionQueue() { |
||||||
|
return new Queue(RabbitConstant.BUSINESS_IN_CONVERSION_DATA_QUEUE, true); |
||||||
|
} |
||||||
|
@Bean |
||||||
|
public CustomExchange businessInConversionExchange() { |
||||||
|
Map<String, Object> args = Maps.newHashMap(); |
||||||
|
args.put("x-delayed-type", "direct"); |
||||||
|
return new CustomExchange(RabbitConstant.BUSINESS_IN_CONVERSION_DATA_EXCHANGE, "x-delayed-message", true, false, args); |
||||||
|
} |
||||||
|
@Bean |
||||||
|
public Binding businessInConversionBinding(Queue businessInConversionQueue, CustomExchange businessInConversionExchange) { |
||||||
|
return BindingBuilder.bind(businessInConversionQueue).to(businessInConversionExchange).with(RabbitConstant.BUSINESS_IN_CONVERSION_DATA_ROUTING).noargs(); |
||||||
|
} |
||||||
|
|
||||||
|
@Bean |
||||||
|
public Queue selfPickupScanQueue() { |
||||||
|
return new Queue(RabbitConstant.SELF_PICKUP_SCAN_QUEUE, true); |
||||||
|
} |
||||||
|
@Bean |
||||||
|
public CustomExchange selfPickupScanExchange() { |
||||||
|
Map<String, Object> args = Maps.newHashMap(); |
||||||
|
args.put("x-delayed-type", "direct"); |
||||||
|
return new CustomExchange(RabbitConstant.SELF_PICKUP_SCAN_EXCHANGE, "x-delayed-message", true, false, args); |
||||||
|
} |
||||||
|
@Bean |
||||||
|
public Binding selfPickupScanBinding(Queue selfPickupScanQueue, CustomExchange selfPickupScanExchange) { |
||||||
|
return BindingBuilder.bind(selfPickupScanQueue).to(selfPickupScanExchange).with(RabbitConstant.SELF_PICKUP_SCAN_ROUTING).noargs(); |
||||||
|
} |
||||||
|
|
||||||
|
} |
Loading…
Reference in new issue