19 changed files with 644 additions and 25 deletions
@ -0,0 +1,13 @@
|
||||
package org.springblade.common.constant; |
||||
|
||||
public class CommonBroadCastMqConstant { |
||||
|
||||
//--------------------开单交换机
|
||||
public static final String OPEN_WAYBILL_BROADCAST_EXCHANGE = "trunkline.openWaybill.broadcast.exchange"+ModuleNameConstant.DEVAUTH; |
||||
|
||||
public static final String OPEN_WAYBILL_BROADCAST_QUEUE = "open_waybill_broadcast_queue"+ModuleNameConstant.DEVAUTH; |
||||
public static final String UPDATE_WAYBILL_BROADCAST_QUEUE = "update_waybill_broadcast_queue"+ModuleNameConstant.DEVAUTH; |
||||
|
||||
|
||||
|
||||
} |
@ -0,0 +1,45 @@
|
||||
package org.springblade.common.constant.broadcast; |
||||
|
||||
import org.springblade.common.constant.ModuleNameConstant; |
||||
|
||||
/** |
||||
* 广播常量 |
||||
*/ |
||||
public abstract class FanoutConstants { |
||||
|
||||
//干线
|
||||
public interface trunkline { |
||||
|
||||
//开单
|
||||
interface OPENWAYBILL{ |
||||
|
||||
//交换机
|
||||
String EXCHANGE = "fanout.trunkline.openWaybill" + ModuleNameConstant.DEVAUTH; |
||||
|
||||
interface QUEUE { |
||||
|
||||
String STATISTICSDATA_CREATEPACKAGEINFO = "fanout.trunkline.openWaybill.statisticsdata.createPackageinfo" + ModuleNameConstant.DEVAUTH; |
||||
|
||||
} |
||||
|
||||
} |
||||
|
||||
//改单
|
||||
interface UPDATEWAYBILL{ |
||||
|
||||
//交换机
|
||||
String EXCHANGE = "fanout.trunkline.updatewaybill" + ModuleNameConstant.DEVAUTH; |
||||
|
||||
interface QUEUE { |
||||
|
||||
String STATISTICSDATA_UPDATEPACKAGEINFO = "fanout.trunkline.updatewaybill.statisticsdata.updatePackageinfo" + ModuleNameConstant.DEVAUTH; |
||||
|
||||
} |
||||
|
||||
} |
||||
|
||||
|
||||
} |
||||
|
||||
|
||||
} |
@ -0,0 +1,14 @@
|
||||
package org.springblade.common.model; |
||||
|
||||
import lombok.Builder; |
||||
import lombok.Data; |
||||
|
||||
import java.io.Serializable; |
||||
|
||||
@Builder |
||||
@Data |
||||
public class FanoutMsg implements Serializable { |
||||
|
||||
private String msg; |
||||
private String exchange; |
||||
} |
@ -0,0 +1,155 @@
|
||||
package com.logpm.statistics.config; |
||||
|
||||
import cn.hutool.core.util.StrUtil; |
||||
import cn.hutool.json.JSONUtil; |
||||
import org.springblade.core.secure.BladeUser; |
||||
import org.springblade.core.secure.utils.AuthUtil; |
||||
import org.springblade.core.tool.utils.ThreadLocalUtil; |
||||
import org.springframework.amqp.core.Message; |
||||
import org.springframework.amqp.core.MessageProperties; |
||||
import org.springframework.amqp.support.converter.AllowedListDeserializingMessageConverter; |
||||
import org.springframework.amqp.support.converter.MessageConversionException; |
||||
import org.springframework.amqp.utils.SerializationUtils; |
||||
import org.springframework.beans.factory.BeanClassLoaderAware; |
||||
import org.springframework.http.HttpHeaders; |
||||
import org.springframework.mock.web.MockHttpServletRequest; |
||||
import org.springframework.remoting.rmi.CodebaseAwareObjectInputStream; |
||||
import org.springframework.stereotype.Component; |
||||
import org.springframework.util.ClassUtils; |
||||
import org.springframework.web.context.request.RequestContextHolder; |
||||
import org.springframework.web.context.request.ServletRequestAttributes; |
||||
|
||||
import java.io.*; |
||||
import java.util.List; |
||||
import java.util.Map; |
||||
|
||||
/** |
||||
* @author zhaoqiaobo |
||||
* @create 2024-05-08 |
||||
*/ |
||||
@Component |
||||
public class CustomMessageConverter extends AllowedListDeserializingMessageConverter implements BeanClassLoaderAware { |
||||
|
||||
private volatile String defaultCharset = "UTF-8"; |
||||
private ClassLoader beanClassLoader = ClassUtils.getDefaultClassLoader(); |
||||
private String codebaseUrl; |
||||
|
||||
@Deprecated |
||||
public void setCodebaseUrl(String codebaseUrl) { |
||||
this.codebaseUrl = codebaseUrl; |
||||
} |
||||
|
||||
@Override |
||||
public Object fromMessage(Message message) throws MessageConversionException { |
||||
Object content = null; |
||||
MessageProperties properties = message.getMessageProperties(); |
||||
if (properties != null) { |
||||
String contentType = properties.getContentType(); |
||||
if (contentType != null && contentType.startsWith("text")) { |
||||
String encoding = properties.getContentEncoding(); |
||||
if (encoding == null) { |
||||
encoding = "UTF-8"; |
||||
} |
||||
|
||||
try { |
||||
content = new String(message.getBody(), encoding); |
||||
} catch (UnsupportedEncodingException var8) { |
||||
throw new MessageConversionException("failed to convert text-based Message content", var8); |
||||
} |
||||
} else if (contentType != null && contentType.equals("application/x-java-serialized-object")) { |
||||
try { |
||||
content = SerializationUtils.deserialize(this.createObjectInputStream(new ByteArrayInputStream(message.getBody()), this.codebaseUrl)); |
||||
} catch (IllegalArgumentException | IllegalStateException | IOException var7) { |
||||
throw new MessageConversionException("failed to convert serialized Message content", var7); |
||||
} |
||||
} |
||||
} |
||||
Map<String, Object> headers = properties.getHeaders(); |
||||
HttpHeaders httpHeaders = new HttpHeaders(); |
||||
for (Map.Entry<String, Object> entry : headers.entrySet()) { |
||||
if (StrUtil.equals("Blade-Auth", entry.getKey()) |
||||
|| StrUtil.equals("Authorization", entry.getKey()) |
||||
|| StrUtil.equals("blade-auth", entry.getKey()) |
||||
|| StrUtil.equals("authorization", entry.getKey())) { |
||||
List value = (List) entry.getValue(); |
||||
for (Object o : value) { |
||||
httpHeaders.add(entry.getKey(), String.valueOf(o)); |
||||
} |
||||
} |
||||
} |
||||
ThreadLocalUtil.put("bladeContext", httpHeaders); |
||||
Object bladeUser = headers.get("bladeUser"); |
||||
MockHttpServletRequest request = new MockHttpServletRequest(); |
||||
BladeUser bladeUser1 = JSONUtil.toBean(bladeUser.toString(), BladeUser.class); |
||||
request.setAttribute("_BLADE_USER_REQUEST_ATTR_", bladeUser1); |
||||
RequestContextHolder.setRequestAttributes(new ServletRequestAttributes(request)); |
||||
if (content == null) { |
||||
content = message.getBody(); |
||||
} |
||||
return content; |
||||
} |
||||
|
||||
@Override |
||||
protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { |
||||
byte[] bytes = null; |
||||
if (object instanceof byte[]) { |
||||
bytes = (byte[]) object; |
||||
messageProperties.setContentType("application/octet-stream"); |
||||
} else if (object instanceof String) { |
||||
try { |
||||
bytes = ((String) object).getBytes(this.defaultCharset); |
||||
} catch (UnsupportedEncodingException var6) { |
||||
throw new MessageConversionException("failed to convert to Message content", var6); |
||||
} |
||||
|
||||
messageProperties.setContentType("text/plain"); |
||||
messageProperties.setContentEncoding(this.defaultCharset); |
||||
} else if (object instanceof Serializable) { |
||||
try { |
||||
bytes = SerializationUtils.serialize(object); |
||||
} catch (IllegalArgumentException var5) { |
||||
throw new MessageConversionException("failed to convert to serialized Message content", var5); |
||||
} |
||||
|
||||
messageProperties.setContentType("application/x-java-serialized-object"); |
||||
} |
||||
HttpHeaders headers = (HttpHeaders) ThreadLocalUtil.get("bladeContext"); |
||||
if (headers != null && !headers.isEmpty()) { |
||||
headers.forEach((key, values) -> { |
||||
values.forEach((value) -> { |
||||
messageProperties.setHeader(key, new String[]{value}); |
||||
}); |
||||
}); |
||||
} |
||||
BladeUser user = AuthUtil.getUser(); |
||||
BladeUser bladeUser = new BladeUser(); |
||||
bladeUser.setTenantId(user.getTenantId()); |
||||
bladeUser.setUserId(user.getUserId()); |
||||
bladeUser.setAccount(user.getAccount()); |
||||
bladeUser.setRoleId(user.getRoleId()); |
||||
messageProperties.setHeader("bladeUser", JSONUtil.toJsonStr(bladeUser)); |
||||
|
||||
if (bytes != null) { |
||||
messageProperties.setContentLength((long) bytes.length); |
||||
return new Message(bytes, messageProperties); |
||||
} else { |
||||
throw new IllegalArgumentException(this.getClass().getSimpleName() + " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName()); |
||||
} |
||||
} |
||||
|
||||
@Override |
||||
public void setBeanClassLoader(ClassLoader classLoader) { |
||||
this.beanClassLoader = beanClassLoader; |
||||
} |
||||
|
||||
protected ObjectInputStream createObjectInputStream(InputStream is, String codebaseUrl) throws IOException { |
||||
return new CodebaseAwareObjectInputStream(is, this.beanClassLoader, codebaseUrl) { |
||||
@Override |
||||
protected Class<?> resolveClass(ObjectStreamClass classDesc) throws IOException, ClassNotFoundException { |
||||
Class<?> clazz = super.resolveClass(classDesc); |
||||
CustomMessageConverter.this.checkAllowedList(clazz); |
||||
return clazz; |
||||
} |
||||
}; |
||||
} |
||||
} |
@ -0,0 +1,82 @@
|
||||
package com.logpm.statistics.config; |
||||
|
||||
import lombok.extern.slf4j.Slf4j; |
||||
import org.slf4j.MDC; |
||||
import org.springblade.core.secure.utils.AuthUtil; |
||||
import org.springblade.core.tool.utils.ThreadLocalUtil; |
||||
import org.springframework.context.annotation.Bean; |
||||
import org.springframework.context.annotation.Configuration; |
||||
import org.springframework.core.task.TaskDecorator; |
||||
import org.springframework.scheduling.annotation.EnableAsync; |
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; |
||||
import org.springframework.web.context.request.RequestAttributes; |
||||
import org.springframework.web.context.request.RequestContextHolder; |
||||
|
||||
import javax.annotation.Nonnull; |
||||
import java.util.Map; |
||||
import java.util.concurrent.Executor; |
||||
import java.util.concurrent.ThreadPoolExecutor; |
||||
|
||||
@Configuration |
||||
@Slf4j |
||||
@EnableAsync |
||||
public class ExecutorConfig { |
||||
|
||||
@Bean |
||||
public Executor asyncExecutor() { |
||||
log.info("start async executor"); |
||||
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); |
||||
// 配置核心线程数
|
||||
threadPoolTaskExecutor.setCorePoolSize(10); |
||||
// 配置最大线程数
|
||||
threadPoolTaskExecutor.setMaxPoolSize(20); |
||||
// 配置队列大小
|
||||
threadPoolTaskExecutor.setQueueCapacity(50); |
||||
// 配置线程池中线程的名称前缀
|
||||
threadPoolTaskExecutor.setThreadNamePrefix("ASYNC_THREAD_"); |
||||
// HelloWorldServiceImpl rejection-policy: 当pool已经达到max size时,如何处理新任务:
|
||||
// CallerRunsPolicy: 不在新线程中执行任务,而是由调用者所在的线程来执行;
|
||||
// AbortPolicy: 拒绝执行新任务,并抛出RejectedExecutionException异常;
|
||||
// DiscardPolicy:丢弃当前将要加入队列的任务;
|
||||
// DiscardOldestPolicy:丢弃任务队列中最旧的任务;
|
||||
threadPoolTaskExecutor.setRejectedExecutionHandler( |
||||
new ThreadPoolExecutor.CallerRunsPolicy() |
||||
); |
||||
threadPoolTaskExecutor.setTaskDecorator(new ContextCopyingDecorator()); |
||||
threadPoolTaskExecutor.initialize(); |
||||
return threadPoolTaskExecutor; |
||||
} |
||||
|
||||
|
||||
static class ContextCopyingDecorator implements TaskDecorator { |
||||
@Nonnull |
||||
@Override |
||||
public Runnable decorate(@Nonnull Runnable runnable) { |
||||
RequestAttributes context = RequestContextHolder.currentRequestAttributes(); |
||||
String tenantId = AuthUtil.getTenantId(); |
||||
Map<String, Object> all = ThreadLocalUtil.getAll(); |
||||
Map<String, String> mdcMap = MDC.getCopyOfContextMap(); |
||||
return () -> { |
||||
try { |
||||
all.keySet().forEach(key -> ThreadLocalUtil.put(key, all.get(key))); |
||||
if (mdcMap != null && !mdcMap.isEmpty()) { |
||||
MDC.setContextMap(mdcMap); |
||||
} |
||||
RequestContextHolder.setRequestAttributes(context); |
||||
String tenantId1 = AuthUtil.getTenantId(); |
||||
|
||||
runnable.run(); |
||||
} finally { |
||||
RequestContextHolder.resetRequestAttributes(); |
||||
all.clear(); |
||||
if (mdcMap != null) { |
||||
mdcMap.clear(); |
||||
} |
||||
ThreadLocalUtil.clear(); |
||||
MDC.clear(); |
||||
} |
||||
}; |
||||
} |
||||
} |
||||
|
||||
} |
@ -0,0 +1,43 @@
|
||||
package com.logpm.statistics.config; |
||||
|
||||
import lombok.extern.slf4j.Slf4j; |
||||
import org.springframework.amqp.core.ReturnedMessage; |
||||
import org.springframework.amqp.rabbit.connection.ConnectionFactory; |
||||
import org.springframework.amqp.rabbit.connection.CorrelationData; |
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate; |
||||
import org.springframework.context.annotation.Bean; |
||||
import org.springframework.context.annotation.Configuration; |
||||
|
||||
@Slf4j |
||||
@Configuration |
||||
public class RabbitMqConfiguration { |
||||
|
||||
@Bean |
||||
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){ |
||||
RabbitTemplate template = new RabbitTemplate(); |
||||
template.setConnectionFactory(connectionFactory); |
||||
template.setMandatory(true); |
||||
template.setMessageConverter(new CustomMessageConverter()); |
||||
template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { |
||||
@Override |
||||
public void confirm(CorrelationData correlationData, boolean b, String s) { |
||||
System.out.println("确认回调-相关数据:"+correlationData); |
||||
System.out.println("确认回调-确认情况:"+b); |
||||
System.out.println("确认回调-原因:"+s); |
||||
} |
||||
}); |
||||
|
||||
template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { |
||||
@Override |
||||
public void returnedMessage(ReturnedMessage returnedMessage) { |
||||
System.out.println("返回回调-消息:"+returnedMessage.getMessage()); |
||||
System.out.println("返回回调-回应码:"+returnedMessage.getReplyCode()); |
||||
System.out.println("返回回调-回应信息:"+returnedMessage.getReplyText()); |
||||
System.out.println("返回回调-交换机:"+returnedMessage.getExchange()); |
||||
System.out.println("返回回调-路由键:"+returnedMessage.getRoutingKey()); |
||||
} |
||||
}); |
||||
return template; |
||||
} |
||||
|
||||
} |
@ -0,0 +1,38 @@
|
||||
/* |
||||
* Copyright (c) 2018-2028, Chill Zhuang All rights reserved. |
||||
* |
||||
* Redistribution and use in source and binary forms, with or without |
||||
* modification, are permitted provided that the following conditions are met: |
||||
* |
||||
* Redistributions of source code must retain the above copyright notice, |
||||
* this list of conditions and the following disclaimer. |
||||
* Redistributions in binary form must reproduce the above copyright |
||||
* notice, this list of conditions and the following disclaimer in the |
||||
* documentation and/or other materials provided with the distribution. |
||||
* Neither the name of the dreamlu.net developer nor the names of its |
||||
* contributors may be used to endorse or promote products derived from |
||||
* this software without specific prior written permission. |
||||
* Author: Chill 庄骞 (smallchill@163.com) |
||||
*/ |
||||
package com.logpm.statistics.config; |
||||
|
||||
|
||||
import org.mybatis.spring.annotation.MapperScan; |
||||
import org.springframework.cloud.openfeign.EnableFeignClients; |
||||
import org.springframework.context.annotation.ComponentScan; |
||||
import org.springframework.context.annotation.Configuration; |
||||
|
||||
/** |
||||
* 配置feign、mybatis包名、properties |
||||
* |
||||
* @author chaos |
||||
*/ |
||||
@Configuration(proxyBeanMethods = false) |
||||
@ComponentScan({"org.springblade", "com.logpm"}) |
||||
@EnableFeignClients({"org.springblade", "com.logpm"}) |
||||
@MapperScan({"org.springblade.**.mapper.**", "com.logpm.**.mapper.**"}) |
||||
//@EnableConfigurationProperties(FactoryDataBaseProperties.class)
|
||||
public class StatisticsdataConfiguration { |
||||
|
||||
|
||||
} |
@ -0,0 +1,75 @@
|
||||
/* |
||||
* Copyright (c) 2018-2028, Chill Zhuang All rights reserved. |
||||
* |
||||
* Redistribution and use in source and binary forms, with or without |
||||
* modification, are permitted provided that the following conditions are met: |
||||
* |
||||
* Redistributions of source code must retain the above copyright notice, |
||||
* this list of conditions and the following disclaimer. |
||||
* Redistributions in binary form must reproduce the above copyright |
||||
* notice, this list of conditions and the following disclaimer in the |
||||
* documentation and/or other materials provided with the distribution. |
||||
* Neither the name of the dreamlu.net developer nor the names of its |
||||
* contributors may be used to endorse or promote products derived from |
||||
* this software without specific prior written permission. |
||||
* Author: Chill 庄骞 (smallchill@163.com) |
||||
*/ |
||||
package com.logpm.statistics.launcher; |
||||
|
||||
import org.springblade.core.auto.service.AutoService; |
||||
import org.springblade.core.launch.constant.NacosConstant; |
||||
import org.springblade.core.launch.service.LauncherService; |
||||
import org.springblade.core.launch.utils.PropsUtil; |
||||
import org.springframework.boot.builder.SpringApplicationBuilder; |
||||
|
||||
import java.util.Properties; |
||||
|
||||
/** |
||||
* 启动参数拓展 |
||||
* |
||||
* @author Chill |
||||
*/ |
||||
@AutoService(LauncherService.class) |
||||
public class StatisticsdataLauncherServiceImpl implements LauncherService { |
||||
|
||||
@Override |
||||
public void launcher(SpringApplicationBuilder builder, String appName, String profile, boolean isLocalDev) { |
||||
Properties props = System.getProperties(); |
||||
// 开启多数据源
|
||||
PropsUtil.setProperty(props, "spring.datasource.dynamic.enabled", "true"); |
||||
// 指定注册配置信息
|
||||
PropsUtil.setProperty(props, "spring.cloud.nacos.config.extension-configs[0].data-id", NacosConstant.dataId(appName, profile)); |
||||
PropsUtil.setProperty(props, "spring.cloud.nacos.config.extension-configs[0].group", NacosConstant.NACOS_CONFIG_GROUP); |
||||
PropsUtil.setProperty(props, "spring.cloud.nacos.config.extension-configs[0].refresh", NacosConstant.NACOS_CONFIG_REFRESH); |
||||
|
||||
// 分布式事物
|
||||
// seata注册地址
|
||||
// PropsUtil.setProperty(props, "seata.service.grouplist.default", LauncherConstant.seataAddr(profile));
|
||||
//// seata注册group格式
|
||||
//// PropsUtil.setProperty(props, "seata.tx-service-group", LauncherConstant.seataServiceGroup(appName));
|
||||
//// seata配置服务group
|
||||
// PropsUtil.setProperty(props, "seata.service.vgroup-mapping.default_tx_group".concat(LauncherConstant.seataServiceGroup(appName)), LauncherConstant.DEFAULT_MODE);
|
||||
//// seata注册模式配置
|
||||
// PropsUtil.setProperty(props, "seata.registry.type", LauncherConstant.NACOS_MODE);
|
||||
// PropsUtil.setProperty(props, "seata.registry.nacos.server-addr", LauncherConstant.nacosAddr(profile));
|
||||
// PropsUtil.setProperty(props, "seata.config.type", LauncherConstant.NACOS_MODE);
|
||||
// PropsUtil.setProperty(props, "seata.config.nacos.server-addr", LauncherConstant.nacosAddr(profile));
|
||||
|
||||
|
||||
// 指定注册IP
|
||||
// PropsUtil.setProperty(props, "spring.cloud.nacos.discovery.ip", "127.0.0.1");
|
||||
// 指定注册端口
|
||||
// PropsUtil.setProperty(props, "spring.cloud.nacos.discovery.port", "8200");
|
||||
// 自定义命名空间
|
||||
// PropsUtil.setProperty(props, "spring.cloud.nacos.config.namespace", LauncherConstant.NACOS_NAMESPACE);
|
||||
// PropsUtil.setProperty(props, "spring.cloud.nacos.discovery.namespace", LauncherConstant.NACOS_NAMESPACE);
|
||||
// 自定义分组
|
||||
// PropsUtil.setProperty(props, "spring.cloud.nacos.config.group", NacosConstant.NACOS_CONFIG_GROUP);
|
||||
// PropsUtil.setProperty(props, "spring.cloud.nacos.discovery.group", NacosConstant.NACOS_CONFIG_GROUP);
|
||||
} |
||||
|
||||
@Override |
||||
public int getOrder() { |
||||
return 20; |
||||
} |
||||
} |
@ -0,0 +1,50 @@
|
||||
package com.logpm.statistics.listener.mq; |
||||
|
||||
import lombok.AllArgsConstructor; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
import org.springblade.common.constant.broadcast.FanoutConstants; |
||||
import org.springblade.core.secure.BladeUser; |
||||
import org.springblade.core.secure.utils.AuthUtil; |
||||
import org.springframework.amqp.core.ExchangeTypes; |
||||
import org.springframework.amqp.rabbit.annotation.Exchange; |
||||
import org.springframework.amqp.rabbit.annotation.Queue; |
||||
import org.springframework.amqp.rabbit.annotation.QueueBinding; |
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener; |
||||
import org.springframework.stereotype.Component; |
||||
import org.springframework.transaction.annotation.Transactional; |
||||
|
||||
@Slf4j |
||||
@Component |
||||
@AllArgsConstructor |
||||
public class WaybillFanoutListener { |
||||
|
||||
|
||||
@RabbitListener(bindings = @QueueBinding( |
||||
value = @Queue(name = FanoutConstants.trunkline.OPENWAYBILL.QUEUE.STATISTICSDATA_CREATEPACKAGEINFO), |
||||
exchange = @Exchange(name = FanoutConstants.trunkline.OPENWAYBILL.EXCHANGE, type = ExchangeTypes.FANOUT) |
||||
)) |
||||
@Transactional(rollbackFor = Exception.class) |
||||
public void createPackageInfo(String msg) { |
||||
|
||||
String tenantId = AuthUtil.getTenantId(); |
||||
BladeUser user = AuthUtil.getUser(); |
||||
|
||||
log.info("tenantId:{}",tenantId); |
||||
log.info("user:{}",user); |
||||
|
||||
log.info("接收到消息:{}", msg); |
||||
} |
||||
|
||||
|
||||
|
||||
@RabbitListener(bindings = @QueueBinding( |
||||
value = @Queue(name = FanoutConstants.trunkline.UPDATEWAYBILL.QUEUE.STATISTICSDATA_UPDATEPACKAGEINFO), |
||||
exchange = @Exchange(name = FanoutConstants.trunkline.UPDATEWAYBILL.EXCHANGE, type = ExchangeTypes.FANOUT) |
||||
)) |
||||
@Transactional(rollbackFor = Exception.class) |
||||
public void updateWaybillData(String msg) { |
||||
log.info("接收到消息:{}", msg); |
||||
} |
||||
|
||||
|
||||
} |
@ -0,0 +1,7 @@
|
||||
package com.logpm.trunkline.service; |
||||
|
||||
import org.springblade.common.model.FanoutMsg; |
||||
|
||||
public interface ISendFanoutService{ |
||||
void sendFanoutMsg(FanoutMsg fanoutMsg); |
||||
} |
@ -0,0 +1,26 @@
|
||||
package com.logpm.trunkline.service.impl; |
||||
|
||||
import com.logpm.trunkline.service.ISendFanoutService; |
||||
import lombok.AllArgsConstructor; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
import org.springblade.common.model.FanoutMsg; |
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate; |
||||
import org.springframework.retry.annotation.Backoff; |
||||
import org.springframework.retry.annotation.Retryable; |
||||
import org.springframework.stereotype.Service; |
||||
|
||||
@Service |
||||
@AllArgsConstructor |
||||
@Slf4j |
||||
public class SendFanoutServiceImpl implements ISendFanoutService { |
||||
|
||||
private final RabbitTemplate rabbitTemplate; |
||||
|
||||
@Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(value = 3000, multiplier = 1.5)) |
||||
@Override |
||||
public void sendFanoutMsg(FanoutMsg fanoutMsg) { |
||||
|
||||
rabbitTemplate.convertAndSend(fanoutMsg.getExchange(),null, fanoutMsg.getMsg()); |
||||
|
||||
} |
||||
} |
Loading…
Reference in new issue