1
zj
2024-08-05 edaa364ccc37fe5372bca577482dd5d7142425cd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
package org.example.kucoinclient.wsClient;
 
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import lombok.extern.slf4j.Slf4j;
import org.example.kucoinclient.pojo.Currency;
import org.example.kucoinclient.util.RedisUtil;
import org.json.JSONException;
 
import javax.websocket.*;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
import java.net.URI;
import java.net.URLEncoder;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
 
/**
 * @program: demo
 * @description: Kucoin WebSocket 客户端
 * @create: 2024-07-19 16:44
 **/
@ClientEndpoint
@Slf4j
public class KucoinClient {
 
    private static final String WS_ENDPOINT = "wss://ws-api-spot.kucoin.com/spotMarket/level2Depth5:"; // WebSocket 端点
    private static final String CHARSET = "UTF-8"; // 定义字符集常量,避免魔法字符串
    private static final String PING_MESSAGE = "Ping"; // 心跳消息
    private static final long PING_INTERVAL = 20000; // 心跳间隔
    private static final int RECONNECT_DELAY = 5; // 重连间隔
 
    private final List<Currency> subscriptions; // 订阅的货币列表
    private final ScheduledExecutorService executorService; // 调度线程池
    private Session session; // WebSocket会话
    private String token; // 访问令牌
    private final Object lock = new Object(); // 锁对象,用于控制重连
    private volatile boolean reconnecting = false; // 表示当前是否在重连
 
    private String id; // 当前会话的 ID
 
    public KucoinClient(List<Currency> subscriptions, String token) {
        this.subscriptions = subscriptions; // 保存订阅的货币列表
        this.token = token; // 保存令牌
        this.executorService = Executors.newScheduledThreadPool(1); // 创建调度线程池
    }
 
