新版仿ok交易所-后端
zyy
2025-10-13 adbdffdb3b80eed8c7110c0583f8ae2f216b7990
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
package com.yami.trading.huobi.data.job;
 
 
import com.yami.trading.bean.data.domain.Realtime;
import com.yami.trading.bean.item.domain.Item;
import com.yami.trading.common.util.Arith;
import com.yami.trading.huobi.data.AdjustmentValueCache;
import com.yami.trading.huobi.data.DataCache;
import com.yami.trading.huobi.data.internal.DataDBService;
import com.yami.trading.huobi.data.model.AdjustmentValue;
import com.yami.trading.huobi.hobi.HobiDataService;
import com.yami.trading.service.item.ItemService;
import com.yami.trading.service.syspara.SysparaService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
 
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
 
 
public abstract class AbstractGetDataJob implements Runnable {
    public static volatile boolean first = true;
    protected static Logger logger = LoggerFactory.getLogger(StockGetDataJob.class);
    /**
     * 数据接口调用间隔时长(毫秒)
     */
    protected int interval;
    @Autowired
    protected SysparaService sysparaService;
    @Autowired
    protected DataDBService dataDBService;
    @Autowired
    protected HobiDataService hobiDataService;
    @Autowired
    protected ItemService itemService;
 
    public void start() {
        new Thread(this, getName()).start();
    }
 
    public abstract void run();
 
    public abstract String getName();
 
    // 在类中定义静态Random实例
    private static final Random random = new Random();
 
    public abstract void realtimeHandle(String symbols);
 
