package com.yami.trading.huobi.tradingview.service;
|
|
import com.yami.trading.huobi.tradingview.api.TickerSubscription;
|
import com.yami.trading.huobi.tradingview.api.TradingViewAPI;
|
import com.yami.trading.huobi.tradingview.api.UpdateListener;
|
import com.yami.trading.huobi.tradingview.api.model.CandleData;
|
import com.yami.trading.huobi.tradingview.api.model.Kline;
|
import com.yami.trading.huobi.tradingview.api.model.TickerData;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.stereotype.Service;
|
|
import java.text.SimpleDateFormat;
|
import java.util.*;
|
import java.util.concurrent.*;
|
import java.util.function.Consumer;
|
|
/**
|
* @Author: TG:哪吒出海
|
* @Date: 2025-05-28-0:43
|
* @Description:
|
*/
|
@Slf4j
|
@Service
|
public class TradingViewService {
|
|
// 分别为行情和K线创建独立的API实例
|
private final TradingViewAPI marketApi;
|
private final TradingViewAPI klineApi;
|
// 分别为行情和K线创建独立的线程池
|
private final ExecutorService marketExecutor;
|
private final ExecutorService klineExecutor;
|
|
private final ExecutorService startExecutor;
|
|
private static final String MARKET_DATA_KEY_PREFIX = "market:data:";
|
private static final String KLINE_DATA_KEY_PREFIX = "kline:data:";
|
|
// 存储所有活跃的订阅
|
private final ConcurrentMap<String, List<TickerSubscription>> activeSubscriptions = new ConcurrentHashMap<>();
|
|
public TradingViewService() {
|
this.marketApi = new TradingViewAPI();
|
this.klineApi = new TradingViewAPI();
|
|
this.marketExecutor = Executors.newCachedThreadPool(r -> {
|
Thread t = new Thread(r, "market-thread-pool");
|
t.setDaemon(true);
|
return t;
|
});
|
this.klineExecutor = Executors.newCachedThreadPool(r -> {
|
Thread t = new Thread(r, "kline-thread-pool");
|
t.setDaemon(true);
|
return t;
|
});
|
|
this.startExecutor = Executors.newCachedThreadPool(r -> {
|
Thread t = new Thread(r, "start-thread-pool");
|
t.setDaemon(true);
|
return t;
|
});
|
|
startExecutor.execute(this::initializeAPIs);
|
}
|
|
/**
|
* 连接tw wss
|
*/
|
private void initializeAPIs() {
|
try {
|
// 初始化行情API
|
marketApi.setup().get();
|
log.info("行情API初始化成功");
|
|
// 初始化K线API
|
klineApi.setup().get();
|
log.info("K线API初始化成功");
|
} catch (Exception e) {
|
log.error("API初始化失败", e);
|
throw new RuntimeException("API初始化失败", e);
|
}
|
}
|
|
public void subscribeSymbol(String symbols) {
|
CompletableFuture.runAsync(() -> {
|
try {
|
// 取消已存在的订阅
|
cancelSubscriptions(symbols);
|
|
// 创建通用的更新监听器
|
UpdateListener commonListener = data -> {
|
try {
|
|
System.out.println("收到更新:");
|
System.out.println(" 交易对: " + data.getProName());
|
System.out.println(" 最新价格: " + data.getLastPrice());
|
System.out.println(" 24h涨跌: " + data.getChange());
|
System.out.println(" 24h涨跌幅: " + data.getChangePercent() + "%");
|
System.out.println(" 成交量: " + data.getVolume());
|
Date lastUpdated = data.getLastUpdated();
|
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
String formattedDate = sdf.format(lastUpdated);
|
System.out.println(" 更新时间: " + formattedDate);
|
|
String redisKey = MARKET_DATA_KEY_PREFIX + data.getProName();
|
//redisTemplate.opsForValue().set(redisKey, data);
|
log.debug("更新行情数据到Redis: {} - 价格: {}", redisKey, data.getLastPrice());
|
} catch (Exception e) {
|
log.error("保存行情数据到Redis失败", e);
|
}
|
};
|
|
// 订阅所有交易对
|
TickerSubscription tickers = marketApi.getTickers(symbols).get();
|
List<TickerSubscription> newSubscriptions = new ArrayList<>();
|
|
// 为每个交易对添加监听器
|
tickers.addUpdateListener(commonListener);
|
newSubscriptions.add(tickers);
|
log.info("已添加监听器: {}", tickers.getSimpleOrProName());
|
|
// 保存新的订阅
|
activeSubscriptions.put(symbols, newSubscriptions);
|
|
} catch (Exception e) {
|
log.error("订阅symbols失败: " + symbols, e);
|
throw new RuntimeException("订阅失败", e);
|
}
|
}, marketExecutor);
|
}
|
|
|
/**
|
* 参数回调
|
* @param symbols
|
* @param callback
|
*/
|
public void subscribeSymbol(String symbols, Consumer<TickerData> callback) {
|
CompletableFuture.runAsync(() -> {
|
try {
|
// 取消已存在的订阅
|
cancelSubscriptions(symbols);
|
|
// 创建通用的更新监听器
|
UpdateListener commonListener = data -> {
|
try {
|
|
//if(data.getShortName().equals("AUDJPY")){
|
// System.out.println("收到更新:");
|
// System.out.println(" 名称: " + data.getShortName());
|
// System.out.println(" 交易对: " + data.getProName());
|
// System.out.println(" 最新价格: " + data.getLastPrice());
|
// System.out.println(" 开盘: " + data.getOpen());
|
// System.out.println(" 最高: " + data.getHigh());
|
// System.out.println(" 最低: " + data.getLow());
|
// System.out.println(" 昨日收盘: " + data.getPrevClose());
|
// System.out.println(" 24h涨跌: " + data.getChange());
|
// System.out.println(" 24h涨跌幅: " + data.getChangePercent() + "%");
|
// System.out.println(" 成交量: " + data.getVolume());
|
// Date lastUpdated = data.getLastUpdated();
|
// SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
// String formattedDate = sdf.format(lastUpdated);
|
// System.out.println(" 更新时间: " + formattedDate);
|
//}
|
|
// 直接回调行情数据
|
if (callback != null) {
|
callback.accept(data);
|
}
|
//String redisKey = MARKET_DATA_KEY_PREFIX + data.getProName();
|
//redisTemplate.opsForValue().set(redisKey, data);
|
//log.debug("更新行情数据到Redis: {} - 价格: {}", redisKey, data.getLastPrice());
|
} catch (Exception e) {
|
log.error("保存行情数据到Redis失败", e);
|
}
|
};
|
|
// 订阅所有交易对
|
TickerSubscription tickers = marketApi.getTickers(symbols).get();
|
List<TickerSubscription> newSubscriptions = new ArrayList<>();
|
|
// 为每个交易对添加监听器
|
tickers.addUpdateListener(commonListener);
|
newSubscriptions.add(tickers);
|
log.info("已添加监听器: {}", tickers.getSimpleOrProName());
|
|
// 保存新的订阅
|
activeSubscriptions.put(symbols, newSubscriptions);
|
|
} catch (Exception e) {
|
log.error("订阅symbols失败: " + symbols, e);
|
throw new RuntimeException("订阅失败", e);
|
}
|
}, marketExecutor);
|
}
|
|
private void cancelSubscriptions(String symbols) {
|
List<TickerSubscription> existingSubscriptions = activeSubscriptions.remove(symbols);
|
if (existingSubscriptions != null) {
|
for (TickerSubscription subscription : existingSubscriptions) {
|
try {
|
//移除所有监听器
|
List<UpdateListener> updateListeners = subscription.getUpdateListeners();
|
for (UpdateListener updateListener : updateListeners) {
|
if(null != updateListener){
|
subscription.removeUpdateListener(updateListener);
|
}
|
}
|
} catch (Exception e) {
|
log.error("取消订阅失败: " + subscription.getSimpleOrProName(), e);
|
}
|
}
|
log.info("已取消现有订阅: {}", symbols);
|
}
|
}
|
|
public TickerData getMarketData(String symbol) {
|
String redisKey = MARKET_DATA_KEY_PREFIX + symbol;
|
//TickerData data = (TickerData) redisTemplate.opsForValue().get(redisKey);
|
//if (data == null) {
|
// log.warn("未找到symbol的市场数据: {}", symbol);
|
//}
|
//return data;
|
return null;
|
}
|
|
public CompletableFuture<Map<String, List<Kline>>> getKlineData(String symbols, String interval) {
|
CompletableFuture<Map<String, List<Kline>>> future = new CompletableFuture<>();
|
try {
|
// 将symbols按逗号分割成单个交易对
|
String[] symbolArray = symbols.split(",");
|
Map<String, CompletableFuture<List<Kline>>> futures = new HashMap<>();
|
|
// 为每个交易对创建一个Future
|
for (String symbol : symbolArray) {
|
String trimmedSymbol = symbol.trim();
|
futures.put(trimmedSymbol, getKlineDataForSingleSymbol(trimmedSymbol, interval));
|
}
|
|
// 等待所有Future完成
|
CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[0]))
|
.thenAccept(v -> {
|
try {
|
// 将每个交易对的K线数据放入Map
|
Map<String, List<Kline>> result = new HashMap<>();
|
for (Map.Entry<String, CompletableFuture<List<Kline>>> entry : futures.entrySet()) {
|
result.put(entry.getKey(), entry.getValue().get());
|
}
|
future.complete(result);
|
} catch (Exception e) {
|
future.completeExceptionally(e);
|
log.error("合并K线数据失败", e);
|
}
|
});
|
} catch (Exception e) {
|
future.completeExceptionally(e);
|
log.error("获取K线数据失败: " + symbols, e);
|
}
|
return future;
|
}
|
|
private CompletableFuture<List<Kline>> getKlineDataForSingleSymbol(String symbol, String interval) {
|
CompletableFuture<List<Kline>> future = new CompletableFuture<>();
|
try {
|
String redisKey = String.format("%s:%s:%s", KLINE_DATA_KEY_PREFIX, symbol, interval);
|
|
// 先从Redis获取数据
|
//List<Kline> cachedData = (List<Kline>) redisTemplate.opsForValue().get(redisKey);
|
//if (cachedData != null) {
|
// log.debug("从Redis获取K线数据: {}", redisKey);
|
// future.complete(cachedData);
|
// return future;
|
//}
|
|
// 转换interval字符串为CandleData.Interval枚举
|
CandleData.Interval candleInterval = parseInterval(interval);
|
|
// 订阅K线数据
|
klineApi.subscribeCandles(symbol, candleInterval, candleData -> {
|
try {
|
List<Kline> klines = candleData.getKlines();
|
if (klines != null && !klines.isEmpty()) {
|
// 将数据保存到Redis(设置5分钟过期)
|
//redisTemplate.opsForValue().set(redisKey, klines, 5, TimeUnit.MINUTES);
|
log.debug("更新K线数据到Redis: {}", redisKey);
|
|
// 完成Future并清理资源
|
future.complete(klines);
|
//这里不再调用cleanup(),而是创建新的连接实例\
|
klineApi.cleanup();
|
klineApi.setup().get();
|
}
|
} catch (Exception e) {
|
future.completeExceptionally(e);
|
log.error("处理K线数据失败: " + symbol, e);
|
}
|
});
|
|
} catch (Exception e) {
|
future.completeExceptionally(e);
|
log.error("订阅K线数据失败: " + symbol, e);
|
}
|
return future;
|
}
|
|
private CandleData.Interval parseInterval(String interval) {
|
// 根据传入的interval字符串返回对应的枚举值
|
switch (interval.toUpperCase()) {
|
case "1D":
|
return CandleData.Interval.DAY_1;
|
case "5D":
|
return CandleData.Interval.DAY_5;
|
case "1W":
|
return CandleData.Interval.WEEK_1;
|
case "1M":
|
return CandleData.Interval.MONTH_1;
|
case "6M":
|
return CandleData.Interval.MONTH_6;
|
case "YTD":
|
return CandleData.Interval.YEAR_THIS;
|
case "12M":
|
return CandleData.Interval.YEAR_1;
|
case "60M":
|
return CandleData.Interval.YEAR_5;
|
default:
|
return CandleData.Interval.ALL;
|
}
|
}
|
|
//@PreDestroy
|
public void cleanup() {
|
try {
|
// 清理所有活跃的订阅
|
activeSubscriptions.forEach((symbols, subscriptions) -> {
|
for (TickerSubscription subscription : subscriptions) {
|
try {
|
//移除所有监听器
|
List<UpdateListener> updateListeners = subscription.getUpdateListeners();
|
for (UpdateListener updateListener : updateListeners) {
|
subscription.removeUpdateListener(updateListener);
|
}
|
} catch (Exception e) {
|
log.error("清理订阅失败: " + subscription.getSimpleOrProName(), e);
|
}
|
}
|
});
|
activeSubscriptions.clear();
|
|
// 关闭行情线程池
|
marketExecutor.shutdown();
|
if (!marketExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
|
marketExecutor.shutdownNow();
|
}
|
|
// 关闭K线线程池
|
klineExecutor.shutdown();
|
if (!klineExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
|
klineExecutor.shutdownNow();
|
}
|
|
// 清理API资源
|
marketApi.cleanup();
|
klineApi.cleanup();
|
log.info("TradingView服务清理完成");
|
} catch (Exception e) {
|
log.error("清理资源失败", e);
|
marketExecutor.shutdownNow();
|
klineExecutor.shutdownNow();
|
}
|
}
|
}
|