package org.example.wsClient; import com.alibaba.druid.util.StringUtils; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.reflect.TypeToken; import com.google.gson.Gson; import com.google.gson.JsonSyntaxException; import lombok.extern.slf4j.Slf4j; import org.example.pojo.Currency; import org.example.util.RedisUtil; import org.jetbrains.annotations.NotNull; import org.json.JSONException; import java.io.*; import java.net.HttpURLConnection; import java.net.URL; import javax.websocket.*; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.net.URLEncoder; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** * @program: demo * @description: * @create: 2024-07-19 16:44 **/ @ClientEndpoint @Slf4j public class KucoinClient { private static final String WS_ENDPOINT = "wss://ws-api-spot.kucoin.com/spotMarket/level2Depth5:"; private static final long PING_INTERVAL = 20000; private final List subscriptions; private final ScheduledExecutorService executorService; private Session session; private String token; private final Object lock = new Object(); // 添加一个锁对象 private volatile boolean reconnecting = false; // 使用 volatile 关键字保证可见性 private String id; public KucoinClient(List subscriptions,String token) { this.subscriptions = subscriptions; this.token = token; this.executorService = Executors.newScheduledThreadPool(1); } public void start() { try { connect(); if (session == null) { log.info("无法在超时时间内连接到服务器。"); return; } executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS); synchronized (this) { this.wait(); } } catch (Exception e) { log.error("kucoin ws 连接过程中发生异常: " + e.getMessage(), e); } finally { executorService.shutdown(); } } private void connect() throws Exception { WebSocketContainer container = ContainerProvider.getWebSocketContainer(); String url = extracted(); if(!StringUtils.isEmpty(url)){ container.connectToServer(this, new URI(url)); // 订阅消息 String parameter = subscription(); session.getBasicRemote().sendText(parameter); } } private static final String CHARSET = "UTF-8"; // 定义字符集常量,避免魔法字符串 private String extracted() throws UnsupportedEncodingException { String symbol = getSymbol(); String url = WS_ENDPOINT + symbol + "?token=" + URLEncoder.encode(token, CHARSET); return url; } @OnOpen public void onOpen(Session session) { log.info("kucoin ws 已连接到服务器。"); this.session = session; synchronized (this) { this.notify(); } } private static final Gson gson = new Gson(); // 将 Gson 作为静态成员或单例 @OnMessage public void onMessage(String message) { try { Map map = gson.fromJson(message, new TypeToken>() {}.getType()); if(null != map.get("id")){ this.id = map.get("id").toString(); } if(null != map.get("data")){ Object object = map.get("data"); String topic = map.get("topic").toString(); Map resultMap = gson.fromJson(object.toString(), new TypeToken>() {}.getType()); if (null != resultMap) { HashMap hashMap = new HashMap<>(); ObjectMapper mapper = new ObjectMapper(); hashMap.put("bids",resultMap.get("bids")); hashMap.put("asks",resultMap.get("asks")); int index = topic.indexOf(":"); // 找到逗号的位置 if (index != -1) { // 如果找到了逗号 String substring = topic.substring(index + 1); String symbol = substring.replaceAll("-", ""); String key = "kucoin" + symbol; RedisUtil.set(key, mapper.writeValueAsString(hashMap)); } else { // 处理未找到特定字符的情况 log.error("topic--->存入redis失败"); } } } } catch (JsonSyntaxException e) { log.error("JSON 解析异常:" + e.getMessage(), e); } catch (JsonProcessingException e) { log.error("JSON 解析异常:" + e.getMessage(), e); } } @OnClose public void onClose() { log.info("kucoin ws 连接已关闭,尝试重新连接..."); handleConnectionClosedOrError(); } @OnError public void onError(Throwable throwable) { log.error("kucoin ws 发生错误: " + throwable.getMessage(), throwable); handleConnectionClosedOrError(); } private void handleConnectionClosedOrError() { synchronized (lock) { if (!reconnecting) { reconnecting = true; // 设置 reconnecting 为 true 表示开始重连 executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连 } } } private void attemptReconnect() { boolean doReconnect = true; try { log.info("kucoin ws 开始重连"); connect(); // 假设 connect() 方法用于实际的连接逻辑 log.info("kucoin ws 重连成功"); } catch (Exception e) { log.error("kucoin ws 重连失败", e); // 连接失败时,可以根据具体情况决定是否继续重连 // 在这里假设总是继续尝试重连 } finally { synchronized (lock) { if (doReconnect) { scheduleReconnect(); // 如果需要继续重连,则重新调度重连任务 } else { reconnecting = false; // 重连结束后设置 reconnecting 为 false } } } } private void scheduleReconnect() { if (!executorService.isShutdown()) { executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS); } } private void sendPing() { try { if (session != null) { session.getBasicRemote().sendPing(ByteBuffer.wrap("Ping".getBytes())); } } catch (Exception e) { log.error("发送心跳失败", e); } } public String subscription() throws JsonProcessingException, JSONException { String symbol = getSymbol(); // 创建一个ObjectMapper实例 ObjectMapper mapper = new ObjectMapper(); // 使用Map构建JSON对象 Map jsonMap = new HashMap<>(); jsonMap.put("id", id); jsonMap.put("type", "subscribe"); jsonMap.put("topic", "/spotMarket/level2Depth50:"+symbol); jsonMap.put("privateChannel", false); jsonMap.put("response", true); // 将Map转换为JSON字符串 String jsonString = mapper.writeValueAsString(jsonMap); return jsonString; } @NotNull private String getSymbol() { String symbol; List symbolList = subscriptions.stream() .map(currency -> currency.getSymbol().replaceAll("USDT", "-USDT")) .collect(Collectors.toList()); symbol = String.join(",", symbolList); return symbol; } }