package org.example.mexcclient.wsClient; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.reflect.TypeToken; import com.google.gson.Gson; import com.google.gson.JsonSyntaxException; import lombok.extern.slf4j.Slf4j; import org.example.mexcclient.MexcClientApplication; import org.example.mexcclient.comm.ApplicationContextProvider; import org.example.mexcclient.pojo.Currency; import org.example.mexcclient.util.RedisUtil; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.context.ApplicationContext; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.stereotype.Component; import javax.websocket.*; import java.io.IOException; import java.io.UncheckedIOException; import java.lang.reflect.Type; import java.math.BigDecimal; import java.net.URI; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.*; @ClientEndpoint @Slf4j @Component public class MexcClient { private static final String WS_ENDPOINT = "wss://wbs.mexc.com/ws"; private static final long PING_INTERVAL = 20000; private final List subscriptions; private final ScheduledExecutorService executorService; private Session session; private final Object lock = new Object(); // 添加一个锁对象 private volatile boolean reconnecting = false; // 使用 volatile 关键字保证可见性 public MexcClient(List subscriptions) { this.subscriptions = subscriptions; this.executorService = Executors.newScheduledThreadPool(1); } public void start() throws Exception { try { connect(); if (session == null) { log.info("无法在超时时间内连接到服务器。"); return; } executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS); // 订阅消息 for (Currency subscription : subscriptions) { String parameter = getParameter(subscription.getSymbol()); session.getBasicRemote().sendText(parameter); } synchronized (this) { this.wait(); } } catch (Exception e) { log.error("mexc ws 连接过程中发生异常:" + e.getMessage(), e); // 记录连接过程中发生的异常 throw e; } finally { executorService.shutdown(); } } private void connect() throws Exception { WebSocketContainer container = ContainerProvider.getWebSocketContainer(); container.connectToServer(this, new URI(WS_ENDPOINT)); } @OnOpen public void onOpen(Session session) { log.info("mexc ws 已连接到服务器。"); this.session = session; synchronized (this) { this.notify(); } } private static final Gson gson = new Gson(); // 将 Gson 作为静态成员或单例 @OnMessage public void onMessage(String message) { try { Map map = gson.fromJson(message, new TypeToken>() {}.getType()); if (map != null && map.containsKey("s")) { Object object = map.get("d"); Map resultMap = gson.fromJson(object.toString(), new TypeToken>() {}.getType()); HashMap hashMap = new HashMap<>(); Object bidsObj = resultMap.get("asks"); Object asksObj = resultMap.get("bids"); Type listType = new TypeToken>>(){}.getType(); List> asksList = gson.fromJson(asksObj.toString(), listType); List> bidsList = gson.fromJson(bidsObj.toString(), listType); if(!asksList.isEmpty() && !asksList.get(0).isEmpty() && !bidsList.isEmpty() && !bidsList.get(0).isEmpty()){ if (!asksList.isEmpty() && !asksList.get(0).isEmpty()) { Map objectMap = asksList.get(0); HashMap pvMap = new HashMap<>(); // 创建新的 HashMap 保存 bids 和 asks pvMap.put("p", new BigDecimal(objectMap.get("p").toString()).toPlainString()); pvMap.put("v", new BigDecimal(objectMap.get("v").toString()).toPlainString()); hashMap.put("asks", pvMap); } if (!bidsList.isEmpty() && !bidsList.get(0).isEmpty()) { Map objectMap = bidsList.get(0); HashMap pvMap = new HashMap<>(); // 创建新的 HashMap 保存 bids 和 asks pvMap.put("p", new BigDecimal(objectMap.get("p").toString()).toPlainString()); pvMap.put("v", new BigDecimal(objectMap.get("v").toString()).toPlainString()); hashMap.put("bids", pvMap); } ObjectMapper mapper = new ObjectMapper(); String key = "mexc" + map.get("s").toString(); try { RedisUtil.set(key, mapper.writeValueAsString(hashMap)); } catch (JsonProcessingException e) { e.printStackTrace(); } } } } catch (JsonSyntaxException e) { log.error("JSON 解析异常:" + e.getMessage(), e); } } @OnClose public void onClose() throws Exception { log.info("mexc ws 连接已关闭,尝试重新连接..."); throw new Exception(); } @OnError public void onError(Throwable throwable) throws Exception { log.error("mexc ws 发生错误: " + throwable.getMessage(), throwable); throw new Exception(); } // private void handleConnectionClosedOrError() { // synchronized (lock) { // if (!reconnecting) { // reconnecting = true; // 设置 reconnecting 为 true 表示开始重连 // executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连 // } // } // } // // private void attemptReconnect() { // boolean doReconnect = true; // try { // log.info("mexc ws 开始重连"); // connect(); // 假设 connect() 方法用于实际的连接逻辑 // log.info("mexc ws 重连成功"); // } catch (Exception e) { // log.error("mexc ws 重连失败", e); // // 连接失败时,可以根据具体情况决定是否继续重连 // // 在这里假设总是继续尝试重连 // } finally { // synchronized (lock) { // if (doReconnect) { // scheduleReconnect(); // 如果需要继续重连,则重新调度重连任务 // } else { // reconnecting = false; // 重连结束后设置 reconnecting 为 false // } // } // } // } // // private void scheduleReconnect() { // if (!executorService.isShutdown()) { // executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS); // } // } private void sendPing() { try { if (session != null) { session.getBasicRemote().sendPing(ByteBuffer.wrap("Ping".getBytes())); } } catch (Exception e) { log.error("发送心跳失败", e); } } public String getParameter(String symbol) throws JsonProcessingException { ObjectMapper mapper = new ObjectMapper(); ObjectNode root = mapper.createObjectNode(); root.put("method", "SUBSCRIPTION"); ArrayNode paramsArray = mapper.createArrayNode(); String customParam = String.format("spot@public.limit.depth.v3.api@%s@5", symbol); paramsArray.add(customParam); root.set("params", paramsArray); return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(root); } }