    public void handleRealTimeList(List<Realtime> realtimeList) {
        for (Realtime realtime : realtimeList) {
 
            try {
                String symbol = realtime.getSymbol();
                Integer decimal = itemService.getDecimal(symbol); // 虚拟币通常8位小数,需保留足够精度
                Item item = this.itemService.findBySymbol(symbol);
                BigDecimal currentValue = AdjustmentValueCache.getCurrentValue().get(symbol);
                AdjustmentValue delayValue = AdjustmentValueCache.getDelayValue().get(symbol);
 
 
                if (delayValue != null) {
 
                    // K线场景专属参数(可根据周期动态调整)
                    //int kLineCycle = getKLineCycle(symbol); // 获取K线周期(如1min=1, 5min=5)
                    //double baseFluctuation = kLineCycle <= 1 ? 0.05 : 0.08; // 短周期(1min)±5%,长周期(5min)±8%
                    double baseFluctuation = 0.1;
                    int trendConsistencyRate = 70; // 70%概率保持与前一个值的波动方向一致(模拟趋势)
                    int maxAdjacentFluctuation = 5; // 相邻值波动不超过前一个值的5%(避免视觉跳变)
 
 
                    Integer frequency = AdjustmentValueCache.getFrequency().get(symbol);
                    List<BigDecimal> preAllocationList = AdjustmentValueCache.getPreAllocationList().get(symbol);
                    Integer currentIndex = AdjustmentValueCache.getCurrentAllocationIndex().get(symbol);
                    // 新增:记录前一个值的波动方向(用于趋势一致性)
                    List<Boolean> upDownTrend = AdjustmentValueCache.getUpDownTrend().getOrDefault(symbol, new ArrayList<>());
 
                    if (frequency == null) {
                        frequency = (int) Arith.div(Arith.mul(delayValue.getSecond(), 1000.0D), this.interval);
                        AdjustmentValueCache.getFrequency().put(symbol, frequency);
 
                        if (frequency > 1) {
                            preAllocationList = new ArrayList<>(frequency);
                            BigDecimal totalValue = delayValue.getValue();
                            // 虚拟币精度高,中间计算保留decimal+4位(避免精度丢失)
                            BigDecimal average = totalValue.divide(new BigDecimal(frequency), decimal + 4, RoundingMode.HALF_UP);
                            BigDecimal sum = BigDecimal.ZERO;
 
                            // 优化1:最后k个值分散偏差(k=频率的2%,最少5个,确保最后一根K线无异常)
                            int lastK = Math.max(5, frequency / 50);
                            int normalCount = frequency - lastK;
 
                            // 优化2:前n-k个值——模拟真实行情波动(截断正态分布+趋势一致性)
                            for (int i = 0; i < normalCount; i++) {
                                BigDecimal randomValue;
                                if (i == 0) {
                                    // 第一个值:在基础波动范围内随机
                                    randomValue = generateTruncatedGaussianValue(average, baseFluctuation, decimal + 4);
                                } else {
                                    // 非第一个值:70%概率保持与前一个值的波动方向一致
                                    boolean lastUp = upDownTrend.get(i - 1);
                                    boolean keepTrend = Math.random() * 100 <= trendConsistencyRate;
 
                                    if (keepTrend) {
                                        // 保持趋势:上涨则继续涨(或小跌),下跌则继续跌(或小涨)
                                        if (lastUp) {
                                            // 前一个上涨:本次波动范围 [0, +baseFluctuation*1.2](允许小回调)
                                            randomValue = generateDirectionalValue(average, 0, baseFluctuation * 1.2, decimal + 4);
                                        } else {
                                            // 前一个下跌:本次波动范围 [-baseFluctuation*1.2, 0](允许小反弹)
                                            randomValue = generateDirectionalValue(average, -baseFluctuation * 1.2, 0, decimal + 4);
                                        }
                                    } else {
                                        // 反转趋势:正常基础波动
                                        randomValue = generateTruncatedGaussianValue(average, baseFluctuation, decimal + 4);
                                    }
 
                                    // 优化3:强约束相邻值波动——不超过前一个值的5%
                                    BigDecimal lastValue = preAllocationList.get(i - 1);
                                    BigDecimal maxAdjacent = lastValue.multiply(new BigDecimal(maxAdjacentFluctuation / 100.0))
                                            .setScale(decimal + 4, RoundingMode.HALF_UP);
                                    BigDecimal adjacentDiff = randomValue.subtract(lastValue).abs();
                                    if (adjacentDiff.compareTo(maxAdjacent) > 0) {
                                        randomValue = lastValue.add(
                                                adjacentDiff.divide(randomValue.subtract(lastValue), decimal + 4, RoundingMode.HALF_UP)
                                                        .multiply(maxAdjacent)
                                        );
                                    }
                                }
 
                                preAllocationList.add(randomValue);
                                sum = sum.add(randomValue);
                                // 记录当前值的波动方向(与平均值对比)
                                upDownTrend.add(randomValue.compareTo(average) > 0);
                            }
 
                            // 优化4:最后k个值——极小波动+偏差分散(确保最后一根K线平滑)
                            BigDecimal remaining = totalValue.subtract(sum);
                            BigDecimal avgLastK = remaining.divide(new BigDecimal(lastK), decimal + 4, RoundingMode.HALF_UP);
                            for (int i = 0; i < lastK - 1; i++) {
                                // 最后k-1个值:波动范围缩小到基础波动的1/3(±1.7%~2.7%)
                                BigDecimal smallFluctValue = generateTruncatedGaussianValue(avgLastK, baseFluctuation / 3, decimal + 4);
                                preAllocationList.add(smallFluctValue);
                                sum = sum.add(smallFluctValue);
                                upDownTrend.add(smallFluctValue.compareTo(avgLastK) > 0);
                            }
                            // 最后1个值:仅承担剩余微小偏差(波动≤基础波动的1/5)
                            BigDecimal finalValue = totalValue.subtract(sum);
                            BigDecimal maxFinalFluct = avgLastK.multiply(new BigDecimal(baseFluctuation / 5))
                                    .setScale(decimal + 4, RoundingMode.HALF_UP);
                            if (finalValue.abs().compareTo(maxFinalFluct) > 0) {
                                finalValue = maxFinalFluct.multiply(finalValue.signum() == 1 ? BigDecimal.ONE : BigDecimal.ONE.negate());
                                // 若超出范围,反向微调前一个值(确保总和正确)
                                BigDecimal prevLastValue = preAllocationList.get(preAllocationList.size() - 1);
                                preAllocationList.set(preAllocationList.size() - 1, prevLastValue.add(totalValue.subtract(sum).subtract(finalValue)));
                            }
                            preAllocationList.add(finalValue);
                            upDownTrend.add(finalValue.compareTo(avgLastK) > 0);
 
                            // 缓存新增:波动方向列表(用于趋势一致性)
                            AdjustmentValueCache.getPreAllocationList().put(symbol, preAllocationList);
                            AdjustmentValueCache.getCurrentAllocationIndex().put(symbol, 0);
                            AdjustmentValueCache.getUpDownTrend().put(symbol, upDownTrend);
                            currentIndex = 0;
                        }
                    }
 
                    // 后续分配逻辑(不变,仅需在清理缓存时移除upDownTrend)
                    if (frequency <= 1) {
                        // 单次分配逻辑
                        if (currentValue == null) {
                            AdjustmentValueCache.getCurrentValue().put(symbol, delayValue.getValue().setScale(decimal, RoundingMode.HALF_UP));
                        } else {
                            AdjustmentValueCache.getCurrentValue().put(symbol,
                                    currentValue.add(delayValue.getValue()).setScale(decimal, RoundingMode.HALF_UP));
                        }
                        if (!item.getAdjustmentValue().equals(AdjustmentValueCache.getCurrentValue().get(symbol))) {
                            item.setAdjustmentValue(AdjustmentValueCache.getCurrentValue().get(symbol));
                            itemService.saveOrUpdate(item);
                        }
                        cleanUpKLineCache(symbol); // K线场景专属缓存清理
                    } else {
                        // 按预分配列表取值
                        if (preAllocationList != null && currentIndex != null && currentIndex < preAllocationList.size()) {
                            BigDecimal currentValue_frequency = preAllocationList.get(currentIndex)
                                    .setScale(decimal, RoundingMode.HALF_UP);
 
                            // 更新当前值(K线价格)
                            if (currentValue == null) {
                                AdjustmentValueCache.getCurrentValue().put(symbol, currentValue_frequency);
                            } else {
                                AdjustmentValueCache.getCurrentValue().put(symbol,
                                        currentValue.add(currentValue_frequency).setScale(decimal, RoundingMode.HALF_UP));
                            }
 
                            // 更新延迟值和索引
                            delayValue.setValue(delayValue.getValue().subtract(currentValue_frequency)
                                    .setScale(decimal, RoundingMode.HALF_UP));
                            delayValue.setSecond(Arith.sub(delayValue.getSecond(), Arith.div(this.interval, 1000.0D)));
                            AdjustmentValueCache.getDelayValue().put(symbol, delayValue);
 
                            int nextIndex = currentIndex + 1;
                            AdjustmentValueCache.getCurrentAllocationIndex().put(symbol, nextIndex);
 
                            // 分配完成,清理缓存
                            if (nextIndex >= frequency) {
                                cleanUpKLineCache(symbol);
                            }
 
                            // 保存K线数据更新
                            if (!item.getAdjustmentValue().equals(AdjustmentValueCache.getCurrentValue().get(symbol))) {
                                item.setAdjustmentValue(AdjustmentValueCache.getCurrentValue().get(symbol));
                                itemService.saveOrUpdate(item);
                            }
                        }
                    }
                }
 
                currentValue = AdjustmentValueCache.getCurrentValue().get(realtime.getSymbol());
 
                if (currentValue != null && currentValue.compareTo(BigDecimal.ZERO) != 0) {
                    realtime.setClose(realtime.getClose().add(currentValue).setScale(decimal, RoundingMode.HALF_UP));
                    BigDecimal ask = realtime.getAsk();
                    if(ask!=null){
                        realtime.setAsk(ask.add(currentValue).setScale(decimal, RoundingMode.HALF_UP));
                    }
                    BigDecimal bid = realtime.getBid();
                    if(bid!=null){
                        realtime.setBid(bid.add(currentValue).setScale(decimal, RoundingMode.HALF_UP));
                    }
                    // realtime.setVolume(Arith.add(realtime.getVolume(), Arith.mul(Arith.div(currentValue, realtime.getClose()), realtime.getVolume())));
                    // realtime.setAmount(Arith.add(realtime.getAmount(), Arith.mul(Arith.div(currentValue, realtime.getClose()), realtime.getAmount())));
                }
 
                // 缓存中最新一条Realtime数据
                Realtime realtimeLast = DataCache.getRealtime(symbol);
                // 临时处理:正常10秒超过25%也不合理,丢弃.只有虚拟货币才这样执行
                boolean checkRate = getName().contains("虚拟货币");
                double rate = 0;
                if (!checkRate) {
                    saveData(realtime, symbol, item);
                }else{
                    if (realtimeLast != null) {
                        rate = Math.abs(Arith.sub(realtime.getClose().doubleValue(), realtimeLast.getClose().doubleValue()));
                    }
                    if (null == realtimeLast || Arith.div(rate, realtimeLast.getClose().doubleValue()) < 0.25D) {
                        saveData(realtime, symbol, item);
                    } else {
                        logger.error("当前{}价格{},上一次价格为{}过25%也不合理,丢弃Realtime,不入库", realtime.getSymbol(),realtimeLast.getClose(), realtime.getClose());
                    }
                }
 
 
            } catch (Exception e) {
                logger.error("数据采集失败", e);
 
            }
        }
    }
 
