bitgetsClient/src/main/java/org/example/bitgetsclient/ThreadConfig/AsyncConfiguration.java
@@ -18,9 +18,9 @@ public ThreadPoolTaskExecutor threadPoolTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(100); // 核心线程数, 根据需求进行调整 executor.setMaxPoolSize(150); // 最大线程数, 适当设置以避免资源耗尽 executor.setQueueCapacity(200); // 队列容量, 适当限制以避免请求堆积 executor.setCorePoolSize(20); // 核心线程数, 根据需求进行调整 executor.setMaxPoolSize(30); // 最大线程数, 适当设置以避免资源耗尽 executor.setQueueCapacity(100); // 队列容量, 适当限制以避免请求堆积 executor.setKeepAliveSeconds(30); // 线程空闲时的存活时间为30秒,减少系统开销 executor.setThreadNamePrefix("Thread-"); // 线程名称的前缀 bitgetsClient/src/main/java/org/example/bitgetsclient/WsBean/BitgetsWsBean.java
@@ -4,12 +4,15 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.example.bitgetsclient.BitgetsClientApplication; import org.example.bitgetsclient.pojo.Currency; import org.example.bitgetsclient.server.impl.CurrencySerivceImpl; import org.example.bitgetsclient.wsClient.BitgetClient; import org.json.JSONException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.SpringApplication; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @@ -19,6 +22,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; /** @@ -32,6 +36,9 @@ @Autowired private CurrencySerivceImpl currencyService; @Autowired private ConfigurableApplicationContext context; @Autowired @Qualifier("threadPoolTaskExecutor") @@ -51,12 +58,41 @@ int toIndex = Math.min(fromIndex + batchSize, totalSize); // 计算结束索引 List<Currency> sublist = mexc.subList(fromIndex, toIndex); // 切分子列表 String parameter = getParameter(sublist); // 获取参数 // 使用自定义线程池提交任务 threadPoolTaskExecutor.execute(new BitgetClient(parameter)::start); // 提交到线程池执行 threadPoolTaskExecutor.execute(() -> { try { new BitgetClient(parameter).start(); } catch (Exception e) { run(); } }); } } } private boolean runExecuted = false; private synchronized void run() { if (runExecuted) { return; // 已经执行过,直接返回 } runExecuted = true; log.info("ws 异常开始重启"); Thread restartThread = new Thread(() -> { try { SpringApplication.exit(context, () -> 0); SpringApplication.run(BitgetsClientApplication.class); log.info("ws 重启成功"); } catch (Exception e) { e.printStackTrace(); log.error("ws 重启失败"); } }); restartThread.setDaemon(false); restartThread.start(); log.info("ws 重启失败"); } public String getParameter(List<Currency> list) throws JsonProcessingException, JSONException { // 创建一个ObjectMapper实例 ObjectMapper mapper = new ObjectMapper(); bitgetsClient/src/main/java/org/example/bitgetsclient/comm/ApplicationContextProvider.java
New file @@ -0,0 +1,21 @@ package org.example.bitgetsclient.comm; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; @Component public class ApplicationContextProvider implements ApplicationContextAware { private static ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext context) throws BeansException { applicationContext = context; } public static ApplicationContext getApplicationContext() { return applicationContext; } } bitgetsClient/src/main/java/org/example/bitgetsclient/task/RunTask.java
@@ -23,7 +23,7 @@ @Autowired private ConfigurableApplicationContext context; @Scheduled(cron = "0 0 */3 * * ?") @Scheduled(cron = "0 0/30 * * * ?") public void restart() { Thread restartThread = new Thread(() -> { try { bitgetsClient/src/main/java/org/example/bitgetsclient/wsClient/BitgetClient.java
@@ -8,9 +8,16 @@ import com.google.gson.Gson; import com.google.gson.JsonSyntaxException; import lombok.extern.slf4j.Slf4j; import org.example.bitgetsclient.BitgetsClientApplication; import org.example.bitgetsclient.comm.ApplicationContextProvider; import org.example.bitgetsclient.util.RedisUtil; import org.json.JSONException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.context.ApplicationContext; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import javax.websocket.*; @@ -26,6 +33,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @ClientEndpoint @Slf4j @@ -55,7 +63,7 @@ public void start() { public void start() throws Exception { try { connect(); // 尝试连接 if (session == null) { @@ -67,7 +75,6 @@ executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS); // 发送订阅消息 session.getBasicRemote().sendText(subscriptions); // 发送订阅信息 synchronized (this) { this.wait(); // 等待 WebSocket 消息到来 } @@ -77,6 +84,7 @@ log.error("线程被中断: " + e.getMessage(), e); // 记录线程中断的错误 } catch (Exception e) { log.error("bitget ws 连接过程中发生异常: " + e.getMessage(), e); // 记录连接过程中发生的异常 throw e; } finally { executorService.shutdownNow(); // 尝试立即关闭调度服务 } @@ -109,8 +117,8 @@ if (dataNode != null) { Map<String, Object> hashMap = new HashMap<>(); // 变量命名更具描述性 String bids = dataNode.get("bids").toString(); String asks = dataNode.get("asks").toString(); String asks = dataNode.get("bids").toString(); String bids = dataNode.get("asks").toString(); Type listType = new TypeToken<List<List<String>>>(){}.getType(); List<List<String>> bidsList = gson.fromJson(bids, listType); @@ -167,51 +175,52 @@ } @OnClose public void onClose() { public void onClose() throws Exception { log.info("bitget ws 连接已关闭,尝试重新连接..."); // 记录连接关闭的信息 handleConnectionClosedOrError(); // 处理连接关闭事件 throw new Exception(); } @OnError public void onError(Throwable throwable) { public void onError(Throwable throwable) throws Exception { log.error("bitget ws 发生错误: " + throwable.getMessage(), throwable); // 记录错误信息 handleConnectionClosedOrError(); // 处理错误事件 throw new Exception(); } private void handleConnectionClosedOrError() { synchronized (lock) { if (!reconnecting) { reconnecting = true; // 开始重连 executorService.execute(this::attemptReconnect); // 执行重连操作 } } } // private void handleConnectionClosedOrError() { // executorService.execute(() -> { // try { // attemptReconnect(); // } catch (Exception e) { // throw new RuntimeException(e); // } // }); // } private void attemptReconnect() { if (reconnectAttempts < MAX_RECONNECT_ATTEMPTS) { // 检查重连次数是否超过限制 try { log.info("bitget ws 开始重连"); // 记录开始重连的信息 connect(); // 尝试重连 log.info("bitget ws 重连成功"); // 成功重连的日志 reconnectAttempts = 0; // 重连成功,重置重连次数 } catch (Exception e) { reconnectAttempts++; // 增加重连尝试次数 log.warn("bitget ws 重连失败,尝试次数: " + reconnectAttempts, e); // 记录重连失败的警告信息 // 采用指数退避策略,增加重连间隔 long waitTime = Math.min(60, (long) Math.pow(2, reconnectAttempts)) * 1000; // 最大等待时间为 60 秒 scheduleReconnect(waitTime); // 调度下次重连 } } else { log.error("超过最大重连次数,停止重连"); // 超过最大重连次数 reconnecting = false; // 重连状态重置 } } private void scheduleReconnect(long waitTime) { // 接受等待时间参数 if (!executorService.isShutdown()) { executorService.schedule(this::attemptReconnect, waitTime, TimeUnit.MILLISECONDS); // 调度重连 } } // private void attemptReconnect() throws Exception { // if (reconnectAttempts < MAX_RECONNECT_ATTEMPTS) { // 检查重连次数是否超过限制 // try { // log.info("bitget ws 开始重连"); // 记录开始重连的信息 // connect(); // 尝试重连 // log.info("bitget ws 重连成功"); // 成功重连的日志 // reconnectAttempts = 0; // 重连成功,重置重连次数 // } catch (Exception e) { // reconnectAttempts++; // 增加重连尝试次数 // log.warn("bitget ws 重连失败,尝试次数: " + reconnectAttempts, e); // 记录重连失败的警告信息 // // 采用指数退避策略,增加重连间隔 // long waitTime = Math.min(60, (long) Math.pow(2, reconnectAttempts)) * 1000; // 最大等待时间为 60 秒 // scheduleReconnect(waitTime); // 调度下次重连 // } // } else { // log.error("超过最大重连次数,停止重连"); // 超过最大重连次数 // throw new Exception(); // } // } // // private void scheduleReconnect(long waitTime) { // 接受等待时间参数 // if (!executorService.isShutdown()) { // executorService.schedule(this::attemptReconnect, waitTime, TimeUnit.MILLISECONDS); // 调度重连 // } // } private void sendPing() { try { geteClient/src/main/java/org/example/geteclient/ThreadConfig/AsyncConfiguration.java
@@ -18,9 +18,9 @@ public ThreadPoolTaskExecutor threadPoolTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(100); // 核心线程数, 根据需求进行调整 executor.setMaxPoolSize(150); // 最大线程数, 适当设置以避免资源耗尽 executor.setQueueCapacity(200); // 队列容量, 适当限制以避免请求堆积 executor.setCorePoolSize(40); // 核心线程数, 根据需求进行调整 executor.setMaxPoolSize(50); // 最大线程数, 适当设置以避免资源耗尽 executor.setQueueCapacity(400); // 队列容量, 适当限制以避免请求堆积 executor.setKeepAliveSeconds(30); // 线程空闲时的存活时间为30秒,减少系统开销 executor.setThreadNamePrefix("Thread-"); // 线程名称的前缀 geteClient/src/main/java/org/example/geteclient/WsBean/GateWsBean.java
New file @@ -0,0 +1,88 @@ package org.example.geteclient.WsBean; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import lombok.extern.slf4j.Slf4j; import org.example.geteclient.GeteClientApplication; import org.example.geteclient.pojo.Currency; import org.example.geteclient.server.impl.CurrencySerivceImpl; import org.example.geteclient.wsClinet.GateClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.SpringApplication; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.CollectionUtils; import java.util.List; /** * @ClassDescription: 客户端请求类 * @JdkVersion: 1.8 * @Created: 2023/8/31 16:13 */ @Slf4j @Configuration public class GateWsBean { @Autowired private CurrencySerivceImpl currencyService; @Autowired private ConfigurableApplicationContext context; @Autowired @Qualifier("threadPoolTaskExecutor") private ThreadPoolTaskExecutor threadPoolTaskExecutor; @Bean public void gateWebsocketRunClientMap() { List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "gate")); if (!CollectionUtils.isEmpty(mexc)) { int batchSize = 100; // 每个线程处理的数据量 int totalSize = mexc.size(); int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数 for (int i = 0; i < threadCount; i++) { int fromIndex = i * batchSize; int toIndex = Math.min(fromIndex + batchSize, totalSize); List<Currency> sublist = mexc.subList(fromIndex, toIndex); // 使用自定义线程池提交任务 threadPoolTaskExecutor.execute(() -> { try { new GateClient(sublist).start(); } catch (Exception e) { run(); } }); } } } private boolean runExecuted = false; private synchronized void run() { if (runExecuted) { return; // 已经执行过,直接返回 } runExecuted = true; log.info("ws 异常开始重启"); Thread restartThread = new Thread(() -> { try { SpringApplication.exit(context, () -> 0); SpringApplication.run(GeteClientApplication.class); log.info("ws 重启成功"); } catch (Exception e) { e.printStackTrace(); log.error("ws 重启失败"); } }); restartThread.setDaemon(false); restartThread.start(); log.info("ws 重启失败"); } } geteClient/src/main/java/org/example/geteclient/WsBean/MexcWsBean.java
File was deleted geteClient/src/main/java/org/example/geteclient/comm/ApplicationContextProvider.java
New file @@ -0,0 +1,21 @@ package org.example.geteclient.comm; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; @Component public class ApplicationContextProvider implements ApplicationContextAware { private static ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext context) throws BeansException { applicationContext = context; } public static ApplicationContext getApplicationContext() { return applicationContext; } } geteClient/src/main/java/org/example/geteclient/task/RunTask.java
@@ -20,7 +20,7 @@ @Autowired private ConfigurableApplicationContext context; @Scheduled(cron = "0 0 */3 * * ?") @Scheduled(cron = "0 0/30 * * * ?") public void restart() { Thread restartThread = new Thread(() -> { try { geteClient/src/main/java/org/example/geteclient/wsClinet/GateClient.java
@@ -6,9 +6,16 @@ import com.google.gson.Gson; import com.google.gson.JsonSyntaxException; import lombok.extern.slf4j.Slf4j; import org.example.geteclient.GeteClientApplication; import org.example.geteclient.comm.ApplicationContextProvider; import org.example.geteclient.pojo.Currency; import org.example.geteclient.util.RedisUtil; import org.json.JSONException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.context.ApplicationContext; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.stereotype.Component; import javax.websocket.*; import java.math.BigDecimal; @@ -46,7 +53,7 @@ this.executorService = Executors.newScheduledThreadPool(1); // 创建一个定时任务调度线程池 } public void start() { public void start() throws Exception { try { connect(); // 尝试连接 WebSocket if (session == null) { // 如果连接失败,session 仍然为 null @@ -69,6 +76,7 @@ } catch (Exception e) { log.error("gate ws 连接过程中发生异常: " + e.getMessage(), e); // 记录连接过程中发生的异常 throw e; } finally { executorService.shutdown(); // 确保定时任务调度服务被关闭 } @@ -116,7 +124,7 @@ String[] asksData = dataArray[0]; pvMap.put("p", new BigDecimal(asksData[0]).toPlainString()); pvMap.put("v", new BigDecimal(asksData[1]).toPlainString()); hashMap.put(BIDS_KEY, pvMap); // 放入 bids 数据 hashMap.put(ASKS_KEY, pvMap); // 放入 bids 数据 } if (bidsObj instanceof List && !((List<?>) bidsObj).isEmpty()) { @@ -126,7 +134,7 @@ HashMap<String, Object> pvMap = new HashMap<>(); // 创建新的 HashMap 保存 bids 和 asks pvMap.put("p", new BigDecimal(bidsData[0]).toPlainString()); pvMap.put("v", new BigDecimal(bidsData[1]).toPlainString()); hashMap.put(ASKS_KEY,pvMap); hashMap.put(BIDS_KEY,pvMap); } String key = "gate" + resultMap.get(S_KEY); // 生成 Redis 键 @@ -143,51 +151,51 @@ @OnClose public void onClose() { public void onClose() throws Exception { log.info("gate ws 连接已关闭,尝试重新连接..."); // 连接关闭日志 handleConnectionClosedOrError(); // 处理连接关闭或错误 throw new Exception(); } @OnError public void onError(Throwable throwable) { public void onError(Throwable throwable) throws Exception { log.error("gate ws 发生错误: " + throwable.getMessage(), throwable); // 记录错误日志 handleConnectionClosedOrError(); // 处理连接关闭或错误 throw new Exception(); } private void handleConnectionClosedOrError() { synchronized (lock) { // 进入同步块以防止并发重连 if (!reconnecting) { // 检查当前是否已经在重连 reconnecting = true; // 设置 reconnecting 为 true 表示开始重连 executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连 } } } private void attemptReconnect() { boolean doReconnect = true; // 是否进行重连的标志 try { log.info("gate ws 开始重连"); // 开始重连日志 connect(); // 尝试重新连接 log.info("gate ws 重连成功"); // 重连成功日志 } catch (Exception e) { log.error("gate ws 重连失败", e); // 重连失败记录日志 doReconnect = false; // 标记不再继续重连 } finally { synchronized (lock) { // 进入同步块 if (doReconnect) { scheduleReconnect(); // 如果需要继续重连,调度重连任务 } else { reconnecting = false; // 重连结束,标记重连状态为 false } } } } private void scheduleReconnect() { if (!executorService.isShutdown()) { // 确保调度服务未关闭 executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS); // 重连调度,5秒后尝试重新连接 } } // private void handleConnectionClosedOrError() { // synchronized (lock) { // 进入同步块以防止并发重连 // if (!reconnecting) { // 检查当前是否已经在重连 // reconnecting = true; // 设置 reconnecting 为 true 表示开始重连 // executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连 // } // } // } // // private void attemptReconnect() { // boolean doReconnect = true; // 是否进行重连的标志 // try { // log.info("gate ws 开始重连"); // 开始重连日志 // connect(); // 尝试重新连接 // log.info("gate ws 重连成功"); // 重连成功日志 // } catch (Exception e) { // log.error("gate ws 重连失败", e); // 重连失败记录日志 // doReconnect = false; // 标记不再继续重连 // } finally { // synchronized (lock) { // 进入同步块 // if (doReconnect) { // scheduleReconnect(); // 如果需要继续重连,调度重连任务 // } else { // reconnecting = false; // 重连结束,标记重连状态为 false // } // } // } // } // // private void scheduleReconnect() { // if (!executorService.isShutdown()) { // 确保调度服务未关闭 // executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS); // 重连调度,5秒后尝试重新连接 // } // } private void sendPing() { try { kucoinClient/src/main/java/org/example/kucoinclient/ThreadConfig/AsyncConfiguration.java
@@ -18,9 +18,9 @@ public ThreadPoolTaskExecutor threadPoolTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(100); // 核心线程数, 根据需求进行调整 executor.setMaxPoolSize(150); // 最大线程数, 适当设置以避免资源耗尽 executor.setQueueCapacity(200); // 队列容量, 适当限制以避免请求堆积 executor.setCorePoolSize(20); // 核心线程数, 根据需求进行调整 executor.setMaxPoolSize(30); // 最大线程数, 适当设置以避免资源耗尽 executor.setQueueCapacity(100); // 队列容量, 适当限制以避免请求堆积 executor.setKeepAliveSeconds(30); // 线程空闲时的存活时间为30秒,减少系统开销 executor.setThreadNamePrefix("Thread-"); // 线程名称的前缀 kucoinClient/src/main/java/org/example/kucoinclient/WsBean/KucoinWsBean.java
File was renamed from kucoinClient/src/main/java/org/example/kucoinclient/WsBean/MexcWsBean.java @@ -1,8 +1,6 @@ package org.example.kucoinclient.WsBean; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.apache.http.HttpEntity; import org.apache.http.HttpResponse; @@ -11,23 +9,22 @@ import org.apache.http.client.methods.HttpPost; import org.apache.http.impl.client.DefaultHttpClient; import org.apache.http.util.EntityUtils; import org.example.kucoinclient.KucoinClientApplication; import org.example.kucoinclient.pojo.Currency; import org.example.kucoinclient.server.impl.CurrencySerivceImpl; import org.example.kucoinclient.wsClient.KucoinClient; import org.json.JSONException; import org.json.JSONObject; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.SpringApplication; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.CollectionUtils; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; /** * @ClassDescription: 客户端请求类 @@ -36,10 +33,13 @@ */ @Slf4j @Configuration public class MexcWsBean { public class KucoinWsBean { @Autowired private CurrencySerivceImpl currencyService; @Autowired private ConfigurableApplicationContext context; @Autowired @Qualifier("threadPoolTaskExecutor") @@ -62,12 +62,41 @@ List<Currency> sublist = mexc.subList(fromIndex, toIndex); // 使用自定义线程池提交任务 threadPoolTaskExecutor.execute(new KucoinClient(sublist,token)::start); threadPoolTaskExecutor.execute(() -> { try { new KucoinClient(sublist,token).start(); } catch (Exception e) { run(); } }); } } } private boolean runExecuted = false; private synchronized void run() { if (runExecuted) { return; // 已经执行过,直接返回 } runExecuted = true; log.info("ws 异常开始重启"); Thread restartThread = new Thread(() -> { try { SpringApplication.exit(context, () -> 0); SpringApplication.run(KucoinClientApplication.class); log.info("ws 重启成功"); } catch (Exception e) { e.printStackTrace(); log.error("ws 重启失败"); } }); restartThread.setDaemon(false); restartThread.start(); log.info("ws 重启失败"); } public static String doPost() throws Exception { String url = "https://api.kucoin.com/api/v1/bullet-public"; HttpPost httpPost = new HttpPost(url); kucoinClient/src/main/java/org/example/kucoinclient/comm/ApplicationContextProvider.java
New file @@ -0,0 +1,21 @@ package org.example.kucoinclient.comm; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; @Component public class ApplicationContextProvider implements ApplicationContextAware { private static ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext context) throws BeansException { applicationContext = context; } public static ApplicationContext getApplicationContext() { return applicationContext; } } kucoinClient/src/main/java/org/example/kucoinclient/task/RunTask.java
@@ -20,7 +20,7 @@ @Autowired private ConfigurableApplicationContext context; @Scheduled(cron = "0 0 */3 * * ?") @Scheduled(cron = "0 0/30 * * * ?") public void restart() { Thread restartThread = new Thread(() -> { try { kucoinClient/src/main/java/org/example/kucoinclient/wsClient/KucoinClient.java
@@ -6,9 +6,16 @@ import com.google.gson.Gson; import com.google.gson.JsonSyntaxException; import lombok.extern.slf4j.Slf4j; import org.example.kucoinclient.KucoinClientApplication; import org.example.kucoinclient.comm.ApplicationContextProvider; import org.example.kucoinclient.pojo.Currency; import org.example.kucoinclient.util.RedisUtil; import org.json.JSONException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.context.ApplicationContext; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.stereotype.Component; import javax.websocket.*; import java.io.UnsupportedEncodingException; @@ -55,7 +62,7 @@ this.executorService = Executors.newScheduledThreadPool(1); // 创建调度线程池 } public void start() { public void start() throws Exception { try { connect(); // 连接到 WebSocket 服务器 if (session == null) { @@ -71,7 +78,8 @@ } } catch (Exception e) { log.error("kucoin ws 连接过程中发生异常: ", e); // 捕获并记录异常 log.error("kucoin ws 连接过程中发生异常:" + e.getMessage(), e); // 记录连接过程中发生的异常 throw e; } finally { executorService.shutdown(); // 关闭调度线程池 } @@ -145,8 +153,8 @@ // 空值检查,避免存储 null 值到 Redis if (resultMap.get("bids") != null && resultMap.get("asks") != null) { Object bidsObj = resultMap.get("bids"); Object asksObj = resultMap.get("asks"); Object asksObj = resultMap.get("bids"); Object bidsObj = resultMap.get("asks"); if(bidsObj instanceof List && !((List<?>) bidsObj).isEmpty() && asksObj instanceof List && !((List<?>) asksObj).isEmpty()){ if (bidsObj instanceof List && !((List<?>) bidsObj).isEmpty()) { @@ -189,46 +197,46 @@ } @OnClose public void onClose() { public void onClose() throws Exception { log.info("kucoin ws 连接已关闭,尝试重新连接..."); // 输出连接关闭日志 handleConnectionClosedOrError(); // 处理连接关闭或错误 throw new Exception(); } @OnError public void onError(Throwable throwable) { public void onError(Throwable throwable) throws Exception { log.error("kucoin ws 发生错误: ", throwable); // 输出错误日志 handleConnectionClosedOrError(); // 处理连接关闭或错误 throw new Exception(); } private void handleConnectionClosedOrError() { synchronized (lock) { // 同步块,确保线程安全 if (!reconnecting) { reconnecting = true; // 状态标记为重连中 executorService.execute(this::attemptReconnect); // 执行重连操作 } } } private void attemptReconnect() { try { log.info("kucoin ws 开始重连"); // 输出重连开始日志 connect(); // 尝试重新连接 log.info("kucoin ws 重连成功"); // 输出重连成功日志 } catch (Exception e) { log.error("kucoin ws 重连失败: ", e); // 输出重连失败日志 } finally { synchronized (lock) { // 同步块 reconnecting = false; // 状态标记为未重连 scheduleReconnect(); // 重新调度重连任务 } } } private void scheduleReconnect() { if (!executorService.isShutdown()) { // 检查线程池是否已经关闭 executorService.schedule(this::attemptReconnect, RECONNECT_DELAY, TimeUnit.SECONDS); // 调度重连 } } // private void handleConnectionClosedOrError() { // synchronized (lock) { // 同步块,确保线程安全 // if (!reconnecting) { // reconnecting = true; // 状态标记为重连中 // executorService.execute(this::attemptReconnect); // 执行重连操作 // } // } // } // // private void attemptReconnect() { // try { // log.info("kucoin ws 开始重连"); // 输出重连开始日志 // connect(); // 尝试重新连接 // log.info("kucoin ws 重连成功"); // 输出重连成功日志 // } catch (Exception e) { // log.error("kucoin ws 重连失败: ", e); // 输出重连失败日志 // } finally { // synchronized (lock) { // 同步块 // reconnecting = false; // 状态标记为未重连 // scheduleReconnect(); // 重新调度重连任务 // } // } // } // // private void scheduleReconnect() { // if (!executorService.isShutdown()) { // 检查线程池是否已经关闭 // executorService.schedule(this::attemptReconnect, RECONNECT_DELAY, TimeUnit.SECONDS); // 调度重连 // } // } private void sendPing() { try { mexcClient/src/main/java/org/example/mexcclient/WsBean/MexcWsBean.java
@@ -14,6 +14,7 @@ import org.apache.http.impl.client.DefaultHttpClient; import org.apache.http.message.BasicNameValuePair; import org.apache.http.util.EntityUtils; import org.example.mexcclient.MexcClientApplication; import org.example.mexcclient.pojo.Currency; import org.example.mexcclient.server.impl.CurrencySerivceImpl; import org.example.mexcclient.wsClient.MexcClient; @@ -21,6 +22,8 @@ import org.json.JSONObject; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.boot.SpringApplication; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; @@ -45,6 +48,9 @@ private CurrencySerivceImpl currencyService; @Autowired private ConfigurableApplicationContext context; @Autowired @Qualifier("threadPoolTaskExecutor") private ThreadPoolTaskExecutor threadPoolTaskExecutor; @@ -62,10 +68,39 @@ List<Currency> sublist = mexc.subList(fromIndex, toIndex); // 使用自定义线程池提交任务 threadPoolTaskExecutor.execute(new MexcClient(sublist)::start); threadPoolTaskExecutor.execute(() -> { try { new MexcClient(sublist).start(); } catch (Exception e) { run(); } }); } } } private boolean runExecuted = false; private synchronized void run() { if (runExecuted) { return; // 已经执行过,直接返回 } runExecuted = true; log.info("ws 异常开始重启"); Thread restartThread = new Thread(() -> { try { SpringApplication.exit(context, () -> 0); SpringApplication.run(MexcClientApplication.class); log.info("ws 重启成功"); } catch (Exception e) { e.printStackTrace(); log.error("ws 重启失败"); } }); restartThread.setDaemon(false); restartThread.start(); log.info("ws 重启失败"); } } mexcClient/src/main/java/org/example/mexcclient/comm/ApplicationContextProvider.java
New file @@ -0,0 +1,21 @@ package org.example.mexcclient.comm; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; @Component public class ApplicationContextProvider implements ApplicationContextAware { private static ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext context) throws BeansException { applicationContext = context; } public static ApplicationContext getApplicationContext() { return applicationContext; } } mexcClient/src/main/java/org/example/mexcclient/task/RunTask.java
@@ -20,7 +20,7 @@ @Autowired private ConfigurableApplicationContext context; @Scheduled(cron = "0 0 */3 * * ?") @Scheduled(cron = "0 0/30 * * * ?") public void restart() { Thread restartThread = new Thread(() -> { try { mexcClient/src/main/java/org/example/mexcclient/wsClient/MexcClient.java
@@ -8,8 +8,15 @@ import com.google.gson.Gson; import com.google.gson.JsonSyntaxException; import lombok.extern.slf4j.Slf4j; import org.example.mexcclient.MexcClientApplication; import org.example.mexcclient.comm.ApplicationContextProvider; import org.example.mexcclient.pojo.Currency; import org.example.mexcclient.util.RedisUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.context.ApplicationContext; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.stereotype.Component; import javax.websocket.*; import java.io.IOException; @@ -25,6 +32,7 @@ @ClientEndpoint @Slf4j @Component public class MexcClient { private static final String WS_ENDPOINT = "wss://wbs.mexc.com/ws"; private static final long PING_INTERVAL = 20000; @@ -41,7 +49,7 @@ this.executorService = Executors.newScheduledThreadPool(1); } public void start() { public void start() throws Exception { try { connect(); if (session == null) { @@ -62,7 +70,8 @@ } } catch (Exception e) { log.error("mexc ws 连接过程中发生异常: " + e.getMessage(), e); log.error("mexc ws 连接过程中发生异常:" + e.getMessage(), e); // 记录连接过程中发生的异常 throw e; } finally { executorService.shutdown(); } @@ -94,8 +103,8 @@ Object object = map.get("d"); Map<String, Object> resultMap = gson.fromJson(object.toString(), new TypeToken<Map<String, Object>>() {}.getType()); HashMap<String,Object> hashMap = new HashMap<>(); Object asksObj = resultMap.get("asks"); Object bidsObj = resultMap.get("bids"); Object bidsObj = resultMap.get("asks"); Object asksObj = resultMap.get("bids"); Type listType = new TypeToken<List<Map<String,Object>>>(){}.getType(); List<Map<String,Object>> asksList = gson.fromJson(asksObj.toString(), listType); @@ -135,52 +144,52 @@ } @OnClose public void onClose() { public void onClose() throws Exception { log.info("mexc ws 连接已关闭,尝试重新连接..."); handleConnectionClosedOrError(); throw new Exception(); } @OnError public void onError(Throwable throwable) { public void onError(Throwable throwable) throws Exception { log.error("mexc ws 发生错误: " + throwable.getMessage(), throwable); handleConnectionClosedOrError(); throw new Exception(); } private void handleConnectionClosedOrError() { synchronized (lock) { if (!reconnecting) { reconnecting = true; // 设置 reconnecting 为 true 表示开始重连 executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连 } } } private void attemptReconnect() { boolean doReconnect = true; try { log.info("mexc ws 开始重连"); connect(); // 假设 connect() 方法用于实际的连接逻辑 log.info("mexc ws 重连成功"); } catch (Exception e) { log.error("mexc ws 重连失败", e); // 连接失败时,可以根据具体情况决定是否继续重连 // 在这里假设总是继续尝试重连 } finally { synchronized (lock) { if (doReconnect) { scheduleReconnect(); // 如果需要继续重连,则重新调度重连任务 } else { reconnecting = false; // 重连结束后设置 reconnecting 为 false } } } } private void scheduleReconnect() { if (!executorService.isShutdown()) { executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS); } } // private void handleConnectionClosedOrError() { // synchronized (lock) { // if (!reconnecting) { // reconnecting = true; // 设置 reconnecting 为 true 表示开始重连 // executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连 // } // } // } // // private void attemptReconnect() { // boolean doReconnect = true; // try { // log.info("mexc ws 开始重连"); // connect(); // 假设 connect() 方法用于实际的连接逻辑 // log.info("mexc ws 重连成功"); // } catch (Exception e) { // log.error("mexc ws 重连失败", e); // // 连接失败时,可以根据具体情况决定是否继续重连 // // 在这里假设总是继续尝试重连 // } finally { // synchronized (lock) { // if (doReconnect) { // scheduleReconnect(); // 如果需要继续重连,则重新调度重连任务 // } else { // reconnecting = false; // 重连结束后设置 reconnecting 为 false // } // } // } // } // // private void scheduleReconnect() { // if (!executorService.isShutdown()) { // executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS); // } // } private void sendPing() { try { websocketSerivce/src/main/java/org/example/server/impl/CurrencySerivceImpl.java
@@ -244,8 +244,8 @@ marketDataOut.setSellPrice(sellPrice.toPlainString()); // 设置卖出价格 marketDataOut.setBuyNumber(markets1.getBids().getV().toPlainString()); // 设置买入数量 marketDataOut.setSellNumber(markets2.getAsks().getV().toPlainString()); // 设置卖出数量 marketDataOut.setBuyTotalPrice((markets1.getBids().getP().multiply(markets1.getBids().getV())).setScale(0,RoundingMode.DOWN).toPlainString()); // 设置买入总价 marketDataOut.setSellTotalPrice((markets2.getAsks().getP().multiply(markets2.getAsks().getV())).setScale(0,RoundingMode.DOWN).toPlainString()); // 设置卖出总价 marketDataOut.setBuyTotalPrice((markets1.getBids().getP().multiply(markets1.getBids().getV())).setScale(0, RoundingMode.HALF_UP).toPlainString()); // 设置买入总价 marketDataOut.setSellTotalPrice((markets2.getAsks().getP().multiply(markets2.getAsks().getV())).setScale(0,RoundingMode.HALF_UP).toPlainString()); // 设置卖出总价 marketDataOut.setServceTime(formattedDateTime); // 设置服务时间 marketDataOut.setBuyAndSell(marketDataOut.getBaseAsset()+marketDataOut.getBuyingPlatform()+marketDataOut.getSellPlatform()); marketDataOuts.add(marketDataOut); // 添加到输出列表 @@ -267,17 +267,8 @@ } public void quotationCalculation(){ long startExtracted = System.nanoTime(); extracted(); long endExtracted = System.nanoTime(); double executionTimeExtracted = (endExtracted - startExtracted) / 1e9; // 转换为秒 System.out.println("extracted 方法执行时间: " + executionTimeExtracted + " 秒"); long startFindPairs = System.nanoTime(); findProfitablePairs(mexcList, gateList, bitgetList, kucoinList); // 请确保这些变量有定义和赋值 long endFindPairs = System.nanoTime(); double executionTimeFindPairs = (endFindPairs - startFindPairs) / 1e9; // 转换为秒 System.out.println("findProfitablePairs 方法执行时间: " + executionTimeFindPairs + " 秒"); } public void scheduler(){ websocketSerivce/src/main/resources/application.properties
@@ -1,6 +1,6 @@ #XDB_PATH=/www/wwwroot/csdn-ip2region.xdb XDB_PATH=F:/project/marketData/websocketSerivce/src/main/resources/ip/csdn-ip2region.xdb XDB_PATH=/www/wwwroot/csdn-ip2region.xdb #XDB_PATH=F:/project/marketData/websocketSerivce/src/main/resources/ip/csdn-ip2region.xdb redis1.ip=localhost redis1.port=6379