package org.example.geteclient.wsClinet; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.reflect.TypeToken; import com.google.gson.Gson; import com.google.gson.JsonSyntaxException; import lombok.extern.slf4j.Slf4j; import org.example.geteclient.pojo.Currency; import org.example.geteclient.util.RedisUtil; import org.json.JSONException; import javax.websocket.*; import java.math.BigDecimal; import java.net.URI; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** * @program: demo * @description: GateClient 用于与 Gate.io WebSocket API 进行交互 * @create: 2024-07-18 15:30 **/ @ClientEndpoint @Slf4j public class GateClient { private static final String WS_ENDPOINT = "wss://api.gateio.ws/ws/v4/"; // WebSocket 端点 URL private static final long PING_INTERVAL = 20000; // Ping 消息发送间隔,单位毫秒 private final List subscriptions; // 存储要订阅的货币列表 private final ScheduledExecutorService executorService; // 定时任务调度服务 private Session session; // WebSocket 连接会话 private final Object lock = new Object(); // 添加一个锁对象 private volatile boolean reconnecting = false; // 表示是否正在重连,使用 volatile 保证可见性 public GateClient(List subscriptions) { this.subscriptions = subscriptions; // 初始化订阅的货币 this.executorService = Executors.newScheduledThreadPool(1); // 创建一个定时任务调度线程池 } public void start() { try { connect(); // 尝试连接 WebSocket if (session == null) { // 如果连接失败,session 仍然为 null log.info("无法在超时时间内连接到服务器。"); return; // 提前返回 } // 定期发送 Ping 消息保持连接 executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS); // 订阅消息 for (Currency subscription : subscriptions) { String parameter = getParameter(subscription.getSymbol()); // 获取订阅参数 session.getBasicRemote().sendText(parameter); // 发送订阅请求 } synchronized (this) { // 进入同步块以等待连接的激活 this.wait(); // 等待连接激活的通知 } } catch (Exception e) { log.error("gate ws 连接过程中发生异常: " + e.getMessage(), e); // 记录连接过程中发生的异常 } finally { executorService.shutdown(); // 确保定时任务调度服务被关闭 } } private void connect() throws Exception { WebSocketContainer container = ContainerProvider.getWebSocketContainer(); // 获取 WebSocket 容器 container.connectToServer(this, new URI(WS_ENDPOINT)); // 连接到 WebSocket 服务器 } @OnOpen public void onOpen(Session session) { log.info("gate ws 已连接到服务器。"); // 连接成功日志 this.session = session; // 保存会话信息 synchronized (this) { // 进入同步块 this.notify(); // 通知等待的线程 } } private static final Gson gson = new Gson(); // 将 Gson 作为静态成员,避免重复实例化 private static final String RESULT_KEY = "result"; // 定义结果键的常量 private static final String BIDS_KEY = "bids"; // 定义 bids 的常量 private static final String ASKS_KEY = "asks"; // 定义 asks 的常量 private static final String p = "p"; // 定义 asks 的常量 private static final String v = "v"; // 定义 asks 的常量 private static final String S_KEY = "s"; // 定义 s 的常量 @OnMessage public void onMessage(String message) { try { // 解析收到的消息为 Map Map map = gson.fromJson(message, new TypeToken>() {}.getType()); Object object = map.get(RESULT_KEY); // 获取结果对象 Map resultMap = gson.fromJson(gson.toJson(object), new TypeToken>() {}.getType()); // 直接转换为 Map // 检查结果是否有效,并整合检查 if (resultMap != null && resultMap.get(S_KEY) != null) { HashMap hashMap = new HashMap<>(); // 创建新的 HashMap 保存 bids 和 asks Object asksObj = resultMap.get(BIDS_KEY); Object bidsObj = resultMap.get(ASKS_KEY); if (asksObj instanceof List && !((List) asksObj).isEmpty()) { List asksList = (List) asksObj; String[][] dataArray = gson.fromJson(gson.toJson(asksList), String[][].class); HashMap pvMap = new HashMap<>(); // 创建新的 HashMap 保存 bids 和 asks String[] asksData = dataArray[0]; pvMap.put(p, new BigDecimal(asksData[0])); pvMap.put(v, new BigDecimal(asksData[1])); hashMap.put(BIDS_KEY, pvMap); // 放入 bids 数据 } if (bidsObj instanceof List && !((List) bidsObj).isEmpty()) { List bidsList = (List) bidsObj; String[][] dataArray = gson.fromJson(gson.toJson(bidsList), String[][].class); String[] bidsData = dataArray[0]; HashMap pvMap = new HashMap<>(); // 创建新的 HashMap 保存 bids 和 asks pvMap.put(p, new BigDecimal(bidsData[0])); pvMap.put(v, new BigDecimal(bidsData[1])); hashMap.put(ASKS_KEY,pvMap); } String key = "gate" + resultMap.get(S_KEY); // 生成 Redis 键 RedisUtil.set(key.replace("_", ""), gson.toJson(hashMap)); // 存入 Redis,使用 Gson 进行序列化 } } catch (JsonSyntaxException e) { log.error("JSON 解析异常:" + e.getMessage(), e); // 记录 JSON 解析异常 } catch (Exception e) { // 捕获其他可能的异常 log.error("处理消息时发生异常:" + e.getMessage(), e); // 记录异常 } } @OnClose public void onClose() { log.info("gate ws 连接已关闭,尝试重新连接..."); // 连接关闭日志 handleConnectionClosedOrError(); // 处理连接关闭或错误 } @OnError public void onError(Throwable throwable) { log.error("gate ws 发生错误: " + throwable.getMessage(), throwable); // 记录错误日志 handleConnectionClosedOrError(); // 处理连接关闭或错误 } private void handleConnectionClosedOrError() { synchronized (lock) { // 进入同步块以防止并发重连 if (!reconnecting) { // 检查当前是否已经在重连 reconnecting = true; // 设置 reconnecting 为 true 表示开始重连 executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连 } } } private void attemptReconnect() { boolean doReconnect = true; // 是否进行重连的标志 try { log.info("gate ws 开始重连"); // 开始重连日志 connect(); // 尝试重新连接 log.info("gate ws 重连成功"); // 重连成功日志 } catch (Exception e) { log.error("gate ws 重连失败", e); // 重连失败记录日志 doReconnect = false; // 标记不再继续重连 } finally { synchronized (lock) { // 进入同步块 if (doReconnect) { scheduleReconnect(); // 如果需要继续重连,调度重连任务 } else { reconnecting = false; // 重连结束,标记重连状态为 false } } } } private void scheduleReconnect() { if (!executorService.isShutdown()) { // 确保调度服务未关闭 executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS); // 重连调度,5秒后尝试重新连接 } } private void sendPing() { try { if (session != null) { // 检查会话是否存在 session.getBasicRemote().sendPing(ByteBuffer.wrap("Ping".getBytes())); // 发送 Ping 消息 } } catch (Exception e) { log.error("发送心跳失败", e); // 记录心跳发送失败日志 } } public String getParameter(String symbol) throws JsonProcessingException, JSONException { // 替换USDT为_USDT symbol = symbol.replaceAll("USDT", "_USDT"); // 替换货币符号中的 USDT // 创建一个ObjectMapper实例 ObjectMapper mapper = new ObjectMapper(); // 创建 ObjectMapper 实例 // 获取当前时间的毫秒数 long currentTimeMillis = System.currentTimeMillis(); // 获取当前时间 // 定义常量 final String CHANNEL = "spot.order_book"; // 固定频道名称 final String EVENT_SUBSCRIBE = "subscribe"; // 订阅事件 final String EVENT_UNSUBSCRIBE = "unsubscribe"; // 取消订阅事件 final String[] PAYLOAD = new String[]{symbol, "5", "100ms"}; // 请求负载信息 // 使用Map构建JSON对象 Map jsonMap = new HashMap<>(); // 创建 Map 存放 JSON 内容 jsonMap.put("time", currentTimeMillis); // 将当前时间放入 Map jsonMap.put("channel", CHANNEL); // 将频道信息放入 Map jsonMap.put("event", EVENT_SUBSCRIBE); // 事件类型放入 Map jsonMap.put("payload", Arrays.asList(PAYLOAD)); // 将负载数组转为 List 放入 Map // 将Map转换为JSON字符串 String jsonString = mapper.writeValueAsString(jsonMap); // 转换为 JSON 字符串 return jsonString; // 返回 JSON 字符串 } }