1
zj
2024-08-05 edaa364ccc37fe5372bca577482dd5d7142425cd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
package org.example.geteclient.wsClinet;
 
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import lombok.extern.slf4j.Slf4j;
import org.example.geteclient.pojo.Currency;
import org.example.geteclient.util.RedisUtil;
import org.json.JSONException;
 
import javax.websocket.*;
import java.math.BigDecimal;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
 
/**
 * @program: demo
 * @description: GateClient 用于与 Gate.io WebSocket API 进行交互
 * @create: 2024-07-18 15:30
 **/
@ClientEndpoint
@Slf4j
public class GateClient {
 
    private static final String WS_ENDPOINT = "wss://api.gateio.ws/ws/v4/"; // WebSocket 端点 URL
    private static final long PING_INTERVAL = 20000; // Ping 消息发送间隔,单位毫秒
 
    private final List<Currency> subscriptions; // 存储要订阅的货币列表
    private final ScheduledExecutorService executorService; // 定时任务调度服务
    private Session session; // WebSocket 连接会话
 
    private final Object lock = new Object(); // 添加一个锁对象
    private volatile boolean reconnecting = false; // 表示是否正在重连,使用 volatile 保证可见性
 
    public GateClient(List<Currency> subscriptions) {
        this.subscriptions = subscriptions; // 初始化订阅的货币
        this.executorService = Executors.newScheduledThreadPool(1); // 创建一个定时任务调度线程池
    }
 
