From ccb47a55a4ed9f3dbcaca9d40d8142515a91bdbe Mon Sep 17 00:00:00 2001
From: zj <1772600164@qq.com>
Date: Wed, 07 Aug 2024 03:44:00 +0800
Subject: [PATCH] 1
---
mexcClient/src/main/java/org/example/mexcclient/wsClient/MexcClient.java | 77 ++++----
geteClient/src/main/java/org/example/geteclient/task/RunTask.java | 12 +
geteClient/src/main/java/org/example/geteclient/wsClinet/GateClient.java | 75 ++++----
kucoinClient/src/main/java/org/example/kucoinclient/wsClient/KucoinClient.java | 65 ++++----
bitgetsClient/src/main/java/org/example/bitgetsclient/wsClient/BitgetClient.java | 78 ++++-----
geteClient/src/main/java/org/example/geteclient/WsBean/GateWsBean.java | 32 ---
kucoinClient/src/main/java/org/example/kucoinclient/task/RunTask.java | 13 +
mexcClient/src/main/java/org/example/mexcclient/task/RunTask.java | 12 +
bitgetsClient/src/main/java/org/example/bitgetsclient/WsBean/BitgetsWsBean.java | 33 ---
mexcClient/src/main/java/org/example/mexcclient/WsBean/MexcWsBean.java | 32 ---
kucoinClient/src/main/java/org/example/kucoinclient/WsBean/KucoinWsBean.java | 32 ---
bitgetsClient/src/main/java/org/example/bitgetsclient/task/RunTask.java | 13 +
12 files changed, 196 insertions(+), 278 deletions(-)
diff --git a/bitgetsClient/src/main/java/org/example/bitgetsclient/WsBean/BitgetsWsBean.java b/bitgetsClient/src/main/java/org/example/bitgetsclient/WsBean/BitgetsWsBean.java
index 1730608..40ad1f2 100644
--- a/bitgetsClient/src/main/java/org/example/bitgetsclient/WsBean/BitgetsWsBean.java
+++ b/bitgetsClient/src/main/java/org/example/bitgetsclient/WsBean/BitgetsWsBean.java
@@ -38,9 +38,6 @@
private CurrencySerivceImpl currencyService;
@Autowired
- private ConfigurableApplicationContext context;
-
- @Autowired
@Qualifier("threadPoolTaskExecutor")
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@@ -59,38 +56,12 @@
List<Currency> sublist = mexc.subList(fromIndex, toIndex); // 切分子列表
String parameter = getParameter(sublist); // 获取参数
threadPoolTaskExecutor.execute(() -> {
- try {
- new BitgetClient(parameter).start();
- } catch (Exception e) {
- run();
- }
+ new BitgetClient(parameter).start();
+
});
}
}
- }
-
- private boolean runExecuted = false;
- private synchronized void run() {
-
- if (runExecuted) {
- return; // 已经执行过,直接返回
- }
- runExecuted = true;
- log.info("ws 异常开始重启");
- Thread restartThread = new Thread(() -> {
- try {
- SpringApplication.exit(context, () -> 0);
- SpringApplication.run(BitgetsClientApplication.class);
- log.info("ws 重启成功");
- } catch (Exception e) {
- e.printStackTrace();
- log.error("ws 重启失败");
- }
- });
- restartThread.setDaemon(false);
- restartThread.start();
- log.info("ws 重启失败");
}
public String getParameter(List<Currency> list) throws JsonProcessingException, JSONException {
diff --git a/bitgetsClient/src/main/java/org/example/bitgetsclient/task/RunTask.java b/bitgetsClient/src/main/java/org/example/bitgetsclient/task/RunTask.java
index aa0661d..25b7c0a 100644
--- a/bitgetsClient/src/main/java/org/example/bitgetsclient/task/RunTask.java
+++ b/bitgetsClient/src/main/java/org/example/bitgetsclient/task/RunTask.java
@@ -3,9 +3,11 @@
import lombok.extern.slf4j.Slf4j;
import org.example.bitgetsclient.BitgetsClientApplication;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.SpringApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.web.servlet.function.ServerResponse;
@@ -23,8 +25,17 @@
@Autowired
private ConfigurableApplicationContext context;
- @Scheduled(cron = "0 0/30 * * * ?")
+ @Autowired
+ @Qualifier("threadPoolTaskExecutor")
+ private ThreadPoolTaskExecutor threadPoolTaskExecutor;
+
+ @Scheduled(cron = "0 0/11 * * * ?")
public void restart() {
+ // 停止当前线程池中的任务
+ threadPoolTaskExecutor.shutdown();
+ // 创建新的线程池
+ threadPoolTaskExecutor.initialize();
+
Thread restartThread = new Thread(() -> {
try {
SpringApplication.exit(context, () -> 0);
diff --git a/bitgetsClient/src/main/java/org/example/bitgetsclient/wsClient/BitgetClient.java b/bitgetsClient/src/main/java/org/example/bitgetsclient/wsClient/BitgetClient.java
index c3cb229..dcdb2b1 100644
--- a/bitgetsClient/src/main/java/org/example/bitgetsclient/wsClient/BitgetClient.java
+++ b/bitgetsClient/src/main/java/org/example/bitgetsclient/wsClient/BitgetClient.java
@@ -63,7 +63,7 @@
- public void start() throws Exception {
+ public void start() {
try {
connect(); // 尝试连接
if (session == null) {
@@ -84,7 +84,6 @@
log.error("线程被中断: " + e.getMessage(), e); // 记录线程中断的错误
} catch (Exception e) {
log.error("bitget ws 连接过程中发生异常: " + e.getMessage(), e); // 记录连接过程中发生的异常
- throw e;
} finally {
executorService.shutdownNow(); // 尝试立即关闭调度服务
}
@@ -175,52 +174,51 @@
}
@OnClose
- public void onClose() throws Exception {
+ public void onClose() {
log.info("bitget ws 连接已关闭,尝试重新连接..."); // 记录连接关闭的信息
- throw new Exception();
+ handleConnectionClosedOrError();
}
@OnError
- public void onError(Throwable throwable) throws Exception {
+ public void onError(Throwable throwable) {
log.error("bitget ws 发生错误: " + throwable.getMessage(), throwable); // 记录错误信息
- throw new Exception();
+ handleConnectionClosedOrError();
}
-// private void handleConnectionClosedOrError() {
-// executorService.execute(() -> {
-// try {
-// attemptReconnect();
-// } catch (Exception e) {
-// throw new RuntimeException(e);
-// }
-// });
-// }
+ private void handleConnectionClosedOrError() {
+ executorService.execute(() -> {
+ try {
+ attemptReconnect();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
-// private void attemptReconnect() throws Exception {
-// if (reconnectAttempts < MAX_RECONNECT_ATTEMPTS) { // 检查重连次数是否超过限制
-// try {
-// log.info("bitget ws 开始重连"); // 记录开始重连的信息
-// connect(); // 尝试重连
-// log.info("bitget ws 重连成功"); // 成功重连的日志
-// reconnectAttempts = 0; // 重连成功,重置重连次数
-// } catch (Exception e) {
-// reconnectAttempts++; // 增加重连尝试次数
-// log.warn("bitget ws 重连失败,尝试次数: " + reconnectAttempts, e); // 记录重连失败的警告信息
-// // 采用指数退避策略,增加重连间隔
-// long waitTime = Math.min(60, (long) Math.pow(2, reconnectAttempts)) * 1000; // 最大等待时间为 60 秒
-// scheduleReconnect(waitTime); // 调度下次重连
-// }
-// } else {
-// log.error("超过最大重连次数,停止重连"); // 超过最大重连次数
-// throw new Exception();
-// }
-// }
-//
-// private void scheduleReconnect(long waitTime) { // 接受等待时间参数
-// if (!executorService.isShutdown()) {
-// executorService.schedule(this::attemptReconnect, waitTime, TimeUnit.MILLISECONDS); // 调度重连
-// }
-// }
+ private void attemptReconnect() {
+ if (reconnectAttempts < MAX_RECONNECT_ATTEMPTS) { // 检查重连次数是否超过限制
+ try {
+ log.info("bitget ws 开始重连"); // 记录开始重连的信息
+ connect(); // 尝试重连
+ log.info("bitget ws 重连成功"); // 成功重连的日志
+ reconnectAttempts = 0; // 重连成功,重置重连次数
+ } catch (Exception e) {
+ reconnectAttempts++; // 增加重连尝试次数
+ log.warn("bitget ws 重连失败,尝试次数: " + reconnectAttempts, e); // 记录重连失败的警告信息
+ // 采用指数退避策略,增加重连间隔
+ long waitTime = Math.min(60, (long) Math.pow(2, reconnectAttempts)) * 1000; // 最大等待时间为 60 秒
+ scheduleReconnect(waitTime); // 调度下次重连
+ }
+ } else {
+ log.error("超过最大重连次数,停止重连"); // 超过最大重连次数
+ }
+ }
+
+ private void scheduleReconnect(long waitTime) { // 接受等待时间参数
+ if (!executorService.isShutdown()) {
+ executorService.schedule(this::attemptReconnect, waitTime, TimeUnit.MILLISECONDS); // 调度重连
+ }
+ }
private void sendPing() {
try {
diff --git a/geteClient/src/main/java/org/example/geteclient/WsBean/GateWsBean.java b/geteClient/src/main/java/org/example/geteclient/WsBean/GateWsBean.java
index 0e84e1b..1fc5d4a 100644
--- a/geteClient/src/main/java/org/example/geteclient/WsBean/GateWsBean.java
+++ b/geteClient/src/main/java/org/example/geteclient/WsBean/GateWsBean.java
@@ -30,9 +30,6 @@
private CurrencySerivceImpl currencyService;
@Autowired
- private ConfigurableApplicationContext context;
-
- @Autowired
@Qualifier("threadPoolTaskExecutor")
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@@ -51,38 +48,11 @@
// 使用自定义线程池提交任务
threadPoolTaskExecutor.execute(() -> {
- try {
- new GateClient(sublist).start();
- } catch (Exception e) {
- run();
- }
+ new GateClient(sublist).start();
});
}
}
- }
-
- private boolean runExecuted = false;
- private synchronized void run() {
-
- if (runExecuted) {
- return; // 已经执行过,直接返回
- }
- runExecuted = true;
- log.info("ws 异常开始重启");
- Thread restartThread = new Thread(() -> {
- try {
- SpringApplication.exit(context, () -> 0);
- SpringApplication.run(GeteClientApplication.class);
- log.info("ws 重启成功");
- } catch (Exception e) {
- e.printStackTrace();
- log.error("ws 重启失败");
- }
- });
- restartThread.setDaemon(false);
- restartThread.start();
- log.info("ws 重启失败");
}
}
diff --git a/geteClient/src/main/java/org/example/geteclient/task/RunTask.java b/geteClient/src/main/java/org/example/geteclient/task/RunTask.java
index 1922b9e..6ea5561 100644
--- a/geteClient/src/main/java/org/example/geteclient/task/RunTask.java
+++ b/geteClient/src/main/java/org/example/geteclient/task/RunTask.java
@@ -3,9 +3,11 @@
import lombok.extern.slf4j.Slf4j;
import org.example.geteclient.GeteClientApplication;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.SpringApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
/**
@@ -20,8 +22,16 @@
@Autowired
private ConfigurableApplicationContext context;
- @Scheduled(cron = "0 0/30 * * * ?")
+ @Autowired
+ @Qualifier("threadPoolTaskExecutor")
+ private ThreadPoolTaskExecutor threadPoolTaskExecutor;
+
+ @Scheduled(cron = "0 0/11 * * * ?")
public void restart() {
+ // 停止当前线程池中的任务
+ threadPoolTaskExecutor.shutdown();
+ // 创建新的线程池
+ threadPoolTaskExecutor.initialize();
Thread restartThread = new Thread(() -> {
try {
SpringApplication.exit(context, () -> 0);
diff --git a/geteClient/src/main/java/org/example/geteclient/wsClinet/GateClient.java b/geteClient/src/main/java/org/example/geteclient/wsClinet/GateClient.java
index 3301db2..2c03b74 100644
--- a/geteClient/src/main/java/org/example/geteclient/wsClinet/GateClient.java
+++ b/geteClient/src/main/java/org/example/geteclient/wsClinet/GateClient.java
@@ -53,7 +53,7 @@
this.executorService = Executors.newScheduledThreadPool(1); // 创建一个定时任务调度线程池
}
- public void start() throws Exception {
+ public void start() {
try {
connect(); // 尝试连接 WebSocket
if (session == null) { // 如果连接失败,session 仍然为 null
@@ -76,7 +76,6 @@
} catch (Exception e) {
log.error("gate ws 连接过程中发生异常: " + e.getMessage(), e); // 记录连接过程中发生的异常
- throw e;
} finally {
executorService.shutdown(); // 确保定时任务调度服务被关闭
}
@@ -153,49 +152,49 @@
@OnClose
public void onClose() throws Exception {
log.info("gate ws 连接已关闭,尝试重新连接..."); // 连接关闭日志
- throw new Exception();
+ handleConnectionClosedOrError();
}
@OnError
public void onError(Throwable throwable) throws Exception {
log.error("gate ws 发生错误: " + throwable.getMessage(), throwable); // 记录错误日志
- throw new Exception();
+ handleConnectionClosedOrError();
}
-// private void handleConnectionClosedOrError() {
-// synchronized (lock) { // 进入同步块以防止并发重连
-// if (!reconnecting) { // 检查当前是否已经在重连
-// reconnecting = true; // 设置 reconnecting 为 true 表示开始重连
-// executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连
-// }
-// }
-// }
-//
-// private void attemptReconnect() {
-// boolean doReconnect = true; // 是否进行重连的标志
-// try {
-// log.info("gate ws 开始重连"); // 开始重连日志
-// connect(); // 尝试重新连接
-// log.info("gate ws 重连成功"); // 重连成功日志
-// } catch (Exception e) {
-// log.error("gate ws 重连失败", e); // 重连失败记录日志
-// doReconnect = false; // 标记不再继续重连
-// } finally {
-// synchronized (lock) { // 进入同步块
-// if (doReconnect) {
-// scheduleReconnect(); // 如果需要继续重连,调度重连任务
-// } else {
-// reconnecting = false; // 重连结束,标记重连状态为 false
-// }
-// }
-// }
-// }
-//
-// private void scheduleReconnect() {
-// if (!executorService.isShutdown()) { // 确保调度服务未关闭
-// executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS); // 重连调度,5秒后尝试重新连接
-// }
-// }
+ private void handleConnectionClosedOrError() {
+ synchronized (lock) { // 进入同步块以防止并发重连
+ if (!reconnecting) { // 检查当前是否已经在重连
+ reconnecting = true; // 设置 reconnecting 为 true 表示开始重连
+ executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连
+ }
+ }
+ }
+
+ private void attemptReconnect() {
+ boolean doReconnect = true; // 是否进行重连的标志
+ try {
+ log.info("gate ws 开始重连"); // 开始重连日志
+ connect(); // 尝试重新连接
+ log.info("gate ws 重连成功"); // 重连成功日志
+ } catch (Exception e) {
+ log.error("gate ws 重连失败", e); // 重连失败记录日志
+ doReconnect = false; // 标记不再继续重连
+ } finally {
+ synchronized (lock) { // 进入同步块
+ if (doReconnect) {
+ scheduleReconnect(); // 如果需要继续重连,调度重连任务
+ } else {
+ reconnecting = false; // 重连结束,标记重连状态为 false
+ }
+ }
+ }
+ }
+
+ private void scheduleReconnect() {
+ if (!executorService.isShutdown()) { // 确保调度服务未关闭
+ executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS); // 重连调度,5秒后尝试重新连接
+ }
+ }
private void sendPing() {
try {
diff --git a/kucoinClient/src/main/java/org/example/kucoinclient/WsBean/KucoinWsBean.java b/kucoinClient/src/main/java/org/example/kucoinclient/WsBean/KucoinWsBean.java
index 4d721e2..9399a58 100644
--- a/kucoinClient/src/main/java/org/example/kucoinclient/WsBean/KucoinWsBean.java
+++ b/kucoinClient/src/main/java/org/example/kucoinclient/WsBean/KucoinWsBean.java
@@ -39,9 +39,6 @@
private CurrencySerivceImpl currencyService;
@Autowired
- private ConfigurableApplicationContext context;
-
- @Autowired
@Qualifier("threadPoolTaskExecutor")
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@@ -63,38 +60,11 @@
// 使用自定义线程池提交任务
threadPoolTaskExecutor.execute(() -> {
- try {
- new KucoinClient(sublist,token).start();
- } catch (Exception e) {
- run();
- }
+ new KucoinClient(sublist,token).start();
});
}
}
- }
-
- private boolean runExecuted = false;
- private synchronized void run() {
-
- if (runExecuted) {
- return; // 已经执行过,直接返回
- }
- runExecuted = true;
- log.info("ws 异常开始重启");
- Thread restartThread = new Thread(() -> {
- try {
- SpringApplication.exit(context, () -> 0);
- SpringApplication.run(KucoinClientApplication.class);
- log.info("ws 重启成功");
- } catch (Exception e) {
- e.printStackTrace();
- log.error("ws 重启失败");
- }
- });
- restartThread.setDaemon(false);
- restartThread.start();
- log.info("ws 重启失败");
}
public static String doPost() throws Exception {
diff --git a/kucoinClient/src/main/java/org/example/kucoinclient/task/RunTask.java b/kucoinClient/src/main/java/org/example/kucoinclient/task/RunTask.java
index cf9752b..a3c686e 100644
--- a/kucoinClient/src/main/java/org/example/kucoinclient/task/RunTask.java
+++ b/kucoinClient/src/main/java/org/example/kucoinclient/task/RunTask.java
@@ -3,9 +3,11 @@
import lombok.extern.slf4j.Slf4j;
import org.example.kucoinclient.KucoinClientApplication;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.SpringApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
/**
@@ -20,8 +22,17 @@
@Autowired
private ConfigurableApplicationContext context;
- @Scheduled(cron = "0 0/30 * * * ?")
+ @Autowired
+ @Qualifier("threadPoolTaskExecutor")
+ private ThreadPoolTaskExecutor threadPoolTaskExecutor;
+
+ @Scheduled(cron = "0 0/1 * * * ?")
public void restart() {
+ // 停止当前线程池中的任务
+ threadPoolTaskExecutor.shutdown();
+ // 创建新的线程池
+ threadPoolTaskExecutor.initialize();
+
Thread restartThread = new Thread(() -> {
try {
SpringApplication.exit(context, () -> 0);
diff --git a/kucoinClient/src/main/java/org/example/kucoinclient/wsClient/KucoinClient.java b/kucoinClient/src/main/java/org/example/kucoinclient/wsClient/KucoinClient.java
index e5042fa..58f6eda 100644
--- a/kucoinClient/src/main/java/org/example/kucoinclient/wsClient/KucoinClient.java
+++ b/kucoinClient/src/main/java/org/example/kucoinclient/wsClient/KucoinClient.java
@@ -62,7 +62,7 @@
this.executorService = Executors.newScheduledThreadPool(1); // 创建调度线程池
}
- public void start() throws Exception {
+ public void start() {
try {
connect(); // 连接到 WebSocket 服务器
if (session == null) {
@@ -79,7 +79,6 @@
} catch (Exception e) {
log.error("kucoin ws 连接过程中发生异常:" + e.getMessage(), e); // 记录连接过程中发生的异常
- throw e;
} finally {
executorService.shutdown(); // 关闭调度线程池
}
@@ -199,44 +198,44 @@
@OnClose
public void onClose() throws Exception {
log.info("kucoin ws 连接已关闭,尝试重新连接..."); // 输出连接关闭日志
- throw new Exception();
+ handleConnectionClosedOrError();
}
@OnError
public void onError(Throwable throwable) throws Exception {
log.error("kucoin ws 发生错误: ", throwable); // 输出错误日志
- throw new Exception();
+ handleConnectionClosedOrError();
}
-// private void handleConnectionClosedOrError() {
-// synchronized (lock) { // 同步块,确保线程安全
-// if (!reconnecting) {
-// reconnecting = true; // 状态标记为重连中
-// executorService.execute(this::attemptReconnect); // 执行重连操作
-// }
-// }
-// }
-//
-// private void attemptReconnect() {
-// try {
-// log.info("kucoin ws 开始重连"); // 输出重连开始日志
-// connect(); // 尝试重新连接
-// log.info("kucoin ws 重连成功"); // 输出重连成功日志
-// } catch (Exception e) {
-// log.error("kucoin ws 重连失败: ", e); // 输出重连失败日志
-// } finally {
-// synchronized (lock) { // 同步块
-// reconnecting = false; // 状态标记为未重连
-// scheduleReconnect(); // 重新调度重连任务
-// }
-// }
-// }
-//
-// private void scheduleReconnect() {
-// if (!executorService.isShutdown()) { // 检查线程池是否已经关闭
-// executorService.schedule(this::attemptReconnect, RECONNECT_DELAY, TimeUnit.SECONDS); // 调度重连
-// }
-// }
+ private void handleConnectionClosedOrError() {
+ synchronized (lock) { // 同步块,确保线程安全
+ if (!reconnecting) {
+ reconnecting = true; // 状态标记为重连中
+ executorService.execute(this::attemptReconnect); // 执行重连操作
+ }
+ }
+ }
+
+ private void attemptReconnect() {
+ try {
+ log.info("kucoin ws 开始重连"); // 输出重连开始日志
+ connect(); // 尝试重新连接
+ log.info("kucoin ws 重连成功"); // 输出重连成功日志
+ } catch (Exception e) {
+ log.error("kucoin ws 重连失败: ", e); // 输出重连失败日志
+ } finally {
+ synchronized (lock) { // 同步块
+ reconnecting = false; // 状态标记为未重连
+ scheduleReconnect(); // 重新调度重连任务
+ }
+ }
+ }
+
+ private void scheduleReconnect() {
+ if (!executorService.isShutdown()) { // 检查线程池是否已经关闭
+ executorService.schedule(this::attemptReconnect, RECONNECT_DELAY, TimeUnit.SECONDS); // 调度重连
+ }
+ }
private void sendPing() {
try {
diff --git a/mexcClient/src/main/java/org/example/mexcclient/WsBean/MexcWsBean.java b/mexcClient/src/main/java/org/example/mexcclient/WsBean/MexcWsBean.java
index f2f504e..0340831 100644
--- a/mexcClient/src/main/java/org/example/mexcclient/WsBean/MexcWsBean.java
+++ b/mexcClient/src/main/java/org/example/mexcclient/WsBean/MexcWsBean.java
@@ -48,9 +48,6 @@
private CurrencySerivceImpl currencyService;
@Autowired
- private ConfigurableApplicationContext context;
-
- @Autowired
@Qualifier("threadPoolTaskExecutor")
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@@ -69,38 +66,11 @@
// 使用自定义线程池提交任务
threadPoolTaskExecutor.execute(() -> {
- try {
- new MexcClient(sublist).start();
- } catch (Exception e) {
- run();
- }
+ new MexcClient(sublist).start();
});
}
}
- }
-
- private boolean runExecuted = false;
- private synchronized void run() {
-
- if (runExecuted) {
- return; // 已经执行过,直接返回
- }
- runExecuted = true;
- log.info("ws 异常开始重启");
- Thread restartThread = new Thread(() -> {
- try {
- SpringApplication.exit(context, () -> 0);
- SpringApplication.run(MexcClientApplication.class);
- log.info("ws 重启成功");
- } catch (Exception e) {
- e.printStackTrace();
- log.error("ws 重启失败");
- }
- });
- restartThread.setDaemon(false);
- restartThread.start();
- log.info("ws 重启失败");
}
}
diff --git a/mexcClient/src/main/java/org/example/mexcclient/task/RunTask.java b/mexcClient/src/main/java/org/example/mexcclient/task/RunTask.java
index b01d95a..5febd82 100644
--- a/mexcClient/src/main/java/org/example/mexcclient/task/RunTask.java
+++ b/mexcClient/src/main/java/org/example/mexcclient/task/RunTask.java
@@ -3,9 +3,11 @@
import lombok.extern.slf4j.Slf4j;
import org.example.mexcclient.MexcClientApplication;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.SpringApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
/**
@@ -20,8 +22,16 @@
@Autowired
private ConfigurableApplicationContext context;
- @Scheduled(cron = "0 0/30 * * * ?")
+ @Autowired
+ @Qualifier("threadPoolTaskExecutor")
+ private ThreadPoolTaskExecutor threadPoolTaskExecutor;
+
+ @Scheduled(cron = "0 0/11 * * * ?")
public void restart() {
+ // 停止当前线程池中的任务
+ threadPoolTaskExecutor.shutdown();
+ // 创建新的线程池
+ threadPoolTaskExecutor.initialize();
Thread restartThread = new Thread(() -> {
try {
SpringApplication.exit(context, () -> 0);
diff --git a/mexcClient/src/main/java/org/example/mexcclient/wsClient/MexcClient.java b/mexcClient/src/main/java/org/example/mexcclient/wsClient/MexcClient.java
index 8f2a2c1..3d1f18e 100644
--- a/mexcClient/src/main/java/org/example/mexcclient/wsClient/MexcClient.java
+++ b/mexcClient/src/main/java/org/example/mexcclient/wsClient/MexcClient.java
@@ -49,7 +49,7 @@
this.executorService = Executors.newScheduledThreadPool(1);
}
- public void start() throws Exception {
+ public void start() {
try {
connect();
if (session == null) {
@@ -71,7 +71,6 @@
} catch (Exception e) {
log.error("mexc ws 连接过程中发生异常:" + e.getMessage(), e); // 记录连接过程中发生的异常
- throw e;
} finally {
executorService.shutdown();
}
@@ -146,50 +145,50 @@
@OnClose
public void onClose() throws Exception {
log.info("mexc ws 连接已关闭,尝试重新连接...");
- throw new Exception();
+ handleConnectionClosedOrError();
}
@OnError
public void onError(Throwable throwable) throws Exception {
log.error("mexc ws 发生错误: " + throwable.getMessage(), throwable);
- throw new Exception();
+ handleConnectionClosedOrError();
}
-// private void handleConnectionClosedOrError() {
-// synchronized (lock) {
-// if (!reconnecting) {
-// reconnecting = true; // 设置 reconnecting 为 true 表示开始重连
-// executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连
-// }
-// }
-// }
-//
-// private void attemptReconnect() {
-// boolean doReconnect = true;
-// try {
-// log.info("mexc ws 开始重连");
-// connect(); // 假设 connect() 方法用于实际的连接逻辑
-// log.info("mexc ws 重连成功");
-// } catch (Exception e) {
-// log.error("mexc ws 重连失败", e);
-// // 连接失败时,可以根据具体情况决定是否继续重连
-// // 在这里假设总是继续尝试重连
-// } finally {
-// synchronized (lock) {
-// if (doReconnect) {
-// scheduleReconnect(); // 如果需要继续重连,则重新调度重连任务
-// } else {
-// reconnecting = false; // 重连结束后设置 reconnecting 为 false
-// }
-// }
-// }
-// }
-//
-// private void scheduleReconnect() {
-// if (!executorService.isShutdown()) {
-// executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS);
-// }
-// }
+ private void handleConnectionClosedOrError() {
+ synchronized (lock) {
+ if (!reconnecting) {
+ reconnecting = true; // 设置 reconnecting 为 true 表示开始重连
+ executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连
+ }
+ }
+ }
+
+ private void attemptReconnect() {
+ boolean doReconnect = true;
+ try {
+ log.info("mexc ws 开始重连");
+ connect(); // 假设 connect() 方法用于实际的连接逻辑
+ log.info("mexc ws 重连成功");
+ } catch (Exception e) {
+ log.error("mexc ws 重连失败", e);
+ // 连接失败时,可以根据具体情况决定是否继续重连
+ // 在这里假设总是继续尝试重连
+ } finally {
+ synchronized (lock) {
+ if (doReconnect) {
+ scheduleReconnect(); // 如果需要继续重连,则重新调度重连任务
+ } else {
+ reconnecting = false; // 重连结束后设置 reconnecting 为 false
+ }
+ }
+ }
+ }
+
+ private void scheduleReconnect() {
+ if (!executorService.isShutdown()) {
+ executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS);
+ }
+ }
private void sendPing() {
try {
--
Gitblit v1.9.3