From 378b31595f7eebdf46149fa2052cec41f7ce9565 Mon Sep 17 00:00:00 2001
From: zj <1772600164@qq.com>
Date: Sat, 20 Jul 2024 18:06:25 +0800
Subject: [PATCH] 1

---
 websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java     |   47 ++++++++-------
 websocketClient/src/main/java/org/example/wsClient/KucoinClient.java |  105 ++++++++++++++++++++++++++++------
 2 files changed, 112 insertions(+), 40 deletions(-)

diff --git a/websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java b/websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java
index 7d45771..29aaf1c 100644
--- a/websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java
+++ b/websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java
@@ -1,6 +1,8 @@
 package org.example.WsBean;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.http.HttpEntity;
 import org.apache.http.HttpResponse;
@@ -15,6 +17,7 @@
 import org.example.wsClient.GateClient;
 import org.example.wsClient.KucoinClient;
 import org.example.wsClient.MexcClient;
+import org.json.JSONObject;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -60,31 +63,33 @@
 //        }
 //    }
 
-    @Bean
-    public void gateWebsocketRunClientMap() {
-        List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "gate"));
-        if (!CollectionUtils.isEmpty(mexc)) {
-            int batchSize = 100; // 每个线程处理的数据量
-            int totalSize = mexc.size();
-            int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数
-
-            for (int i = 0; i < threadCount; i++) {
-                int fromIndex = i * batchSize;
-                int toIndex = Math.min(fromIndex + batchSize, totalSize);
-                List<Currency> sublist = mexc.subList(fromIndex, toIndex);
-
-                // 使用自定义线程池提交任务
-                threadPoolTaskExecutor.execute(new GateClient(sublist)::start);
-            }
-
-        }
-    }
+//    @Bean
+//    public void gateWebsocketRunClientMap() {
+//        List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "gate"));
+//        if (!CollectionUtils.isEmpty(mexc)) {
+//            int batchSize = 100; // 每个线程处理的数据量
+//            int totalSize = mexc.size();
+//            int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数
+//
+//            for (int i = 0; i < threadCount; i++) {
+//                int fromIndex = i * batchSize;
+//                int toIndex = Math.min(fromIndex + batchSize, totalSize);
+//                List<Currency> sublist = mexc.subList(fromIndex, toIndex);
+//
+//                // 使用自定义线程池提交任务
+//                threadPoolTaskExecutor.execute(new GateClient(sublist)::start);
+//            }
+//
+//        }
+//    }
 
     @Bean
     public void kucoinWebsocketRunClientMap() throws Exception {
-        List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "gate"));
+        List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "kucoin"));
         if (!CollectionUtils.isEmpty(mexc)) {
-            String token = doPost();
+            String result = doPost();
+            JSONObject jsonObject = new JSONObject(result);
+            String token = jsonObject.getJSONObject("data").getString("token");
             int batchSize = 100; // 每个线程处理的数据量
             int totalSize = mexc.size();
             int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数
diff --git a/websocketClient/src/main/java/org/example/wsClient/KucoinClient.java b/websocketClient/src/main/java/org/example/wsClient/KucoinClient.java
index 2700dc6..88ad686 100644
--- a/websocketClient/src/main/java/org/example/wsClient/KucoinClient.java
+++ b/websocketClient/src/main/java/org/example/wsClient/KucoinClient.java
@@ -1,5 +1,6 @@
 package org.example.wsClient;
 
+import com.alibaba.druid.util.StringUtils;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.reflect.TypeToken;
@@ -8,16 +9,27 @@
 import lombok.extern.slf4j.Slf4j;
 import org.example.pojo.Currency;
 import org.example.util.RedisUtil;
+import org.jetbrains.annotations.NotNull;
+import org.json.JSONException;
 
+import java.io.*;
+import java.net.HttpURLConnection;
+import java.net.URL;
 import javax.websocket.*;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
 import java.net.URI;
+import java.net.URLEncoder;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 /**
  * @program: demo
@@ -28,7 +40,7 @@
 @Slf4j
 public class KucoinClient {
 
-    private static final String WS_ENDPOINT = "wss://api.kucoinio.ws/ws/v4/";
+    private static final String WS_ENDPOINT = "wss://ws-api-spot.kucoin.com/spotMarket/level2Depth5:";
     private static final long PING_INTERVAL = 20000;
 
     private final List<Currency> subscriptions;
@@ -37,6 +49,8 @@
     private String token;
     private final Object lock = new Object(); // 添加一个锁对象
     private volatile boolean reconnecting = false; // 使用 volatile 关键字保证可见性
+
+    private String id;
 
     public KucoinClient(List<Currency> subscriptions,String token) {
         this.subscriptions = subscriptions;
@@ -54,12 +68,6 @@
 
             executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS);
 
-            // 订阅消息
-            for (Currency subscription : subscriptions) {
-                String parameter = getParameter(subscription.getSymbol());
-                session.getBasicRemote().sendText(parameter);
-            }
-
             synchronized (this) {
                 this.wait();
             }
@@ -73,8 +81,24 @@
 
     private void connect() throws Exception {
         WebSocketContainer container = ContainerProvider.getWebSocketContainer();
-        container.connectToServer(this, new URI(WS_ENDPOINT));
+        String url = extracted();
+        if(!StringUtils.isEmpty(url)){
+            container.connectToServer(this, new URI(url));
+            // 订阅消息
+            String parameter = subscription();
+            session.getBasicRemote().sendText(parameter);
+
+        }
     }
+
+    private static final String CHARSET = "UTF-8"; // 定义字符集常量,避免魔法字符串
+
+    private String extracted() throws UnsupportedEncodingException {
+        String symbol = getSymbol();
+        String url = WS_ENDPOINT + symbol + "?token=" + URLEncoder.encode(token, CHARSET);
+        return url;
+    }
+
 
     @OnOpen
     public void onOpen(Session session) {
@@ -90,18 +114,30 @@
     public void onMessage(String message) {
         try {
             Map<String, Object> map = gson.fromJson(message, new TypeToken<Map<String, Object>>() {}.getType());
-            Object object = map.get("result");
-            Map<String, Object> resultMap = gson.fromJson(object.toString(), new TypeToken<Map<String, Object>>() {}.getType());
-            if (null != resultMap && null != resultMap.get("s")) {
-                HashMap<String,Object> hashMap = new HashMap<>();
-                ObjectMapper mapper = new ObjectMapper();
-                hashMap.put("bids",resultMap.get("bids"));
-                hashMap.put("asks",resultMap.get("asks"));
-                String key = "kucoin" + resultMap.get("s");
-                if(resultMap.get("s").toString().replace("_","").equals("BTCUSDT")){
-                    System.out.println(hashMap.toString());
+            if(null != map.get("id")){
+                this.id = map.get("id").toString();
+            }
+            if(null != map.get("data")){
+                Object object = map.get("data");
+                String topic = map.get("topic").toString();
+                Map<String, Object> resultMap = gson.fromJson(object.toString(), new TypeToken<Map<String, Object>>() {}.getType());
+                if (null != resultMap) {
+                    HashMap<String,Object> hashMap = new HashMap<>();
+                    ObjectMapper mapper = new ObjectMapper();
+                    hashMap.put("bids",resultMap.get("bids"));
+                    hashMap.put("asks",resultMap.get("asks"));
+
+                    int index = topic.indexOf(":"); // 找到逗号的位置
+                    if (index != -1) { // 如果找到了逗号
+                        String substring = topic.substring(index + 1);
+                        String symbol = substring.replaceAll("-", "");
+                        String key = "kucoin" + symbol;
+                        RedisUtil.set(key, mapper.writeValueAsString(hashMap));
+                    } else {
+                        // 处理未找到特定字符的情况
+                        log.error("topic--->存入redis失败");
+                    }
                 }
-                RedisUtil.set(key.replace("_",""), mapper.writeValueAsString(hashMap));
             }
         } catch (JsonSyntaxException e) {
             log.error("JSON 解析异常:" + e.getMessage(), e);
@@ -167,6 +203,37 @@
             log.error("发送心跳失败", e);
         }
     }
+
+    public String subscription() throws JsonProcessingException, JSONException {
+        String symbol = getSymbol();
+        // 创建一个ObjectMapper实例
+        ObjectMapper mapper = new ObjectMapper();
+
+        // 使用Map构建JSON对象
+        Map<String, Object> jsonMap = new HashMap<>();
+        jsonMap.put("id", id);
+        jsonMap.put("type", "subscribe");
+        jsonMap.put("topic", "/spotMarket/level2Depth50:"+symbol);
+        jsonMap.put("privateChannel", false);
+        jsonMap.put("response", true);
+
+        // 将Map转换为JSON字符串
+        String jsonString = mapper.writeValueAsString(jsonMap);
+        return jsonString;
+
+    }
+
+    @NotNull
+    private String getSymbol() {
+        String symbol;
+        List<String> symbolList = subscriptions.stream()
+                .map(currency -> currency.getSymbol().replaceAll("USDT", "-USDT"))
+                .collect(Collectors.toList());
+        symbol = String.join(",", symbolList);
+        return symbol;
+    }
+
+
 }
 
 

--
Gitblit v1.9.3