新版仿ok交易所-后端
1
zyy3
2025-10-13 d4352284dc9049fa0d954b562ebb4632b34f4aee
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
package com.yami.trading.huobi.data.job;
 
 
import com.yami.trading.bean.data.domain.Realtime;
import com.yami.trading.bean.item.domain.Item;
import com.yami.trading.common.util.Arith;
import com.yami.trading.huobi.data.AdjustmentValueCache;
import com.yami.trading.huobi.data.DataCache;
import com.yami.trading.huobi.data.internal.DataDBService;
import com.yami.trading.huobi.data.model.AdjustmentValue;
import com.yami.trading.huobi.hobi.HobiDataService;
import com.yami.trading.service.item.ItemService;
import com.yami.trading.service.syspara.SysparaService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
 
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
 
 
public abstract class AbstractGetDataJob implements Runnable {
    public static volatile boolean first = true;
    protected static Logger logger = LoggerFactory.getLogger(StockGetDataJob.class);
    /**
     * 数据接口调用间隔时长(毫秒)
     */
    protected int interval;
    @Autowired
    protected SysparaService sysparaService;
    @Autowired
    protected DataDBService dataDBService;
    @Autowired
    protected HobiDataService hobiDataService;
    @Autowired
    protected ItemService itemService;
 
    public void start() {
        new Thread(this, getName()).start();
    }
 
    public abstract void run();
 
    public abstract String getName();
 
    // 在类中定义静态Random实例
    private static final Random random = new Random();
 
    public abstract void realtimeHandle(String symbols);
 
