新版仿ok交易所-后端
1
zyy
2025-10-11 59605e4209652fa883a818537c8c637964be79c5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
package com.yami.trading.huobi.data.job;
 
 
import com.yami.trading.bean.data.domain.Realtime;
import com.yami.trading.bean.item.domain.Item;
import com.yami.trading.common.util.Arith;
import com.yami.trading.huobi.data.AdjustmentValueCache;
import com.yami.trading.huobi.data.DataCache;
import com.yami.trading.huobi.data.internal.DataDBService;
import com.yami.trading.huobi.data.model.AdjustmentValue;
import com.yami.trading.huobi.hobi.HobiDataService;
import com.yami.trading.service.item.ItemService;
import com.yami.trading.service.syspara.SysparaService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
 
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;
import java.util.Random;
 
 
public abstract class AbstractGetDataJob implements Runnable {
    public static volatile boolean first = true;
    protected static Logger logger = LoggerFactory.getLogger(StockGetDataJob.class);
    /**
     * 数据接口调用间隔时长(毫秒)
     */
    protected int interval;
    @Autowired
    protected SysparaService sysparaService;
    @Autowired
    protected DataDBService dataDBService;
    @Autowired
    protected HobiDataService hobiDataService;
    @Autowired
    protected ItemService itemService;
 
    public void start() {
        new Thread(this, getName()).start();
    }
 
    public abstract void run();
 
    public abstract String getName();
 
    // 在类中定义静态Random实例
    private static final Random random = new Random();
 
    public abstract void realtimeHandle(String symbols);
 
