From c7fe09b9412d4fc7b17a7738fbaca8f0ea40d5c2 Mon Sep 17 00:00:00 2001 From: zhenghaoyu Date: Fri, 17 May 2024 10:05:23 +0800 Subject: [PATCH] =?UTF-8?q?1.=E5=B9=B2=E7=BA=BF-bug=E4=BF=AE=E5=A4=8D=202.?= =?UTF-8?q?=E5=B9=BF=E6=92=AD=E9=80=9A=E7=9F=A5=E6=90=AD=E5=BB=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../constant/CommonBroadCastMqConstant.java | 13 ++ .../constant/broadcast/FanoutConstants.java | 45 +++++ .../springblade/common/model/FanoutMsg.java | 14 ++ blade-service/logpm-statisticsdata/pom.xml | 13 ++ .../statistics/StatisticsDataApplication.java | 6 +- .../config/CustomMessageConverter.java | 155 ++++++++++++++++++ .../statistics/config/ExecutorConfig.java | 82 +++++++++ .../config/RabbitMqConfiguration.java | 43 +++++ .../config/StatisticsdataConfiguration.java | 38 +++++ .../StatisticsdataLauncherServiceImpl.java | 75 +++++++++ .../listener/mq/WaybillFanoutListener.java | 50 ++++++ .../src/main/resources/application-dev.yml | 43 ++++- .../src/main/resources/application.yml | 22 +-- .../mapper/TrunklineCarsLoadMapper.xml | 3 +- .../service/IOpenOrderAsyncService.java | 3 + .../trunkline/service/ISendFanoutService.java | 7 + .../impl/OpenOrderAsyncServiceImpl.java | 21 +++ .../service/impl/OpenOrderServiceImpl.java | 10 +- .../service/impl/SendFanoutServiceImpl.java | 26 +++ 19 files changed, 644 insertions(+), 25 deletions(-) create mode 100644 blade-biz-common/src/main/java/org/springblade/common/constant/CommonBroadCastMqConstant.java create mode 100644 blade-biz-common/src/main/java/org/springblade/common/constant/broadcast/FanoutConstants.java create mode 100644 blade-biz-common/src/main/java/org/springblade/common/model/FanoutMsg.java create mode 100644 blade-service/logpm-statisticsdata/src/main/java/com/logpm/statistics/config/CustomMessageConverter.java create mode 100644 blade-service/logpm-statisticsdata/src/main/java/com/logpm/statistics/config/ExecutorConfig.java create mode 100644 blade-service/logpm-statisticsdata/src/main/java/com/logpm/statistics/config/RabbitMqConfiguration.java create mode 100644 blade-service/logpm-statisticsdata/src/main/java/com/logpm/statistics/config/StatisticsdataConfiguration.java create mode 100644 blade-service/logpm-statisticsdata/src/main/java/com/logpm/statistics/launcher/StatisticsdataLauncherServiceImpl.java create mode 100644 blade-service/logpm-statisticsdata/src/main/java/com/logpm/statistics/listener/mq/WaybillFanoutListener.java create mode 100644 blade-service/logpm-trunkline/src/main/java/com/logpm/trunkline/service/ISendFanoutService.java create mode 100644 blade-service/logpm-trunkline/src/main/java/com/logpm/trunkline/service/impl/SendFanoutServiceImpl.java diff --git a/blade-biz-common/src/main/java/org/springblade/common/constant/CommonBroadCastMqConstant.java b/blade-biz-common/src/main/java/org/springblade/common/constant/CommonBroadCastMqConstant.java new file mode 100644 index 000000000..8f6c23740 --- /dev/null +++ b/blade-biz-common/src/main/java/org/springblade/common/constant/CommonBroadCastMqConstant.java @@ -0,0 +1,13 @@ +package org.springblade.common.constant; + +public class CommonBroadCastMqConstant { + + //--------------------开单交换机 + public static final String OPEN_WAYBILL_BROADCAST_EXCHANGE = "trunkline.openWaybill.broadcast.exchange"+ModuleNameConstant.DEVAUTH; + + public static final String OPEN_WAYBILL_BROADCAST_QUEUE = "open_waybill_broadcast_queue"+ModuleNameConstant.DEVAUTH; + public static final String UPDATE_WAYBILL_BROADCAST_QUEUE = "update_waybill_broadcast_queue"+ModuleNameConstant.DEVAUTH; + + + +} diff --git a/blade-biz-common/src/main/java/org/springblade/common/constant/broadcast/FanoutConstants.java b/blade-biz-common/src/main/java/org/springblade/common/constant/broadcast/FanoutConstants.java new file mode 100644 index 000000000..214083578 --- /dev/null +++ b/blade-biz-common/src/main/java/org/springblade/common/constant/broadcast/FanoutConstants.java @@ -0,0 +1,45 @@ +package org.springblade.common.constant.broadcast; + +import org.springblade.common.constant.ModuleNameConstant; + +/** + * 广播常量 + */ +public abstract class FanoutConstants { + + //干线 + public interface trunkline { + + //开单 + interface OPENWAYBILL{ + + //交换机 + String EXCHANGE = "fanout.trunkline.openWaybill" + ModuleNameConstant.DEVAUTH; + + interface QUEUE { + + String STATISTICSDATA_CREATEPACKAGEINFO = "fanout.trunkline.openWaybill.statisticsdata.createPackageinfo" + ModuleNameConstant.DEVAUTH; + + } + + } + + //改单 + interface UPDATEWAYBILL{ + + //交换机 + String EXCHANGE = "fanout.trunkline.updatewaybill" + ModuleNameConstant.DEVAUTH; + + interface QUEUE { + + String STATISTICSDATA_UPDATEPACKAGEINFO = "fanout.trunkline.updatewaybill.statisticsdata.updatePackageinfo" + ModuleNameConstant.DEVAUTH; + + } + + } + + + } + + +} diff --git a/blade-biz-common/src/main/java/org/springblade/common/model/FanoutMsg.java b/blade-biz-common/src/main/java/org/springblade/common/model/FanoutMsg.java new file mode 100644 index 000000000..2d3e77800 --- /dev/null +++ b/blade-biz-common/src/main/java/org/springblade/common/model/FanoutMsg.java @@ -0,0 +1,14 @@ +package org.springblade.common.model; + +import lombok.Builder; +import lombok.Data; + +import java.io.Serializable; + +@Builder +@Data +public class FanoutMsg implements Serializable { + + private String msg; + private String exchange; +} diff --git a/blade-service/logpm-statisticsdata/pom.xml b/blade-service/logpm-statisticsdata/pom.xml index 4f2471905..d4b37f951 100644 --- a/blade-service/logpm-statisticsdata/pom.xml +++ b/blade-service/logpm-statisticsdata/pom.xml @@ -28,6 +28,19 @@ org.springblade blade-starter-excel + + org.springblade + blade-core-auto + compile + + + org.springframework.amqp + spring-amqp + + + org.springframework.amqp + spring-rabbit + diff --git a/blade-service/logpm-statisticsdata/src/main/java/com/logpm/statistics/StatisticsDataApplication.java b/blade-service/logpm-statisticsdata/src/main/java/com/logpm/statistics/StatisticsDataApplication.java index 8efd32b1f..34a7caeb6 100644 --- a/blade-service/logpm-statisticsdata/src/main/java/com/logpm/statistics/StatisticsDataApplication.java +++ b/blade-service/logpm-statisticsdata/src/main/java/com/logpm/statistics/StatisticsDataApplication.java @@ -19,17 +19,21 @@ package com.logpm.statistics; import org.springblade.common.constant.ModuleNameConstant; import org.springblade.core.cloud.client.BladeCloudApplication; import org.springblade.core.launch.BladeApplication; +import org.springframework.retry.annotation.EnableRetry; +import org.springframework.scheduling.annotation.EnableAsync; /** * Demo启动器 * * @author Chill */ +@EnableAsync +@EnableRetry @BladeCloudApplication public class StatisticsDataApplication { public static void main(String[] args) { - BladeApplication.run(ModuleNameConstant.APPLICATION_TRUNKLINE_NAME, StatisticsDataApplication.class, args); + BladeApplication.run(ModuleNameConstant.APPLICATION_STATISTICSDATA_NAME, StatisticsDataApplication.class, args); } } diff --git a/blade-service/logpm-statisticsdata/src/main/java/com/logpm/statistics/config/CustomMessageConverter.java b/blade-service/logpm-statisticsdata/src/main/java/com/logpm/statistics/config/CustomMessageConverter.java new file mode 100644 index 000000000..69fc34d26 --- /dev/null +++ b/blade-service/logpm-statisticsdata/src/main/java/com/logpm/statistics/config/CustomMessageConverter.java @@ -0,0 +1,155 @@ +package com.logpm.statistics.config; + +import cn.hutool.core.util.StrUtil; +import cn.hutool.json.JSONUtil; +import org.springblade.core.secure.BladeUser; +import org.springblade.core.secure.utils.AuthUtil; +import org.springblade.core.tool.utils.ThreadLocalUtil; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.core.MessageProperties; +import org.springframework.amqp.support.converter.AllowedListDeserializingMessageConverter; +import org.springframework.amqp.support.converter.MessageConversionException; +import org.springframework.amqp.utils.SerializationUtils; +import org.springframework.beans.factory.BeanClassLoaderAware; +import org.springframework.http.HttpHeaders; +import org.springframework.mock.web.MockHttpServletRequest; +import org.springframework.remoting.rmi.CodebaseAwareObjectInputStream; +import org.springframework.stereotype.Component; +import org.springframework.util.ClassUtils; +import org.springframework.web.context.request.RequestContextHolder; +import org.springframework.web.context.request.ServletRequestAttributes; + +import java.io.*; +import java.util.List; +import java.util.Map; + +/** + * @author zhaoqiaobo + * @create 2024-05-08 + */ +@Component +public class CustomMessageConverter extends AllowedListDeserializingMessageConverter implements BeanClassLoaderAware { + + private volatile String defaultCharset = "UTF-8"; + private ClassLoader beanClassLoader = ClassUtils.getDefaultClassLoader(); + private String codebaseUrl; + + @Deprecated + public void setCodebaseUrl(String codebaseUrl) { + this.codebaseUrl = codebaseUrl; + } + + @Override + public Object fromMessage(Message message) throws MessageConversionException { + Object content = null; + MessageProperties properties = message.getMessageProperties(); + if (properties != null) { + String contentType = properties.getContentType(); + if (contentType != null && contentType.startsWith("text")) { + String encoding = properties.getContentEncoding(); + if (encoding == null) { + encoding = "UTF-8"; + } + + try { + content = new String(message.getBody(), encoding); + } catch (UnsupportedEncodingException var8) { + throw new MessageConversionException("failed to convert text-based Message content", var8); + } + } else if (contentType != null && contentType.equals("application/x-java-serialized-object")) { + try { + content = SerializationUtils.deserialize(this.createObjectInputStream(new ByteArrayInputStream(message.getBody()), this.codebaseUrl)); + } catch (IllegalArgumentException | IllegalStateException | IOException var7) { + throw new MessageConversionException("failed to convert serialized Message content", var7); + } + } + } + Map headers = properties.getHeaders(); + HttpHeaders httpHeaders = new HttpHeaders(); + for (Map.Entry entry : headers.entrySet()) { + if (StrUtil.equals("Blade-Auth", entry.getKey()) + || StrUtil.equals("Authorization", entry.getKey()) + || StrUtil.equals("blade-auth", entry.getKey()) + || StrUtil.equals("authorization", entry.getKey())) { + List value = (List) entry.getValue(); + for (Object o : value) { + httpHeaders.add(entry.getKey(), String.valueOf(o)); + } + } + } + ThreadLocalUtil.put("bladeContext", httpHeaders); + Object bladeUser = headers.get("bladeUser"); + MockHttpServletRequest request = new MockHttpServletRequest(); + BladeUser bladeUser1 = JSONUtil.toBean(bladeUser.toString(), BladeUser.class); + request.setAttribute("_BLADE_USER_REQUEST_ATTR_", bladeUser1); + RequestContextHolder.setRequestAttributes(new ServletRequestAttributes(request)); + if (content == null) { + content = message.getBody(); + } + return content; + } + + @Override + protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { + byte[] bytes = null; + if (object instanceof byte[]) { + bytes = (byte[]) object; + messageProperties.setContentType("application/octet-stream"); + } else if (object instanceof String) { + try { + bytes = ((String) object).getBytes(this.defaultCharset); + } catch (UnsupportedEncodingException var6) { + throw new MessageConversionException("failed to convert to Message content", var6); + } + + messageProperties.setContentType("text/plain"); + messageProperties.setContentEncoding(this.defaultCharset); + } else if (object instanceof Serializable) { + try { + bytes = SerializationUtils.serialize(object); + } catch (IllegalArgumentException var5) { + throw new MessageConversionException("failed to convert to serialized Message content", var5); + } + + messageProperties.setContentType("application/x-java-serialized-object"); + } + HttpHeaders headers = (HttpHeaders) ThreadLocalUtil.get("bladeContext"); + if (headers != null && !headers.isEmpty()) { + headers.forEach((key, values) -> { + values.forEach((value) -> { + messageProperties.setHeader(key, new String[]{value}); + }); + }); + } + BladeUser user = AuthUtil.getUser(); + BladeUser bladeUser = new BladeUser(); + bladeUser.setTenantId(user.getTenantId()); + bladeUser.setUserId(user.getUserId()); + bladeUser.setAccount(user.getAccount()); + bladeUser.setRoleId(user.getRoleId()); + messageProperties.setHeader("bladeUser", JSONUtil.toJsonStr(bladeUser)); + + if (bytes != null) { + messageProperties.setContentLength((long) bytes.length); + return new Message(bytes, messageProperties); + } else { + throw new IllegalArgumentException(this.getClass().getSimpleName() + " only supports String, byte[] and Serializable payloads, received: " + object.getClass().getName()); + } + } + + @Override + public void setBeanClassLoader(ClassLoader classLoader) { + this.beanClassLoader = beanClassLoader; + } + + protected ObjectInputStream createObjectInputStream(InputStream is, String codebaseUrl) throws IOException { + return new CodebaseAwareObjectInputStream(is, this.beanClassLoader, codebaseUrl) { + @Override + protected Class resolveClass(ObjectStreamClass classDesc) throws IOException, ClassNotFoundException { + Class clazz = super.resolveClass(classDesc); + CustomMessageConverter.this.checkAllowedList(clazz); + return clazz; + } + }; + } +} diff --git a/blade-service/logpm-statisticsdata/src/main/java/com/logpm/statistics/config/ExecutorConfig.java b/blade-service/logpm-statisticsdata/src/main/java/com/logpm/statistics/config/ExecutorConfig.java new file mode 100644 index 000000000..ca5281608 --- /dev/null +++ b/blade-service/logpm-statisticsdata/src/main/java/com/logpm/statistics/config/ExecutorConfig.java @@ -0,0 +1,82 @@ +package com.logpm.statistics.config; + +import lombok.extern.slf4j.Slf4j; +import org.slf4j.MDC; +import org.springblade.core.secure.utils.AuthUtil; +import org.springblade.core.tool.utils.ThreadLocalUtil; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.task.TaskDecorator; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; +import org.springframework.web.context.request.RequestAttributes; +import org.springframework.web.context.request.RequestContextHolder; + +import javax.annotation.Nonnull; +import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; + +@Configuration +@Slf4j +@EnableAsync +public class ExecutorConfig { + + @Bean + public Executor asyncExecutor() { + log.info("start async executor"); + ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); +// 配置核心线程数 + threadPoolTaskExecutor.setCorePoolSize(10); +// 配置最大线程数 + threadPoolTaskExecutor.setMaxPoolSize(20); +// 配置队列大小 + threadPoolTaskExecutor.setQueueCapacity(50); +// 配置线程池中线程的名称前缀 + threadPoolTaskExecutor.setThreadNamePrefix("ASYNC_THREAD_"); +// HelloWorldServiceImpl rejection-policy: 当pool已经达到max size时,如何处理新任务: +// CallerRunsPolicy: 不在新线程中执行任务,而是由调用者所在的线程来执行; +// AbortPolicy: 拒绝执行新任务,并抛出RejectedExecutionException异常; +// DiscardPolicy:丢弃当前将要加入队列的任务; +// DiscardOldestPolicy:丢弃任务队列中最旧的任务; + threadPoolTaskExecutor.setRejectedExecutionHandler( + new ThreadPoolExecutor.CallerRunsPolicy() + ); + threadPoolTaskExecutor.setTaskDecorator(new ContextCopyingDecorator()); + threadPoolTaskExecutor.initialize(); + return threadPoolTaskExecutor; + } + + + static class ContextCopyingDecorator implements TaskDecorator { + @Nonnull + @Override + public Runnable decorate(@Nonnull Runnable runnable) { + RequestAttributes context = RequestContextHolder.currentRequestAttributes(); + String tenantId = AuthUtil.getTenantId(); + Map all = ThreadLocalUtil.getAll(); + Map mdcMap = MDC.getCopyOfContextMap(); + return () -> { + try { + all.keySet().forEach(key -> ThreadLocalUtil.put(key, all.get(key))); + if (mdcMap != null && !mdcMap.isEmpty()) { + MDC.setContextMap(mdcMap); + } + RequestContextHolder.setRequestAttributes(context); + String tenantId1 = AuthUtil.getTenantId(); + + runnable.run(); + } finally { + RequestContextHolder.resetRequestAttributes(); + all.clear(); + if (mdcMap != null) { + mdcMap.clear(); + } + ThreadLocalUtil.clear(); + MDC.clear(); + } + }; + } + } + +} diff --git a/blade-service/logpm-statisticsdata/src/main/java/com/logpm/statistics/config/RabbitMqConfiguration.java b/blade-service/logpm-statisticsdata/src/main/java/com/logpm/statistics/config/RabbitMqConfiguration.java new file mode 100644 index 000000000..32e272ac8 --- /dev/null +++ b/blade-service/logpm-statisticsdata/src/main/java/com/logpm/statistics/config/RabbitMqConfiguration.java @@ -0,0 +1,43 @@ +package com.logpm.statistics.config; + +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.core.ReturnedMessage; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.connection.CorrelationData; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +@Slf4j +@Configuration +public class RabbitMqConfiguration { + + @Bean + public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory){ + RabbitTemplate template = new RabbitTemplate(); + template.setConnectionFactory(connectionFactory); + template.setMandatory(true); + template.setMessageConverter(new CustomMessageConverter()); + template.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { + @Override + public void confirm(CorrelationData correlationData, boolean b, String s) { + System.out.println("确认回调-相关数据:"+correlationData); + System.out.println("确认回调-确认情况:"+b); + System.out.println("确认回调-原因:"+s); + } + }); + + template.setReturnsCallback(new RabbitTemplate.ReturnsCallback() { + @Override + public void returnedMessage(ReturnedMessage returnedMessage) { + System.out.println("返回回调-消息:"+returnedMessage.getMessage()); + System.out.println("返回回调-回应码:"+returnedMessage.getReplyCode()); + System.out.println("返回回调-回应信息:"+returnedMessage.getReplyText()); + System.out.println("返回回调-交换机:"+returnedMessage.getExchange()); + System.out.println("返回回调-路由键:"+returnedMessage.getRoutingKey()); + } + }); + return template; + } + +} diff --git a/blade-service/logpm-statisticsdata/src/main/java/com/logpm/statistics/config/StatisticsdataConfiguration.java b/blade-service/logpm-statisticsdata/src/main/java/com/logpm/statistics/config/StatisticsdataConfiguration.java new file mode 100644 index 000000000..0dd5f4700 --- /dev/null +++ b/blade-service/logpm-statisticsdata/src/main/java/com/logpm/statistics/config/StatisticsdataConfiguration.java @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2018-2028, Chill Zhuang All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * Neither the name of the dreamlu.net developer nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * Author: Chill 庄骞 (smallchill@163.com) + */ +package com.logpm.statistics.config; + + +import org.mybatis.spring.annotation.MapperScan; +import org.springframework.cloud.openfeign.EnableFeignClients; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Configuration; + +/** + * 配置feign、mybatis包名、properties + * + * @author chaos + */ +@Configuration(proxyBeanMethods = false) +@ComponentScan({"org.springblade", "com.logpm"}) +@EnableFeignClients({"org.springblade", "com.logpm"}) +@MapperScan({"org.springblade.**.mapper.**", "com.logpm.**.mapper.**"}) +//@EnableConfigurationProperties(FactoryDataBaseProperties.class) +public class StatisticsdataConfiguration { + + +} diff --git a/blade-service/logpm-statisticsdata/src/main/java/com/logpm/statistics/launcher/StatisticsdataLauncherServiceImpl.java b/blade-service/logpm-statisticsdata/src/main/java/com/logpm/statistics/launcher/StatisticsdataLauncherServiceImpl.java new file mode 100644 index 000000000..ea07c1be2 --- /dev/null +++ b/blade-service/logpm-statisticsdata/src/main/java/com/logpm/statistics/launcher/StatisticsdataLauncherServiceImpl.java @@ -0,0 +1,75 @@ +/* + * Copyright (c) 2018-2028, Chill Zhuang All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * Neither the name of the dreamlu.net developer nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * Author: Chill 庄骞 (smallchill@163.com) + */ +package com.logpm.statistics.launcher; + +import org.springblade.core.auto.service.AutoService; +import org.springblade.core.launch.constant.NacosConstant; +import org.springblade.core.launch.service.LauncherService; +import org.springblade.core.launch.utils.PropsUtil; +import org.springframework.boot.builder.SpringApplicationBuilder; + +import java.util.Properties; + +/** + * 启动参数拓展 + * + * @author Chill + */ +@AutoService(LauncherService.class) +public class StatisticsdataLauncherServiceImpl implements LauncherService { + + @Override + public void launcher(SpringApplicationBuilder builder, String appName, String profile, boolean isLocalDev) { + Properties props = System.getProperties(); + // 开启多数据源 + PropsUtil.setProperty(props, "spring.datasource.dynamic.enabled", "true"); + // 指定注册配置信息 + PropsUtil.setProperty(props, "spring.cloud.nacos.config.extension-configs[0].data-id", NacosConstant.dataId(appName, profile)); + PropsUtil.setProperty(props, "spring.cloud.nacos.config.extension-configs[0].group", NacosConstant.NACOS_CONFIG_GROUP); + PropsUtil.setProperty(props, "spring.cloud.nacos.config.extension-configs[0].refresh", NacosConstant.NACOS_CONFIG_REFRESH); + + // 分布式事物 +// seata注册地址 +// PropsUtil.setProperty(props, "seata.service.grouplist.default", LauncherConstant.seataAddr(profile)); +//// seata注册group格式 +//// PropsUtil.setProperty(props, "seata.tx-service-group", LauncherConstant.seataServiceGroup(appName)); +//// seata配置服务group +// PropsUtil.setProperty(props, "seata.service.vgroup-mapping.default_tx_group".concat(LauncherConstant.seataServiceGroup(appName)), LauncherConstant.DEFAULT_MODE); +//// seata注册模式配置 +// PropsUtil.setProperty(props, "seata.registry.type", LauncherConstant.NACOS_MODE); +// PropsUtil.setProperty(props, "seata.registry.nacos.server-addr", LauncherConstant.nacosAddr(profile)); +// PropsUtil.setProperty(props, "seata.config.type", LauncherConstant.NACOS_MODE); +// PropsUtil.setProperty(props, "seata.config.nacos.server-addr", LauncherConstant.nacosAddr(profile)); + + + // 指定注册IP + // PropsUtil.setProperty(props, "spring.cloud.nacos.discovery.ip", "127.0.0.1"); + // 指定注册端口 + // PropsUtil.setProperty(props, "spring.cloud.nacos.discovery.port", "8200"); + // 自定义命名空间 + // PropsUtil.setProperty(props, "spring.cloud.nacos.config.namespace", LauncherConstant.NACOS_NAMESPACE); + // PropsUtil.setProperty(props, "spring.cloud.nacos.discovery.namespace", LauncherConstant.NACOS_NAMESPACE); + // 自定义分组 + // PropsUtil.setProperty(props, "spring.cloud.nacos.config.group", NacosConstant.NACOS_CONFIG_GROUP); + // PropsUtil.setProperty(props, "spring.cloud.nacos.discovery.group", NacosConstant.NACOS_CONFIG_GROUP); + } + + @Override + public int getOrder() { + return 20; + } +} diff --git a/blade-service/logpm-statisticsdata/src/main/java/com/logpm/statistics/listener/mq/WaybillFanoutListener.java b/blade-service/logpm-statisticsdata/src/main/java/com/logpm/statistics/listener/mq/WaybillFanoutListener.java new file mode 100644 index 000000000..d986ba7e9 --- /dev/null +++ b/blade-service/logpm-statisticsdata/src/main/java/com/logpm/statistics/listener/mq/WaybillFanoutListener.java @@ -0,0 +1,50 @@ +package com.logpm.statistics.listener.mq; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springblade.common.constant.broadcast.FanoutConstants; +import org.springblade.core.secure.BladeUser; +import org.springblade.core.secure.utils.AuthUtil; +import org.springframework.amqp.core.ExchangeTypes; +import org.springframework.amqp.rabbit.annotation.Exchange; +import org.springframework.amqp.rabbit.annotation.Queue; +import org.springframework.amqp.rabbit.annotation.QueueBinding; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +@Slf4j +@Component +@AllArgsConstructor +public class WaybillFanoutListener { + + + @RabbitListener(bindings = @QueueBinding( + value = @Queue(name = FanoutConstants.trunkline.OPENWAYBILL.QUEUE.STATISTICSDATA_CREATEPACKAGEINFO), + exchange = @Exchange(name = FanoutConstants.trunkline.OPENWAYBILL.EXCHANGE, type = ExchangeTypes.FANOUT) + )) + @Transactional(rollbackFor = Exception.class) + public void createPackageInfo(String msg) { + + String tenantId = AuthUtil.getTenantId(); + BladeUser user = AuthUtil.getUser(); + + log.info("tenantId:{}",tenantId); + log.info("user:{}",user); + + log.info("接收到消息:{}", msg); + } + + + + @RabbitListener(bindings = @QueueBinding( + value = @Queue(name = FanoutConstants.trunkline.UPDATEWAYBILL.QUEUE.STATISTICSDATA_UPDATEPACKAGEINFO), + exchange = @Exchange(name = FanoutConstants.trunkline.UPDATEWAYBILL.EXCHANGE, type = ExchangeTypes.FANOUT) + )) + @Transactional(rollbackFor = Exception.class) + public void updateWaybillData(String msg) { + log.info("接收到消息:{}", msg); + } + + +} diff --git a/blade-service/logpm-statisticsdata/src/main/resources/application-dev.yml b/blade-service/logpm-statisticsdata/src/main/resources/application-dev.yml index a04bb4b70..f5a8e1df0 100644 --- a/blade-service/logpm-statisticsdata/src/main/resources/application-dev.yml +++ b/blade-service/logpm-statisticsdata/src/main/resources/application-dev.yml @@ -1,6 +1,6 @@ #服务器端口 server: - port: 8200 + port: 19999 #数据源配置 #spring: @@ -10,6 +10,33 @@ server: # password: ${blade.datasource.dev.password} spring: + #rabbitmq配置 + rabbitmq: + host: 192.168.2.46 + port: 5672 + username: admin + password: admin + #虚拟host 可以不设置,使用server默认host + virtual-host: / + #确认消息已发送到队列(Queue) + publisher-returns: true + publisher-confirm-type: correlated + # 手动提交消息 + listener: + simple: + acknowledge-mode: auto + default-requeue-rejected: false + retry: + enabled: true # 开启消费者失败重试 + initial-interval: 1000 # 初识的失败等待时长为1秒 + multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval + max-attempts: 3 # 最大重试次数 + stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false + direct: + acknowledge-mode: manual + template: + mandatory: true + #排除DruidDataSourceAutoConfigure autoconfigure: exclude: com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceAutoConfigure,com.alibaba.cloud.seata.feign.SeataFeignClientAutoConfiguration @@ -30,15 +57,15 @@ spring: validation-query: select 1 #oracle校验 #validation-query: select 1 from dual - url: ${blade.datasource.factory.master.url} - username: ${blade.datasource.factory.master.username} - password: ${blade.datasource.factory.master.password} - slave: + url: ${blade.datasource.statisticsdata.master.url} + username: ${blade.datasource.statisticsdata.master.username} + password: ${blade.datasource.statisticsdata.master.password} + 627683: druid: #独立校验配置 validation-query: select 1 #oracle校验 #validation-query: select 1 from dual - url: ${blade.datasource.factory.slave.url} - username: ${blade.datasource.factory.slave.username} - password: ${blade.datasource.factory.slave.password} + url: ${blade.datasource.statisticsdata.627683.url} + username: ${blade.datasource.statisticsdata.627683.username} + password: ${blade.datasource.statisticsdata.627683.password} diff --git a/blade-service/logpm-statisticsdata/src/main/resources/application.yml b/blade-service/logpm-statisticsdata/src/main/resources/application.yml index 28bbf533b..3b23e330e 100644 --- a/blade-service/logpm-statisticsdata/src/main/resources/application.yml +++ b/blade-service/logpm-statisticsdata/src/main/resources/application.yml @@ -37,14 +37,14 @@ spring: allow-circular-references: true -xxl: - job: - accessToken: '' - admin: - addresses: http://127.0.0.1:7009/xxl-job-admin - executor: - appname: logpm-factory-xxljob - ip: 127.0.0.1 - logpath: ../data/applogs/logpm-factory-xxljob/jobhandler - logretentiondays: -1 - port: 7018 +#xxl: +# job: +# accessToken: '' +# admin: +# addresses: http://127.0.0.1:7009/xxl-job-admin +# executor: +# appname: logpm-factory-xxljob +# ip: 127.0.0.1 +# logpath: ../data/applogs/logpm-factory-xxljob/jobhandler +# logretentiondays: -1 +# port: 7018 diff --git a/blade-service/logpm-trunkline/src/main/java/com/logpm/trunkline/mapper/TrunklineCarsLoadMapper.xml b/blade-service/logpm-trunkline/src/main/java/com/logpm/trunkline/mapper/TrunklineCarsLoadMapper.xml index e66373e36..16521c510 100644 --- a/blade-service/logpm-trunkline/src/main/java/com/logpm/trunkline/mapper/TrunklineCarsLoadMapper.xml +++ b/blade-service/logpm-trunkline/src/main/java/com/logpm/trunkline/mapper/TrunklineCarsLoadMapper.xml @@ -157,7 +157,6 @@ where ldpl.is_deleted = 0 and ldpl.order_package_status = '20' and ldpl.is_deleted = 0 - and ldpl.is_transfer = 1 and ldpl.waybill_number is not null and concat(ldpl.order_code,',',ldpl.waybill_number) not in @@ -178,9 +177,11 @@ and ldpl.accept_warehouse_id = #{param.warehouseId} and ldsa.type_service = '4' + and ldpl.is_transfer = 0 and ldsa.type_service != '4' + and ldpl.is_transfer = 1 group by ldpl.order_code, ldpl.waybill_id, diff --git a/blade-service/logpm-trunkline/src/main/java/com/logpm/trunkline/service/IOpenOrderAsyncService.java b/blade-service/logpm-trunkline/src/main/java/com/logpm/trunkline/service/IOpenOrderAsyncService.java index 7a0c7ac1f..b6004aa8b 100644 --- a/blade-service/logpm-trunkline/src/main/java/com/logpm/trunkline/service/IOpenOrderAsyncService.java +++ b/blade-service/logpm-trunkline/src/main/java/com/logpm/trunkline/service/IOpenOrderAsyncService.java @@ -1,6 +1,7 @@ package com.logpm.trunkline.service; import com.logpm.trunkline.entity.TrunklineCarsLoadLineEntity; +import com.logpm.warehouse.entity.WarehouseWayBillDetail; import com.logpm.warehouse.entity.WarehouseWaybillEntity; import java.util.List; @@ -21,4 +22,6 @@ public interface IOpenOrderAsyncService { void incomingPackageBatch(List advanceIds, Long userId, Long deptId, String tenantId, String nickName,Integer incomingType,Long warehouseId,String warehouseName,WarehouseWaybillEntity waybillEntity); void saveUpdateLog(Long waybillId, String waybillNo, String trackType, String refer, String operationRemark, String nickName, Long userId, Long warehouseId, String warehouseName, WarehouseWaybillEntity waybillEntity); + + void sendOpenWaybillFanout(WarehouseWaybillEntity waybillEntity, List details); } diff --git a/blade-service/logpm-trunkline/src/main/java/com/logpm/trunkline/service/ISendFanoutService.java b/blade-service/logpm-trunkline/src/main/java/com/logpm/trunkline/service/ISendFanoutService.java new file mode 100644 index 000000000..670365aee --- /dev/null +++ b/blade-service/logpm-trunkline/src/main/java/com/logpm/trunkline/service/ISendFanoutService.java @@ -0,0 +1,7 @@ +package com.logpm.trunkline.service; + +import org.springblade.common.model.FanoutMsg; + +public interface ISendFanoutService{ + void sendFanoutMsg(FanoutMsg fanoutMsg); +} diff --git a/blade-service/logpm-trunkline/src/main/java/com/logpm/trunkline/service/impl/OpenOrderAsyncServiceImpl.java b/blade-service/logpm-trunkline/src/main/java/com/logpm/trunkline/service/impl/OpenOrderAsyncServiceImpl.java index ca82f0788..567b736fd 100644 --- a/blade-service/logpm-trunkline/src/main/java/com/logpm/trunkline/service/impl/OpenOrderAsyncServiceImpl.java +++ b/blade-service/logpm-trunkline/src/main/java/com/logpm/trunkline/service/impl/OpenOrderAsyncServiceImpl.java @@ -1,15 +1,19 @@ package com.logpm.trunkline.service.impl; +import com.alibaba.fastjson.JSONObject; import com.logpm.trunkline.dto.InComingDTO; import com.logpm.trunkline.entity.TrunklineCarsLoadEntity; import com.logpm.trunkline.entity.TrunklineCarsLoadLineEntity; import com.logpm.trunkline.entity.TrunklineWaybillTrackEntity; import com.logpm.trunkline.service.*; import com.logpm.trunkline.vo.LoadScanWaybillVO; +import com.logpm.warehouse.entity.WarehouseWayBillDetail; import com.logpm.warehouse.entity.WarehouseWaybillEntity; import com.logpm.warehouse.feign.IWarehouseWaybillClient; import lombok.extern.slf4j.Slf4j; import org.springblade.common.annotations.LogpmAsync; +import org.springblade.common.constant.broadcast.FanoutConstants; +import org.springblade.common.model.FanoutMsg; import org.springblade.common.utils.CommonUtil; import org.springblade.core.secure.BladeUser; import org.springblade.core.secure.utils.AuthUtil; @@ -41,6 +45,8 @@ public class OpenOrderAsyncServiceImpl implements IOpenOrderAsyncService { private ITrunklineCarsLoadLineService trunklineCarsLoadLineService; @Autowired private IInComingService inComingService; + @Autowired + private ISendFanoutService sendFanoutService; @LogpmAsync("asyncExecutor") @Override @@ -294,4 +300,19 @@ public class OpenOrderAsyncServiceImpl implements IOpenOrderAsyncService { //存入运单json } + + @LogpmAsync("asyncExecutor") + @Override + public void sendOpenWaybillFanout(WarehouseWaybillEntity waybillEntity, List details) { + + JSONObject jsonObject = new JSONObject(); + jsonObject.put("waybillNo",waybillEntity.getWaybillNo()); + jsonObject.put("waybillType",waybillEntity.getWaybillType()); + + FanoutMsg build = FanoutMsg.builder().msg(jsonObject.toJSONString()) + .exchange(FanoutConstants.trunkline.OPENWAYBILL.EXCHANGE).build(); + + sendFanoutService.sendFanoutMsg(build); + + } } diff --git a/blade-service/logpm-trunkline/src/main/java/com/logpm/trunkline/service/impl/OpenOrderServiceImpl.java b/blade-service/logpm-trunkline/src/main/java/com/logpm/trunkline/service/impl/OpenOrderServiceImpl.java index fb6c723db..14242afa4 100644 --- a/blade-service/logpm-trunkline/src/main/java/com/logpm/trunkline/service/impl/OpenOrderServiceImpl.java +++ b/blade-service/logpm-trunkline/src/main/java/com/logpm/trunkline/service/impl/OpenOrderServiceImpl.java @@ -46,7 +46,6 @@ import org.springblade.core.tool.utils.Func; import org.springblade.core.tool.utils.StringUtil; import org.springblade.system.cache.DictBizCache; import org.springblade.system.feign.IRegionFeign; -import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.web.multipart.MultipartFile; @@ -96,7 +95,6 @@ public class OpenOrderServiceImpl implements IOpenOrderService { private final IBasicdataStoreContactClient basicdataStoreContactClient; private final IAsyncService asyncService; private final ICarsLoadAsyncService carsLoadAsyncService; - private final RabbitTemplate rabbitTemplate; @Override public IPage advancePageList(AdvanceDTO advanceDTO) { @@ -756,11 +754,12 @@ public class OpenOrderServiceImpl implements IOpenOrderService { Long waybillId = warehouseWaybillClient.addEnntity(waybillEntity); waybillEntity.setId(waybillId); + List details = new ArrayList<>(); if (!Objects.isNull(waybillId)) { log.info("#################openZeroWaybill: 运单保存成功"); //保存运单货物明细 List waybillDetailList = openOrderDTO.getAddList(); - List details = new ArrayList<>(); + for (WaybillDetailDTO waybillDetailDTO : waybillDetailList) { WarehouseWayBillDetail warehouseWayBillDetail = new WarehouseWayBillDetail(); warehouseWayBillDetail.setCreateUser(AuthUtil.getUserId()); @@ -841,6 +840,9 @@ public class OpenOrderServiceImpl implements IOpenOrderService { trunklineWaybillOrderEntity.setHandleNumber(waybillEntity.getTotalCount()); trunklineWaybillOrderService.save(trunklineWaybillOrderEntity); + //发布开单广播 + openOrderAsyncService.sendOpenWaybillFanout(waybillEntity, details); + Map map = new HashMap<>(); map.put("waybillId",waybillId); return R.data(map); @@ -4146,7 +4148,7 @@ public class OpenOrderServiceImpl implements IOpenOrderService { distributionStockArticleEntity.setIsZero("1"); Long orderId = distributionStockArticleClient.addData(distributionStockArticleEntity); - if (orderId == 0) { + if (Objects.isNull(orderId)) { log.warn("#################createStockArticle: 保存订单信息失败 orderCode={}", distributionStockArticleEntity.getOrderCode()); throw new CustomerException(405, "保存订单信息失败"); } diff --git a/blade-service/logpm-trunkline/src/main/java/com/logpm/trunkline/service/impl/SendFanoutServiceImpl.java b/blade-service/logpm-trunkline/src/main/java/com/logpm/trunkline/service/impl/SendFanoutServiceImpl.java new file mode 100644 index 000000000..927a5d3a8 --- /dev/null +++ b/blade-service/logpm-trunkline/src/main/java/com/logpm/trunkline/service/impl/SendFanoutServiceImpl.java @@ -0,0 +1,26 @@ +package com.logpm.trunkline.service.impl; + +import com.logpm.trunkline.service.ISendFanoutService; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springblade.common.model.FanoutMsg; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.retry.annotation.Backoff; +import org.springframework.retry.annotation.Retryable; +import org.springframework.stereotype.Service; + +@Service +@AllArgsConstructor +@Slf4j +public class SendFanoutServiceImpl implements ISendFanoutService { + + private final RabbitTemplate rabbitTemplate; + + @Retryable(value = Exception.class, maxAttempts = 3, backoff = @Backoff(value = 3000, multiplier = 1.5)) + @Override + public void sendFanoutMsg(FanoutMsg fanoutMsg) { + + rabbitTemplate.convertAndSend(fanoutMsg.getExchange(),null, fanoutMsg.getMsg()); + + } +}