From bf47f0de2c4b4ca35ce08c7abd2b5b2477b31dac Mon Sep 17 00:00:00 2001
From: zj <1772600164@qq.com>
Date: Thu, 18 Jul 2024 03:20:57 +0800
Subject: [PATCH] 1

---
 websocketClient/src/main/java/org/example/pojo/Currency.java                   |   25 ++
 websocketClient/src/main/resources/application.yml                             |   74 ++++++
 websocketClient/src/main/java/org/example/server/impl/CurrencySerivceImpl.java |   15 +
 websocketSerivce/src/main/java/org/example/task/GateStock.java                 |    4 
 .idea/misc.xml                                                                 |    1 
 websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java               |   51 ++++
 /dev/null                                                                      |   69 ------
 websocketClient/src/main/java/org/example/util/RedisUtil.java                  |  119 ++++++++++
 websocketSerivce/src/main/java/org/example/task/BitgetStock.java               |    4 
 websocketClient/src/main/java/org/example/wsClient/MexcClient.java             |  148 +++++++++++++
 websocketClient/pom.xml                                                        |   66 ++++++
 websocketClient/src/main/java/org/example/dao/CurrencyMapper.java              |   14 +
 websocketSerivce/src/main/java/org/example/task/MexcStock.java                 |    4 
 websocketClient/src/main/java/org/example/ThreadConfig/AsyncConfiguration.java |   10 
 websocketClient/src/main/java/org/example/server/CurrencySerivce.java          |    9 
 websocketSerivce/src/main/java/org/example/task/KucoinStock.java               |    4 
 16 files changed, 537 insertions(+), 80 deletions(-)

diff --git a/.idea/misc.xml b/.idea/misc.xml
index 4b661a5..f0d7b15 100644
--- a/.idea/misc.xml
+++ b/.idea/misc.xml
@@ -1,4 +1,3 @@
-<?xml version="1.0" encoding="UTF-8"?>
 <project version="4">
   <component name="ExternalStorageConfigurationManager" enabled="true" />
   <component name="MavenProjectsManager">
diff --git a/websocketClient/pom.xml b/websocketClient/pom.xml
index 31b848e..8167829 100644
--- a/websocketClient/pom.xml
+++ b/websocketClient/pom.xml
@@ -12,6 +12,22 @@
     <artifactId>websocketClient</artifactId>
     <dependencies>
         <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>20.0</version>
+        </dependency>
+        <dependency>
+            <groupId>redis.clients</groupId>
+            <artifactId>jedis</artifactId>
+            <version>3.7.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.baomidou</groupId>
+            <artifactId>mybatis-plus-boot-starter</artifactId>
+            <version>3.4.3.2</version>
+        </dependency>
+
+        <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-web</artifactId>
         </dependency>
@@ -40,6 +56,56 @@
             <artifactId>gson</artifactId>
             <version>2.8.8</version>
         </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>druid-spring-boot-starter</artifactId>
+            <version>1.2.11</version>
+        </dependency>
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.mybatis.spring.boot</groupId>
+            <artifactId>mybatis-spring-boot-starter</artifactId>
+            <version>2.2.2</version>
+        </dependency>
+
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-websocket</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-data-redis</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter</artifactId>
+        </dependency>
+
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-pool2</artifactId>
+            <version>2.11.1</version>
+        </dependency>
+
+        <dependency>
+            <groupId>redis.clients</groupId>
+            <artifactId>jedis</artifactId>
+            <version>3.7.0</version>
+        </dependency>
     </dependencies>
     <properties>
         <maven.compiler.source>8</maven.compiler.source>
diff --git a/websocketClient/src/main/java/org/example/config/AsyncConfiguration.java b/websocketClient/src/main/java/org/example/ThreadConfig/AsyncConfiguration.java
similarity index 72%
rename from websocketClient/src/main/java/org/example/config/AsyncConfiguration.java
rename to websocketClient/src/main/java/org/example/ThreadConfig/AsyncConfiguration.java
index f77bada..bbbc919 100644
--- a/websocketClient/src/main/java/org/example/config/AsyncConfiguration.java
+++ b/websocketClient/src/main/java/org/example/ThreadConfig/AsyncConfiguration.java
@@ -1,4 +1,4 @@
-package org.example.config;
+package org.example.ThreadConfig;
 
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
@@ -17,11 +17,11 @@
     @Bean(name = "threadPoolTaskExecutor")
     public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