    public void handleRealTimeList(List<Realtime> realtimeList) {
        for (Realtime realtime : realtimeList) {
 
            try {
                String symbol = realtime.getSymbol();
                Integer decimal = itemService.getDecimal(symbol); // 虚拟币通常8位小数,需保留足够精度
                Item item = this.itemService.findBySymbol(symbol);
                BigDecimal currentValue = AdjustmentValueCache.getCurrentValue().get(symbol);
                AdjustmentValue delayValue = AdjustmentValueCache.getDelayValue().get(symbol);
 
 
                if (delayValue != null) {
                    if (delayValue.getValue().compareTo(BigDecimal.ZERO) == 0 || delayValue.getSecond() <= 0) {
                        cleanUpKLineCache(symbol); // 清理缓存,跳过后续计算
                        return;
                    }
                    // K线场景专属参数(可根据周期动态调整)
                    double baseFluctuation = 0.1;
                    int trendConsistencyRate = 70; // 70%概率保持与前一个值的波动方向一致(模拟趋势)
                    int maxAdjacentFluctuation = 5; // 相邻值波动不超过前一个值的5%(避免视觉跳变)
 
 
                    Integer frequency = AdjustmentValueCache.getFrequency().get(symbol);
                    List<BigDecimal> preAllocationList = AdjustmentValueCache.getPreAllocationList().get(symbol);
                    Integer currentIndex = AdjustmentValueCache.getCurrentAllocationIndex().get(symbol);
                    List<Boolean> upDownTrend = AdjustmentValueCache.getUpDownTrend().getOrDefault(symbol, new ArrayList<>());
 
                    if (frequency == null) {
                        frequency = (int) Arith.div(Arith.mul(delayValue.getSecond(), 1000.0D), this.interval);
                        AdjustmentValueCache.getFrequency().put(symbol, frequency);
 
                        if (frequency > 1) {
                            preAllocationList = new ArrayList<>(frequency);
                            BigDecimal totalValue = delayValue.getValue(); // 支持正负值
                            // 核心修改1:用绝对值算平均值(避免负数波动范围反向),最后补回原符号
                            BigDecimal totalAbs = totalValue.abs();
                            BigDecimal averageAbs = totalAbs.divide(new BigDecimal(frequency), decimal + 4, RoundingMode.HALF_UP);
                            BigDecimal average = averageAbs.multiply(totalValue.signum() == 1 ? BigDecimal.ONE : BigDecimal.ONE.negate());
                            BigDecimal sum = BigDecimal.ZERO;
 
                            // 优化1:最后k个值分散偏差(k=频率的2%,最少5个,确保最后一根K线无异常)
                            int lastK = Math.max(5, frequency / 50);
                            int normalCount = frequency - lastK;
 
                            // 优化2:前n-k个值——模拟真实行情波动(兼容正负)
                            for (int i = 0; i < normalCount; i++) {
                                BigDecimal randomValue;
                                if (i == 0) {
                                    // 第一个值:基于带符号平均值波动,保留原符号
                                    randomValue = generateTruncatedGaussianValue(average, baseFluctuation, decimal + 4);
                                } else {
                                    boolean lastUp = upDownTrend.get(i - 1);
                                    boolean keepTrend = Math.random() * 100 <= trendConsistencyRate;
 
                                    if (keepTrend) {
                                        // 核心修改2:按前值绝对值算波动幅度,避免负数趋势误判
                                        BigDecimal lastVal = preAllocationList.get(i - 1);
                                        double fluctRange = lastVal.abs().multiply(new BigDecimal(baseFluctuation * 1.2)).doubleValue();
 
                                        if (lastUp) {
                                            // 前一个上涨(无论正负,比前值大即为涨):波动范围[0, fluctRange]
                                            randomValue = generateDirectionalValue(lastVal, 0.0, fluctRange, decimal + 4);
                                        } else {
                                            // 前一个下跌(比前值小即为跌):波动范围[-fluctRange, 0]
                                            randomValue = generateDirectionalValue(lastVal, -fluctRange, 0.0, decimal + 4);
                                        }
                                    } else {
                                        // 反转趋势:基于带符号平均值波动
                                        randomValue = generateTruncatedGaussianValue(average, baseFluctuation, decimal + 4);
                                    }
 
                                    // 核心修改3:相邻波动约束——用前值绝对值算最大波动,超限时按符号截断
                                    BigDecimal lastValue = preAllocationList.get(i - 1);
                                    BigDecimal maxAdjacent = lastValue.abs()
                                            .multiply(new BigDecimal(maxAdjacentFluctuation / 100.0))
                                            .setScale(decimal + 4, RoundingMode.HALF_UP);
                                    BigDecimal adjacentDiff = randomValue.subtract(lastValue);
 
                                    if (adjacentDiff.abs().compareTo(maxAdjacent) > 0) {
                                        randomValue = lastValue.add(
                                                adjacentDiff.signum() == 1 ? maxAdjacent : maxAdjacent.negate()
                                        );
                                    }
                                }
 
                                preAllocationList.add(randomValue);
                                sum = sum.add(randomValue);
                                // 核心修改4:涨跌趋势改为“与前值对比”(而非平均值),兼容正负
                                boolean currentUp = (i == 0)
                                        ? randomValue.compareTo(average) > 0
                                        : randomValue.compareTo(preAllocationList.get(i - 1)) > 0;
                                upDownTrend.add(currentUp);
                            }
 
                            // 优化4:最后k个值——兼容负数总和
                            BigDecimal remaining = totalValue.subtract(sum);
                            // 用剩余值绝对值算平均,补回符号
                            BigDecimal remainingAbs = remaining.abs();
                            BigDecimal avgLastKAbs = remainingAbs.divide(new BigDecimal(lastK), decimal + 4, RoundingMode.HALF_UP);
                            BigDecimal avgLastK = avgLastKAbs.multiply(remaining.signum() == 1 ? BigDecimal.ONE : BigDecimal.ONE.negate());
 
                            for (int i = 0; i < lastK - 1; i++) {
                                BigDecimal smallFluctValue = generateTruncatedGaussianValue(avgLastK, baseFluctuation / 3, decimal + 4);
                                preAllocationList.add(smallFluctValue);
                                sum = sum.add(smallFluctValue);
                                upDownTrend.add(smallFluctValue.compareTo(preAllocationList.get(preAllocationList.size() - 2)) > 0);
                            }
 
                            // 最后1个值:兼容负数偏差
                            BigDecimal finalValue = totalValue.subtract(sum);
                            BigDecimal maxFinalFluct = avgLastK.abs()
                                    .multiply(new BigDecimal(baseFluctuation / 5))
                                    .setScale(decimal + 4, RoundingMode.HALF_UP);
 
                            if (finalValue.abs().compareTo(maxFinalFluct) > 0) {
                                // 超限时按符号截断,保留原方向
                                finalValue = maxFinalFluct.multiply(finalValue.signum() == 1 ? BigDecimal.ONE : BigDecimal.ONE.negate());
                                BigDecimal prevLastValue = preAllocationList.get(preAllocationList.size() - 1);
                                preAllocationList.set(preAllocationList.size() - 1, prevLastValue.add(totalValue.subtract(sum).subtract(finalValue)));
                            }
                            preAllocationList.add(finalValue);
                            upDownTrend.add(finalValue.compareTo(preAllocationList.get(preAllocationList.size() - 2)) > 0);
 
                            // 缓存更新
                            AdjustmentValueCache.getPreAllocationList().put(symbol, preAllocationList);
                            AdjustmentValueCache.getCurrentAllocationIndex().put(symbol, 0);
                            AdjustmentValueCache.getUpDownTrend().put(symbol, upDownTrend);
                            currentIndex = 0;
                        }
                    }
 
                    // 后续分配逻辑:兼容正负延时值
                    if (frequency <= 1) {
                        // 单次分配:直接用带符号值,避免符号丢失
                        BigDecimal delayVal = delayValue.getValue().setScale(decimal, RoundingMode.HALF_UP);
                        if (currentValue == null) {
                            AdjustmentValueCache.getCurrentValue().put(symbol, delayVal);
                        } else {
                            AdjustmentValueCache.getCurrentValue().put(symbol, currentValue.add(delayVal).setScale(decimal, RoundingMode.HALF_UP));
                        }
                        if (!item.getAdjustmentValue().equals(AdjustmentValueCache.getCurrentValue().get(symbol))) {
                            item.setAdjustmentValue(AdjustmentValueCache.getCurrentValue().get(symbol));
                            itemService.saveOrUpdate(item);
                        }
                        cleanUpKLineCache(symbol);
                    } else {
                        // 按预分配列表取值(列表已兼容正负)
                        if (preAllocationList != null && currentIndex != null && currentIndex < preAllocationList.size()) {
                            BigDecimal currentValueFrequency = preAllocationList.get(currentIndex).setScale(decimal, RoundingMode.HALF_UP);
 
                            // 更新当前值:带符号累加
                            if (currentValue == null) {
                                AdjustmentValueCache.getCurrentValue().put(symbol, currentValueFrequency);
                            } else {
                                AdjustmentValueCache.getCurrentValue().put(symbol, currentValue.add(currentValueFrequency).setScale(decimal, RoundingMode.HALF_UP));
                            }
 
                            // 更新延迟值:带符号减法
                            BigDecimal updatedDelayVal = delayValue.getValue().subtract(currentValueFrequency).setScale(decimal, RoundingMode.HALF_UP);
                            delayValue.setValue(updatedDelayVal);
                            delayValue.setSecond(Arith.sub(delayValue.getSecond(), Arith.div(this.interval, 1000.0D)));
                            AdjustmentValueCache.getDelayValue().put(symbol, delayValue);
 
                            int nextIndex = currentIndex + 1;
                            AdjustmentValueCache.getCurrentAllocationIndex().put(symbol, nextIndex);
 
                            if (nextIndex >= frequency) {
                                cleanUpKLineCache(symbol);
                            }
 
                            if (!item.getAdjustmentValue().equals(AdjustmentValueCache.getCurrentValue().get(symbol))) {
                                item.setAdjustmentValue(AdjustmentValueCache.getCurrentValue().get(symbol));
                                itemService.saveOrUpdate(item);
                            }
                        }
                    }
                }
 
                currentValue = AdjustmentValueCache.getCurrentValue().get(realtime.getSymbol());
 
                if (currentValue != null && currentValue.compareTo(BigDecimal.ZERO) != 0) {
                    realtime.setClose(realtime.getClose().add(currentValue).setScale(decimal, RoundingMode.HALF_UP));
                    BigDecimal ask = realtime.getAsk();
                    if(ask!=null){
                        realtime.setAsk(ask.add(currentValue).setScale(decimal, RoundingMode.HALF_UP));
                    }
                    BigDecimal bid = realtime.getBid();
                    if(bid!=null){
                        realtime.setBid(bid.add(currentValue).setScale(decimal, RoundingMode.HALF_UP));
                    }
                    // realtime.setVolume(Arith.add(realtime.getVolume(), Arith.mul(Arith.div(currentValue, realtime.getClose()), realtime.getVolume())));
                    // realtime.setAmount(Arith.add(realtime.getAmount(), Arith.mul(Arith.div(currentValue, realtime.getClose()), realtime.getAmount())));
                }
 
                // 缓存中最新一条Realtime数据
                Realtime realtimeLast = DataCache.getRealtime(symbol);
                // 临时处理:正常10秒超过25%也不合理,丢弃.只有虚拟货币才这样执行
                boolean checkRate = getName().contains("虚拟货币");
                double rate = 0;
                if (!checkRate) {
                    saveData(realtime, symbol, item);
                }else{
                    if (realtimeLast != null) {
                        rate = Math.abs(Arith.sub(realtime.getClose().doubleValue(), realtimeLast.getClose().doubleValue()));
                    }
                    if (null == realtimeLast || Arith.div(rate, realtimeLast.getClose().doubleValue()) < 0.25D) {
                        saveData(realtime, symbol, item);
                    } else {
                        logger.error("当前{}价格{},上一次价格为{}过25%也不合理,丢弃Realtime,不入库", realtime.getSymbol(),realtimeLast.getClose(), realtime.getClose());
                    }
                }
 
 
            } catch (Exception e) {
                logger.error("数据采集失败", e);
 
            }
        }
    }
 