    public void handleRealTimeList(List<Realtime> realtimeList) {
        for (Realtime realtime : realtimeList) {
 
            try {
                String symbol = realtime.getSymbol();
                Integer decimal = itemService.getDecimal(symbol);
                Item item = this.itemService.findBySymbol(symbol);
                BigDecimal currentValue = AdjustmentValueCache.getCurrentValue().get(symbol);
                AdjustmentValue delayValue = AdjustmentValueCache.getDelayValue().get(symbol);
 
                if (delayValue != null) {
                    //延时几次 缓存frequency
                    Integer frequency = AdjustmentValueCache.getFrequency().get(symbol);
                    if (frequency == null) { //首次计算 缓存
                        frequency = (int) Arith.div(Arith.mul(delayValue.getSecond(), 1000.0D), this.interval);
                        AdjustmentValueCache.getFrequency().put(symbol, frequency);
                    }
 
                    if (frequency <= 1) {
                        if (currentValue == null) {
                            AdjustmentValueCache.getCurrentValue().put(symbol, delayValue.getValue());
                        } else {
                            AdjustmentValueCache.getCurrentValue().put(symbol,
                                    delayValue.getValue().add(currentValue));
                        }
 
                        if (!item.getAdjustmentValue().equals(AdjustmentValueCache.getCurrentValue().get(symbol))) {
                            item.setAdjustmentValue(AdjustmentValueCache.getCurrentValue().get(symbol));
                            itemService.saveOrUpdate(item);
                        }
                        AdjustmentValueCache.getDelayValue().remove(symbol);
                        AdjustmentValueCache.getFrequency().remove(symbol);
                    } else {
                        /*// 本次调整值
                        BigDecimal currentValue_frequency = delayValue.getValue().divide(new BigDecimal(frequency), decimal, RoundingMode.HALF_UP);
 
                        if (currentValue == null) {
                            AdjustmentValueCache.getCurrentValue().put(symbol, currentValue_frequency);
                        } else {
                            AdjustmentValueCache.getCurrentValue().put(symbol,
                                    currentValue.add(currentValue_frequency));
                        }
 
                        delayValue.setValue(delayValue.getValue().subtract(currentValue_frequency));
                        delayValue.setSecond(Arith.sub(delayValue.getSecond(), Arith.div(this.interval, 1000.0D)));
                        AdjustmentValueCache.getDelayValue().put(symbol, delayValue);
 
                        if (!item.getAdjustmentValue().equals(AdjustmentValueCache.getCurrentValue().get(symbol))) {
                            item.setAdjustmentValue(AdjustmentValueCache.getCurrentValue().get(symbol));
                            itemService.saveOrUpdate(item);
                        }*/
                        //计算延时加大精度
                        Integer delayDecimal = 10;
                        // 保存原始总值用于计算随机分配
                        BigDecimal totalValue = delayValue.getValue();
                        // 计算已分配次数(从缓存中获取)
                        Integer allocatedCount = AdjustmentValueCache.getAllocatedCount().get(symbol);
                        if (allocatedCount == null) {
                            allocatedCount = 0;
                            // 首次分配时保存总值到缓存,用于后续计算
                            AdjustmentValueCache.getTotalValue().put(symbol, totalValue);
                        } else {
                            //不是首次
                            totalValue = AdjustmentValueCache.getTotalValue().get(symbol);
                        }
 
                        // ########## 新增:判断调整方向(正向/负向)##########
                        boolean isPositiveAdjustment = totalValue.compareTo(BigDecimal.ZERO) > 0; // 正向调整(>0)
                        boolean isNegativeAdjustment = totalValue.compareTo(BigDecimal.ZERO) < 0; // 负向调整(<0)
 
                        BigDecimal currentValue_frequency;
                        // 计算剩余分配次数
                        int remainingAllocations = frequency - allocatedCount;
 
                        if (remainingAllocations > 1) {
                            BigDecimal average = totalValue.divide(new BigDecimal(frequency), delayDecimal, RoundingMode.HALF_UP);
                            BigDecimal randomFactor;
 
                            // 根据调整方向动态调整负因子概率
                            double negativeProbability = isNegativeAdjustment ? 0.6 : 0.3;
                            if (Math.random() <= negativeProbability) {
                                // 负因子:范围 [-0.5, 0)
                                double randomNeg = -0.5 + Math.random() * 0.5; // 简化计算:max - min = 0 - (-0.5) = 0.5
                                randomFactor = new BigDecimal(randomNeg).setScale(delayDecimal, RoundingMode.HALF_UP);
                            } else {
                                // 正因子:范围 [0.3, 1.7)
                                double randomPos = 0.3 + Math.random() * 1.4; // 简化计算:max - min = 1.7 - 0.3 = 1.4
                                randomFactor = new BigDecimal(randomPos).setScale(delayDecimal, RoundingMode.HALF_UP);
                            }
 
                            currentValue_frequency = average.multiply(randomFactor).setScale(delayDecimal, RoundingMode.HALF_UP);
 
                            // 核心修改1:根据调整方向动态约束累计值
                            BigDecimal currentAccumulated = AdjustmentValueCache.getAccumulatedValue().getOrDefault(symbol, BigDecimal.ZERO);
                            BigDecimal tempAccumulated = currentAccumulated.add(currentValue_frequency);
                            if (isPositiveAdjustment) {
                                // 正向调整:累计值不能为负
                                if (tempAccumulated.compareTo(BigDecimal.ZERO) < 0) {
                                    currentValue_frequency = BigDecimal.ONE.divide(new BigDecimal("10").pow(delayDecimal), delayDecimal, RoundingMode.HALF_UP);
                                }
                            } else if (isNegativeAdjustment) {
                                // 负向调整:累计值不能小于目标值(避免过度减值)
                                if (tempAccumulated.compareTo(totalValue) < 0) {
                                    currentValue_frequency = totalValue.subtract(currentAccumulated).divide(new BigDecimal(2), delayDecimal, RoundingMode.HALF_UP);
                                }
                            }
 
                            //剩余的待分配值
                            BigDecimal remainingValue = totalValue.subtract(currentAccumulated);
                            //本次分配后剩余的待分配值
                            BigDecimal tempDelayValue = remainingValue.subtract(currentValue_frequency);
 
                            // 提取公共变量(剩余值的80%)
                            BigDecimal remaining80Percent = remainingValue.multiply(new BigDecimal("0.8"))
                                    .setScale(delayDecimal, RoundingMode.HALF_UP);
 
                            // 统一判断“是否需要修正剩余值”
                            boolean needFixRemaining = (isPositiveAdjustment && tempDelayValue.compareTo(BigDecimal.ZERO) < 0)
                                    || (isNegativeAdjustment && tempDelayValue.compareTo(totalValue) > 0);
                            if (needFixRemaining) {
                                // 直接使用公共变量,避免重复计算
                                currentValue_frequency = remaining80Percent;
                            }
 
                            // 直接使用公共变量作为maxAllowed,无需重复计算
                            if ((isPositiveAdjustment && currentValue_frequency.compareTo(remaining80Percent) > 0)
                                    || (isNegativeAdjustment && currentValue_frequency.compareTo(remaining80Percent) < 0)) {
                                currentValue_frequency = remaining80Percent;
                            }
 
                        } else {
                            // 最后一次分配兜底(支持负值)
                            BigDecimal accumulated = AdjustmentValueCache.getAccumulatedValue().getOrDefault(symbol, BigDecimal.ZERO);
                            currentValue_frequency = totalValue.subtract(accumulated).setScale(delayDecimal, RoundingMode.HALF_UP);
 
                            // 正向调整:最后一次分配值不能为负;负向调整:不能为正(无重复,无需优化)
                            if (isPositiveAdjustment && currentValue_frequency.compareTo(BigDecimal.ZERO) < 0) {
                                currentValue_frequency = BigDecimal.ZERO.setScale(delayDecimal, RoundingMode.HALF_UP);
                            } else if (isNegativeAdjustment && currentValue_frequency.compareTo(BigDecimal.ZERO) > 0) {
                                currentValue_frequency = BigDecimal.ZERO.setScale(delayDecimal, RoundingMode.HALF_UP);
                            }
                        }
 
 
                        // 更新累计值
                        BigDecimal newAccumulated = AdjustmentValueCache.getAccumulatedValue().getOrDefault(symbol, BigDecimal.ZERO)
                                .add(currentValue_frequency);
                        AdjustmentValueCache.getAccumulatedValue().put(symbol, newAccumulated);
                        // 更新分配次数
                        AdjustmentValueCache.getAllocatedCount().put(symbol, allocatedCount + 1);
 
                        // 更新当前值
                        if (currentValue == null) {
                            AdjustmentValueCache.getCurrentValue().put(symbol, currentValue_frequency);
                        } else {
                            AdjustmentValueCache.getCurrentValue().put(symbol,
                                    currentValue.add(currentValue_frequency));
                        }
 
                        // 更新延迟值
                        delayValue.setValue(delayValue.getValue().subtract(currentValue_frequency));
                        delayValue.setSecond(Arith.sub(delayValue.getSecond(), Arith.div(this.interval, 1000.0D)));
                        AdjustmentValueCache.getDelayValue().put(symbol, delayValue);
 
                        // 如果是最后一次分配,清理缓存
                        if (remainingAllocations <= 1) {
                            AdjustmentValueCache.getAllocatedCount().remove(symbol);
                            AdjustmentValueCache.getTotalValue().remove(symbol);
                            AdjustmentValueCache.getAccumulatedValue().remove(symbol);
                            AdjustmentValueCache.getFrequency().remove(symbol);
 
                            AdjustmentValueCache.getDelayValue().remove(symbol);
                        }
 
                        // 保存更新
                        if (!item.getAdjustmentValue().equals(AdjustmentValueCache.getCurrentValue().get(symbol))) {
                            item.setAdjustmentValue(AdjustmentValueCache.getCurrentValue().get(symbol));
                            itemService.saveOrUpdate(item);
                        }
                    }
                }
 
                currentValue = AdjustmentValueCache.getCurrentValue().get(realtime.getSymbol());
 
                if (currentValue != null && currentValue.compareTo(BigDecimal.ZERO) != 0) {
                    realtime.setClose(realtime.getClose().add(currentValue).setScale(decimal, RoundingMode.HALF_UP));
                    BigDecimal ask = realtime.getAsk();
                    if(ask!=null){
                        realtime.setAsk(ask.add(currentValue).setScale(decimal, RoundingMode.HALF_UP));
                    }
                    BigDecimal bid = realtime.getBid();
                    if(bid!=null){
                        realtime.setBid(bid.add(currentValue).setScale(decimal, RoundingMode.HALF_UP));
                    }
                    // realtime.setVolume(Arith.add(realtime.getVolume(), Arith.mul(Arith.div(currentValue, realtime.getClose()), realtime.getVolume())));
                    // realtime.setAmount(Arith.add(realtime.getAmount(), Arith.mul(Arith.div(currentValue, realtime.getClose()), realtime.getAmount())));
                }
 
                // 缓存中最新一条Realtime数据
                Realtime realtimeLast = DataCache.getRealtime(symbol);
                // 临时处理:正常10秒超过25%也不合理,丢弃.只有虚拟货币才这样执行
                boolean checkRate = getName().contains("虚拟货币");
                double rate = 0;
                if (!checkRate) {
                    saveData(realtime, symbol, item);
                }else{
                    if (realtimeLast != null) {
                        rate = Math.abs(Arith.sub(realtime.getClose().doubleValue(), realtimeLast.getClose().doubleValue()));
                    }
                    if (null == realtimeLast || Arith.div(rate, realtimeLast.getClose().doubleValue()) < 0.25D) {
                        saveData(realtime, symbol, item);
                    } else {
                        logger.error("当前{}价格{},上一次价格为{}过25%也不合理,丢弃Realtime,不入库", realtime.getSymbol(),realtimeLast.getClose(), realtime.getClose());
                    }
                }
 
 
            } catch (Exception e) {
                logger.error("数据采集失败", e);
 
            }
        }
    }
 
    private void saveData(Realtime realtime, String symbol, Item item) {
        Double high = DataCache.getRealtimeHigh().get(symbol);
        Double low = DataCache.getRealtimeLow().get(symbol);
        if (realtime.getTs().toString().length() <= 10) {
            realtime.setTs(Long.valueOf(realtime.getTs() + "000"));
        }
        realtime.setName(item.getName());
        if (high == null || realtime.getHigh().doubleValue() > high) {
            DataCache.getRealtimeHigh().put(symbol, realtime.getHigh().doubleValue());
        }
        if ((low == null || realtime.getLow().doubleValue() < low) && realtime.getLow().doubleValue() > 0) {
            DataCache.getRealtimeLow().put(symbol, realtime.getLow().doubleValue());
        }
        this.dataDBService.saveAsyn(realtime);
    }
 
}