1
zj
2024-07-19 4b8e10e605d28fc1b4ad3d33a6cf2bfbbea15bd5
websocketClient/src/main/java/org/example/wsClient/MexcClient.java
@@ -4,7 +4,10 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import lombok.extern.slf4j.Slf4j;
import org.example.pojo.Currency;
import org.example.util.RedisUtil;
@@ -13,20 +16,23 @@
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
@ClientEndpoint
@Slf4j
public class MexcClient {
    private static final String WS_ENDPOINT = "wss://wbs.mexc.com/ws"; // 实际WebSocket服务器地址
    private static final long PING_INTERVAL = 20000; // 每20秒发送一次ping
    private static final int MAX_BACKOFF_TIME = 60000; // 最大重连时间
    private static final String WS_ENDPOINT = "wss://wbs.mexc.com/ws";
    private static final long PING_INTERVAL = 20000;
    private final List<Currency> subscriptions;
    private final ScheduledExecutorService executorService;
    private Session session;
    private boolean reconnecting = false;
    private final Object lock = new Object(); // 添加一个锁对象
    private volatile boolean reconnecting = false; // 使用 volatile 关键字保证可见性
    public MexcClient(List<Currency> subscriptions) {
        this.subscriptions = subscriptions;
@@ -37,7 +43,7 @@
        try {
            connect();
            if (session == null) {
                System.err.println("无法在超时时间内连接到服务器。");
                log.info("无法在超时时间内连接到服务器。");
                return;
            }
@@ -54,8 +60,7 @@
            }
        } catch (Exception e) {
            System.err.println("连接过程中发生异常: " + e.getMessage());
            e.printStackTrace();
            log.error("mexc ws 连接过程中发生异常: " + e.getMessage(), e);
        } finally {
            executorService.shutdown();
        }
@@ -68,59 +73,84 @@
    @OnOpen
    public void onOpen(Session session) {
        System.out.println("已连接到服务器。");
        log.info("mexc ws 已连接到服务器。");
        this.session = session;
        synchronized (this) {
            this.notify();
        }
    }
    private static final Gson gson = new Gson(); // 将 Gson 作为静态成员或单例
    @OnMessage
    public void onMessage(String message) {
        Gson gson = new Gson();
        Map map = gson.fromJson(message, Map.class);
        if (map != null && map.containsKey("s")) {
            RedisUtil.set("mexc" + map.get("s").toString(), message);
        try {
            Map<String, Object> map = gson.fromJson(message, new TypeToken<Map<String, Object>>() {}.getType());
            if (map != null && map.containsKey("s")) {
                Object object = map.get("d");
                Map<String, Object> resultMap = gson.fromJson(object.toString(), new TypeToken<Map<String, Object>>() {}.getType());
                HashMap<String,Object> hashMap = new HashMap<>();
                ObjectMapper mapper = new ObjectMapper();
                hashMap.put("bids",resultMap.get("bids"));
                hashMap.put("asks",resultMap.get("asks"));
                String key = "mexc" + map.get("s").toString();
                RedisUtil.set(key, mapper.writeValueAsString(hashMap));
            } else {
                log.warn("消息不包含 's' 字段或解析失败:" + message);
            }
        } catch (JsonSyntaxException e) {
            log.error("JSON 解析异常:" + e.getMessage(), e);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
        // 没有过滤撤单的数据
    }
    @OnClose
    public void onClose() {
        System.out.println("连接已关闭,尝试重新连接...");
        session = null;
        if (!reconnecting) {
            reconnect();
        }
        log.info("mexc ws 连接已关闭,尝试重新连接...");
        handleConnectionClosedOrError();
    }
    @OnError
    public void onError(Throwable throwable) {
        System.err.println("发生错误: " + throwable.getMessage());
        if (!reconnecting) {
            reconnect();
        }
        log.error("mexc ws 发生错误: " + throwable.getMessage(), throwable);
        handleConnectionClosedOrError();
    }
    private void reconnect() {
        if (reconnecting) {
            return;
        }
        reconnecting = true;
        executorService.schedule(() -> {
            try {
                connect();
                reconnecting = false;
            } catch (Exception e) {
                e.printStackTrace();
                reconnect();
    private void handleConnectionClosedOrError() {
        synchronized (lock) {
            if (!reconnecting) {
                reconnecting = true; // 设置 reconnecting 为 true 表示开始重连
                executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连
            }
        }, calculateBackoffTime(), TimeUnit.MILLISECONDS);
        }
    }
    private long calculateBackoffTime() {
        // 实现退避策略,例如指数退避
        return 5000; // 例子:5秒
    private void attemptReconnect() {
        boolean doReconnect = true;
        try {
            log.info("mexc ws 开始重连");
            connect(); // 假设 connect() 方法用于实际的连接逻辑
            log.info("mexc ws 重连成功");
        } catch (Exception e) {
            log.error("mexc ws 重连失败", e);
            // 连接失败时,可以根据具体情况决定是否继续重连
            // 在这里假设总是继续尝试重连
        } finally {
            synchronized (lock) {
                if (doReconnect) {
                    scheduleReconnect(); // 如果需要继续重连,则重新调度重连任务
                } else {
                    reconnecting = false; // 重连结束后设置 reconnecting 为 false
                }
            }
        }
    }
    private void scheduleReconnect() {
        if (!executorService.isShutdown()) {
            executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS);
        }
    }
    private void sendPing() {
@@ -129,7 +159,7 @@
                session.getBasicRemote().sendPing(ByteBuffer.wrap("Ping".getBytes()));
            }
        } catch (Exception e) {
            e.printStackTrace();
            log.error("发送心跳失败", e);
        }
    }