package org.example.mexcclient.wsClient;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import lombok.extern.slf4j.Slf4j;
import org.example.mexcclient.pojo.Currency;
import org.example.mexcclient.util.RedisUtil;

import javax.websocket.*;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.reflect.Type;
import java.math.BigDecimal;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

/**
 * WebSocket client that subscribes to depth streams on the MEXC endpoint and stores the
 * first ask level and first bid level of every update in Redis under the key
 * {@code "mexc" + symbol}.
 *
 * <p>{@link #start()} blocks the calling thread until {@link #stop()} is called or the
 * thread is interrupted. While running, a single-threaded scheduler sends periodic pings
 * and performs reconnect attempts; after a successful reconnect all subscriptions are
 * sent again on the new session.
 */
@ClientEndpoint
@Slf4j
public class MexcClient {

    private static final String WS_ENDPOINT = "wss://wbs.mexc.com/ws";
    private static final long PING_INTERVAL = 20000;
    private static final long RECONNECT_DELAY_SECONDS = 5;

    // Shared parser/serializer instances: creating them per message is needlessly expensive.
    private static final Gson gson = new Gson();
    private static final ObjectMapper MAPPER = new ObjectMapper();
    private static final Type MESSAGE_TYPE = new TypeToken<Map<String, Object>>() {}.getType();

    private final List<Currency> subscriptions;
    private final ScheduledExecutorService executorService;

    // Written by container callback threads, read by the scheduler and the start() thread.
    private volatile Session session;

    // Guards the reconnecting flag so that only one reconnect chain is active at a time.
    private final Object lock = new Object();
    private volatile boolean reconnecting = false;

