From 4b8e10e605d28fc1b4ad3d33a6cf2bfbbea15bd5 Mon Sep 17 00:00:00 2001
From: zj <1772600164@qq.com>
Date: Fri, 19 Jul 2024 18:34:01 +0800
Subject: [PATCH] 1

---
 websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java     |   74 +++++++
 websocketClient/src/main/java/org/example/wsClient/KucoinClient.java |  172 +++++++++++++++++
 .idea/inspectionProfiles/Project_Default.xml                         |    1 
 websocketClient/src/main/java/org/example/util/RedisUtil.java        |    4 
 websocketClient/src/main/java/org/example/wsClient/MexcClient.java   |  108 ++++++---
 websocketClient/src/main/java/org/example/wsClient/GateClient.java   |  213 +++++++++++++++++++++
 websocketClient/src/main/resources/application.yml                   |    4 
 7 files changed, 530 insertions(+), 46 deletions(-)

diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml
index 56d11c3..5db292a 100644
--- a/.idea/inspectionProfiles/Project_Default.xml
+++ b/.idea/inspectionProfiles/Project_Default.xml
@@ -2,6 +2,7 @@
   <profile version="1.0">
     <option name="myName" value="Project Default" />
     <inspection_tool class="AliAccessStaticViaInstance" enabled="true" level="WARNING" enabled_by_default="true" />
+    <inspection_tool class="AliDeprecation" enabled="true" level="WARNING" enabled_by_default="true" />
     <inspection_tool class="AlibabaAbstractClassShouldStartWithAbstractNaming" enabled="true" level="WARNING" enabled_by_default="true" />
     <inspection_tool class="AlibabaAbstractMethodOrInterfaceMethodMustUseJavadoc" enabled="true" level="WARNING" enabled_by_default="true" />
     <inspection_tool class="AlibabaAvoidApacheBeanUtilsCopy" enabled="true" level="WARNING" enabled_by_default="true" />
diff --git a/websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java b/websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java
index 0315b55..7d45771 100644
--- a/websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java
+++ b/websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java
@@ -2,8 +2,18 @@
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
 import org.example.pojo.Currency;
 import org.example.server.impl.CurrencySerivceImpl;
+import org.example.wsClient.GateClient;
+import org.example.wsClient.KucoinClient;
 import org.example.wsClient.MexcClient;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Bean;