    private void saveData(Realtime realtime, String symbol, Item item) {
        Double high = DataCache.getRealtimeHigh().get(symbol);
        Double low = DataCache.getRealtimeLow().get(symbol);
        if (realtime.getTs().toString().length() <= 10) {
            realtime.setTs(Long.valueOf(realtime.getTs() + "000"));
        }
        realtime.setName(item.getName());
        if (high == null || realtime.getHigh().doubleValue() > high) {
            DataCache.getRealtimeHigh().put(symbol, realtime.getHigh().doubleValue());
        }
        if ((low == null || realtime.getLow().doubleValue() < low) && realtime.getLow().doubleValue() > 0) {
            DataCache.getRealtimeLow().put(symbol, realtime.getLow().doubleValue());
        }
        this.dataDBService.saveAsyn(realtime);
    }
 
 
 
    // ------------------------------ 工具方法 ------------------------------
    /**
     * 生成截断正态分布值(限制在平均值±fluctuation范围内,模拟真实小波动)
     */
    private BigDecimal generateTruncatedGaussianValue(BigDecimal average, double fluctuation, int scale) {
        double factor = nextGaussian() * 0.25; // 标准差0.25,99.7%概率在±0.75内(更集中)
        factor = Math.max(-1.0, Math.min(1.0, factor)); // 截断极端值
        BigDecimal fluctuationValue = average.multiply(new BigDecimal(factor * fluctuation))
                .setScale(scale, RoundingMode.HALF_UP);
        return average.add(fluctuationValue);
    }
 
