1
zj
2024-11-12 cd95c9114fcbb3fc65c666b8b86c378f9a96d55b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
package org.example.bitgetsclient.wsClient;
 
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import lombok.extern.slf4j.Slf4j;
import org.example.bitgetsclient.BitgetsClientApplication;
import org.example.bitgetsclient.comm.ApplicationContextProvider;
import org.example.bitgetsclient.util.RedisUtil;
import org.json.JSONException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
 
import javax.websocket.*;
import java.io.IOException;
import java.lang.reflect.Type;
import java.math.BigDecimal;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
 
@ClientEndpoint
@Slf4j
public class BitgetClient {
 
    private static final String WS_ENDPOINT = "wss://ws.bitget.com/v2/ws/public"; // WebSocket 接口地址
    private static final long PING_INTERVAL = 20000; // 心跳间隔,单位毫秒
    private static final String PING_MESSAGE = "ping"; // 心跳消息
    private static final int MAX_RECONNECT_ATTEMPTS = 5; // 最大重连次数
    private final String subscriptions; // 订阅内容
    private final ScheduledExecutorService executorService; // 定义调度任务执行服务
    private Session session; // WebSocket 会话
 
    private final Object lock = new Object(); // 使用锁对象以确保线程安全
    private boolean reconnecting = false; // 重连状态
    private int reconnectAttempts = 0; // 当前重连次数
 
    // 静态单例 ObjectMapper 实例,避免重复创建
    private static final ObjectMapper objectMapper = new ObjectMapper();
    private static final Gson gson = new Gson(); // 静态 Gson 实例
 
    // 构造方法,初始化订阅内容和调度服务
    public BitgetClient(String subscriptions) {
        this.subscriptions = subscriptions; // 初始化订阅内容
        this.executorService = Executors.newScheduledThreadPool(1); // 初始化调度线程池
    }
 
 
 
