package org.example.kucoinclient.wsClient; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.reflect.TypeToken; import com.google.gson.Gson; import com.google.gson.JsonSyntaxException; import lombok.extern.slf4j.Slf4j; import org.example.kucoinclient.KucoinClientApplication; import org.example.kucoinclient.comm.ApplicationContextProvider; import org.example.kucoinclient.pojo.Currency; import org.example.kucoinclient.util.RedisUtil; import org.json.JSONException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.context.ApplicationContext; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.stereotype.Component; import javax.websocket.*; import java.io.UnsupportedEncodingException; import java.math.BigDecimal; import java.net.URI; import java.net.URLEncoder; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** * @program: demo * @description: Kucoin WebSocket 客户端 * @create: 2024-07-19 16:44 **/ @ClientEndpoint @Slf4j public class KucoinClient { private static final String WS_ENDPOINT = "wss://ws-api-spot.kucoin.com/spotMarket/level2Depth5:"; // WebSocket 端点 private static final String CHARSET = "UTF-8"; // 定义字符集常量,避免魔法字符串 private static final String PING_MESSAGE = "Ping"; // 心跳消息 private static final long PING_INTERVAL = 20000; // 心跳间隔 private static final int RECONNECT_DELAY = 5; // 重连间隔 private final List subscriptions; // 订阅的货币列表 private final ScheduledExecutorService executorService; // 调度线程池 private Session session; // WebSocket会话 private String token; // 访问令牌 private final Object lock = new Object(); // 锁对象,用于控制重连 private volatile boolean reconnecting = false; // 表示当前是否在重连 private String id; // 当前会话的 ID public KucoinClient(List subscriptions, String token) { this.subscriptions = subscriptions; // 保存订阅的货币列表 this.token = token; // 保存令牌 this.executorService = Executors.newScheduledThreadPool(1); // 创建调度线程池 } public void start() throws Exception { try { connect(); // 连接到 WebSocket 服务器 if (session == null) { log.info("无法在超时时间内连接到服务器。"); // 输出连接失败日志 return; // 结束方法 } // 定期发送心跳 executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS); synchronized (this) { // 同步等待连接 this.wait(); // 当前线程等待 } } catch (Exception e) { log.error("kucoin ws 连接过程中发生异常:" + e.getMessage(), e); // 记录连接过程中发生的异常 throw e; } finally { executorService.shutdown(); // 关闭调度线程池 } } private void connect() throws Exception { WebSocketContainer container = ContainerProvider.getWebSocketContainer(); // 获取 WebSocket 容器 String url = generateWebSocketURL(); // 生成 WebSocket URL container.connectToServer(this, new URI(url)); // 连接到 WebSocket 服务器 String parameter = subscription(); // 生成订阅消息 session.getBasicRemote().sendText(parameter); // 发送订阅消息 } private String generateWebSocketURL() throws UnsupportedEncodingException { String symbol = getSymbol(); // 获取符号 String url = WS_ENDPOINT + symbol + "?token=" + URLEncoder.encode(token, CHARSET); // 构建 WebSocket URL return url; // 返回生成的 URL } @OnOpen public void onOpen(Session session) { log.info("kucoin ws 已连接到服务器。"); // 输出连接成功日志 this.session = session; // 保存当前会话 synchronized (this) { // 同步通知所有等待的线程 this.notify(); } } private static final Gson gson = new Gson(); // Gson 实例,负责 JSON 处理 @OnMessage public void onMessage(String message) { try { Map map = parseMessage(message); // 解析消息 handleMessage(map); // 处理消息 } catch (JsonSyntaxException | JsonProcessingException e) { log.error("JSON 解析异常:", e); // 捕获 JSON 解析异常 } } private Map parseMessage(String message) throws JsonSyntaxException { return gson.fromJson(message, new TypeToken>() {}.getType()); // 解析 JSON 消息 } private void handleMessage(Map map) throws JsonProcessingException { if (map.get("id") != null) { this.id = map.get("id").toString(); // 设置会话 ID } if (map.get("data") != null) { Object object = map.get("data"); // 获取数据内容 processData(map.get("topic").toString(), object); // 处理数据 } } private static final String PREFIX = "kucoin"; // 创建常量,方便以后修改和维护 private void processData(String topic, Object object) throws JsonProcessingException { // 将数据解析为 Map Map resultMap = null; try { resultMap = gson.fromJson(object.toString(), new TypeToken>() {}.getType()); // 解析 JSON 对象 } catch (JsonSyntaxException e) { log.error("JSON 解析失败: {}", e.getMessage()); // 输出 JSON 解析错误日志 return; // 结束方法执行 } if (resultMap != null) { // 创建线程安全的 HashMap Map hashMap = new ConcurrentHashMap<>(); ObjectMapper mapper = new ObjectMapper(); // 创建 ObjectMapper 实例 // 空值检查,避免存储 null 值到 Redis if (resultMap.get("bids") != null && resultMap.get("asks") != null) { Object asksObj = resultMap.get("bids"); Object bidsObj = resultMap.get("asks"); if(bidsObj instanceof List && !((List) bidsObj).isEmpty() && asksObj instanceof List && !((List) asksObj).isEmpty()){ if (bidsObj instanceof List && !((List) bidsObj).isEmpty()) { List bidsList = (List) bidsObj; HashMap pvMap = new HashMap<>(); // 创建新的 HashMap 保存 bids 和 asks pvMap.put("p", new BigDecimal(String.valueOf(bidsList.get(0))).toPlainString()); pvMap.put("v", new BigDecimal(String.valueOf(bidsList.get(1))).toPlainString()); hashMap.put("bids",pvMap); } if (asksObj instanceof List && !((List) asksObj).isEmpty()) { List asksList = (List) asksObj; HashMap pvMap = new HashMap<>(); // 创建新的 HashMap 保存 bids 和 asks pvMap.put("p", new BigDecimal(String.valueOf(asksList.get(0))).toPlainString()); pvMap.put("v", new BigDecimal(String.valueOf(asksList.get(1))).toPlainString()); hashMap.put("asks",pvMap); } String symbol = extractSymbolFromTopic(topic); // 从 topic 提取符号 String key = PREFIX + symbol; // 创建 Redis 缓存键 try { RedisUtil.set(key, mapper.writeValueAsString(hashMap)); // 存储到 Redis } catch (JsonProcessingException e) { log.error("将数据存入 Redis 时出错: {}", e.getMessage()); // 输出数据存储错误日志 } } } } else { log.error("topic--->存入redis失败"); // 输出处理失败日志 } } private String extractSymbolFromTopic(String topic) { int index = topic.indexOf(":"); // 找到分隔符的位置 if (index != -1) { // 如果找到了分隔符 String substring = topic.substring(index + 1); return substring.replaceAll("-", ""); // 返回去掉"-"的符号 } return ""; // 未找到分隔符返回空 } @OnClose public void onClose() throws Exception { log.info("kucoin ws 连接已关闭,尝试重新连接..."); // 输出连接关闭日志 throw new Exception(); } @OnError public void onError(Throwable throwable) throws Exception { log.error("kucoin ws 发生错误: ", throwable); // 输出错误日志 throw new Exception(); } // private void handleConnectionClosedOrError() { // synchronized (lock) { // 同步块,确保线程安全 // if (!reconnecting) { // reconnecting = true; // 状态标记为重连中 // executorService.execute(this::attemptReconnect); // 执行重连操作 // } // } // } // // private void attemptReconnect() { // try { // log.info("kucoin ws 开始重连"); // 输出重连开始日志 // connect(); // 尝试重新连接 // log.info("kucoin ws 重连成功"); // 输出重连成功日志 // } catch (Exception e) { // log.error("kucoin ws 重连失败: ", e); // 输出重连失败日志 // } finally { // synchronized (lock) { // 同步块 // reconnecting = false; // 状态标记为未重连 // scheduleReconnect(); // 重新调度重连任务 // } // } // } // // private void scheduleReconnect() { // if (!executorService.isShutdown()) { // 检查线程池是否已经关闭 // executorService.schedule(this::attemptReconnect, RECONNECT_DELAY, TimeUnit.SECONDS); // 调度重连 // } // } private void sendPing() { try { if (session != null) { session.getBasicRemote().sendPing(ByteBuffer.wrap(PING_MESSAGE.getBytes(CHARSET))); // 发送心跳消息 } } catch (Exception e) { log.error("发送心跳失败:", e); // 捕获并记录心跳发送失败的异常 } } public String subscription() throws JsonProcessingException, JSONException { String symbol = getSymbol(); // 获取当前货币符号 // 创建 Map 构建 JSON 对象 Map jsonMap = new HashMap<>(); jsonMap.put("id", id); // 会话 ID jsonMap.put("type", "subscribe"); // 订阅类型 jsonMap.put("topic", "/spotMarket/level1:" + symbol); // 订阅的主题 jsonMap.put("privateChannel", false); // 是否私有通道 jsonMap.put("response", true); // 是否返回响应 ObjectMapper mapper = new ObjectMapper(); // 创建 ObjectMapper 实例 return mapper.writeValueAsString(jsonMap); // 返回 JSON 字符串 } private String getSymbol() { List symbolList = subscriptions.stream() .map(currency -> currency.getSymbol().replaceAll("USDT", "-USDT")) // 替换符号 .collect(Collectors.toList()); // 收集符号到列表 return String.join(",", symbolList); // 将符号列表转换为 字符串 } }