bitgetsClient/src/main/java/org/example/bitgetsclient/WsBean/BitgetsWsBean.java
@@ -38,9 +38,6 @@ private CurrencySerivceImpl currencyService; @Autowired private ConfigurableApplicationContext context; @Autowired @Qualifier("threadPoolTaskExecutor") private ThreadPoolTaskExecutor threadPoolTaskExecutor; @@ -59,38 +56,12 @@ List<Currency> sublist = mexc.subList(fromIndex, toIndex); // 切分子列表 String parameter = getParameter(sublist); // 获取参数 threadPoolTaskExecutor.execute(() -> { try { new BitgetClient(parameter).start(); } catch (Exception e) { run(); } new BitgetClient(parameter).start(); }); } } } private boolean runExecuted = false; private synchronized void run() { if (runExecuted) { return; // 已经执行过,直接返回 } runExecuted = true; log.info("ws 异常开始重启"); Thread restartThread = new Thread(() -> { try { SpringApplication.exit(context, () -> 0); SpringApplication.run(BitgetsClientApplication.class); log.info("ws 重启成功"); } catch (Exception e) { e.printStackTrace(); log.error("ws 重启失败"); } }); restartThread.setDaemon(false); restartThread.start(); log.info("ws 重启失败"); } public String getParameter(List<Currency> list) throws JsonProcessingException, JSONException { bitgetsClient/src/main/java/org/example/bitgetsclient/task/RunTask.java
@@ -3,9 +3,11 @@ import lombok.extern.slf4j.Slf4j; import org.example.bitgetsclient.BitgetsClientApplication; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.SpringApplication; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import org.springframework.web.servlet.function.ServerResponse; @@ -23,8 +25,17 @@ @Autowired private ConfigurableApplicationContext context; @Scheduled(cron = "0 0/30 * * * ?") @Autowired @Qualifier("threadPoolTaskExecutor") private ThreadPoolTaskExecutor threadPoolTaskExecutor; @Scheduled(cron = "0 0/11 * * * ?") public void restart() { // 停止当前线程池中的任务 threadPoolTaskExecutor.shutdown(); // 创建新的线程池 threadPoolTaskExecutor.initialize(); Thread restartThread = new Thread(() -> { try { SpringApplication.exit(context, () -> 0); bitgetsClient/src/main/java/org/example/bitgetsclient/wsClient/BitgetClient.java
@@ -63,7 +63,7 @@ public void start() throws Exception { public void start() { try { connect(); // 尝试连接 if (session == null) { @@ -84,7 +84,6 @@ log.error("线程被中断: " + e.getMessage(), e); // 记录线程中断的错误 } catch (Exception e) { log.error("bitget ws 连接过程中发生异常: " + e.getMessage(), e); // 记录连接过程中发生的异常 throw e; } finally { executorService.shutdownNow(); // 尝试立即关闭调度服务 } @@ -175,52 +174,51 @@ } @OnClose public void onClose() throws Exception { public void onClose() { log.info("bitget ws 连接已关闭,尝试重新连接..."); // 记录连接关闭的信息 throw new Exception(); handleConnectionClosedOrError(); } @OnError public void onError(Throwable throwable) throws Exception { public void onError(Throwable throwable) { log.error("bitget ws 发生错误: " + throwable.getMessage(), throwable); // 记录错误信息 throw new Exception(); handleConnectionClosedOrError(); } // private void handleConnectionClosedOrError() { // executorService.execute(() -> { // try { // attemptReconnect(); // } catch (Exception e) { // throw new RuntimeException(e); // } // }); // } private void handleConnectionClosedOrError() { executorService.execute(() -> { try { attemptReconnect(); } catch (Exception e) { throw new RuntimeException(e); } }); } // private void attemptReconnect() throws Exception { // if (reconnectAttempts < MAX_RECONNECT_ATTEMPTS) { // 检查重连次数是否超过限制 // try { // log.info("bitget ws 开始重连"); // 记录开始重连的信息 // connect(); // 尝试重连 // log.info("bitget ws 重连成功"); // 成功重连的日志 // reconnectAttempts = 0; // 重连成功,重置重连次数 // } catch (Exception e) { // reconnectAttempts++; // 增加重连尝试次数 // log.warn("bitget ws 重连失败,尝试次数: " + reconnectAttempts, e); // 记录重连失败的警告信息 // // 采用指数退避策略,增加重连间隔 // long waitTime = Math.min(60, (long) Math.pow(2, reconnectAttempts)) * 1000; // 最大等待时间为 60 秒 // scheduleReconnect(waitTime); // 调度下次重连 // } // } else { // log.error("超过最大重连次数,停止重连"); // 超过最大重连次数 // throw new Exception(); // } // } // // private void scheduleReconnect(long waitTime) { // 接受等待时间参数 // if (!executorService.isShutdown()) { // executorService.schedule(this::attemptReconnect, waitTime, TimeUnit.MILLISECONDS); // 调度重连 // } // } private void attemptReconnect() { if (reconnectAttempts < MAX_RECONNECT_ATTEMPTS) { // 检查重连次数是否超过限制 try { log.info("bitget ws 开始重连"); // 记录开始重连的信息 connect(); // 尝试重连 log.info("bitget ws 重连成功"); // 成功重连的日志 reconnectAttempts = 0; // 重连成功,重置重连次数 } catch (Exception e) { reconnectAttempts++; // 增加重连尝试次数 log.warn("bitget ws 重连失败,尝试次数: " + reconnectAttempts, e); // 记录重连失败的警告信息 // 采用指数退避策略,增加重连间隔 long waitTime = Math.min(60, (long) Math.pow(2, reconnectAttempts)) * 1000; // 最大等待时间为 60 秒 scheduleReconnect(waitTime); // 调度下次重连 } } else { log.error("超过最大重连次数,停止重连"); // 超过最大重连次数 } } private void scheduleReconnect(long waitTime) { // 接受等待时间参数 if (!executorService.isShutdown()) { executorService.schedule(this::attemptReconnect, waitTime, TimeUnit.MILLISECONDS); // 调度重连 } } private void sendPing() { try { geteClient/src/main/java/org/example/geteclient/WsBean/GateWsBean.java
@@ -30,9 +30,6 @@ private CurrencySerivceImpl currencyService; @Autowired private ConfigurableApplicationContext context; @Autowired @Qualifier("threadPoolTaskExecutor") private ThreadPoolTaskExecutor threadPoolTaskExecutor; @@ -51,38 +48,11 @@ // 使用自定义线程池提交任务 threadPoolTaskExecutor.execute(() -> { try { new GateClient(sublist).start(); } catch (Exception e) { run(); } new GateClient(sublist).start(); }); } } } private boolean runExecuted = false; private synchronized void run() { if (runExecuted) { return; // 已经执行过,直接返回 } runExecuted = true; log.info("ws 异常开始重启"); Thread restartThread = new Thread(() -> { try { SpringApplication.exit(context, () -> 0); SpringApplication.run(GeteClientApplication.class); log.info("ws 重启成功"); } catch (Exception e) { e.printStackTrace(); log.error("ws 重启失败"); } }); restartThread.setDaemon(false); restartThread.start(); log.info("ws 重启失败"); } } geteClient/src/main/java/org/example/geteclient/task/RunTask.java
@@ -3,9 +3,11 @@ import lombok.extern.slf4j.Slf4j; import org.example.geteclient.GeteClientApplication; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.SpringApplication; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; /** @@ -20,8 +22,16 @@ @Autowired private ConfigurableApplicationContext context; @Scheduled(cron = "0 0/30 * * * ?") @Autowired @Qualifier("threadPoolTaskExecutor") private ThreadPoolTaskExecutor threadPoolTaskExecutor; @Scheduled(cron = "0 0/11 * * * ?") public void restart() { // 停止当前线程池中的任务 threadPoolTaskExecutor.shutdown(); // 创建新的线程池 threadPoolTaskExecutor.initialize(); Thread restartThread = new Thread(() -> { try { SpringApplication.exit(context, () -> 0); geteClient/src/main/java/org/example/geteclient/wsClinet/GateClient.java
@@ -53,7 +53,7 @@ this.executorService = Executors.newScheduledThreadPool(1); // 创建一个定时任务调度线程池 } public void start() throws Exception { public void start() { try { connect(); // 尝试连接 WebSocket if (session == null) { // 如果连接失败,session 仍然为 null @@ -76,7 +76,6 @@ } catch (Exception e) { log.error("gate ws 连接过程中发生异常: " + e.getMessage(), e); // 记录连接过程中发生的异常 throw e; } finally { executorService.shutdown(); // 确保定时任务调度服务被关闭 } @@ -153,49 +152,49 @@ @OnClose public void onClose() throws Exception { log.info("gate ws 连接已关闭,尝试重新连接..."); // 连接关闭日志 throw new Exception(); handleConnectionClosedOrError(); } @OnError public void onError(Throwable throwable) throws Exception { log.error("gate ws 发生错误: " + throwable.getMessage(), throwable); // 记录错误日志 throw new Exception(); handleConnectionClosedOrError(); } // private void handleConnectionClosedOrError() { // synchronized (lock) { // 进入同步块以防止并发重连 // if (!reconnecting) { // 检查当前是否已经在重连 // reconnecting = true; // 设置 reconnecting 为 true 表示开始重连 // executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连 // } // } // } // // private void attemptReconnect() { // boolean doReconnect = true; // 是否进行重连的标志 // try { // log.info("gate ws 开始重连"); // 开始重连日志 // connect(); // 尝试重新连接 // log.info("gate ws 重连成功"); // 重连成功日志 // } catch (Exception e) { // log.error("gate ws 重连失败", e); // 重连失败记录日志 // doReconnect = false; // 标记不再继续重连 // } finally { // synchronized (lock) { // 进入同步块 // if (doReconnect) { // scheduleReconnect(); // 如果需要继续重连,调度重连任务 // } else { // reconnecting = false; // 重连结束,标记重连状态为 false // } // } // } // } // // private void scheduleReconnect() { // if (!executorService.isShutdown()) { // 确保调度服务未关闭 // executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS); // 重连调度,5秒后尝试重新连接 // } // } private void handleConnectionClosedOrError() { synchronized (lock) { // 进入同步块以防止并发重连 if (!reconnecting) { // 检查当前是否已经在重连 reconnecting = true; // 设置 reconnecting 为 true 表示开始重连 executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连 } } } private void attemptReconnect() { boolean doReconnect = true; // 是否进行重连的标志 try { log.info("gate ws 开始重连"); // 开始重连日志 connect(); // 尝试重新连接 log.info("gate ws 重连成功"); // 重连成功日志 } catch (Exception e) { log.error("gate ws 重连失败", e); // 重连失败记录日志 doReconnect = false; // 标记不再继续重连 } finally { synchronized (lock) { // 进入同步块 if (doReconnect) { scheduleReconnect(); // 如果需要继续重连,调度重连任务 } else { reconnecting = false; // 重连结束,标记重连状态为 false } } } } private void scheduleReconnect() { if (!executorService.isShutdown()) { // 确保调度服务未关闭 executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS); // 重连调度,5秒后尝试重新连接 } } private void sendPing() { try { kucoinClient/src/main/java/org/example/kucoinclient/WsBean/KucoinWsBean.java
@@ -39,9 +39,6 @@ private CurrencySerivceImpl currencyService; @Autowired private ConfigurableApplicationContext context; @Autowired @Qualifier("threadPoolTaskExecutor") private ThreadPoolTaskExecutor threadPoolTaskExecutor; @@ -63,38 +60,11 @@ // 使用自定义线程池提交任务 threadPoolTaskExecutor.execute(() -> { try { new KucoinClient(sublist,token).start(); } catch (Exception e) { run(); } new KucoinClient(sublist,token).start(); }); } } } private boolean runExecuted = false; private synchronized void run() { if (runExecuted) { return; // 已经执行过,直接返回 } runExecuted = true; log.info("ws 异常开始重启"); Thread restartThread = new Thread(() -> { try { SpringApplication.exit(context, () -> 0); SpringApplication.run(KucoinClientApplication.class); log.info("ws 重启成功"); } catch (Exception e) { e.printStackTrace(); log.error("ws 重启失败"); } }); restartThread.setDaemon(false); restartThread.start(); log.info("ws 重启失败"); } public static String doPost() throws Exception { kucoinClient/src/main/java/org/example/kucoinclient/task/RunTask.java
@@ -3,9 +3,11 @@ import lombok.extern.slf4j.Slf4j; import org.example.kucoinclient.KucoinClientApplication; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.SpringApplication; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; /** @@ -20,8 +22,17 @@ @Autowired private ConfigurableApplicationContext context; @Scheduled(cron = "0 0/30 * * * ?") @Autowired @Qualifier("threadPoolTaskExecutor") private ThreadPoolTaskExecutor threadPoolTaskExecutor; @Scheduled(cron = "0 0/1 * * * ?") public void restart() { // 停止当前线程池中的任务 threadPoolTaskExecutor.shutdown(); // 创建新的线程池 threadPoolTaskExecutor.initialize(); Thread restartThread = new Thread(() -> { try { SpringApplication.exit(context, () -> 0); kucoinClient/src/main/java/org/example/kucoinclient/wsClient/KucoinClient.java
@@ -62,7 +62,7 @@ this.executorService = Executors.newScheduledThreadPool(1); // 创建调度线程池 } public void start() throws Exception { public void start() { try { connect(); // 连接到 WebSocket 服务器 if (session == null) { @@ -79,7 +79,6 @@ } catch (Exception e) { log.error("kucoin ws 连接过程中发生异常:" + e.getMessage(), e); // 记录连接过程中发生的异常 throw e; } finally { executorService.shutdown(); // 关闭调度线程池 } @@ -199,44 +198,44 @@ @OnClose public void onClose() throws Exception { log.info("kucoin ws 连接已关闭,尝试重新连接..."); // 输出连接关闭日志 throw new Exception(); handleConnectionClosedOrError(); } @OnError public void onError(Throwable throwable) throws Exception { log.error("kucoin ws 发生错误: ", throwable); // 输出错误日志 throw new Exception(); handleConnectionClosedOrError(); } // private void handleConnectionClosedOrError() { // synchronized (lock) { // 同步块,确保线程安全 // if (!reconnecting) { // reconnecting = true; // 状态标记为重连中 // executorService.execute(this::attemptReconnect); // 执行重连操作 // } // } // } // // private void attemptReconnect() { // try { // log.info("kucoin ws 开始重连"); // 输出重连开始日志 // connect(); // 尝试重新连接 // log.info("kucoin ws 重连成功"); // 输出重连成功日志 // } catch (Exception e) { // log.error("kucoin ws 重连失败: ", e); // 输出重连失败日志 // } finally { // synchronized (lock) { // 同步块 // reconnecting = false; // 状态标记为未重连 // scheduleReconnect(); // 重新调度重连任务 // } // } // } // // private void scheduleReconnect() { // if (!executorService.isShutdown()) { // 检查线程池是否已经关闭 // executorService.schedule(this::attemptReconnect, RECONNECT_DELAY, TimeUnit.SECONDS); // 调度重连 // } // } private void handleConnectionClosedOrError() { synchronized (lock) { // 同步块,确保线程安全 if (!reconnecting) { reconnecting = true; // 状态标记为重连中 executorService.execute(this::attemptReconnect); // 执行重连操作 } } } private void attemptReconnect() { try { log.info("kucoin ws 开始重连"); // 输出重连开始日志 connect(); // 尝试重新连接 log.info("kucoin ws 重连成功"); // 输出重连成功日志 } catch (Exception e) { log.error("kucoin ws 重连失败: ", e); // 输出重连失败日志 } finally { synchronized (lock) { // 同步块 reconnecting = false; // 状态标记为未重连 scheduleReconnect(); // 重新调度重连任务 } } } private void scheduleReconnect() { if (!executorService.isShutdown()) { // 检查线程池是否已经关闭 executorService.schedule(this::attemptReconnect, RECONNECT_DELAY, TimeUnit.SECONDS); // 调度重连 } } private void sendPing() { try { mexcClient/src/main/java/org/example/mexcclient/WsBean/MexcWsBean.java
@@ -48,9 +48,6 @@ private CurrencySerivceImpl currencyService; @Autowired private ConfigurableApplicationContext context; @Autowired @Qualifier("threadPoolTaskExecutor") private ThreadPoolTaskExecutor threadPoolTaskExecutor; @@ -69,38 +66,11 @@ // 使用自定义线程池提交任务 threadPoolTaskExecutor.execute(() -> { try { new MexcClient(sublist).start(); } catch (Exception e) { run(); } new MexcClient(sublist).start(); }); } } } private boolean runExecuted = false; private synchronized void run() { if (runExecuted) { return; // 已经执行过,直接返回 } runExecuted = true; log.info("ws 异常开始重启"); Thread restartThread = new Thread(() -> { try { SpringApplication.exit(context, () -> 0); SpringApplication.run(MexcClientApplication.class); log.info("ws 重启成功"); } catch (Exception e) { e.printStackTrace(); log.error("ws 重启失败"); } }); restartThread.setDaemon(false); restartThread.start(); log.info("ws 重启失败"); } } mexcClient/src/main/java/org/example/mexcclient/task/RunTask.java
@@ -3,9 +3,11 @@ import lombok.extern.slf4j.Slf4j; import org.example.mexcclient.MexcClientApplication; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.SpringApplication; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; /** @@ -20,8 +22,16 @@ @Autowired private ConfigurableApplicationContext context; @Scheduled(cron = "0 0/30 * * * ?") @Autowired @Qualifier("threadPoolTaskExecutor") private ThreadPoolTaskExecutor threadPoolTaskExecutor; @Scheduled(cron = "0 0/11 * * * ?") public void restart() { // 停止当前线程池中的任务 threadPoolTaskExecutor.shutdown(); // 创建新的线程池 threadPoolTaskExecutor.initialize(); Thread restartThread = new Thread(() -> { try { SpringApplication.exit(context, () -> 0); mexcClient/src/main/java/org/example/mexcclient/wsClient/MexcClient.java
@@ -49,7 +49,7 @@ this.executorService = Executors.newScheduledThreadPool(1); } public void start() throws Exception { public void start() { try { connect(); if (session == null) { @@ -71,7 +71,6 @@ } catch (Exception e) { log.error("mexc ws 连接过程中发生异常:" + e.getMessage(), e); // 记录连接过程中发生的异常 throw e; } finally { executorService.shutdown(); } @@ -146,50 +145,50 @@ @OnClose public void onClose() throws Exception { log.info("mexc ws 连接已关闭,尝试重新连接..."); throw new Exception(); handleConnectionClosedOrError(); } @OnError public void onError(Throwable throwable) throws Exception { log.error("mexc ws 发生错误: " + throwable.getMessage(), throwable); throw new Exception(); handleConnectionClosedOrError(); } // private void handleConnectionClosedOrError() { // synchronized (lock) { // if (!reconnecting) { // reconnecting = true; // 设置 reconnecting 为 true 表示开始重连 // executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连 // } // } // } // // private void attemptReconnect() { // boolean doReconnect = true; // try { // log.info("mexc ws 开始重连"); // connect(); // 假设 connect() 方法用于实际的连接逻辑 // log.info("mexc ws 重连成功"); // } catch (Exception e) { // log.error("mexc ws 重连失败", e); // // 连接失败时,可以根据具体情况决定是否继续重连 // // 在这里假设总是继续尝试重连 // } finally { // synchronized (lock) { // if (doReconnect) { // scheduleReconnect(); // 如果需要继续重连,则重新调度重连任务 // } else { // reconnecting = false; // 重连结束后设置 reconnecting 为 false // } // } // } // } // // private void scheduleReconnect() { // if (!executorService.isShutdown()) { // executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS); // } // } private void handleConnectionClosedOrError() { synchronized (lock) { if (!reconnecting) { reconnecting = true; // 设置 reconnecting 为 true 表示开始重连 executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连 } } } private void attemptReconnect() { boolean doReconnect = true; try { log.info("mexc ws 开始重连"); connect(); // 假设 connect() 方法用于实际的连接逻辑 log.info("mexc ws 重连成功"); } catch (Exception e) { log.error("mexc ws 重连失败", e); // 连接失败时,可以根据具体情况决定是否继续重连 // 在这里假设总是继续尝试重连 } finally { synchronized (lock) { if (doReconnect) { scheduleReconnect(); // 如果需要继续重连,则重新调度重连任务 } else { reconnecting = false; // 重连结束后设置 reconnecting 为 false } } } } private void scheduleReconnect() { if (!executorService.isShutdown()) { executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS); } } private void sendPing() { try {