| | |
| | | import com.fasterxml.jackson.databind.ObjectMapper; |
| | | import com.fasterxml.jackson.databind.node.ArrayNode; |
| | | import com.fasterxml.jackson.databind.node.ObjectNode; |
| | | import com.google.common.reflect.TypeToken; |
| | | import com.google.gson.Gson; |
| | | import com.google.gson.JsonSyntaxException; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.example.pojo.Currency; |
| | | import org.example.util.RedisUtil; |
| | | |
| | |
| | | import java.io.UncheckedIOException; |
| | | import java.net.URI; |
| | | import java.nio.ByteBuffer; |
| | | import java.util.HashMap; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.concurrent.*; |
| | | |
| | | @ClientEndpoint |
| | | @Slf4j |
| | | public class MexcClient { |
| | | private static final String WS_ENDPOINT = "wss://wbs.mexc.com/ws"; // 实际WebSocket服务器地址 |
| | | private static final long PING_INTERVAL = 20000; // 每20秒发送一次ping |
| | | private static final int MAX_BACKOFF_TIME = 60000; // 最大重连时间 |
| | | private static final String WS_ENDPOINT = "wss://wbs.mexc.com/ws"; |
| | | private static final long PING_INTERVAL = 20000; |
| | | |
| | | private final List<Currency> subscriptions; |
| | | private final ScheduledExecutorService executorService; |
| | | private Session session; |
| | | private boolean reconnecting = false; |
| | | |
| | | private final Object lock = new Object(); // 添加一个锁对象 |
| | | private volatile boolean reconnecting = false; // 使用 volatile 关键字保证可见性 |
| | | |
| | | public MexcClient(List<Currency> subscriptions) { |
| | | this.subscriptions = subscriptions; |
| | |
| | | try { |
| | | connect(); |
| | | if (session == null) { |
| | | System.err.println("无法在超时时间内连接到服务器。"); |
| | | log.info("无法在超时时间内连接到服务器。"); |
| | | return; |
| | | } |
| | | |
| | |
| | | } |
| | | |
| | | } catch (Exception e) { |
| | | System.err.println("连接过程中发生异常: " + e.getMessage()); |
| | | e.printStackTrace(); |
| | | log.error("mexc ws 连接过程中发生异常: " + e.getMessage(), e); |
| | | } finally { |
| | | executorService.shutdown(); |
| | | } |
| | |
| | | |
| | | @OnOpen |
| | | public void onOpen(Session session) { |
| | | System.out.println("已连接到服务器。"); |
| | | log.info("mexc ws 已连接到服务器。"); |
| | | this.session = session; |
| | | synchronized (this) { |
| | | this.notify(); |
| | | } |
| | | } |
| | | private static final Gson gson = new Gson(); // 将 Gson 作为静态成员或单例 |
| | | |
| | | @OnMessage |
| | | public void onMessage(String message) { |
| | | Gson gson = new Gson(); |
| | | Map map = gson.fromJson(message, Map.class); |
| | | if (map != null && map.containsKey("s")) { |
| | | RedisUtil.set("mexc" + map.get("s").toString(), message); |
| | | try { |
| | | Map<String, Object> map = gson.fromJson(message, new TypeToken<Map<String, Object>>() {}.getType()); |
| | | if (map != null && map.containsKey("s")) { |
| | | Object object = map.get("d"); |
| | | Map<String, Object> resultMap = gson.fromJson(object.toString(), new TypeToken<Map<String, Object>>() {}.getType()); |
| | | HashMap<String,Object> hashMap = new HashMap<>(); |
| | | ObjectMapper mapper = new ObjectMapper(); |
| | | hashMap.put("bids",resultMap.get("bids")); |
| | | hashMap.put("asks",resultMap.get("asks")); |
| | | String key = "mexc" + map.get("s").toString(); |
| | | RedisUtil.set(key, mapper.writeValueAsString(hashMap)); |
| | | } else { |
| | | log.warn("消息不包含 's' 字段或解析失败:" + message); |
| | | } |
| | | } catch (JsonSyntaxException e) { |
| | | log.error("JSON 解析异常:" + e.getMessage(), e); |
| | | } catch (JsonProcessingException e) { |
| | | throw new RuntimeException(e); |
| | | } |
| | | // 没有过滤撤单的数据 |
| | | } |
| | | |
| | | |
| | | @OnClose |
| | | public void onClose() { |
| | | System.out.println("连接已关闭,尝试重新连接..."); |
| | | session = null; |
| | | if (!reconnecting) { |
| | | reconnect(); |
| | | } |
| | | log.info("mexc ws 连接已关闭,尝试重新连接..."); |
| | | handleConnectionClosedOrError(); |
| | | } |
| | | |
| | | @OnError |
| | | public void onError(Throwable throwable) { |
| | | System.err.println("发生错误: " + throwable.getMessage()); |
| | | if (!reconnecting) { |
| | | reconnect(); |
| | | } |
| | | log.error("mexc ws 发生错误: " + throwable.getMessage(), throwable); |
| | | handleConnectionClosedOrError(); |
| | | } |
| | | |
| | | private void reconnect() { |
| | | if (reconnecting) { |
| | | return; |
| | | } |
| | | reconnecting = true; |
| | | executorService.schedule(() -> { |
| | | try { |
| | | connect(); |
| | | reconnecting = false; |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | reconnect(); |
| | | private void handleConnectionClosedOrError() { |
| | | synchronized (lock) { |
| | | if (!reconnecting) { |
| | | reconnecting = true; // 设置 reconnecting 为 true 表示开始重连 |
| | | executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连 |
| | | } |
| | | }, calculateBackoffTime(), TimeUnit.MILLISECONDS); |
| | | } |
| | | } |
| | | |
| | | private long calculateBackoffTime() { |
| | | // 实现退避策略,例如指数退避 |
| | | return 5000; // 例子:5秒 |
| | | private void attemptReconnect() { |
| | | boolean doReconnect = true; |
| | | try { |
| | | log.info("mexc ws 开始重连"); |
| | | connect(); // 假设 connect() 方法用于实际的连接逻辑 |
| | | log.info("mexc ws 重连成功"); |
| | | } catch (Exception e) { |
| | | log.error("mexc ws 重连失败", e); |
| | | // 连接失败时,可以根据具体情况决定是否继续重连 |
| | | // 在这里假设总是继续尝试重连 |
| | | } finally { |
| | | synchronized (lock) { |
| | | if (doReconnect) { |
| | | scheduleReconnect(); // 如果需要继续重连,则重新调度重连任务 |
| | | } else { |
| | | reconnecting = false; // 重连结束后设置 reconnecting 为 false |
| | | } |
| | | } |
| | | } |
| | | } |
| | | |
| | | private void scheduleReconnect() { |
| | | if (!executorService.isShutdown()) { |
| | | executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS); |
| | | } |
| | | } |
| | | |
| | | private void sendPing() { |
| | |
| | | session.getBasicRemote().sendPing(ByteBuffer.wrap("Ping".getBytes())); |
| | | } |
| | | } catch (Exception e) { |
| | | e.printStackTrace(); |
| | | log.error("发送心跳失败", e); |
| | | } |
| | | } |
| | | |