@@ -11,7 +21,9 @@
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.util.CollectionUtils;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 /**
  * @ClassDescription: 客户端请求类
@@ -28,11 +40,31 @@
     @Autowired
     private ThreadPoolTaskExecutor threadPoolTaskExecutor;
 
+//    @Bean
+//    public void mexcWebsocketRunClientMap() {
+//        List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "mexc"));
+//        if (!CollectionUtils.isEmpty(mexc)) {
+//            int batchSize = 30; // 每个线程处理的数据量
+//            int totalSize = mexc.size();
+//            int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数
+//
+//            for (int i = 0; i < threadCount; i++) {
+//                int fromIndex = i * batchSize;
+//                int toIndex = Math.min(fromIndex + batchSize, totalSize);
+//                List<Currency> sublist = mexc.subList(fromIndex, toIndex);
+//
+//                // 使用自定义线程池提交任务
+//                threadPoolTaskExecutor.execute(new MexcClient(sublist)::start);
+//            }
+//
+//        }
+//    }
+
     @Bean
-    public void mexcWebsocketRunClientMap() {
-        List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "mexc"));
+    public void gateWebsocketRunClientMap() {
+        List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "gate"));
         if (!CollectionUtils.isEmpty(mexc)) {
-            int batchSize = 30; // 每个线程处理的数据量
+            int batchSize = 100; // 每个线程处理的数据量
             int totalSize = mexc.size();
             int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数
 
@@ -42,10 +74,44 @@
                 List<Currency> sublist = mexc.subList(fromIndex, toIndex);
 
                 // 使用自定义线程池提交任务
-                threadPoolTaskExecutor.execute(new MexcClient(sublist)::start);
+                threadPoolTaskExecutor.execute(new GateClient(sublist)::start);
             }
 
         }
     }
+
+    @Bean
+    public void kucoinWebsocketRunClientMap() throws Exception {
+        List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "gate"));
+        if (!CollectionUtils.isEmpty(mexc)) {
+            String token = doPost();
+            int batchSize = 100; // 每个线程处理的数据量
+            int totalSize = mexc.size();
+            int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数
+
+            for (int i = 0; i < threadCount; i++) {
+                int fromIndex = i * batchSize;
+                int toIndex = Math.min(fromIndex + batchSize, totalSize);
+                List<Currency> sublist = mexc.subList(fromIndex, toIndex);
+
+                // 使用自定义线程池提交任务
+                threadPoolTaskExecutor.execute(new KucoinClient(sublist,token)::start);
+            }
+
+        }
+    }
+
+    public static String doPost() throws Exception {
+        String url = "https://api.kucoin.com/api/v1/bullet-public";
+        HttpPost httpPost = new HttpPost(url);
+        DefaultHttpClient defaultHttpClient = new DefaultHttpClient();
+        List<NameValuePair> nvps = new ArrayList<NameValuePair>();
+        httpPost.setEntity(new UrlEncodedFormEntity(nvps));
+        HttpResponse response = defaultHttpClient.execute(httpPost);
+        HttpEntity respEntity = response.getEntity();
+        String text = EntityUtils.toString(respEntity, "UTF-8");
+        defaultHttpClient.getConnectionManager().shutdown();
+        return text;
+    }
 }
 
diff --git a/websocketClient/src/main/java/org/example/util/RedisUtil.java b/websocketClient/src/main/java/org/example/util/RedisUtil.java
index 63361dd..dab4e20 100644
--- a/websocketClient/src/main/java/org/example/util/RedisUtil.java
+++ b/websocketClient/src/main/java/org/example/util/RedisUtil.java
@@ -9,11 +9,13 @@
 public class RedisUtil {
     private static JedisPool jedisPool;
 
-    // 在静态代码块中初始化 JedisPool
     static {
         jedisPool = new JedisPool(new JedisPoolConfig(), "localhost", 6379);
     }
 
+    // 私有构造方法,防止实例化
+    private RedisUtil() {}
+
     public static void set(String key, String value) {
         try (Jedis jedis = jedisPool.getResource()) {
             jedis.set(key, value);
diff --git a/websocketClient/src/main/java/org/example/wsClient/GateClient.java b/websocketClient/src/main/java/org/example/wsClient/GateClient.java
new file mode 100644
index 0000000..729bf0f
--- /dev/null
+++ b/websocketClient/src/main/java/org/example/wsClient/GateClient.java
@@ -0,0 +1,213 @@
+package org.example.wsClient;
+
+import com.alibaba.druid.support.json.JSONUtils;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+import lombok.extern.slf4j.Slf4j;
+import org.example.pojo.Currency;
+import org.example.util.RedisUtil;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import javax.websocket.*;
+import java.math.BigDecimal;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.json.JSONObject;
+/**
+ * @program: demo
+ * @description:
+ * @create: 2024-07-18 15:30
+ **/
+@ClientEndpoint
+@Slf4j
+public class GateClient {
+
+    private static final String WS_ENDPOINT = "wss://api.gateio.ws/ws/v4/";
+    private static final long PING_INTERVAL = 20000;
+
+    private final List<Currency> subscriptions;
+    private final ScheduledExecutorService executorService;
+    private Session session;
+
+    private final Object lock = new Object(); // 添加一个锁对象
+    private volatile boolean reconnecting = false; // 使用 volatile 关键字保证可见性
+
+    public GateClient(List<Currency> subscriptions) {
+        this.subscriptions = subscriptions;
+        this.executorService = Executors.newScheduledThreadPool(1);
+    }
+
+    public void start() {
+        try {
+            connect();
+            if (session == null) {
+                log.info("无法在超时时间内连接到服务器。");
+                return;
+            }
+
+            executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS);
+
+            // 订阅消息
+            for (Currency subscription : subscriptions) {
+                String parameter = getParameter(subscription.getSymbol());
+                session.getBasicRemote().sendText(parameter);
+            }
+
+            synchronized (this) {
+                this.wait();
+            }
+
+        } catch (Exception e) {
+            log.error("gate ws 连接过程中发生异常: " + e.getMessage(), e);
+        } finally {
+            executorService.shutdown();
+        }
+    }
+
+    private void connect() throws Exception {
+        WebSocketContainer container = ContainerProvider.getWebSocketContainer();
+        container.connectToServer(this, new URI(WS_ENDPOINT));
+    }
+
+    @OnOpen
+    public void onOpen(Session session) {
+        log.info("gate ws 已连接到服务器。");
+        this.session = session;
+        synchronized (this) {
+            this.notify();
+        }
+    }
+    private static final Gson gson = new Gson(); // 将 Gson 作为静态成员或单例
+
+    @OnMessage
+    public void onMessage(String message) {
+        try {
+            Map<String, Object> map = gson.fromJson(message, new TypeToken<Map<String, Object>>() {}.getType());
+            Object object = map.get("result");
+            Map<String, Object> resultMap = gson.fromJson(object.toString(), new TypeToken<Map<String, Object>>() {}.getType());
+            if (null != resultMap && null != resultMap.get("s")) {
+                HashMap<String,Object> hashMap = new HashMap<>();
+                ObjectMapper mapper = new ObjectMapper();
+                hashMap.put("bids",resultMap.get("bids"));
+                hashMap.put("asks",resultMap.get("asks"));
+                String key = "gate" + resultMap.get("s");
+                if(resultMap.get("s").toString().replace("_","").equals("BTCUSDT")){
+                    System.out.println(hashMap.toString());
+                }
+                RedisUtil.set(key.replace("_",""), mapper.writeValueAsString(hashMap));
+            }
+        } catch (JsonSyntaxException e) {
+            log.error("JSON 解析异常:" + e.getMessage(), e);
+        } catch (JsonProcessingException e) {
+            log.error("JSON 解析异常:" + e.getMessage(), e);
+        }
+    }
+
+    @OnClose
+    public void onClose() {
+        log.info("gate ws 连接已关闭,尝试重新连接...");
+        handleConnectionClosedOrError();
+    }
+
+    @OnError
+    public void onError(Throwable throwable) {
+        log.error("gate ws 发生错误: " + throwable.getMessage(), throwable);
+        handleConnectionClosedOrError();
+    }
+
+    private void handleConnectionClosedOrError() {
+        synchronized (lock) {
+            if (!reconnecting) {
+                reconnecting = true; // 设置 reconnecting 为 true 表示开始重连
+                executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连
+            }
+        }
+    }
+
+    private void attemptReconnect() {
+        boolean doReconnect = true;
+        try {
+            log.info("gate ws 开始重连");
+            connect(); // 假设 connect() 方法用于实际的连接逻辑
+            log.info("gate ws 重连成功");
+        } catch (Exception e) {
+            log.error("gate ws 重连失败", e);
+            // 连接失败时,可以根据具体情况决定是否继续重连
+            // 在这里假设总是继续尝试重连
+        } finally {
+            synchronized (lock) {
+                if (doReconnect) {
+                    scheduleReconnect(); // 如果需要继续重连,则重新调度重连任务
+                } else {
+                    reconnecting = false; // 重连结束后设置 reconnecting 为 false
+                }
+            }
+        }
+    }
+
+    private void scheduleReconnect() {
+        if (!executorService.isShutdown()) {
+            executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS);
+        }
+    }
+
+    private void sendPing() {
+        try {
+            if (session != null) {
+                session.getBasicRemote().sendPing(ByteBuffer.wrap("Ping".getBytes()));
+            }
+        } catch (Exception e) {
+            log.error("发送心跳失败", e);
+        }
+    }
+
+    public String getParameter(String symbol) throws JsonProcessingException, JSONException {
+        // 替换USDT为_USDT
+        symbol = symbol.replaceAll("USDT", "_USDT");
+
+        // 创建一个ObjectMapper实例
+        ObjectMapper mapper = new ObjectMapper();
+        // 获取当前时间的毫秒数
+        long currentTimeMillis = System.currentTimeMillis();
+
+        // 定义常量
+        final String CHANNEL = "spot.order_book";  // 固定频道
+        final String EVENT_SUBSCRIBE = "subscribe"; // 订阅事件
+        final String EVENT_UNSUBSCRIBE = "unsubscribe"; // 取消订阅事件
+        final String[] PAYLOAD = new String[] {symbol, "20", "100ms"}; // 负载信息
+
+        // 使用Map构建JSON对象
+        Map<String, Object> jsonMap = new HashMap<>(); // 创建Map用于存放JSON内容
+        jsonMap.put("time", currentTimeMillis); // 放入当前时间
+        jsonMap.put("channel", CHANNEL); // 放入频道
+        jsonMap.put("event", EVENT_SUBSCRIBE); // 放入事件类型
+        jsonMap.put("payload", Arrays.asList(PAYLOAD)); // 将数组转换为List并放入Map
+
+        // 将Map转换为JSON字符串
+        String jsonString = mapper.writeValueAsString(jsonMap); // 使用ObjectMapper转换
+        return jsonString; // 返回JSON字符串
+
+    }
+
+    public static void main(String[] args) {
+        String scientificNotation = "5.0E-5"; // 科学计数法字符串
+        BigDecimal bigDecimal = new BigDecimal(scientificNotation);
+
+        System.out.println("Scientific Notation: " + scientificNotation);
+        System.out.println("BigDecimal Value: " + bigDecimal);
+    }
+
+}
diff --git a/websocketClient/src/main/java/org/example/wsClient/KucoinClient.java b/websocketClient/src/main/java/org/example/wsClient/KucoinClient.java
new file mode 100644
index 0000000..2700dc6
--- /dev/null
+++ b/websocketClient/src/main/java/org/example/wsClient/KucoinClient.java
@@ -0,0 +1,172 @@
+package org.example.wsClient;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+import lombok.extern.slf4j.Slf4j;
+import org.example.pojo.Currency;
+import org.example.util.RedisUtil;
+
+import javax.websocket.*;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @program: demo
+ * @description:
+ * @create: 2024-07-19 16:44
+ **/
+@ClientEndpoint
+@Slf4j
+public class KucoinClient {
+
+    private static final String WS_ENDPOINT = "wss://api.kucoinio.ws/ws/v4/";
+    private static final long PING_INTERVAL = 20000;
+
+    private final List<Currency> subscriptions;
+    private final ScheduledExecutorService executorService;
+    private Session session;
+    private String token;
+    private final Object lock = new Object(); // 添加一个锁对象
+    private volatile boolean reconnecting = false; // 使用 volatile 关键字保证可见性
+
+    public KucoinClient(List<Currency> subscriptions,String token) {
+        this.subscriptions = subscriptions;
+        this.token = token;
+        this.executorService = Executors.newScheduledThreadPool(1);
+    }
+
+    public void start() {
+        try {
+            connect();
+            if (session == null) {
+                log.info("无法在超时时间内连接到服务器。");
+                return;
+            }
+
+            executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS);
+
+            // 订阅消息
+            for (Currency subscription : subscriptions) {
+                String parameter = getParameter(subscription.getSymbol());
+                session.getBasicRemote().sendText(parameter);
+            }
+
+            synchronized (this) {
+                this.wait();
+            }
+
+        } catch (Exception e) {
+            log.error("kucoin ws 连接过程中发生异常: " + e.getMessage(), e);
+        } finally {
+            executorService.shutdown();
+        }
+    }
+
+    private void connect() throws Exception {
+        WebSocketContainer container = ContainerProvider.getWebSocketContainer();
+        container.connectToServer(this, new URI(WS_ENDPOINT));
+    }
+
+    @OnOpen
+    public void onOpen(Session session) {
+        log.info("kucoin ws 已连接到服务器。");
+        this.session = session;
+        synchronized (this) {
+            this.notify();
+        }
+    }
+    private static final Gson gson = new Gson(); // 将 Gson 作为静态成员或单例
+
+    @OnMessage
+    public void onMessage(String message) {
+        try {
+            Map<String, Object> map = gson.fromJson(message, new TypeToken<Map<String, Object>>() {}.getType());
+            Object object = map.get("result");
+            Map<String, Object> resultMap = gson.fromJson(object.toString(), new TypeToken<Map<String, Object>>() {}.getType());
+            if (null != resultMap && null != resultMap.get("s")) {
+                HashMap<String,Object> hashMap = new HashMap<>();
+                ObjectMapper mapper = new ObjectMapper();
+                hashMap.put("bids",resultMap.get("bids"));
+                hashMap.put("asks",resultMap.get("asks"));
+                String key = "kucoin" + resultMap.get("s");
+                if(resultMap.get("s").toString().replace("_","").equals("BTCUSDT")){
+                    System.out.println(hashMap.toString());
+                }
+                RedisUtil.set(key.replace("_",""), mapper.writeValueAsString(hashMap));
+            }
+        } catch (JsonSyntaxException e) {
+            log.error("JSON 解析异常:" + e.getMessage(), e);
+        } catch (JsonProcessingException e) {
+            log.error("JSON 解析异常:" + e.getMessage(), e);
+        }
+    }
+
+    @OnClose
+    public void onClose() {
+        log.info("kucoin ws 连接已关闭,尝试重新连接...");
+        handleConnectionClosedOrError();
+    }
+
+    @OnError
+    public void onError(Throwable throwable) {
+        log.error("kucoin ws 发生错误: " + throwable.getMessage(), throwable);
+        handleConnectionClosedOrError();
+    }
+
+    private void handleConnectionClosedOrError() {
+        synchronized (lock) {
+            if (!reconnecting) {
+                reconnecting = true; // 设置 reconnecting 为 true 表示开始重连
+                executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连
+            }
+        }
+    }
+
+    private void attemptReconnect() {
+        boolean doReconnect = true;
+        try {
+            log.info("kucoin ws 开始重连");
+            connect(); // 假设 connect() 方法用于实际的连接逻辑
+            log.info("kucoin ws 重连成功");
+        } catch (Exception e) {
+            log.error("kucoin ws 重连失败", e);
+            // 连接失败时,可以根据具体情况决定是否继续重连
+            // 在这里假设总是继续尝试重连
+        } finally {
+            synchronized (lock) {
+                if (doReconnect) {
+                    scheduleReconnect(); // 如果需要继续重连,则重新调度重连任务
+                } else {
+                    reconnecting = false; // 重连结束后设置 reconnecting 为 false
+                }
+            }
+        }
+    }
+
+    private void scheduleReconnect() {
+        if (!executorService.isShutdown()) {
+            executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS);
+        }
+    }
+
+    private void sendPing() {
+        try {
+            if (session != null) {
+                session.getBasicRemote().sendPing(ByteBuffer.wrap("Ping".getBytes()));
+            }
+        } catch (Exception e) {
+            log.error("发送心跳失败", e);
+        }
+    }
+}
+
+
diff --git a/websocketClient/src/main/java/org/example/wsClient/MexcClient.java b/websocketClient/src/main/java/org/example/wsClient/MexcClient.java
index 6dceb87..80352a8 100644
--- a/websocketClient/src/main/java/org/example/wsClient/MexcClient.java
+++ b/websocketClient/src/main/java/org/example/wsClient/MexcClient.java
@@ -4,7 +4,10 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.reflect.TypeToken;
 import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+import lombok.extern.slf4j.Slf4j;
 import org.example.pojo.Currency;
 import org.example.util.RedisUtil;
 