    public void start()  {
        try {
            connect(); // 尝试连接
            if (session == null) {
                log.info("无法在超时内连接到服务器。"); // 记录连接失败的信息
                return; // 如果连接失败,直接返回
            }
 
            // 定期发送心跳
            executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS);
            // 发送订阅消息
            session.getBasicRemote().sendText(subscriptions); // 发送订阅信息
            synchronized (this) {
                this.wait(); // 等待 WebSocket 消息到来
            }
 
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // 恢复中断状态
            log.error("线程被中断: " + e.getMessage(), e); // 记录线程中断的错误
        } catch (Exception e) {
            log.error("bitget ws 连接过程中发生异常: " + e.getMessage(), e); // 记录连接过程中发生的异常
        } finally {
            executorService.shutdownNow(); // 尝试立即关闭调度服务
        }
    }
 
    private void connect() throws Exception {
        WebSocketContainer container = ContainerProvider.getWebSocketContainer();
        container.connectToServer(this, new URI(WS_ENDPOINT)); // 连接 WebSocket 服务
    }
 
    @OnOpen
    public void onOpen(Session session) {
        log.info("bitget ws 已连接到服务器。"); // 记录连接成功的信息
        this.session = session; // 存储会话
        synchronized (this) {
            this.notify(); // 通知等待的线程
        }
        reconnectAttempts = 0; // 连接成功重置重连次数
    }
 
    @OnMessage
    public void onMessage(String message) {
        try {
            // 将消息解析为 Map
            Map<String, Object> map = parseMessage(message); // 调用共用方法解析
            if (map != null && map.containsKey("arg") && map.containsKey("data")) { // 确保 map 不为 null 且包含必要的键
                JsonNode dataNode = getDataNode(message); // 获取数据节点
 
                // 确保 dataNode 不为 null,以避免空指针异常
                if (dataNode != null) {
                    Map<String, Object> hashMap = new HashMap<>(); // 变量命名更具描述性
 
                    String asks = dataNode.get("bids").toString();
                    String bids = dataNode.get("asks").toString();
 
                    Type listType = new TypeToken<List<List<String>>>(){}.getType();
                    List<List<String>> bidsList = gson.fromJson(bids, listType);
                    List<List<String>> asksList = gson.fromJson(asks, listType);
 
                    if(!bidsList.isEmpty()  && !bidsList.get(0).isEmpty() && !asksList.isEmpty() && !asksList.get(0).isEmpty()){
                        if (!bidsList.isEmpty()  && !bidsList.get(0).isEmpty()) {
                            List<String> bidsStringList = bidsList.get(0);
                            HashMap<String, Object> pvMap = new HashMap<>(); // 创建新的 HashMap 保存 bids 和 asks
                            pvMap.put("p", new BigDecimal(bidsStringList.get(0)).toPlainString());
                            pvMap.put("v", new BigDecimal(bidsStringList.get(1)).toPlainString());
                            hashMap.put("bids", pvMap); // 获取并存储 bids
                        }
 
                        if (!asksList.isEmpty() && !asksList.get(0).isEmpty()) {
                            List<String> asksStringList = asksList.get(0); // 修正此处为 asksList
                            HashMap<String, Object> pvMap = new HashMap<>(); // 创建新的 HashMap 保存 bids 和 asks
                            pvMap.put("p", new BigDecimal(asksStringList.get(0)).toPlainString());
                            pvMap.put("v", new BigDecimal(asksStringList.get(1)).toPlainString());
                            hashMap.put("asks", pvMap); // 获取并存储 asks
                        }
 
                        Map<String, Object> argMap = gson.fromJson(map.get("arg").toString(), new TypeToken<Map<String, Object>>() {}.getType()); // 解析 arg 为 Map
                        String key = "bitget" + argMap.get("instId"); // 构建 Redis 存储的键
 
                        // 存储到 Redis,使用 ObjectMapper 转换为 JSON 字符串
                        String jsonData = objectMapper.writeValueAsString(hashMap); // 先将 HashMap 转换为 JSON 字符串
                        RedisUtil.set(key, jsonData); // 存储数据
                    }
                }
            }
        } catch (JsonSyntaxException e) {
            log.error("JSON 解析错误: " + e.getMessage(), e); // 记录 JSON 解析错误
        } catch (IOException e) {
            log.error("对象转换时发生 I/O 异常: " + e.getMessage(), e); // 记录对象转换 I/O 异常
        } catch (Exception e) {
            log.error("处理消息时发生异常: " + e.getMessage(), e); // 记录处理消息时发生的异常
        }
    }
 
 
    // 解析消息的共用方法
    private Map<String, Object> parseMessage(String message) {
        if(!message.equals("pong")){
            return gson.fromJson(message, new TypeToken<Map<String, Object>>() {}.getType()); // 将消息解析为 Map
        }
        return null;
    }
 
    // 获取数据节点的共用方法
    private JsonNode getDataNode(String message) throws JsonProcessingException {
        JsonNode jsonNode = objectMapper.readTree(message); // 使用静态 ObjectMapper,避免重复创建
        return jsonNode.get("data").get(0); // 获取第一个数据节点
    }
 
    @OnClose
    public void onClose()  {
        log.info("bitget ws 连接已关闭,尝试重新连接..."); // 记录连接关闭的信息
        handleConnectionClosedOrError();
    }
 
    @OnError
    public void onError(Throwable throwable) {
        log.error("bitget ws 发生错误: " + throwable.getMessage(), throwable); // 记录错误信息
        handleConnectionClosedOrError();
    }
 
    private void handleConnectionClosedOrError() {
        executorService.execute(() -> {
            try {
                attemptReconnect();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }
 
    private void attemptReconnect() {
        if (reconnectAttempts < MAX_RECONNECT_ATTEMPTS) { // 检查重连次数是否超过限制
            try {
                log.info("bitget ws 开始重连"); // 记录开始重连的信息
                connect(); // 尝试重连
                log.info("bitget ws 重连成功"); // 成功重连的日志
                reconnectAttempts = 0; // 重连成功,重置重连次数
            } catch (Exception e) {
                reconnectAttempts++; // 增加重连尝试次数
                log.warn("bitget ws 重连失败,尝试次数: " + reconnectAttempts, e); // 记录重连失败的警告信息
                // 采用指数退避策略,增加重连间隔
                long waitTime = Math.min(60, (long) Math.pow(2, reconnectAttempts)) * 1000; // 最大等待时间为 60 秒
                scheduleReconnect(waitTime); // 调度下次重连
            }
        } else {
            log.error("超过最大重连次数,停止重连"); // 超过最大重连次数
        }
    }
 
    private void scheduleReconnect(long waitTime) { // 接受等待时间参数
        if (!executorService.isShutdown()) {
            executorService.schedule(this::attemptReconnect, waitTime, TimeUnit.MILLISECONDS); // 调度重连
        }
    }
 
    private void sendPing() {
        try {
            if (session != null) {
                session.getBasicRemote().sendText(PING_MESSAGE); // 发送心跳消息
            }
        } catch (Exception e) {
            log.error("发送心跳失败", e); // 记录发送心跳失败的错误
        }
    }
 
    public String getParameter(String symbol) throws JsonProcessingException {
        // 使用 ObjectMapper 构建 JSON 请求
        Map<String, Object> jsonMap = new HashMap<>();
        jsonMap.put("op", "subscribe"); // 设置操作类型
        Map<String, Object> argsMap = new HashMap<>();
        jsonMap.put("args", argsMap); // 参数放入请求
 
        // 设置请求参数
        argsMap.put("instType", "SPOT"); // 交易类型
        argsMap.put("channel", "books15"); // 渠道类型
        argsMap.put("instId", symbol); // 交易对符号
 
        // 返回 JSON 字符串
        return objectMapper.writeValueAsString(jsonMap); // 使用单一的 ObjectMapper 实例
    }
}