From 5c1e682edc2bc6c89cf0f34f93a438d1da274e64 Mon Sep 17 00:00:00 2001
From: zj <1772600164@qq.com>
Date: Sun, 21 Jul 2024 01:18:56 +0800
Subject: [PATCH] 1

---
 websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java     |   63 +++++++++++-
 websocketClient/src/main/java/org/example/wsClient/BitgetClient.java |  186 +++++++++++++++++++++++++++++++++++++
 websocketClient/src/main/java/org/example/wsClient/GateClient.java   |   49 +++++++--
 3 files changed, 278 insertions(+), 20 deletions(-)

diff --git a/websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java b/websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java
index 29aaf1c..d6664e9 100644
--- a/websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java
+++ b/websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java
@@ -1,6 +1,8 @@
 package org.example.WsBean;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.reflect.TypeToken;
 import com.google.gson.Gson;
 import lombok.extern.slf4j.Slf4j;
@@ -14,9 +16,11 @@
 import org.apache.http.util.EntityUtils;
 import org.example.pojo.Currency;
 import org.example.server.impl.CurrencySerivceImpl;
+import org.example.wsClient.BitgetClient;
 import org.example.wsClient.GateClient;
 import org.example.wsClient.KucoinClient;
 import org.example.wsClient.MexcClient;
+import org.json.JSONException;
 import org.json.JSONObject;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Bean;
@@ -25,8 +29,10 @@
 import org.springframework.util.CollectionUtils;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * @ClassDescription: 客户端请求类
@@ -84,12 +90,9 @@
 //    }
 
     @Bean
-    public void kucoinWebsocketRunClientMap() throws Exception {
-        List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "kucoin"));
+    public void bitgetWebsocketRunClientMap() throws JSONException, JsonProcessingException {
+        List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "bitget"));
         if (!CollectionUtils.isEmpty(mexc)) {
-            String result = doPost();
-            JSONObject jsonObject = new JSONObject(result);
-            String token = jsonObject.getJSONObject("data").getString("token");
             int batchSize = 100; // 每个线程处理的数据量
             int totalSize = mexc.size();
             int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数
@@ -98,13 +101,36 @@
                 int fromIndex = i * batchSize;
                 int toIndex = Math.min(fromIndex + batchSize, totalSize);
                 List<Currency> sublist = mexc.subList(fromIndex, toIndex);
-
+                String parameter = getParameter(sublist);
                 // 使用自定义线程池提交任务
-                threadPoolTaskExecutor.execute(new KucoinClient(sublist,token)::start);
+                threadPoolTaskExecutor.execute(new BitgetClient(parameter)::start);
             }
 
         }
     }
+//
+//    @Bean
+//    public void kucoinWebsocketRunClientMap() throws Exception {
+//        List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "kucoin"));
+//        if (!CollectionUtils.isEmpty(mexc)) {
+//            String result = doPost();
+//            JSONObject jsonObject = new JSONObject(result);
+//            String token = jsonObject.getJSONObject("data").getString("token");
+//            int batchSize = 100; // 每个线程处理的数据量
+//            int totalSize = mexc.size();
+//            int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数
+//
+//            for (int i = 0; i < threadCount; i++) {
+//                int fromIndex = i * batchSize;
+//                int toIndex = Math.min(fromIndex + batchSize, totalSize);
+//                List<Currency> sublist = mexc.subList(fromIndex, toIndex);
+//
+//                // 使用自定义线程池提交任务
+//                threadPoolTaskExecutor.execute(new KucoinClient(sublist,token)::start);
+//            }
+//
+//        }
+//    }
 
     public static String doPost() throws Exception {
         String url = "https://api.kucoin.com/api/v1/bullet-public";
@@ -118,5 +144,28 @@
         defaultHttpClient.getConnectionManager().shutdown();
         return text;
     }
+
+    public String getParameter(List<Currency> list) throws JsonProcessingException, JSONException {
+        // 创建一个ObjectMapper实例
+        ObjectMapper mapper = new ObjectMapper();
+        List<String> symbolList = list.stream().map(Currency::getSymbol).collect(Collectors.toList());
+        // 使用Map构建JSON对象
+        Map<String, Object> jsonMap = new HashMap<>();
+        jsonMap.put("op", "subscribe");
+        List<Map<String, String>> mapList = new ArrayList<>();
+        symbolList.forEach(f->{
+            Map<String, String> argsMap = new HashMap<>();
+            argsMap.put("instType", "SPOT");
+            argsMap.put("channel", "books15");
+            argsMap.put("instId", f);
+            mapList.add(argsMap);
+        });
+        jsonMap.put("args", mapList);
+
+        // 将Map转换为JSON字符串
+        String jsonString = mapper.writeValueAsString(jsonMap);
+        return jsonString;
+
+    }
 }
 
