Browse Source

处理确认链上数据问题

dev
long 3 years ago
parent
commit
3ae88e4678
  1. 2
      conflux-admin/src/main/java/com/conflux/web/controller/nft/domain/ConfluxExecutor.java
  2. 3
      conflux-admin/src/main/java/com/conflux/web/controller/nft/mapper/NftCollectionMapper.java
  3. 4
      conflux-admin/src/main/java/com/conflux/web/controller/nft/service/IHandlerStrategy.java
  4. 2
      conflux-admin/src/main/java/com/conflux/web/controller/nft/service/INftCollectionService.java
  5. 5
      conflux-admin/src/main/java/com/conflux/web/controller/nft/service/INftInfoV2Handler.java
  6. 84
      conflux-admin/src/main/java/com/conflux/web/controller/nft/service/impl/ConfluxServiceImpl.java
  7. 35
      conflux-admin/src/main/java/com/conflux/web/controller/nft/service/impl/HandlerStrategy.java
  8. 9
      conflux-admin/src/main/java/com/conflux/web/controller/nft/service/impl/NftCollectionServiceImpl.java
  9. 5
      conflux-admin/src/main/java/com/conflux/web/controller/nft/service/impl/NftInfoV2ImplHandler.java
  10. 5
      conflux-admin/src/main/resources/application-dev.yml
  11. 7
      conflux-admin/src/main/resources/application-druid.yml
  12. 7
      conflux-admin/src/main/resources/application-prod.yml
  13. 17
      conflux-framework/src/main/java/com/conflux/framework/config/NftRequestPool.java
  14. 11
      conflux-framework/src/main/java/com/conflux/framework/config/ThreadPoolConfig.java
  15. 14
      conflux-system/src/main/resources/mapper/system/NftCollectionMapper.xml

2
conflux-admin/src/main/java/com/conflux/web/controller/nft/domain/ConfluxExecutor.java

