From 5c1e682edc2bc6c89cf0f34f93a438d1da274e64 Mon Sep 17 00:00:00 2001
From: zj <1772600164@qq.com>
Date: Sun, 21 Jul 2024 01:18:56 +0800
Subject: [PATCH] 1

---
 websocketClient/src/main/java/org/example/wsClient/MexcClient.java |  108 ++++++++++++++++++++++++++++++++++-------------------
 1 files changed, 69 insertions(+), 39 deletions(-)

diff --git a/websocketClient/src/main/java/org/example/wsClient/MexcClient.java b/websocketClient/src/main/java/org/example/wsClient/MexcClient.java
index 6dceb87..80352a8 100644
--- a/websocketClient/src/main/java/org/example/wsClient/MexcClient.java
+++ b/websocketClient/src/main/java/org/example/wsClient/MexcClient.java
@@ -4,7 +4,10 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.reflect.TypeToken;
 import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+import lombok.extern.slf4j.Slf4j;
 import org.example.pojo.Currency;
 import org.example.util.RedisUtil;
 
@@ -13,20 +16,23 @@
 import java.io.UncheckedIOException;
 import java.net.URI;
 import java.nio.ByteBuffer;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.*;
 
 @ClientEndpoint
+@Slf4j
 public class MexcClient {
-    private static final String WS_ENDPOINT = "wss://wbs.mexc.com/ws"; // 实际WebSocket服务器地址
-    private static final long PING_INTERVAL = 20000; // 每20秒发送一次ping
-    private static final int MAX_BACKOFF_TIME = 60000; // 最大重连时间
+    private static final String WS_ENDPOINT = "wss://wbs.mexc.com/ws";
+    private static final long PING_INTERVAL = 20000;
 
     private final List<Currency> subscriptions;
     private final ScheduledExecutorService executorService;
     private Session session;
-    private boolean reconnecting = false;
+
+    private final Object lock = new Object(); // 添加一个锁对象
+    private volatile boolean reconnecting = false; // 使用 volatile 关键字保证可见性
 
     public MexcClient(List<Currency> subscriptions) {
         this.subscriptions = subscriptions;
@@ -37,7 +43,7 @@
         try {
             connect();
             if (session == null) {
-                System.err.println("无法在超时时间内连接到服务器。");
+                log.info("无法在超时时间内连接到服务器。");
                 return;
             }
 
@@ -54,8 +60,7 @@
             }
 
         } catch (Exception e) {
-            System.err.println("连接过程中发生异常: " + e.getMessage());
-            e.printStackTrace();
+            log.error("mexc ws 连接过程中发生异常: " + e.getMessage(), e);
         } finally {
             executorService.shutdown();
         }
@@ -68,59 +73,84 @@
 
     @OnOpen
     public void onOpen(Session session) {
-        System.out.println("已连接到服务器。");
+        log.info("mexc ws 已连接到服务器。");
         this.session = session;
         synchronized (this) {
             this.notify();
         }
     }
+    private static final Gson gson = new Gson(); // 将 Gson 作为静态成员或单例
 
     @OnMessage
     public void onMessage(String message) {
-        Gson gson = new Gson();
-        Map map = gson.fromJson(message, Map.class);
-        if (map != null && map.containsKey("s")) {
-            RedisUtil.set("mexc" + map.get("s").toString(), message);
+        try {
+            Map<String, Object> map = gson.fromJson(message, new TypeToken<Map<String, Object>>() {}.getType());
+            if (map != null && map.containsKey("s")) {
+                Object object = map.get("d");
+                Map<String, Object> resultMap = gson.fromJson(object.toString(), new TypeToken<Map<String, Object>>() {}.getType());
+                HashMap<String,Object> hashMap = new HashMap<>();
+                ObjectMapper mapper = new ObjectMapper();
+                hashMap.put("bids",resultMap.get("bids"));
+                hashMap.put("asks",resultMap.get("asks"));
+                String key = "mexc" + map.get("s").toString();
+                RedisUtil.set(key, mapper.writeValueAsString(hashMap));
+            } else {
+                log.warn("消息不包含 's' 字段或解析失败:" + message);
+            }
+        } catch (JsonSyntaxException e) {
+            log.error("JSON 解析异常:" + e.getMessage(), e);
+        } catch (JsonProcessingException e) {
+            throw new RuntimeException(e);
         }
-        // 没有过滤撤单的数据
     }
+
 
     @OnClose
     public void onClose() {
-        System.out.println("连接已关闭,尝试重新连接...");
-        session = null;
-        if (!reconnecting) {
-            reconnect();
-        }
+        log.info("mexc ws 连接已关闭,尝试重新连接...");
+        handleConnectionClosedOrError();
     }
 
     @OnError
     public void onError(Throwable throwable) {
-        System.err.println("发生错误: " + throwable.getMessage());
-        if (!reconnecting) {
-            reconnect();
-        }
+        log.error("mexc ws 发生错误: " + throwable.getMessage(), throwable);
+        handleConnectionClosedOrError();
     }
 
-    private void reconnect() {
-        if (reconnecting) {
-            return;
-        }
-        reconnecting = true;
-        executorService.schedule(() -> {
-            try {
-                connect();
-                reconnecting = false;
-            } catch (Exception e) {
-                e.printStackTrace();
-                reconnect();
+    private void handleConnectionClosedOrError() {
+        synchronized (lock) {
+            if (!reconnecting) {
+                reconnecting = true; // 设置 reconnecting 为 true 表示开始重连
+                executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连
             }
-        }, calculateBackoffTime(), TimeUnit.MILLISECONDS);
+        }
     }
 
-    private long calculateBackoffTime() {
-        // 实现退避策略,例如指数退避
-        return 5000; // 例子:5秒
+    private void attemptReconnect() {
+        boolean doReconnect = true;
+        try {
+            log.info("mexc ws 开始重连");
+            connect(); // 假设 connect() 方法用于实际的连接逻辑
+            log.info("mexc ws 重连成功");
+        } catch (Exception e) {
+            log.error("mexc ws 重连失败", e);
+            // 连接失败时,可以根据具体情况决定是否继续重连
+            // 在这里假设总是继续尝试重连
+        } finally {
+            synchronized (lock) {
+                if (doReconnect) {
+                    scheduleReconnect(); // 如果需要继续重连,则重新调度重连任务
+                } else {
+                    reconnecting = false; // 重连结束后设置 reconnecting 为 false
+                }
+            }
+        }
+    }
+
+    private void scheduleReconnect() {
+        if (!executorService.isShutdown()) {
+            executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS);
+        }
     }
 
     private void sendPing() {
@@ -129,7 +159,7 @@
                 session.getBasicRemote().sendPing(ByteBuffer.wrap("Ping".getBytes()));
             }
         } catch (Exception e) {
-            e.printStackTrace();
+            log.error("发送心跳失败", e);
         }
     }
 

--
Gitblit v1.9.3