    public void start() {
        try {
            connect(); // 尝试连接 WebSocket
            if (session == null) { // 如果连接失败,session 仍然为 null
                log.info("无法在超时时间内连接到服务器。");
                return; // 提前返回
            }
 
            // 定期发送 Ping 消息保持连接
            executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS);
 
            // 订阅消息
            for (Currency subscription : subscriptions) {
                String parameter = getParameter(subscription.getSymbol()); // 获取订阅参数
                session.getBasicRemote().sendText(parameter); // 发送订阅请求
            }
 
            synchronized (this) { // 进入同步块以等待连接的激活
                this.wait(); // 等待连接激活的通知
            }
 
        } catch (Exception e) {
            log.error("gate ws 连接过程中发生异常: " + e.getMessage(), e); // 记录连接过程中发生的异常
        } finally {
            executorService.shutdown(); // 确保定时任务调度服务被关闭
        }
    }
 
    private void connect() throws Exception {
        WebSocketContainer container = ContainerProvider.getWebSocketContainer(); // 获取 WebSocket 容器
        container.connectToServer(this, new URI(WS_ENDPOINT)); // 连接到 WebSocket 服务器
    }
 
    @OnOpen
    public void onOpen(Session session) {
        log.info("gate ws 已连接到服务器。"); // 连接成功日志
        this.session = session; // 保存会话信息
        synchronized (this) { // 进入同步块
            this.notify(); // 通知等待的线程
        }
    }
 
    private static final Gson gson = new Gson(); // 将 Gson 作为静态成员,避免重复实例化
    private static final String RESULT_KEY = "result"; // 定义结果键的常量
    private static final String BIDS_KEY = "bids"; // 定义 bids 的常量
    private static final String ASKS_KEY = "asks"; // 定义 asks 的常量
    private static final String p = "p"; // 定义 asks 的常量
    private static final String v = "v"; // 定义 asks 的常量
    private static final String S_KEY = "s"; // 定义 s 的常量
 
    @OnMessage
    public void onMessage(String message) {
        try {
            // 解析收到的消息为 Map
            Map<String, Object> map = gson.fromJson(message, new TypeToken<Map<String, Object>>() {}.getType());
            Object object = map.get(RESULT_KEY); // 获取结果对象
            Map<String, Object> resultMap = gson.fromJson(gson.toJson(object), new TypeToken<Map<String, Object>>() {}.getType()); // 直接转换为 Map
            // 检查结果是否有效,并整合检查
            if (resultMap != null && resultMap.get(S_KEY) != null) {
                HashMap<String, Object> hashMap = new HashMap<>(); // 创建新的 HashMap 保存 bids 和 asks
 
                Object asksObj = resultMap.get(BIDS_KEY);
                Object bidsObj = resultMap.get(ASKS_KEY);
 
                if (asksObj instanceof List && !((List<?>) asksObj).isEmpty()) {
                    List<?> asksList = (List<?>) asksObj;
                    String[][] dataArray = gson.fromJson(gson.toJson(asksList), String[][].class);
                    HashMap<String, Object> pvMap = new HashMap<>(); // 创建新的 HashMap 保存 bids 和 asks
                    String[] asksData = dataArray[0];
                    pvMap.put(p, new BigDecimal(asksData[0]));
                    pvMap.put(v, new BigDecimal(asksData[1]));
                    hashMap.put(BIDS_KEY, pvMap); // 放入 bids 数据
                }
 
                if (bidsObj instanceof List && !((List<?>) bidsObj).isEmpty()) {
                    List<?> bidsList = (List<?>) bidsObj;
                    String[][] dataArray = gson.fromJson(gson.toJson(bidsList), String[][].class);
                    String[] bidsData = dataArray[0];
                    HashMap<String, Object> pvMap = new HashMap<>(); // 创建新的 HashMap 保存 bids 和 asks
                    pvMap.put(p, new BigDecimal(bidsData[0]));
                    pvMap.put(v, new BigDecimal(bidsData[1]));
                    hashMap.put(ASKS_KEY,pvMap);
                }
 
                String key = "gate" + resultMap.get(S_KEY); // 生成 Redis 键
                RedisUtil.set(key.replace("_", ""), gson.toJson(hashMap)); // 存入 Redis,使用 Gson 进行序列化
            }
        } catch (JsonSyntaxException e) {
            log.error("JSON 解析异常:" + e.getMessage(), e); // 记录 JSON 解析异常
        } catch (Exception e) { // 捕获其他可能的异常
            log.error("处理消息时发生异常:" + e.getMessage(), e); // 记录异常
        }
    }
 
 
    @OnClose
    public void onClose() {
        log.info("gate ws 连接已关闭,尝试重新连接..."); // 连接关闭日志
        handleConnectionClosedOrError(); // 处理连接关闭或错误
    }
 
    @OnError
    public void onError(Throwable throwable) {
        log.error("gate ws 发生错误: " + throwable.getMessage(), throwable); // 记录错误日志
        handleConnectionClosedOrError(); // 处理连接关闭或错误
    }
 
    private void handleConnectionClosedOrError() {
        synchronized (lock) { // 进入同步块以防止并发重连
            if (!reconnecting) { // 检查当前是否已经在重连
                reconnecting = true; // 设置 reconnecting 为 true 表示开始重连
                executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连
            }
        }
    }
 
    private void attemptReconnect() {
        boolean doReconnect = true; // 是否进行重连的标志
        try {
            log.info("gate ws 开始重连"); // 开始重连日志
            connect(); // 尝试重新连接
            log.info("gate ws 重连成功"); // 重连成功日志
        } catch (Exception e) {
            log.error("gate ws 重连失败", e); // 重连失败记录日志
            doReconnect = false; // 标记不再继续重连
        } finally {
            synchronized (lock) { // 进入同步块
                if (doReconnect) {
                    scheduleReconnect(); // 如果需要继续重连,调度重连任务
                } else {
                    reconnecting = false; // 重连结束,标记重连状态为 false
                }
            }
        }
    }
 
    private void scheduleReconnect() {
        if (!executorService.isShutdown()) { // 确保调度服务未关闭
            executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS); // 重连调度,5秒后尝试重新连接
        }
    }
 
    private void sendPing() {
        try {
            if (session != null) { // 检查会话是否存在
                session.getBasicRemote().sendPing(ByteBuffer.wrap("Ping".getBytes())); // 发送 Ping 消息
            }
        } catch (Exception e) {
            log.error("发送心跳失败", e); // 记录心跳发送失败日志
        }
    }
 
    public String getParameter(String symbol) throws JsonProcessingException, JSONException {
        // 替换USDT为_USDT
        symbol = symbol.replaceAll("USDT", "_USDT"); // 替换货币符号中的 USDT
 
        // 创建一个ObjectMapper实例
        ObjectMapper mapper = new ObjectMapper(); // 创建 ObjectMapper 实例
        // 获取当前时间的毫秒数
        long currentTimeMillis = System.currentTimeMillis(); // 获取当前时间
 
        // 定义常量
        final String CHANNEL = "spot.order_book"; // 固定频道名称
        final String EVENT_SUBSCRIBE = "subscribe"; // 订阅事件
        final String EVENT_UNSUBSCRIBE = "unsubscribe"; // 取消订阅事件
        final String[] PAYLOAD = new String[]{symbol, "5", "100ms"}; // 请求负载信息
 
        // 使用Map构建JSON对象
        Map<String, Object> jsonMap = new HashMap<>(); // 创建 Map 存放 JSON 内容
        jsonMap.put("time", currentTimeMillis); // 将当前时间放入 Map
        jsonMap.put("channel", CHANNEL); // 将频道信息放入 Map
        jsonMap.put("event", EVENT_SUBSCRIBE); // 事件类型放入 Map
        jsonMap.put("payload", Arrays.asList(PAYLOAD)); // 将负载数组转为 List 放入 Map
 
        // 将Map转换为JSON字符串
        String jsonString = mapper.writeValueAsString(jsonMap); // 转换为 JSON 字符串
        return jsonString; // 返回 JSON 字符串
    }
}