@ -67,7 +67,7 @@ public class ConfluxExecutor {
org.web3j.abi.Utils.typeMap(tokenURIs, Utf8String.class)));
}
public String awardItemDataStr(Account.Option option, String address, List<BigInteger> _tokenIds, String tokenURI) throws Exception {
public synchronized String awardItemDataStr(Account.Option option, String address, List<BigInteger> _tokenIds, String tokenURI) throws Exception {
return this.account.call(option,
new conflux.web3j.types.Address(this.contract),
AWARDITEMDATASTR,

3
conflux-admin/src/main/java/com/conflux/web/controller/nft/mapper/NftCollectionMapper.java

@ -2,6 +2,7 @@ package com.conflux.web.controller.nft.mapper;
import com.conflux.web.controller.nft.domain.NftCollection;
import com.conflux.web.controller.nft.domain.dto.NftDTO;
import org.apache.ibatis.annotations.Param;
import java.math.BigInteger;
@ -72,5 +73,7 @@ public interface NftCollectionMapper
int updateByTokenId(@Param("hexTokenId")BigInteger hexTokenId,@Param("time")Date time);
int updateByListTokenId(@Param("list")List<NftDTO> list, @Param("time")Date time);
Long selectMaxNftId(@Param("contract")String contract);
}

4
conflux-admin/src/main/java/com/conflux/web/controller/nft/service/IHandlerStrategy.java

@ -3,6 +3,10 @@ package com.conflux.web.controller.nft.service;
import conflux.web3j.Cfx;
import java.util.Map;
public interface IHandlerStrategy {
boolean relayHandler(Cfx cfx, String json, int type,String contract);
boolean relayHandler(Map<String,Object> map);
}

2
conflux-admin/src/main/java/com/conflux/web/controller/nft/service/INftCollectionService.java

@ -84,5 +84,7 @@ public interface INftCollectionService
void updateCollect(NftDTO nftDTO,String contract);
void updateCollect(List<NftDTO> nftDTO,String contract);
Long queryTotalNum(String contract);
}

5
conflux-admin/src/main/java/com/conflux/web/controller/nft/service/INftInfoV2Handler.java

@ -1,9 +1,14 @@
package com.conflux.web.controller.nft.service;
import com.conflux.web.controller.nft.domain.dto.NftDTO;
import conflux.web3j.Cfx;
import java.util.List;
public interface INftInfoV2Handler {
void send721NftTransfer(Cfx cfx, String json,String contract);
void send721NftTransfer(List<NftDTO> nftDataList ,String contract);
}

84
conflux-admin/src/main/java/com/conflux/web/controller/nft/service/impl/ConfluxServiceImpl.java

@ -81,6 +81,10 @@ public class ConfluxServiceImpl implements ConfluxService {
@Autowired
@Qualifier("asyncExecutorNftPush")
private Executor asyncExecutorNftPush;
@Autowired
@Qualifier("asyncExecutorNftRqurest")
private Executor asyncExecutorNftRqurest;
@Autowired
private IPrivateKeyService privateKeyService;
@Autowired
@ -300,7 +304,7 @@ public class ConfluxServiceImpl implements ConfluxService {
}
@SneakyThrows
public AjaxResult push(CheckArgs checkArgs) {
public AjaxResult push(CheckArgs checkArgs) {
//判断参数
if (null == checkArgs) {
return AjaxResult.error("参数为空!");
@ -374,14 +378,14 @@ public class ConfluxServiceImpl implements ConfluxService {
opt.withGasPrice(contractConfig.getGasPrice());
//获取账户余额
BigInteger balance = cfx.getBalance(account.getAddress()).sendAndGet();
Account sponsorWhitelistControlAccount=Account.create(cfx,AESUtil.decrypt(contractConfig.getPrivateKey()));
SponsorWhitelistControl sponsorWhitelistControl=new SponsorWhitelistControl(sponsorWhitelistControlAccount);
Account sponsorWhitelistControlAccount = Account.create(cfx, AESUtil.decrypt(contractConfig.getPrivateKey()));
SponsorWhitelistControl sponsorWhitelistControl = new SponsorWhitelistControl(sponsorWhitelistControlAccount);
Address address = new Address(contractConfig.getContract());
//获取代付账户余额
BigInteger sponsoredBalanceForCollateral = sponsorWhitelistControl.getSponsoredBalanceForCollateral(address.getABIAddress());
//剩余燃气值
BigInteger sponsoredBalanceForGas = sponsorWhitelistControl.getSponsoredBalanceForGas(address.getABIAddress());
log.info("[--------------------->mintNft][balance]{}[sponsoredBalanceForCollateral:]{}[sponsoredBalanceForGas:]{}", balance,sponsoredBalanceForCollateral,sponsoredBalanceForGas);
log.info("[--------------------->mintNft][balance]{}[sponsoredBalanceForCollateral:]{}[sponsoredBalanceForGas:]{}", balance, sponsoredBalanceForCollateral, sponsoredBalanceForGas);
if (balance.compareTo(new BigInteger("40000000000000000000")) < 0) {
log.info("[--------------------->mintNft][balance not enough 50]{}", balance);
//sms notice
@ -404,7 +408,7 @@ public class ConfluxServiceImpl implements ConfluxService {
est = confluxExecutor.getEstimate(cfx, new Address(owner),
new Address(contract), AddressUtil.getHexAddress(owner), tokenIds);
//判断所需要的费用
if (sponsoredBalanceForCollateral.compareTo(est.getGasUsed())<0){
if (sponsoredBalanceForCollateral.compareTo(est.getGasUsed()) < 0) {
return AjaxResult.error("代付余额不足");
}
log.info("[--------------------->mintNft][GasUsed]{}", est.getGasUsed());
@ -423,11 +427,11 @@ public class ConfluxServiceImpl implements ConfluxService {
est = confluxExecutor.getEstimateDataStr(cfx, new Address(owner),
new Address(contract), AddressUtil.getHexAddress(owner), tokenIds, tokenURIs);
//判断所需要的费用
if (sponsoredBalanceForCollateral.compareTo(est.getGasUsed())<0){
if (sponsoredBalanceForCollateral.compareTo(est.getGasUsed()) < 0) {
return AjaxResult.error("代付余额不足");
}
log.info("[--------------------->mintNft][GasUsed]{}", est.getGasUsed());
log.info("[--------------------->mintNft][StorageCollateralized]{}", est.getStorageCollateralized());
log.info("[--------------------->awardItemDataStr][GasUsed]{}", est.getGasUsed());
log.info("[--------------------->awardItemDataStr][StorageCollateralized]{}", est.getStorageCollateralized());
opt.withGasLimit(est.getGasUsed());
opt.withStorageLimit(est.getStorageCollateralized());
hash = confluxExecutor.awardItemDataStr(opt,
@ -443,7 +447,7 @@ public class ConfluxServiceImpl implements ConfluxService {
est = confluxExecutor.getEstimateData(cfx, new Address(owner),
new Address(contract), AddressUtil.getHexAddress(owner), tokenIds, tokenURIs);
//判断所需要的费用
if (sponsoredBalanceForCollateral.compareTo(est.getGasUsed())<0){
if (sponsoredBalanceForCollateral.compareTo(est.getGasUsed()) < 0) {
return AjaxResult.error("代付余额不足");
}
log.info("[--------------------->mintNft][GasUsed]{}", est.getGasUsed());
@ -465,7 +469,7 @@ public class ConfluxServiceImpl implements ConfluxService {
est = confluxExecutor.getEstimateTransferFrom(cfx, new Address(owner),
new Address(contract), AddressUtil.getHexAddress(owner), AddressUtil.getHexAddress(checkArgs.getToUser()), nftCollection.getNftId());
//判断所需要的费用
if (sponsoredBalanceForCollateral.compareTo(est.getGasUsed())<0){
if (sponsoredBalanceForCollateral.compareTo(est.getGasUsed()) < 0) {
return AjaxResult.error("代付余额不足");
}
log.info("[--------------------->mintNft][GasUsed]{}", est.getGasUsed());
@ -527,7 +531,7 @@ public class ConfluxServiceImpl implements ConfluxService {
* @param contract
* @param unitName
*/
public void doPushData(String contract, String unitName) {
public void doPushData(String contract, String unitName) {
boolean isFalg = true;
CollectConfig collect = (CollectConfig) redisUtils.get(contract);
if (null == collect) {
@ -540,7 +544,7 @@ public class ConfluxServiceImpl implements ConfluxService {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
log.info("[dispatchHandlerEorre]{}",e);
log.info("[dispatchHandlerEorre]{}", e);
}
}
//=======检查数据=======
@ -561,7 +565,7 @@ public class ConfluxServiceImpl implements ConfluxService {
cfx = Cfx.create(collect.getNode(), 3, 1000);
} catch (Exception e) {
e.printStackTrace();
log.info("[Create Cfx Exception}] {}",e);
log.info("[Create Cfx Exception}] {}", e);
}
while (isFalg) {
try {
@ -569,7 +573,7 @@ public class ConfluxServiceImpl implements ConfluxService {
} catch (InterruptedException e) {
e.printStackTrace();
}
if (executeMakeUp(collect, contract,eventParams,gson,cfx)) {
if (executeMakeUp(collect, contract, eventParams, gson, cfx)) {
isFalg = false;
}
}
@ -582,7 +586,7 @@ public class ConfluxServiceImpl implements ConfluxService {
* @param eventParams
* @return
*/
public boolean executeMakeUp(CollectConfig collect, String contract, List<EventParam> eventParams,Gson gson,Cfx cfx) {
public synchronized boolean executeMakeUp(CollectConfig collect, String contract, List<EventParam> eventParams, Gson gson, Cfx cfx) {
// log.info("[dispatchHandler]{}", "开始监听数据");
if (CollectionUtils.isEmpty(eventParams)) {
@ -591,14 +595,14 @@ public class ConfluxServiceImpl implements ConfluxService {
}
FilterMapVO vo = getFilter(eventParams);
//获取数据高度
String keyNumber =nftConfig.getEnvironment()+ contract + "number";
String keyNumber = nftConfig.getEnvironment() + contract + "number";
long from = 0L;
if (null == redisUtils.get(keyNumber)) {
from = collect.getEpochNumber();
redisUtils.set(keyNumber,from);
}else {
redisUtils.set(keyNumber, from);
} else {
Object number = redisUtils.get(keyNumber);
from= Long.valueOf(String.valueOf(number));
from = Long.valueOf(String.valueOf(number));
}
// Invoke cfx method
BigInteger epoch = cfx.getEpochNumber().sendAndGet();
@ -626,10 +630,14 @@ public class ConfluxServiceImpl implements ConfluxService {
//log.info("[executeMakeUp][此区间暂无日志]:from:{} to:{}", from, to);
isSuccess = true;
}
if (logList.size() > 0) {
log.info("需要确认上链的数据:{}", logList.toArray());
Map<String, Object> map = new HashMap<>();
List<String> list = new ArrayList<>();
List<Integer> intList = new ArrayList<>();
String typeEvAddress=null;
for (Log l : logList) {
log.info("需要确认上链的gson数据:{}", gson.toJson(l));
// log.info("需要确认上链的gson数据:{}", gson.toJson(l));
//处理高度信息
String evAddress = l.getTopics().get(0);
String tokenAddress = l.getAddress().getAddress();
@ -642,23 +650,41 @@ public class ConfluxServiceImpl implements ConfluxService {
log.info("[dispatchHandler][暂无符合条件的数据]");
return false;
}
boolean res;
if (type == 20000) {
typeEvAddress=evAddress;
}
//boolean res;
try {
res = iHandlerStrategy.relayHandler(cfx, gson.toJson(l), type, contract);
list.add(gson.toJson(l));
intList.add(type);
//res = iHandlerStrategy.relayHandler(cfx, gson.toJson(l), type, contract);
} catch (Exception e) {
e.printStackTrace();
isSuccess = false;
log.info("[dispatchHandler][数据错误未分配]{}", l.getData());
break;
}
// if (res) {
// log.info("[dispatchHandler][分配成功]{}", evAddress);
// isSuccess = true;
// } else {
// log.info("[dispatchHandler][数据错误未分配]{}", l.getData());
// isSuccess = false;
// break;
// }
}
try {
// map.put("cfx",cfx);
map.put("list", list);
map.put("type", intList);
map.put("contract", contract);
boolean res = iHandlerStrategy.relayHandler(map);
if (res) {
log.info("[dispatchHandler][分配成功]{}", evAddress);
log.info("[dispatchHandler][分配成功]{}", typeEvAddress);
isSuccess = true;
} else {
log.info("[dispatchHandler][数据错误未分配]{}", l.getData());
isSuccess = false;
break;
}
} catch (Exception e) {
log.error("更新确认链上数据失败:", e);
}
}
//分析数据拉取成功
@ -719,7 +745,7 @@ public class ConfluxServiceImpl implements ConfluxService {
*/
private synchronized List<BigInteger> autoincrement(String contract, List<String> list) {
long totalNum = 0L;
String key=nftConfig.getEnvironment()+contract+"autoinc";
String key = nftConfig.getEnvironment() + contract + "autoinc";
if (null == redisUtils.get(key)) {
totalNum = nftCollectionService.queryTotalNum(contract);
} else {

35
conflux-admin/src/main/java/com/conflux/web/controller/nft/service/impl/HandlerStrategy.java

@ -1,16 +1,25 @@
package com.conflux.web.controller.nft.service.impl;
import cn.hutool.core.date.DateUtil;
import com.conflux.web.controller.nft.domain.NftData;
import com.conflux.web.controller.nft.domain.dto.NftDTO;
import com.conflux.web.controller.nft.service.IHandlerStrategy;
import com.conflux.web.controller.nft.service.INftInfoV2Handler;
import com.conflux.web.controller.util.CfxUtils;
import conflux.web3j.Cfx;
import conflux.web3j.response.Log;
import jnr.ffi.annotations.In;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@Slf4j
@Service
public class HandlerStrategy implements IHandlerStrategy {
public class HandlerStrategy extends BaseHandler implements IHandlerStrategy {
@Resource
private INftInfoV2Handler iNftInfoV2ImplHandler;
@ -25,4 +34,28 @@ public class HandlerStrategy implements IHandlerStrategy {
}
return false;
}
@Override
public boolean relayHandler(Map<String, Object> map) {
//Cfx cfx=(Cfx) map.get("cfx");
List<String> list=(List<String>) map.get("list");
List<Integer> typeList=(List<Integer>) map.get("type");
List<NftDTO> nftDataList=new ArrayList<>();
String contract =(String) map.get("contract");
if (list.size()==typeList.size()){
for (int i = 0; i <typeList.size() ; i++) {
if (typeList.get(i)==20000){
Log logInfo = getLogInfo(list.get(i));
NftDTO nftDTO = new NftDTO();
nftDTO.setOwner(NftInfoV2ImplHandler.decode(logInfo.getTopics().get(2)).getValue());
nftDTO.setTokenId(CfxUtils.getNumber(logInfo.getTopics().get(3)));
nftDTO.setEpochNumber(logInfo.getEpochNumber().get().longValue());
nftDTO.setUpdateTime(DateUtil.currentSeconds());
nftDataList.add(nftDTO);
}
}
}
iNftInfoV2ImplHandler.send721NftTransfer(nftDataList,contract);
return false;
}
}

9
conflux-admin/src/main/java/com/conflux/web/controller/nft/service/impl/NftCollectionServiceImpl.java

@ -156,6 +156,15 @@ public class NftCollectionServiceImpl implements INftCollectionService {
}
}
@Override
public void updateCollect(List<NftDTO> nftDTO, String contract) {
int i = nftCollectionMapper.updateByListTokenId(nftDTO, new Date());
if (i<0){
log.info("确认上链失败");
log.info("[updateByListTokenId]{}",Arrays.toString(nftDTO.toArray()));
}
}
@Override
public Long queryTotalNum(String contract) {
return nftCollectionMapper.selectMaxNftId(contract );

5
conflux-admin/src/main/java/com/conflux/web/controller/nft/service/impl/NftInfoV2ImplHandler.java

@ -43,6 +43,11 @@ public class NftInfoV2ImplHandler extends BaseHandler implements INftInfoV2Handl
nftCollectionService.updateCollect(nftDTO,contract);
}
@Override
public void send721NftTransfer(List<NftDTO> nftDataList, String contract) {
nftCollectionService.updateCollect(nftDataList,contract);
}
public static Address decode(String encodedResult) {
TypeReference returnTypeRef = TypeReference.create(Address.class);
List<Address> decoded = FunctionReturnDecoder.decode(encodedResult, Arrays.asList(returnTypeRef));

5
conflux-admin/src/main/resources/application-dev.yml

@ -105,4 +105,9 @@ async-pool:
maxPoolSize: 8
keepAliveSeconds: 120
queueCapacity: 1000
nft-request:
corePoolSize: 1
maxPoolSize: 1
keepAliveSeconds: 120
queueCapacity: 300

7
conflux-admin/src/main/resources/application-druid.yml

@ -104,4 +104,9 @@ async-pool:
corePoolSize: 4
maxPoolSize: 8
keepAliveSeconds: 120
queueCapacity: 1000
queueCapacity: 1000
nft-request:
corePoolSize: 1
maxPoolSize: 1
keepAliveSeconds: 120
queueCapacity: 300

7
conflux-admin/src/main/resources/application-prod.yml

@ -103,4 +103,9 @@ async-pool:
corePoolSize: 4
maxPoolSize: 8
keepAliveSeconds: 120
queueCapacity: 1000
queueCapacity: 1000
nft-request:
corePoolSize: 1
maxPoolSize: 1
keepAliveSeconds: 120
queueCapacity: 300

17
conflux-framework/src/main/java/com/conflux/framework/config/NftRequestPool.java

@ -0,0 +1,17 @@
package com.conflux.framework.config;
import com.conflux.framework.config.properties.AbstractExecutorPool;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
* @Author _007long
* @Date 2022 07 26
**/
@Component
@ConfigurationProperties(prefix = "async-pool.nft-request")
@Data
public class NftRequestPool extends AbstractExecutorPool {
private String threadNamePrefix = "handler nft request data executor-";
}

11
conflux-framework/src/main/java/com/conflux/framework/config/ThreadPoolConfig.java

@ -26,10 +26,13 @@ public class ThreadPoolConfig
private final NftSaveDataPool nftSaveDataPool;
private final NftRequestPool nftRequestPool;
@Autowired
public ThreadPoolConfig(NftPushPool nftPushPool,NftSaveDataPool nftSaveDataPool){
public ThreadPoolConfig(NftPushPool nftPushPool, NftSaveDataPool nftSaveDataPool, NftRequestPool nftRequestPool){
this.nftPushPool=nftPushPool;
this.nftSaveDataPool=nftSaveDataPool;
this.nftRequestPool = nftRequestPool;
}
// @Bean(name = "threadPoolTaskExecutor")
// public ThreadPoolTaskExecutor threadPoolTaskExecutor()
@ -63,7 +66,11 @@ public class ThreadPoolConfig
return initExcutor(nftPushPool, nftPushPool.getThreadNamePrefix(), callerRunsPolicy);
}
@Bean(name = "asyncExecutorNftRqurest")
public Executor asyncExecutorNftRqurest() {
ThreadPoolExecutor.CallerRunsPolicy callerRunsPolicy = new ThreadPoolExecutor.CallerRunsPolicy();
return initExcutor(nftRequestPool, nftRequestPool.getThreadNamePrefix(), callerRunsPolicy);
}
private Executor initExcutor(AbstractExecutorPool abstractExecutorPool, String threadName, RejectedExecutionHandler rejectedExecutionHandler){
ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();
threadPool.setCorePoolSize(abstractExecutorPool.getCorePoolSize());

14
conflux-system/src/main/resources/mapper/system/NftCollectionMapper.xml

@ -97,10 +97,10 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
</insert>
<insert id="insertListNftCollection">
insert into nft_collection
(unit_name, information_table_id,nft_id,nft_uri,contract,on_chain_status,created_time)
(unit_name, information_table_id,nft_id,nft_uri,contract,hash_code,gas_used,on_chain_status,created_time)
VALUES
<foreach collection ="nftList" item="item" separator =",">
(#{item.unitName}, #{item.informationTableId},#{item.nftId},#{item.nftUri},#{item.contract},1,#{time})
(#{item.unitName}, #{item.informationTableId},#{item.nftId},#{item.nftUri},#{item.contract},#{item.hashCode},#{item.gasUsed},1,#{time})
</foreach >
</insert>
@ -130,6 +130,16 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
updated_time = #{time}
where nft_id = #{hexTokenId} and on_chain_status = 1
</update>
<update id="updateByListTokenId">
update nft_collection
set
on_chain_status = 2,
updated_time = #{time}
where nft_id in
<foreach collection="list" item="item" open="(" separator="," close=")">
#{item.tokenId}
</foreach>
</update>
<delete id="deleteNftCollectionById" parameterType="Long">
delete from nft_collection where id = #{id}

Loading…
Cancel
Save