From 378b31595f7eebdf46149fa2052cec41f7ce9565 Mon Sep 17 00:00:00 2001
From: zj <1772600164@qq.com>
Date: Sat, 20 Jul 2024 18:06:25 +0800
Subject: [PATCH] 1

---
 websocketClient/src/main/java/org/example/wsClient/KucoinClient.java |  105 +++++++++++++++++++++++++++++++++++++++++++---------
 1 files changed, 86 insertions(+), 19 deletions(-)

diff --git a/websocketClient/src/main/java/org/example/wsClient/KucoinClient.java b/websocketClient/src/main/java/org/example/wsClient/KucoinClient.java
index 2700dc6..88ad686 100644
--- a/websocketClient/src/main/java/org/example/wsClient/KucoinClient.java
+++ b/websocketClient/src/main/java/org/example/wsClient/KucoinClient.java
@@ -1,5 +1,6 @@
 package org.example.wsClient;
 
+import com.alibaba.druid.util.StringUtils;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.reflect.TypeToken;
@@ -8,16 +9,27 @@
 import lombok.extern.slf4j.Slf4j;
 import org.example.pojo.Currency;
 import org.example.util.RedisUtil;
+import org.jetbrains.annotations.NotNull;
+import org.json.JSONException;
 
+import java.io.*;
+import java.net.HttpURLConnection;
+import java.net.URL;
 import javax.websocket.*;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
 import java.net.URI;
+import java.net.URLEncoder;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 /**
  * @program: demo
@@ -28,7 +40,7 @@
 @Slf4j
 public class KucoinClient {
 
-    private static final String WS_ENDPOINT = "wss://api.kucoinio.ws/ws/v4/";
+    private static final String WS_ENDPOINT = "wss://ws-api-spot.kucoin.com/spotMarket/level2Depth5:";
     private static final long PING_INTERVAL = 20000;
 
     private final List<Currency> subscriptions;
@@ -37,6 +49,8 @@
     private String token;
     private final Object lock = new Object(); // 添加一个锁对象
     private volatile boolean reconnecting = false; // 使用 volatile 关键字保证可见性
+
+    private String id;
 
     public KucoinClient(List<Currency> subscriptions,String token) {
         this.subscriptions = subscriptions;
@@ -54,12 +68,6 @@
 
             executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS);
 
-            // 订阅消息
-            for (Currency subscription : subscriptions) {
-                String parameter = getParameter(subscription.getSymbol());
-                session.getBasicRemote().sendText(parameter);
-            }
-
             synchronized (this) {
                 this.wait();
             }
@@ -73,8 +81,24 @@
 
     private void connect() throws Exception {
         WebSocketContainer container = ContainerProvider.getWebSocketContainer();
-        container.connectToServer(this, new URI(WS_ENDPOINT));
+        String url = extracted();
+        if(!StringUtils.isEmpty(url)){
+            container.connectToServer(this, new URI(url));
+            // 订阅消息
+            String parameter = subscription();
+            session.getBasicRemote().sendText(parameter);
+
+        }
     }
+
+    private static final String CHARSET = "UTF-8"; // 定义字符集常量,避免魔法字符串
+
+    private String extracted() throws UnsupportedEncodingException {
+        String symbol = getSymbol();
+        String url = WS_ENDPOINT + symbol + "?token=" + URLEncoder.encode(token, CHARSET);
+        return url;
+    }
+
 
     @OnOpen
     public void onOpen(Session session) {
@@ -90,18 +114,30 @@
     public void onMessage(String message) {
         try {
             Map<String, Object> map = gson.fromJson(message, new TypeToken<Map<String, Object>>() {}.getType());
-            Object object = map.get("result");
-            Map<String, Object> resultMap = gson.fromJson(object.toString(), new TypeToken<Map<String, Object>>() {}.getType());
-            if (null != resultMap && null != resultMap.get("s")) {
-                HashMap<String,Object> hashMap = new HashMap<>();
-                ObjectMapper mapper = new ObjectMapper();
-                hashMap.put("bids",resultMap.get("bids"));
-                hashMap.put("asks",resultMap.get("asks"));
-                String key = "kucoin" + resultMap.get("s");
-                if(resultMap.get("s").toString().replace("_","").equals("BTCUSDT")){
-                    System.out.println(hashMap.toString());
+            if(null != map.get("id")){
+                this.id = map.get("id").toString();
+            }
+            if(null != map.get("data")){
+                Object object = map.get("data");
+                String topic = map.get("topic").toString();
+                Map<String, Object> resultMap = gson.fromJson(object.toString(), new TypeToken<Map<String, Object>>() {}.getType());
+                if (null != resultMap) {
+                    HashMap<String,Object> hashMap = new HashMap<>();
+                    ObjectMapper mapper = new ObjectMapper();
+                    hashMap.put("bids",resultMap.get("bids"));
+                    hashMap.put("asks",resultMap.get("asks"));
+
+                    int index = topic.indexOf(":"); // 找到逗号的位置
+                    if (index != -1) { // 如果找到了逗号
+                        String substring = topic.substring(index + 1);
+                        String symbol = substring.replaceAll("-", "");
+                        String key = "kucoin" + symbol;
+                        RedisUtil.set(key, mapper.writeValueAsString(hashMap));
+                    } else {
+                        // 处理未找到特定字符的情况
+                        log.error("topic--->存入redis失败");
+                    }
                 }
-                RedisUtil.set(key.replace("_",""), mapper.writeValueAsString(hashMap));
             }
         } catch (JsonSyntaxException e) {
             log.error("JSON 解析异常:" + e.getMessage(), e);
@@ -167,6 +203,37 @@
             log.error("发送心跳失败", e);
         }
     }
+
+    public String subscription() throws JsonProcessingException, JSONException {
+        String symbol = getSymbol();
+        // 创建一个ObjectMapper实例
+        ObjectMapper mapper = new ObjectMapper();
+
+        // 使用Map构建JSON对象
+        Map<String, Object> jsonMap = new HashMap<>();
+        jsonMap.put("id", id);
+        jsonMap.put("type", "subscribe");
+        jsonMap.put("topic", "/spotMarket/level2Depth50:"+symbol);
+        jsonMap.put("privateChannel", false);
+        jsonMap.put("response", true);
+
+        // 将Map转换为JSON字符串
+        String jsonString = mapper.writeValueAsString(jsonMap);
+        return jsonString;
+
+    }
+
+    @NotNull
+    private String getSymbol() {
+        String symbol;
+        List<String> symbolList = subscriptions.stream()
+                .map(currency -> currency.getSymbol().replaceAll("USDT", "-USDT"))
+                .collect(Collectors.toList());
+        symbol = String.join(",", symbolList);
+        return symbol;
+    }
+
+
 }
 
 

--
Gitblit v1.9.3