1
zj
2024-08-07 8ed60795a7c278f9699616f72eb05ce49800ba6f
1
15 files modified
1 files renamed
5 files added
1 files deleted
763 ■■■■■ changed files
bitgetsClient/src/main/java/org/example/bitgetsclient/ThreadConfig/AsyncConfiguration.java 6 ●●●● patch | view | raw | blame | history
bitgetsClient/src/main/java/org/example/bitgetsclient/WsBean/BitgetsWsBean.java 40 ●●●●● patch | view | raw | blame | history
bitgetsClient/src/main/java/org/example/bitgetsclient/comm/ApplicationContextProvider.java 21 ●●●●● patch | view | raw | blame | history
bitgetsClient/src/main/java/org/example/bitgetsclient/task/RunTask.java 2 ●●● patch | view | raw | blame | history
bitgetsClient/src/main/java/org/example/bitgetsclient/wsClient/BitgetClient.java 91 ●●●● patch | view | raw | blame | history
geteClient/src/main/java/org/example/geteclient/ThreadConfig/AsyncConfiguration.java 6 ●●●● patch | view | raw | blame | history
geteClient/src/main/java/org/example/geteclient/WsBean/GateWsBean.java 88 ●●●●● patch | view | raw | blame | history
geteClient/src/main/java/org/example/geteclient/WsBean/MexcWsBean.java 68 ●●●●● patch | view | raw | blame | history
geteClient/src/main/java/org/example/geteclient/comm/ApplicationContextProvider.java 21 ●●●●● patch | view | raw | blame | history
geteClient/src/main/java/org/example/geteclient/task/RunTask.java 2 ●●● patch | view | raw | blame | history
geteClient/src/main/java/org/example/geteclient/wsClinet/GateClient.java 90 ●●●● patch | view | raw | blame | history
kucoinClient/src/main/java/org/example/kucoinclient/ThreadConfig/AsyncConfiguration.java 6 ●●●● patch | view | raw | blame | history
kucoinClient/src/main/java/org/example/kucoinclient/WsBean/KucoinWsBean.java 45 ●●●● patch | view | raw | blame | history
kucoinClient/src/main/java/org/example/kucoinclient/comm/ApplicationContextProvider.java 21 ●●●●● patch | view | raw | blame | history
kucoinClient/src/main/java/org/example/kucoinclient/task/RunTask.java 2 ●●● patch | view | raw | blame | history
kucoinClient/src/main/java/org/example/kucoinclient/wsClient/KucoinClient.java 82 ●●●● patch | view | raw | blame | history
mexcClient/src/main/java/org/example/mexcclient/WsBean/MexcWsBean.java 37 ●●●●● patch | view | raw | blame | history
mexcClient/src/main/java/org/example/mexcclient/comm/ApplicationContextProvider.java 21 ●●●●● patch | view | raw | blame | history
mexcClient/src/main/java/org/example/mexcclient/task/RunTask.java 2 ●●● patch | view | raw | blame | history
mexcClient/src/main/java/org/example/mexcclient/wsClient/MexcClient.java 95 ●●●● patch | view | raw | blame | history
websocketSerivce/src/main/java/org/example/server/impl/CurrencySerivceImpl.java 13 ●●●● patch | view | raw | blame | history
websocketSerivce/src/main/resources/application.properties 4 ●●●● patch | view | raw | blame | history
bitgetsClient/src/main/java/org/example/bitgetsclient/ThreadConfig/AsyncConfiguration.java
@@ -18,9 +18,9 @@
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(100);    // 核心线程数, 根据需求进行调整
        executor.setMaxPoolSize(150);    // 最大线程数, 适当设置以避免资源耗尽
        executor.setQueueCapacity(200);    // 队列容量, 适当限制以避免请求堆积
        executor.setCorePoolSize(20);    // 核心线程数, 根据需求进行调整
        executor.setMaxPoolSize(30);    // 最大线程数, 适当设置以避免资源耗尽
        executor.setQueueCapacity(100);    // 队列容量, 适当限制以避免请求堆积
        executor.setKeepAliveSeconds(30);    // 线程空闲时的存活时间为30秒,减少系统开销
        executor.setThreadNamePrefix("Thread-");    // 线程名称的前缀
