package com.yami.trading.huobi.tradingview.api;
|
|
import cn.hutool.core.util.RandomUtil;
|
import com.fasterxml.jackson.databind.JsonNode;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.google.common.base.Splitter;
|
import com.yami.trading.common.nezha.NezhaConstants;
|
import com.yami.trading.huobi.tradingview.api.model.CandleData;
|
import org.java_websocket.client.WebSocketClient;
|
import org.java_websocket.handshake.ServerHandshake;
|
|
import java.net.InetSocketAddress;
|
import java.net.Proxy;
|
import java.net.URI;
|
import java.util.HashSet;
|
import java.util.Set;
|
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.TimeUnit;
|
import java.util.function.Consumer;
|
import java.util.stream.Stream;
|
|
public class TradingViewWebSocket {
|
private static final int DEFAULT_TIMEOUT = 10000;
|
private static final String UNAUTHORIZED_USER_TOKEN = "unauthorized_user_token";
|
private static final String[] ALL_QUOTE_FIELDS = {
|
"pro_name", "short_name", "exchange", "description", "type",
|
"lp", "ch", "chp", "volume"
|
};
|
|
private static final String[] ALL_STOCK_QUOTE_FIELDS = {
|
"base-currency-logoid",
|
"ch",
|
"chp",
|
"currency-logoid",
|
"currency_code",
|
"currency_id",
|
"base_currency_id",
|
"current_session",
|
"description",
|
"exchange",
|
"format",
|
"fractional",
|
"is_tradable",
|
"language",
|
"local_description",
|
"listed_exchange",
|
"logoid",
|
"lp",
|
"lp_time",
|
"minmov",
|
"minmove2",
|
"original_name",
|
"pricescale",
|
"pro_name",
|
"short_name",
|
"type",
|
"typespecs",
|
"update_mode",
|
"volume",
|
"variable_tick_size",
|
"value_unit_id",
|
"unit_id",
|
"measure"
|
};
|
|
private WebSocketClient ws;
|
private String quoteSession;
|
private String chartSession;
|
private final Set<String> subscriptions = new HashSet<>();
|
private final ObjectMapper objectMapper = new ObjectMapper();
|
private Consumer<DataEvent> dataHandler;
|
|
public void setDataHandler(Consumer<DataEvent> handler) {
|
this.dataHandler = handler;
|
}
|
|
public CompletableFuture<Void> connect() {
|
CompletableFuture<Void> future = new CompletableFuture<>();
|
quoteSession = null;
|
chartSession = null;
|
System.out.println("正在尝试建立WebSocket连接...");
|
|
try {
|
ws = new WebSocketClient(new URI("wss://data.tradingview.com/socket.io/websocket?from=symbols%2FIDX-AADI%2F&date=2025_05_12-17_16")) {
|
@Override
|
public void onOpen(ServerHandshake handshakedata) {
|
System.out.println("WebSocket连接已建立");
|
}
|
|
@Override
|
public void onMessage(String message) {
|
handleMessage(message);
|
}
|
|
@Override
|
public void onClose(int code, String reason, boolean remote) {
|
System.out.println("WebSocket连接已关闭: " + reason);
|
System.out.println("关闭代码: " + code);
|
System.out.println("是否为远程关闭: " + remote);
|
}
|
|
@Override
|
public void onError(Exception ex) {
|
System.err.println("WebSocket错误: " + ex.getMessage());
|
ex.printStackTrace();
|
future.completeExceptionally(ex);
|
}
|
};
|
|
if (NezhaConstants.LOCAL_PROXY) {
|
Proxy proxy = new Proxy(Proxy.Type.SOCKS, new InetSocketAddress("127.0.0.1", 10808));
|
ws.setProxy(proxy);
|
}
|
|
|
ws.addHeader("Origin", "https://tw.tradingview.com");
|
ws.addHeader("User-Agent",
|
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)");
|
ws.addHeader("Accept-Language", "zh-CN,zh;q=0.9");
|
ws.addHeader("Pragma", "no-cache");
|
ws.connect();
|
|
// Wait for session to be ready
|
CompletableFuture.runAsync(() -> {
|
try {
|
waitForSession();
|
//推送K线图消息
|
createChartSession();
|
future.complete(null);
|
} catch (Exception e) {
|
future.completeExceptionally(e);
|
}
|
});
|
|
} catch (Exception e) {
|
future.completeExceptionally(e);
|
}
|
|
return future;
|
}
|
|
public void disconnect() {
|
if (ws != null) {
|
ws.close();
|
ws = null;
|
quoteSession = null;
|
chartSession = null;
|
subscriptions.clear();
|
}
|
}
|
|
|
/**
|
* 最新版本接口
|
* @param symbol
|
* @return
|
*/
|
public CompletableFuture<Void> registerSymbolV2(String symbol) {
|
if (subscriptions.contains(symbol)) {
|
return CompletableFuture.completedFuture(null);
|
}
|
subscriptions.add(symbol);
|
//兼容多产品订阅
|
//List<String> syms = Splitter.on(",").trimResults().splitToList(symbol);
|
//for (String sym : syms) {
|
// sendMessage("quote_add_symbols", new Object[] { quoteSession, sym });
|
// String extendedStr = "={\"session\":\"extended\",\"symbol\":\""+sym+"\"}";
|
// sendMessage("quote_add_symbols", new Object[] { quoteSession, extendedStr });
|
// //quote_fast_symbols
|
// sendMessage("quote_fast_symbols", new Object[] { quoteSession, sym, extendedStr });
|
//}
|
String[] symbolsArray = Stream.concat(Stream.of(quoteSession), Splitter.on(",").trimResults().splitToList(symbol).stream())
|
.toArray(String[]::new);
|
// 发送消息
|
sendMessage("quote_add_symbols", symbolsArray);
|
sendMessage("quote_fast_symbols", symbolsArray);
|
return CompletableFuture.completedFuture(null);
|
}
|
|
public CompletableFuture<Void> registerSymbol(String symbol) {
|
if (subscriptions.contains(symbol)) {
|
return CompletableFuture.completedFuture(null);
|
}
|
subscriptions.add(symbol);
|
sendMessage("quote_add_symbols", new Object[] { quoteSession, symbol });
|
return CompletableFuture.completedFuture(null);
|
}
|
|
public void unregisterSymbol(String symbol) {
|
if (subscriptions.remove(symbol)) {
|
sendMessage("quote_remove_symbols", new Object[] { quoteSession, symbol });
|
}
|
}
|
|
private void handleMessage(String message) {
|
try {
|
String remainingMessage = message;
|
while (!remainingMessage.isEmpty()) {
|
|
// 解析 Socket.IO 消息头
|
java.util.regex.Pattern pattern = java.util.regex.Pattern.compile("~m~(\\d+)~m~");
|
java.util.regex.Matcher matcher = pattern.matcher(remainingMessage);
|
|
if (!matcher.find()) {
|
throw new Exception("Invalid Socket.IO packet");
|
}
|
|
String lengthStr = matcher.group(1);
|
int headerLength = matcher.group(0).length();
|
int packetLength = Integer.parseInt(lengthStr);
|
|
//提取未开盘的价格,暂未开发
|
//System.out.println("收到wss数据消息: " + remainingMessage);
|
|
// 提取消息内容
|
String packet = remainingMessage.substring(headerLength, headerLength + packetLength);
|
// 处理心跳包
|
if (packet.startsWith("~h~")) {
|
ws.send(prependHeader("~h~" + packet.substring(3)));
|
} else {
|
// 处理普通JSON消息
|
JsonNode data = objectMapper.readTree(packet);
|
|
//System.out.println("收到wss数据消息: " + data.toString());
|
|
handlePacket(data);
|
}
|
|
// 移动到下一个消息
|
remainingMessage = remainingMessage.substring(headerLength + packetLength);
|
}
|
} catch (Exception e) {
|
System.err.println("Error handling message: " + e.getMessage());
|
e.printStackTrace();
|
}
|
}
|
|
private void handlePacket(JsonNode data) {
|
try {
|
if (data.has("session_id")) {
|
setAuthToken(UNAUTHORIZED_USER_TOKEN);
|
createQuoteSession();
|
//setQuoteFields(ALL_QUOTE_FIELDS);
|
//setStockQuoteFields(ALL_STOCK_QUOTE_FIELDS);
|
return;
|
}
|
|
//K线消息
|
if (data.has("m") && "timescale_update".equals(data.get("m").asText()) && data.has("p")) {
|
JsonNode params = data.get("p");
|
if (params.isArray() && params.size() > 1 && params.get(0).asText().equals(chartSession)) {
|
JsonNode tickerData = params.get(1);
|
if (dataHandler != null) {
|
dataHandler.accept(new DataEvent(
|
params.get(0).asText(),
|
tickerData.get("sds_1").get("s")
|
));
|
}
|
}
|
}
|
|
if (data.has("m") && "qsd".equals(data.get("m").asText()) && data.has("p")) {
|
JsonNode params = data.get("p");
|
if (params.isArray() && params.size() > 1 && params.get(0).asText().equals(quoteSession)) {
|
JsonNode tickerData = params.get(1);
|
if (dataHandler != null) {
|
dataHandler.accept(new DataEvent(
|
tickerData.get("n").asText(),
|
tickerData.get("s").asText(),
|
tickerData.get("v")));
|
}
|
}
|
}
|
} catch (Exception e) {
|
System.err.println("Error handling packet: " + e.getMessage());
|
e.printStackTrace();
|
}
|
}
|
|
private void sendMessage(String func, Object[] args) {
|
try {
|
String payload = objectMapper.writeValueAsString(new MessagePacket(func, args));
|
String message = prependHeader(payload);
|
System.out.println("正在发送消息: " + message);
|
ws.send(message);
|
} catch (Exception e) {
|
System.err.println("Error sending message: " + e.getMessage());
|
e.printStackTrace();
|
}
|
}
|
|
private String prependHeader(String str) {
|
return "~m~" + str.length() + "~m~" + str;
|
}
|
|
private void setAuthToken(String token) {
|
sendMessage("set_auth_token", new Object[] { token });
|
}
|
|
private void createQuoteSession() {
|
quoteSession = "qs_" + generateRandomString(12);
|
sendMessage("quote_create_session", new Object[] { quoteSession });
|
}
|
|
private void setQuoteFields(String[] fields) {
|
Object[] args = new Object[fields.length + 1];
|
args[0] = quoteSession;
|
System.arraycopy(fields, 0, args, 1, fields.length);
|
sendMessage("quote_set_fields", args);
|
}
|
|
/**
|
* 股票专用查询字段
|
* @param fields
|
*/
|
private void setStockQuoteFields(String[] fields) {
|
String[] combinedFields = new String[fields.length + 1];
|
combinedFields[0] = quoteSession;
|
System.arraycopy(fields, 0, combinedFields, 1, fields.length);
|
sendMessage("quote_set_fields", combinedFields);
|
}
|
|
/**
|
* 构建K线 session
|
*/
|
private void createChartSession() {
|
chartSession = "cs_" + generateRandomString(12);
|
sendMessage("chart_create_session", new Object[] { chartSession, "disable_statistics" });
|
}
|
|
public CompletableFuture<Void> subscribeCandles(String symbol, CandleData.Interval interval) {
|
CompletableFuture<Void> future = new CompletableFuture<>();
|
String subscriptionKey = symbol + "_" + interval;
|
|
if (subscriptions.contains(subscriptionKey)) {
|
return CompletableFuture.completedFuture(null);
|
}
|
|
int i = RandomUtil.randomInt(1);
|
|
if(null == chartSession){
|
return CompletableFuture.completedFuture(null);
|
}
|
|
//创建时区
|
sendMessage("switch_timezone",new Object[]{
|
chartSession,
|
"Etc/UTC"
|
});
|
|
//订阅币种
|
sendMessage("resolve_symbol",new Object[]{
|
chartSession,
|
"sds_sym_" + i,
|
symbol
|
});
|
|
//推送创建周期消息
|
sendMessage("create_series",new Object[]{
|
chartSession,
|
"sds_1",
|
"s" + i, // 请求次数
|
"sds_sym_" + i,
|
interval.getVal(), //K线单位
|
300,
|
interval.getCode() //K线周期
|
});
|
|
//推送K线消息
|
sendMessage("modify_series",new Object[] {
|
chartSession,
|
"sds_1",
|
"s" + i, // 请求次数
|
"sds_sym_" + i,
|
interval.getVal(), //K线单位
|
interval.getCode() //K线周期
|
});
|
|
//设置刻度模式
|
sendMessage("set_future_tickmarks_mode", new Object[]{
|
chartSession,
|
"full_single_session"
|
});
|
|
// 添加到订阅列表
|
subscriptions.add(subscriptionKey);
|
return future;
|
}
|
|
public void unsubscribeCandles(String symbol, CandleData.Interval interval) {
|
String subscriptionKey = symbol + "_" + interval;
|
if (subscriptions.remove(subscriptionKey)) {
|
sendMessage("remove_series", new Object[] {
|
chartSession,
|
"s1" // 系列ID
|
});
|
}
|
}
|
|
private String generateRandomString(int length) {
|
String chars = "abcdefghijklmnopqrstuvwxyz";
|
StringBuilder sb = new StringBuilder();
|
for (int i = 0; i < length; i++) {
|
sb.append(chars.charAt((int) (Math.random() * chars.length())));
|
}
|
return sb.toString();
|
}
|
|
private void waitForSession() throws Exception {
|
long startTime = System.currentTimeMillis();
|
while (quoteSession == null) {
|
if (System.currentTimeMillis() - startTime > DEFAULT_TIMEOUT) {
|
throw new Exception("Session initialization timeout");
|
}
|
TimeUnit.MILLISECONDS.sleep(100);
|
}
|
}
|
|
private static class MessagePacket {
|
public String m;
|
public Object[] p;
|
|
public MessagePacket(String method, Object[] params) {
|
this.m = method;
|
this.p = params;
|
}
|
}
|
|
public static class DataEvent {
|
public final String simpleOrProName;
|
public final String status;
|
public final JsonNode data;
|
|
public DataEvent(String simpleOrProName, String status, JsonNode data) {
|
this.simpleOrProName = simpleOrProName;
|
this.status = status;
|
this.data = data;
|
}
|
|
public DataEvent(String simpleOrProName,JsonNode data) {
|
this.simpleOrProName = simpleOrProName;
|
this.status = "ok";
|
this.data = data;
|
}
|
}
|
}
|