package org.example.wsClient;
|
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.node.ArrayNode;
|
import com.fasterxml.jackson.databind.node.ObjectNode;
|
import com.google.gson.Gson;
|
import org.example.pojo.Currency;
|
import org.example.util.RedisUtil;
|
|
import javax.websocket.*;
|
import java.io.IOException;
|
import java.io.UncheckedIOException;
|
import java.net.URI;
|
import java.nio.ByteBuffer;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.concurrent.*;
|
|
@ClientEndpoint
|
public class MexcClient {
|
private static final String WS_ENDPOINT = "wss://wbs.mexc.com/ws"; // 实际WebSocket服务器地址
|
private static final long PING_INTERVAL = 20000; // 每20秒发送一次ping
|
private static final int MAX_BACKOFF_TIME = 60000; // 最大重连时间
|
|
private final List<Currency> subscriptions;
|
private final ScheduledExecutorService executorService;
|
private Session session;
|
private boolean reconnecting = false;
|
|
public MexcClient(List<Currency> subscriptions) {
|
this.subscriptions = subscriptions;
|
this.executorService = Executors.newScheduledThreadPool(1);
|
}
|
|
public void start() {
|
try {
|
connect();
|
if (session == null) {
|
System.err.println("无法在超时时间内连接到服务器。");
|
return;
|
}
|
|
executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS);
|
|
// 订阅消息
|
for (Currency subscription : subscriptions) {
|
String parameter = getParameter(subscription.getSymbol());
|
session.getBasicRemote().sendText(parameter);
|
}
|
|
synchronized (this) {
|
this.wait();
|
}
|
|
} catch (Exception e) {
|
System.err.println("连接过程中发生异常: " + e.getMessage());
|
e.printStackTrace();
|
} finally {
|
executorService.shutdown();
|
}
|
}
|
|
private void connect() throws Exception {
|
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
|
container.connectToServer(this, new URI(WS_ENDPOINT));
|
}
|
|
@OnOpen
|
public void onOpen(Session session) {
|
System.out.println("已连接到服务器。");
|
this.session = session;
|
synchronized (this) {
|
this.notify();
|
}
|
}
|
|
@OnMessage
|
public void onMessage(String message) {
|
Gson gson = new Gson();
|
Map map = gson.fromJson(message, Map.class);
|
if (map != null && map.containsKey("s")) {
|
RedisUtil.set("mexc" + map.get("s").toString(), message);
|
}
|
// 没有过滤撤单的数据
|
}
|
|
@OnClose
|
public void onClose() {
|
System.out.println("连接已关闭,尝试重新连接...");
|
session = null;
|
if (!reconnecting) {
|
reconnect();
|
}
|
}
|
|
@OnError
|
public void onError(Throwable throwable) {
|
System.err.println("发生错误: " + throwable.getMessage());
|
if (!reconnecting) {
|
reconnect();
|
}
|
}
|
|
private void reconnect() {
|
if (reconnecting) {
|
return;
|
}
|
reconnecting = true;
|
executorService.schedule(() -> {
|
try {
|
connect();
|
reconnecting = false;
|
} catch (Exception e) {
|
e.printStackTrace();
|
reconnect();
|
}
|
}, calculateBackoffTime(), TimeUnit.MILLISECONDS);
|
}
|
|
private long calculateBackoffTime() {
|
// 实现退避策略,例如指数退避
|
return 5000; // 例子:5秒
|
}
|
|
private void sendPing() {
|
try {
|
if (session != null) {
|
session.getBasicRemote().sendPing(ByteBuffer.wrap("Ping".getBytes()));
|
}
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
}
|
|
public String getParameter(String symbol) throws JsonProcessingException {
|
ObjectMapper mapper = new ObjectMapper();
|
ObjectNode root = mapper.createObjectNode();
|
|
root.put("method", "SUBSCRIPTION");
|
ArrayNode paramsArray = mapper.createArrayNode();
|
String customParam = String.format("spot@public.limit.depth.v3.api@%s@20", symbol);
|
paramsArray.add(customParam);
|
root.set("params", paramsArray);
|
|
return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(root);
|
}
|
}
|