    private void saveData(Realtime realtime, String symbol, Item item) {
        Double high = DataCache.getRealtimeHigh().get(symbol);
        Double low = DataCache.getRealtimeLow().get(symbol);
        if (realtime.getTs().toString().length() <= 10) {
            realtime.setTs(Long.valueOf(realtime.getTs() + "000"));
        }
        realtime.setName(item.getName());
        if (high == null || realtime.getHigh().doubleValue() > high) {
            DataCache.getRealtimeHigh().put(symbol, realtime.getHigh().doubleValue());
        }
        if ((low == null || realtime.getLow().doubleValue() < low) && realtime.getLow().doubleValue() > 0) {
            DataCache.getRealtimeLow().put(symbol, realtime.getLow().doubleValue());
        }
        this.dataDBService.saveAsyn(realtime);
    }
 
 
 
    // ------------------------------ 工具方法 ------------------------------
    /**
     * 生成截断正态分布值(限制在平均值±fluctuation范围内,模拟真实小波动)
     */
    private BigDecimal generateTruncatedGaussianValue(BigDecimal average, double fluctuation, int scale) {
        double factor = nextGaussian() * 0.25; // 标准差0.25,99.7%概率在±0.75内(更集中)
        factor = Math.max(-1.0, Math.min(1.0, factor)); // 截断极端值
        BigDecimal fluctuationValue = average.multiply(new BigDecimal(factor * fluctuation))
                .setScale(scale, RoundingMode.HALF_UP);
        return average.add(fluctuationValue);
    }
 
