package com.yami.trading.huobi.data.websocket.service.huobi.connection;
|
|
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.yami.trading.huobi.data.websocket.constant.Options;
import com.yami.trading.huobi.data.websocket.constant.enums.ConnectionStateEnum;
import com.yami.trading.huobi.data.websocket.service.huobi.parser.HuobiModelParser;
import com.yami.trading.huobi.data.websocket.utils.*;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.WebSocket;
import okhttp3.WebSocketListener;
import okio.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.List;
|
|
public class HuobiWebSocketConnection extends WebSocketListener implements WebSocketConnection {
|
|
private static Logger log = LoggerFactory.getLogger(HuobiWebSocketConnection.class);
|
|
public static final String HUOBI_TRADING_WEBSOCKET_PATH = "/ws/v1";
|
public static final String HUOBI_TRADING_WEBSOCKET_V2_PATH = "/ws/v2";
|
public static final String HUOBI_MARKET_WEBSOCKET_PATH = "/ws";
|
|
public static final String AUTH_VERSION_V1 = "v1";
|
|
private long lastReceivedTime;
|
|
private WebSocket webSocket;
|
|
private Request okhttpRequest;
|
|
private ConnectionStateEnum state;
|
|
private Long id;
|
|
private List<String> commandList;
|
|
private HuobiModelParser parser;
|
|
private ResponseCallback callback;
|
|
private boolean autoClose;
|
|
private boolean authNeed;
|
|
private Options options;
|
|
private long delayInSecond;
|
|
private String host;
|
|
private String authVersion = AUTH_VERSION_V1;
|
|
private HuobiWebSocketConnection() {
|
}
|
|
public static HuobiWebSocketConnection createAssetConnection(Options options, List<String> commandList,
|
HuobiModelParser parser, ResponseCallback callback, Boolean autoClose) {
|
|
return createConnection(options, commandList, parser, callback, autoClose, true);
|
}
|
|
public static HuobiWebSocketConnection createMarketConnection(Options options, List<String> commandList,
|
HuobiModelParser parser, ResponseCallback callback, boolean autoClose) {
|
return createConnection(options, commandList, parser, callback, autoClose, false);
|
}
|
|
public static HuobiWebSocketConnection createConnection(Options options, List<String> commandList,
|
HuobiModelParser parser, ResponseCallback callback, Boolean autoClose, boolean authNeed) {
|
return createConnection(options, commandList, parser, callback, autoClose, authNeed, AUTH_VERSION_V1);
|
}
|
|
public static HuobiWebSocketConnection createConnection(Options options, List<String> commandList,
|
HuobiModelParser parser, ResponseCallback callback, Boolean autoClose, boolean authNeed,
|
String authVersion) {
|
|
HuobiWebSocketConnection connection = new HuobiWebSocketConnection();
|
connection.setOptions(options);
|
connection.setCommandList(commandList);
|
connection.setParser(parser);
|
connection.setCallback(callback);
|
connection.setAuthNeed(authNeed);
|
connection.setAutoClose(autoClose);
|
connection.setId(IdGenerator.getNextId());
|
connection.setAuthVersion(authVersion);
|
|
// 创建websocket请求
|
String url = options.getWebSocketHost() + HUOBI_MARKET_WEBSOCKET_PATH;
|
Request request = new Request.Builder().url(url).build();
|
connection.setOkhttpRequest(request);
|
|
try {
|
connection.setHost(new URL(options.getRestHost()).getHost());
|
} catch (MalformedURLException e) {
|
e.printStackTrace();
|
}
|
|
// 开启链接
|
connection.connect();
|
return connection;
|
}
|
|
void connect() {
|
if (state == ConnectionStateEnum.CONNECTED || webSocket != null) {
|
log.info("[Connection][" + this.getId() + "] Already connected");
|
return;
|
}
|
log.info("[Connection][" + this.getId() + "] Connecting...");
|
webSocket = ConnectionFactory.createWebSocket(okhttpRequest, this);
|
}
|
|
public void reConnect(int delayInSecond) {
|
log.warn("[Sub][" + this.getId() + "] Reconnecting after " + delayInSecond + " seconds later");
|
if (webSocket != null) {
|
// 发送重连关闭帧(1001:端点离开)
|
webSocket.close(1001, "Reconnecting");
|
webSocket = null;
|
}
|
this.delayInSecond = delayInSecond;
|
state = ConnectionStateEnum.DELAY_CONNECT;
|
}
|
|
public void reConnect() {
|
if (delayInSecond != 0) {
|
delayInSecond--;
|
} else {
|
connect();
|
}
|
}
|
|
public long getLastReceivedTime() {
|
return this.lastReceivedTime;
|
}
|
|
void send(List<String> commandList) {
|
if (commandList == null || commandList.size() <= 0) {
|
return;
|
}
|
commandList.forEach(command -> {
|
send(command);
|
});
|
}
|
|
public void send(String str) {
|
boolean result = false;
|
log.info("[Connection Send]{}", str);
|
if (webSocket != null) {
|
result = webSocket.send(str);
|
}
|
if (!result) {
|
log.error("[Connection Send][" + this.getId() + "] Failed to send message");
|
closeOnError();
|
}
|
}
|
|
@Override
|
public void onMessage(WebSocket webSocket, String text) {
|
super.onMessage(webSocket, text);
|
lastReceivedTime = System.currentTimeMillis();
|
|
log.debug("[On Message Text]:{}", text);
|
try {
|
JSONObject json = JSON.parseObject(text);
|
|
if (json.containsKey("action")) {
|
String action = json.getString("action");
|
if ("ping".equals(action)) {
|
processPingOnV2TradingLine(json, webSocket);
|
} else if ("push".equals(action)) {
|
onReceive(json);
|
}
|
if ("req".equals(action)) {
|
String ch = json.getString("ch");
|
if ("auth".equals(ch)) {
|
send(commandList);
|
}
|
|
}
|
|
}
|
|
} catch (Exception e) {
|
log.error("[On Message][{}]: catch exception:", this.getId(), e);
|
closeOnError();
|
}
|
}
|
|
@SuppressWarnings("unchecked")
|
@Override
|
public void onMessage(WebSocket webSocket, ByteString bytes) {
|
super.onMessage(webSocket, bytes);
|
try {
|
|
lastReceivedTime = System.currentTimeMillis();
|
|
String data;
|
try {
|
data = new String(InternalUtils.decode(bytes.toByteArray()));
|
} catch (IOException e) {
|
log.error("[Connection On Message][" + this.getId() + "] Receive message error: " + e.getMessage());
|
closeOnError();
|
return;
|
}
|
log.debug("[Connection On Message][{}] {}", this.getId(), data);
|
JSONObject jsonObject = JSON.parseObject(data);
|
|
if (jsonObject.containsKey("status") && !"ok".equals(jsonObject.getString("status"))) {
|
String errorCode = jsonObject.getString("err-code");
|
String errorMsg = jsonObject.getString("err-msg");
|
onError(errorCode + ": " + errorMsg, null);
|
log.error("[Connection On Message][" + this.getId() + "] Got error from server: " + errorCode + "; "
|
+ errorMsg);
|
close();
|
} else if (jsonObject.containsKey("op")) {
|
String op = jsonObject.getString("op");
|
if (op.equals("notify")) {
|
onReceive(jsonObject);
|
} else if (op.equals("ping")) {
|
processPingOnTradingLine(jsonObject, webSocket);
|
} else if (op.equals("auth")) {
|
send(commandList);
|
} else if (op.equals("req")) {
|
onReceiveAndClose(jsonObject);
|
}
|
} else if (jsonObject.containsKey("ch") || jsonObject.containsKey("rep")) {
|
onReceiveAndClose(jsonObject);
|
} else if (jsonObject.containsKey("ping")) {
|
processPingOnMarketLine(jsonObject, webSocket);
|
} else if (jsonObject.containsKey("subbed")) {
|
}
|
} catch (Exception e) {
|
log.error("[Connection On Message][" + this.getId() + "] Unexpected error: " + e.getMessage());
|
closeOnError();
|
}
|
}
|
|
private void onError(String errorMessage, Throwable e) {
|
log.error("[Connection error][" + this.getId() + "] " + errorMessage);
|
closeOnError();
|
}
|
|
private void onReceiveAndClose(JSONObject jsonObject) {
|
onReceive(jsonObject);
|
if (autoClose) {
|
close();
|
}
|
}
|
|
@SuppressWarnings("unchecked")
|
private void onReceive(JSONObject jsonObject) {
|
Object obj = null;
|
try {
|
obj = parser.parse(jsonObject);
|
} catch (Exception e) {
|
onError("Process error: " + e.getMessage() + " You should capture the exception in your error handler", e);
|
return;
|
}
|
|
callback.onResponse(obj);
|
}
|
|
private void processPingOnTradingLine(JSONObject jsonObject, WebSocket webSocket) {
|
long ts = jsonObject.getLong("ts");
|
webSocket.send(String.format("{\"op\":\"pong\",\"ts\":%d}", ts));
|
}
|
|
private void processPingOnMarketLine(JSONObject jsonObject, WebSocket webSocket) {
|
long ts = jsonObject.getLong("ping");
|
webSocket.send(String.format("{\"pong\":%d}", ts));
|
}
|
|
private void processPingOnV2TradingLine(JSONObject jsonWrapper, WebSocket webSocket) {
|
long ts = jsonWrapper.getJSONObject("data").getLong("ts");
|
String pong = String.format("{\"action\": \"pong\",\"params\": {\"ts\": %d}}", ts);
|
webSocket.send(pong);
|
}
|
|
public ConnectionStateEnum getState() {
|
return state;
|
}
|
|
@Override
|
public Long getConnectionId() {
|
return this.getId();
|
}
|
|
public void close() {
|
log.error("[Connection close][" + this.getId() + "] Closing normally");
|
if (webSocket != null) {
|
// 发送正常关闭帧(1000:正常关闭)
|
webSocket.close(1000, "Normal close");
|
webSocket = null; // 置空,避免重复操作
|
}
|
WebSocketWatchDog.onClosedNormally(this);
|
}
|
|
@Override
|
public void onClosed(WebSocket webSocket, int code, String reason) {
|
super.onClosed(webSocket, code, reason);
|
log.info("[Connection closed][" + this.getId() + "] Code: " + code + ", Reason: " + reason);
|
this.webSocket = null; // 确保旧连接引用被清除
|
state = ConnectionStateEnum.IDLE;
|
}
|
|
@SuppressWarnings("unchecked")
|
@Override
|
public void onOpen(WebSocket webSocket, Response response) {
|
super.onOpen(webSocket, response);
|
this.webSocket = webSocket;
|
log.info("[Connection][" + this.getId() + "] Connected to server");
|
if (options.isWebSocketAutoConnect()) {
|
WebSocketWatchDog.onConnectionCreated(this);
|
}
|
|
state = ConnectionStateEnum.CONNECTED;
|
lastReceivedTime = System.currentTimeMillis();
|
|
// 延迟1秒发送订阅命令,避免连接未稳定
|
new Thread(() -> {
|
try {
|
Thread.sleep(1000); // 1秒延迟
|
commandList.forEach(command -> {
|
send(command);
|
});
|
} catch (InterruptedException e) {
|
Thread.currentThread().interrupt();
|
}
|
}).start();
|
|
}
|
|
@Override
|
public void onFailure(WebSocket webSocket, Throwable t, Response response) {
|
onError("Unexpected error: " + t.getMessage(), t);
|
// 关键:关闭可能存在的响应体
|
if (response != null) {
|
response.close();
|
}
|
closeOnError();
|
}
|
|
private void closeOnError() {
|
if (webSocket != null) {
|
// 发送错误关闭帧(1011:服务器内部错误)
|
webSocket.close(1011, "Error close");
|
state = ConnectionStateEnum.CLOSED_ON_ERROR;
|
log.error("[Connection error][" + this.getId() + "] Connection is closing due to error");
|
webSocket = null; // 置空,避免重复操作
|
}
|
}
|
|
public WebSocket getWebSocket() {
|
return webSocket;
|
}
|
|
public void setWebSocket(WebSocket webSocket) {
|
this.webSocket = webSocket;
|
}
|
|
public Request getOkhttpRequest() {
|
return okhttpRequest;
|
}
|
|
public void setOkhttpRequest(Request okhttpRequest) {
|
this.okhttpRequest = okhttpRequest;
|
}
|
|
public Long getId() {
|
return id;
|
}
|
|
public void setId(Long id) {
|
this.id = id;
|
}
|
|
public List<String> getCommandList() {
|
return commandList;
|
}
|
|
public void setCommandList(List<String> commandList) {
|
this.commandList = commandList;
|
}
|
|
public HuobiModelParser getParser() {
|
return parser;
|
}
|
|
public void setParser(HuobiModelParser parser) {
|
this.parser = parser;
|
}
|
|
public ResponseCallback getCallback() {
|
return callback;
|
}
|
|
public void setCallback(ResponseCallback callback) {
|
this.callback = callback;
|
}
|
|
public boolean isAutoClose() {
|
return autoClose;
|
}
|
|
public void setAutoClose(boolean autoClose) {
|
this.autoClose = autoClose;
|
}
|
|
public boolean isAuthNeed() {
|
return authNeed;
|
}
|
|
public void setAuthNeed(boolean authNeed) {
|
this.authNeed = authNeed;
|
}
|
|
public Options getOptions() {
|
return options;
|
}
|
|
public void setOptions(Options options) {
|
this.options = options;
|
}
|
|
public long getDelayInSecond() {
|
return delayInSecond;
|
}
|
|
public void setDelayInSecond(long delayInSecond) {
|
this.delayInSecond = delayInSecond;
|
}
|
|
public String getHost() {
|
return host;
|
}
|
|
public void setHost(String host) {
|
this.host = host;
|
}
|
|
public void setLastReceivedTime(long lastReceivedTime) {
|
this.lastReceivedTime = lastReceivedTime;
|
}
|
|
public void setState(ConnectionStateEnum state) {
|
this.state = state;
|
}
|
|
public String getAuthVersion() {
|
return authVersion;
|
}
|
|
public void setAuthVersion(String authVersion) {
|
this.authVersion = authVersion;
|
}
|
|
@Override
|
public String toString() {
|
return "HuobiWebSocketConnection [log=" + log + ", lastReceivedTime=" + lastReceivedTime + ", webSocket="
|
+ webSocket + ", okhttpRequest=" + okhttpRequest + ", state=" + state + ", id=" + id + ", commandList="
|
+ commandList + ", parser=" + parser + ", callback=" + callback + ", autoClose=" + autoClose
|
+ ", authNeed=" + authNeed + ", options=" + options + ", delayInSecond=" + delayInSecond + ", host="
|
+ host + ", authVersion=" + authVersion + "]";
|
}
|
|
}
|