From f4374f91ad562a357702e2ab4faeb1428d1172b1 Mon Sep 17 00:00:00 2001
From: zyy <zyy@email.com>
Date: Sat, 25 Oct 2025 15:30:51 +0800
Subject: [PATCH] ws优化
---
trading-order-huobi/src/main/java/com.yami.trading.huobi/data/websocket/service/huobi/connection/HuobiWebSocketConnection.java | 41 +++++++++++++++++++++++++++++------------
1 files changed, 29 insertions(+), 12 deletions(-)
diff --git a/trading-order-huobi/src/main/java/com.yami.trading.huobi/data/websocket/service/huobi/connection/HuobiWebSocketConnection.java b/trading-order-huobi/src/main/java/com.yami.trading.huobi/data/websocket/service/huobi/connection/HuobiWebSocketConnection.java
index 24af0ff..9973bfc 100644
--- a/trading-order-huobi/src/main/java/com.yami.trading.huobi/data/websocket/service/huobi/connection/HuobiWebSocketConnection.java
+++ b/trading-order-huobi/src/main/java/com.yami.trading.huobi/data/websocket/service/huobi/connection/HuobiWebSocketConnection.java
@@ -108,7 +108,7 @@
}
void connect() {
- if (state == ConnectionStateEnum.CONNECTED) {
+ if (state == ConnectionStateEnum.CONNECTED || webSocket != null) {
log.info("[Connection][" + this.getId() + "] Already connected");
return;
}
@@ -119,7 +119,8 @@
public void reConnect(int delayInSecond) {
log.warn("[Sub][" + this.getId() + "] Reconnecting after " + delayInSecond + " seconds later");
if (webSocket != null) {
- webSocket.cancel();
+ // 发送重连关闭帧(1001:端点离开)
+ webSocket.close(1001, "Reconnecting");
webSocket = null;
}
this.delayInSecond = delayInSecond;
@@ -292,17 +293,20 @@
public void close() {
log.error("[Connection close][" + this.getId() + "] Closing normally");
- webSocket.cancel();
- webSocket = null;
+ if (webSocket != null) {
+ // 发送正常关闭帧(1000:正常关闭)
+ webSocket.close(1000, "Normal close");
+ webSocket = null; // 置空,避免重复操作
+ }
WebSocketWatchDog.onClosedNormally(this);
}
@Override
public void onClosed(WebSocket webSocket, int code, String reason) {
super.onClosed(webSocket, code, reason);
- if (state == ConnectionStateEnum.CONNECTED) {
- state = ConnectionStateEnum.IDLE;
- }
+ log.info("[Connection closed][" + this.getId() + "] Code: " + code + ", Reason: " + reason);
+ this.webSocket = null; // 确保旧连接引用被清除
+ state = ConnectionStateEnum.IDLE;
}
@SuppressWarnings("unchecked")
@@ -318,24 +322,37 @@
state = ConnectionStateEnum.CONNECTED;
lastReceivedTime = System.currentTimeMillis();
- // 不需要验签的话,直接把命令发出去就好
- commandList.forEach(command -> {
- send(command);
- });
+ // 延迟1秒发送订阅命令,避免连接未稳定
+ new Thread(() -> {
+ try {
+ Thread.sleep(1000); // 1秒延迟
+ commandList.forEach(command -> {
+ send(command);
+ });
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }).start();
}
@Override
public void onFailure(WebSocket webSocket, Throwable t, Response response) {
onError("Unexpected error: " + t.getMessage(), t);
+ // 关键:关闭可能存在的响应体
+ if (response != null) {
+ response.close();
+ }
closeOnError();
}
private void closeOnError() {
if (webSocket != null) {
- this.webSocket.cancel();
+ // 发送错误关闭帧(1011:服务器内部错误)
+ webSocket.close(1011, "Error close");
state = ConnectionStateEnum.CLOSED_ON_ERROR;
log.error("[Connection error][" + this.getId() + "] Connection is closing due to error");
+ webSocket = null; // 置空,避免重复操作
}
}
--
Gitblit v1.9.3