package org.example.mexcclient.wsClient;
|
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.node.ArrayNode;
|
import com.fasterxml.jackson.databind.node.ObjectNode;
|
import com.google.common.reflect.TypeToken;
|
import com.google.gson.Gson;
|
import com.google.gson.JsonSyntaxException;
|
import lombok.extern.slf4j.Slf4j;
|
import org.example.mexcclient.MexcClientApplication;
|
import org.example.mexcclient.comm.ApplicationContextProvider;
|
import org.example.mexcclient.pojo.Currency;
|
import org.example.mexcclient.util.RedisUtil;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.boot.SpringApplication;
|
import org.springframework.context.ApplicationContext;
|
import org.springframework.context.ConfigurableApplicationContext;
|
import org.springframework.stereotype.Component;
|
|
import javax.websocket.*;
|
import java.io.IOException;
|
import java.io.UncheckedIOException;
|
import java.lang.reflect.Type;
|
import java.math.BigDecimal;
|
import java.net.URI;
|
import java.nio.ByteBuffer;
|
import java.util.HashMap;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.concurrent.*;
|
|
@ClientEndpoint
|
@Slf4j
|
@Component
|
public class MexcClient {
|
private static final String WS_ENDPOINT = "wss://wbs.mexc.com/ws";
|
private static final long PING_INTERVAL = 20000;
|
|
private final List<Currency> subscriptions;
|
private final ScheduledExecutorService executorService;
|
private Session session;
|
|
private final Object lock = new Object(); // 添加一个锁对象
|
private volatile boolean reconnecting = false; // 使用 volatile 关键字保证可见性
|
|
public MexcClient(List<Currency> subscriptions) {
|
this.subscriptions = subscriptions;
|
this.executorService = Executors.newScheduledThreadPool(1);
|
}
|
|
public void start() throws Exception {
|
try {
|
connect();
|
if (session == null) {
|
log.info("无法在超时时间内连接到服务器。");
|
return;
|
}
|
|
executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS);
|
|
// 订阅消息
|
for (Currency subscription : subscriptions) {
|
String parameter = getParameter(subscription.getSymbol());
|
session.getBasicRemote().sendText(parameter);
|
}
|
|
synchronized (this) {
|
this.wait();
|
}
|
|
} catch (Exception e) {
|
log.error("mexc ws 连接过程中发生异常:" + e.getMessage(), e); // 记录连接过程中发生的异常
|
throw e;
|
} finally {
|
executorService.shutdown();
|
}
|
}
|
|
private void connect() throws Exception {
|
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
|
container.connectToServer(this, new URI(WS_ENDPOINT));
|
}
|
|
@OnOpen
|
public void onOpen(Session session) {
|
log.info("mexc ws 已连接到服务器。");
|
this.session = session;
|
synchronized (this) {
|
this.notify();
|
}
|
}
|
private static final Gson gson = new Gson(); // 将 Gson 作为静态成员或单例
|
|
@OnMessage
|
public void onMessage(String message) {
|
try {
|
Map<String, Object> map = gson.fromJson(message, new TypeToken<Map<String, Object>>() {}.getType());
|
|
|
if (map != null && map.containsKey("s")) {
|
|
Object object = map.get("d");
|
Map<String, Object> resultMap = gson.fromJson(object.toString(), new TypeToken<Map<String, Object>>() {}.getType());
|
HashMap<String,Object> hashMap = new HashMap<>();
|
Object bidsObj = resultMap.get("asks");
|
Object asksObj = resultMap.get("bids");
|
|
Type listType = new TypeToken<List<Map<String,Object>>>(){}.getType();
|
List<Map<String,Object>> asksList = gson.fromJson(asksObj.toString(), listType);
|
List<Map<String,Object>> bidsList = gson.fromJson(bidsObj.toString(), listType);
|
|
|
if(!asksList.isEmpty() && !asksList.get(0).isEmpty() && !bidsList.isEmpty() && !bidsList.get(0).isEmpty()){
|
if (!asksList.isEmpty() && !asksList.get(0).isEmpty()) {
|
|
Map<String,Object> objectMap = asksList.get(0);
|
HashMap<String, Object> pvMap = new HashMap<>(); // 创建新的 HashMap 保存 bids 和 asks
|
pvMap.put("p", new BigDecimal(objectMap.get("p").toString()).toPlainString());
|
pvMap.put("v", new BigDecimal(objectMap.get("v").toString()).toPlainString());
|
hashMap.put("asks", pvMap);
|
}
|
|
if (!bidsList.isEmpty() && !bidsList.get(0).isEmpty()) {
|
Map<String,Object> objectMap = bidsList.get(0);
|
HashMap<String, Object> pvMap = new HashMap<>(); // 创建新的 HashMap 保存 bids 和 asks
|
pvMap.put("p", new BigDecimal(objectMap.get("p").toString()).toPlainString());
|
pvMap.put("v", new BigDecimal(objectMap.get("v").toString()).toPlainString());
|
hashMap.put("bids", pvMap);
|
}
|
|
ObjectMapper mapper = new ObjectMapper();
|
String key = "mexc" + map.get("s").toString();
|
try {
|
RedisUtil.set(key, mapper.writeValueAsString(hashMap));
|
} catch (JsonProcessingException e) {
|
e.printStackTrace();
|
}
|
}
|
}
|
} catch (JsonSyntaxException e) {
|
log.error("JSON 解析异常:" + e.getMessage(), e);
|
}
|
}
|
|
@OnClose
|
public void onClose() throws Exception {
|
log.info("mexc ws 连接已关闭,尝试重新连接...");
|
throw new Exception();
|
}
|
|
@OnError
|
public void onError(Throwable throwable) throws Exception {
|
log.error("mexc ws 发生错误: " + throwable.getMessage(), throwable);
|
throw new Exception();
|
}
|
|
// private void handleConnectionClosedOrError() {
|
// synchronized (lock) {
|
// if (!reconnecting) {
|
// reconnecting = true; // 设置 reconnecting 为 true 表示开始重连
|
// executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连
|
// }
|
// }
|
// }
|
//
|
// private void attemptReconnect() {
|
// boolean doReconnect = true;
|
// try {
|
// log.info("mexc ws 开始重连");
|
// connect(); // 假设 connect() 方法用于实际的连接逻辑
|
// log.info("mexc ws 重连成功");
|
// } catch (Exception e) {
|
// log.error("mexc ws 重连失败", e);
|
// // 连接失败时,可以根据具体情况决定是否继续重连
|
// // 在这里假设总是继续尝试重连
|
// } finally {
|
// synchronized (lock) {
|
// if (doReconnect) {
|
// scheduleReconnect(); // 如果需要继续重连,则重新调度重连任务
|
// } else {
|
// reconnecting = false; // 重连结束后设置 reconnecting 为 false
|
// }
|
// }
|
// }
|
// }
|
//
|
// private void scheduleReconnect() {
|
// if (!executorService.isShutdown()) {
|
// executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS);
|
// }
|
// }
|
|
private void sendPing() {
|
try {
|
if (session != null) {
|
session.getBasicRemote().sendPing(ByteBuffer.wrap("Ping".getBytes()));
|
}
|
} catch (Exception e) {
|
log.error("发送心跳失败", e);
|
}
|
}
|
|
public String getParameter(String symbol) throws JsonProcessingException {
|
ObjectMapper mapper = new ObjectMapper();
|
ObjectNode root = mapper.createObjectNode();
|
|
root.put("method", "SUBSCRIPTION");
|
ArrayNode paramsArray = mapper.createArrayNode();
|
String customParam = String.format("spot@public.limit.depth.v3.api@%s@5", symbol);
|
paramsArray.add(customParam);
|
root.set("params", paramsArray);
|
|
return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(root);
|
}
|
}
|