package com.yami.trading.huobi.tradingview.api;
|
|
import com.fasterxml.jackson.databind.JsonNode;
|
import com.google.common.base.Splitter;
|
import com.google.common.cache.Cache;
|
import com.google.common.cache.CacheBuilder;
|
import com.yami.trading.huobi.tradingview.api.model.CandleData;
|
import com.yami.trading.huobi.tradingview.api.model.Kline;
|
import com.yami.trading.huobi.tradingview.api.model.TickerData;
|
|
import java.util.*;
|
import java.util.concurrent.*;
|
import java.util.function.Consumer;
|
|
public class TradingViewAPI {
|
private final Map<String, Set<TickerSubscription>> subscriptionMap = new HashMap<>();
|
private final Map<String, Set<Consumer<CandleData>>> candleSubscriptions = new HashMap<>();
|
private final TradingViewWebSocket ws = new TradingViewWebSocket();
|
|
//价格缓存,避免更新实时价格的时候,错位
|
private static final Cache<String, TickerData> cache = CacheBuilder.newBuilder()
|
.maximumSize(20000) // 设置最大缓存大小为100条
|
.build();
|
|
public CompletableFuture<Void> setup() {
|
ws.setDataHandler(event -> {
|
if (!"ok".equals(event.status)) {
|
return;
|
}
|
|
// 处理实时报价数据
|
if (event.data.has("lp")) {
|
Set<TickerSubscription> subs = subscriptionMap.get(event.simpleOrProName);
|
if (subs != null) {
|
subs.removeIf(s -> {
|
if (s.isCanBeDestroyed()) {
|
s.setDestroyed(true);
|
if (subs.isEmpty()) {
|
ws.unregisterSymbol(s.getSimpleOrProName());
|
subscriptionMap.remove(s.getSimpleOrProName());
|
}
|
return true;
|
}
|
s.updateData(convertToTickerData(event.simpleOrProName,event.data));
|
return false;
|
});
|
}
|
}
|
|
// 处理K线数据
|
if (event.simpleOrProName.contains("cs_")) {
|
JsonNode candleNode = event.data;
|
|
for (Map.Entry<String, Set<Consumer<CandleData>>> entry : candleSubscriptions.entrySet()) {
|
String symbol = entry.getKey();
|
Set<Consumer<CandleData>> listeners = entry.getValue();
|
|
// 遍历每个监听器并触发
|
for (Consumer<CandleData> listener : listeners) {
|
CandleData candle = convertToCandleData(symbol, candleNode);
|
listener.accept(candle);
|
}
|
|
}
|
}
|
});
|
return ws.connect();
|
}
|
|
public void cleanup() {
|
ws.disconnect();
|
}
|
|
public CompletableFuture<TickerSubscription> getTicker(String simpleOrProName) {
|
Set<TickerSubscription> tickers = subscriptionMap.get(simpleOrProName);
|
if (tickers != null && !tickers.isEmpty()) {
|
return CompletableFuture.completedFuture(tickers.iterator().next());
|
}
|
|
TickerSubscription ticker = new TickerSubscription(this, simpleOrProName);
|
return ticker.fetch().thenApply(v -> ticker);
|
}
|
|
/**
|
* 支持多产品订阅
|
* @return
|
*/
|
public CompletableFuture<TickerSubscription> getTickers(String simpleOrProName) {
|
Set<TickerSubscription> tickers = subscriptionMap.get(simpleOrProName);
|
if (tickers != null && !tickers.isEmpty()) {
|
return CompletableFuture.completedFuture(tickers.iterator().next());
|
}
|
|
TickerSubscription ticker = new TickerSubscription(this, simpleOrProName);
|
return ticker.fetch().thenApply(v -> ticker);
|
}
|
|
public CompletableFuture<Void> ensureRegistered(TickerSubscription ticker) {
|
Set<TickerSubscription> tickers = subscriptionMap.get(ticker.getSimpleOrProName());
|
if (tickers != null && tickers.contains(ticker)) {
|
return CompletableFuture.completedFuture(null);
|
}
|
|
CompletableFuture<Void> future = new CompletableFuture<>();
|
|
final UpdateListener onUpdate = new UpdateListener() {
|
@Override
|
public void onUpdate(TickerData data) {
|
if (data.getProName() == null) {
|
return;
|
}
|
ticker.removeUpdateListener(this);
|
future.complete(null);
|
}
|
};
|
|
ticker.addUpdateListener(onUpdate);
|
|
if (tickers == null) {
|
tickers = new HashSet<>();
|
//设置多产品订阅回调
|
List<String> symbols = Splitter.on(",").trimResults().splitToList(ticker.getSimpleOrProName());
|
for (String symbol : symbols) {
|
subscriptionMap.put(symbol, tickers);
|
}
|
}
|
|
tickers.add(ticker);
|
|
//数字货币用
|
//ws.registerSymbol(ticker.getSimpleOrProName())
|
// .orTimeout(3000, TimeUnit.MILLISECONDS)
|
// .exceptionally(ex -> {
|
// ticker.removeUpdateListener(onUpdate);
|
// future.completeExceptionally(new RuntimeException("Registration timed out"));
|
// return null;
|
// });
|
|
//最新接口
|
//ws.registerSymbolV2(ticker.getSimpleOrProName())
|
// .orTimeout(3000, TimeUnit.MILLISECONDS)
|
// .exceptionally(ex -> {
|
// ticker.removeUpdateListener(onUpdate);
|
// future.completeExceptionally(new RuntimeException("Registration timed out"));
|
// return null;
|
// });
|
|
//jdk8
|
try {
|
CompletableFuture.supplyAsync(() -> {
|
try {
|
return ws.registerSymbolV2(ticker.getSimpleOrProName());
|
} catch (Exception e) {
|
throw new CompletionException(e);
|
}
|
}).get(3000, TimeUnit.MILLISECONDS)
|
.exceptionally(ex -> {
|
ticker.removeUpdateListener(onUpdate);
|
future.completeExceptionally(new RuntimeException("Registration timed out"));
|
return null;
|
});
|
} catch (InterruptedException e) {
|
throw new RuntimeException(e);
|
} catch (ExecutionException e) {
|
throw new RuntimeException(e);
|
} catch (TimeoutException e) {
|
throw new RuntimeException(e);
|
}
|
|
return future;
|
}
|
|
public CompletableFuture<Void> subscribeCandles(String symbol, CandleData.Interval interval,
|
Consumer<CandleData> listener) {
|
String subscriptionKey = symbol + "_" + interval;
|
Set<Consumer<CandleData>> listeners = candleSubscriptions.computeIfAbsent(subscriptionKey,
|
k -> new HashSet<>());
|
listeners.add(listener);
|
|
return ws.subscribeCandles(symbol, interval);
|
}
|
|
public void unsubscribeCandles(String symbol, CandleData.Interval interval, Consumer<CandleData> listener) {
|
String subscriptionKey = symbol + "_" + interval.getCode();
|
Set<Consumer<CandleData>> listeners = candleSubscriptions.get(subscriptionKey);
|
if (listeners != null) {
|
listeners.remove(listener);
|
if (listeners.isEmpty()) {
|
candleSubscriptions.remove(subscriptionKey);
|
ws.unsubscribeCandles(symbol, interval);
|
}
|
}
|
}
|
|
private TickerData convertToTickerData(String proName,JsonNode data) {
|
TickerData tickerData = new TickerData();
|
if(Objects.nonNull(proName)){
|
tickerData.setProName(proName);
|
}
|
if (data.has("pro_name"))
|
tickerData.setProName(data.get("pro_name").asText());
|
|
if (data.has("short_name")) {
|
tickerData.setShortName(data.get("short_name").asText());
|
} else {
|
//兼容 价格跟币种错位问题
|
//FX_IDC:USDJPY
|
String result = proName.substring(proName.indexOf(":") + 1);
|
tickerData.setShortName(result);
|
}
|
|
if (data.has("exchange"))
|
tickerData.setExchange(data.get("exchange").asText());
|
else
|
tickerData.setExchange(cache.getIfPresent(proName).getExchange());
|
|
if (data.has("description"))
|
tickerData.setDescription(data.get("description").asText());
|
else
|
tickerData.setDescription(cache.getIfPresent(proName).getDescription());
|
|
if (data.has("type"))
|
tickerData.setType(data.get("type").asText());
|
else
|
tickerData.setType(cache.getIfPresent(proName).getType());
|
|
if (data.has("lp"))
|
tickerData.setLastPrice(data.get("lp").asDouble());
|
else
|
tickerData.setLastPrice(cache.getIfPresent(proName).getLastPrice());
|
|
if (data.has("ch"))
|
tickerData.setChange(data.get("ch").asDouble());
|
else
|
tickerData.setChange(cache.getIfPresent(proName).getChange() == null ? 0 : cache.getIfPresent(proName).getChange());
|
|
if (data.has("chp"))
|
tickerData.setChangePercent(data.get("chp").asDouble());
|
else
|
tickerData.setChangePercent(cache.getIfPresent(proName).getChangePercent() == null ? 0 : cache.getIfPresent(proName).getChangePercent());
|
|
if (data.has("volume"))
|
tickerData.setVolume(data.get("volume").asLong());
|
else
|
tickerData.setVolume(cache.getIfPresent(proName).getVolume());
|
|
// 新增字段
|
if (data.has("open_price"))
|
tickerData.setOpen(data.get("open_price").asDouble());
|
else
|
tickerData.setOpen(cache.getIfPresent(proName).getOpen());
|
|
if (data.has("high_price"))
|
tickerData.setHigh(data.get("high_price").asDouble());
|
else
|
tickerData.setHigh(cache.getIfPresent(proName).getHigh());
|
|
if (data.has("low_price"))
|
tickerData.setLow(data.get("low_price").asDouble());
|
else
|
tickerData.setLow(cache.getIfPresent(proName).getLow());
|
|
if (data.has("prev_close_price"))
|
tickerData.setPrevClose(data.get("prev_close_price").asDouble());
|
else
|
tickerData.setPrevClose(cache.getIfPresent(proName).getPrevClose());
|
|
//首次数据写入缓存
|
if(data.has("open_price") && data.has("high_price") && data.has("low_price") && data.has("prev_close_price")){
|
cache.put(proName,tickerData);
|
}
|
|
|
return tickerData;
|
}
|
|
private CandleData convertToCandleData(String symbol, JsonNode rootNode) {
|
CandleData candle = new CandleData();
|
candle.setSymbol(symbol);
|
|
List<Kline> kLineList = new ArrayList<>();
|
// 解析JSON数组
|
for (JsonNode node : rootNode) {
|
long index = node.get("i").asLong();
|
JsonNode valuesNode = node.get("v");
|
long timestamp = valuesNode.get(0).asLong() * 1000;
|
double open = valuesNode.get(1).asDouble();
|
double high = valuesNode.get(2).asDouble();
|
double low = valuesNode.get(3).asDouble();
|
double close = valuesNode.get(4).asDouble();
|
double volume = null != valuesNode.get(5) ? valuesNode.get(5).asDouble() : 0.00; // 如果有成交量的话
|
|
// 封装成K线数据对象
|
Kline kLine = new Kline(index,timestamp, open, high, low, close, volume);
|
kLineList.add(kLine);
|
}
|
candle.setKlines(kLineList);
|
return candle;
|
}
|
}
|