From 15860d626820f5eb5d4716673e20ee2e0b6c4449 Mon Sep 17 00:00:00 2001 From: chenlong Date: Fri, 29 Mar 2024 17:40:23 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=AC=A7=E6=B4=BE=E4=B8=8D?= =?UTF-8?q?=E5=AD=98=E5=9C=A8=E7=9A=84=E5=8C=85=E6=9D=A1=E7=9A=84=E9=87=8D?= =?UTF-8?q?=E6=8E=A8=E5=AE=9A=E6=97=B6=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../opFailRetryPushPackage/PushStatus.java | 22 ++++- .../jobhandler/OpFailPackageRetryJob.java | 69 ++++++++++++++ .../mapper/OpFailRetryPushPackageMapper.java | 5 + .../mapper/OpFailRetryPushPackageMapper.xml | 33 +++++++ .../oupai/service/IOuPaiFactoryService.java | 8 ++ .../OpFailRetryPushPackageService.java | 14 +-- .../OpPushFailedPackageRecordService.java | 15 +++ .../OpFailRetryPushPackageServiceImpl.java | 91 +++++++++++++------ .../OpPushFailedPackageRecordServiceImpl.java | 53 +++++++++++ .../service/impl/OuPaiFactoryServiceImpl.java | 35 ++++++- 10 files changed, 303 insertions(+), 42 deletions(-) create mode 100644 blade-service/logpm-factory/src/main/java/com/logpm/factory/jobhandler/OpFailPackageRetryJob.java create mode 100644 blade-service/logpm-factory/src/main/java/com/logpm/factory/oupai/mapper/OpFailRetryPushPackageMapper.xml create mode 100644 blade-service/logpm-factory/src/main/java/com/logpm/factory/oupai/service/OpPushFailedPackageRecordService.java create mode 100644 blade-service/logpm-factory/src/main/java/com/logpm/factory/oupai/service/impl/OpPushFailedPackageRecordServiceImpl.java diff --git a/blade-biz-common/src/main/java/org/springblade/common/constant/opFailRetryPushPackage/PushStatus.java b/blade-biz-common/src/main/java/org/springblade/common/constant/opFailRetryPushPackage/PushStatus.java index 6616a0b1a..b106f20fb 100644 --- a/blade-biz-common/src/main/java/org/springblade/common/constant/opFailRetryPushPackage/PushStatus.java +++ b/blade-biz-common/src/main/java/org/springblade/common/constant/opFailRetryPushPackage/PushStatus.java @@ -5,11 +5,27 @@ import lombok.Getter; @AllArgsConstructor @Getter -enum PushStatus { +public enum PushStatus { wait("等待中", 1), complete("已完成", 2), expire("已过期", 3); - private final String name; - private final Integer value; + private String name; + private Integer value; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Integer getValue() { + return value; + } + + public void setValue(Integer value) { + this.value = value; + } } diff --git a/blade-service/logpm-factory/src/main/java/com/logpm/factory/jobhandler/OpFailPackageRetryJob.java b/blade-service/logpm-factory/src/main/java/com/logpm/factory/jobhandler/OpFailPackageRetryJob.java new file mode 100644 index 000000000..d62b1af9c --- /dev/null +++ b/blade-service/logpm-factory/src/main/java/com/logpm/factory/jobhandler/OpFailPackageRetryJob.java @@ -0,0 +1,69 @@ +package com.logpm.factory.jobhandler; + +import com.logpm.factory.oupai.entity.OpFailRetryPushPackageEntity; +import com.logpm.factory.oupai.mapper.OpFailRetryPushPackageMapper; +import com.logpm.factory.oupai.service.OpFailRetryPushPackageService; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.handler.annotation.XxlJob; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springblade.common.constant.opFailRetryPushPackage.PushStatus; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Component; + +import java.util.HashMap; +import java.util.List; + + +@Slf4j +@Component +@AllArgsConstructor +public class OpFailPackageRetryJob { + + private final OpFailRetryPushPackageService opFailRetryPushPackageService; + + private OpFailRetryPushPackageMapper opFailRetryPushPackageMapper; + + // 是否开启多线程处理 + private static final boolean IsMultithreadingHandle = false; + + // 多线程批次处理的时候,每次最大处理的数据量 + private static final Integer batchHandleQuantity = 1000; + + /** + * 欧派数据推送失败的进行定时重推 + * @return + */ + @XxlJob("opFailPackageRetryJob") + public ReturnT execute(String param) { + + try { + if (IsMultithreadingHandle) { + batchHandleData(null); + } else { + HashMap condition = new HashMap<>(); + condition.put("push_status", PushStatus.wait.getValue()); + List opFailRetryPushPackageEntities = opFailRetryPushPackageMapper.selectByMap(condition); + opFailRetryPushPackageService.retry(opFailRetryPushPackageEntities); + } + }catch (Exception exception){ + log.error("OpFailPackageRetryJob error:{}",exception.getMessage()); + } + + return ReturnT.SUCCESS; + } + + /** + * 批次处理 + * @param startId + */ + private void batchHandleData(Long startId) { + List batchData = opFailRetryPushPackageMapper.selectWaitData(batchHandleQuantity, startId); + + if (batchData.size() == batchHandleQuantity) { + batchHandleData(batchData.get(batchHandleQuantity - 1).getId()); + } + + opFailRetryPushPackageService.retry(batchData); + } +} diff --git a/blade-service/logpm-factory/src/main/java/com/logpm/factory/oupai/mapper/OpFailRetryPushPackageMapper.java b/blade-service/logpm-factory/src/main/java/com/logpm/factory/oupai/mapper/OpFailRetryPushPackageMapper.java index 5092a79ef..393c882c9 100644 --- a/blade-service/logpm-factory/src/main/java/com/logpm/factory/oupai/mapper/OpFailRetryPushPackageMapper.java +++ b/blade-service/logpm-factory/src/main/java/com/logpm/factory/oupai/mapper/OpFailRetryPushPackageMapper.java @@ -5,7 +5,12 @@ import com.baomidou.mybatisplus.core.mapper.BaseMapper; import com.logpm.factory.oupai.entity.OpFailRetryPushPackageEntity; import org.apache.ibatis.annotations.Mapper; +import java.util.List; + @Mapper public interface OpFailRetryPushPackageMapper extends BaseMapper{ + void updateStatusToCompleteByIds(List ids); + void updateStatusToExpireByIds(List ids, String date); + List selectWaitData(Integer limit, Long startId); } diff --git a/blade-service/logpm-factory/src/main/java/com/logpm/factory/oupai/mapper/OpFailRetryPushPackageMapper.xml b/blade-service/logpm-factory/src/main/java/com/logpm/factory/oupai/mapper/OpFailRetryPushPackageMapper.xml new file mode 100644 index 000000000..825e5078b --- /dev/null +++ b/blade-service/logpm-factory/src/main/java/com/logpm/factory/oupai/mapper/OpFailRetryPushPackageMapper.xml @@ -0,0 +1,33 @@ + + + + + UPDATE logpm_factory.op_fail_retry_push_package + SET push_status = 2 + WHERE id IN + + #{id} + + + + + UPDATE logpm_factory.op_fail_retry_push_package + SET push_status = 3 + WHERE id IN + + #{id} + + AND create_time #{date} + + + + + diff --git a/blade-service/logpm-factory/src/main/java/com/logpm/factory/oupai/service/IOuPaiFactoryService.java b/blade-service/logpm-factory/src/main/java/com/logpm/factory/oupai/service/IOuPaiFactoryService.java index 588e3f22e..dae259581 100644 --- a/blade-service/logpm-factory/src/main/java/com/logpm/factory/oupai/service/IOuPaiFactoryService.java +++ b/blade-service/logpm-factory/src/main/java/com/logpm/factory/oupai/service/IOuPaiFactoryService.java @@ -40,5 +40,13 @@ public interface IOuPaiFactoryService { */ R handleStatusData(OrderStatusDTO orderStatusDTO); + /** + * 欧派推送包件状态 + * @see IOuPaiFactoryService#handleStatusData + * @param orderStatusDTO + * @return + */ + boolean retryHandleStatusData(OrderStatusDTO orderStatusDTO); + R newSystemHandleStatusData(OrderStatusDTO orderStatusDTO); } diff --git a/blade-service/logpm-factory/src/main/java/com/logpm/factory/oupai/service/OpFailRetryPushPackageService.java b/blade-service/logpm-factory/src/main/java/com/logpm/factory/oupai/service/OpFailRetryPushPackageService.java index 744f5c7c9..8498c61a2 100644 --- a/blade-service/logpm-factory/src/main/java/com/logpm/factory/oupai/service/OpFailRetryPushPackageService.java +++ b/blade-service/logpm-factory/src/main/java/com/logpm/factory/oupai/service/OpFailRetryPushPackageService.java @@ -1,19 +1,13 @@ package com.logpm.factory.oupai.service; -import com.logpm.factory.comfac.dto.OrderStatusDTO; +import com.logpm.factory.oupai.entity.OpFailRetryPushPackageEntity; -public interface OpFailRetryPushPackageService { +import java.util.List; - /** - * 记录失败的数据 - * - * @param orderPackageCode 包条 - * @param orderStatusDTO - */ - void record(String orderPackageCode, OrderStatusDTO orderStatusDTO); +public interface OpFailRetryPushPackageService { /** * 重推 */ - void retry(); + void retry(List waitData); } diff --git a/blade-service/logpm-factory/src/main/java/com/logpm/factory/oupai/service/OpPushFailedPackageRecordService.java b/blade-service/logpm-factory/src/main/java/com/logpm/factory/oupai/service/OpPushFailedPackageRecordService.java new file mode 100644 index 000000000..6b0acdc11 --- /dev/null +++ b/blade-service/logpm-factory/src/main/java/com/logpm/factory/oupai/service/OpPushFailedPackageRecordService.java @@ -0,0 +1,15 @@ +package com.logpm.factory.oupai.service; + +import com.logpm.factory.comfac.dto.OrderStatusDTO; + +public interface OpPushFailedPackageRecordService { + + /** + * 记录失败的数据 + * + * @param orderPackageCode 包条 + * @param orderStatusDTO + */ + void record(String orderPackageCode, OrderStatusDTO orderStatusDTO); + +} diff --git a/blade-service/logpm-factory/src/main/java/com/logpm/factory/oupai/service/impl/OpFailRetryPushPackageServiceImpl.java b/blade-service/logpm-factory/src/main/java/com/logpm/factory/oupai/service/impl/OpFailRetryPushPackageServiceImpl.java index 0b5a3e433..8fc004111 100644 --- a/blade-service/logpm-factory/src/main/java/com/logpm/factory/oupai/service/impl/OpFailRetryPushPackageServiceImpl.java +++ b/blade-service/logpm-factory/src/main/java/com/logpm/factory/oupai/service/impl/OpFailRetryPushPackageServiceImpl.java @@ -1,61 +1,96 @@ package com.logpm.factory.oupai.service.impl; import com.alibaba.nacos.shaded.com.google.gson.Gson; +import com.baomidou.mybatisplus.core.conditions.Wrapper; import com.logpm.factory.comfac.dto.OrderStatusDTO; import com.logpm.factory.oupai.entity.OpFailRetryPushPackageEntity; import com.logpm.factory.oupai.mapper.OpFailRetryPushPackageMapper; +import com.logpm.factory.oupai.service.IOuPaiFactoryService; import com.logpm.factory.oupai.service.OpFailRetryPushPackageService; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; + +import org.springblade.common.constant.opFailRetryPushPackage.PushStatus; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; +import java.text.SimpleDateFormat; +import java.util.*; /** * 欧派数据失败重推 */ @Slf4j -@Service +@Service("one") @AllArgsConstructor public class OpFailRetryPushPackageServiceImpl implements OpFailRetryPushPackageService { - private OpFailRetryPushPackageMapper opFailRetryPushPackageMapper; + protected OpFailRetryPushPackageMapper opFailRetryPushPackageMapper; - private static boolean isRetrying = false; - private static List retryData; + protected IOuPaiFactoryService ouPaiFactoryService; - @Override - public void record(String orderPackageCode, OrderStatusDTO orderStatusDTO) { - if (isRetrying) { - return; - } + protected final int MAX_RETRY_TIMES = 3; - try { - OpFailRetryPushPackageEntity opFailRetryPushPackageEntity = new OpFailRetryPushPackageEntity(); + @Async + @Override + public void retry(List waitData){ + ArrayList completeIds = new ArrayList<>(); + ArrayList failIds = new ArrayList<>(); - opFailRetryPushPackageEntity.setOrderPackageCode(orderPackageCode); - opFailRetryPushPackageEntity.setParams(new Gson().toJson(orderStatusDTO)); + waitData.forEach(opFailRetryPushPackageEntity -> executeRetry(opFailRetryPushPackageEntity, completeIds, failIds)); - opFailRetryPushPackageMapper.insert(opFailRetryPushPackageEntity); + updatePushStatus(completeIds, failIds); + } - }catch (Exception e){ - log.error(e.getMessage(),e); + /** + * 更新推送状态 + * @param completeIds + * @param failIds + */ + protected void updatePushStatus(ArrayList completeIds, ArrayList failIds){ + if (!completeIds.isEmpty()) { + opFailRetryPushPackageMapper.updateStatusToCompleteByIds(completeIds); + } + if (!failIds.isEmpty()) { + opFailRetryPushPackageMapper.updateStatusToExpireByIds(failIds, getExpireTime()); } } - @Override - public synchronized void retry() { - isRetrying = true; + /** + * 获取过期的时间 + * + * @return + */ + protected String getExpireTime(){ + Calendar calendar = Calendar.getInstance(); + Date date = new Date(); + + calendar.setTime(date); + calendar.add(Calendar.DATE, -MAX_RETRY_TIMES); - retryData = new ArrayList<>(); + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - // ...... - HashMap condition = new HashMap<>(); - condition.put("status", PushStatus.wait); - opFailRetryPushPackageMapper.selectByMap(condition); + return sdf.format(calendar.getTime()); + } - retryData = null; + /** + * 执行重试 + * @param opFailRetryPushPackageEntity + * @param completeIds + */ + protected void executeRetry(OpFailRetryPushPackageEntity opFailRetryPushPackageEntity, ArrayList completeIds, ArrayList failIds) { + try { + OrderStatusDTO orderStatusDTO = new Gson().fromJson(opFailRetryPushPackageEntity.getParams(), OrderStatusDTO.class); + + boolean res = ouPaiFactoryService.retryHandleStatusData(orderStatusDTO); + + if (res) { + completeIds.add(opFailRetryPushPackageEntity.getId()); + } else { + failIds.add(opFailRetryPushPackageEntity.getId()); + } + } catch (Exception e) { + log.error("executeRetry error: {}", e.getMessage()); + } } } diff --git a/blade-service/logpm-factory/src/main/java/com/logpm/factory/oupai/service/impl/OpPushFailedPackageRecordServiceImpl.java b/blade-service/logpm-factory/src/main/java/com/logpm/factory/oupai/service/impl/OpPushFailedPackageRecordServiceImpl.java new file mode 100644 index 000000000..67e89772e --- /dev/null +++ b/blade-service/logpm-factory/src/main/java/com/logpm/factory/oupai/service/impl/OpPushFailedPackageRecordServiceImpl.java @@ -0,0 +1,53 @@ +package com.logpm.factory.oupai.service.impl; + +import com.alibaba.nacos.shaded.com.google.gson.Gson; +import com.logpm.factory.comfac.dto.OrderStatusDTO; +import com.logpm.factory.oupai.entity.OpFailRetryPushPackageEntity; +import com.logpm.factory.oupai.mapper.OpFailRetryPushPackageMapper; +import com.logpm.factory.oupai.service.OpPushFailedPackageRecordService; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; + +import java.util.Date; +import java.util.HashMap; + +@Slf4j +@Service +@AllArgsConstructor +public class OpPushFailedPackageRecordServiceImpl implements OpPushFailedPackageRecordService { + + private OpFailRetryPushPackageMapper opFailRetryPushPackageMapper; + + /** + * 记录失败的数据 + * + * @param orderPackageCode 包条 + * @param orderStatusDTO + */ + @Override + public void record(String orderPackageCode, OrderStatusDTO orderStatusDTO){ + + try { + OpFailRetryPushPackageEntity opFailRetryPushPackageEntity = new OpFailRetryPushPackageEntity(); + + opFailRetryPushPackageEntity.setOrderPackageCode(orderPackageCode); + opFailRetryPushPackageEntity.setParams(new Gson().toJson(orderStatusDTO)); + opFailRetryPushPackageEntity.setCreateTime(new Date()); + + // 判重 +// HashMap queryMap = new HashMap<>(); +// queryMap.put("orderPackageCode", orderPackageCode); +// if (opFailRetryPushPackageMapper.selectByMap(queryMap) != null) { +// return; +// } + + opFailRetryPushPackageMapper.insert(opFailRetryPushPackageEntity); + + }catch (Exception e){ + log.error(e.getMessage(),e); + } + + } + +} diff --git a/blade-service/logpm-factory/src/main/java/com/logpm/factory/oupai/service/impl/OuPaiFactoryServiceImpl.java b/blade-service/logpm-factory/src/main/java/com/logpm/factory/oupai/service/impl/OuPaiFactoryServiceImpl.java index 1bcf982eb..a04f40e86 100644 --- a/blade-service/logpm-factory/src/main/java/com/logpm/factory/oupai/service/impl/OuPaiFactoryServiceImpl.java +++ b/blade-service/logpm-factory/src/main/java/com/logpm/factory/oupai/service/impl/OuPaiFactoryServiceImpl.java @@ -94,6 +94,9 @@ public class OuPaiFactoryServiceImpl implements IOuPaiFactoryService { @Autowired private IDistributionParcelListClient distributionParcelListClient; + @Autowired + private OpPushFailedPackageRecordService opPushFailedPackageRecordService; + @Override public String saveOuPaiFactoryOrderDTOByCarCarNumber(String code) { @@ -403,9 +406,33 @@ public class OuPaiFactoryServiceImpl implements IOuPaiFactoryService { return orderCode; } - @Override public R handleStatusData(OrderStatusDTO orderStatusDTO) { + // 重写 handleStatusData, 增加重试参数 + // 原调用执行 + return handleStatusData(orderStatusDTO, false); + } + + /** + * 重推 + * @see #handleStatusData(OrderStatusDTO, boolean) + * @param orderStatusDTO + * @return + */ + public boolean retryHandleStatusData(OrderStatusDTO orderStatusDTO){ + R res = handleStatusData(orderStatusDTO, true); + return res.getCode() == 200; + // 测试功能时使用 +// return res.getCode() == 200 || Math.random() < 0.5; + } + + /** + * + * @param orderStatusDTO + * @param isRetry 是否是重推调用 + * @return + */ + private R handleStatusData(OrderStatusDTO orderStatusDTO, boolean isRetry) { // 推送数据 String status = orderStatusDTO.getStatus(); String unitNo = orderStatusDTO.getUnitNo(); @@ -439,6 +466,12 @@ public class OuPaiFactoryServiceImpl implements IOuPaiFactoryService { FactoryOrderEntity factoryOrder = factoryOrderService.selectEntityByOrderPackageCode(unitNo); if (Objects.isNull(factoryOrder)) { + + // 不是重试的时候,记录对应数据 + if (!isRetry) { + opPushFailedPackageRecordService.record(unitNo, orderStatusDTO); + } + return Resp.fail(400, "未查询到该单据推送订单信息"); } // 如何判断这个包间是非干仓配的数据