1
zj
2024-07-19 4b8e10e605d28fc1b4ad3d33a6cf2bfbbea15bd5
1
5 files modified
2 files added
576 ■■■■■ changed files
.idea/inspectionProfiles/Project_Default.xml 1 ●●●● patch | view | raw | blame | history
websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java 74 ●●●●● patch | view | raw | blame | history
websocketClient/src/main/java/org/example/util/RedisUtil.java 4 ●●● patch | view | raw | blame | history
websocketClient/src/main/java/org/example/wsClient/GateClient.java 213 ●●●●● patch | view | raw | blame | history
websocketClient/src/main/java/org/example/wsClient/KucoinClient.java 172 ●●●●● patch | view | raw | blame | history
websocketClient/src/main/java/org/example/wsClient/MexcClient.java 108 ●●●●● patch | view | raw | blame | history
websocketClient/src/main/resources/application.yml 4 ●●●● patch | view | raw | blame | history
.idea/inspectionProfiles/Project_Default.xml
@@ -2,6 +2,7 @@
  <profile version="1.0">
    <option name="myName" value="Project Default" />
    <inspection_tool class="AliAccessStaticViaInstance" enabled="true" level="WARNING" enabled_by_default="true" />
    <inspection_tool class="AliDeprecation" enabled="true" level="WARNING" enabled_by_default="true" />
    <inspection_tool class="AlibabaAbstractClassShouldStartWithAbstractNaming" enabled="true" level="WARNING" enabled_by_default="true" />
    <inspection_tool class="AlibabaAbstractMethodOrInterfaceMethodMustUseJavadoc" enabled="true" level="WARNING" enabled_by_default="true" />
    <inspection_tool class="AlibabaAvoidApacheBeanUtilsCopy" enabled="true" level="WARNING" enabled_by_default="true" />
websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java
@@ -2,8 +2,18 @@
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import org.example.pojo.Currency;
import org.example.server.impl.CurrencySerivceImpl;
import org.example.wsClient.GateClient;
import org.example.wsClient.KucoinClient;
import org.example.wsClient.MexcClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
@@ -11,7 +21,9 @@
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.CollectionUtils;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/**
 * @ClassDescription: 客户端请求类
@@ -28,11 +40,31 @@
    @Autowired
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;
//    @Bean
//    public void mexcWebsocketRunClientMap() {
//        List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "mexc"));
//        if (!CollectionUtils.isEmpty(mexc)) {
//            int batchSize = 30; // 每个线程处理的数据量
//            int totalSize = mexc.size();
//            int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数
//
//            for (int i = 0; i < threadCount; i++) {
//                int fromIndex = i * batchSize;
//                int toIndex = Math.min(fromIndex + batchSize, totalSize);
//                List<Currency> sublist = mexc.subList(fromIndex, toIndex);
//
//                // 使用自定义线程池提交任务
//                threadPoolTaskExecutor.execute(new MexcClient(sublist)::start);
//            }
//
//        }
//    }
    @Bean
    public void mexcWebsocketRunClientMap() {
        List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "mexc"));
    public void gateWebsocketRunClientMap() {
        List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "gate"));
        if (!CollectionUtils.isEmpty(mexc)) {
            int batchSize = 30; // 每个线程处理的数据量
            int batchSize = 100; // 每个线程处理的数据量
            int totalSize = mexc.size();
            int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数
@@ -42,10 +74,44 @@
                List<Currency> sublist = mexc.subList(fromIndex, toIndex);
                // 使用自定义线程池提交任务
                threadPoolTaskExecutor.execute(new MexcClient(sublist)::start);
                threadPoolTaskExecutor.execute(new GateClient(sublist)::start);
            }
        }
    }
    @Bean
    public void kucoinWebsocketRunClientMap() throws Exception {
        List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "gate"));
        if (!CollectionUtils.isEmpty(mexc)) {
            String token = doPost();
            int batchSize = 100; // 每个线程处理的数据量
            int totalSize = mexc.size();
            int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数
            for (int i = 0; i < threadCount; i++) {
                int fromIndex = i * batchSize;
                int toIndex = Math.min(fromIndex + batchSize, totalSize);
                List<Currency> sublist = mexc.subList(fromIndex, toIndex);
                // 使用自定义线程池提交任务
                threadPoolTaskExecutor.execute(new KucoinClient(sublist,token)::start);
            }
        }
    }
    public static String doPost() throws Exception {
        String url = "https://api.kucoin.com/api/v1/bullet-public";
        HttpPost httpPost = new HttpPost(url);
        DefaultHttpClient defaultHttpClient = new DefaultHttpClient();
        List<NameValuePair> nvps = new ArrayList<NameValuePair>();
        httpPost.setEntity(new UrlEncodedFormEntity(nvps));
        HttpResponse response = defaultHttpClient.execute(httpPost);
        HttpEntity respEntity = response.getEntity();
        String text = EntityUtils.toString(respEntity, "UTF-8");
        defaultHttpClient.getConnectionManager().shutdown();
        return text;
    }
}
websocketClient/src/main/java/org/example/util/RedisUtil.java
@@ -9,11 +9,13 @@
public class RedisUtil {
    private static JedisPool jedisPool;
    // 在静态代码块中初始化 JedisPool
    static {
        jedisPool = new JedisPool(new JedisPoolConfig(), "localhost", 6379);
    }
    // 私有构造方法,防止实例化
    private RedisUtil() {}
    public static void set(String key, String value) {
        try (Jedis jedis = jedisPool.getResource()) {
            jedis.set(key, value);
websocketClient/src/main/java/org/example/wsClient/GateClient.java
New file
@@ -0,0 +1,213 @@
package org.example.wsClient;
import com.alibaba.druid.support.json.JSONUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import lombok.extern.slf4j.Slf4j;
import org.example.pojo.Currency;
import org.example.util.RedisUtil;
import org.json.JSONException;
import org.json.JSONObject;
import javax.websocket.*;
import java.math.BigDecimal;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.json.JSONObject;
/**
 * @program: demo
 * @description:
 * @create: 2024-07-18 15:30
 **/