bitgetsClient/src/main/java/org/example/bitgetsclient/WsBean/BitgetsWsBean.java
@@ -4,12 +4,15 @@
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.example.bitgetsclient.BitgetsClientApplication;
import org.example.bitgetsclient.pojo.Currency;
import org.example.bitgetsclient.server.impl.CurrencySerivceImpl;
import org.example.bitgetsclient.wsClient.BitgetClient;
import org.json.JSONException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.SpringApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@@ -19,6 +22,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/**
@@ -32,6 +36,9 @@
    @Autowired
    private CurrencySerivceImpl currencyService;
    @Autowired
    private ConfigurableApplicationContext context;
    @Autowired
    @Qualifier("threadPoolTaskExecutor")
@@ -51,12 +58,41 @@
                int toIndex = Math.min(fromIndex + batchSize, totalSize); // 计算结束索引
                List<Currency> sublist = mexc.subList(fromIndex, toIndex); // 切分子列表
                String parameter = getParameter(sublist); // 获取参数
                // 使用自定义线程池提交任务
                threadPoolTaskExecutor.execute(new BitgetClient(parameter)::start); // 提交到线程池执行
                threadPoolTaskExecutor.execute(() -> {
                    try {
                        new BitgetClient(parameter).start();
                    } catch (Exception e) {
                        run();
                    }
                });
            }
        }
    }
    private boolean runExecuted = false;
    private synchronized void run() {
        if (runExecuted) {
            return; // 已经执行过,直接返回
        }
        runExecuted = true;
        log.info("ws 异常开始重启");
        Thread restartThread = new Thread(() -> {
            try {
                SpringApplication.exit(context, () -> 0);
                SpringApplication.run(BitgetsClientApplication.class);
                log.info("ws 重启成功");
            } catch (Exception e) {
                e.printStackTrace();
                log.error("ws 重启失败");
            }
        });
        restartThread.setDaemon(false);
        restartThread.start();
        log.info("ws 重启失败");
    }
    public String getParameter(List<Currency> list) throws JsonProcessingException, JSONException {
        // 创建一个ObjectMapper实例
        ObjectMapper mapper = new ObjectMapper();
bitgetsClient/src/main/java/org/example/bitgetsclient/comm/ApplicationContextProvider.java
New file
@@ -0,0 +1,21 @@
package org.example.bitgetsclient.comm;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
@Component
public class ApplicationContextProvider implements ApplicationContextAware {
    private static ApplicationContext applicationContext;
    @Override
    public void setApplicationContext(ApplicationContext context) throws BeansException {
        applicationContext = context;
    }
    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }
}
bitgetsClient/src/main/java/org/example/bitgetsclient/task/RunTask.java
@@ -23,7 +23,7 @@
    @Autowired
    private ConfigurableApplicationContext context;
    @Scheduled(cron = "0 0 */3 * * ?")
    @Scheduled(cron = "0 0/30 * * * ?")
    public void restart() {
        Thread restartThread = new Thread(() -> {
            try {
bitgetsClient/src/main/java/org/example/bitgetsclient/wsClient/BitgetClient.java
@@ -8,9 +8,16 @@
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import lombok.extern.slf4j.Slf4j;
import org.example.bitgetsclient.BitgetsClientApplication;
import org.example.bitgetsclient.comm.ApplicationContextProvider;
import org.example.bitgetsclient.util.RedisUtil;
import org.json.JSONException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import javax.websocket.*;
@@ -26,6 +33,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@ClientEndpoint
@Slf4j
@@ -55,7 +63,7 @@
    public void start() {
    public void start() throws Exception {
        try {
            connect(); // 尝试连接
            if (session == null) {
@@ -67,7 +75,6 @@
            executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS);
            // 发送订阅消息
            session.getBasicRemote().sendText(subscriptions); // 发送订阅信息
            synchronized (this) {
                this.wait(); // 等待 WebSocket 消息到来
            }
@@ -77,6 +84,7 @@
            log.error("线程被中断: " + e.getMessage(), e); // 记录线程中断的错误
        } catch (Exception e) {
            log.error("bitget ws 连接过程中发生异常: " + e.getMessage(), e); // 记录连接过程中发生的异常
            throw e;
        } finally {
            executorService.shutdownNow(); // 尝试立即关闭调度服务
        }
@@ -109,8 +117,8 @@
                if (dataNode != null) {
                    Map<String, Object> hashMap = new HashMap<>(); // 变量命名更具描述性
                    String bids = dataNode.get("bids").toString();
                    String asks = dataNode.get("asks").toString();
                    String asks = dataNode.get("bids").toString();
                    String bids = dataNode.get("asks").toString();
                    Type listType = new TypeToken<List<List<String>>>(){}.getType();
                    List<List<String>> bidsList = gson.fromJson(bids, listType);
@@ -167,51 +175,52 @@
    }
    @OnClose
    public void onClose() {
    public void onClose() throws Exception {
        log.info("bitget ws 连接已关闭,尝试重新连接..."); // 记录连接关闭的信息
        handleConnectionClosedOrError(); // 处理连接关闭事件
        throw new Exception();
    }
    @OnError
    public void onError(Throwable throwable) {
    public void onError(Throwable throwable) throws Exception {
        log.error("bitget ws 发生错误: " + throwable.getMessage(), throwable); // 记录错误信息
        handleConnectionClosedOrError(); // 处理错误事件
        throw new Exception();
    }
    private void handleConnectionClosedOrError() {
        synchronized (lock) {
            if (!reconnecting) {
                reconnecting = true; // 开始重连
                executorService.execute(this::attemptReconnect); // 执行重连操作
            }
        }
    }
//    private void handleConnectionClosedOrError() {
//        executorService.execute(() -> {
//            try {
//                attemptReconnect();
//            } catch (Exception e) {
//                throw new RuntimeException(e);
//            }
//        });
//    }
    private void attemptReconnect() {
        if (reconnectAttempts < MAX_RECONNECT_ATTEMPTS) { // 检查重连次数是否超过限制
            try {
                log.info("bitget ws 开始重连"); // 记录开始重连的信息
                connect(); // 尝试重连
                log.info("bitget ws 重连成功"); // 成功重连的日志
                reconnectAttempts = 0; // 重连成功,重置重连次数
            } catch (Exception e) {
                reconnectAttempts++; // 增加重连尝试次数
                log.warn("bitget ws 重连失败,尝试次数: " + reconnectAttempts, e); // 记录重连失败的警告信息
                // 采用指数退避策略,增加重连间隔
                long waitTime = Math.min(60, (long) Math.pow(2, reconnectAttempts)) * 1000; // 最大等待时间为 60 秒
                scheduleReconnect(waitTime); // 调度下次重连
            }
        } else {
            log.error("超过最大重连次数,停止重连"); // 超过最大重连次数
            reconnecting = false; // 重连状态重置
        }
    }
    private void scheduleReconnect(long waitTime) { // 接受等待时间参数
        if (!executorService.isShutdown()) {
            executorService.schedule(this::attemptReconnect, waitTime, TimeUnit.MILLISECONDS); // 调度重连
        }
    }
//    private void attemptReconnect() throws Exception {
//        if (reconnectAttempts < MAX_RECONNECT_ATTEMPTS) { // 检查重连次数是否超过限制
//            try {
//                log.info("bitget ws 开始重连"); // 记录开始重连的信息
//                connect(); // 尝试重连
//                log.info("bitget ws 重连成功"); // 成功重连的日志
//                reconnectAttempts = 0; // 重连成功,重置重连次数
//            } catch (Exception e) {
//                reconnectAttempts++; // 增加重连尝试次数
//                log.warn("bitget ws 重连失败,尝试次数: " + reconnectAttempts, e); // 记录重连失败的警告信息
//                // 采用指数退避策略,增加重连间隔
//                long waitTime = Math.min(60, (long) Math.pow(2, reconnectAttempts)) * 1000; // 最大等待时间为 60 秒
//                scheduleReconnect(waitTime); // 调度下次重连
//            }
//        } else {
//            log.error("超过最大重连次数,停止重连"); // 超过最大重连次数
//            throw new Exception();
//        }
//    }
//
//    private void scheduleReconnect(long waitTime) { // 接受等待时间参数
//        if (!executorService.isShutdown()) {
//            executorService.schedule(this::attemptReconnect, waitTime, TimeUnit.MILLISECONDS); // 调度重连
//        }
//    }
    private void sendPing() {
        try {
geteClient/src/main/java/org/example/geteclient/ThreadConfig/AsyncConfiguration.java
@@ -18,9 +18,9 @@
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(100);    // 核心线程数, 根据需求进行调整
        executor.setMaxPoolSize(150);    // 最大线程数, 适当设置以避免资源耗尽
        executor.setQueueCapacity(200);    // 队列容量, 适当限制以避免请求堆积
        executor.setCorePoolSize(40);    // 核心线程数, 根据需求进行调整
        executor.setMaxPoolSize(50);    // 最大线程数, 适当设置以避免资源耗尽
        executor.setQueueCapacity(400);    // 队列容量, 适当限制以避免请求堆积
        executor.setKeepAliveSeconds(30);    // 线程空闲时的存活时间为30秒,减少系统开销
        executor.setThreadNamePrefix("Thread-");    // 线程名称的前缀
geteClient/src/main/java/org/example/geteclient/WsBean/GateWsBean.java
New file
@@ -0,0 +1,88 @@
package org.example.geteclient.WsBean;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.example.geteclient.GeteClientApplication;
import org.example.geteclient.pojo.Currency;
import org.example.geteclient.server.impl.CurrencySerivceImpl;
import org.example.geteclient.wsClinet.GateClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.SpringApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.CollectionUtils;
import java.util.List;
/**
 * @ClassDescription: 客户端请求类
 * @JdkVersion: 1.8
 * @Created: 2023/8/31 16:13
 */
@Slf4j
@Configuration
public class GateWsBean {
    @Autowired
    private CurrencySerivceImpl currencyService;
    @Autowired
    private ConfigurableApplicationContext context;
    @Autowired
    @Qualifier("threadPoolTaskExecutor")
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;
    @Bean
    public void gateWebsocketRunClientMap() {
        List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "gate"));
        if (!CollectionUtils.isEmpty(mexc)) {
            int batchSize = 100; // 每个线程处理的数据量
            int totalSize = mexc.size();
            int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数
            for (int i = 0; i < threadCount; i++) {
                int fromIndex = i * batchSize;
                int toIndex = Math.min(fromIndex + batchSize, totalSize);
                List<Currency> sublist = mexc.subList(fromIndex, toIndex);
                // 使用自定义线程池提交任务
                threadPoolTaskExecutor.execute(() -> {
                    try {
                        new GateClient(sublist).start();
                    } catch (Exception e) {
                        run();
                    }
                });
            }
        }
    }
    private boolean runExecuted = false;
    private synchronized void run() {
        if (runExecuted) {
            return; // 已经执行过,直接返回
        }
        runExecuted = true;
        log.info("ws 异常开始重启");
        Thread restartThread = new Thread(() -> {
            try {
                SpringApplication.exit(context, () -> 0);
                SpringApplication.run(GeteClientApplication.class);
                log.info("ws 重启成功");
            } catch (Exception e) {
                e.printStackTrace();
                log.error("ws 重启失败");
            }
        });
        restartThread.setDaemon(false);
        restartThread.start();
        log.info("ws 重启失败");
    }
}
geteClient/src/main/java/org/example/geteclient/WsBean/MexcWsBean.java
File was deleted
geteClient/src/main/java/org/example/geteclient/comm/ApplicationContextProvider.java
New file
@@ -0,0 +1,21 @@
package org.example.geteclient.comm;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
@Component
public class ApplicationContextProvider implements ApplicationContextAware {
    private static ApplicationContext applicationContext;
    @Override
    public void setApplicationContext(ApplicationContext context) throws BeansException {
        applicationContext = context;
    }
    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }
}
geteClient/src/main/java/org/example/geteclient/task/RunTask.java
@@ -20,7 +20,7 @@
    @Autowired
    private ConfigurableApplicationContext context;
    @Scheduled(cron = "0 0 */3 * * ?")
    @Scheduled(cron = "0 0/30 * * * ?")
    public void restart() {
        Thread restartThread = new Thread(() -> {
            try {
geteClient/src/main/java/org/example/geteclient/wsClinet/GateClient.java
@@ -6,9 +6,16 @@
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import lombok.extern.slf4j.Slf4j;
import org.example.geteclient.GeteClientApplication;
import org.example.geteclient.comm.ApplicationContextProvider;
import org.example.geteclient.pojo.Currency;
import org.example.geteclient.util.RedisUtil;
import org.json.JSONException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import java.math.BigDecimal;
@@ -46,7 +53,7 @@
        this.executorService = Executors.newScheduledThreadPool(1); // 创建一个定时任务调度线程池
    }
    public void start() {
    public void start() throws Exception {
        try {
            connect(); // 尝试连接 WebSocket
            if (session == null) { // 如果连接失败,session 仍然为 null
@@ -69,6 +76,7 @@
        } catch (Exception e) {
            log.error("gate ws 连接过程中发生异常: " + e.getMessage(), e); // 记录连接过程中发生的异常
            throw e;
        } finally {
            executorService.shutdown(); // 确保定时任务调度服务被关闭
        }
@@ -116,7 +124,7 @@
                        String[] asksData = dataArray[0];
                        pvMap.put("p", new BigDecimal(asksData[0]).toPlainString());
                        pvMap.put("v", new BigDecimal(asksData[1]).toPlainString());
                        hashMap.put(BIDS_KEY, pvMap); // 放入 bids 数据
                        hashMap.put(ASKS_KEY, pvMap); // 放入 bids 数据
                    }
                    if (bidsObj instanceof List && !((List<?>) bidsObj).isEmpty()) {
@@ -126,7 +134,7 @@
                        HashMap<String, Object> pvMap = new HashMap<>(); // 创建新的 HashMap 保存 bids 和 asks
                        pvMap.put("p", new BigDecimal(bidsData[0]).toPlainString());
                        pvMap.put("v", new BigDecimal(bidsData[1]).toPlainString());
                        hashMap.put(ASKS_KEY,pvMap);
                        hashMap.put(BIDS_KEY,pvMap);
                    }
                    String key = "gate" + resultMap.get(S_KEY); // 生成 Redis 键
@@ -143,51 +151,51 @@
    @OnClose
    public void onClose() {
    public void onClose() throws Exception {
        log.info("gate ws 连接已关闭,尝试重新连接..."); // 连接关闭日志
        handleConnectionClosedOrError(); // 处理连接关闭或错误
        throw new Exception();
    }
    @OnError
    public void onError(Throwable throwable) {
    public void onError(Throwable throwable) throws Exception {
        log.error("gate ws 发生错误: " + throwable.getMessage(), throwable); // 记录错误日志
        handleConnectionClosedOrError(); // 处理连接关闭或错误
        throw new Exception();
    }
    private void handleConnectionClosedOrError() {
        synchronized (lock) { // 进入同步块以防止并发重连
            if (!reconnecting) { // 检查当前是否已经在重连
                reconnecting = true; // 设置 reconnecting 为 true 表示开始重连
                executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连
            }
        }
    }
    private void attemptReconnect() {
        boolean doReconnect = true; // 是否进行重连的标志
        try {
            log.info("gate ws 开始重连"); // 开始重连日志
            connect(); // 尝试重新连接
            log.info("gate ws 重连成功"); // 重连成功日志
        } catch (Exception e) {
            log.error("gate ws 重连失败", e); // 重连失败记录日志
            doReconnect = false; // 标记不再继续重连
        } finally {
            synchronized (lock) { // 进入同步块
                if (doReconnect) {
                    scheduleReconnect(); // 如果需要继续重连,调度重连任务
                } else {
                    reconnecting = false; // 重连结束,标记重连状态为 false
                }
            }
        }
    }
    private void scheduleReconnect() {
        if (!executorService.isShutdown()) { // 确保调度服务未关闭
            executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS); // 重连调度,5秒后尝试重新连接
        }
    }
//    private void handleConnectionClosedOrError() {
//        synchronized (lock) { // 进入同步块以防止并发重连
//            if (!reconnecting) { // 检查当前是否已经在重连
//                reconnecting = true; // 设置 reconnecting 为 true 表示开始重连
//                executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连
//            }
//        }
//    }
//
//    private void attemptReconnect() {
//        boolean doReconnect = true; // 是否进行重连的标志
//        try {
//            log.info("gate ws 开始重连"); // 开始重连日志
//            connect(); // 尝试重新连接
//            log.info("gate ws 重连成功"); // 重连成功日志
//        } catch (Exception e) {
//            log.error("gate ws 重连失败", e); // 重连失败记录日志
//            doReconnect = false; // 标记不再继续重连
//        } finally {
//            synchronized (lock) { // 进入同步块
//                if (doReconnect) {
//                    scheduleReconnect(); // 如果需要继续重连,调度重连任务
//                } else {
//                    reconnecting = false; // 重连结束,标记重连状态为 false
//                }
//            }
//        }
//    }
//
//    private void scheduleReconnect() {
//        if (!executorService.isShutdown()) { // 确保调度服务未关闭
//            executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS); // 重连调度,5秒后尝试重新连接
//        }
//    }
    private void sendPing() {
        try {
kucoinClient/src/main/java/org/example/kucoinclient/ThreadConfig/AsyncConfiguration.java
@@ -18,9 +18,9 @@
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(100);    // 核心线程数, 根据需求进行调整
        executor.setMaxPoolSize(150);    // 最大线程数, 适当设置以避免资源耗尽
        executor.setQueueCapacity(200);    // 队列容量, 适当限制以避免请求堆积
        executor.setCorePoolSize(20);    // 核心线程数, 根据需求进行调整
        executor.setMaxPoolSize(30);    // 最大线程数, 适当设置以避免资源耗尽
        executor.setQueueCapacity(100);    // 队列容量, 适当限制以避免请求堆积
        executor.setKeepAliveSeconds(30);    // 线程空闲时的存活时间为30秒,减少系统开销
        executor.setThreadNamePrefix("Thread-");    // 线程名称的前缀
kucoinClient/src/main/java/org/example/kucoinclient/WsBean/KucoinWsBean.java
File was renamed from kucoinClient/src/main/java/org/example/kucoinclient/WsBean/MexcWsBean.java
@@ -1,8 +1,6 @@
package org.example.kucoinclient.WsBean;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
@@ -11,23 +9,22 @@
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.util.EntityUtils;
import org.example.kucoinclient.KucoinClientApplication;
import org.example.kucoinclient.pojo.Currency;
import org.example.kucoinclient.server.impl.CurrencySerivceImpl;
import org.example.kucoinclient.wsClient.KucoinClient;
import org.json.JSONException;
import org.json.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.SpringApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
 * @ClassDescription: 客户端请求类
@@ -36,10 +33,13 @@
 */
@Slf4j
@Configuration
public class MexcWsBean {
public class KucoinWsBean {
    @Autowired
    private CurrencySerivceImpl currencyService;
    @Autowired
    private ConfigurableApplicationContext context;
    @Autowired
    @Qualifier("threadPoolTaskExecutor")
@@ -62,12 +62,41 @@
                List<Currency> sublist = mexc.subList(fromIndex, toIndex);
                // 使用自定义线程池提交任务
                threadPoolTaskExecutor.execute(new KucoinClient(sublist,token)::start);
                threadPoolTaskExecutor.execute(() -> {
                    try {
                        new KucoinClient(sublist,token).start();
                    } catch (Exception e) {
                        run();
                    }
                });
            }
        }
    }
    private boolean runExecuted = false;
    private synchronized void run() {
        if (runExecuted) {
            return; // 已经执行过,直接返回
        }
        runExecuted = true;
        log.info("ws 异常开始重启");
        Thread restartThread = new Thread(() -> {
            try {
                SpringApplication.exit(context, () -> 0);
                SpringApplication.run(KucoinClientApplication.class);
                log.info("ws 重启成功");
            } catch (Exception e) {
                e.printStackTrace();
                log.error("ws 重启失败");
            }
        });
        restartThread.setDaemon(false);
        restartThread.start();
        log.info("ws 重启失败");
    }
    public static String doPost() throws Exception {
        String url = "https://api.kucoin.com/api/v1/bullet-public";
        HttpPost httpPost = new HttpPost(url);
kucoinClient/src/main/java/org/example/kucoinclient/comm/ApplicationContextProvider.java
New file
@@ -0,0 +1,21 @@
package org.example.kucoinclient.comm;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
@Component
public class ApplicationContextProvider implements ApplicationContextAware {
    private static ApplicationContext applicationContext;
    @Override
    public void setApplicationContext(ApplicationContext context) throws BeansException {
        applicationContext = context;
    }
    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }
}
kucoinClient/src/main/java/org/example/kucoinclient/task/RunTask.java
@@ -20,7 +20,7 @@
    @Autowired
    private ConfigurableApplicationContext context;
    @Scheduled(cron = "0 0 */3 * * ?")
    @Scheduled(cron = "0 0/30 * * * ?")
    public void restart() {
        Thread restartThread = new Thread(() -> {
            try {
kucoinClient/src/main/java/org/example/kucoinclient/wsClient/KucoinClient.java
@@ -6,9 +6,16 @@
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import lombok.extern.slf4j.Slf4j;
import org.example.kucoinclient.KucoinClientApplication;
import org.example.kucoinclient.comm.ApplicationContextProvider;
import org.example.kucoinclient.pojo.Currency;
import org.example.kucoinclient.util.RedisUtil;
import org.json.JSONException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import java.io.UnsupportedEncodingException;
@@ -55,7 +62,7 @@
        this.executorService = Executors.newScheduledThreadPool(1); // 创建调度线程池
    }
    public void start() {
    public void start() throws Exception {
        try {
            connect(); // 连接到 WebSocket 服务器
            if (session == null) {
@@ -71,7 +78,8 @@
            }
        } catch (Exception e) {
            log.error("kucoin ws 连接过程中发生异常: ", e); // 捕获并记录异常
            log.error("kucoin ws 连接过程中发生异常:" + e.getMessage(), e); // 记录连接过程中发生的异常
           throw e;
        } finally {
            executorService.shutdown(); // 关闭调度线程池
        }
@@ -145,8 +153,8 @@
            // 空值检查,避免存储 null 值到 Redis
            if (resultMap.get("bids") != null && resultMap.get("asks") != null) {
                Object bidsObj = resultMap.get("bids");
                Object asksObj = resultMap.get("asks");
                Object asksObj = resultMap.get("bids");
                Object bidsObj = resultMap.get("asks");
                if(bidsObj instanceof List && !((List<?>) bidsObj).isEmpty() && asksObj instanceof List && !((List<?>) asksObj).isEmpty()){
                    if (bidsObj instanceof List && !((List<?>) bidsObj).isEmpty()) {
@@ -189,46 +197,46 @@
    }
    @OnClose
    public void onClose() {
    public void onClose() throws Exception {
        log.info("kucoin ws 连接已关闭,尝试重新连接..."); // 输出连接关闭日志
        handleConnectionClosedOrError(); // 处理连接关闭或错误
        throw new Exception();
    }
    @OnError
    public void onError(Throwable throwable) {
    public void onError(Throwable throwable) throws Exception {
        log.error("kucoin ws 发生错误: ", throwable); // 输出错误日志
        handleConnectionClosedOrError(); // 处理连接关闭或错误
        throw new Exception();
    }
    private void handleConnectionClosedOrError() {
        synchronized (lock) { // 同步块,确保线程安全
            if (!reconnecting) {
                reconnecting = true; // 状态标记为重连中
                executorService.execute(this::attemptReconnect); // 执行重连操作
            }
        }
    }
    private void attemptReconnect() {
        try {
            log.info("kucoin ws 开始重连"); // 输出重连开始日志
            connect(); // 尝试重新连接
            log.info("kucoin ws 重连成功"); // 输出重连成功日志
        } catch (Exception e) {
            log.error("kucoin ws 重连失败: ", e); // 输出重连失败日志
        } finally {
            synchronized (lock) { // 同步块
                reconnecting = false; // 状态标记为未重连
                scheduleReconnect(); // 重新调度重连任务
            }
        }
    }
    private void scheduleReconnect() {
        if (!executorService.isShutdown()) { // 检查线程池是否已经关闭
            executorService.schedule(this::attemptReconnect, RECONNECT_DELAY, TimeUnit.SECONDS); // 调度重连
        }
    }
//    private void handleConnectionClosedOrError() {
//        synchronized (lock) { // 同步块,确保线程安全
//            if (!reconnecting) {
//                reconnecting = true; // 状态标记为重连中
//                executorService.execute(this::attemptReconnect); // 执行重连操作
//            }
//        }
//    }
//
//    private void attemptReconnect() {
//        try {
//            log.info("kucoin ws 开始重连"); // 输出重连开始日志
//            connect(); // 尝试重新连接
//            log.info("kucoin ws 重连成功"); // 输出重连成功日志
//        } catch (Exception e) {
//            log.error("kucoin ws 重连失败: ", e); // 输出重连失败日志
//        } finally {
//            synchronized (lock) { // 同步块
//                reconnecting = false; // 状态标记为未重连
//                scheduleReconnect(); // 重新调度重连任务
//            }
//        }
//    }
//
//    private void scheduleReconnect() {
//        if (!executorService.isShutdown()) { // 检查线程池是否已经关闭
//            executorService.schedule(this::attemptReconnect, RECONNECT_DELAY, TimeUnit.SECONDS); // 调度重连
//        }
//    }
    private void sendPing() {
        try {
mexcClient/src/main/java/org/example/mexcclient/WsBean/MexcWsBean.java
@@ -14,6 +14,7 @@
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import org.example.mexcclient.MexcClientApplication;
import org.example.mexcclient.pojo.Currency;
import org.example.mexcclient.server.impl.CurrencySerivceImpl;
import org.example.mexcclient.wsClient.MexcClient;
@@ -21,6 +22,8 @@
import org.json.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.SpringApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@@ -45,6 +48,9 @@
    private CurrencySerivceImpl currencyService;
    @Autowired
    private ConfigurableApplicationContext context;
    @Autowired
    @Qualifier("threadPoolTaskExecutor")
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@@ -62,10 +68,39 @@
                List<Currency> sublist = mexc.subList(fromIndex, toIndex);
                // 使用自定义线程池提交任务
                threadPoolTaskExecutor.execute(new MexcClient(sublist)::start);
                threadPoolTaskExecutor.execute(() -> {
                    try {
                        new MexcClient(sublist).start();
                    } catch (Exception e) {
                        run();
                    }
                });
            }
        }
    }
    private boolean runExecuted = false;
    private synchronized void run() {
        if (runExecuted) {
            return; // 已经执行过,直接返回
        }
        runExecuted = true;
        log.info("ws 异常开始重启");
        Thread restartThread = new Thread(() -> {
            try {
                SpringApplication.exit(context, () -> 0);
                SpringApplication.run(MexcClientApplication.class);
                log.info("ws 重启成功");
            } catch (Exception e) {
                e.printStackTrace();
                log.error("ws 重启失败");
            }
        });
        restartThread.setDaemon(false);
        restartThread.start();
        log.info("ws 重启失败");
    }
}
mexcClient/src/main/java/org/example/mexcclient/comm/ApplicationContextProvider.java
New file
@@ -0,0 +1,21 @@
package org.example.mexcclient.comm;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;
@Component
public class ApplicationContextProvider implements ApplicationContextAware {
    private static ApplicationContext applicationContext;
    @Override
    public void setApplicationContext(ApplicationContext context) throws BeansException {
        applicationContext = context;
    }
    public static ApplicationContext getApplicationContext() {
        return applicationContext;
    }
}
mexcClient/src/main/java/org/example/mexcclient/task/RunTask.java
@@ -20,7 +20,7 @@
    @Autowired
    private ConfigurableApplicationContext context;
    @Scheduled(cron = "0 0 */3 * * ?")
    @Scheduled(cron = "0 0/30 * * * ?")
    public void restart() {
        Thread restartThread = new Thread(() -> {
            try {
mexcClient/src/main/java/org/example/mexcclient/wsClient/MexcClient.java
@@ -8,8 +8,15 @@
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import lombok.extern.slf4j.Slf4j;
import org.example.mexcclient.MexcClientApplication;
import org.example.mexcclient.comm.ApplicationContextProvider;
import org.example.mexcclient.pojo.Currency;
import org.example.mexcclient.util.RedisUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import java.io.IOException;
@@ -25,6 +32,7 @@
@ClientEndpoint
@Slf4j
@Component
public class MexcClient {
    private static final String WS_ENDPOINT = "wss://wbs.mexc.com/ws";
    private static final long PING_INTERVAL = 20000;
@@ -41,7 +49,7 @@
        this.executorService = Executors.newScheduledThreadPool(1);
    }
    public void start() {
    public void start() throws Exception {
        try {
            connect();
            if (session == null) {
@@ -62,7 +70,8 @@
            }
        } catch (Exception e) {
            log.error("mexc ws 连接过程中发生异常: " + e.getMessage(), e);
            log.error("mexc ws 连接过程中发生异常:" + e.getMessage(), e); // 记录连接过程中发生的异常
            throw e;
        } finally {
            executorService.shutdown();
        }
@@ -94,8 +103,8 @@
                Object object = map.get("d");
                Map<String, Object> resultMap = gson.fromJson(object.toString(), new TypeToken<Map<String, Object>>() {}.getType());
                HashMap<String,Object> hashMap = new HashMap<>();
                Object asksObj = resultMap.get("asks");
                Object bidsObj = resultMap.get("bids");
                Object bidsObj = resultMap.get("asks");
                Object asksObj = resultMap.get("bids");
                Type listType = new TypeToken<List<Map<String,Object>>>(){}.getType();
                List<Map<String,Object>> asksList = gson.fromJson(asksObj.toString(), listType);
@@ -135,52 +144,52 @@
    }
    // Close callback of the mexc websocket endpoint: logs the closure, calls
    // handleConnectionClosedOrError(), then throws a bare Exception.
    // NOTE(review): two signature lines follow; this looks like the before/after
    // pair of a flattened diff — confirm which one is the live declaration.
    @OnClose
    public void onClose() {
    public void onClose() throws Exception {
        log.info("mexc ws 连接已关闭,尝试重新连接...");
        handleConnectionClosedOrError();
        // NOTE(review): the thrown Exception carries no message and no cause.
        throw new Exception();
    }
    // Error callback of the mexc websocket endpoint: logs the Throwable, calls
    // handleConnectionClosedOrError(), then throws a bare Exception.
    // NOTE(review): two signature lines follow; this looks like the before/after
    // pair of a flattened diff — confirm which one is the live declaration.
    @OnError
    public void onError(Throwable throwable) {
    public void onError(Throwable throwable) throws Exception {
        log.error("mexc ws 发生错误: " + throwable.getMessage(), throwable);
        handleConnectionClosedOrError();
        // NOTE(review): the thrown Exception does not wrap `throwable` as its cause.
        throw new Exception();
    }
    private void handleConnectionClosedOrError() {
        synchronized (lock) {
            if (!reconnecting) {
                reconnecting = true; // 设置 reconnecting 为 true 表示开始重连
                executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连
            }
        }
    }
    private void attemptReconnect() {
        boolean doReconnect = true;
        try {
            log.info("mexc ws 开始重连");
            connect(); // 假设 connect() 方法用于实际的连接逻辑
            log.info("mexc ws 重连成功");
        } catch (Exception e) {
            log.error("mexc ws 重连失败", e);
            // 连接失败时,可以根据具体情况决定是否继续重连
            // 在这里假设总是继续尝试重连
        } finally {
            synchronized (lock) {
                if (doReconnect) {
                    scheduleReconnect(); // 如果需要继续重连,则重新调度重连任务
                } else {
                    reconnecting = false; // 重连结束后设置 reconnecting 为 false
                }
            }
        }
    }
    private void scheduleReconnect() {
        if (!executorService.isShutdown()) {
            executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS);
        }
    }
//    private void handleConnectionClosedOrError() {
//        synchronized (lock) {
//            if (!reconnecting) {
//                reconnecting = true; // 设置 reconnecting 为 true 表示开始重连
//                executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连
//            }
//        }
//    }
//
//    private void attemptReconnect() {
//        boolean doReconnect = true;
//        try {
//            log.info("mexc ws 开始重连");
//            connect(); // 假设 connect() 方法用于实际的连接逻辑
//            log.info("mexc ws 重连成功");
//        } catch (Exception e) {
//            log.error("mexc ws 重连失败", e);
//            // 连接失败时,可以根据具体情况决定是否继续重连
//            // 在这里假设总是继续尝试重连
//        } finally {
//            synchronized (lock) {
//                if (doReconnect) {
//                    scheduleReconnect(); // 如果需要继续重连,则重新调度重连任务
//                } else {
//                    reconnecting = false; // 重连结束后设置 reconnecting 为 false
//                }
//            }
//        }
//    }
//
//    private void scheduleReconnect() {
//        if (!executorService.isShutdown()) {
//            executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS);
//        }
//    }
    private void sendPing() {
        try {
websocketSerivce/src/main/java/org/example/server/impl/CurrencySerivceImpl.java
@@ -244,8 +244,8 @@
        marketDataOut.setSellPrice(sellPrice.toPlainString()); // 设置卖出价格
        marketDataOut.setBuyNumber(markets1.getBids().getV().toPlainString()); // 设置买入数量
        marketDataOut.setSellNumber(markets2.getAsks().getV().toPlainString()); // 设置卖出数量
        marketDataOut.setBuyTotalPrice((markets1.getBids().getP().multiply(markets1.getBids().getV())).setScale(0,RoundingMode.DOWN).toPlainString()); // 设置买入总价
        marketDataOut.setSellTotalPrice((markets2.getAsks().getP().multiply(markets2.getAsks().getV())).setScale(0,RoundingMode.DOWN).toPlainString()); // 设置卖出总价
        marketDataOut.setBuyTotalPrice((markets1.getBids().getP().multiply(markets1.getBids().getV())).setScale(0, RoundingMode.HALF_UP).toPlainString()); // 设置买入总价
        marketDataOut.setSellTotalPrice((markets2.getAsks().getP().multiply(markets2.getAsks().getV())).setScale(0,RoundingMode.HALF_UP).toPlainString()); // 设置卖出总价
        marketDataOut.setServceTime(formattedDateTime); // 设置服务时间
        marketDataOut.setBuyAndSell(marketDataOut.getBaseAsset()+marketDataOut.getBuyingPlatform()+marketDataOut.getSellPlatform());
        marketDataOuts.add(marketDataOut); // 添加到输出列表
@@ -267,17 +267,8 @@
    }
    public void quotationCalculation(){
        long startExtracted = System.nanoTime();
        extracted();
        long endExtracted = System.nanoTime();
        double executionTimeExtracted = (endExtracted - startExtracted) / 1e9; // 转换为秒
        System.out.println("extracted 方法执行时间: " + executionTimeExtracted + " 秒");
        long startFindPairs = System.nanoTime();
        findProfitablePairs(mexcList, gateList, bitgetList, kucoinList); // 请确保这些变量有定义和赋值
        long endFindPairs = System.nanoTime();
        double executionTimeFindPairs = (endFindPairs - startFindPairs) / 1e9; // 转换为秒
        System.out.println("findProfitablePairs 方法执行时间: " + executionTimeFindPairs + " 秒");
    }
    public void scheduler(){
websocketSerivce/src/main/resources/application.properties
@@ -1,6 +1,6 @@
#XDB_PATH=/www/wwwroot/csdn-ip2region.xdb
XDB_PATH=F:/project/marketData/websocketSerivce/src/main/resources/ip/csdn-ip2region.xdb
XDB_PATH=/www/wwwroot/csdn-ip2region.xdb
#XDB_PATH=F:/project/marketData/websocketSerivce/src/main/resources/ip/csdn-ip2region.xdb
redis1.ip=localhost
redis1.port=6379