diff --git a/websocketClient/src/main/java/org/example/wsClient/BitgetClient.java b/websocketClient/src/main/java/org/example/wsClient/BitgetClient.java
new file mode 100644
index 0000000..d7f06c8
--- /dev/null
+++ b/websocketClient/src/main/java/org/example/wsClient/BitgetClient.java
@@ -0,0 +1,186 @@
+package org.example.wsClient;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+import lombok.extern.slf4j.Slf4j;
+import org.example.pojo.Currency;
+import org.example.util.RedisUtil;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import javax.websocket.*;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@ClientEndpoint
+@Slf4j
+public class BitgetClient {
+
+
+    private static final String WS_ENDPOINT = "wss://ws.bitget.com/v2/ws/public";
+    private static final long PING_INTERVAL = 20000;
+
+    private final String subscriptions;
+    private final ScheduledExecutorService executorService;
+    private Session session;
+
+    private final Object lock = new Object(); // 添加一个锁对象
+    private volatile boolean reconnecting = false; // 使用 volatile 关键字保证可见性
+
+    public BitgetClient(String subscriptions) {
+        this.subscriptions = subscriptions;
+        this.executorService = Executors.newScheduledThreadPool(1);
+    }
+
+    public void start() {
+        try {
+            connect();
+            if (session == null) {
+                log.info("无法在超时时间内连接到服务器。");
+                return;
+            }
+
+            executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS);
+
+            // 订阅消息
+            session.getBasicRemote().sendText(subscriptions);
+
+            synchronized (this) {
+                this.wait();
+            }
+
+        } catch (Exception e) {
+            log.error("bitget ws 连接过程中发生异常: " + e.getMessage(), e);
+        } finally {
+            executorService.shutdown();
+        }
+    }
+
+    private void connect() throws Exception {
+        WebSocketContainer container = ContainerProvider.getWebSocketContainer();
+        container.connectToServer(this, new URI(WS_ENDPOINT));
+    }
+
+    @OnOpen
+    public void onOpen(Session session) {
+        log.info("bitget ws 已连接到服务器。");
+        this.session = session;
+        synchronized (this) {
+            this.notify();
+        }
+    }
+    private static final Gson gson = new Gson(); // 将 Gson 作为静态成员或单例
+
+    @OnMessage
+    public void onMessage(String message) {
+        try {
+            JSONObject jsonObject = new JSONObject(message);
+            if (null != jsonObject && null != jsonObject.getJSONObject("arg").getString("instId") && null != jsonObject.get("data")) {
+                HashMap<String,Object> hashMap = new HashMap<>();
+                ObjectMapper mapper = new ObjectMapper();
+
+                hashMap.put("bids",jsonObject.getJSONObject("data").getString("bids"));
+                hashMap.put("asks",jsonObject.getJSONObject("data").getString("asks"));
+
+                String key = "bitget" + jsonObject.getJSONObject("arg").getString("instId");
+                RedisUtil.set(key.replace("_",""), mapper.writeValueAsString(hashMap));
+            }
+        } catch (JsonSyntaxException e) {
+            log.error("JSON 解析异常:" + e.getMessage(), e);
+        } catch (JsonProcessingException e) {
+            log.error("JSON 解析异常:" + e.getMessage(), e);
+        } catch (JSONException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @OnClose
+    public void onClose() {
+        log.info("bitget ws 连接已关闭,尝试重新连接...");
+        handleConnectionClosedOrError();
+    }
+
+    @OnError
+    public void onError(Throwable throwable) {
+        log.error("bitget ws 发生错误: " + throwable.getMessage(), throwable);
+        handleConnectionClosedOrError();
+    }
+
+    private void handleConnectionClosedOrError() {
+        synchronized (lock) {
+            if (!reconnecting) {
+                reconnecting = true; // 设置 reconnecting 为 true 表示开始重连
+                executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连
+            }
+        }
+    }
+
+    private void attemptReconnect() {
+        boolean doReconnect = true;
+        try {
+            log.info("bitget ws 开始重连");
+            connect(); // 假设 connect() 方法用于实际的连接逻辑
+            log.info("bitget ws 重连成功");
+        } catch (Exception e) {
+            log.error("bitget ws 重连失败", e);
+            // 连接失败时,可以根据具体情况决定是否继续重连
+            // 在这里假设总是继续尝试重连
+        } finally {
+            synchronized (lock) {
+                if (doReconnect) {
+                    scheduleReconnect(); // 如果需要继续重连,则重新调度重连任务
+                } else {
+                    reconnecting = false; // 重连结束后设置 reconnecting 为 false
+                }
+            }
+        }
+    }
+
+    private void scheduleReconnect() {
+        if (!executorService.isShutdown()) {
+            executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS);
+        }
+    }
+
+    private void sendPing() {
+        try {
+            if (session != null) {
+                session.getBasicRemote().sendText("ping"); // 发送心跳消息
+            }
+        } catch (Exception e) {
+            log.error("发送心跳失败", e);
+        }
+    }
+
+    public String getParameter(String symbol) throws JsonProcessingException, JSONException {
+        // 创建一个ObjectMapper实例
+        ObjectMapper mapper = new ObjectMapper();
+
+        // 使用Map构建JSON对象
+        Map<String, Object> jsonMap = new HashMap<>();
+        jsonMap.put("op", "subscribe");
+        Map<String, Object> argsMap = new HashMap<>();
+        jsonMap.put("args", argsMap);
+
+        argsMap.put("instType", "SPOT");
+        argsMap.put("channel", "books15");
+        argsMap.put("instId", symbol);
+
+        // 将Map转换为JSON字符串
+        String jsonString = mapper.writeValueAsString(jsonMap);
+        return jsonString;
+
+    }
+
+
+}
diff --git a/websocketClient/src/main/java/org/example/wsClient/GateClient.java b/websocketClient/src/main/java/org/example/wsClient/GateClient.java
index 729bf0f..466b1cb 100644
--- a/websocketClient/src/main/java/org/example/wsClient/GateClient.java
+++ b/websocketClient/src/main/java/org/example/wsClient/GateClient.java
@@ -11,6 +11,7 @@
 import lombok.extern.slf4j.Slf4j;
 import org.example.pojo.Currency;
 import org.example.util.RedisUtil;
