1
dd
2026-01-06 45b1456afcdd3b103a21b573cd9a93437487efcd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package com.yami.trading.huobi.data.job;
 
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.yami.trading.bean.data.domain.Realtime;
import com.yami.trading.bean.item.domain.Item;
import com.yami.trading.common.domain.BaseEntity;
import com.yami.trading.huobi.data.DataCache;
import com.yami.trading.huobi.data.NezhaHandleRealTime;
import com.yami.trading.huobi.tradingview.service.TradingViewService;
import com.yami.trading.huobi.websocket.model.market.MarketTicker;
import com.yami.trading.service.item.ItemService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
 
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
 
/**
 * @Author: TG:哪吒出海
 * @Date: 2025-06-08-18:31
 * @Description:
 */
 
 
@Slf4j
@Component
public class TradingViewJob extends AbstractGetDataJob {
 
    @Autowired
    private ItemService itemService;
 
    @Autowired
    private TradingViewService tradingViewService;
 
    @Autowired
    private NezhaHandleRealTime nezhaHandleRealTime;
    // 在类级别定义共享的线程池
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);
 
    @Override
    public void run() {
        List<Item> list = itemService.list(new LambdaQueryWrapper<>(Item.class).eq(BaseEntity::getDelFlag,0)).stream().filter(i->"0".equalsIgnoreCase(i.getFake())).collect(Collectors.toList());
 
        //etf 跟 美股的行情
        String etfUsStockSymbols = list.stream()
                .filter(item -> item.getOpenCloseType() != null &&item.getOpenCloseType()
                        .equalsIgnoreCase(Item.US_STOCKS)).map(Item::getSymbolData)
                .collect(Collectors.joining(","));
        //外汇
        String forexSymbols = list.stream()
                .filter(item -> item.getOpenCloseType() != null && item.getOpenCloseType()
                        .equalsIgnoreCase(Item.forex)).map(Item::getSymbolData)
                .collect(Collectors.joining(","));
 
        //港股
        String hkSymbols = list.stream()
                .filter(item -> item.getOpenCloseType() != null && item.getOpenCloseType().equalsIgnoreCase(Item.HK_STOCKS))
                .map(item -> "HKEX:" + Integer.valueOf(item.getSymbolData()))
                .collect(Collectors.joining(","));
 
        // 开始订阅币种
        tradingViewService.subscribeSymbol(hkSymbols + "," + forexSymbols + "," + etfUsStockSymbols, data -> {
            // 使用共享线程池,延迟30分钟执行推送
            Realtime realtime = new Realtime();
            realtime.setSymbol(data.getShortName());
            realtime.setName(data.getShortName());
            realtime.setTs(System.currentTimeMillis());
            realtime.setOpen(data.getOpen());
            realtime.setClose(data.getLastPrice());
            realtime.setHigh(data.getHigh());
            realtime.setLow(data.getLow());
            realtime.setAmount(BigDecimal.ZERO.doubleValue());
            realtime.setVolume(data.getVolume());
            realtime.setNetChange(data.getChange());
            realtime.setChangeRatio(data.getChangePercent());
            nezhaHandleRealTime.builder(null).handleRealTimeList(realtime);
        });
    }
 
    @Override
    public String getName() {
        return "【股票,外汇,ETF】 行情数据采集";
    }
 
    @Override
    public void realtimeHandle(String symbols) {
 
    }
 
 
}