@ClientEndpoint
@Slf4j
public class GateClient {
    private static final String WS_ENDPOINT = "wss://api.gateio.ws/ws/v4/";
    private static final long PING_INTERVAL = 20000;
    private final List<Currency> subscriptions;
    private final ScheduledExecutorService executorService;
    private Session session;
    private final Object lock = new Object(); // 添加一个锁对象
    private volatile boolean reconnecting = false; // 使用 volatile 关键字保证可见性
    public GateClient(List<Currency> subscriptions) {
        this.subscriptions = subscriptions;
        this.executorService = Executors.newScheduledThreadPool(1);
    }
    public void start() {
        try {
            connect();
            if (session == null) {
                log.info("无法在超时时间内连接到服务器。");
                return;
            }
            executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS);
            // 订阅消息
            for (Currency subscription : subscriptions) {
                String parameter = getParameter(subscription.getSymbol());
                session.getBasicRemote().sendText(parameter);
            }
            synchronized (this) {
                this.wait();
            }
        } catch (Exception e) {
            log.error("gate ws 连接过程中发生异常: " + e.getMessage(), e);
        } finally {
            executorService.shutdown();
        }
    }
    private void connect() throws Exception {
        WebSocketContainer container = ContainerProvider.getWebSocketContainer();
        container.connectToServer(this, new URI(WS_ENDPOINT));
    }
    @OnOpen
    public void onOpen(Session session) {
        log.info("gate ws 已连接到服务器。");
        this.session = session;
        synchronized (this) {
            this.notify();
        }
    }
    private static final Gson gson = new Gson(); // 将 Gson 作为静态成员或单例
    @OnMessage
    public void onMessage(String message) {
        try {
            Map<String, Object> map = gson.fromJson(message, new TypeToken<Map<String, Object>>() {}.getType());
            Object object = map.get("result");
            Map<String, Object> resultMap = gson.fromJson(object.toString(), new TypeToken<Map<String, Object>>() {}.getType());
            if (null != resultMap && null != resultMap.get("s")) {
                HashMap<String,Object> hashMap = new HashMap<>();
                ObjectMapper mapper = new ObjectMapper();
                hashMap.put("bids",resultMap.get("bids"));
                hashMap.put("asks",resultMap.get("asks"));
                String key = "gate" + resultMap.get("s");
                if(resultMap.get("s").toString().replace("_","").equals("BTCUSDT")){
                    System.out.println(hashMap.toString());
                }
                RedisUtil.set(key.replace("_",""), mapper.writeValueAsString(hashMap));
            }
        } catch (JsonSyntaxException e) {
            log.error("JSON 解析异常:" + e.getMessage(), e);
        } catch (JsonProcessingException e) {
            log.error("JSON 解析异常:" + e.getMessage(), e);
        }
    }
    @OnClose
    public void onClose() {
        log.info("gate ws 连接已关闭,尝试重新连接...");
        handleConnectionClosedOrError();
    }
    @OnError
    public void onError(Throwable throwable) {
        log.error("gate ws 发生错误: " + throwable.getMessage(), throwable);
        handleConnectionClosedOrError();
    }
    private void handleConnectionClosedOrError() {
        synchronized (lock) {
            if (!reconnecting) {
                reconnecting = true; // 设置 reconnecting 为 true 表示开始重连
                executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连
            }
        }
    }
    private void attemptReconnect() {
        boolean doReconnect = true;
        try {
            log.info("gate ws 开始重连");
            connect(); // 假设 connect() 方法用于实际的连接逻辑
            log.info("gate ws 重连成功");
        } catch (Exception e) {
            log.error("gate ws 重连失败", e);
            // 连接失败时,可以根据具体情况决定是否继续重连
            // 在这里假设总是继续尝试重连
        } finally {
            synchronized (lock) {
                if (doReconnect) {
                    scheduleReconnect(); // 如果需要继续重连,则重新调度重连任务
                } else {
                    reconnecting = false; // 重连结束后设置 reconnecting 为 false
                }
            }
        }
    }
    private void scheduleReconnect() {
        if (!executorService.isShutdown()) {
            executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS);
        }
    }
    private void sendPing() {
        try {
            if (session != null) {
                session.getBasicRemote().sendPing(ByteBuffer.wrap("Ping".getBytes()));
            }
        } catch (Exception e) {
            log.error("发送心跳失败", e);
        }
    }
    public String getParameter(String symbol) throws JsonProcessingException, JSONException {
        // 替换USDT为_USDT
        symbol = symbol.replaceAll("USDT", "_USDT");
        // 创建一个ObjectMapper实例
        ObjectMapper mapper = new ObjectMapper();
        // 获取当前时间的毫秒数
        long currentTimeMillis = System.currentTimeMillis();
        // 定义常量
        final String CHANNEL = "spot.order_book";  // 固定频道
        final String EVENT_SUBSCRIBE = "subscribe"; // 订阅事件
        final String EVENT_UNSUBSCRIBE = "unsubscribe"; // 取消订阅事件
        final String[] PAYLOAD = new String[] {symbol, "20", "100ms"}; // 负载信息
        // 使用Map构建JSON对象
        Map<String, Object> jsonMap = new HashMap<>(); // 创建Map用于存放JSON内容
        jsonMap.put("time", currentTimeMillis); // 放入当前时间
        jsonMap.put("channel", CHANNEL); // 放入频道
        jsonMap.put("event", EVENT_SUBSCRIBE); // 放入事件类型
        jsonMap.put("payload", Arrays.asList(PAYLOAD)); // 将数组转换为List并放入Map
        // 将Map转换为JSON字符串
        String jsonString = mapper.writeValueAsString(jsonMap); // 使用ObjectMapper转换
        return jsonString; // 返回JSON字符串
    }
    public static void main(String[] args) {
        String scientificNotation = "5.0E-5"; // 科学计数法字符串
        BigDecimal bigDecimal = new BigDecimal(scientificNotation);
        System.out.println("Scientific Notation: " + scientificNotation);
        System.out.println("BigDecimal Value: " + bigDecimal);
    }
}
websocketClient/src/main/java/org/example/wsClient/KucoinClient.java
New file
@@ -0,0 +1,172 @@
package org.example.wsClient;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import lombok.extern.slf4j.Slf4j;
import org.example.pojo.Currency;
import org.example.util.RedisUtil;
import javax.websocket.*;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
/**
 * @program: demo
 * @description:
 * @create: 2024-07-19 16:44
 **/
