package com.yami.trading.huobi.data.job;
|
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
import com.google.common.base.Joiner;
|
import com.google.common.collect.Lists;
|
import com.yami.trading.bean.data.domain.Realtime;
|
import com.yami.trading.bean.item.domain.Item;
|
import com.yami.trading.common.domain.BaseEntity;
|
import com.yami.trading.huobi.data.DataCache;
|
import com.yami.trading.huobi.data.NezhaHandleRealTime;
|
import com.yami.trading.huobi.tradingview.service.TradingViewService;
|
import com.yami.trading.huobi.websocket.model.market.MarketTicker;
|
import com.yami.trading.service.item.ItemService;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Component;
|
|
import java.math.BigDecimal;
|
import java.math.RoundingMode;
|
import java.util.List;
|
import java.util.concurrent.Executors;
|
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.TimeUnit;
|
import java.util.stream.Collectors;
|
import java.util.stream.Stream;
|
|
/**
|
* @Author: TG:哪吒出海
|
* @Date: 2025-06-08-18:31
|
* @Description:
|
*/
|
|
|
@Slf4j
|
@Component
|
public class TradingViewJob extends AbstractGetDataJob {
|
|
@Autowired
|
private ItemService itemService;
|
|
@Autowired
|
private TradingViewService tradingViewService;
|
|
@Autowired
|
private NezhaHandleRealTime nezhaHandleRealTime;
|
// 在类级别定义共享的线程池
|
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);
|
|
@Override
|
public void run() {
|
List<Item> list = itemService.list(new LambdaQueryWrapper<>(Item.class).eq(BaseEntity::getDelFlag,0)).stream().filter(i->"0".equalsIgnoreCase(i.getFake())).collect(Collectors.toList());
|
|
//etf 跟 美股的行情
|
String etfUsStockSymbols = list.stream()
|
.filter(item -> item.getOpenCloseType() != null &&item.getOpenCloseType()
|
.equalsIgnoreCase(Item.US_STOCKS)).map(Item::getSymbolData)
|
.collect(Collectors.joining(","));
|
//外汇
|
String forexSymbols = list.stream()
|
.filter(item -> item.getOpenCloseType() != null && item.getOpenCloseType()
|
.equalsIgnoreCase(Item.forex)).map(Item::getSymbolData)
|
.collect(Collectors.joining(","));
|
|
//港股
|
String hkSymbols = list.stream()
|
.filter(item -> item.getOpenCloseType() != null && item.getOpenCloseType().equalsIgnoreCase(Item.HK_STOCKS))
|
.map(item -> "HKEX:" + Integer.valueOf(item.getSymbolData()))
|
.collect(Collectors.joining(","));
|
|
// 开始订阅币种
|
tradingViewService.subscribeSymbol(hkSymbols + "," + forexSymbols + "," + etfUsStockSymbols, data -> {
|
// 使用共享线程池,延迟30分钟执行推送
|
Realtime realtime = new Realtime();
|
realtime.setSymbol(data.getShortName());
|
realtime.setName(data.getShortName());
|
realtime.setTs(System.currentTimeMillis());
|
realtime.setOpen(data.getOpen());
|
realtime.setClose(data.getLastPrice());
|
realtime.setHigh(data.getHigh());
|
realtime.setLow(data.getLow());
|
realtime.setAmount(BigDecimal.ZERO.doubleValue());
|
realtime.setVolume(data.getVolume());
|
realtime.setNetChange(data.getChange());
|
realtime.setChangeRatio(data.getChangePercent());
|
nezhaHandleRealTime.builder(null).handleRealTimeList(realtime);
|
});
|
}
|
|
@Override
|
public String getName() {
|
return "【股票,外汇,ETF】 行情数据采集";
|
}
|
|
@Override
|
public void realtimeHandle(String symbols) {
|
|
}
|
|
|
}
|