package org.example.kucoinclient.wsClient;
|
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.google.common.reflect.TypeToken;
|
import com.google.gson.Gson;
|
import com.google.gson.JsonSyntaxException;
|
import lombok.extern.slf4j.Slf4j;
|
import org.example.kucoinclient.pojo.Currency;
|
import org.example.kucoinclient.util.RedisUtil;
|
import org.json.JSONException;
|
|
import javax.websocket.*;
|
import java.io.UnsupportedEncodingException;
|
import java.math.BigDecimal;
|
import java.net.URI;
|
import java.net.URLEncoder;
|
import java.nio.ByteBuffer;
|
import java.util.HashMap;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.Executors;
|
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.TimeUnit;
|
import java.util.stream.Collectors;
|
|
/**
|
* @program: demo
|
* @description: Kucoin WebSocket 客户端
|
* @create: 2024-07-19 16:44
|
**/
|
@ClientEndpoint
|
@Slf4j
|
public class KucoinClient {
|
|
private static final String WS_ENDPOINT = "wss://ws-api-spot.kucoin.com/spotMarket/level2Depth5:"; // WebSocket 端点
|
private static final String CHARSET = "UTF-8"; // 定义字符集常量,避免魔法字符串
|
private static final String PING_MESSAGE = "Ping"; // 心跳消息
|
private static final long PING_INTERVAL = 20000; // 心跳间隔
|
private static final int RECONNECT_DELAY = 5; // 重连间隔
|
|
private final List<Currency> subscriptions; // 订阅的货币列表
|
private final ScheduledExecutorService executorService; // 调度线程池
|
private Session session; // WebSocket会话
|
private String token; // 访问令牌
|
private final Object lock = new Object(); // 锁对象,用于控制重连
|
private volatile boolean reconnecting = false; // 表示当前是否在重连
|
|
private String id; // 当前会话的 ID
|
|
public KucoinClient(List<Currency> subscriptions, String token) {
|
this.subscriptions = subscriptions; // 保存订阅的货币列表
|
this.token = token; // 保存令牌
|
this.executorService = Executors.newScheduledThreadPool(1); // 创建调度线程池
|
}
|
|
public void start() {
|
try {
|
connect(); // 连接到 WebSocket 服务器
|
if (session == null) {
|
log.info("无法在超时时间内连接到服务器。"); // 输出连接失败日志
|
return; // 结束方法
|
}
|
|
// 定期发送心跳
|
executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS);
|
|
synchronized (this) { // 同步等待连接
|
this.wait(); // 当前线程等待
|
}
|
|
} catch (Exception e) {
|
log.error("kucoin ws 连接过程中发生异常: ", e); // 捕获并记录异常
|
} finally {
|
executorService.shutdown(); // 关闭调度线程池
|
}
|
}
|
|
private void connect() throws Exception {
|
WebSocketContainer container = ContainerProvider.getWebSocketContainer(); // 获取 WebSocket 容器
|
String url = generateWebSocketURL(); // 生成 WebSocket URL
|
container.connectToServer(this, new URI(url)); // 连接到 WebSocket 服务器
|
String parameter = subscription(); // 生成订阅消息
|
session.getBasicRemote().sendText(parameter); // 发送订阅消息
|
}
|
|
private String generateWebSocketURL() throws UnsupportedEncodingException {
|
String symbol = getSymbol(); // 获取符号
|
String url = WS_ENDPOINT + symbol + "?token=" + URLEncoder.encode(token, CHARSET); // 构建 WebSocket URL
|
return url; // 返回生成的 URL
|
}
|
|
@OnOpen
|
public void onOpen(Session session) {
|
log.info("kucoin ws 已连接到服务器。"); // 输出连接成功日志
|
this.session = session; // 保存当前会话
|
synchronized (this) { // 同步通知所有等待的线程
|
this.notify();
|
}
|
}
|
|
private static final Gson gson = new Gson(); // Gson 实例,负责 JSON 处理
|
|
@OnMessage
|
public void onMessage(String message) {
|
try {
|
Map<String, Object> map = parseMessage(message); // 解析消息
|
handleMessage(map); // 处理消息
|
} catch (JsonSyntaxException | JsonProcessingException e) {
|
log.error("JSON 解析异常:", e); // 捕获 JSON 解析异常
|
}
|
}
|
|
private Map<String, Object> parseMessage(String message) throws JsonSyntaxException {
|
return gson.fromJson(message, new TypeToken<Map<String, Object>>() {}.getType()); // 解析 JSON 消息
|
}
|
|
private void handleMessage(Map<String, Object> map) throws JsonProcessingException {
|
if (map.get("id") != null) {
|
this.id = map.get("id").toString(); // 设置会话 ID
|
}
|
if (map.get("data") != null) {
|
Object object = map.get("data"); // 获取数据内容
|
processData(map.get("topic").toString(), object); // 处理数据
|
}
|
}
|
|
private static final String PREFIX = "kucoin"; // 创建常量,方便以后修改和维护
|
|
private void processData(String topic, Object object) throws JsonProcessingException {
|
// 将数据解析为 Map
|
Map<String, Object> resultMap = null;
|
try {
|
resultMap = gson.fromJson(object.toString(), new TypeToken<Map<String, Object>>() {}.getType()); // 解析 JSON 对象
|
} catch (JsonSyntaxException e) {
|
log.error("JSON 解析失败: {}", e.getMessage()); // 输出 JSON 解析错误日志
|
return; // 结束方法执行
|
}
|
|
if (resultMap != null) {
|
// 创建线程安全的 HashMap
|
Map<String, Object> hashMap = new ConcurrentHashMap<>();
|
ObjectMapper mapper = new ObjectMapper(); // 创建 ObjectMapper 实例
|
|
// 空值检查,避免存储 null 值到 Redis
|
if (resultMap.get("bids") != null) {
|
Object bidsObj = resultMap.get("bids");
|
if (bidsObj instanceof List && !((List<?>) bidsObj).isEmpty()) {
|
List<String> bidsList = (List<String>) bidsObj;
|
HashMap<String, Object> pvMap = new HashMap<>(); // 创建新的 HashMap 保存 bids 和 asks
|
pvMap.put("p", bidsList.get(0));
|
pvMap.put("V", bidsList.get(1));
|
hashMap.put("bids",pvMap);
|
}
|
}
|
if (resultMap.get("asks") != null) {
|
|
Object asksObj = resultMap.get("asks");
|
if (asksObj instanceof List && !((List<?>) asksObj).isEmpty()) {
|
List<String> asksList = (List<String>) asksObj;
|
HashMap<String, Object> pvMap = new HashMap<>(); // 创建新的 HashMap 保存 bids 和 asks
|
pvMap.put("p", asksList.get(0));
|
pvMap.put("V", asksList.get(1));
|
hashMap.put("asks",pvMap);
|
}
|
}
|
|
String symbol = extractSymbolFromTopic(topic); // 从 topic 提取符号
|
String key = PREFIX + symbol; // 创建 Redis 缓存键
|
try {
|
RedisUtil.set(key, mapper.writeValueAsString(hashMap)); // 存储到 Redis
|
} catch (JsonProcessingException e) {
|
log.error("将数据存入 Redis 时出错: {}", e.getMessage()); // 输出数据存储错误日志
|
}
|
} else {
|
log.error("topic--->存入redis失败"); // 输出处理失败日志
|
}
|
}
|
|
|
private String extractSymbolFromTopic(String topic) {
|
int index = topic.indexOf(":"); // 找到分隔符的位置
|
if (index != -1) { // 如果找到了分隔符
|
String substring = topic.substring(index + 1);
|
return substring.replaceAll("-", ""); // 返回去掉"-"的符号
|
}
|
return ""; // 未找到分隔符返回空
|
}
|
|
@OnClose
|
public void onClose() {
|
log.info("kucoin ws 连接已关闭,尝试重新连接..."); // 输出连接关闭日志
|
handleConnectionClosedOrError(); // 处理连接关闭或错误
|
}
|
|
@OnError
|
public void onError(Throwable throwable) {
|
log.error("kucoin ws 发生错误: ", throwable); // 输出错误日志
|
handleConnectionClosedOrError(); // 处理连接关闭或错误
|
}
|
|
private void handleConnectionClosedOrError() {
|
synchronized (lock) { // 同步块,确保线程安全
|
if (!reconnecting) {
|
reconnecting = true; // 状态标记为重连中
|
executorService.execute(this::attemptReconnect); // 执行重连操作
|
}
|
}
|
}
|
|
private void attemptReconnect() {
|
try {
|
log.info("kucoin ws 开始重连"); // 输出重连开始日志
|
connect(); // 尝试重新连接
|
log.info("kucoin ws 重连成功"); // 输出重连成功日志
|
} catch (Exception e) {
|
log.error("kucoin ws 重连失败: ", e); // 输出重连失败日志
|
} finally {
|
synchronized (lock) { // 同步块
|
reconnecting = false; // 状态标记为未重连
|
scheduleReconnect(); // 重新调度重连任务
|
}
|
}
|
}
|
|
private void scheduleReconnect() {
|
if (!executorService.isShutdown()) { // 检查线程池是否已经关闭
|
executorService.schedule(this::attemptReconnect, RECONNECT_DELAY, TimeUnit.SECONDS); // 调度重连
|
}
|
}
|
|
private void sendPing() {
|
try {
|
if (session != null) {
|
session.getBasicRemote().sendPing(ByteBuffer.wrap(PING_MESSAGE.getBytes(CHARSET))); // 发送心跳消息
|
}
|
} catch (Exception e) {
|
log.error("发送心跳失败:", e); // 捕获并记录心跳发送失败的异常
|
}
|
}
|
|
public String subscription() throws JsonProcessingException, JSONException {
|
String symbol = getSymbol(); // 获取当前货币符号
|
|
// 创建 Map 构建 JSON 对象
|
Map<String, Object> jsonMap = new HashMap<>();
|
jsonMap.put("id", id); // 会话 ID
|
jsonMap.put("type", "subscribe"); // 订阅类型
|
jsonMap.put("topic", "/spotMarket/level1:" + symbol); // 订阅的主题
|
jsonMap.put("privateChannel", false); // 是否私有通道
|
jsonMap.put("response", true); // 是否返回响应
|
|
ObjectMapper mapper = new ObjectMapper(); // 创建 ObjectMapper 实例
|
return mapper.writeValueAsString(jsonMap); // 返回 JSON 字符串
|
}
|
|
private String getSymbol() {
|
List<String> symbolList = subscriptions.stream()
|
.map(currency -> currency.getSymbol().replaceAll("USDT", "-USDT")) // 替换符号
|
.collect(Collectors.toList()); // 收集符号到列表
|
return String.join(",", symbolList); // 将符号列表转换为 字符串
|
}
|
}
|