Browse Source

1.多数据源 逻辑,配置更新

master
zhenghaoyu 11 months ago
parent
commit
6b8ab6dd95
  1. 5
      blade-service/logpm-aftersales/src/main/java/com/logpm/aftersales/AftersalesApplication.java
  2. 50
      blade-service/logpm-aftersales/src/main/java/com/logpm/aftersales/aspect/AsyncAnnotationAspect.java
  3. 82
      blade-service/logpm-aftersales/src/main/java/com/logpm/aftersales/config/ExecutorConfig.java
  4. 4
      blade-service/logpm-aftersales/src/main/java/com/logpm/aftersales/service/impl/AftersalesWorkOrderServiceImpl.java
  5. 2
      blade-service/logpm-basic/src/main/java/com/logpm/basic/BasicApplication.java
  6. 50
      blade-service/logpm-basic/src/main/java/com/logpm/basic/aspect/AsyncAnnotationAspect.java
  7. 82
      blade-service/logpm-basic/src/main/java/com/logpm/basic/config/ExecutorConfig.java
  8. 2
      blade-service/logpm-basicdata/src/main/java/com/logpm/basicdata/BasicDataApplication.java
  9. 50
      blade-service/logpm-basicdata/src/main/java/com/logpm/basicdata/aspect/AsyncAnnotationAspect.java
  10. 82
      blade-service/logpm-basicdata/src/main/java/com/logpm/basicdata/config/ExecutorConfig.java
  11. 2
      blade-service/logpm-distribution/src/main/java/com/logpm/distribution/DistributionApplication.java
  12. 50
      blade-service/logpm-distribution/src/main/java/com/logpm/distribution/aspect/AsyncAnnotationAspect.java
  13. 82
      blade-service/logpm-distribution/src/main/java/com/logpm/distribution/config/ExecutorConfig.java
  14. 124
      blade-service/logpm-distribution/src/main/java/com/logpm/distribution/service/impl/DistributionAsyncServiceImpl.java
  15. 4
      blade-service/logpm-factory/src/main/java/com/logpm/factory/comfac/service/impl/AsyncDataServiceImpl.java
  16. 10
      blade-service/logpm-factory/src/main/java/com/logpm/factory/oupai/service/impl/OpFailRetryPushPackageServiceImpl.java
  17. 2
      blade-service/logpm-patch/src/main/java/com/logpm/patch/service/impl/AsyncDataServiceImpl.java
  18. 2
      blade-service/logpm-supervise/src/main/java/com/logpm/supervise/SuperviseApplication.java
  19. 50
      blade-service/logpm-supervise/src/main/java/com/logpm/supervise/aspect/AsyncAnnotationAspect.java
  20. 82
      blade-service/logpm-supervise/src/main/java/com/logpm/supervise/config/ExecutorConfig.java
  21. 50
      blade-service/logpm-trunkline/src/main/java/com/logpm/trunkline/aspect/AsyncAnnotationAspect.java
  22. 2
      blade-service/logpm-trunkline/src/main/java/com/logpm/trunkline/service/IAsyncService.java
  23. 2
      blade-service/logpm-trunkline/src/main/java/com/logpm/trunkline/service/impl/AsyncServiceImpl.java
  24. 16
      blade-service/logpm-trunkline/src/main/java/com/logpm/trunkline/service/impl/CarsLoadAsyncServiceImpl.java
  25. 52
      blade-service/logpm-trunkline/src/main/java/com/logpm/trunkline/service/impl/OpenOrderAsyncServiceImpl.java
  26. 4
      blade-service/logpm-trunkline/src/main/java/com/logpm/trunkline/service/impl/PackageTrackLogAsyncServiceImpl.java
  27. 50
      blade-service/logpm-warehouse/src/main/java/com/logpm/warehouse/aspect/AsyncAnnotationAspect.java
  28. 16
      blade-service/logpm-warehouse/src/main/java/com/logpm/warehouse/config/ExecutorConfig.java
  29. 2
      blade-service/logpm-warehouse/src/main/java/com/logpm/warehouse/service/impl/AsyncDataServiceImpl.java

5
blade-service/logpm-aftersales/src/main/java/com/logpm/aftersales/AftersalesApplication.java