    // Released only by stop(); keeps start() parked across reconnects.
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);

    /**
     * Creates a client for the given subscriptions.
     *
     * @param subscriptions currencies whose symbols are subscribed to on every (re)connect
     */
    public MexcClient(List<Currency> subscriptions) {
        this.subscriptions = subscriptions;
        this.executorService = Executors.newScheduledThreadPool(1);
    }

    /**
     * Connects, schedules the heartbeat, sends all subscriptions and then blocks until
     * {@link #stop()} is called or the calling thread is interrupted. On exit the scheduler
     * is shut down and the current session, if still open, is closed.
     */
    public void start() {
        try {
            connect();
            if (session == null) {
                log.info("无法在超时时间内连接到服务器。");
                return;
            }
            executorService.scheduleAtFixedRate(
                    this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS);
            subscribeAll();
            // A latch (instead of wait/notify on this) is immune to spurious wakeups and is
            // not released by onOpen, so a reconnect no longer terminates the client.
            shutdownLatch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.warn("mexc ws 线程被中断,停止客户端");
        } catch (Exception e) {
            log.error("mexc ws 连接过程中发生异常: {}", e.getMessage(), e);
        } finally {
            executorService.shutdown();
            closeSession();
        }
    }

    /**
     * Releases the thread blocked in {@link #start()}, which then shuts the client down.
     * Calling this more than once has no further effect.
     */
    public void stop() {
        shutdownLatch.countDown();
    }

    /** Opens a new connection to the endpoint and records the resulting session. */
    private void connect() throws DeploymentException, IOException {
        WebSocketContainer container = ContainerProvider.getWebSocketContainer();
        this.session = container.connectToServer(this, URI.create(WS_ENDPOINT));
    }

    /** Sends one subscription request per configured currency on the current session. */
    private void subscribeAll() throws IOException {
        Session current = session;
        if (current == null) {
            throw new IllegalStateException("no active session to subscribe on");
        }
        for (Currency subscription : subscriptions) {
            current.getBasicRemote().sendText(getParameter(subscription.getSymbol()));
        }
    }

    /** Closes the current session if it is still open; failures are only logged. */
    private void closeSession() {
        Session current = session;
        if (current != null && current.isOpen()) {
            try {
                current.close();
            } catch (IOException e) {
                log.warn("mexc ws 关闭会话失败", e);
            }
        }
    }

    /**
     * Container callback invoked when a connection has been established.
     *
     * @param session the newly opened session
     */
    @OnOpen
    public void onOpen(Session session) {
        log.info("mexc ws 已连接到服务器。");
        this.session = session;
    }

    /**
     * Container callback for text frames. Messages that carry both an {@code s} entry and a
     * {@code d} object with non-empty {@code asks} and {@code bids} lists are reduced to the
     * first level of each side and written to Redis; all other messages are ignored.
     *
     * @param message raw JSON text received from the server
     */
    @OnMessage
    public void onMessage(String message) {
        try {
            Map<String, Object> envelope = gson.fromJson(message, MESSAGE_TYPE);
            if (envelope == null || envelope.get("s") == null) {
                return;
            }
            Object payload = envelope.get("d");
            if (!(payload instanceof Map)) {
                return;
            }
            // Navigate the parsed tree directly; re-parsing Map.toString() output only worked
            // through lenient parsing and broke on missing fields.
            Map<?, ?> depth = (Map<?, ?>) payload;
            Map<String, String> firstAsk = firstLevel(depth.get("asks"));
            Map<String, String> firstBid = firstLevel(depth.get("bids"));
            if (firstAsk == null || firstBid == null) {
                return;
            }

            Map<String, Map<String, String>> snapshot = new HashMap<>();
            snapshot.put("asks", firstAsk);
            snapshot.put("bids", firstBid);

            String key = "mexc" + envelope.get("s");
            RedisUtil.set(key, MAPPER.writeValueAsString(snapshot));
        } catch (JsonSyntaxException e) {
            log.error("JSON 解析异常:{}", e.getMessage(), e);
        } catch (JsonProcessingException e) {
            log.error("mexc ws 序列化深度数据失败", e);
        } catch (NumberFormatException e) {
            log.error("mexc ws 深度数据中的数值格式无效", e);
        }
    }

    /**
     * Extracts the {@code p} and {@code v} entries of the first element of a parsed level
     * list, rendered as plain (non-scientific) decimal strings.
     *
     * @param levels the parsed {@code asks} or {@code bids} value; may be anything
     * @return a map with keys {@code p} and {@code v}, or {@code null} when {@code levels} is
     *     not a non-empty list whose first element is a map containing both entries
     * @throws NumberFormatException if either entry is not a valid decimal number
     */
    private static Map<String, String> firstLevel(Object levels) {
        if (!(levels instanceof List) || ((List<?>) levels).isEmpty()) {
            return null;
        }
        Object first = ((List<?>) levels).get(0);
        if (!(first instanceof Map)) {
            return null;
        }
        Map<?, ?> level = (Map<?, ?>) first;
        Object price = level.get("p");
        Object volume = level.get("v");
        if (price == null || volume == null) {
            return null;
        }
        Map<String, String> result = new HashMap<>();
        result.put("p", new BigDecimal(price.toString()).toPlainString());
        result.put("v", new BigDecimal(volume.toString()).toPlainString());
        return result;
    }

    /** Container callback invoked when the connection is closed; starts reconnecting. */
    @OnClose
    public void onClose() {
        log.info("mexc ws 连接已关闭,尝试重新连接...");
        handleConnectionClosedOrError();
    }

    /**
     * Container callback invoked on errors. A reconnect is started only when the current
     * session is no longer open, so an error on a live connection does not open a second one.
     *
     * @param throwable the reported error
     */
    @OnError
    public void onError(Throwable throwable) {
        log.error("mexc ws 发生错误: {}", throwable.getMessage(), throwable);
        Session current = session;
        if (current != null && current.isOpen()) {
            return;
        }
        handleConnectionClosedOrError();
    }

    /** Starts a reconnect chain unless one is already running or the client is shut down. */
    private void handleConnectionClosedOrError() {
        synchronized (lock) {
            if (reconnecting || executorService.isShutdown()) {
                return;
            }
            try {
                executorService.execute(this::attemptReconnect);
                reconnecting = true;
            } catch (RejectedExecutionException e) {
                // The executor was shut down between the check and the submit.
                log.warn("mexc ws 客户端已停止,放弃重连");
            }
        }
    }

    /**
     * Performs one reconnect attempt including resubscription. On failure another attempt is
     * scheduled; on success (or when no further attempt can be scheduled) the reconnecting
     * flag is cleared so a later disconnect can start a new chain.
     */
    private void attemptReconnect() {
        boolean connected = false;
        try {
            log.info("mexc ws 开始重连");
            connect();
            subscribeAll();
            connected = true;
            log.info("mexc ws 重连成功");
        } catch (Exception e) {
            log.error("mexc ws 重连失败", e);
        } finally {
            synchronized (lock) {
                if (connected || !scheduleReconnect()) {
                    reconnecting = false;
                }
            }
        }
    }

    /**
     * Schedules the next reconnect attempt.
     *
     * @return {@code true} if an attempt was scheduled, {@code false} if the executor is shut
     *     down
     */
    private boolean scheduleReconnect() {
        if (executorService.isShutdown()) {
            return false;
        }
        try {
            executorService.schedule(
                    this::attemptReconnect, RECONNECT_DELAY_SECONDS, TimeUnit.SECONDS);
            return true;
        } catch (RejectedExecutionException e) {
            return false;
        }
    }

    /** Sends a ping frame on the current session if it is open; failures are only logged. */
    private void sendPing() {
        try {
            Session current = session;
            if (current != null && current.isOpen()) {
                current.getBasicRemote()
                        .sendPing(ByteBuffer.wrap("Ping".getBytes(StandardCharsets.UTF_8)));
            }
        } catch (Exception e) {
            log.error("发送心跳失败", e);
        }
    }

    /**
     * Builds the pretty-printed JSON subscription request for one symbol.
     *
     * @param symbol the symbol inserted into the channel name
     * @return JSON text with {@code method} set to {@code SUBSCRIPTION} and a single-element
     *     {@code params} array
     * @throws JsonProcessingException if the request cannot be serialized
     */
    public String getParameter(String symbol) throws JsonProcessingException {
        ObjectNode root = MAPPER.createObjectNode();
        root.put("method", "SUBSCRIPTION");
        ArrayNode paramsArray = MAPPER.createArrayNode();
        String customParam = String.format("spot@public.limit.depth.v3.api@%s@5", symbol);
        paramsArray.add(customParam);
        root.set("params", paramsArray);
        return MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(root);
    }
}