+import org.json.JSONArray;
 import org.json.JSONException;
 import org.json.JSONObject;
 
@@ -18,10 +19,7 @@
 import java.math.BigDecimal;
 import java.net.URI;
 import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -101,12 +99,11 @@
             if (null != resultMap && null != resultMap.get("s")) {
                 HashMap<String,Object> hashMap = new HashMap<>();
                 ObjectMapper mapper = new ObjectMapper();
+
                 hashMap.put("bids",resultMap.get("bids"));
                 hashMap.put("asks",resultMap.get("asks"));
+
                 String key = "gate" + resultMap.get("s");
-                if(resultMap.get("s").toString().replace("_","").equals("BTCUSDT")){
-                    System.out.println(hashMap.toString());
-                }
                 RedisUtil.set(key.replace("_",""), mapper.writeValueAsString(hashMap));
             }
         } catch (JsonSyntaxException e) {
@@ -202,12 +199,38 @@
 
     }
 
-    public static void main(String[] args) {
-        String scientificNotation = "5.0E-5"; // 科学计数法字符串
-        BigDecimal bigDecimal = new BigDecimal(scientificNotation);
+//    public static void main(String[] args) throws JSONException {
+//        // 从 resultMap 中获取 "bids" 对应的 JSON 数组
+//        JSONArray jsonArray = new JSONArray(resultMap.get("bids").toString()); // 将获取的 bids 转换成 JSON 数组
+//        List<List<String>> resultList = new ArrayList<>(); // 存放所有的内层列表
+//
+//        // 遍历 JSON 数组
+//        for (int i = 0; i < jsonArray.length(); i++) {
+//            JSONArray innerArray = jsonArray.getJSONArray(i); // 获取当前内层 JSON 数组
+//            List<String> innerList = new ArrayList<>(); // 存放当前内层数组的元素
+//
+//            // 遍历内层 JSON 数组
+//            for (int j = 0; j < innerArray.length(); j++) {
+//                innerList.add(innerArray.getString(j)); // 将元素添加到内层列表中
+//            }
+//
+//            resultList.add(innerList); // 将内层列表添加到结果列表中
+//        }
+//
+//        // 考虑去掉未使用的 dataList,下面的代码使用 resultList 而不是 dataList
+//        List<Map<String, String>> resultMapList = new ArrayList<>(); // 存放最终的映射结果
+//        for (List<String> entry : resultList) { // 遍历 resultList 中的每个内层列表
+//            // 确保每个内层列表有足够的元素,再进行映射
+//            if (entry.size() >= 2) { // 判断 entry 的大小,避免 IndexOutOfBoundsException
+//                Map<String, String> mapKey = new HashMap<>(); // 新建一个 Map 以存储键值对
+//                mapKey.put("p", entry.get(0)); // 将内层列表的第一个元素作为键 "p"
+//                mapKey.put("v", entry.get(1)); // 将内层列表的第二个元素作为键 "v"
+//                resultMapList.add(mapKey); // 将 map 添加到结果映射列表中
+//            }
+//        }
+//
+//    }
 
-        System.out.println("Scientific Notation: " + scientificNotation);
-        System.out.println("BigDecimal Value: " + bigDecimal);
-    }
+
 
 }

--
Gitblit v1.9.3