From 8ed60795a7c278f9699616f72eb05ce49800ba6f Mon Sep 17 00:00:00 2001
From: zj <1772600164@qq.com>
Date: Wed, 07 Aug 2024 01:20:50 +0800
Subject: [PATCH] 1
---
geteClient/src/main/java/org/example/geteclient/ThreadConfig/AsyncConfiguration.java | 6
mexcClient/src/main/java/org/example/mexcclient/comm/ApplicationContextProvider.java | 21 +
geteClient/src/main/java/org/example/geteclient/task/RunTask.java | 2
bitgetsClient/src/main/java/org/example/bitgetsclient/comm/ApplicationContextProvider.java | 21 +
bitgetsClient/src/main/java/org/example/bitgetsclient/ThreadConfig/AsyncConfiguration.java | 6
websocketSerivce/src/main/resources/application.properties | 4
bitgetsClient/src/main/java/org/example/bitgetsclient/WsBean/BitgetsWsBean.java | 40 ++
kucoinClient/src/main/java/org/example/kucoinclient/comm/ApplicationContextProvider.java | 21 +
kucoinClient/src/main/java/org/example/kucoinclient/ThreadConfig/AsyncConfiguration.java | 6
/dev/null | 68 -----
mexcClient/src/main/java/org/example/mexcclient/wsClient/MexcClient.java | 95 ++++---
geteClient/src/main/java/org/example/geteclient/wsClinet/GateClient.java | 90 +++---
kucoinClient/src/main/java/org/example/kucoinclient/wsClient/KucoinClient.java | 82 +++--
bitgetsClient/src/main/java/org/example/bitgetsclient/wsClient/BitgetClient.java | 91 +++---
geteClient/src/main/java/org/example/geteclient/WsBean/GateWsBean.java | 88 ++++++
kucoinClient/src/main/java/org/example/kucoinclient/task/RunTask.java | 2
websocketSerivce/src/main/java/org/example/server/impl/CurrencySerivceImpl.java | 13
mexcClient/src/main/java/org/example/mexcclient/task/RunTask.java | 2
mexcClient/src/main/java/org/example/mexcclient/WsBean/MexcWsBean.java | 37 ++
kucoinClient/src/main/java/org/example/kucoinclient/WsBean/KucoinWsBean.java | 45 ++
bitgetsClient/src/main/java/org/example/bitgetsclient/task/RunTask.java | 2
geteClient/src/main/java/org/example/geteclient/comm/ApplicationContextProvider.java | 21 +
22 files changed, 496 insertions(+), 267 deletions(-)
diff --git a/bitgetsClient/src/main/java/org/example/bitgetsclient/ThreadConfig/AsyncConfiguration.java b/bitgetsClient/src/main/java/org/example/bitgetsclient/ThreadConfig/AsyncConfiguration.java
index 1748f43..870633d 100644
--- a/bitgetsClient/src/main/java/org/example/bitgetsclient/ThreadConfig/AsyncConfiguration.java
+++ b/bitgetsClient/src/main/java/org/example/bitgetsclient/ThreadConfig/AsyncConfiguration.java
@@ -18,9 +18,9 @@
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
- executor.setCorePoolSize(100); // 核心线程数, 根据需求进行调整
- executor.setMaxPoolSize(150); // 最大线程数, 适当设置以避免资源耗尽
- executor.setQueueCapacity(200); // 队列容量, 适当限制以避免请求堆积
+ executor.setCorePoolSize(20); // 核心线程数, 根据需求进行调整
+ executor.setMaxPoolSize(30); // 最大线程数, 适当设置以避免资源耗尽
+ executor.setQueueCapacity(100); // 队列容量, 适当限制以避免请求堆积
executor.setKeepAliveSeconds(30); // 线程空闲时的存活时间为30秒,减少系统开销
executor.setThreadNamePrefix("Thread-"); // 线程名称的前缀
diff --git a/bitgetsClient/src/main/java/org/example/bitgetsclient/WsBean/BitgetsWsBean.java b/bitgetsClient/src/main/java/org/example/bitgetsclient/WsBean/BitgetsWsBean.java
index 5cd3cea..1730608 100644
--- a/bitgetsClient/src/main/java/org/example/bitgetsclient/WsBean/BitgetsWsBean.java
+++ b/bitgetsClient/src/main/java/org/example/bitgetsclient/WsBean/BitgetsWsBean.java
@@ -4,12 +4,15 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
+import org.example.bitgetsclient.BitgetsClientApplication;
import org.example.bitgetsclient.pojo.Currency;
import org.example.bitgetsclient.server.impl.CurrencySerivceImpl;
import org.example.bitgetsclient.wsClient.BitgetClient;
import org.json.JSONException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.SpringApplication;
+import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@@ -19,6 +22,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/**
@@ -32,6 +36,9 @@
@Autowired
private CurrencySerivceImpl currencyService;
+
+ @Autowired
+ private ConfigurableApplicationContext context;
@Autowired
@Qualifier("threadPoolTaskExecutor")
@@ -51,12 +58,41 @@
int toIndex = Math.min(fromIndex + batchSize, totalSize); // 计算结束索引
List<Currency> sublist = mexc.subList(fromIndex, toIndex); // 切分子列表
String parameter = getParameter(sublist); // 获取参数
- // 使用自定义线程池提交任务
- threadPoolTaskExecutor.execute(new BitgetClient(parameter)::start); // 提交到线程池执行
+ threadPoolTaskExecutor.execute(() -> {
+ try {
+ new BitgetClient(parameter).start();
+ } catch (Exception e) {
+ run();
+ }
+ });
+
}
}
}
+ private boolean runExecuted = false;
+ private synchronized void run() {
+
+ if (runExecuted) {
+ return; // 已经执行过,直接返回
+ }
+ runExecuted = true;
+ log.info("ws 异常开始重启");
+ Thread restartThread = new Thread(() -> {
+ try {
+ SpringApplication.exit(context, () -> 0);
+ SpringApplication.run(BitgetsClientApplication.class);
+ log.info("ws 重启成功");
+ } catch (Exception e) {
+ e.printStackTrace();
+ log.error("ws 重启失败");
+ }
+ });
+ restartThread.setDaemon(false);
+ restartThread.start();
+ log.info("ws 重启失败");
+ }
+
public String getParameter(List<Currency> list) throws JsonProcessingException, JSONException {
// 创建一个ObjectMapper实例
ObjectMapper mapper = new ObjectMapper();
diff --git a/bitgetsClient/src/main/java/org/example/bitgetsclient/comm/ApplicationContextProvider.java b/bitgetsClient/src/main/java/org/example/bitgetsclient/comm/ApplicationContextProvider.java
new file mode 100644
index 0000000..eac9ea3
--- /dev/null
+++ b/bitgetsClient/src/main/java/org/example/bitgetsclient/comm/ApplicationContextProvider.java
@@ -0,0 +1,21 @@
+package org.example.bitgetsclient.comm;
+
+import org.springframework.beans.BeansException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.stereotype.Component;
+
+@Component
+public class ApplicationContextProvider implements ApplicationContextAware {
+
+ private static ApplicationContext applicationContext;
+
+ @Override
+ public void setApplicationContext(ApplicationContext context) throws BeansException {
+ applicationContext = context;
+ }
+
+ public static ApplicationContext getApplicationContext() {
+ return applicationContext;
+ }
+}
diff --git a/bitgetsClient/src/main/java/org/example/bitgetsclient/task/RunTask.java b/bitgetsClient/src/main/java/org/example/bitgetsclient/task/RunTask.java
index 4b5fe7a..aa0661d 100644
--- a/bitgetsClient/src/main/java/org/example/bitgetsclient/task/RunTask.java
+++ b/bitgetsClient/src/main/java/org/example/bitgetsclient/task/RunTask.java
@@ -23,7 +23,7 @@
@Autowired
private ConfigurableApplicationContext context;
- @Scheduled(cron = "0 0 */3 * * ?")
+ @Scheduled(cron = "0 0/30 * * * ?")
public void restart() {
Thread restartThread = new Thread(() -> {
try {
diff --git a/bitgetsClient/src/main/java/org/example/bitgetsclient/wsClient/BitgetClient.java b/bitgetsClient/src/main/java/org/example/bitgetsclient/wsClient/BitgetClient.java
index aea4698..c3cb229 100644
--- a/bitgetsClient/src/main/java/org/example/bitgetsclient/wsClient/BitgetClient.java
+++ b/bitgetsClient/src/main/java/org/example/bitgetsclient/wsClient/BitgetClient.java
@@ -8,9 +8,16 @@
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import lombok.extern.slf4j.Slf4j;
+import org.example.bitgetsclient.BitgetsClientApplication;
+import org.example.bitgetsclient.comm.ApplicationContextProvider;
import org.example.bitgetsclient.util.RedisUtil;
import org.json.JSONException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.SpringApplication;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
+import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.websocket.*;
@@ -26,6 +33,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
@ClientEndpoint
@Slf4j
@@ -55,7 +63,7 @@
- public void start() {
+ public void start() throws Exception {
try {
connect(); // 尝试连接
if (session == null) {
@@ -67,7 +75,6 @@
executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS);
// 发送订阅消息
session.getBasicRemote().sendText(subscriptions); // 发送订阅信息
-
synchronized (this) {
this.wait(); // 等待 WebSocket 消息到来
}
@@ -77,6 +84,7 @@
log.error("线程被中断: " + e.getMessage(), e); // 记录线程中断的错误
} catch (Exception e) {
log.error("bitget ws 连接过程中发生异常: " + e.getMessage(), e); // 记录连接过程中发生的异常
+ throw e;
} finally {
executorService.shutdownNow(); // 尝试立即关闭调度服务
}
@@ -109,8 +117,8 @@
if (dataNode != null) {
Map<String, Object> hashMap = new HashMap<>(); // 变量命名更具描述性
- String bids = dataNode.get("bids").toString();
- String asks = dataNode.get("asks").toString();
+ String asks = dataNode.get("bids").toString();
+ String bids = dataNode.get("asks").toString();
Type listType = new TypeToken<List<List<String>>>(){}.getType();
List<List<String>> bidsList = gson.fromJson(bids, listType);
@@ -167,51 +175,52 @@
}
@OnClose
- public void onClose() {
+ public void onClose() throws Exception {
log.info("bitget ws 连接已关闭,尝试重新连接..."); // 记录连接关闭的信息
- handleConnectionClosedOrError(); // 处理连接关闭事件
+ throw new Exception();
}
@OnError
- public void onError(Throwable throwable) {
+ public void onError(Throwable throwable) throws Exception {
log.error("bitget ws 发生错误: " + throwable.getMessage(), throwable); // 记录错误信息
- handleConnectionClosedOrError(); // 处理错误事件
+ throw new Exception();
}
- private void handleConnectionClosedOrError() {
- synchronized (lock) {
- if (!reconnecting) {
- reconnecting = true; // 开始重连
- executorService.execute(this::attemptReconnect); // 执行重连操作
- }
- }
- }
+// private void handleConnectionClosedOrError() {
+// executorService.execute(() -> {
+// try {
+// attemptReconnect();
+// } catch (Exception e) {
+// throw new RuntimeException(e);
+// }
+// });
+// }
- private void attemptReconnect() {
- if (reconnectAttempts < MAX_RECONNECT_ATTEMPTS) { // 检查重连次数是否超过限制
- try {
- log.info("bitget ws 开始重连"); // 记录开始重连的信息
- connect(); // 尝试重连
- log.info("bitget ws 重连成功"); // 成功重连的日志
- reconnectAttempts = 0; // 重连成功,重置重连次数
- } catch (Exception e) {
- reconnectAttempts++; // 增加重连尝试次数
- log.warn("bitget ws 重连失败,尝试次数: " + reconnectAttempts, e); // 记录重连失败的警告信息
- // 采用指数退避策略,增加重连间隔
- long waitTime = Math.min(60, (long) Math.pow(2, reconnectAttempts)) * 1000; // 最大等待时间为 60 秒
- scheduleReconnect(waitTime); // 调度下次重连
- }
- } else {
- log.error("超过最大重连次数,停止重连"); // 超过最大重连次数
- reconnecting = false; // 重连状态重置
- }
- }
-
- private void scheduleReconnect(long waitTime) { // 接受等待时间参数
- if (!executorService.isShutdown()) {
- executorService.schedule(this::attemptReconnect, waitTime, TimeUnit.MILLISECONDS); // 调度重连
- }
- }
+// private void attemptReconnect() throws Exception {
+// if (reconnectAttempts < MAX_RECONNECT_ATTEMPTS) { // 检查重连次数是否超过限制
+// try {
+// log.info("bitget ws 开始重连"); // 记录开始重连的信息
+// connect(); // 尝试重连
+// log.info("bitget ws 重连成功"); // 成功重连的日志
+// reconnectAttempts = 0; // 重连成功,重置重连次数
+// } catch (Exception e) {
+// reconnectAttempts++; // 增加重连尝试次数
+// log.warn("bitget ws 重连失败,尝试次数: " + reconnectAttempts, e); // 记录重连失败的警告信息
+// // 采用指数退避策略,增加重连间隔
+// long waitTime = Math.min(60, (long) Math.pow(2, reconnectAttempts)) * 1000; // 最大等待时间为 60 秒
+// scheduleReconnect(waitTime); // 调度下次重连
+// }
+// } else {
+// log.error("超过最大重连次数,停止重连"); // 超过最大重连次数
+// throw new Exception();
+// }
+// }
+//
+// private void scheduleReconnect(long waitTime) { // 接受等待时间参数
+// if (!executorService.isShutdown()) {
+// executorService.schedule(this::attemptReconnect, waitTime, TimeUnit.MILLISECONDS); // 调度重连
+// }
+// }
private void sendPing() {
try {
diff --git a/geteClient/src/main/java/org/example/geteclient/ThreadConfig/AsyncConfiguration.java b/geteClient/src/main/java/org/example/geteclient/ThreadConfig/AsyncConfiguration.java
index 22a71c3..06a876f 100644
--- a/geteClient/src/main/java/org/example/geteclient/ThreadConfig/AsyncConfiguration.java
+++ b/geteClient/src/main/java/org/example/geteclient/ThreadConfig/AsyncConfiguration.java
@@ -18,9 +18,9 @@
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
- executor.setCorePoolSize(100); // 核心线程数, 根据需求进行调整
- executor.setMaxPoolSize(150); // 最大线程数, 适当设置以避免资源耗尽
- executor.setQueueCapacity(200); // 队列容量, 适当限制以避免请求堆积
+ executor.setCorePoolSize(40); // 核心线程数, 根据需求进行调整
+ executor.setMaxPoolSize(50); // 最大线程数, 适当设置以避免资源耗尽
+ executor.setQueueCapacity(400); // 队列容量, 适当限制以避免请求堆积
executor.setKeepAliveSeconds(30); // 线程空闲时的存活时间为30秒,减少系统开销
executor.setThreadNamePrefix("Thread-"); // 线程名称的前缀
diff --git a/geteClient/src/main/java/org/example/geteclient/WsBean/GateWsBean.java b/geteClient/src/main/java/org/example/geteclient/WsBean/GateWsBean.java
new file mode 100644
index 0000000..0e84e1b
--- /dev/null
+++ b/geteClient/src/main/java/org/example/geteclient/WsBean/GateWsBean.java
@@ -0,0 +1,88 @@
+package org.example.geteclient.WsBean;
+
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import lombok.extern.slf4j.Slf4j;
+import org.example.geteclient.GeteClientApplication;
+import org.example.geteclient.pojo.Currency;
+import org.example.geteclient.server.impl.CurrencySerivceImpl;
+import org.example.geteclient.wsClinet.GateClient;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.SpringApplication;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.util.CollectionUtils;
+
+import java.util.List;
+
+/**
+ * @ClassDescription: 客户端请求类
+ * @JdkVersion: 1.8
+ * @Created: 2023/8/31 16:13
+ */
+@Slf4j
+@Configuration
+public class GateWsBean {
+
+ @Autowired
+ private CurrencySerivceImpl currencyService;
+
+ @Autowired
+ private ConfigurableApplicationContext context;
+
+ @Autowired
+ @Qualifier("threadPoolTaskExecutor")
+ private ThreadPoolTaskExecutor threadPoolTaskExecutor;
+
+ @Bean
+ public void gateWebsocketRunClientMap() {
+ List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "gate"));
+ if (!CollectionUtils.isEmpty(mexc)) {
+ int batchSize = 100; // 每个线程处理的数据量
+ int totalSize = mexc.size();
+ int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数
+
+ for (int i = 0; i < threadCount; i++) {
+ int fromIndex = i * batchSize;
+ int toIndex = Math.min(fromIndex + batchSize, totalSize);
+ List<Currency> sublist = mexc.subList(fromIndex, toIndex);
+
+ // 使用自定义线程池提交任务
+ threadPoolTaskExecutor.execute(() -> {
+ try {
+ new GateClient(sublist).start();
+ } catch (Exception e) {
+ run();
+ }
+ });
+ }
+
+ }
+ }
+
+ private boolean runExecuted = false;
+ private synchronized void run() {
+
+ if (runExecuted) {
+ return; // 已经执行过,直接返回
+ }
+ runExecuted = true;
+ log.info("ws 异常开始重启");
+ Thread restartThread = new Thread(() -> {
+ try {
+ SpringApplication.exit(context, () -> 0);
+ SpringApplication.run(GeteClientApplication.class);
+ log.info("ws 重启成功");
+ } catch (Exception e) {
+ e.printStackTrace();
+ log.error("ws 重启失败");
+ }
+ });
+ restartThread.setDaemon(false);
+ restartThread.start();
+ log.info("ws 重启失败");
+ }
+}
+
diff --git a/geteClient/src/main/java/org/example/geteclient/WsBean/MexcWsBean.java b/geteClient/src/main/java/org/example/geteclient/WsBean/MexcWsBean.java
deleted file mode 100644
index bfa3bff..0000000
--- a/geteClient/src/main/java/org/example/geteclient/WsBean/MexcWsBean.java
+++ /dev/null
@@ -1,68 +0,0 @@
-package org.example.geteclient.WsBean;
-
-import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.http.HttpEntity;
-import org.apache.http.HttpResponse;
-import org.apache.http.NameValuePair;
-import org.apache.http.client.entity.UrlEncodedFormEntity;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.impl.client.DefaultHttpClient;
-import org.apache.http.util.EntityUtils;
-import org.example.geteclient.pojo.Currency;
-import org.example.geteclient.server.impl.CurrencySerivceImpl;
-import org.example.geteclient.wsClinet.GateClient;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.util.CollectionUtils;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-/**
- * @ClassDescription: 客户端请求类
- * @JdkVersion: 1.8
- * @Created: 2023/8/31 16:13
- */
-@Slf4j
-@Configuration
-public class MexcWsBean {
-
- @Autowired
- private CurrencySerivceImpl currencyService;
-
- @Autowired
- @Qualifier("threadPoolTaskExecutor")
- private ThreadPoolTaskExecutor threadPoolTaskExecutor;
-
- @Bean
- public void gateWebsocketRunClientMap() {
- List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "gate"));
- if (!CollectionUtils.isEmpty(mexc)) {
- int batchSize = 100; // 每个线程处理的数据量
- int totalSize = mexc.size();
- int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数
-
- for (int i = 0; i < threadCount; i++) {
- int fromIndex = i * batchSize;
- int toIndex = Math.min(fromIndex + batchSize, totalSize);
- List<Currency> sublist = mexc.subList(fromIndex, toIndex);
-
- // 使用自定义线程池提交任务
- threadPoolTaskExecutor.execute(new GateClient(sublist)::start);
- }
-
- }
- }
-}
-
diff --git a/geteClient/src/main/java/org/example/geteclient/comm/ApplicationContextProvider.java b/geteClient/src/main/java/org/example/geteclient/comm/ApplicationContextProvider.java
new file mode 100644
index 0000000..bd3711b
--- /dev/null
+++ b/geteClient/src/main/java/org/example/geteclient/comm/ApplicationContextProvider.java
@@ -0,0 +1,21 @@
+package org.example.geteclient.comm;
+
+import org.springframework.beans.BeansException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.stereotype.Component;
+
+@Component
+public class ApplicationContextProvider implements ApplicationContextAware {
+
+ private static ApplicationContext applicationContext;
+
+ @Override
+ public void setApplicationContext(ApplicationContext context) throws BeansException {
+ applicationContext = context;
+ }
+
+ public static ApplicationContext getApplicationContext() {
+ return applicationContext;
+ }
+}
diff --git a/geteClient/src/main/java/org/example/geteclient/task/RunTask.java b/geteClient/src/main/java/org/example/geteclient/task/RunTask.java
index dc4e948..1922b9e 100644
--- a/geteClient/src/main/java/org/example/geteclient/task/RunTask.java
+++ b/geteClient/src/main/java/org/example/geteclient/task/RunTask.java
@@ -20,7 +20,7 @@
@Autowired
private ConfigurableApplicationContext context;
- @Scheduled(cron = "0 0 */3 * * ?")
+ @Scheduled(cron = "0 0/30 * * * ?")
public void restart() {
Thread restartThread = new Thread(() -> {
try {
diff --git a/geteClient/src/main/java/org/example/geteclient/wsClinet/GateClient.java b/geteClient/src/main/java/org/example/geteclient/wsClinet/GateClient.java
index cf395ba..3301db2 100644
--- a/geteClient/src/main/java/org/example/geteclient/wsClinet/GateClient.java
+++ b/geteClient/src/main/java/org/example/geteclient/wsClinet/GateClient.java
@@ -6,9 +6,16 @@
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import lombok.extern.slf4j.Slf4j;
+import org.example.geteclient.GeteClientApplication;
+import org.example.geteclient.comm.ApplicationContextProvider;
import org.example.geteclient.pojo.Currency;
import org.example.geteclient.util.RedisUtil;
import org.json.JSONException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.SpringApplication;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.stereotype.Component;
import javax.websocket.*;
import java.math.BigDecimal;
@@ -46,7 +53,7 @@
this.executorService = Executors.newScheduledThreadPool(1); // 创建一个定时任务调度线程池
}
- public void start() {
+ public void start() throws Exception {
try {
connect(); // 尝试连接 WebSocket
if (session == null) { // 如果连接失败,session 仍然为 null
@@ -69,6 +76,7 @@
} catch (Exception e) {
log.error("gate ws 连接过程中发生异常: " + e.getMessage(), e); // 记录连接过程中发生的异常
+ throw e;
} finally {
executorService.shutdown(); // 确保定时任务调度服务被关闭
}
@@ -116,7 +124,7 @@
String[] asksData = dataArray[0];
pvMap.put("p", new BigDecimal(asksData[0]).toPlainString());
pvMap.put("v", new BigDecimal(asksData[1]).toPlainString());
- hashMap.put(BIDS_KEY, pvMap); // 放入 bids 数据
+ hashMap.put(ASKS_KEY, pvMap); // 放入 bids 数据
}
if (bidsObj instanceof List && !((List<?>) bidsObj).isEmpty()) {
@@ -126,7 +134,7 @@
HashMap<String, Object> pvMap = new HashMap<>(); // 创建新的 HashMap 保存 bids 和 asks
pvMap.put("p", new BigDecimal(bidsData[0]).toPlainString());
pvMap.put("v", new BigDecimal(bidsData[1]).toPlainString());
- hashMap.put(ASKS_KEY,pvMap);
+ hashMap.put(BIDS_KEY,pvMap);
}
String key = "gate" + resultMap.get(S_KEY); // 生成 Redis 键
@@ -143,51 +151,51 @@
@OnClose
- public void onClose() {
+ public void onClose() throws Exception {
log.info("gate ws 连接已关闭,尝试重新连接..."); // 连接关闭日志
- handleConnectionClosedOrError(); // 处理连接关闭或错误
+ throw new Exception();
}
@OnError
- public void onError(Throwable throwable) {
+ public void onError(Throwable throwable) throws Exception {
log.error("gate ws 发生错误: " + throwable.getMessage(), throwable); // 记录错误日志
- handleConnectionClosedOrError(); // 处理连接关闭或错误
+ throw new Exception();
}
- private void handleConnectionClosedOrError() {
- synchronized (lock) { // 进入同步块以防止并发重连
- if (!reconnecting) { // 检查当前是否已经在重连
- reconnecting = true; // 设置 reconnecting 为 true 表示开始重连
- executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连
- }
- }
- }
-
- private void attemptReconnect() {
- boolean doReconnect = true; // 是否进行重连的标志
- try {
- log.info("gate ws 开始重连"); // 开始重连日志
- connect(); // 尝试重新连接
- log.info("gate ws 重连成功"); // 重连成功日志
- } catch (Exception e) {
- log.error("gate ws 重连失败", e); // 重连失败记录日志
- doReconnect = false; // 标记不再继续重连
- } finally {
- synchronized (lock) { // 进入同步块
- if (doReconnect) {
- scheduleReconnect(); // 如果需要继续重连,调度重连任务
- } else {
- reconnecting = false; // 重连结束,标记重连状态为 false
- }
- }
- }
- }
-
- private void scheduleReconnect() {
- if (!executorService.isShutdown()) { // 确保调度服务未关闭
- executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS); // 重连调度,5秒后尝试重新连接
- }
- }
+// private void handleConnectionClosedOrError() {
+// synchronized (lock) { // 进入同步块以防止并发重连
+// if (!reconnecting) { // 检查当前是否已经在重连
+// reconnecting = true; // 设置 reconnecting 为 true 表示开始重连
+// executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连
+// }
+// }
+// }
+//
+// private void attemptReconnect() {
+// boolean doReconnect = true; // 是否进行重连的标志
+// try {
+// log.info("gate ws 开始重连"); // 开始重连日志
+// connect(); // 尝试重新连接
+// log.info("gate ws 重连成功"); // 重连成功日志
+// } catch (Exception e) {
+// log.error("gate ws 重连失败", e); // 重连失败记录日志
+// doReconnect = false; // 标记不再继续重连
+// } finally {
+// synchronized (lock) { // 进入同步块
+// if (doReconnect) {
+// scheduleReconnect(); // 如果需要继续重连,调度重连任务
+// } else {
+// reconnecting = false; // 重连结束,标记重连状态为 false
+// }
+// }
+// }
+// }
+//
+// private void scheduleReconnect() {
+// if (!executorService.isShutdown()) { // 确保调度服务未关闭
+// executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS); // 重连调度,5秒后尝试重新连接
+// }
+// }
private void sendPing() {
try {
diff --git a/kucoinClient/src/main/java/org/example/kucoinclient/ThreadConfig/AsyncConfiguration.java b/kucoinClient/src/main/java/org/example/kucoinclient/ThreadConfig/AsyncConfiguration.java
index c2105e7..b549313 100644
--- a/kucoinClient/src/main/java/org/example/kucoinclient/ThreadConfig/AsyncConfiguration.java
+++ b/kucoinClient/src/main/java/org/example/kucoinclient/ThreadConfig/AsyncConfiguration.java
@@ -18,9 +18,9 @@
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
- executor.setCorePoolSize(100); // 核心线程数, 根据需求进行调整
- executor.setMaxPoolSize(150); // 最大线程数, 适当设置以避免资源耗尽
- executor.setQueueCapacity(200); // 队列容量, 适当限制以避免请求堆积
+ executor.setCorePoolSize(20); // 核心线程数, 根据需求进行调整
+ executor.setMaxPoolSize(30); // 最大线程数, 适当设置以避免资源耗尽
+ executor.setQueueCapacity(100); // 队列容量, 适当限制以避免请求堆积
executor.setKeepAliveSeconds(30); // 线程空闲时的存活时间为30秒,减少系统开销
executor.setThreadNamePrefix("Thread-"); // 线程名称的前缀
diff --git a/kucoinClient/src/main/java/org/example/kucoinclient/WsBean/MexcWsBean.java b/kucoinClient/src/main/java/org/example/kucoinclient/WsBean/KucoinWsBean.java
similarity index 70%
rename from kucoinClient/src/main/java/org/example/kucoinclient/WsBean/MexcWsBean.java
rename to kucoinClient/src/main/java/org/example/kucoinclient/WsBean/KucoinWsBean.java
index 408184b..4d721e2 100644
--- a/kucoinClient/src/main/java/org/example/kucoinclient/WsBean/MexcWsBean.java
+++ b/kucoinClient/src/main/java/org/example/kucoinclient/WsBean/KucoinWsBean.java
@@ -1,8 +1,6 @@
package org.example.kucoinclient.WsBean;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
@@ -11,23 +9,22 @@
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.util.EntityUtils;
+import org.example.kucoinclient.KucoinClientApplication;
import org.example.kucoinclient.pojo.Currency;
import org.example.kucoinclient.server.impl.CurrencySerivceImpl;
import org.example.kucoinclient.wsClient.KucoinClient;
-import org.json.JSONException;
import org.json.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.SpringApplication;
+import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
/**
* @ClassDescription: 客户端请求类
@@ -36,10 +33,13 @@
*/
@Slf4j
@Configuration
-public class MexcWsBean {
+public class KucoinWsBean {
@Autowired
private CurrencySerivceImpl currencyService;
+
+ @Autowired
+ private ConfigurableApplicationContext context;
@Autowired
@Qualifier("threadPoolTaskExecutor")
@@ -62,12 +62,41 @@
List<Currency> sublist = mexc.subList(fromIndex, toIndex);
// 使用自定义线程池提交任务
- threadPoolTaskExecutor.execute(new KucoinClient(sublist,token)::start);
+ threadPoolTaskExecutor.execute(() -> {
+ try {
+ new KucoinClient(sublist,token).start();
+ } catch (Exception e) {
+ run();
+ }
+ });
}
}
}
+ private boolean runExecuted = false;
+ private synchronized void run() {
+
+ if (runExecuted) {
+ return; // 已经执行过,直接返回
+ }
+ runExecuted = true;
+ log.info("ws 异常开始重启");
+ Thread restartThread = new Thread(() -> {
+ try {
+ SpringApplication.exit(context, () -> 0);
+ SpringApplication.run(KucoinClientApplication.class);
+ log.info("ws 重启成功");
+ } catch (Exception e) {
+ e.printStackTrace();
+ log.error("ws 重启失败");
+ }
+ });
+ restartThread.setDaemon(false);
+ restartThread.start();
+ log.info("ws 重启失败");
+ }
+
public static String doPost() throws Exception {
String url = "https://api.kucoin.com/api/v1/bullet-public";
HttpPost httpPost = new HttpPost(url);
diff --git a/kucoinClient/src/main/java/org/example/kucoinclient/comm/ApplicationContextProvider.java b/kucoinClient/src/main/java/org/example/kucoinclient/comm/ApplicationContextProvider.java
new file mode 100644
index 0000000..09a9b6f
--- /dev/null
+++ b/kucoinClient/src/main/java/org/example/kucoinclient/comm/ApplicationContextProvider.java
@@ -0,0 +1,21 @@
+package org.example.kucoinclient.comm;
+
+import org.springframework.beans.BeansException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.stereotype.Component;
+
+@Component
+public class ApplicationContextProvider implements ApplicationContextAware {
+
+ private static ApplicationContext applicationContext;
+
+ @Override
+ public void setApplicationContext(ApplicationContext context) throws BeansException {
+ applicationContext = context;
+ }
+
+ public static ApplicationContext getApplicationContext() {
+ return applicationContext;
+ }
+}
diff --git a/kucoinClient/src/main/java/org/example/kucoinclient/task/RunTask.java b/kucoinClient/src/main/java/org/example/kucoinclient/task/RunTask.java
index da81afa..cf9752b 100644
--- a/kucoinClient/src/main/java/org/example/kucoinclient/task/RunTask.java
+++ b/kucoinClient/src/main/java/org/example/kucoinclient/task/RunTask.java
@@ -20,7 +20,7 @@
@Autowired
private ConfigurableApplicationContext context;
- @Scheduled(cron = "0 0 */3 * * ?")
+ @Scheduled(cron = "0 0/30 * * * ?")
public void restart() {
Thread restartThread = new Thread(() -> {
try {
diff --git a/kucoinClient/src/main/java/org/example/kucoinclient/wsClient/KucoinClient.java b/kucoinClient/src/main/java/org/example/kucoinclient/wsClient/KucoinClient.java
index aca5da9..e5042fa 100644
--- a/kucoinClient/src/main/java/org/example/kucoinclient/wsClient/KucoinClient.java
+++ b/kucoinClient/src/main/java/org/example/kucoinclient/wsClient/KucoinClient.java
@@ -6,9 +6,16 @@
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import lombok.extern.slf4j.Slf4j;
+import org.example.kucoinclient.KucoinClientApplication;
+import org.example.kucoinclient.comm.ApplicationContextProvider;
import org.example.kucoinclient.pojo.Currency;
import org.example.kucoinclient.util.RedisUtil;
import org.json.JSONException;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.SpringApplication;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.stereotype.Component;
import javax.websocket.*;
import java.io.UnsupportedEncodingException;
@@ -55,7 +62,7 @@
this.executorService = Executors.newScheduledThreadPool(1); // 创建调度线程池
}
- public void start() {
+ public void start() throws Exception {
try {
connect(); // 连接到 WebSocket 服务器
if (session == null) {
@@ -71,7 +78,8 @@
}
} catch (Exception e) {
- log.error("kucoin ws 连接过程中发生异常: ", e); // 捕获并记录异常
+ log.error("kucoin ws 连接过程中发生异常:" + e.getMessage(), e); // 记录连接过程中发生的异常
+ throw e;
} finally {
executorService.shutdown(); // 关闭调度线程池
}
@@ -145,8 +153,8 @@
// 空值检查,避免存储 null 值到 Redis
if (resultMap.get("bids") != null && resultMap.get("asks") != null) {
- Object bidsObj = resultMap.get("bids");
- Object asksObj = resultMap.get("asks");
+ Object asksObj = resultMap.get("bids");
+ Object bidsObj = resultMap.get("asks");
if(bidsObj instanceof List && !((List<?>) bidsObj).isEmpty() && asksObj instanceof List && !((List<?>) asksObj).isEmpty()){
if (bidsObj instanceof List && !((List<?>) bidsObj).isEmpty()) {
@@ -189,46 +197,46 @@
}
@OnClose
- public void onClose() {
+ public void onClose() throws Exception {
log.info("kucoin ws 连接已关闭,尝试重新连接..."); // 输出连接关闭日志
- handleConnectionClosedOrError(); // 处理连接关闭或错误
+ throw new Exception();
}
@OnError
- public void onError(Throwable throwable) {
+ public void onError(Throwable throwable) throws Exception {
log.error("kucoin ws 发生错误: ", throwable); // 输出错误日志
- handleConnectionClosedOrError(); // 处理连接关闭或错误
+ throw new Exception();
}
- private void handleConnectionClosedOrError() {
- synchronized (lock) { // 同步块,确保线程安全
- if (!reconnecting) {
- reconnecting = true; // 状态标记为重连中
- executorService.execute(this::attemptReconnect); // 执行重连操作
- }
- }
- }
-
- private void attemptReconnect() {
- try {
- log.info("kucoin ws 开始重连"); // 输出重连开始日志
- connect(); // 尝试重新连接
- log.info("kucoin ws 重连成功"); // 输出重连成功日志
- } catch (Exception e) {
- log.error("kucoin ws 重连失败: ", e); // 输出重连失败日志
- } finally {
- synchronized (lock) { // 同步块
- reconnecting = false; // 状态标记为未重连
- scheduleReconnect(); // 重新调度重连任务
- }
- }
- }
-
- private void scheduleReconnect() {
- if (!executorService.isShutdown()) { // 检查线程池是否已经关闭
- executorService.schedule(this::attemptReconnect, RECONNECT_DELAY, TimeUnit.SECONDS); // 调度重连
- }
- }
+// private void handleConnectionClosedOrError() {
+// synchronized (lock) { // 同步块,确保线程安全
+// if (!reconnecting) {
+// reconnecting = true; // 状态标记为重连中
+// executorService.execute(this::attemptReconnect); // 执行重连操作
+// }
+// }
+// }
+//
+// private void attemptReconnect() {
+// try {
+// log.info("kucoin ws 开始重连"); // 输出重连开始日志
+// connect(); // 尝试重新连接
+// log.info("kucoin ws 重连成功"); // 输出重连成功日志
+// } catch (Exception e) {
+// log.error("kucoin ws 重连失败: ", e); // 输出重连失败日志
+// } finally {
+// synchronized (lock) { // 同步块
+// reconnecting = false; // 状态标记为未重连
+// scheduleReconnect(); // 重新调度重连任务
+// }
+// }
+// }
+//
+// private void scheduleReconnect() {
+// if (!executorService.isShutdown()) { // 检查线程池是否已经关闭
+// executorService.schedule(this::attemptReconnect, RECONNECT_DELAY, TimeUnit.SECONDS); // 调度重连
+// }
+// }
private void sendPing() {
try {
diff --git a/mexcClient/src/main/java/org/example/mexcclient/WsBean/MexcWsBean.java b/mexcClient/src/main/java/org/example/mexcclient/WsBean/MexcWsBean.java
index 3adeebf..f2f504e 100644
--- a/mexcClient/src/main/java/org/example/mexcclient/WsBean/MexcWsBean.java
+++ b/mexcClient/src/main/java/org/example/mexcclient/WsBean/MexcWsBean.java
@@ -14,6 +14,7 @@
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
+import org.example.mexcclient.MexcClientApplication;
import org.example.mexcclient.pojo.Currency;
import org.example.mexcclient.server.impl.CurrencySerivceImpl;
import org.example.mexcclient.wsClient.MexcClient;
@@ -21,6 +22,8 @@
import org.json.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.SpringApplication;
+import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@@ -45,6 +48,9 @@
private CurrencySerivceImpl currencyService;
@Autowired
+ private ConfigurableApplicationContext context;
+
+ @Autowired
@Qualifier("threadPoolTaskExecutor")
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@@ -62,10 +68,39 @@
List<Currency> sublist = mexc.subList(fromIndex, toIndex);
// 使用自定义线程池提交任务
- threadPoolTaskExecutor.execute(new MexcClient(sublist)::start);
+ threadPoolTaskExecutor.execute(() -> {
+ try {
+ new MexcClient(sublist).start();
+ } catch (Exception e) {
+ run();
+ }
+ });
}
}
}
+
+ private boolean runExecuted = false;
+ private synchronized void run() {
+
+ if (runExecuted) {
+ return; // 已经执行过,直接返回
+ }
+ runExecuted = true;
+ log.info("ws 异常开始重启");
+ Thread restartThread = new Thread(() -> {
+ try {
+ SpringApplication.exit(context, () -> 0);
+ SpringApplication.run(MexcClientApplication.class);
+ log.info("ws 重启成功");
+ } catch (Exception e) {
+ e.printStackTrace();
+ log.error("ws 重启失败");
+ }
+ });
+ restartThread.setDaemon(false);
+ restartThread.start();
+ log.info("ws 重启失败");
+ }
}
diff --git a/mexcClient/src/main/java/org/example/mexcclient/comm/ApplicationContextProvider.java b/mexcClient/src/main/java/org/example/mexcclient/comm/ApplicationContextProvider.java
new file mode 100644
index 0000000..446e40e
--- /dev/null
+++ b/mexcClient/src/main/java/org/example/mexcclient/comm/ApplicationContextProvider.java
@@ -0,0 +1,21 @@
+package org.example.mexcclient.comm;
+
+import org.springframework.beans.BeansException;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+import org.springframework.stereotype.Component;
+
+@Component
+public class ApplicationContextProvider implements ApplicationContextAware {
+
+ private static ApplicationContext applicationContext;
+
+ @Override
+ public void setApplicationContext(ApplicationContext context) throws BeansException {
+ applicationContext = context;
+ }
+
+ public static ApplicationContext getApplicationContext() {
+ return applicationContext;
+ }
+}
diff --git a/mexcClient/src/main/java/org/example/mexcclient/task/RunTask.java b/mexcClient/src/main/java/org/example/mexcclient/task/RunTask.java
index 0152071..b01d95a 100644
--- a/mexcClient/src/main/java/org/example/mexcclient/task/RunTask.java
+++ b/mexcClient/src/main/java/org/example/mexcclient/task/RunTask.java
@@ -20,7 +20,7 @@
@Autowired
private ConfigurableApplicationContext context;
- @Scheduled(cron = "0 0 */3 * * ?")
+ @Scheduled(cron = "0 0/30 * * * ?")
public void restart() {
Thread restartThread = new Thread(() -> {
try {
diff --git a/mexcClient/src/main/java/org/example/mexcclient/wsClient/MexcClient.java b/mexcClient/src/main/java/org/example/mexcclient/wsClient/MexcClient.java
index 09cf860..8f2a2c1 100644
--- a/mexcClient/src/main/java/org/example/mexcclient/wsClient/MexcClient.java
+++ b/mexcClient/src/main/java/org/example/mexcclient/wsClient/MexcClient.java
@@ -8,8 +8,15 @@
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import lombok.extern.slf4j.Slf4j;
+import org.example.mexcclient.MexcClientApplication;
+import org.example.mexcclient.comm.ApplicationContextProvider;
import org.example.mexcclient.pojo.Currency;
import org.example.mexcclient.util.RedisUtil;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.SpringApplication;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ConfigurableApplicationContext;
+import org.springframework.stereotype.Component;
import javax.websocket.*;
import java.io.IOException;
@@ -25,6 +32,7 @@
@ClientEndpoint
@Slf4j
+@Component
public class MexcClient {
private static final String WS_ENDPOINT = "wss://wbs.mexc.com/ws";
private static final long PING_INTERVAL = 20000;
@@ -41,7 +49,7 @@
this.executorService = Executors.newScheduledThreadPool(1);
}
- public void start() {
+ public void start() throws Exception {
try {
connect();
if (session == null) {
@@ -62,7 +70,8 @@
}
} catch (Exception e) {
- log.error("mexc ws 连接过程中发生异常: " + e.getMessage(), e);
+ log.error("mexc ws 连接过程中发生异常:" + e.getMessage(), e); // 记录连接过程中发生的异常
+ throw e;
} finally {
executorService.shutdown();
}
@@ -94,8 +103,8 @@
Object object = map.get("d");
Map<String, Object> resultMap = gson.fromJson(object.toString(), new TypeToken<Map<String, Object>>() {}.getType());
HashMap<String,Object> hashMap = new HashMap<>();
- Object asksObj = resultMap.get("asks");
- Object bidsObj = resultMap.get("bids");
+ Object bidsObj = resultMap.get("asks");
+ Object asksObj = resultMap.get("bids");
Type listType = new TypeToken<List<Map<String,Object>>>(){}.getType();
List<Map<String,Object>> asksList = gson.fromJson(asksObj.toString(), listType);
@@ -135,52 +144,52 @@
}
@OnClose
- public void onClose() {
+ public void onClose() throws Exception {
log.info("mexc ws 连接已关闭,尝试重新连接...");
- handleConnectionClosedOrError();
+ throw new Exception();
}
@OnError
- public void onError(Throwable throwable) {
+ public void onError(Throwable throwable) throws Exception {
log.error("mexc ws 发生错误: " + throwable.getMessage(), throwable);
- handleConnectionClosedOrError();
+ throw new Exception();
}
- private void handleConnectionClosedOrError() {
- synchronized (lock) {
- if (!reconnecting) {
- reconnecting = true; // 设置 reconnecting 为 true 表示开始重连
- executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连
- }
- }
- }
-
- private void attemptReconnect() {
- boolean doReconnect = true;
- try {
- log.info("mexc ws 开始重连");
- connect(); // 假设 connect() 方法用于实际的连接逻辑
- log.info("mexc ws 重连成功");
- } catch (Exception e) {
- log.error("mexc ws 重连失败", e);
- // 连接失败时,可以根据具体情况决定是否继续重连
- // 在这里假设总是继续尝试重连
- } finally {
- synchronized (lock) {
- if (doReconnect) {
- scheduleReconnect(); // 如果需要继续重连,则重新调度重连任务
- } else {
- reconnecting = false; // 重连结束后设置 reconnecting 为 false
- }
- }
- }
- }
-
- private void scheduleReconnect() {
- if (!executorService.isShutdown()) {
- executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS);
- }
- }
+// private void handleConnectionClosedOrError() {
+// synchronized (lock) {
+// if (!reconnecting) {
+// reconnecting = true; // 设置 reconnecting 为 true 表示开始重连
+// executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连
+// }
+// }
+// }
+//
+// private void attemptReconnect() {
+// boolean doReconnect = true;
+// try {
+// log.info("mexc ws 开始重连");
+// connect(); // 假设 connect() 方法用于实际的连接逻辑
+// log.info("mexc ws 重连成功");
+// } catch (Exception e) {
+// log.error("mexc ws 重连失败", e);
+// // 连接失败时,可以根据具体情况决定是否继续重连
+// // 在这里假设总是继续尝试重连
+// } finally {
+// synchronized (lock) {
+// if (doReconnect) {
+// scheduleReconnect(); // 如果需要继续重连,则重新调度重连任务
+// } else {
+// reconnecting = false; // 重连结束后设置 reconnecting 为 false
+// }
+// }
+// }
+// }
+//
+// private void scheduleReconnect() {
+// if (!executorService.isShutdown()) {
+// executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS);
+// }
+// }
private void sendPing() {
try {
diff --git a/websocketSerivce/src/main/java/org/example/server/impl/CurrencySerivceImpl.java b/websocketSerivce/src/main/java/org/example/server/impl/CurrencySerivceImpl.java
index 8cf96d9..fc356fb 100644
--- a/websocketSerivce/src/main/java/org/example/server/impl/CurrencySerivceImpl.java
+++ b/websocketSerivce/src/main/java/org/example/server/impl/CurrencySerivceImpl.java
@@ -244,8 +244,8 @@
marketDataOut.setSellPrice(sellPrice.toPlainString()); // 设置卖出价格
marketDataOut.setBuyNumber(markets1.getBids().getV().toPlainString()); // 设置买入数量
marketDataOut.setSellNumber(markets2.getAsks().getV().toPlainString()); // 设置卖出数量
- marketDataOut.setBuyTotalPrice((markets1.getBids().getP().multiply(markets1.getBids().getV())).setScale(0,RoundingMode.DOWN).toPlainString()); // 设置买入总价
- marketDataOut.setSellTotalPrice((markets2.getAsks().getP().multiply(markets2.getAsks().getV())).setScale(0,RoundingMode.DOWN).toPlainString()); // 设置卖出总价
+ marketDataOut.setBuyTotalPrice((markets1.getBids().getP().multiply(markets1.getBids().getV())).setScale(0, RoundingMode.HALF_UP).toPlainString()); // 设置买入总价
+ marketDataOut.setSellTotalPrice((markets2.getAsks().getP().multiply(markets2.getAsks().getV())).setScale(0,RoundingMode.HALF_UP).toPlainString()); // 设置卖出总价
marketDataOut.setServceTime(formattedDateTime); // 设置服务时间
marketDataOut.setBuyAndSell(marketDataOut.getBaseAsset()+marketDataOut.getBuyingPlatform()+marketDataOut.getSellPlatform());
marketDataOuts.add(marketDataOut); // 添加到输出列表
@@ -267,17 +267,8 @@
}
public void quotationCalculation(){
- long startExtracted = System.nanoTime();
extracted();
- long endExtracted = System.nanoTime();
- double executionTimeExtracted = (endExtracted - startExtracted) / 1e9; // 转换为秒
- System.out.println("extracted 方法执行时间: " + executionTimeExtracted + " 秒");
-
- long startFindPairs = System.nanoTime();
findProfitablePairs(mexcList, gateList, bitgetList, kucoinList); // 请确保这些变量有定义和赋值
- long endFindPairs = System.nanoTime();
- double executionTimeFindPairs = (endFindPairs - startFindPairs) / 1e9; // 转换为秒
- System.out.println("findProfitablePairs 方法执行时间: " + executionTimeFindPairs + " 秒");
}
public void scheduler(){
diff --git a/websocketSerivce/src/main/resources/application.properties b/websocketSerivce/src/main/resources/application.properties
index 94a8aa8..235f969 100644
--- a/websocketSerivce/src/main/resources/application.properties
+++ b/websocketSerivce/src/main/resources/application.properties
@@ -1,6 +1,6 @@
-#XDB_PATH=/www/wwwroot/csdn-ip2region.xdb
-XDB_PATH=F:/project/marketData/websocketSerivce/src/main/resources/ip/csdn-ip2region.xdb
+XDB_PATH=/www/wwwroot/csdn-ip2region.xdb
+#XDB_PATH=F:/project/marketData/websocketSerivce/src/main/resources/ip/csdn-ip2region.xdb
redis1.ip=localhost
redis1.port=6379
--
Gitblit v1.9.3