@ -19,9 +19,7 @@ package com.logpm.aftersales;
import org.springblade.common.constant.ModuleNameConstant;
import org.springblade.core.cloud.client.BladeCloudApplication;
import org.springblade.core.launch.BladeApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
import org.springblade.core.transaction.annotation.SeataCloudApplication;
/**
* Demo启动器
@ -29,6 +27,7 @@ import org.springframework.web.socket.server.standard.ServerEndpointExporter;
* @author Chill
*/
@BladeCloudApplication
@SeataCloudApplication
//@EnableWebSocket
public class AftersalesApplication {

50
blade-service/logpm-aftersales/src/main/java/com/logpm/aftersales/aspect/AsyncAnnotationAspect.java

@ -0,0 +1,50 @@
package com.logpm.aftersales.aspect;
import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springblade.core.secure.utils.AuthUtil;
import org.springblade.core.tool.utils.StringUtil;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
@Aspect
@Component
public class AsyncAnnotationAspect {
/**
* 定义一个切点匹配所有带有@Async("asyncExecutor")注解的方法
* 注意实际上Spring Framework自带对@Async("asyncExecutor")的处理直接这样配置可能会导致预期之外的行为
*/
@Around("@annotation(org.springframework.scheduling.annotation.Async)")
public Object logAroundAsyncMethods(ProceedingJoinPoint joinPoint) throws Throwable {
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
Method method = signature.getMethod();
Async myAsync = method.getAnnotation(Async.class);
String annotationValue = myAsync.value();
if(StringUtil.isNotBlank(annotationValue) && annotationValue.equals("asyncExecutor")){
// 在方法执行前的操作
String tenantId = AuthUtil.getTenantId();
DynamicDataSourceContextHolder.push(tenantId);
// 执行原方法
Object result = joinPoint.proceed();
// 在方法执行后的操作
DynamicDataSourceContextHolder.poll();
return result;
}else{
return joinPoint.proceed();
}
}
}

82
blade-service/logpm-aftersales/src/main/java/com/logpm/aftersales/config/ExecutorConfig.java

@ -0,0 +1,82 @@
package com.logpm.aftersales.config;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springblade.core.secure.utils.AuthUtil;
import org.springblade.core.tool.utils.ThreadLocalUtil;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskDecorator;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
import javax.annotation.Nonnull;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@Slf4j
@EnableAsync
public class ExecutorConfig {
@Bean
public Executor asyncExecutor() {
log.info("start async executor");
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
// 配置核心线程数
threadPoolTaskExecutor.setCorePoolSize(10);
// 配置最大线程数
threadPoolTaskExecutor.setMaxPoolSize(20);
// 配置队列大小
threadPoolTaskExecutor.setQueueCapacity(50);
// 配置线程池中线程的名称前缀
threadPoolTaskExecutor.setThreadNamePrefix("ASYNC_THREAD_");
// HelloWorldServiceImpl rejection-policy: 当pool已经达到max size时,如何处理新任务:
// CallerRunsPolicy: 不在新线程中执行任务,而是由调用者所在的线程来执行;
// AbortPolicy: 拒绝执行新任务,并抛出RejectedExecutionException异常;
// DiscardPolicy:丢弃当前将要加入队列的任务;
// DiscardOldestPolicy:丢弃任务队列中最旧的任务;
threadPoolTaskExecutor.setRejectedExecutionHandler(
new ThreadPoolExecutor.CallerRunsPolicy()
);
threadPoolTaskExecutor.setTaskDecorator(new ContextCopyingDecorator());
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
static class ContextCopyingDecorator implements TaskDecorator {
@Nonnull
@Override
public Runnable decorate(@Nonnull Runnable runnable) {
RequestAttributes context = RequestContextHolder.currentRequestAttributes();
String tenantId = AuthUtil.getTenantId();
Map<String, Object> all = ThreadLocalUtil.getAll();
Map<String, String> mdcMap = MDC.getCopyOfContextMap();
return () -> {
try {
all.keySet().forEach(key -> ThreadLocalUtil.put(key, all.get(key)));
if (mdcMap != null && !mdcMap.isEmpty()) {
MDC.setContextMap(mdcMap);
}
RequestContextHolder.setRequestAttributes(context);
String tenantId1 = AuthUtil.getTenantId();
runnable.run();
} finally {
RequestContextHolder.resetRequestAttributes();
all.clear();
if (mdcMap != null) {
mdcMap.clear();
}
ThreadLocalUtil.clear();
MDC.clear();
}
};
}
}
}

4
blade-service/logpm-aftersales/src/main/java/com/logpm/aftersales/service/impl/AftersalesWorkOrderServiceImpl.java

@ -1918,7 +1918,7 @@ public class AftersalesWorkOrderServiceImpl extends BaseServiceImpl<AftersalesWo
* @return
*/
@Override
@Async
@Async("asyncExecutor")
public Boolean addSurveyRecord(BladeUser user, Long workOrderId, String results, String difference) {
//添加跟踪记录
AftersaleSurveyRecordDTO aftersaleSurveyRecordDTO = changName(new AftersalesWorkOrderDTO(), user);
@ -2074,7 +2074,7 @@ public class AftersalesWorkOrderServiceImpl extends BaseServiceImpl<AftersalesWo
*
* @param aftersalesWorkOrderDTO
*/
@Async
@Async("asyncExecutor")
@Override
public void addAssignCustomerService(AftersalesWorkOrderDTO aftersalesWorkOrderDTO, BladeUser user) {
BasicdataWarehouseEntity myCurrentWarehouse = warehouseClient.getMyCurrentWarehouse();

2
blade-service/logpm-basic/src/main/java/com/logpm/basic/BasicApplication.java

@ -5,6 +5,7 @@ package com.logpm.basic;
import org.springblade.common.constant.ModuleNameConstant;
import org.springblade.core.cloud.client.BladeCloudApplication;
import org.springblade.core.launch.BladeApplication;
import org.springblade.core.transaction.annotation.SeataCloudApplication;
/**
* Basic启动器
@ -12,6 +13,7 @@ import org.springblade.core.launch.BladeApplication;
* @author lmy
*/
@BladeCloudApplication
@SeataCloudApplication
public class BasicApplication {
public static void main(String[] args) {

50
blade-service/logpm-basic/src/main/java/com/logpm/basic/aspect/AsyncAnnotationAspect.java

@ -0,0 +1,50 @@
package com.logpm.basic.aspect;
import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springblade.core.secure.utils.AuthUtil;
import org.springblade.core.tool.utils.StringUtil;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
@Aspect
@Component
public class AsyncAnnotationAspect {
/**
* 定义一个切点匹配所有带有@Async("asyncExecutor")注解的方法
* 注意实际上Spring Framework自带对@Async("asyncExecutor")的处理直接这样配置可能会导致预期之外的行为
*/
@Around("@annotation(org.springframework.scheduling.annotation.Async)")
public Object logAroundAsyncMethods(ProceedingJoinPoint joinPoint) throws Throwable {
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
Method method = signature.getMethod();
Async myAsync = method.getAnnotation(Async.class);
String annotationValue = myAsync.value();
if(StringUtil.isNotBlank(annotationValue) && annotationValue.equals("asyncExecutor")){
// 在方法执行前的操作
String tenantId = AuthUtil.getTenantId();
DynamicDataSourceContextHolder.push(tenantId);
// 执行原方法
Object result = joinPoint.proceed();
// 在方法执行后的操作
DynamicDataSourceContextHolder.poll();
return result;
}else{
return joinPoint.proceed();
}
}
}

82
blade-service/logpm-basic/src/main/java/com/logpm/basic/config/ExecutorConfig.java

@ -0,0 +1,82 @@
package com.logpm.basic.config;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springblade.core.secure.utils.AuthUtil;
import org.springblade.core.tool.utils.ThreadLocalUtil;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskDecorator;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
import javax.annotation.Nonnull;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@Slf4j
@EnableAsync
public class ExecutorConfig {
@Bean
public Executor asyncExecutor() {
log.info("start async executor");
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
// 配置核心线程数
threadPoolTaskExecutor.setCorePoolSize(10);
// 配置最大线程数
threadPoolTaskExecutor.setMaxPoolSize(20);
// 配置队列大小
threadPoolTaskExecutor.setQueueCapacity(50);
// 配置线程池中线程的名称前缀
threadPoolTaskExecutor.setThreadNamePrefix("ASYNC_THREAD_");
// HelloWorldServiceImpl rejection-policy: 当pool已经达到max size时,如何处理新任务:
// CallerRunsPolicy: 不在新线程中执行任务,而是由调用者所在的线程来执行;
// AbortPolicy: 拒绝执行新任务,并抛出RejectedExecutionException异常;
// DiscardPolicy:丢弃当前将要加入队列的任务;
// DiscardOldestPolicy:丢弃任务队列中最旧的任务;
threadPoolTaskExecutor.setRejectedExecutionHandler(
new ThreadPoolExecutor.CallerRunsPolicy()
);
threadPoolTaskExecutor.setTaskDecorator(new ContextCopyingDecorator());
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
static class ContextCopyingDecorator implements TaskDecorator {
@Nonnull
@Override
public Runnable decorate(@Nonnull Runnable runnable) {
RequestAttributes context = RequestContextHolder.currentRequestAttributes();
String tenantId = AuthUtil.getTenantId();
Map<String, Object> all = ThreadLocalUtil.getAll();
Map<String, String> mdcMap = MDC.getCopyOfContextMap();
return () -> {
try {
all.keySet().forEach(key -> ThreadLocalUtil.put(key, all.get(key)));
if (mdcMap != null && !mdcMap.isEmpty()) {
MDC.setContextMap(mdcMap);
}
RequestContextHolder.setRequestAttributes(context);
String tenantId1 = AuthUtil.getTenantId();
runnable.run();
} finally {
RequestContextHolder.resetRequestAttributes();
all.clear();
if (mdcMap != null) {
mdcMap.clear();
}
ThreadLocalUtil.clear();
MDC.clear();
}
};
}
}
}

2
blade-service/logpm-basicdata/src/main/java/com/logpm/basicdata/BasicDataApplication.java

@ -4,6 +4,7 @@ package com.logpm.basicdata;
import org.springblade.common.constant.ModuleNameConstant;
import org.springblade.core.cloud.client.BladeCloudApplication;
import org.springblade.core.launch.BladeApplication;
import org.springblade.core.transaction.annotation.SeataCloudApplication;
//import org.springblade.core.transaction.annotation.SeataCloudApplication;
/**
@ -13,6 +14,7 @@ import org.springblade.core.launch.BladeApplication;
*/
@BladeCloudApplication
@SeataCloudApplication
public class BasicDataApplication {
public static void main(String[] args) {

50
blade-service/logpm-basicdata/src/main/java/com/logpm/basicdata/aspect/AsyncAnnotationAspect.java

@ -0,0 +1,50 @@
package com.logpm.basicdata.aspect;
import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springblade.core.secure.utils.AuthUtil;
import org.springblade.core.tool.utils.StringUtil;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
@Aspect
@Component
public class AsyncAnnotationAspect {
/**
* 定义一个切点匹配所有带有@Async("asyncExecutor")注解的方法
* 注意实际上Spring Framework自带对@Async("asyncExecutor")的处理直接这样配置可能会导致预期之外的行为
*/
@Around("@annotation(org.springframework.scheduling.annotation.Async)")
public Object logAroundAsyncMethods(ProceedingJoinPoint joinPoint) throws Throwable {
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
Method method = signature.getMethod();
Async myAsync = method.getAnnotation(Async.class);
String annotationValue = myAsync.value();
if(StringUtil.isNotBlank(annotationValue) && annotationValue.equals("asyncExecutor")){
// 在方法执行前的操作
String tenantId = AuthUtil.getTenantId();
DynamicDataSourceContextHolder.push(tenantId);
// 执行原方法
Object result = joinPoint.proceed();
// 在方法执行后的操作
DynamicDataSourceContextHolder.poll();
return result;
}else{
return joinPoint.proceed();
}
}
}

82
blade-service/logpm-basicdata/src/main/java/com/logpm/basicdata/config/ExecutorConfig.java

@ -0,0 +1,82 @@
package com.logpm.basicdata.config;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springblade.core.secure.utils.AuthUtil;
import org.springblade.core.tool.utils.ThreadLocalUtil;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskDecorator;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
import javax.annotation.Nonnull;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@Slf4j
@EnableAsync
public class ExecutorConfig {
@Bean
public Executor asyncExecutor() {
log.info("start async executor");
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
// 配置核心线程数
threadPoolTaskExecutor.setCorePoolSize(10);
// 配置最大线程数
threadPoolTaskExecutor.setMaxPoolSize(20);
// 配置队列大小
threadPoolTaskExecutor.setQueueCapacity(50);
// 配置线程池中线程的名称前缀
threadPoolTaskExecutor.setThreadNamePrefix("ASYNC_THREAD_");
// HelloWorldServiceImpl rejection-policy: 当pool已经达到max size时,如何处理新任务:
// CallerRunsPolicy: 不在新线程中执行任务,而是由调用者所在的线程来执行;
// AbortPolicy: 拒绝执行新任务,并抛出RejectedExecutionException异常;
// DiscardPolicy:丢弃当前将要加入队列的任务;
// DiscardOldestPolicy:丢弃任务队列中最旧的任务;
threadPoolTaskExecutor.setRejectedExecutionHandler(
new ThreadPoolExecutor.CallerRunsPolicy()
);
threadPoolTaskExecutor.setTaskDecorator(new ContextCopyingDecorator());
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
static class ContextCopyingDecorator implements TaskDecorator {
@Nonnull
@Override
public Runnable decorate(@Nonnull Runnable runnable) {
RequestAttributes context = RequestContextHolder.currentRequestAttributes();
String tenantId = AuthUtil.getTenantId();
Map<String, Object> all = ThreadLocalUtil.getAll();
Map<String, String> mdcMap = MDC.getCopyOfContextMap();
return () -> {
try {
all.keySet().forEach(key -> ThreadLocalUtil.put(key, all.get(key)));
if (mdcMap != null && !mdcMap.isEmpty()) {
MDC.setContextMap(mdcMap);
}
RequestContextHolder.setRequestAttributes(context);
String tenantId1 = AuthUtil.getTenantId();
runnable.run();
} finally {
RequestContextHolder.resetRequestAttributes();
all.clear();
if (mdcMap != null) {
mdcMap.clear();
}
ThreadLocalUtil.clear();
MDC.clear();
}
};
}
}
}

2
blade-service/logpm-distribution/src/main/java/com/logpm/distribution/DistributionApplication.java

@ -19,6 +19,7 @@ package com.logpm.distribution;
import org.springblade.common.constant.ModuleNameConstant;
import org.springblade.core.cloud.client.BladeCloudApplication;
import org.springblade.core.launch.BladeApplication;
import org.springblade.core.transaction.annotation.SeataCloudApplication;
/**
* Demo启动器
@ -26,6 +27,7 @@ import org.springblade.core.launch.BladeApplication;
* @author Chill
*/
@BladeCloudApplication
@SeataCloudApplication
public class DistributionApplication {
public static void main(String[] args) {

50
blade-service/logpm-distribution/src/main/java/com/logpm/distribution/aspect/AsyncAnnotationAspect.java

@ -0,0 +1,50 @@
package com.logpm.distribution.aspect;
import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springblade.core.secure.utils.AuthUtil;
import org.springblade.core.tool.utils.StringUtil;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
@Aspect
@Component
public class AsyncAnnotationAspect {
/**
* 定义一个切点匹配所有带有@Async("asyncExecutor")注解的方法
* 注意实际上Spring Framework自带对@Async("asyncExecutor")的处理直接这样配置可能会导致预期之外的行为
*/
@Around("@annotation(org.springframework.scheduling.annotation.Async)")
public Object logAroundAsyncMethods(ProceedingJoinPoint joinPoint) throws Throwable {
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
Method method = signature.getMethod();
Async myAsync = method.getAnnotation(Async.class);
String annotationValue = myAsync.value();
if(StringUtil.isNotBlank(annotationValue) && annotationValue.equals("asyncExecutor")){
// 在方法执行前的操作
String tenantId = AuthUtil.getTenantId();
DynamicDataSourceContextHolder.push(tenantId);
// 执行原方法
Object result = joinPoint.proceed();
// 在方法执行后的操作
DynamicDataSourceContextHolder.poll();
return result;
}else{
return joinPoint.proceed();
}
}
}

82
blade-service/logpm-distribution/src/main/java/com/logpm/distribution/config/ExecutorConfig.java

@ -0,0 +1,82 @@
package com.logpm.distribution.config;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springblade.core.secure.utils.AuthUtil;
import org.springblade.core.tool.utils.ThreadLocalUtil;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskDecorator;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
import javax.annotation.Nonnull;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@Slf4j
@EnableAsync
public class ExecutorConfig {
@Bean
public Executor asyncExecutor() {
log.info("start async executor");
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
// 配置核心线程数
threadPoolTaskExecutor.setCorePoolSize(10);
// 配置最大线程数
threadPoolTaskExecutor.setMaxPoolSize(20);
// 配置队列大小
threadPoolTaskExecutor.setQueueCapacity(50);
// 配置线程池中线程的名称前缀
threadPoolTaskExecutor.setThreadNamePrefix("ASYNC_THREAD_");
// HelloWorldServiceImpl rejection-policy: 当pool已经达到max size时,如何处理新任务:
// CallerRunsPolicy: 不在新线程中执行任务,而是由调用者所在的线程来执行;
// AbortPolicy: 拒绝执行新任务,并抛出RejectedExecutionException异常;
// DiscardPolicy:丢弃当前将要加入队列的任务;
// DiscardOldestPolicy:丢弃任务队列中最旧的任务;
threadPoolTaskExecutor.setRejectedExecutionHandler(
new ThreadPoolExecutor.CallerRunsPolicy()
);
threadPoolTaskExecutor.setTaskDecorator(new ContextCopyingDecorator());
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
static class ContextCopyingDecorator implements TaskDecorator {
@Nonnull
@Override
public Runnable decorate(@Nonnull Runnable runnable) {
RequestAttributes context = RequestContextHolder.currentRequestAttributes();
String tenantId = AuthUtil.getTenantId();
Map<String, Object> all = ThreadLocalUtil.getAll();
Map<String, String> mdcMap = MDC.getCopyOfContextMap();
return () -> {
try {
all.keySet().forEach(key -> ThreadLocalUtil.put(key, all.get(key)));
if (mdcMap != null && !mdcMap.isEmpty()) {
MDC.setContextMap(mdcMap);
}
RequestContextHolder.setRequestAttributes(context);
String tenantId1 = AuthUtil.getTenantId();
runnable.run();
} finally {
RequestContextHolder.resetRequestAttributes();
all.clear();
if (mdcMap != null) {
mdcMap.clear();
}
ThreadLocalUtil.clear();
MDC.clear();
}
};
}
}
}

124
blade-service/logpm-distribution/src/main/java/com/logpm/distribution/service/impl/DistributionAsyncServiceImpl.java

@ -17,59 +17,10 @@ import com.logpm.distribution.dto.DistributionStockArticleDTO;
import com.logpm.distribution.dto.app.DistrilbutionloadingscanDTO;
import com.logpm.distribution.dto.app.StockupDTO;
import com.logpm.distribution.dto.app.StockupZeroDTO;
import com.logpm.distribution.entity.DisStockListDetailEntity;
import com.logpm.distribution.entity.DistributionBillLadingScanEntity;
import com.logpm.distribution.entity.DistributionDeliveryDetailsEntity;
import com.logpm.distribution.entity.DistributionDeliveryListEntity;
import com.logpm.distribution.entity.DistributionLoadscanEntity;
import com.logpm.distribution.entity.DistributionLoadscaninvnEntity;
import com.logpm.distribution.entity.DistributionParcelListEntity;
import com.logpm.distribution.entity.DistributionParcelNumberEntity;
import com.logpm.distribution.entity.DistributionReservationEntity;
import com.logpm.distribution.entity.DistributionReservationPackageEntity;
import com.logpm.distribution.entity.DistributionReservationStockarticleEntity;
import com.logpm.distribution.entity.DistributionReservationStocklistEntity;
import com.logpm.distribution.entity.DistributionReservationZeroPackageEntity;
import com.logpm.distribution.entity.DistributionSignforEntity;
import com.logpm.distribution.entity.DistributionStockArticleEntity;
import com.logpm.distribution.entity.DistributionStockEntity;
import com.logpm.distribution.entity.DistributionStockListEntity;
import com.logpm.distribution.entity.DistributionStockupEntity;
import com.logpm.distribution.entity.DistributionStockupInfoEntity;
import com.logpm.distribution.entity.DistrilbutionBillLadingEntity;
import com.logpm.distribution.entity.DistrilbutionBillPackageEntity;
import com.logpm.distribution.entity.DistrilbutionBillStockEntity;
import com.logpm.distribution.mapper.DistributionBillLadingScanMapper;
import com.logpm.distribution.mapper.DistributionDeliveryDetailsMapper;
import com.logpm.distribution.mapper.DistributionDeliveryListMapper;
import com.logpm.distribution.mapper.DistributionLoadscanMapper;
import com.logpm.distribution.mapper.DistributionLoadscaninvnMapper;
import com.logpm.distribution.mapper.DistributionParcelListMapper;
import com.logpm.distribution.mapper.DistributionReservationMapper;
import com.logpm.distribution.mapper.DistributionReservationPackageMapper;
import com.logpm.distribution.mapper.DistributionReservationStockarticleMapper;
import com.logpm.distribution.mapper.DistributionReservationStocklistMapper;
import com.logpm.distribution.mapper.DistributionSignforMapper;
import com.logpm.distribution.mapper.DistributionStockListMapper;
import com.logpm.distribution.mapper.DistributionStockMapper;
import com.logpm.distribution.mapper.DistributionStockupInfoMapper;
import com.logpm.distribution.mapper.DistributionStockupMapper;
import com.logpm.distribution.mapper.DistrilbutionBillLadingMapper;
import com.logpm.distribution.service.IDisStockListDetailService;
import com.logpm.distribution.service.IDistributionAsyncService;
import com.logpm.distribution.service.IDistributionDeliveryDetailsService;
import com.logpm.distribution.service.IDistributionParcelNumberService;
import com.logpm.distribution.service.IDistributionReservationStockarticleService;
import com.logpm.distribution.service.IDistributionReservationStocklistService;
import com.logpm.distribution.service.IDistributionReservationZeroPackageService;
import com.logpm.distribution.service.IDistributionStockArticleService;
import com.logpm.distribution.service.IDistrilbutionBillPackageService;
import com.logpm.distribution.service.IDistrilbutionBillStockService;
import com.logpm.distribution.vo.DistributionParcelNumberVO;
import com.logpm.distribution.vo.DistributionSignPrintVO;
import com.logpm.distribution.vo.DistributionStockPackageVO;
import com.logpm.distribution.vo.DistributionStockupSelfVO;
import com.logpm.distribution.vo.DistrilbutionBillStockVO;
import com.logpm.distribution.entity.*;
import com.logpm.distribution.mapper.*;
import com.logpm.distribution.service.*;
import com.logpm.distribution.vo.*;
import com.logpm.factory.comfac.dto.OrderStatusDTO;
import com.logpm.factory.mt.dto.MtReceiveContentDTO;
import com.logpm.factory.mt.dto.MtReceiveDTO;
@ -77,7 +28,6 @@ import com.logpm.factory.mt.dto.MtReceiveImagesDTO;
import com.logpm.factory.mt.feign.IMtOrderMainClinet;
import com.logpm.factory.oupai.feign.IOuPaiFactoryClinet;
import com.logpm.factorydata.enums.BrandEnums;
import org.springblade.common.constant.WorkNodeEnums;
import com.logpm.factorydata.util.FactoryDataMessageSender;
import com.logpm.factorydata.vo.NodePushMsg;
import com.logpm.trunkline.dto.AddWaybillTrackDTO;
@ -91,6 +41,7 @@ import org.springblade.common.constant.DistributionTypeConstant;
import org.springblade.common.constant.Inventory.InventoryLoadingStatusConstant;
import org.springblade.common.constant.Inventory.InventoryPackageStatusConstant;
import org.springblade.common.constant.Inventory.InventorySigningStatusConstant;
import org.springblade.common.constant.WorkNodeEnums;
import org.springblade.common.constant.common.IsOrNoConstant;
import org.springblade.common.constant.delivery.DeliveryLoadingStatusConstant;
import org.springblade.common.constant.delivery.DeliveryStatusConstant;
@ -102,20 +53,12 @@ import org.springblade.common.constant.orderpackage.OrderPackageLoadingStatusCon
import org.springblade.common.constant.orderpackage.OrderPackageReservationStatusConstant;
import org.springblade.common.constant.orderpackage.OrderPackageStatusConstant;
import org.springblade.common.constant.orderpackage.OrderPackageStockupStatusConstant;
import org.springblade.common.constant.reservation.ReservationInventoryLoadingStatusConstant;
import org.springblade.common.constant.reservation.ReservationInventorySigningStatusConstant;
import org.springblade.common.constant.reservation.ReservationInventoryStatusConstant;
import org.springblade.common.constant.reservation.ReservationLoadingStatusConstant;
import org.springblade.common.constant.reservation.ReservationOrderStatusConstant;
import org.springblade.common.constant.reservation.ReservationPackageStatusConstant;
import org.springblade.common.constant.reservation.ReservationSigningStatusConstant;
import org.springblade.common.constant.reservation.ReservationStockupStatusConstant;
import org.springblade.common.constant.reservation.*;
import org.springblade.common.constant.signing.SignforStatusConstant;
import org.springblade.common.constant.stockup.StockupStatusConstant;
import org.springblade.common.constant.stockup.StockupTypeConstant;
import org.springblade.core.log.exception.ServiceException;
import org.springblade.core.secure.BladeUser;
import org.springblade.core.secure.utils.AuthUtil;
import org.springblade.core.tool.utils.Func;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
@ -123,12 +66,7 @@ import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Transactional;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@ -183,7 +121,7 @@ public class DistributionAsyncServiceImpl implements IDistributionAsyncService {
// private final
@Override
@Async
@Async("asyncExecutor")
@Transactional(isolation = Isolation.READ_UNCOMMITTED, rollbackFor = Exception.class)
public void makeSureByPackage(DistrilbutionloadingscanDTO distrilbutionloadingscanDTO) {
log.info("[makeSureByPackage]线程开启>>>>>>>>>>>>>>>>>>>>>>>>{}", distrilbutionloadingscanDTO);
@ -238,7 +176,7 @@ public class DistributionAsyncServiceImpl implements IDistributionAsyncService {
* @return
*/
@Override
@Async
@Async("asyncExecutor")
@Transactional(rollbackFor = Exception.class)
public Boolean getSelfPickup(Long id) {
DistributionParcelListEntity entity = new DistributionParcelListEntity();
@ -254,7 +192,7 @@ public class DistributionAsyncServiceImpl implements IDistributionAsyncService {
* @return
*/
@Override
@Async
@Async("asyncExecutor")
@Transactional(rollbackFor = Exception.class)
public Boolean getOrderSelfPickup(Long id) {
DistributionStockArticleEntity entity = new DistributionStockArticleEntity();
@ -264,7 +202,7 @@ public class DistributionAsyncServiceImpl implements IDistributionAsyncService {
}
@Override
@Async
@Async("asyncExecutor")
@Transactional(rollbackFor = Exception.class)
public Boolean getInventorySelfPickup(Long id) {
DistributionStockEntity distributionStock = new DistributionStockEntity();
@ -275,7 +213,7 @@ public class DistributionAsyncServiceImpl implements IDistributionAsyncService {
}
@Override
@Async
@Async("asyncExecutor")
@Transactional(rollbackFor = Exception.class)
public Boolean getInventoryOrderSelfPickup(Long id) {
DistributionDeliveryDetailsEntity deliveryDetails = new DistributionDeliveryDetailsEntity();
@ -303,7 +241,7 @@ public class DistributionAsyncServiceImpl implements IDistributionAsyncService {
*
* @param id 订单ID
*/
@Async
@Async("asyncExecutor")
@Override
public void updateStockArticleStock(Long id) {
log.debug("###查询订单备货状态");
@ -331,7 +269,7 @@ public class DistributionAsyncServiceImpl implements IDistributionAsyncService {
*
* @param ids 订单ID
*/
@Async
@Async("asyncExecutor")
@Override
public void updateParcelListReservation(List<Long> ids) {
log.debug("####修改包件预约状态");
@ -356,7 +294,7 @@ public class DistributionAsyncServiceImpl implements IDistributionAsyncService {
}
@Async
@Async("asyncExecutor")
@Override
public Boolean getInventoryNumUpdate(Long id, Integer num, Integer type) {
//查询
@ -379,7 +317,7 @@ public class DistributionAsyncServiceImpl implements IDistributionAsyncService {
}
@Async
@Async("asyncExecutor")
@Override
@Transactional(isolation = Isolation.READ_UNCOMMITTED, rollbackFor = Exception.class)
public void changeOrderSignforStatus(DistributionParcelListEntity parcelListEntity) {
@ -400,7 +338,7 @@ public class DistributionAsyncServiceImpl implements IDistributionAsyncService {
}
@Async
@Async("asyncExecutor")
@Override
@Transactional(isolation = Isolation.READ_UNCOMMITTED, rollbackFor = Exception.class)
public void changeDiscussDeliveryListStatus(String barcode, Long deliveryId) {
@ -460,7 +398,7 @@ public class DistributionAsyncServiceImpl implements IDistributionAsyncService {
});
}
@Async
@Async("asyncExecutor")
@Override
@Transactional(isolation = Isolation.READ_UNCOMMITTED, rollbackFor = Exception.class)
public void changeMarketDeliveryListStatus(String barcode, Long deliveryId, Long reservationId) {
@ -506,7 +444,7 @@ public class DistributionAsyncServiceImpl implements IDistributionAsyncService {
checkReservationAndDeliveryLoadingStatus(deliveryId, reservationId);
}
@Async
@Async("asyncExecutor")
@Override
@Transactional(isolation = Isolation.READ_UNCOMMITTED, rollbackFor = Exception.class)
public void checkDeliverySignStatus(String barcode, Long deliveryId) {
@ -697,7 +635,7 @@ public class DistributionAsyncServiceImpl implements IDistributionAsyncService {
distributionDeliveryListMapper.updateById(deliveryListEntity);
}
@Async
@Async("asyncExecutor")
@Override
@Transactional(isolation = Isolation.READ_UNCOMMITTED, rollbackFor = Exception.class)
public void checkDeliverySignStatusByReservation(Long deliveryId) {
@ -1503,7 +1441,7 @@ public class DistributionAsyncServiceImpl implements IDistributionAsyncService {
}
@Override
@Async
@Async("asyncExecutor")
public void sendFactory(DistributionParcelListEntity distributionParcelListEntity, String signingTime, Long reservationId, String reservationCode, String warehouseName, String userName) {
DistributionStockArticleEntity stockArticleEntity = distributionStockArticleService.getById(distributionParcelListEntity.getStockArticleId());
log.info(">>>>>>>>>>> 签收推送 对象 {}", distributionParcelListEntity);
@ -1537,7 +1475,7 @@ public class DistributionAsyncServiceImpl implements IDistributionAsyncService {
@Override
@Transactional(rollbackFor = Exception.class)
@Async
@Async("asyncExecutor")
public void sendReviewFactory(Long signingId, String warehouseName, Long warehouseId) {
String brands = "梦天,欧派,志邦";//可追加
log.info(">>>>>>>>>>> 签收推送 对象 {}", "signingId:=" + signingId + "warehouseName:=" + warehouseName + "warehouseId:=" + warehouseId);
@ -1690,7 +1628,7 @@ public class DistributionAsyncServiceImpl implements IDistributionAsyncService {
}
@Override
@Async
@Async("asyncExecutor")
@Transactional
public void maintenanceOrderStatus(String orderCode, Long warehouseId) {
distributionStockArticleService.updateOrderInfo(orderCode, warehouseId);
@ -1698,7 +1636,7 @@ public class DistributionAsyncServiceImpl implements IDistributionAsyncService {
@Override
@Transactional
@Async
@Async("asyncExecutor")
public void releaseSource(Long reservationId, Long warehouseId) {
String method = "##########################DistributionAsyncServiceImpl.releaseSource";
DistributionReservationEntity reservationEntity = distributionReservationMapper.selectById(reservationId);
@ -1890,7 +1828,7 @@ public class DistributionAsyncServiceImpl implements IDistributionAsyncService {
* @param stockupDTO
*/
@Override
@Async
@Async("asyncExecutor")
public void updateStockupDate(StockupDTO stockupDTO) {
Integer scanType = stockupDTO.getScanType();//扫码类型 1 包件 2库存品
@ -2019,7 +1957,7 @@ public class DistributionAsyncServiceImpl implements IDistributionAsyncService {
* 备货库存品备货时间回显
*/
@Override
@Async
@Async("asyncExecutor")
public void updateStockupStockListDate(StockupDTO stockupDTO) {
Integer scanType = stockupDTO.getScanType();//扫码类型 1 包件 2库存品
Long reservationId = stockupDTO.getReservationId();//预约ID
@ -2241,7 +2179,7 @@ public class DistributionAsyncServiceImpl implements IDistributionAsyncService {
/**
* 自提签收片段是否签收完全
*/
@Async
@Async("asyncExecutor")
@Override
@Transactional(rollbackFor = Exception.class)
public void getStockUPstate(Long billLadingId) {
@ -2327,7 +2265,7 @@ public class DistributionAsyncServiceImpl implements IDistributionAsyncService {
}
@Async
@Async("asyncExecutor")
@Override
@Transactional(isolation = Isolation.READ_UNCOMMITTED, rollbackFor = Exception.class)
public void checkStockArticleSignStatus(DistributionParcelListEntity parcelListEntity) {
@ -2384,7 +2322,7 @@ public class DistributionAsyncServiceImpl implements IDistributionAsyncService {
}
@Async
@Async("asyncExecutor")
@Override
@Transactional(isolation = Isolation.READ_UNCOMMITTED, rollbackFor = Exception.class)
public void changeOrderStatus(String barcode, Long deliveryId) {
@ -2403,7 +2341,7 @@ public class DistributionAsyncServiceImpl implements IDistributionAsyncService {
distributionStockArticleService.updateById(stockArticleEntity);
}
@Async
@Async("asyncExecutor")
@Override
@Transactional(isolation = Isolation.READ_UNCOMMITTED, rollbackFor = Exception.class)
public void changeDeliverySignforListStatus(Long deliveryId, DistributionParcelListEntity parcelListEntity) {
@ -2527,7 +2465,7 @@ public class DistributionAsyncServiceImpl implements IDistributionAsyncService {
distributionStockArticleService.updateById(stockArticleEntity);
}
@Async
@Async("asyncExecutor")
@Override
@Transactional(isolation = Isolation.READ_UNCOMMITTED, rollbackFor = Exception.class)
public void checkStockArticleLoadingStatus(DistributionParcelListEntity parcelListEntity) {

4
blade-service/logpm-factory/src/main/java/com/logpm/factory/comfac/service/impl/AsyncDataServiceImpl.java

@ -117,7 +117,7 @@ public class AsyncDataServiceImpl implements IAsyncDataService {
* 处理皮阿诺数据推送到汇通老库
*/
@Override
@Async
@Async("asyncExecutor")
public void handlerPanDataToHt(Long id) {
log.info("################handlerDataToHt: 处理皮阿诺订单数据到");
//查询需要同步的订单
@ -324,7 +324,7 @@ public class AsyncDataServiceImpl implements IAsyncDataService {
@Override
@Async
@Async("asyncExecutor")
public void handlerMtDataToHt(Long mainId) {
log.info("################handlerMtDataToHt: 处理梦天订单数据到");
//查询需要同步的订单

10
blade-service/logpm-factory/src/main/java/com/logpm/factory/oupai/service/impl/OpFailRetryPushPackageServiceImpl.java

@ -1,7 +1,6 @@
package com.logpm.factory.oupai.service.impl;
import com.alibaba.nacos.shaded.com.google.gson.Gson;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.logpm.factory.comfac.dto.OrderStatusDTO;
import com.logpm.factory.oupai.entity.OpFailRetryPushPackageEntity;
import com.logpm.factory.oupai.mapper.OpFailRetryPushPackageMapper;
@ -9,13 +8,14 @@ import com.logpm.factory.oupai.service.IOuPaiFactoryService;
import com.logpm.factory.oupai.service.OpFailRetryPushPackageService;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springblade.common.constant.opFailRetryPushPackage.PushStatus;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.List;
/**
* 欧派数据失败重推
@ -31,7 +31,7 @@ public class OpFailRetryPushPackageServiceImpl implements OpFailRetryPushPackage
protected final int MAX_RETRY_TIMES = 7;
@Async
@Async("asyncExecutor")
@Override
public void retry(List<OpFailRetryPushPackageEntity> waitData){
ArrayList<Long> completeIds = new ArrayList<>();

2
blade-service/logpm-patch/src/main/java/com/logpm/patch/service/impl/AsyncDataServiceImpl.java

@ -40,7 +40,7 @@ public class AsyncDataServiceImpl implements IAsyncDataService {
@Override
@Async
@Async("asyncExecutor")
public void syncInventoryToPlatform(SyncInventoryEntity syncInventoryEntity) {
Integer pageSize = 500;//处理的每页条数

2
blade-service/logpm-supervise/src/main/java/com/logpm/supervise/SuperviseApplication.java

@ -5,6 +5,7 @@ package com.logpm.supervise;
import org.springblade.common.constant.ModuleNameConstant;
import org.springblade.core.cloud.client.BladeCloudApplication;
import org.springblade.core.launch.BladeApplication;
import org.springblade.core.transaction.annotation.SeataCloudApplication;
/**
* Basic启动器
@ -12,6 +13,7 @@ import org.springblade.core.launch.BladeApplication;
* @author lmy
*/
@BladeCloudApplication
@SeataCloudApplication
public class SuperviseApplication {
public static void main(String[] args) {

50
blade-service/logpm-supervise/src/main/java/com/logpm/supervise/aspect/AsyncAnnotationAspect.java

@ -0,0 +1,50 @@
package com.logpm.supervise.aspect;
import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springblade.core.secure.utils.AuthUtil;
import org.springblade.core.tool.utils.StringUtil;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
@Aspect
@Component
public class AsyncAnnotationAspect {
/**
* 定义一个切点匹配所有带有@Async("asyncExecutor")注解的方法
* 注意实际上Spring Framework自带对@Async("asyncExecutor")的处理直接这样配置可能会导致预期之外的行为
*/
@Around("@annotation(org.springframework.scheduling.annotation.Async)")
public Object logAroundAsyncMethods(ProceedingJoinPoint joinPoint) throws Throwable {
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
Method method = signature.getMethod();
Async myAsync = method.getAnnotation(Async.class);
String annotationValue = myAsync.value();
if(StringUtil.isNotBlank(annotationValue) && annotationValue.equals("asyncExecutor")){
// 在方法执行前的操作
String tenantId = AuthUtil.getTenantId();
DynamicDataSourceContextHolder.push(tenantId);
// 执行原方法
Object result = joinPoint.proceed();
// 在方法执行后的操作
DynamicDataSourceContextHolder.poll();
return result;
}else{
return joinPoint.proceed();
}
}
}

82
blade-service/logpm-supervise/src/main/java/com/logpm/supervise/config/ExecutorConfig.java

@ -0,0 +1,82 @@
package com.logpm.supervise.config;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springblade.core.secure.utils.AuthUtil;
import org.springblade.core.tool.utils.ThreadLocalUtil;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskDecorator;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
import javax.annotation.Nonnull;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@Configuration
@Slf4j
@EnableAsync
public class ExecutorConfig {
@Bean
public Executor asyncExecutor() {
log.info("start async executor");
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
// 配置核心线程数
threadPoolTaskExecutor.setCorePoolSize(10);
// 配置最大线程数
threadPoolTaskExecutor.setMaxPoolSize(20);
// 配置队列大小
threadPoolTaskExecutor.setQueueCapacity(50);
// 配置线程池中线程的名称前缀
threadPoolTaskExecutor.setThreadNamePrefix("ASYNC_THREAD_");
// HelloWorldServiceImpl rejection-policy: 当pool已经达到max size时,如何处理新任务:
// CallerRunsPolicy: 不在新线程中执行任务,而是由调用者所在的线程来执行;
// AbortPolicy: 拒绝执行新任务,并抛出RejectedExecutionException异常;
// DiscardPolicy:丢弃当前将要加入队列的任务;
// DiscardOldestPolicy:丢弃任务队列中最旧的任务;
threadPoolTaskExecutor.setRejectedExecutionHandler(
new ThreadPoolExecutor.CallerRunsPolicy()
);
threadPoolTaskExecutor.setTaskDecorator(new ContextCopyingDecorator());
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
static class ContextCopyingDecorator implements TaskDecorator {
@Nonnull
@Override
public Runnable decorate(@Nonnull Runnable runnable) {
RequestAttributes context = RequestContextHolder.currentRequestAttributes();
String tenantId = AuthUtil.getTenantId();
Map<String, Object> all = ThreadLocalUtil.getAll();
Map<String, String> mdcMap = MDC.getCopyOfContextMap();
return () -> {
try {
all.keySet().forEach(key -> ThreadLocalUtil.put(key, all.get(key)));
if (mdcMap != null && !mdcMap.isEmpty()) {
MDC.setContextMap(mdcMap);
}
RequestContextHolder.setRequestAttributes(context);
String tenantId1 = AuthUtil.getTenantId();
runnable.run();
} finally {
RequestContextHolder.resetRequestAttributes();
all.clear();
if (mdcMap != null) {
mdcMap.clear();
}
ThreadLocalUtil.clear();
MDC.clear();
}
};
}
}
}

50
blade-service/logpm-trunkline/src/main/java/com/logpm/trunkline/aspect/AsyncAnnotationAspect.java

@ -0,0 +1,50 @@
package com.logpm.trunkline.aspect;
import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springblade.core.secure.utils.AuthUtil;
import org.springblade.core.tool.utils.StringUtil;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
@Aspect
@Component
public class AsyncAnnotationAspect {
/**
* 定义一个切点匹配所有带有@Async("asyncExecutor")注解的方法
* 注意实际上Spring Framework自带对@Async("asyncExecutor")的处理直接这样配置可能会导致预期之外的行为
*/
@Around("@annotation(org.springframework.scheduling.annotation.Async)")
public Object logAroundAsyncMethods(ProceedingJoinPoint joinPoint) throws Throwable {
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
Method method = signature.getMethod();
Async myAsync = method.getAnnotation(Async.class);
String annotationValue = myAsync.value();
if(StringUtil.isNotBlank(annotationValue) && annotationValue.equals("asyncExecutor")){
// 在方法执行前的操作
String tenantId = AuthUtil.getTenantId();
DynamicDataSourceContextHolder.push(tenantId);
// 执行原方法
Object result = joinPoint.proceed();
// 在方法执行后的操作
DynamicDataSourceContextHolder.poll();
return result;
}else{
return joinPoint.proceed();
}
}
}

2
blade-service/logpm-trunkline/src/main/java/com/logpm/trunkline/service/IAsyncService.java

@ -1,14 +1,12 @@
package com.logpm.trunkline.service;
import com.logpm.trunkline.entity.TrunklineAdvanceDetailEntity;
import org.springframework.scheduling.annotation.Async;
import java.util.List;
import java.util.concurrent.CompletableFuture;
public interface IAsyncService {
@Async("asyncExecutor")
CompletableFuture<Boolean> getResponseFromCp(List<TrunklineAdvanceDetailEntity> list, int queryType);
}

2
blade-service/logpm-trunkline/src/main/java/com/logpm/trunkline/service/impl/AsyncServiceImpl.java

@ -4,6 +4,7 @@ import com.logpm.trunkline.entity.TrunklineAdvanceDetailEntity;
import com.logpm.trunkline.service.IAsyncService;
import com.logpm.trunkline.service.ITrunklineAdvanceDetailService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.List;
@ -15,6 +16,7 @@ public class AsyncServiceImpl implements IAsyncService {
@Autowired
private ITrunklineAdvanceDetailService advanceDetailService;
@Async("asyncExecutor")
@Override
public CompletableFuture<Boolean> getResponseFromCp(List<TrunklineAdvanceDetailEntity> list, int queryType) {
return CompletableFuture

16
blade-service/logpm-trunkline/src/main/java/com/logpm/trunkline/service/impl/CarsLoadAsyncServiceImpl.java

@ -56,7 +56,7 @@ public class CarsLoadAsyncServiceImpl implements ICarsLoadAsyncService {
@Autowired
private ITrunklineCarsUnloadLogService trunklineCarsUnloadLogService;
@Async
@Async("asyncExecutor")
@Override
public void saveLog(TrunklineCarsLoadEntity carsLoadEntity, TrunklineCarsLoadLineEntity carsLoadLineEntity, int cardLoadType,String nickName,Long userId) {
String nodeName = null;
@ -166,7 +166,7 @@ public class CarsLoadAsyncServiceImpl implements ICarsLoadAsyncService {
trunklineCarsLoadLogService.save(carsLoadLogEntity);
}
@Async
@Async("asyncExecutor")
@Override
public void saveCostShareRecord(Long loadId, TrunklineCarsLoadEntity carsLoadEntity, Long userId, String tenantId, String deptIds) {
Long deptId = null;
@ -251,7 +251,7 @@ public class CarsLoadAsyncServiceImpl implements ICarsLoadAsyncService {
}
@Async
@Async("asyncExecutor")
@Override
public void costShareByLoadId(Long loadId,TrunklineCarsLoadEntity carsLoadEntity) {
@ -332,7 +332,7 @@ public class CarsLoadAsyncServiceImpl implements ICarsLoadAsyncService {
}
@Async
@Async("asyncExecutor")
@Override
public void abnormalListStartCarByLoadIdAndWarehouseId(Long loadId, Long warehouseId,Long userId,Long deptId,String nickName,String tenantId,String warehouseName) {
@ -373,7 +373,7 @@ public class CarsLoadAsyncServiceImpl implements ICarsLoadAsyncService {
}
}
@Async
@Async("asyncExecutor")
@Override
public void abnormalListUnloadByLoadIdAndWarehouseId(Long loadId, Long warehouseId, String warehouseName, Long loadScanId, String tenantId, Long userId, String nickName, Long deptId) {
TrunklineCarsLoadScanEntity carsLoadScanEntity = trunklineCarsLoadScanService.getById(loadScanId);
@ -408,7 +408,7 @@ public class CarsLoadAsyncServiceImpl implements ICarsLoadAsyncService {
}
}
@Async
@Async("asyncExecutor")
@Override
public void abnormalListUnloadCheckByLoadIdAndWarehouseId(Long loadId, Long warehouseId, String warehouseName, String tenantId, Long userId, String nickName, Long deptId) {
@ -489,7 +489,7 @@ public class CarsLoadAsyncServiceImpl implements ICarsLoadAsyncService {
}
}
@Async
@Async("asyncExecutor")
@Override
public void dealwithAfterAbnormalPackage(String orderPackageCode, Long warehouseId, String warehouseName, String carsNo, Long userId, Long aLong, String nickName) {
@ -512,7 +512,7 @@ public class CarsLoadAsyncServiceImpl implements ICarsLoadAsyncService {
}
@Async
@Async("asyncExecutor")
@Override
public void savaUnloadLogBatch(List<TrunklineCarsUnloadLogEntity> unloadLogList) {
trunklineCarsUnloadLogService.savaUnloadLogBatch(unloadLogList);

52
blade-service/logpm-trunkline/src/main/java/com/logpm/trunkline/service/impl/OpenOrderAsyncServiceImpl.java

@ -9,8 +9,9 @@ import com.logpm.trunkline.vo.LoadScanWaybillVO;
import com.logpm.warehouse.entity.WarehouseWaybillEntity;
import com.logpm.warehouse.feign.IWarehouseWaybillClient;
import lombok.extern.slf4j.Slf4j;
import org.springblade.common.constant.TenantNum;
import org.springblade.common.utils.CommonUtil;
import org.springblade.core.secure.BladeUser;
import org.springblade.core.secure.utils.AuthUtil;
import org.springblade.system.cache.UserCache;
import org.springblade.system.entity.User;
import org.springframework.beans.factory.annotation.Autowired;
@ -28,6 +29,7 @@ public class OpenOrderAsyncServiceImpl implements IOpenOrderAsyncService {
@Autowired
private ITrunklineWaybillTrackService trunklineWaybillTrackService;
@Lazy
@Autowired
private ITrunklineCarsLoadService trunklineCarsLoadService;
@ -40,9 +42,11 @@ public class OpenOrderAsyncServiceImpl implements IOpenOrderAsyncService {
@Autowired
private IInComingService inComingService;
@Async
@Async("asyncExecutor")
@Override
public void saveLog(Long waybillId, String waybillNo, String trackType, String refer, String operationRemark, String nickName,Long userId,Long warehouseId,String warehouseName) {
// String tenantId = AuthUtil.getTenantId();
// DynamicDataSourceContextHolder.push(tenantId);
TrunklineWaybillTrackEntity waybillTrackEntity = new TrunklineWaybillTrackEntity();
waybillTrackEntity.setWarehouseId(warehouseId);
waybillTrackEntity.setWarehouseName(warehouseName);
@ -52,19 +56,25 @@ public class OpenOrderAsyncServiceImpl implements IOpenOrderAsyncService {
waybillTrackEntity.setRefer(refer);
waybillTrackEntity.setOperationRemark(operationRemark);
waybillTrackEntity.setCreateUserName(nickName);
waybillTrackEntity.setTenantId(TenantNum.HUITONGCODE);
waybillTrackEntity.setCreateUser(userId);
waybillTrackEntity.setUpdateUser(userId);
// waybillTrackEntity.setTenantId(TenantNum.HUITONGCODE);
// waybillTrackEntity.setCreateUser(userId);
// waybillTrackEntity.setUpdateUser(userId);
waybillTrackEntity.setCheckStatus(1);
waybillTrackEntity.setCheckTime(new Date());
trunklineWaybillTrackService.save(waybillTrackEntity);
// DynamicDataSourceContextHolder.clear();
}
@Async
@Async("asyncExecutor")
@Override
public void saveStartCarLog(TrunklineCarsLoadLineEntity carsLoadLineEntity,String nickName,Long userId) {
Long loadId = carsLoadLineEntity.getLoadId();
Integer sort = carsLoadLineEntity.getSort();
String tenantId = AuthUtil.getTenantId();
BladeUser user = AuthUtil.getUser();
// DynamicDataSourceContextHolder.push(tenantId);
TrunklineCarsLoadEntity carsLoadEntity = trunklineCarsLoadService.getById(loadId);
if(Objects.isNull(carsLoadEntity)){
log.warn("##############saveStartCarLog: 配载信息不存在 loadId={}",loadId);
@ -102,10 +112,30 @@ public class OpenOrderAsyncServiceImpl implements IOpenOrderAsyncService {
operationRemark = "干线从"+nodeName+"发车,实际发车时间"+ CommonUtil.dateToStringGeneral(new Date());
saveLog(waybillId,waybillNo,"40",nodeName+"已发车到"+nextWarehouseName,operationRemark,nickName,userId,nodeId,nodeName);
}
// DynamicDataSourceContextHolder.clear();
}
private void saveTrackLog(Long waybillId, String waybillNo, String trackType, String refer, String operationRemark, String nickName,Long userId,Long warehouseId,String warehouseName) {
TrunklineWaybillTrackEntity waybillTrackEntity = new TrunklineWaybillTrackEntity();
waybillTrackEntity.setWarehouseId(warehouseId);
waybillTrackEntity.setWarehouseName(warehouseName);
waybillTrackEntity.setWaybillId(waybillId);
waybillTrackEntity.setWaybillNo(waybillNo);
waybillTrackEntity.setTrackType(trackType);
waybillTrackEntity.setRefer(refer);
waybillTrackEntity.setOperationRemark(operationRemark);
waybillTrackEntity.setCreateUserName(nickName);
// waybillTrackEntity.setTenantId(TenantNum.HUITONGCODE);
// waybillTrackEntity.setCreateUser(userId);
// waybillTrackEntity.setUpdateUser(userId);
waybillTrackEntity.setCheckStatus(1);
waybillTrackEntity.setCheckTime(new Date());
trunklineWaybillTrackService.save(waybillTrackEntity);
}
@Async
@Async("asyncExecutor")
@Override
public void saveCancelStartCarLog(TrunklineCarsLoadLineEntity carsLoadLineEntity, String nickName,Long userId) {
Long loadId = carsLoadLineEntity.getLoadId();
@ -135,7 +165,7 @@ public class OpenOrderAsyncServiceImpl implements IOpenOrderAsyncService {
}
}
@Async
@Async("asyncExecutor")
@Override
public void saveArriveCarLog(TrunklineCarsLoadLineEntity carsLoadLineEntity, String nickName,Long userId) {
Long loadId = carsLoadLineEntity.getLoadId();
@ -165,7 +195,7 @@ public class OpenOrderAsyncServiceImpl implements IOpenOrderAsyncService {
}
}
@Async
@Async("asyncExecutor")
@Override
public void saveCancelArriveCarLog(TrunklineCarsLoadLineEntity carsLoadLineEntity, String nickName,Long userId) {
Long loadId = carsLoadLineEntity.getLoadId();
@ -195,7 +225,7 @@ public class OpenOrderAsyncServiceImpl implements IOpenOrderAsyncService {
}
}
@Async
@Async("asyncExecutor")
@Override
public void saveUnloadStartCarLog(TrunklineCarsLoadLineEntity carsLoadLineEntity, String nickName,Long userId) {
Long loadId = carsLoadLineEntity.getLoadId();
@ -227,7 +257,7 @@ public class OpenOrderAsyncServiceImpl implements IOpenOrderAsyncService {
}
}
@Async
@Async("asyncExecutor")
@Override
public void incomingPackageBatch(List<Long> advanceIds, Long userId, Long deptId, String tenantId, String nickName,Integer incomingType,Long warehouseId) {
InComingDTO inComingDTO = new InComingDTO();

4
blade-service/logpm-trunkline/src/main/java/com/logpm/trunkline/service/impl/PackageTrackLogAsyncServiceImpl.java

@ -21,7 +21,7 @@ public class PackageTrackLogAsyncServiceImpl implements IPackageTrackLogAsyncSer
private final IWarehousePackageTrackLogClient warehousePackageTrackLogClient;
private final ITrunklineWaybillPackageService waybillPackageService;
@Async
@Async("asyncExecutor")
@Override
public void addPackageTrackLog(String tenantId, Long userId, Long deptId, String nickName, List<String> orderPackageCodes, Long warehouseId, String warehouseName, Integer workNode, String content) {
List<WarehousePackageTrackLogEntity> list = new ArrayList<>();
@ -49,7 +49,7 @@ public class PackageTrackLogAsyncServiceImpl implements IPackageTrackLogAsyncSer
waybillPackageService.updatePackageStatus(orderPackageCodes, workNode);
}
@Async
@Async("asyncExecutor")
@Override
public void addBatchPackageTrackLog(List<WarehousePackageTrackLogEntity> addPackageTrackLogList, List<String> orderPackageCodes, Integer workNode) {
warehousePackageTrackLogClient.addLogList(addPackageTrackLogList);

50
blade-service/logpm-warehouse/src/main/java/com/logpm/warehouse/aspect/AsyncAnnotationAspect.java

@ -0,0 +1,50 @@
package com.logpm.warehouse.aspect;
import com.baomidou.dynamic.datasource.toolkit.DynamicDataSourceContextHolder;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springblade.core.secure.utils.AuthUtil;
import org.springblade.core.tool.utils.StringUtil;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;
@Aspect
@Component
public class AsyncAnnotationAspect {
/**
* 定义一个切点匹配所有带有@Async("asyncExecutor")注解的方法
* 注意实际上Spring Framework自带对@Async("asyncExecutor")的处理直接这样配置可能会导致预期之外的行为
*/
@Around("@annotation(org.springframework.scheduling.annotation.Async)")
public Object logAroundAsyncMethods(ProceedingJoinPoint joinPoint) throws Throwable {
MethodSignature signature = (MethodSignature) joinPoint.getSignature();
Method method = signature.getMethod();
Async myAsync = method.getAnnotation(Async.class);
String annotationValue = myAsync.value();
if(StringUtil.isNotBlank(annotationValue) && annotationValue.equals("asyncExecutor")){
// 在方法执行前的操作
String tenantId = AuthUtil.getTenantId();
DynamicDataSourceContextHolder.push(tenantId);
// 执行原方法
Object result = joinPoint.proceed();
// 在方法执行后的操作
DynamicDataSourceContextHolder.poll();
return result;
}else{
return joinPoint.proceed();
}
}
}

16
blade-service/logpm-warehouse/src/main/java/com/logpm/warehouse/config/ExecutorConfig.java

@ -1,6 +1,8 @@
package com.logpm.warehouse.config;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.MDC;
import org.springblade.core.tool.utils.ThreadLocalUtil;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskDecorator;
@ -10,6 +12,7 @@ import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
import javax.annotation.Nonnull;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
@ -49,12 +52,25 @@ public class ExecutorConfig {
@Override
public Runnable decorate(@Nonnull Runnable runnable) {
RequestAttributes context = RequestContextHolder.currentRequestAttributes();
Map<String, Object> all = ThreadLocalUtil.getAll();
Map<String, String> mdcMap = MDC.getCopyOfContextMap();
return () -> {
try {
all.keySet().forEach(key -> ThreadLocalUtil.put(key, all.get(key)));
if (mdcMap != null && !mdcMap.isEmpty()) {
MDC.setContextMap(mdcMap);
}
RequestContextHolder.setRequestAttributes(context);
runnable.run();
} finally {
RequestContextHolder.resetRequestAttributes();
all.clear();
if (mdcMap != null) {
mdcMap.clear();
}
ThreadLocalUtil.clear();
MDC.clear();
}
};
}

2
blade-service/logpm-warehouse/src/main/java/com/logpm/warehouse/service/impl/AsyncDataServiceImpl.java

@ -79,7 +79,7 @@ public class AsyncDataServiceImpl implements IAsyncDataService {
private ISyncTaskErrorLogService syncTaskErrorLogService;
@Override
@Async
@Async("asyncExecutor")
public void syncTaskData(String questNum, Long warehouseId,String tenantId,Long userId,String nickName,Long deptId) {
log.info("###############syncTaskData: 同步盘点任务数据开始 questNum={}",questNum);
QueryWrapper<TaskQuestEntity> queryWrapper = new QueryWrapper<>();

Loading…
Cancel
Save