package org.example.wsClient; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.gson.Gson; import org.example.pojo.Currency; import org.example.util.RedisUtil; import javax.websocket.*; import java.io.IOException; import java.io.UncheckedIOException; import java.net.URI; import java.nio.ByteBuffer; import java.util.List; import java.util.Map; import java.util.concurrent.*; @ClientEndpoint public class MexcClient { private static final String WS_ENDPOINT = "wss://wbs.mexc.com/ws"; // 实际WebSocket服务器地址 private static final long PING_INTERVAL = 20000; // 每20秒发送一次ping private static final int MAX_BACKOFF_TIME = 60000; // 最大重连时间 private final List subscriptions; private final ScheduledExecutorService executorService; private Session session; private boolean reconnecting = false; public MexcClient(List subscriptions) { this.subscriptions = subscriptions; this.executorService = Executors.newScheduledThreadPool(1); } public void start() { try { connect(); if (session == null) { System.err.println("无法在超时时间内连接到服务器。"); return; } executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS); // 订阅消息 for (Currency subscription : subscriptions) { String parameter = getParameter(subscription.getSymbol()); session.getBasicRemote().sendText(parameter); } synchronized (this) { this.wait(); } } catch (Exception e) { System.err.println("连接过程中发生异常: " + e.getMessage()); e.printStackTrace(); } finally { executorService.shutdown(); } } private void connect() throws Exception { WebSocketContainer container = ContainerProvider.getWebSocketContainer(); container.connectToServer(this, new URI(WS_ENDPOINT)); } @OnOpen public void onOpen(Session session) { System.out.println("已连接到服务器。"); this.session = session; synchronized (this) { this.notify(); } } @OnMessage public void onMessage(String message) { Gson gson = new Gson(); Map map = gson.fromJson(message, Map.class); if (map != null && map.containsKey("s")) { RedisUtil.set("mexc" + map.get("s").toString(), message); } // 没有过滤撤单的数据 } @OnClose public void onClose() { System.out.println("连接已关闭,尝试重新连接..."); session = null; if (!reconnecting) { reconnect(); } } @OnError public void onError(Throwable throwable) { System.err.println("发生错误: " + throwable.getMessage()); if (!reconnecting) { reconnect(); } } private void reconnect() { if (reconnecting) { return; } reconnecting = true; executorService.schedule(() -> { try { connect(); reconnecting = false; } catch (Exception e) { e.printStackTrace(); reconnect(); } }, calculateBackoffTime(), TimeUnit.MILLISECONDS); } private long calculateBackoffTime() { // 实现退避策略,例如指数退避 return 5000; // 例子:5秒 } private void sendPing() { try { if (session != null) { session.getBasicRemote().sendPing(ByteBuffer.wrap("Ping".getBytes())); } } catch (Exception e) { e.printStackTrace(); } } public String getParameter(String symbol) throws JsonProcessingException { ObjectMapper mapper = new ObjectMapper(); ObjectNode root = mapper.createObjectNode(); root.put("method", "SUBSCRIPTION"); ArrayNode paramsArray = mapper.createArrayNode(); String customParam = String.format("spot@public.limit.depth.v3.api@%s@20", symbol); paramsArray.add(customParam); root.set("params", paramsArray); return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(root); } }