    public void start() {
        try {
            connect(); // 连接到 WebSocket 服务器
            if (session == null) {
                log.info("无法在超时时间内连接到服务器。"); // 输出连接失败日志
                return; // 结束方法
            }
 
            // 定期发送心跳
            executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS);
 
            synchronized (this) { // 同步等待连接
                this.wait(); // 当前线程等待
            }
 
        } catch (Exception e) {
            log.error("kucoin ws 连接过程中发生异常: ", e); // 捕获并记录异常
        } finally {
            executorService.shutdown(); // 关闭调度线程池
        }
    }
 
    private void connect() throws Exception {
        WebSocketContainer container = ContainerProvider.getWebSocketContainer(); // 获取 WebSocket 容器
        String url = generateWebSocketURL(); // 生成 WebSocket URL
        container.connectToServer(this, new URI(url)); // 连接到 WebSocket 服务器
        String parameter = subscription(); // 生成订阅消息
        session.getBasicRemote().sendText(parameter); // 发送订阅消息
    }
 
    private String generateWebSocketURL() throws UnsupportedEncodingException {
        String symbol = getSymbol(); // 获取符号
        String url = WS_ENDPOINT + symbol + "?token=" + URLEncoder.encode(token, CHARSET); // 构建 WebSocket URL
        return url; // 返回生成的 URL
    }
 
    @OnOpen
    public void onOpen(Session session) {
        log.info("kucoin ws 已连接到服务器。"); // 输出连接成功日志
        this.session = session; // 保存当前会话
        synchronized (this) { // 同步通知所有等待的线程
            this.notify();
        }
    }
 
    private static final Gson gson = new Gson(); // Gson 实例,负责 JSON 处理
 
    @OnMessage
    public void onMessage(String message) {
        try {
            Map<String, Object> map = parseMessage(message); // 解析消息
            handleMessage(map); // 处理消息
        } catch (JsonSyntaxException | JsonProcessingException e) {
            log.error("JSON 解析异常:", e); // 捕获 JSON 解析异常
        }
    }
 
    private Map<String, Object> parseMessage(String message) throws JsonSyntaxException {
        return gson.fromJson(message, new TypeToken<Map<String, Object>>() {}.getType()); // 解析 JSON 消息
    }
 
    private void handleMessage(Map<String, Object> map) throws JsonProcessingException {
        if (map.get("id") != null) {
            this.id = map.get("id").toString(); // 设置会话 ID
        }
        if (map.get("data") != null) {
            Object object = map.get("data"); // 获取数据内容
            processData(map.get("topic").toString(), object); // 处理数据
        }
    }
 
    private static final String PREFIX = "kucoin"; // 创建常量,方便以后修改和维护
 
    private void processData(String topic, Object object) throws JsonProcessingException {
        // 将数据解析为 Map
        Map<String, Object> resultMap = null;
        try {
            resultMap = gson.fromJson(object.toString(), new TypeToken<Map<String, Object>>() {}.getType()); // 解析 JSON 对象
        } catch (JsonSyntaxException e) {
            log.error("JSON 解析失败: {}", e.getMessage()); // 输出 JSON 解析错误日志
            return; // 结束方法执行
        }
 
        if (resultMap != null) {
            // 创建线程安全的 HashMap
            Map<String, Object> hashMap = new ConcurrentHashMap<>();
            ObjectMapper mapper = new ObjectMapper(); // 创建 ObjectMapper 实例
 
            // 空值检查,避免存储 null 值到 Redis
            if (resultMap.get("bids") != null) {
                Object bidsObj = resultMap.get("bids");
                if (bidsObj instanceof List && !((List<?>) bidsObj).isEmpty()) {
                    List<String> bidsList = (List<String>) bidsObj;
                    HashMap<String, Object> pvMap = new HashMap<>(); // 创建新的 HashMap 保存 bids 和 asks
                    pvMap.put("p", bidsList.get(0));
                    pvMap.put("V", bidsList.get(1));
                    hashMap.put("bids",pvMap);
                }
            }
            if (resultMap.get("asks") != null) {
 
                Object asksObj = resultMap.get("asks");
                if (asksObj instanceof List && !((List<?>) asksObj).isEmpty()) {
                    List<String> asksList = (List<String>) asksObj;
                    HashMap<String, Object> pvMap = new HashMap<>(); // 创建新的 HashMap 保存 bids 和 asks
                    pvMap.put("p", asksList.get(0));
                    pvMap.put("V", asksList.get(1));
                    hashMap.put("asks",pvMap);
                }
            }
 
            String symbol = extractSymbolFromTopic(topic); // 从 topic 提取符号
            String key = PREFIX + symbol; // 创建 Redis 缓存键
            try {
                RedisUtil.set(key, mapper.writeValueAsString(hashMap)); // 存储到 Redis
            } catch (JsonProcessingException e) {
                log.error("将数据存入 Redis 时出错: {}", e.getMessage()); // 输出数据存储错误日志
            }
        } else {
            log.error("topic--->存入redis失败"); // 输出处理失败日志
        }
    }
 
 
    private String extractSymbolFromTopic(String topic) {
        int index = topic.indexOf(":"); // 找到分隔符的位置
        if (index != -1) { // 如果找到了分隔符
            String substring = topic.substring(index + 1);
            return substring.replaceAll("-", ""); // 返回去掉"-"的符号
        }
        return ""; // 未找到分隔符返回空
    }
 
    @OnClose
    public void onClose() {
        log.info("kucoin ws 连接已关闭,尝试重新连接..."); // 输出连接关闭日志
        handleConnectionClosedOrError(); // 处理连接关闭或错误
    }
 
    @OnError
    public void onError(Throwable throwable) {
        log.error("kucoin ws 发生错误: ", throwable); // 输出错误日志
        handleConnectionClosedOrError(); // 处理连接关闭或错误
    }
 
    private void handleConnectionClosedOrError() {
        synchronized (lock) { // 同步块,确保线程安全
            if (!reconnecting) {
                reconnecting = true; // 状态标记为重连中
                executorService.execute(this::attemptReconnect); // 执行重连操作
            }
        }
    }
 
    private void attemptReconnect() {
        try {
            log.info("kucoin ws 开始重连"); // 输出重连开始日志
            connect(); // 尝试重新连接
            log.info("kucoin ws 重连成功"); // 输出重连成功日志
        } catch (Exception e) {
            log.error("kucoin ws 重连失败: ", e); // 输出重连失败日志
        } finally {
            synchronized (lock) { // 同步块
                reconnecting = false; // 状态标记为未重连
                scheduleReconnect(); // 重新调度重连任务
            }
        }
    }
 
    private void scheduleReconnect() {
        if (!executorService.isShutdown()) { // 检查线程池是否已经关闭
            executorService.schedule(this::attemptReconnect, RECONNECT_DELAY, TimeUnit.SECONDS); // 调度重连
        }
    }
 
    private void sendPing() {
        try {
            if (session != null) {
                session.getBasicRemote().sendPing(ByteBuffer.wrap(PING_MESSAGE.getBytes(CHARSET))); // 发送心跳消息
            }
        } catch (Exception e) {
            log.error("发送心跳失败:", e); // 捕获并记录心跳发送失败的异常
        }
    }
 
    public String subscription() throws JsonProcessingException, JSONException {
        String symbol = getSymbol(); // 获取当前货币符号
 
        // 创建 Map 构建 JSON 对象
        Map<String, Object> jsonMap = new HashMap<>();
        jsonMap.put("id", id); // 会话 ID
        jsonMap.put("type", "subscribe"); // 订阅类型
        jsonMap.put("topic", "/spotMarket/level1:" + symbol); // 订阅的主题
        jsonMap.put("privateChannel", false); // 是否私有通道
        jsonMap.put("response", true); // 是否返回响应
 
        ObjectMapper mapper = new ObjectMapper(); // 创建 ObjectMapper 实例
        return mapper.writeValueAsString(jsonMap); // 返回 JSON 字符串
    }
 
    private String getSymbol() {
        List<String> symbolList = subscriptions.stream()
                .map(currency -> currency.getSymbol().replaceAll("USDT", "-USDT")) // 替换符号
                .collect(Collectors.toList()); // 收集符号到列表
        return String.join(",", symbolList); // 将符号列表转换为 字符串
    }
}