    /**
     * 生成指定方向的波动值(如[0, +0.1]表示只涨不跌)
     */
    private BigDecimal generateDirectionalValue(BigDecimal average, double minFactor, double maxFactor, int scale) {
        double factor = minFactor + Math.random() * (maxFactor - minFactor);
        BigDecimal fluctuationValue = average.multiply(new BigDecimal(factor))
                .setScale(scale, RoundingMode.HALF_UP);
        return average.add(fluctuationValue);
    }
 
    /**
     * K线场景专属缓存清理(含波动方向列表)
     */
    private void cleanUpKLineCache(String symbol) {
        AdjustmentValueCache.getDelayValue().remove(symbol);
        AdjustmentValueCache.getFrequency().remove(symbol);
        AdjustmentValueCache.getPreAllocationList().remove(symbol);
        AdjustmentValueCache.getCurrentAllocationIndex().remove(symbol);
        AdjustmentValueCache.getUpDownTrend().remove(symbol); // 新增:清理波动方向缓存
    }
 
    /**
     * 获取K线周期(根据symbol或配置判断,示例返回1=1min,5=5min等)
     */
    private int getKLineCycle(String symbol) {
        // 实际场景可从配置或symbol后缀获取(如BTC-USDT-1MIN → 1)
        return 1;
    }
 
    /**
     * 正态分布随机数生成(均值0,标准差1)
     */
    private static double nextGaussian() {
        double u1 = Math.random();
        double u2 = Math.random();
        return Math.sqrt(-2 * Math.log(u1)) * Math.cos(2 * Math.PI * u2);
    }
}