@@ -13,20 +16,23 @@
 import java.io.UncheckedIOException;
 import java.net.URI;
 import java.nio.ByteBuffer;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.*;
 
 @ClientEndpoint
+@Slf4j
 public class MexcClient {
-    private static final String WS_ENDPOINT = "wss://wbs.mexc.com/ws"; // 实际WebSocket服务器地址
-    private static final long PING_INTERVAL = 20000; // 每20秒发送一次ping
-    private static final int MAX_BACKOFF_TIME = 60000; // 最大重连时间
+    private static final String WS_ENDPOINT = "wss://wbs.mexc.com/ws";
+    private static final long PING_INTERVAL = 20000;
 
     private final List<Currency> subscriptions;
     private final ScheduledExecutorService executorService;
     private Session session;
-    private boolean reconnecting = false;
+
+    private final Object lock = new Object(); // 添加一个锁对象
+    private volatile boolean reconnecting = false; // 使用 volatile 关键字保证可见性
 
     public MexcClient(List<Currency> subscriptions) {
         this.subscriptions = subscriptions;
@@ -37,7 +43,7 @@
         try {
             connect();
             if (session == null) {
-                System.err.println("无法在超时时间内连接到服务器。");
+                log.info("无法在超时时间内连接到服务器。");
                 return;
             }
 
@@ -54,8 +60,7 @@
             }
 
         } catch (Exception e) {
-            System.err.println("连接过程中发生异常: " + e.getMessage());
-            e.printStackTrace();
+            log.error("mexc ws 连接过程中发生异常: " + e.getMessage(), e);
         } finally {
             executorService.shutdown();
         }
@@ -68,59 +73,84 @@
 
     @OnOpen
     public void onOpen(Session session) {
-        System.out.println("已连接到服务器。");
+        log.info("mexc ws 已连接到服务器。");
         this.session = session;
         synchronized (this) {
             this.notify();
         }
     }
+    private static final Gson gson = new Gson(); // 将 Gson 作为静态成员或单例
 
     @OnMessage
     public void onMessage(String message) {
-        Gson gson = new Gson();
-        Map map = gson.fromJson(message, Map.class);
-        if (map != null && map.containsKey("s")) {
-            RedisUtil.set("mexc" + map.get("s").toString(), message);
+        try {
+            Map<String, Object> map = gson.fromJson(message, new TypeToken<Map<String, Object>>() {}.getType());
+            if (map != null && map.containsKey("s")) {
+                Object object = map.get("d");
+                Map<String, Object> resultMap = gson.fromJson(object.toString(), new TypeToken<Map<String, Object>>() {}.getType());
+                HashMap<String,Object> hashMap = new HashMap<>();
+                ObjectMapper mapper = new ObjectMapper();
+                hashMap.put("bids",resultMap.get("bids"));
+                hashMap.put("asks",resultMap.get("asks"));
+                String key = "mexc" + map.get("s").toString();
+                RedisUtil.set(key, mapper.writeValueAsString(hashMap));
+            } else {
+                log.warn("消息不包含 's' 字段或解析失败:" + message);
+            }
+        } catch (JsonSyntaxException e) {
+            log.error("JSON 解析异常:" + e.getMessage(), e);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
         }
-        // 没有过滤撤单的数据
     }
+
 
     @OnClose
     public void onClose() {
-        System.out.println("连接已关闭,尝试重新连接...");
-        session = null;
-        if (!reconnecting) {
-            reconnect();
-        }
+        log.info("mexc ws 连接已关闭,尝试重新连接...");
+        handleConnectionClosedOrError();
     }
 
     @OnError
     public void onError(Throwable throwable) {
-        System.err.println("发生错误: " + throwable.getMessage());
-        if (!reconnecting) {
-            reconnect();
-        }
+        log.error("mexc ws 发生错误: " + throwable.getMessage(), throwable);
+        handleConnectionClosedOrError();
     }
 
-    private void reconnect() {
-        if (reconnecting) {
-            return;
-        }
-        reconnecting = true;
-        executorService.schedule(() -> {
-            try {
-                connect();
-                reconnecting = false;
-            } catch (Exception e) {
-                e.printStackTrace();
-                reconnect();
+    private void handleConnectionClosedOrError() {
+        synchronized (lock) {
+            if (!reconnecting) {
+                reconnecting = true; // 设置 reconnecting 为 true 表示开始重连
+                executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连
             }
-        }, calculateBackoffTime(), TimeUnit.MILLISECONDS);
+        }
     }
 
-    private long calculateBackoffTime() {
-        // 实现退避策略,例如指数退避
-        return 5000; // 例子:5秒
+    private void attemptReconnect() {
+        boolean doReconnect = true;
+        try {
+            log.info("mexc ws 开始重连");
+            connect(); // 假设 connect() 方法用于实际的连接逻辑
+            log.info("mexc ws 重连成功");
+        } catch (Exception e) {
+            log.error("mexc ws 重连失败", e);
+            // 连接失败时,可以根据具体情况决定是否继续重连
+            // 在这里假设总是继续尝试重连
+        } finally {
+            synchronized (lock) {
+                if (doReconnect) {
+                    scheduleReconnect(); // 如果需要继续重连,则重新调度重连任务
+                } else {
+                    reconnecting = false; // 重连结束后设置 reconnecting 为 false
+                }
+            }
+        }
+    }
+
+    private void scheduleReconnect() {
+        if (!executorService.isShutdown()) {
+            executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS);
+        }
     }
 
     private void sendPing() {
@@ -129,7 +159,7 @@
                 session.getBasicRemote().sendPing(ByteBuffer.wrap("Ping".getBytes()));
             }
         } catch (Exception e) {
-            e.printStackTrace();
+            log.error("发送心跳失败", e);
         }
     }
 
diff --git a/websocketClient/src/main/resources/application.yml b/websocketClient/src/main/resources/application.yml
index 8b50501..9253a37 100644
--- a/websocketClient/src/main/resources/application.yml
+++ b/websocketClient/src/main/resources/application.yml
@@ -1,5 +1,5 @@
 server:
-  port: 8090
+  port: 8095
 
 
 spring:
@@ -10,7 +10,7 @@
     # 端口,默认为6379
     port: 6379
     # 数据库索引
-    database: 0
+    database: 1
     # 密码
     password:
     # 连接超时时间

--
Gitblit v1.9.3