From 4b8e10e605d28fc1b4ad3d33a6cf2bfbbea15bd5 Mon Sep 17 00:00:00 2001
From: zj <1772600164@qq.com>
Date: Fri, 19 Jul 2024 18:34:01 +0800
Subject: [PATCH] 1
---
websocketClient/src/main/java/org/example/wsClient/MexcClient.java | 108 ++++++++++++++++++++++++++++++++++-------------------
1 files changed, 69 insertions(+), 39 deletions(-)
diff --git a/websocketClient/src/main/java/org/example/wsClient/MexcClient.java b/websocketClient/src/main/java/org/example/wsClient/MexcClient.java
index 6dceb87..80352a8 100644
--- a/websocketClient/src/main/java/org/example/wsClient/MexcClient.java
+++ b/websocketClient/src/main/java/org/example/wsClient/MexcClient.java
@@ -4,7 +4,10 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+import lombok.extern.slf4j.Slf4j;
import org.example.pojo.Currency;
import org.example.util.RedisUtil;
@@ -13,20 +16,23 @@
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.ByteBuffer;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
@ClientEndpoint
+@Slf4j
public class MexcClient {
- private static final String WS_ENDPOINT = "wss://wbs.mexc.com/ws"; // 实际WebSocket服务器地址
- private static final long PING_INTERVAL = 20000; // 每20秒发送一次ping
- private static final int MAX_BACKOFF_TIME = 60000; // 最大重连时间
+ private static final String WS_ENDPOINT = "wss://wbs.mexc.com/ws";
+ private static final long PING_INTERVAL = 20000;
private final List<Currency> subscriptions;
private final ScheduledExecutorService executorService;
private Session session;
- private boolean reconnecting = false;
+
+ private final Object lock = new Object(); // 添加一个锁对象
+ private volatile boolean reconnecting = false; // 使用 volatile 关键字保证可见性
public MexcClient(List<Currency> subscriptions) {
this.subscriptions = subscriptions;
@@ -37,7 +43,7 @@
try {
connect();
if (session == null) {
- System.err.println("无法在超时时间内连接到服务器。");
+ log.info("无法在超时时间内连接到服务器。");
return;
}
@@ -54,8 +60,7 @@
}
} catch (Exception e) {
- System.err.println("连接过程中发生异常: " + e.getMessage());
- e.printStackTrace();
+ log.error("mexc ws 连接过程中发生异常: " + e.getMessage(), e);
} finally {
executorService.shutdown();
}
@@ -68,59 +73,84 @@
@OnOpen
public void onOpen(Session session) {
- System.out.println("已连接到服务器。");
+ log.info("mexc ws 已连接到服务器。");
this.session = session;
synchronized (this) {
this.notify();
}
}
+ private static final Gson gson = new Gson(); // 将 Gson 作为静态成员或单例
@OnMessage
public void onMessage(String message) {
- Gson gson = new Gson();
- Map map = gson.fromJson(message, Map.class);
- if (map != null && map.containsKey("s")) {
- RedisUtil.set("mexc" + map.get("s").toString(), message);
+ try {
+ Map<String, Object> map = gson.fromJson(message, new TypeToken<Map<String, Object>>() {}.getType());
+ if (map != null && map.containsKey("s")) {
+ Object object = map.get("d");
+ Map<String, Object> resultMap = gson.fromJson(object.toString(), new TypeToken<Map<String, Object>>() {}.getType());
+ HashMap<String,Object> hashMap = new HashMap<>();
+ ObjectMapper mapper = new ObjectMapper();
+ hashMap.put("bids",resultMap.get("bids"));
+ hashMap.put("asks",resultMap.get("asks"));
+ String key = "mexc" + map.get("s").toString();
+ RedisUtil.set(key, mapper.writeValueAsString(hashMap));
+ } else {
+ log.warn("消息不包含 's' 字段或解析失败:" + message);
+ }
+ } catch (JsonSyntaxException e) {
+ log.error("JSON 解析异常:" + e.getMessage(), e);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
}
- // 没有过滤撤单的数据
}
+
@OnClose
public void onClose() {
- System.out.println("连接已关闭,尝试重新连接...");
- session = null;
- if (!reconnecting) {
- reconnect();
- }
+ log.info("mexc ws 连接已关闭,尝试重新连接...");
+ handleConnectionClosedOrError();
}
@OnError
public void onError(Throwable throwable) {
- System.err.println("发生错误: " + throwable.getMessage());
- if (!reconnecting) {
- reconnect();
- }
+ log.error("mexc ws 发生错误: " + throwable.getMessage(), throwable);
+ handleConnectionClosedOrError();
}
- private void reconnect() {
- if (reconnecting) {
- return;
- }
- reconnecting = true;
- executorService.schedule(() -> {
- try {
- connect();
- reconnecting = false;
- } catch (Exception e) {
- e.printStackTrace();
- reconnect();
+ private void handleConnectionClosedOrError() {
+ synchronized (lock) {
+ if (!reconnecting) {
+ reconnecting = true; // 设置 reconnecting 为 true 表示开始重连
+ executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连
}
- }, calculateBackoffTime(), TimeUnit.MILLISECONDS);
+ }
}
- private long calculateBackoffTime() {
- // 实现退避策略,例如指数退避
- return 5000; // 例子:5秒
+ private void attemptReconnect() {
+ boolean doReconnect = true;
+ try {
+ log.info("mexc ws 开始重连");
+ connect(); // 假设 connect() 方法用于实际的连接逻辑
+ log.info("mexc ws 重连成功");
+ } catch (Exception e) {
+ log.error("mexc ws 重连失败", e);
+ // 连接失败时,可以根据具体情况决定是否继续重连
+ // 在这里假设总是继续尝试重连
+ } finally {
+ synchronized (lock) {
+ if (doReconnect) {
+ scheduleReconnect(); // 如果需要继续重连,则重新调度重连任务
+ } else {
+ reconnecting = false; // 重连结束后设置 reconnecting 为 false
+ }
+ }
+ }
+ }
+
+ private void scheduleReconnect() {
+ if (!executorService.isShutdown()) {
+ executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS);
+ }
}
private void sendPing() {
@@ -129,7 +159,7 @@
session.getBasicRemote().sendPing(ByteBuffer.wrap("Ping".getBytes()));
}
} catch (Exception e) {
- e.printStackTrace();
+ log.error("发送心跳失败", e);
}
}
--
Gitblit v1.9.3