-        executor.setCorePoolSize(50);    //  核心线程数
-        executor.setMaxPoolSize(100);    //  最大线程数
-        executor.setQueueCapacity(300);    //  队列容量
+        executor.setCorePoolSize(400);    //  核心线程数
+        executor.setMaxPoolSize(1200);    //  最大线程数
+        executor.setQueueCapacity(1500);    //  队列容量
         executor.setKeepAliveSeconds(60);    //  线程空闲时的存活时间为60秒
-        executor.setThreadNamePrefix("MyThread-");    //  线程名称的前缀
+        executor.setThreadNamePrefix("MexcThread-");    //  线程名称的前缀
         executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());    //  使用  CallerRunsPolicy  拒绝策略
         return executor;
     }
diff --git a/websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java b/websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java
new file mode 100644
index 0000000..0315b55
--- /dev/null
+++ b/websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java
@@ -0,0 +1,51 @@
+package org.example.WsBean;
+
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import lombok.extern.slf4j.Slf4j;
+import org.example.pojo.Currency;
+import org.example.server.impl.CurrencySerivceImpl;
+import org.example.wsClient.MexcClient;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.util.CollectionUtils;
+
+import java.util.List;
+
+/**
+ * @ClassDescription: 客户端请求类
+ * @JdkVersion: 1.8
+ * @Created: 2023/8/31 16:13
+ */
+@Slf4j
+@Configuration
+public class MexcWsBean {
+
+    @Autowired
+    private CurrencySerivceImpl currencyService;
+
+    @Autowired
+    private ThreadPoolTaskExecutor threadPoolTaskExecutor;
+
+    @Bean
+    public void mexcWebsocketRunClientMap() {
+        List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "mexc"));
+        if (!CollectionUtils.isEmpty(mexc)) {
+            int batchSize = 30; // 每个线程处理的数据量
+            int totalSize = mexc.size();
+            int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数
+
+            for (int i = 0; i < threadCount; i++) {
+                int fromIndex = i * batchSize;
+                int toIndex = Math.min(fromIndex + batchSize, totalSize);
+                List<Currency> sublist = mexc.subList(fromIndex, toIndex);
+
+                // 使用自定义线程池提交任务
+                threadPoolTaskExecutor.execute(new MexcClient(sublist)::start);
+            }
+
+        }
+    }
+}
+
diff --git a/websocketClient/src/main/java/org/example/constant/StockConstant.java b/websocketClient/src/main/java/org/example/constant/StockConstant.java
deleted file mode 100644
index 7ca7395..0000000
--- a/websocketClient/src/main/java/org/example/constant/StockConstant.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package org.example.constant;
-
-/**
- * 股票长量配置
- * */
-public class StockConstant {
-
-    public static String HTTP_API = "http://api-in-2.js-stock.top/";
-
-    public static String IO_HTTP_API = "http://api-in-2-socket.js-stock.top";
-
-    public static String HTTP_F_API = "http://api-v1-f.js-stock.top/";
-
-
-    public  static String WS_URL = "ws://api-in-2-ws.js-stock.top";
-
-    public static String KEY = "eVKtHt7aG4m6ozwWL9qG";
-
-
-    public static String US_API_URL = "http://api-us.js-stock.top/";
-
-    public  static String US_KEY = "jZFrku4RGQjP87Hmq5tm";
-
-
-    public  static String JP_HTTP_API = "http://api-jp.js-stock.top/";
-    public  static String JP_WS_URL = "ws://api-jp-ws.js-stock.top";
-    public  static String JP_KEY = "glcd4SAiD6oXExIDOtUJ";
-}
diff --git a/websocketClient/src/main/java/org/example/controller/WsClientController.java b/websocketClient/src/main/java/org/example/controller/WsClientController.java
deleted file mode 100644
index c30defe..0000000
--- a/websocketClient/src/main/java/org/example/controller/WsClientController.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package org.example.controller;
-
-import lombok.extern.slf4j.Slf4j;
-import org.example.server.wcServer;
-import org.java_websocket.client.WebSocketClient;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * @ClassDescription: 客户端请求类
- * @JdkVersion: 1.8
- * @Created: 2023/8/31 16:13
- */
-@Slf4j
-@Configuration
-public class WsClientController {
-
-    @Bean
-    public Map<String, WebSocketClient> websocketRunClientMap() {
-
-        Map<String, WebSocketClient> retMap = new HashMap<>(2);
-        try {
-            wcServer websocketRunClient = new wcServer(new URI("wss://api.gateio.ws/ws/v4/"));
-            websocketRunClient.connect();
-            websocketRunClient.setConnectionLostTimeout(0);
-            new Thread(() -> {
-                while (true) {
-                    try {
-                        Thread.sleep(8000);
-                        websocketRunClient.send("heartbeat".getBytes());
-                    } catch (Exception e) {
-                        websocketRunClient.reconnect();
-                        websocketRunClient.setConnectionLostTimeout(0);
-                    }
-                }
-            }).start();
-        } catch (Exception e) {
-        }
-        return retMap;
-    }
-
-
-}
-
diff --git a/websocketClient/src/main/java/org/example/dao/CurrencyMapper.java b/websocketClient/src/main/java/org/example/dao/CurrencyMapper.java
new file mode 100644
index 0000000..86b645b
--- /dev/null
+++ b/websocketClient/src/main/java/org/example/dao/CurrencyMapper.java
@@ -0,0 +1,14 @@
+package org.example.dao;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import org.apache.ibatis.annotations.Mapper;
+import org.example.pojo.Currency;
+
+/**
+ * @program: demo
+ * @description:
+ * @create: 2024-07-15 17:45
+ **/
+@Mapper
+public interface CurrencyMapper extends BaseMapper<Currency> {
+}
diff --git a/websocketClient/src/main/java/org/example/pojo/Currency.java b/websocketClient/src/main/java/org/example/pojo/Currency.java
new file mode 100644
index 0000000..2e0e322
--- /dev/null
+++ b/websocketClient/src/main/java/org/example/pojo/Currency.java
@@ -0,0 +1,25 @@
+package org.example.pojo;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableId;
+import lombok.Data;
+
+/**
+ * @program: demo
+ * @description: 交易对
+ * @create: 2024-07-15 17:41
+ **/
+@Data
+public class Currency {
+    @TableId(value = "id",type = IdType.AUTO)
+    private String id;
+    //交易对
+    private String symbol;
+    //交易币
+    private String baseAsset;
+    //计价币
+    private String quoteAsset;
+    //来源
+    private String source;
+
+}
diff --git a/websocketClient/src/main/java/org/example/server/CurrencySerivce.java b/websocketClient/src/main/java/org/example/server/CurrencySerivce.java
new file mode 100644
index 0000000..d50a81f
--- /dev/null
+++ b/websocketClient/src/main/java/org/example/server/CurrencySerivce.java
@@ -0,0 +1,9 @@
+package org.example.server;
+
+/**
+ * @program: demo
+ * @description:
+ * @create: 2024-07-16 15:23
+ **/
+public interface CurrencySerivce {
+}
diff --git a/websocketClient/src/main/java/org/example/server/impl/CurrencySerivceImpl.java b/websocketClient/src/main/java/org/example/server/impl/CurrencySerivceImpl.java
new file mode 100644
index 0000000..1b220f2
--- /dev/null
+++ b/websocketClient/src/main/java/org/example/server/impl/CurrencySerivceImpl.java
@@ -0,0 +1,15 @@
+package org.example.server.impl;
+
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import org.example.dao.CurrencyMapper;
+import org.example.pojo.Currency;
+import org.springframework.stereotype.Service;
+
+/**
+ * @program: demo
+ * @description:
+ * @create: 2024-07-16 15:23
+ **/
+@Service
+public class CurrencySerivceImpl extends ServiceImpl<CurrencyMapper, Currency> {
+}
diff --git a/websocketClient/src/main/java/org/example/server/wcServer.java b/websocketClient/src/main/java/org/example/server/wcServer.java
deleted file mode 100644
index adeaf59..0000000
--- a/websocketClient/src/main/java/org/example/server/wcServer.java
+++ /dev/null
@@ -1,69 +0,0 @@
-package org.example.server;
-
-import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.entity.UrlEncodedFormEntity;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.message.BasicNameValuePair;
-import org.example.constant.StockConstant;
-import org.java_websocket.WebSocket;
-import org.java_websocket.client.WebSocketClient;
-import org.java_websocket.handshake.ServerHandshake;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import javax.websocket.ClientEndpoint;
-import javax.websocket.ContainerProvider;
-import javax.websocket.Session;
-import javax.websocket.WebSocketContainer;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.HttpURLConnection;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * @program: webSocketProject
- * @description:
- * @create: 2024-03-26 16:11
- **/
-@ClientEndpoint
-public class wcServer extends WebSocketClient {
-
-    public wcServer(URI serverUri) throws URISyntaxException {
-        super(serverUri);
-    }
-
-    @Override
-    public void onOpen(ServerHandshake shake) {
-        long timestamp = System.currentTimeMillis() / 1000;
-        String payload = "{\"time\": " + timestamp + ", \"channel\": \"spot.order_book\", \"event\": \"subscribe\", \"payload\": [\"BTC_USDT\", \"5\", \"100ms\"]}";
-        send(payload);
-    }
-
-    @Override
-    public void onMessage(String paramString) {
-        send(paramString);
-    }
-
-    @Override
-    public void onClose(int paramInt, String paramString, boolean paramBoolean) {
-        System.out.println("关闭...");
-    }
-
-    @Override
-    public void onError(Exception e) {
-        System.out.println("异常" + e);
-
-    }
-
-    @Override
-    public void send(String message) {
-    }
-
-}
diff --git a/websocketClient/src/main/java/org/example/util/RedisUtil.java b/websocketClient/src/main/java/org/example/util/RedisUtil.java
new file mode 100644
index 0000000..63361dd
--- /dev/null
+++ b/websocketClient/src/main/java/org/example/util/RedisUtil.java
@@ -0,0 +1,119 @@
+package org.example.util;
+
+import redis.clients.jedis.*;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class RedisUtil {
+    private static JedisPool jedisPool;
+
+    // 在静态代码块中初始化 JedisPool
+    static {
+        jedisPool = new JedisPool(new JedisPoolConfig(), "localhost", 6379);
+    }
+
+    public static void set(String key, String value) {
+        try (Jedis jedis = jedisPool.getResource()) {
+            jedis.set(key, value);
+        }
+    }
+
+    public static String get(String key) {
+        try (Jedis jedis = jedisPool.getResource()) {
+            return jedis.get(key);
+        }
+    }
+
+    public static void hset(String key, String field, String value) {
+        try (Jedis jedis = jedisPool.getResource()) {
+            jedis.hset(key, field, value);
+        }
+    }
+
+    public static String hget(String key, String field) {
+        try (Jedis jedis = jedisPool.getResource()) {
+            return jedis.hget(key, field);
+        }
+    }
+
+    public static void lpush(String key, String... values) {
+        try (Jedis jedis = jedisPool.getResource()) {
+            jedis.lpush(key, values);
+        }
+    }
+
+    public static List<String> lrange(String key, long start, long end) {
+        try (Jedis jedis = jedisPool.getResource()) {
+            return jedis.lrange(key, start, end);
+        }
+    }
+
+    public static void sadd(String key, String... members) {
+        try (Jedis jedis = jedisPool.getResource()) {
+            jedis.sadd(key, members);
+        }
+    }
+
+    public static Set<String> smembers(String key) {
+        try (Jedis jedis = jedisPool.getResource()) {
+            return jedis.smembers(key);
+        }
+    }
+
+    public static void zadd(String key, double score, String member) {
+        try (Jedis jedis = jedisPool.getResource()) {
+            jedis.zadd(key, score, member);
+        }
+    }
+
+    public static Set<String> zrange(String key, long start, long end) {
+        try (Jedis jedis = jedisPool.getResource()) {
+            return jedis.zrange(key, start, end);
+        }
+    }
+
+    public static void delete(String key) {
+        try (Jedis jedis = jedisPool.getResource()) {
+            jedis.del(key);
+        }
+    }
+
+    public static void disconnect() {
+        jedisPool.close();
+    }
+
+    // 示例用法
+    public static void main(String[] args) {
+        // 直接使用 RedisUtil 的静态方法
+        RedisUtil.set("mykey", "myvalue");
+        String value = RedisUtil.get("mykey");
+        System.out.println("Value for 'mykey': " + value);
+
+        RedisUtil.hset("user:1", "name", "Alice");
+        String userName = RedisUtil.hget("user:1", "name");
+        System.out.println("Name for 'user:1': " + userName);
+
+        RedisUtil.lpush("mylist", "element1", "element2", "element3");
+        List<String> listValues = RedisUtil.lrange("mylist", 0, -1);
+        System.out.println("Values in 'mylist': " + listValues);
+
+        RedisUtil.sadd("myset", "member1", "member2", "member3");
+        Set<String> setMembers = RedisUtil.smembers("myset");
+        System.out.println("Members in 'myset': " + setMembers);
+
+        RedisUtil.zadd("myzset", 1.0, "member1");
+        RedisUtil.zadd("myzset", 2.0, "member2");
+        Set<String> zsetMembers = RedisUtil.zrange("myzset", 0, -1);
+        System.out.println("Members in 'myzset': " + zsetMembers);
+
+        RedisUtil.delete("mykey");
+        RedisUtil.delete("user:1");
+        RedisUtil.delete("mylist");
+        RedisUtil.delete("myset");
+        RedisUtil.delete("myzset");
+
+        RedisUtil.disconnect();
+    }
+}
diff --git a/websocketClient/src/main/java/org/example/wsClient/MexcClient.java b/websocketClient/src/main/java/org/example/wsClient/MexcClient.java
new file mode 100644
index 0000000..6dceb87
--- /dev/null
+++ b/websocketClient/src/main/java/org/example/wsClient/MexcClient.java
@@ -0,0 +1,148 @@
+package org.example.wsClient;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.gson.Gson;
+import org.example.pojo.Currency;
+import org.example.util.RedisUtil;
+
+import javax.websocket.*;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.*;
+
+@ClientEndpoint
+public class MexcClient {
+    private static final String WS_ENDPOINT = "wss://wbs.mexc.com/ws"; // 实际WebSocket服务器地址
+    private static final long PING_INTERVAL = 20000; // 每20秒发送一次ping
+    private static final int MAX_BACKOFF_TIME = 60000; // 最大重连时间
+
+    private final List<Currency> subscriptions;
+    private final ScheduledExecutorService executorService;
+    private Session session;
+    private boolean reconnecting = false;
+
+    public MexcClient(List<Currency> subscriptions) {
+        this.subscriptions = subscriptions;
+        this.executorService = Executors.newScheduledThreadPool(1);
+    }
+
+    public void start() {
+        try {
+            connect();
+            if (session == null) {
+                System.err.println("无法在超时时间内连接到服务器。");
+                return;
+            }
+
+            executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS);
+
+            // 订阅消息
+            for (Currency subscription : subscriptions) {
+                String parameter = getParameter(subscription.getSymbol());
+                session.getBasicRemote().sendText(parameter);
+            }
+
+            synchronized (this) {
+                this.wait();
+            }
+
+        } catch (Exception e) {
+            System.err.println("连接过程中发生异常: " + e.getMessage());
+            e.printStackTrace();
+        } finally {
+            executorService.shutdown();
+        }
+    }
+
+    private void connect() throws Exception {
+        WebSocketContainer container = ContainerProvider.getWebSocketContainer();
+        container.connectToServer(this, new URI(WS_ENDPOINT));
+    }
+
+    @OnOpen
+    public void onOpen(Session session) {
+        System.out.println("已连接到服务器。");
+        this.session = session;
+        synchronized (this) {
+            this.notify();
+        }
+    }
+
+    @OnMessage
+    public void onMessage(String message) {
+        Gson gson = new Gson();
+        Map map = gson.fromJson(message, Map.class);
+        if (map != null && map.containsKey("s")) {
+            RedisUtil.set("mexc" + map.get("s").toString(), message);
+        }
+        // 没有过滤撤单的数据
+    }
+
+    @OnClose
+    public void onClose() {
+        System.out.println("连接已关闭,尝试重新连接...");
+        session = null;
+        if (!reconnecting) {
+            reconnect();
+        }
+    }
+
+    @OnError
+    public void onError(Throwable throwable) {
+        System.err.println("发生错误: " + throwable.getMessage());
+        if (!reconnecting) {
+            reconnect();
+        }
+    }
+
+    private void reconnect() {
+        if (reconnecting) {
+            return;
+        }
+        reconnecting = true;
+        executorService.schedule(() -> {
+            try {
+                connect();
+                reconnecting = false;
+            } catch (Exception e) {
+                e.printStackTrace();
+                reconnect();
+            }
+        }, calculateBackoffTime(), TimeUnit.MILLISECONDS);
+    }
+
+    private long calculateBackoffTime() {
+        // 实现退避策略,例如指数退避
+        return 5000; // 例子:5秒
+    }
+
+    private void sendPing() {
+        try {
+            if (session != null) {
+                session.getBasicRemote().sendPing(ByteBuffer.wrap("Ping".getBytes()));
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+
+    public String getParameter(String symbol) throws JsonProcessingException {
+        ObjectMapper mapper = new ObjectMapper();
+        ObjectNode root = mapper.createObjectNode();
+
+        root.put("method", "SUBSCRIPTION");
+        ArrayNode paramsArray = mapper.createArrayNode();
+        String customParam = String.format("spot@public.limit.depth.v3.api@%s@20", symbol);
+        paramsArray.add(customParam);
+        root.set("params", paramsArray);
+
+        return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(root);
+    }
+}
diff --git a/websocketClient/src/main/resources/application.yml b/websocketClient/src/main/resources/application.yml
index b56c205..8b50501 100644
--- a/websocketClient/src/main/resources/application.yml
+++ b/websocketClient/src/main/resources/application.yml
@@ -1,3 +1,75 @@
 server:
   port: 8090
-  
\ No newline at end of file
+
+
+spring:
+  # redis 配置
+  redis:
+    # 地址
+    host: localhost
+    # 端口,默认为6379
+    port: 6379
+    # 数据库索引
+    database: 0
+    # 密码
+    password:
+    # 连接超时时间
+    timeout: 10s
+    lettuce:
+      pool:
+        # 连接池中的最小空闲连接
+        min-idle: 0
+        # 连接池中的最大空闲连接
+        max-idle: 99
+        # 连接池的最大数据库连接数
+        max-active: 99
+        # #连接池最大阻塞等待时间(使用负值表示没有限制)
+        max-wait: -1ms
+
+  datasource:
+    type: com.alibaba.druid.pool.DruidDataSource
+    driverClassName: com.mysql.cj.jdbc.Driver
+    url: jdbc:mysql://127.0.0.1:3306/demo?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
+    username: root
+    password: 123456
+
+    druid:
+      # 初始连接数
+      initialSize: 5
+      # 最小连接池数量
+      minIdle: 10
+      # 最大连接池数量
+      maxActive: 20
+      # 配置获取连接等待超时的时间
+      maxWait: 60000
+      # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
+      timeBetweenEvictionRunsMillis: 60000
+      # 配置一个连接在池中最小生存的时间,单位是毫秒
+      minEvictableIdleTimeMillis: 300000
+      # 配置一个连接在池中最大生存的时间,单位是毫秒
+      maxEvictableIdleTimeMillis: 900000
+      # 配置检测连接是否有效
+      validationQuery: SELECT 1 FROM DUAL
+      testWhileIdle: true
+      testOnBorrow: false
+      testOnReturn: false
+      webStatFilter:
+        enabled: true
+      statViewServlet:
+        enabled: true
+        # 设置白名单,不填则允许所有访问
+        allow:
+        url-pattern: /druid/*
+        # 控制台管理用户名和密码
+        login-username: Greysparrow
+        login-password: 123456
+      filter:
+        stat:
+          enabled: true
+          # 慢SQL记录
+          log-slow-sql: true
+          slow-sql-millis: 1000
+          merge-sql: true
+        wall:
+          config:
+            multi-statement-allow: false
diff --git a/websocketSerivce/src/main/java/org/example/task/BitgetStock.java b/websocketSerivce/src/main/java/org/example/task/BitgetStock.java
index fba2768..58375d1 100644
--- a/websocketSerivce/src/main/java/org/example/task/BitgetStock.java
+++ b/websocketSerivce/src/main/java/org/example/task/BitgetStock.java
@@ -1,6 +1,7 @@
 package org.example.task;
 
 import com.alibaba.fastjson.JSON;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -54,6 +55,7 @@
      * 同步bitget交易所交易对
      */
     @Scheduled(cron = "0 0/45 * * * ?")
+//    @Scheduled(cron = "0/10 * * * * ?")
     public void syncCurrency() {
         //  使用Lock来确保同步
         syncCurrencyLock.lock();
@@ -85,7 +87,7 @@
             getList.parallelStream().forEach(person -> person.setSymbol(StringUtils.remove(person.getSymbol(), "-")));
 
             //  获取数据库中已有的symbol列表
-            List<Currency> dbList = currencyService.list();
+            List<Currency> dbList = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource,"bitget"));
             Set<String> symbolSet = dbList.stream().map(Currency::getSymbol).collect(Collectors.toSet());
 
             //  比对接口返回的数据和数据库中已有的数据,找出新增的数据
diff --git a/websocketSerivce/src/main/java/org/example/task/GateStock.java b/websocketSerivce/src/main/java/org/example/task/GateStock.java
index b8b29a4..402ece4 100644
--- a/websocketSerivce/src/main/java/org/example/task/GateStock.java
+++ b/websocketSerivce/src/main/java/org/example/task/GateStock.java
@@ -1,5 +1,6 @@
 package org.example.task;
 
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -44,6 +45,7 @@
      * 同步gate交易所交易对
      */
     @Scheduled(cron = "0 0/40 * * * ?")
+//    @Scheduled(cron = "0/10 * * * * ?")
     public void syncCurrency() {
         //  使用Lock来确保同步
         syncCurrencyLock.lock();
@@ -72,7 +74,7 @@
             getList.parallelStream().forEach(person -> person.setId(StringUtils.remove(person.getId(), "_")));
 
             //  获取数据库中已有的symbol列表
-            List<Currency> dbList = currencyService.list();
+            List<Currency> dbList = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "gate"));
             Set<String> symbolSet = dbList.stream().map(Currency::getSymbol).collect(Collectors.toSet());
 
             //  比对接口返回的数据和数据库中已有的数据,找出新增的数据