@ClientEndpoint
@Slf4j
public class KucoinClient {
    private static final String WS_ENDPOINT = "wss://api.kucoinio.ws/ws/v4/";
    private static final long PING_INTERVAL = 20000;
    private final List<Currency> subscriptions;
    private final ScheduledExecutorService executorService;
    private Session session;
    private String token;
    private final Object lock = new Object(); // 添加一个锁对象
    private volatile boolean reconnecting = false; // 使用 volatile 关键字保证可见性
    public KucoinClient(List<Currency> subscriptions,String token) {
        this.subscriptions = subscriptions;
        this.token = token;
        this.executorService = Executors.newScheduledThreadPool(1);
    }
    public void start() {
        try {
            connect();
            if (session == null) {
                log.info("无法在超时时间内连接到服务器。");
                return;
            }
            executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS);
            // 订阅消息
            for (Currency subscription : subscriptions) {
                String parameter = getParameter(subscription.getSymbol());
                session.getBasicRemote().sendText(parameter);
            }
            synchronized (this) {
                this.wait();
            }
        } catch (Exception e) {
            log.error("kucoin ws 连接过程中发生异常: " + e.getMessage(), e);
        } finally {
            executorService.shutdown();
        }
    }
    private void connect() throws Exception {
        WebSocketContainer container = ContainerProvider.getWebSocketContainer();
        container.connectToServer(this, new URI(WS_ENDPOINT));
    }
    @OnOpen
    public void onOpen(Session session) {
        log.info("kucoin ws 已连接到服务器。");
        this.session = session;
        synchronized (this) {
            this.notify();
        }
    }
    private static final Gson gson = new Gson(); // 将 Gson 作为静态成员或单例
    @OnMessage
    public void onMessage(String message) {
        try {
            Map<String, Object> map = gson.fromJson(message, new TypeToken<Map<String, Object>>() {}.getType());
            Object object = map.get("result");
            Map<String, Object> resultMap = gson.fromJson(object.toString(), new TypeToken<Map<String, Object>>() {}.getType());
            if (null != resultMap && null != resultMap.get("s")) {
                HashMap<String,Object> hashMap = new HashMap<>();
                ObjectMapper mapper = new ObjectMapper();
                hashMap.put("bids",resultMap.get("bids"));
                hashMap.put("asks",resultMap.get("asks"));
                String key = "kucoin" + resultMap.get("s");
                if(resultMap.get("s").toString().replace("_","").equals("BTCUSDT")){
                    System.out.println(hashMap.toString());
                }
                RedisUtil.set(key.replace("_",""), mapper.writeValueAsString(hashMap));
            }
        } catch (JsonSyntaxException e) {
            log.error("JSON 解析异常:" + e.getMessage(), e);
        } catch (JsonProcessingException e) {
            log.error("JSON 解析异常:" + e.getMessage(), e);
        }
    }
    @OnClose
    public void onClose() {
        log.info("kucoin ws 连接已关闭,尝试重新连接...");
        handleConnectionClosedOrError();
    }
    @OnError
    public void onError(Throwable throwable) {
        log.error("kucoin ws 发生错误: " + throwable.getMessage(), throwable);
        handleConnectionClosedOrError();
    }
    private void handleConnectionClosedOrError() {
        synchronized (lock) {
            if (!reconnecting) {
                reconnecting = true; // 设置 reconnecting 为 true 表示开始重连
                executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连
            }
        }
    }
    private void attemptReconnect() {
        boolean doReconnect = true;
        try {
            log.info("kucoin ws 开始重连");
            connect(); // 假设 connect() 方法用于实际的连接逻辑
            log.info("kucoin ws 重连成功");
        } catch (Exception e) {
            log.error("kucoin ws 重连失败", e);
            // 连接失败时,可以根据具体情况决定是否继续重连
            // 在这里假设总是继续尝试重连
        } finally {
            synchronized (lock) {
                if (doReconnect) {
                    scheduleReconnect(); // 如果需要继续重连,则重新调度重连任务
                } else {
                    reconnecting = false; // 重连结束后设置 reconnecting 为 false
                }
            }
        }
    }
    private void scheduleReconnect() {
        if (!executorService.isShutdown()) {
            executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS);
        }
    }
    private void sendPing() {
        try {
            if (session != null) {
                session.getBasicRemote().sendPing(ByteBuffer.wrap("Ping".getBytes()));
            }
        } catch (Exception e) {
            log.error("发送心跳失败", e);
        }
    }
}
websocketClient/src/main/java/org/example/wsClient/MexcClient.java
@@ -4,7 +4,10 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import lombok.extern.slf4j.Slf4j;
import org.example.pojo.Currency;
import org.example.util.RedisUtil;
@@ -13,20 +16,23 @@
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
@ClientEndpoint
@Slf4j
public class MexcClient {
    private static final String WS_ENDPOINT = "wss://wbs.mexc.com/ws"; // 实际WebSocket服务器地址
    private static final long PING_INTERVAL = 20000; // 每20秒发送一次ping
    private static final int MAX_BACKOFF_TIME = 60000; // 最大重连时间
    private static final String WS_ENDPOINT = "wss://wbs.mexc.com/ws";
    private static final long PING_INTERVAL = 20000;
    private final List<Currency> subscriptions;
    private final ScheduledExecutorService executorService;
    private Session session;
    private boolean reconnecting = false;
    private final Object lock = new Object(); // 添加一个锁对象
    private volatile boolean reconnecting = false; // 使用 volatile 关键字保证可见性
    public MexcClient(List<Currency> subscriptions) {
        this.subscriptions = subscriptions;
@@ -37,7 +43,7 @@
        try {
            connect();
            if (session == null) {
                System.err.println("无法在超时时间内连接到服务器。");
                log.info("无法在超时时间内连接到服务器。");
                return;
            }
@@ -54,8 +60,7 @@
            }
        } catch (Exception e) {
            System.err.println("连接过程中发生异常: " + e.getMessage());
            e.printStackTrace();
            log.error("mexc ws 连接过程中发生异常: " + e.getMessage(), e);
        } finally {
            executorService.shutdown();
        }
@@ -68,59 +73,84 @@
    @OnOpen
    public void onOpen(Session session) {
        System.out.println("已连接到服务器。");
        log.info("mexc ws 已连接到服务器。");
        this.session = session;
        synchronized (this) {
            this.notify();
        }
    }
    private static final Gson gson = new Gson(); // 将 Gson 作为静态成员或单例
    @OnMessage
    public void onMessage(String message) {
        Gson gson = new Gson();
        Map map = gson.fromJson(message, Map.class);
        if (map != null && map.containsKey("s")) {
            RedisUtil.set("mexc" + map.get("s").toString(), message);
        try {
            Map<String, Object> map = gson.fromJson(message, new TypeToken<Map<String, Object>>() {}.getType());
            if (map != null && map.containsKey("s")) {
                Object object = map.get("d");
                Map<String, Object> resultMap = gson.fromJson(object.toString(), new TypeToken<Map<String, Object>>() {}.getType());
                HashMap<String,Object> hashMap = new HashMap<>();
                ObjectMapper mapper = new ObjectMapper();
                hashMap.put("bids",resultMap.get("bids"));
                hashMap.put("asks",resultMap.get("asks"));
                String key = "mexc" + map.get("s").toString();
                RedisUtil.set(key, mapper.writeValueAsString(hashMap));
            } else {
                log.warn("消息不包含 's' 字段或解析失败:" + message);
            }
        } catch (JsonSyntaxException e) {
            log.error("JSON 解析异常:" + e.getMessage(), e);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
        // 没有过滤撤单的数据
    }
    @OnClose
    public void onClose() {
        System.out.println("连接已关闭,尝试重新连接...");
        session = null;
        if (!reconnecting) {
            reconnect();
        }
        log.info("mexc ws 连接已关闭,尝试重新连接...");
        handleConnectionClosedOrError();
    }
    @OnError
    public void onError(Throwable throwable) {
        System.err.println("发生错误: " + throwable.getMessage());
        if (!reconnecting) {
            reconnect();
        }
        log.error("mexc ws 发生错误: " + throwable.getMessage(), throwable);
        handleConnectionClosedOrError();
    }
    private void reconnect() {
        if (reconnecting) {
            return;
        }
        reconnecting = true;
        executorService.schedule(() -> {
            try {
                connect();
                reconnecting = false;
            } catch (Exception e) {
                e.printStackTrace();
                reconnect();
    private void handleConnectionClosedOrError() {
        synchronized (lock) {
            if (!reconnecting) {
                reconnecting = true; // 设置 reconnecting 为 true 表示开始重连
                executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连
            }
        }, calculateBackoffTime(), TimeUnit.MILLISECONDS);
        }
    }
    private long calculateBackoffTime() {
        // 实现退避策略,例如指数退避
        return 5000; // 例子:5秒
    private void attemptReconnect() {
        boolean doReconnect = true;
        try {
            log.info("mexc ws 开始重连");
            connect(); // 假设 connect() 方法用于实际的连接逻辑
            log.info("mexc ws 重连成功");
        } catch (Exception e) {
            log.error("mexc ws 重连失败", e);
            // 连接失败时,可以根据具体情况决定是否继续重连
            // 在这里假设总是继续尝试重连
        } finally {
            synchronized (lock) {
                if (doReconnect) {
                    scheduleReconnect(); // 如果需要继续重连,则重新调度重连任务
                } else {
                    reconnecting = false; // 重连结束后设置 reconnecting 为 false
                }
            }
        }
    }
    private void scheduleReconnect() {
        if (!executorService.isShutdown()) {
            executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS);
        }
    }
    private void sendPing() {
@@ -129,7 +159,7 @@
                session.getBasicRemote().sendPing(ByteBuffer.wrap("Ping".getBytes()));
            }
        } catch (Exception e) {
            e.printStackTrace();
            log.error("发送心跳失败", e);
        }
    }
websocketClient/src/main/resources/application.yml
@@ -1,5 +1,5 @@
server:
  port: 8090
  port: 8095
spring:
@@ -10,7 +10,7 @@
    # 端口,默认为6379
    port: 6379
    # 数据库索引
    database: 0
    database: 1
    # 密码
    password:
    # 连接超时时间