    /**
     * 生成指定方向的波动值(如[0, +0.1]表示只涨不跌)
     */
    private BigDecimal generateDirectionalValue(BigDecimal average, double minFactor, double maxFactor, int scale) {
        double factor = minFactor + Math.random() * (maxFactor - minFactor);
        BigDecimal fluctuationValue = average.multiply(new BigDecimal(factor))
                .setScale(scale, RoundingMode.HALF_UP);
        return average.add(fluctuationValue);
    }
 
    /**
     * K线场景专属缓存清理(含波动方向列表)
     */
    private void cleanUpKLineCache(String symbol) {
        AdjustmentValueCache.getDelayValue().remove(symbol);
        AdjustmentValueCache.getFrequency().remove(symbol);
        AdjustmentValueCache.getPreAllocationList().remove(symbol);
        AdjustmentValueCache.getCurrentAllocationIndex().remove(symbol);
        AdjustmentValueCache.getUpDownTrend().remove(symbol); // 新增:清理波动方向缓存
    }
 
    /**
     * 获取K线周期(根据symbol或配置判断,示例返回1=1min,5=5min等)
     */
    private int getKLineCycle(String symbol) {
        // 实际场景可从配置或symbol后缀获取(如BTC-USDT-1MIN → 1)
        return 1;
    }
 
    /**
     * 正态分布随机数生成(均值0,标准差1)
     */
    private static double nextGaussian() {
        double u1 = Math.random();
        double u2 = Math.random();
        return Math.sqrt(-2 * Math.log(u1)) * Math.cos(2 * Math.PI * u2);
    }
}