/*
 * Project: 新版仿ok交易所-后端 (new OK-style exchange clone, backend)
 * Revision: e4174aed067985155c157f10b0c4294f18eb2359 (zj, 2025-09-07)
 */
package com.yami.trading.service.contract;
 
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.yami.trading.bean.contract.domain.ContractOrder;
import com.yami.trading.common.constants.ContractRedisKeys;
import com.yami.trading.common.util.RedisUtil;
import com.yami.trading.service.WalletService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
 
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
 
@Slf4j
@Component
public class ContractLoadCacheService implements ApplicationRunner {
 
    @Autowired
    private WalletService walletService;
 
    @Autowired
    private ContractOrderService contractOrderService;
 
    private ScheduledExecutorService scheduler;
 
    public void load() {
        try {
            List<ContractOrder> list = contractOrderService.list(new LambdaQueryWrapper<>(ContractOrder.class).eq(ContractOrder::getState, "submitted"));
            Map<String, Map<String, ContractOrder>> cacheMap = new ConcurrentHashMap<>();
 
            // 永续合约:总资产、总保证金、总未实现盈利
            Map<String, Map<String, BigDecimal>> contractAssetsMap = new ConcurrentHashMap<>();
 
            for (ContractOrder order : list) {
                if (ContractOrder.STATE_SUBMITTED.equals(order.getState())) {
                    // 使用computeIfAbsent简化代码
                    Map<String, ContractOrder> orderMap = cacheMap.computeIfAbsent(
                            order.getPartyId(),
                            k -> new ConcurrentHashMap<>()
                    );
                    orderMap.put(order.getOrderNo(), order);
 
                    // 获取单个订单永续合约总资产、总保证金、总未实现盈利
                    Map<String, BigDecimal> contractAssetsOrder = this.walletService.getMoneyContractByOrder(order);
 
                    // 处理资产数据
                    contractAssetsMap.compute(order.getPartyId(), (k, v) -> {
                        if (v == null) {
                            v = new HashMap<>();
                            v.put("money_contract", BigDecimal.ZERO);
                            v.put("money_contract_deposit", BigDecimal.ZERO);
                            v.put("money_contract_profit", BigDecimal.ZERO);
                        }
 
                        // 安全地添加值
                        v.put("money_contract", safeAdd(v.get("money_contract"), contractAssetsOrder.get("money_contract")));
                        v.put("money_contract_deposit", safeAdd(v.get("money_contract_deposit"), contractAssetsOrder.get("money_contract_deposit")));
                        v.put("money_contract_profit", safeAdd(v.get("money_contract_profit"), contractAssetsOrder.get("money_contract_profit")));
 
                        return v;
                    });
                }
 
                RedisUtil.set(ContractRedisKeys.CONTRACT_ORDERNO + order.getOrderNo(), order);
            }
 
            for (Map.Entry<String, Map<String, ContractOrder>> entry : cacheMap.entrySet()) {
                RedisUtil.set(ContractRedisKeys.CONTRACT_SUBMITTED_ORDER_PARTY_ID + entry.getKey(), entry.getValue());
            }
 
            for (Map.Entry<String, Map<String, BigDecimal>> entry : contractAssetsMap.entrySet()) {
                RedisUtil.set(ContractRedisKeys.CONTRACT_ASSETS_PARTY_ID + entry.getKey(), entry.getValue().get("money_contract"));
                RedisUtil.set(ContractRedisKeys.CONTRACT_ASSETS_DEPOSIT_PARTY_ID + entry.getKey(), entry.getValue().get("money_contract_deposit"));
                RedisUtil.set(ContractRedisKeys.CONTRACT_ASSETS_PROFIT_PARTY_ID + entry.getKey(), entry.getValue().get("money_contract_profit"));
            }
 
            log.info("ContractOrder数据更新到Redis完成");
        } catch (Exception e) {
            log.error("更新ContractOrder数据到Redis失败", e);
        }
    }
 
    // 安全添加BigDecimal值
    private BigDecimal safeAdd(BigDecimal a, BigDecimal b) {
        if (a == null) a = BigDecimal.ZERO;
        if (b == null) b = BigDecimal.ZERO;
        return a.add(b);
    }
 
    @Override
    public void run(ApplicationArguments args) {
        log.info("开始ContractOrder数据加载redis");
 
        // 创建单线程调度器
        scheduler = Executors.newSingleThreadScheduledExecutor();
 
        // 初始加载
        load();
 
        // 每隔5秒执行一次更新
        scheduler.scheduleAtFixedRate(() -> {
            try {
                load();
            } catch (Exception e) {
                log.error("定时更新ContractOrder数据失败", e);
            }
        }, 5, 5, TimeUnit.SECONDS);
 
        log.info("已启动ContractOrder数据定时更新,每5秒一次");
    }
 
    // 添加销毁方法以关闭调度器
    public void destroy() {
        if (scheduler != null && !scheduler.isShutdown()) {
            scheduler.shutdown();
            try {
                if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
                    scheduler.shutdownNow();
                }
            } catch (InterruptedException e) {
                scheduler.shutdownNow();
                Thread.currentThread().interrupt();
            }
            log.info("ContractOrder数据更新调度器已关闭");
        }
    }
}