pref_mail@163.com
10 months ago
16 changed files with 458 additions and 43 deletions
@ -0,0 +1,31 @@
|
||||
package org.springblade.common.constant.opFailRetryPushPackage; |
||||
|
||||
import lombok.AllArgsConstructor; |
||||
import lombok.Getter; |
||||
|
||||
@AllArgsConstructor |
||||
@Getter |
||||
public enum PushStatus { |
||||
wait("等待中", 1), |
||||
complete("已完成", 2), |
||||
expire("已过期", 3); |
||||
|
||||
private String name; |
||||
private Integer value; |
||||
|
||||
public String getName() { |
||||
return name; |
||||
} |
||||
|
||||
public void setName(String name) { |
||||
this.name = name; |
||||
} |
||||
|
||||
public Integer getValue() { |
||||
return value; |
||||
} |
||||
|
||||
public void setValue(Integer value) { |
||||
this.value = value; |
||||
} |
||||
} |
@ -0,0 +1,56 @@
|
||||
package com.logpm.factory.oupai.entity; |
||||
|
||||
|
||||
import com.baomidou.mybatisplus.annotation.TableField; |
||||
import com.baomidou.mybatisplus.annotation.TableName; |
||||
import io.swagger.annotations.ApiModel; |
||||
import io.swagger.annotations.ApiModelProperty; |
||||
import lombok.Data; |
||||
import lombok.EqualsAndHashCode; |
||||
import org.springblade.core.mp.base.BaseEntity; |
||||
|
||||
@Data |
||||
@TableName("op_fail_retry_push_package") |
||||
@ApiModel(value = "opFailRetryPushPackage对象", description = "欧派失败重推包件") |
||||
@EqualsAndHashCode(callSuper = true) |
||||
public class OpFailRetryPushPackageEntity extends BaseEntity { |
||||
/** 预留1 */ |
||||
@ApiModelProperty(name = "预留1",notes = "") |
||||
private String reserve1 ; |
||||
|
||||
/** 预留2 */ |
||||
@ApiModelProperty(name = "预留2",notes = "") |
||||
private String reserve2 ; |
||||
|
||||
/** 预留3 */ |
||||
@ApiModelProperty(name = "预留3",notes = "") |
||||
private String reserve3 ; |
||||
|
||||
/** 预留4 */ |
||||
@ApiModelProperty(name = "预留4",notes = "") |
||||
private String reserve4 ; |
||||
|
||||
/** 预留5 */ |
||||
@ApiModelProperty(name = "预留5",notes = "") |
||||
private String reserve5 ; |
||||
|
||||
/** 预留5 */ |
||||
@TableField("order_package_code") |
||||
@ApiModelProperty(name = "包件码",notes = "") |
||||
private String orderPackageCode; |
||||
|
||||
/** 预留5 */ |
||||
@ApiModelProperty(name = "执行参数",notes = "") |
||||
private String params ; |
||||
|
||||
/** 预留5 */ |
||||
@TableField("push_status") |
||||
@ApiModelProperty(name = "状态:1=待处理,2=已处理,3=已过期",notes = "") |
||||
private Integer pushStatus ; |
||||
|
||||
/** 预留5 */ |
||||
@ApiModelProperty(name = "类型:1=入库,2=出库",notes = "") |
||||
private Integer type ; |
||||
|
||||
|
||||
} |
@ -0,0 +1,69 @@
|
||||
package com.logpm.factory.jobhandler; |
||||
|
||||
import com.logpm.factory.oupai.entity.OpFailRetryPushPackageEntity; |
||||
import com.logpm.factory.oupai.mapper.OpFailRetryPushPackageMapper; |
||||
import com.logpm.factory.oupai.service.OpFailRetryPushPackageService; |
||||
import com.xxl.job.core.biz.model.ReturnT; |
||||
import com.xxl.job.core.handler.annotation.XxlJob; |
||||
import lombok.AllArgsConstructor; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
import org.springblade.common.constant.opFailRetryPushPackage.PushStatus; |
||||
import org.springframework.beans.factory.annotation.Qualifier; |
||||
import org.springframework.stereotype.Component; |
||||
|
||||
import java.util.HashMap; |
||||
import java.util.List; |
||||
|
||||
|
||||
@Slf4j |
||||
@Component |
||||
@AllArgsConstructor |
||||
public class OpFailPackageRetryJob { |
||||
|
||||
private final OpFailRetryPushPackageService opFailRetryPushPackageService; |
||||
|
||||
private OpFailRetryPushPackageMapper opFailRetryPushPackageMapper; |
||||
|
||||
// 是否开启多线程处理
|
||||
private static final boolean IsMultithreadingHandle = false; |
||||
|
||||
// 多线程批次处理的时候,每次最大处理的数据量
|
||||
private static final Integer batchHandleQuantity = 1000; |
||||
|
||||
/** |
||||
* 欧派数据推送失败的进行定时重推 |
||||
* @return |
||||
*/ |
||||
@XxlJob("opFailPackageRetryJob") |
||||
public ReturnT<String> execute(String param) { |
||||
|
||||
try { |
||||
if (IsMultithreadingHandle) { |
||||
batchHandleData(null); |
||||
} else { |
||||
HashMap<String, Object> condition = new HashMap<>(); |
||||
condition.put("push_status", PushStatus.wait.getValue()); |
||||
List<OpFailRetryPushPackageEntity> opFailRetryPushPackageEntities = opFailRetryPushPackageMapper.selectByMap(condition); |
||||
opFailRetryPushPackageService.retry(opFailRetryPushPackageEntities); |
||||
} |
||||
}catch (Exception exception){ |
||||
log.error("OpFailPackageRetryJob error:{}",exception.getMessage()); |
||||
} |
||||
|
||||
return ReturnT.SUCCESS; |
||||
} |
||||
|
||||
/** |
||||
* 批次处理 |
||||
* @param startId |
||||
*/ |
||||
private void batchHandleData(Long startId) { |
||||
List<OpFailRetryPushPackageEntity> batchData = opFailRetryPushPackageMapper.selectWaitData(batchHandleQuantity, startId); |
||||
|
||||
if (batchData.size() == batchHandleQuantity) { |
||||
batchHandleData(batchData.get(batchHandleQuantity - 1).getId()); |
||||
} |
||||
|
||||
opFailRetryPushPackageService.retry(batchData); |
||||
} |
||||
} |
@ -0,0 +1,16 @@
|
||||
package com.logpm.factory.oupai.mapper; |
||||
|
||||
|
||||
import com.baomidou.mybatisplus.core.mapper.BaseMapper; |
||||
import com.logpm.factory.oupai.entity.OpFailRetryPushPackageEntity; |
||||
import org.apache.ibatis.annotations.Mapper; |
||||
|
||||
import java.util.List; |
||||
|
||||
@Mapper |
||||
public interface OpFailRetryPushPackageMapper extends BaseMapper<OpFailRetryPushPackageEntity>{ |
||||
void updateStatusToCompleteByIds(List<Long> ids); |
||||
void updateStatusToExpireByIds(List<Long> ids, String date); |
||||
|
||||
List<OpFailRetryPushPackageEntity> selectWaitData(Integer limit, Long startId); |
||||
} |
@ -0,0 +1,33 @@
|
||||
<?xml version="1.0" encoding="UTF-8" ?> |
||||
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" > |
||||
<mapper namespace="com.logpm.factory.oupai.mapper.OpFailRetryPushPackageMapper"> |
||||
<update id="updateStatusToCompleteByIds"> |
||||
UPDATE logpm_factory.op_fail_retry_push_package |
||||
SET push_status = 2 |
||||
WHERE id IN |
||||
<foreach collection="ids" item="id" open="(" separator="," close=")" > |
||||
#{id} |
||||
</foreach> |
||||
</update> |
||||
|
||||
<update id="updateStatusToExpireByIds"> |
||||
UPDATE logpm_factory.op_fail_retry_push_package |
||||
SET push_status = 3 |
||||
WHERE id IN |
||||
<foreach collection="ids" item="id" open="(" separator="," close=")" > |
||||
#{id} |
||||
</foreach> |
||||
AND create_time <![CDATA[ < ]]> #{date} |
||||
</update> |
||||
|
||||
<select id="selectWaitData" resultType="com.logpm.factory.oupai.entity.OpFailRetryPushPackageEntity"> |
||||
SELECT * FROM logpm_factory.op_fail_retry_push_package |
||||
WHERE push_status = 1 |
||||
<if test="startId != null"> |
||||
id > #{startId} |
||||
</if> |
||||
ORDER BY id |
||||
LIMIT #{limit} |
||||
</select> |
||||
|
||||
</mapper> |
@ -0,0 +1,13 @@
|
||||
package com.logpm.factory.oupai.service; |
||||
|
||||
import com.logpm.factory.oupai.entity.OpFailRetryPushPackageEntity; |
||||
|
||||
import java.util.List; |
||||
|
||||
public interface OpFailRetryPushPackageService { |
||||
|
||||
/** |
||||
* 重推 |
||||
*/ |
||||
void retry(List<OpFailRetryPushPackageEntity> waitData); |
||||
} |
@ -0,0 +1,15 @@
|
||||
package com.logpm.factory.oupai.service; |
||||
|
||||
import com.logpm.factory.comfac.dto.OrderStatusDTO; |
||||
|
||||
public interface OpPushFailedPackageRecordService { |
||||
|
||||
/** |
||||
* 记录失败的数据 |
||||
* |
||||
* @param orderPackageCode 包条 |
||||
* @param orderStatusDTO |
||||
*/ |
||||
void record(String orderPackageCode, OrderStatusDTO orderStatusDTO); |
||||
|
||||
} |
@ -0,0 +1,96 @@
|
||||
package com.logpm.factory.oupai.service.impl; |
||||
|
||||
import com.alibaba.nacos.shaded.com.google.gson.Gson; |
||||
import com.baomidou.mybatisplus.core.conditions.Wrapper; |
||||
import com.logpm.factory.comfac.dto.OrderStatusDTO; |
||||
import com.logpm.factory.oupai.entity.OpFailRetryPushPackageEntity; |
||||
import com.logpm.factory.oupai.mapper.OpFailRetryPushPackageMapper; |
||||
import com.logpm.factory.oupai.service.IOuPaiFactoryService; |
||||
import com.logpm.factory.oupai.service.OpFailRetryPushPackageService; |
||||
import lombok.AllArgsConstructor; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
|
||||
import org.springblade.common.constant.opFailRetryPushPackage.PushStatus; |
||||
import org.springframework.scheduling.annotation.Async; |
||||
import org.springframework.stereotype.Service; |
||||
|
||||
import java.text.SimpleDateFormat; |
||||
import java.util.*; |
||||
|
||||
/** |
||||
* 欧派数据失败重推 |
||||
*/ |
||||
@Slf4j |
||||
@Service("one") |
||||
@AllArgsConstructor |
||||
public class OpFailRetryPushPackageServiceImpl implements OpFailRetryPushPackageService { |
||||
|
||||
protected OpFailRetryPushPackageMapper opFailRetryPushPackageMapper; |
||||
|
||||
protected IOuPaiFactoryService ouPaiFactoryService; |
||||
|
||||
protected final int MAX_RETRY_TIMES = 3; |
||||
|
||||
@Async |
||||
@Override |
||||
public void retry(List<OpFailRetryPushPackageEntity> waitData){ |
||||
ArrayList<Long> completeIds = new ArrayList<>(); |
||||
ArrayList<Long> failIds = new ArrayList<>(); |
||||
|
||||
waitData.forEach(opFailRetryPushPackageEntity -> executeRetry(opFailRetryPushPackageEntity, completeIds, failIds)); |
||||
|
||||
updatePushStatus(completeIds, failIds); |
||||
} |
||||
|
||||
/** |
||||
* 更新推送状态 |
||||
* @param completeIds |
||||
* @param failIds |
||||
*/ |
||||
protected void updatePushStatus(ArrayList<Long> completeIds, ArrayList<Long> failIds){ |
||||
if (!completeIds.isEmpty()) { |
||||
opFailRetryPushPackageMapper.updateStatusToCompleteByIds(completeIds); |
||||
} |
||||
if (!failIds.isEmpty()) { |
||||
opFailRetryPushPackageMapper.updateStatusToExpireByIds(failIds, getExpireTime()); |
||||
} |
||||
} |
||||
|
||||
/** |
||||
* 获取过期的时间 |
||||
* |
||||
* @return |
||||
*/ |
||||
protected String getExpireTime(){ |
||||
Calendar calendar = Calendar.getInstance(); |
||||
Date date = new Date(); |
||||
|
||||
calendar.setTime(date); |
||||
calendar.add(Calendar.DATE, -MAX_RETRY_TIMES); |
||||
|
||||
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); |
||||
|
||||
return sdf.format(calendar.getTime()); |
||||
} |
||||
|
||||
/** |
||||
* 执行重试 |
||||
* @param opFailRetryPushPackageEntity |
||||
* @param completeIds |
||||
*/ |
||||
protected void executeRetry(OpFailRetryPushPackageEntity opFailRetryPushPackageEntity, ArrayList<Long> completeIds, ArrayList<Long> failIds) { |
||||
try { |
||||
OrderStatusDTO orderStatusDTO = new Gson().fromJson(opFailRetryPushPackageEntity.getParams(), OrderStatusDTO.class); |
||||
|
||||
boolean res = ouPaiFactoryService.retryHandleStatusData(orderStatusDTO); |
||||
|
||||
if (res) { |
||||
completeIds.add(opFailRetryPushPackageEntity.getId()); |
||||
} else { |
||||
failIds.add(opFailRetryPushPackageEntity.getId()); |
||||
} |
||||
} catch (Exception e) { |
||||
log.error("executeRetry error: {}", e.getMessage()); |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,53 @@
|
||||
package com.logpm.factory.oupai.service.impl; |
||||
|
||||
import com.alibaba.nacos.shaded.com.google.gson.Gson; |
||||
import com.logpm.factory.comfac.dto.OrderStatusDTO; |
||||
import com.logpm.factory.oupai.entity.OpFailRetryPushPackageEntity; |
||||
import com.logpm.factory.oupai.mapper.OpFailRetryPushPackageMapper; |
||||
import com.logpm.factory.oupai.service.OpPushFailedPackageRecordService; |
||||
import lombok.AllArgsConstructor; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
import org.springframework.stereotype.Service; |
||||
|
||||
import java.util.Date; |
||||
import java.util.HashMap; |
||||
|
||||
@Slf4j |
||||
@Service |
||||
@AllArgsConstructor |
||||
public class OpPushFailedPackageRecordServiceImpl implements OpPushFailedPackageRecordService { |
||||
|
||||
private OpFailRetryPushPackageMapper opFailRetryPushPackageMapper; |
||||
|
||||
/** |
||||
* 记录失败的数据 |
||||
* |
||||
* @param orderPackageCode 包条 |
||||
* @param orderStatusDTO |
||||
*/ |
||||
@Override |
||||
public void record(String orderPackageCode, OrderStatusDTO orderStatusDTO){ |
||||
|
||||
try { |
||||
OpFailRetryPushPackageEntity opFailRetryPushPackageEntity = new OpFailRetryPushPackageEntity(); |
||||
|
||||
opFailRetryPushPackageEntity.setOrderPackageCode(orderPackageCode); |
||||
opFailRetryPushPackageEntity.setParams(new Gson().toJson(orderStatusDTO)); |
||||
opFailRetryPushPackageEntity.setCreateTime(new Date()); |
||||
|
||||
// 判重
|
||||
// HashMap<String, Object> queryMap = new HashMap<>();
|
||||
// queryMap.put("orderPackageCode", orderPackageCode);
|
||||
// if (opFailRetryPushPackageMapper.selectByMap(queryMap) != null) {
|
||||
// return;
|
||||
// }
|
||||
|
||||
opFailRetryPushPackageMapper.insert(opFailRetryPushPackageEntity); |
||||
|
||||
}catch (Exception e){ |
||||
log.error(e.getMessage(),e); |
||||
} |
||||
|
||||
} |
||||
|
||||
} |
Loading…
Reference in new issue