diff --git a/websocketSerivce/src/main/java/org/example/task/KucoinStock.java b/websocketSerivce/src/main/java/org/example/task/KucoinStock.java
index 9e6d6f5..a5efa8f 100644
--- a/websocketSerivce/src/main/java/org/example/task/KucoinStock.java
+++ b/websocketSerivce/src/main/java/org/example/task/KucoinStock.java
@@ -1,5 +1,6 @@
 package org.example.task;
 
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -41,6 +42,7 @@
      * 同步kucoin交易所交易对
      */
     @Scheduled(cron = "0 0/35 * * * ?")
+//    @Scheduled(cron = "0/10 * * * * ?")
     public void syncCurrency() {
         //  使用Lock来确保同步
         syncCurrencyLock.lock();
@@ -72,7 +74,7 @@
             getList.parallelStream().forEach(person -> person.setSymbol(StringUtils.remove(person.getSymbol(), "-")));
 
             //  获取数据库中已有的symbol列表
-            List<Currency> dbList = currencyService.list();
+            List<Currency> dbList = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "kucoin"));
             Set<String> symbolSet = dbList.stream().map(Currency::getSymbol).collect(Collectors.toSet());
 
             //  比对接口返回的数据和数据库中已有的数据,找出新增的数据
diff --git a/websocketSerivce/src/main/java/org/example/task/MexcStock.java b/websocketSerivce/src/main/java/org/example/task/MexcStock.java
index d4351e2..43db090 100644
--- a/websocketSerivce/src/main/java/org/example/task/MexcStock.java
+++ b/websocketSerivce/src/main/java/org/example/task/MexcStock.java
@@ -1,5 +1,6 @@
 package org.example.task;
 
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -40,6 +41,7 @@
      * 同步mexc交易所交易对
      */
     @Scheduled(cron = "0 0/30 * * * ?")
+//    @Scheduled(cron = "0/10 * * * * ?")
     public void syncCurrency() {
         //  使用Lock来确保同步
         syncCurrencyLock.lock();
@@ -74,7 +76,7 @@
             }.getType());
 
             //  获取数据库中已有的symbol列表
-            List<Currency> dbList = currencyService.list();
+            List<Currency> dbList = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "mexc"));
             Set<String> symbolSet = dbList.stream().map(Currency::getSymbol).collect(Collectors.toSet());
 
             //  比对接口返回的数据和数据库中已有的数据,找出新增的数据

--
Gitblit v1.9.3