package com.yami.trading.service.contract;
|
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
import com.yami.trading.bean.contract.domain.ContractOrder;
|
import com.yami.trading.common.constants.ContractRedisKeys;
|
import com.yami.trading.common.util.RedisUtil;
|
import com.yami.trading.service.WalletService;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.boot.ApplicationArguments;
|
import org.springframework.boot.ApplicationRunner;
|
import org.springframework.stereotype.Component;
|
|
import java.math.BigDecimal;
|
import java.util.HashMap;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.Executors;
|
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.TimeUnit;
|
|
@Slf4j
|
@Component
|
public class ContractLoadCacheService implements ApplicationRunner {
|
|
@Autowired
|
private WalletService walletService;
|
|
@Autowired
|
private ContractOrderService contractOrderService;
|
|
private ScheduledExecutorService scheduler;
|
|
public void load() {
|
try {
|
List<ContractOrder> list = contractOrderService.list(new LambdaQueryWrapper<>(ContractOrder.class).eq(ContractOrder::getState, "submitted"));
|
Map<String, Map<String, ContractOrder>> cacheMap = new ConcurrentHashMap<>();
|
|
// 永续合约:总资产、总保证金、总未实现盈利
|
Map<String, Map<String, BigDecimal>> contractAssetsMap = new ConcurrentHashMap<>();
|
|
for (ContractOrder order : list) {
|
if (ContractOrder.STATE_SUBMITTED.equals(order.getState())) {
|
// 使用computeIfAbsent简化代码
|
Map<String, ContractOrder> orderMap = cacheMap.computeIfAbsent(
|
order.getPartyId(),
|
k -> new ConcurrentHashMap<>()
|
);
|
orderMap.put(order.getOrderNo(), order);
|
|
// 获取单个订单永续合约总资产、总保证金、总未实现盈利
|
Map<String, BigDecimal> contractAssetsOrder = this.walletService.getMoneyContractByOrder(order);
|
|
// 处理资产数据
|
contractAssetsMap.compute(order.getPartyId(), (k, v) -> {
|
if (v == null) {
|
v = new HashMap<>();
|
v.put("money_contract", BigDecimal.ZERO);
|
v.put("money_contract_deposit", BigDecimal.ZERO);
|
v.put("money_contract_profit", BigDecimal.ZERO);
|
}
|
|
// 安全地添加值
|
v.put("money_contract", safeAdd(v.get("money_contract"), contractAssetsOrder.get("money_contract")));
|
v.put("money_contract_deposit", safeAdd(v.get("money_contract_deposit"), contractAssetsOrder.get("money_contract_deposit")));
|
v.put("money_contract_profit", safeAdd(v.get("money_contract_profit"), contractAssetsOrder.get("money_contract_profit")));
|
|
return v;
|
});
|
}
|
|
RedisUtil.set(ContractRedisKeys.CONTRACT_ORDERNO + order.getOrderNo(), order);
|
}
|
|
for (Map.Entry<String, Map<String, ContractOrder>> entry : cacheMap.entrySet()) {
|
RedisUtil.set(ContractRedisKeys.CONTRACT_SUBMITTED_ORDER_PARTY_ID + entry.getKey(), entry.getValue());
|
}
|
|
for (Map.Entry<String, Map<String, BigDecimal>> entry : contractAssetsMap.entrySet()) {
|
RedisUtil.set(ContractRedisKeys.CONTRACT_ASSETS_PARTY_ID + entry.getKey(), entry.getValue().get("money_contract"));
|
RedisUtil.set(ContractRedisKeys.CONTRACT_ASSETS_DEPOSIT_PARTY_ID + entry.getKey(), entry.getValue().get("money_contract_deposit"));
|
RedisUtil.set(ContractRedisKeys.CONTRACT_ASSETS_PROFIT_PARTY_ID + entry.getKey(), entry.getValue().get("money_contract_profit"));
|
}
|
|
log.info("ContractOrder数据更新到Redis完成");
|
} catch (Exception e) {
|
log.error("更新ContractOrder数据到Redis失败", e);
|
}
|
}
|
|
// 安全添加BigDecimal值
|
private BigDecimal safeAdd(BigDecimal a, BigDecimal b) {
|
if (a == null) a = BigDecimal.ZERO;
|
if (b == null) b = BigDecimal.ZERO;
|
return a.add(b);
|
}
|
|
@Override
|
public void run(ApplicationArguments args) {
|
log.info("开始ContractOrder数据加载redis");
|
|
// 创建单线程调度器
|
scheduler = Executors.newSingleThreadScheduledExecutor();
|
|
// 初始加载
|
load();
|
|
// 每隔5秒执行一次更新
|
scheduler.scheduleAtFixedRate(() -> {
|
try {
|
load();
|
} catch (Exception e) {
|
log.error("定时更新ContractOrder数据失败", e);
|
}
|
}, 5, 5, TimeUnit.SECONDS);
|
|
log.info("已启动ContractOrder数据定时更新,每5秒一次");
|
}
|
|
// 添加销毁方法以关闭调度器
|
public void destroy() {
|
if (scheduler != null && !scheduler.isShutdown()) {
|
scheduler.shutdown();
|
try {
|
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
|
scheduler.shutdownNow();
|
}
|
} catch (InterruptedException e) {
|
scheduler.shutdownNow();
|
Thread.currentThread().interrupt();
|
}
|
log.info("ContractOrder数据更新调度器已关闭");
|
}
|
}
|
}
|