From efa6ea80aba153fa07458baedd0d730505fe9665 Mon Sep 17 00:00:00 2001
From: zj <1772600164@qq.com>
Date: Sun, 21 Jul 2024 18:36:05 +0800
Subject: [PATCH] 1

---
 geteClient/src/main/java/org/example/geteclient/ThreadConfig/AsyncConfiguration.java       |   35 +
 geteClient/src/main/java/org/example/geteclient/server/CurrencySerivce.java                |    2 
 mexcClient/src/main/java/org/example/mexcclient/server/impl/CurrencySerivceImpl.java       |    6 
 bitgetsClient/src/main/java/org/example/bitgetsclient/WsBean/MexcWsBean.java               |   94 +++
 mexcClient/src/main/java/org/example/mexcclient/pojo/Currency.java                         |    2 
 websocketSerivce/pom.xml                                                                   |    5 
 .idea/encodings.xml                                                                        |   14 
 websocketSerivce/src/main/resources/application.yml                                        |    2 
 bitgetsClient/src/main/java/org/example/bitgetsclient/ThreadConfig/AsyncConfiguration.java |   35 +
 geteClient/src/main/java/org/example/geteclient/dao/CurrencyMapper.java                    |    4 
 bitgetsClient/src/main/java/org/example/bitgetsclient/util/RedisUtil.java                  |    7 
 kucoinClient/src/main/java/org/example/kucoinclient/dao/CurrencyMapper.java                |    4 
 geteClient/src/main/java/org/example/geteclient/WsBean/MexcWsBean.java                     |   68 ++
 pom.xml                                                                                    |    1 
 bitgetsClient/src/main/java/org/example/bitgetsclient/pojo/Currency.java                   |    2 
 .idea/misc.xml                                                                             |   16 
 kucoinClient/src/main/java/org/example/kucoinclient/ThreadConfig/AsyncConfiguration.java   |   35 +
 geteClient/src/main/java/org/example/geteclient/pojo/Currency.java                         |    2 
 mexcClient/src/main/java/org/example/mexcclient/wsClient/MexcClient.java                   |    8 
 kucoinClient/src/main/java/org/example/kucoinclient/wsClient/KucoinClient.java             |  250 ++++++++++
 websocketSerivce/src/main/java/org/example/server/CurrencySerivce.java                     |    3 
 bitgetsClient/src/main/java/org/example/bitgetsclient/server/CurrencySerivce.java          |    2 
 .idea/gradle.xml                                                                           |   15 
 mexcClient/src/main/java/org/example/mexcclient/ThreadConfig/AsyncConfiguration.java       |   35 +
 websocketSerivce/src/main/java/org/example/server/impl/CurrencySerivceImpl.java            |   77 +++
 mexcClient/src/main/java/org/example/mexcclient/WsBean/MexcWsBean.java                     |   71 ++
 kucoinClient/src/main/java/org/example/kucoinclient/server/CurrencySerivce.java            |    2 
 websocketSerivce/src/main/java/org/example/WsServerApplication.java                        |   14 
 bitgetsClient/src/main/java/org/example/bitgetsclient/server/impl/CurrencySerivceImpl.java |    6 
 geteClient/src/main/java/org/example/geteclient/server/impl/CurrencySerivceImpl.java       |    6 
 kucoinClient/src/main/java/org/example/kucoinclient/WsBean/MexcWsBean.java                 |   84 +++
 bitgetsClient/src/main/java/org/example/bitgetsclient/dao/CurrencyMapper.java              |    5 
 mexcClient/src/main/java/org/example/mexcclient/dao/CurrencyMapper.java                    |    5 
 mexcClient/src/main/java/org/example/mexcclient/server/CurrencySerivce.java                |    2 
 websocketSerivce/src/main/java/org/example/util/RedisUtil.java                             |   14 
 /dev/null                                                                                  |   75 ---
 geteClient/src/main/java/org/example/geteclient/wsClinet/GateClient.java                   |  205 ++++++++
 mexcClient/src/main/java/org/example/mexcclient/util/RedisUtil.java                        |    7 
 bitgetsClient/src/main/java/org/example/bitgetsclient/wsClient/BitgetClient.java           |  217 ++++++++
 kucoinClient/src/main/java/org/example/kucoinclient/util/RedisUtil.java                    |    7 
 kucoinClient/src/main/java/org/example/kucoinclient/pojo/Currency.java                     |    2 
 geteClient/src/main/java/org/example/geteclient/util/RedisUtil.java                        |    7 
 kucoinClient/src/main/java/org/example/kucoinclient/server/impl/CurrencySerivceImpl.java   |    6 
 43 files changed, 1,333 insertions(+), 126 deletions(-)

diff --git a/.idea/encodings.xml b/.idea/encodings.xml
index 2351bff..6d2ed5f 100644
--- a/.idea/encodings.xml
+++ b/.idea/encodings.xml
@@ -1,6 +1,20 @@
 <?xml version="1.0" encoding="UTF-8"?>
 <project version="4">
   <component name="Encoding" defaultCharsetForPropertiesFiles="UTF-8">
+    <file url="file://$PROJECT_DIR$/bitgetClient/src/main/java" charset="UTF-8" />
+    <file url="file://$PROJECT_DIR$/bitgetClient/src/main/resources" charset="UTF-8" />
+    <file url="file://$PROJECT_DIR$/bitgetsClient/src/main/java" charset="UTF-8" />
+    <file url="file://$PROJECT_DIR$/bitgetsClient/src/main/resources" charset="UTF-8" />
+    <file url="file://$PROJECT_DIR$/geteClient/src/main/java" charset="UTF-8" />
+    <file url="file://$PROJECT_DIR$/geteClient/src/main/resources" charset="UTF-8" />
+    <file url="file://$PROJECT_DIR$/kucoinClient/src/main/java" charset="UTF-8" />
+    <file url="file://$PROJECT_DIR$/kucoinClient/src/main/resources" charset="UTF-8" />
+    <file url="file://$PROJECT_DIR$/mexcClient-废弃/src/main/java" charset="UTF-8" />
+    <file url="file://$PROJECT_DIR$/mexcClient-废弃/src/main/resources" charset="UTF-8" />
+    <file url="file://$PROJECT_DIR$/mexcClient/src/main/java" charset="UTF-8" />
+    <file url="file://$PROJECT_DIR$/mexcClient/src/main/resources" charset="UTF-8" />
+    <file url="file://$PROJECT_DIR$/mexcWsClient/src/main/java" charset="UTF-8" />
+    <file url="file://$PROJECT_DIR$/mexcWsClient/src/main/resources" charset="UTF-8" />
     <file url="file://$PROJECT_DIR$/src/main/java" charset="UTF-8" />
     <file url="file://$PROJECT_DIR$/src/main/resources" charset="UTF-8" />
     <file url="file://$PROJECT_DIR$/websocketClient/src/main/java" charset="UTF-8" />
diff --git a/.idea/gradle.xml b/.idea/gradle.xml
new file mode 100644
index 0000000..3f0f01e
--- /dev/null
+++ b/.idea/gradle.xml
@@ -0,0 +1,15 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="GradleSettings">
+    <option name="linkedExternalProjectsSettings">
+      <GradleProjectSettings>
+        <option name="externalProjectPath" value="$PROJECT_DIR$/bitgetClient" />
+        <option name="modules">
+          <set>
+            <option value="$PROJECT_DIR$/bitgetClient" />
+          </set>
+        </option>
+      </GradleProjectSettings>
+    </option>
+  </component>
+</project>
\ No newline at end of file
diff --git a/.idea/misc.xml b/.idea/misc.xml
index f0d7b15..0066803 100644
--- a/.idea/misc.xml
+++ b/.idea/misc.xml
@@ -1,11 +1,27 @@
+<?xml version="1.0" encoding="UTF-8"?>
 <project version="4">
   <component name="ExternalStorageConfigurationManager" enabled="true" />
+  <component name="FrameworkDetectionExcludesConfiguration">
+    <file type="web" url="file://$PROJECT_DIR$/bitgetClient" />
+  </component>
   <component name="MavenProjectsManager">
     <option name="originalFiles">
       <list>
         <option value="$PROJECT_DIR$/pom.xml" />
+        <option value="$PROJECT_DIR$/mexcWsClient/pom.xml" />
+        <option value="$PROJECT_DIR$/bitgetClient/pom.xml" />
+        <option value="$PROJECT_DIR$/kucoinClient/pom.xml" />
+        <option value="$PROJECT_DIR$/geteClient/pom.xml" />
+        <option value="$PROJECT_DIR$/mexcClient/pom.xml" />
+        <option value="$PROJECT_DIR$/bitgetsClient/pom.xml" />
       </list>
     </option>
+    <option name="ignoredFiles">
+      <set>
+        <option value="$PROJECT_DIR$/bitgetClient/pom.xml" />
+        <option value="$PROJECT_DIR$/mexcWsClient/pom.xml" />
+      </set>
+    </option>
   </component>
   <component name="ProjectRootManager" version="2" languageLevel="JDK_1_8" project-jdk-name="1.8" project-jdk-type="JavaSDK">
     <output url="file://$PROJECT_DIR$/out" />
diff --git a/bitgetsClient/src/main/java/org/example/bitgetsclient/ThreadConfig/AsyncConfiguration.java b/bitgetsClient/src/main/java/org/example/bitgetsclient/ThreadConfig/AsyncConfiguration.java
new file mode 100644
index 0000000..1748f43
--- /dev/null
+++ b/bitgetsClient/src/main/java/org/example/bitgetsclient/ThreadConfig/AsyncConfiguration.java
@@ -0,0 +1,35 @@
+package org.example.bitgetsclient.ThreadConfig;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.util.concurrent.ThreadPoolExecutor;
+
+/**
+ * @program: dabaogp
+ * @description: 线程池配置
+ * @create: 2024-06-25 16:37
+ **/
+@Configuration
+public class AsyncConfiguration {
+
+    @Bean(name = "threadPoolTaskExecutor")
+    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
+        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+
+        executor.setCorePoolSize(100);    // 核心线程数, 根据需求进行调整
+        executor.setMaxPoolSize(150);    // 最大线程数, 适当设置以避免资源耗尽
+        executor.setQueueCapacity(200);    // 队列容量, 适当限制以避免请求堆积
+        executor.setKeepAliveSeconds(30);    // 线程空闲时的存活时间为30秒,减少系统开销
+        executor.setThreadNamePrefix("Thread-");    // 线程名称的前缀
+
+        // 使用 CallerRunsPolicy 拒绝策略,以减少任务被拒绝时带来的负担
+        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
+
+        // 初始化线程池,配置其他参数(不过可以根据需要添加)
+        executor.initialize(); // 明确初始化,提升代码可读性
+
+        return executor; // 返回配置好的线程池
+    }
+}
diff --git a/bitgetsClient/src/main/java/org/example/bitgetsclient/WsBean/MexcWsBean.java b/bitgetsClient/src/main/java/org/example/bitgetsclient/WsBean/MexcWsBean.java
new file mode 100644
index 0000000..42cd429
--- /dev/null
+++ b/bitgetsClient/src/main/java/org/example/bitgetsclient/WsBean/MexcWsBean.java
@@ -0,0 +1,94 @@
+package org.example.bitgetsclient.WsBean;
+
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
+import org.example.bitgetsclient.pojo.Currency;
+import org.example.bitgetsclient.server.impl.CurrencySerivceImpl;
+import org.example.bitgetsclient.wsClient.BitgetClient;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.util.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * @ClassDescription: 客户端请求类
+ * @JdkVersion: 1.8
+ * @Created: 2023/8/31 16:13
+ */
+@Slf4j
+@Configuration
+public class MexcWsBean {
+
+    @Autowired
+    private CurrencySerivceImpl currencyService;
+
+    @Autowired
+    @Qualifier("threadPoolTaskExecutor")
+    private ThreadPoolTaskExecutor threadPoolTaskExecutor;
+
+    @Bean
+    public void bitgetWebsocketRunClientMap() throws JSONException, JsonProcessingException {
+        List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "bitget"));
+        if (!CollectionUtils.isEmpty(mexc)) {
+            int batchSize = 100; // 每个线程处理的数据量
+            int totalSize = mexc.size();
+            int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数
+
+            // 使用循环创建多个线程并提交任务
+            for (int i = 0; i < threadCount; i++) {
+                int fromIndex = i * batchSize; // 计算起始索引
+                int toIndex = Math.min(fromIndex + batchSize, totalSize); // 计算结束索引
+                List<Currency> sublist = mexc.subList(fromIndex, toIndex); // 切分子列表
+                String parameter = getParameter(sublist); // 获取参数
+                // 使用自定义线程池提交任务
+                threadPoolTaskExecutor.execute(new BitgetClient(parameter)::start); // 提交到线程池执行
+            }
+        }
+    }
+
+    public String getParameter(List<Currency> list) throws JsonProcessingException, JSONException {
+        // 创建一个ObjectMapper实例
+        ObjectMapper mapper = new ObjectMapper();
+        List<String> symbolList = list.stream().map(Currency::getSymbol).collect(Collectors.toList());
+        // 使用Map构建JSON对象
+        Map<String, Object> jsonMap = new HashMap<>();
+        jsonMap.put("op", "subscribe");
+        List<Map<String, String>> mapList = new ArrayList<>();
+        symbolList.forEach(f->{
+            Map<String, String> argsMap = new HashMap<>();
+            argsMap.put("instType", "SPOT");
+            argsMap.put("channel", "books15");
+            argsMap.put("instId", f);
+            mapList.add(argsMap);
+        });
+        jsonMap.put("args", mapList);
+
+        // 将Map转换为JSON字符串
+        String jsonString = mapper.writeValueAsString(jsonMap);
+        return jsonString;
+
+    }
+}
+
diff --git a/websocketClient/src/main/java/org/example/dao/CurrencyMapper.java b/bitgetsClient/src/main/java/org/example/bitgetsclient/dao/CurrencyMapper.java
similarity index 73%
copy from websocketClient/src/main/java/org/example/dao/CurrencyMapper.java
copy to bitgetsClient/src/main/java/org/example/bitgetsclient/dao/CurrencyMapper.java
index 86b645b..bcc05d9 100644
--- a/websocketClient/src/main/java/org/example/dao/CurrencyMapper.java
+++ b/bitgetsClient/src/main/java/org/example/bitgetsclient/dao/CurrencyMapper.java
@@ -1,8 +1,9 @@
-package org.example.dao;
+package org.example.bitgetsclient.dao;
+
 
 import com.baomidou.mybatisplus.core.mapper.BaseMapper;
 import org.apache.ibatis.annotations.Mapper;
-import org.example.pojo.Currency;
+import org.example.bitgetsclient.pojo.Currency;
 
 /**
  * @program: demo
diff --git a/websocketClient/src/main/java/org/example/pojo/Currency.java b/bitgetsClient/src/main/java/org/example/bitgetsclient/pojo/Currency.java
similarity index 91%
copy from websocketClient/src/main/java/org/example/pojo/Currency.java
copy to bitgetsClient/src/main/java/org/example/bitgetsclient/pojo/Currency.java
index 2e0e322..2a3d980 100644
--- a/websocketClient/src/main/java/org/example/pojo/Currency.java
+++ b/bitgetsClient/src/main/java/org/example/bitgetsclient/pojo/Currency.java
@@ -1,4 +1,4 @@
-package org.example.pojo;
+package org.example.bitgetsclient.pojo;
 
 import com.baomidou.mybatisplus.annotation.IdType;
 import com.baomidou.mybatisplus.annotation.TableId;
diff --git a/websocketClient/src/main/java/org/example/server/CurrencySerivce.java b/bitgetsClient/src/main/java/org/example/bitgetsclient/server/CurrencySerivce.java
similarity index 72%
copy from websocketClient/src/main/java/org/example/server/CurrencySerivce.java
copy to bitgetsClient/src/main/java/org/example/bitgetsclient/server/CurrencySerivce.java
index d50a81f..ae9e9f9 100644
--- a/websocketClient/src/main/java/org/example/server/CurrencySerivce.java
+++ b/bitgetsClient/src/main/java/org/example/bitgetsclient/server/CurrencySerivce.java
@@ -1,4 +1,4 @@
-package org.example.server;
+package org.example.bitgetsclient.server;
 
 /**
  * @program: demo
diff --git a/websocketClient/src/main/java/org/example/server/impl/CurrencySerivceImpl.java b/bitgetsClient/src/main/java/org/example/bitgetsclient/server/impl/CurrencySerivceImpl.java
similarity index 65%
copy from websocketClient/src/main/java/org/example/server/impl/CurrencySerivceImpl.java
copy to bitgetsClient/src/main/java/org/example/bitgetsclient/server/impl/CurrencySerivceImpl.java
index 1b220f2..d495723 100644
--- a/websocketClient/src/main/java/org/example/server/impl/CurrencySerivceImpl.java
+++ b/bitgetsClient/src/main/java/org/example/bitgetsclient/server/impl/CurrencySerivceImpl.java
@@ -1,8 +1,8 @@
-package org.example.server.impl;
+package org.example.bitgetsclient.server.impl;
 
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
-import org.example.dao.CurrencyMapper;
-import org.example.pojo.Currency;
+import org.example.bitgetsclient.dao.CurrencyMapper;
+import org.example.bitgetsclient.pojo.Currency;
 import org.springframework.stereotype.Service;
 
 /**
diff --git a/websocketClient/src/main/java/org/example/util/RedisUtil.java b/bitgetsClient/src/main/java/org/example/bitgetsclient/util/RedisUtil.java
similarity index 95%
rename from websocketClient/src/main/java/org/example/util/RedisUtil.java
rename to bitgetsClient/src/main/java/org/example/bitgetsclient/util/RedisUtil.java
index dab4e20..2daa1dd 100644
--- a/websocketClient/src/main/java/org/example/util/RedisUtil.java
+++ b/bitgetsClient/src/main/java/org/example/bitgetsclient/util/RedisUtil.java
@@ -1,9 +1,10 @@
-package org.example.util;
+package org.example.bitgetsclient.util;
 
-import redis.clients.jedis.*;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPoolConfig;
 
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 public class RedisUtil {
diff --git a/bitgetsClient/src/main/java/org/example/bitgetsclient/wsClient/BitgetClient.java b/bitgetsClient/src/main/java/org/example/bitgetsclient/wsClient/BitgetClient.java
new file mode 100644
index 0000000..b9c61a8
--- /dev/null
+++ b/bitgetsClient/src/main/java/org/example/bitgetsclient/wsClient/BitgetClient.java
@@ -0,0 +1,217 @@
+package org.example.bitgetsclient.wsClient;
+
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+import lombok.extern.slf4j.Slf4j;
+import org.example.bitgetsclient.util.RedisUtil;
+import org.json.JSONException;
+import org.springframework.context.annotation.Bean;
+import org.springframework.util.CollectionUtils;
+
+import javax.websocket.*;
+import java.io.IOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+@ClientEndpoint
+@Slf4j
+public class BitgetClient {
+
+    private static final String WS_ENDPOINT = "wss://ws.bitget.com/v2/ws/public"; // WebSocket 接口地址
+    private static final long PING_INTERVAL = 20000; // 心跳间隔,单位毫秒
+    private static final String PING_MESSAGE = "ping"; // 心跳消息
+    private static final int MAX_RECONNECT_ATTEMPTS = 5; // 最大重连次数
+    private final String subscriptions; // 订阅内容
+    private final ScheduledExecutorService executorService; // 定义调度任务执行服务
+    private Session session; // WebSocket 会话
+
+    private final Object lock = new Object(); // 使用锁对象以确保线程安全
+    private boolean reconnecting = false; // 重连状态
+    private int reconnectAttempts = 0; // 当前重连次数
+
+    // 静态单例 ObjectMapper 实例,避免重复创建
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+    private static final Gson gson = new Gson(); // 静态 Gson 实例
+
+    // 构造方法,初始化订阅内容和调度服务
+    public BitgetClient(String subscriptions) {
+        this.subscriptions = subscriptions; // 初始化订阅内容
+        this.executorService = Executors.newScheduledThreadPool(1); // 初始化调度线程池
+    }
+
+
+
+    public void start() {
+        try {
+            connect(); // 尝试连接
+            if (session == null) {
+                log.info("无法在超时内连接到服务器。"); // 记录连接失败的信息
+                return; // 如果连接失败,直接返回
+            }
+
+            // 定期发送心跳
+            executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS);
+            // 发送订阅消息
+            session.getBasicRemote().sendText(subscriptions); // 发送订阅信息
+
+            synchronized (this) {
+                this.wait(); // 等待 WebSocket 消息到来
+            }
+
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt(); // 恢复中断状态
+            log.error("线程被中断: " + e.getMessage(), e); // 记录线程中断的错误
+        } catch (Exception e) {
+            log.error("bitget ws 连接过程中发生异常: " + e.getMessage(), e); // 记录连接过程中发生的异常
+        } finally {
+            executorService.shutdownNow(); // 尝试立即关闭调度服务
+        }
+    }
+
+    private void connect() throws Exception {
+        WebSocketContainer container = ContainerProvider.getWebSocketContainer();
+        container.connectToServer(this, new URI(WS_ENDPOINT)); // 连接 WebSocket 服务
+    }
+
+    @OnOpen
+    public void onOpen(Session session) {
+        log.info("bitget ws 已连接到服务器。"); // 记录连接成功的信息
+        this.session = session; // 存储会话
+        synchronized (this) {
+            this.notify(); // 通知等待的线程
+        }
+        reconnectAttempts = 0; // 连接成功重置重连次数
+    }
+
+    @OnMessage
+    public void onMessage(String message) {
+        try {
+            // 将消息解析为 Map
+            Map<String, Object> map = parseMessage(message); // 调用共用方法解析
+            if (map != null && map.containsKey("arg") && map.containsKey("data")) { // 确保 map 不为 null 且包含必要的键
+                JsonNode dataNode = getDataNode(message); // 获取数据节点
+
+                // 确保 dataNode 不为 null,以避免空指针异常
+                if (dataNode != null) {
+                    // 存储数据到 HashMap
+                    Map<String, Object> bidAskMap = new HashMap<>(); // 变量命名更具描述性
+                    bidAskMap.put("bids", dataNode.get("bids")); // 获取并存储 bids
+                    bidAskMap.put("asks", dataNode.get("asks")); // 获取并存储 asks
+
+                    Map<String, Object> argMap = gson.fromJson(map.get("arg").toString(), new TypeToken<Map<String, Object>>() {}.getType()); // 解析 arg 为 Map
+                    String key = "bitget" + argMap.get("instId"); // 构建 Redis 存储的键
+
+                    // 存储到 Redis,使用 ObjectMapper 转换为 JSON 字符串
+                    String jsonData = objectMapper.writeValueAsString(bidAskMap); // 先将 HashMap 转换为 JSON 字符串
+                    RedisUtil.set(key, jsonData); // 存储数据
+                }
+            }
+        } catch (JsonSyntaxException e) {
+            log.error("JSON 解析错误: " + e.getMessage(), e); // 记录 JSON 解析错误
+        } catch (IOException e) {
+            log.error("对象转换时发生 I/O 异常: " + e.getMessage(), e); // 记录对象转换 I/O 异常
+        } catch (Exception e) {
+            log.error("处理消息时发生异常: " + e.getMessage(), e); // 记录处理消息时发生的异常
+        }
+    }
+
+
+    // 解析消息的共用方法
+    private Map<String, Object> parseMessage(String message) {
+        if(!message.equals("pong")){
+            return gson.fromJson(message, new TypeToken<Map<String, Object>>() {}.getType()); // 将消息解析为 Map
+        }
+        return null;
+    }
+
+    // 获取数据节点的共用方法
+    private JsonNode getDataNode(String message) throws JsonProcessingException {
+        JsonNode jsonNode = objectMapper.readTree(message); // 使用静态 ObjectMapper,避免重复创建
+        return jsonNode.get("data").get(0); // 获取第一个数据节点
+    }
+
+    @OnClose
+    public void onClose() {
+        log.info("bitget ws 连接已关闭,尝试重新连接..."); // 记录连接关闭的信息
+        handleConnectionClosedOrError(); // 处理连接关闭事件
+    }
+
+    @OnError
+    public void onError(Throwable throwable) {
+        log.error("bitget ws 发生错误: " + throwable.getMessage(), throwable); // 记录错误信息
+        handleConnectionClosedOrError(); // 处理错误事件
+    }
+
+    private void handleConnectionClosedOrError() {
+        synchronized (lock) {
+            if (!reconnecting) {
+                reconnecting = true; // 开始重连
+                executorService.execute(this::attemptReconnect); // 执行重连操作
+            }
+        }
+    }
+
+    private void attemptReconnect() {
+        if (reconnectAttempts < MAX_RECONNECT_ATTEMPTS) { // 检查重连次数是否超过限制
+            try {
+                log.info("bitget ws 开始重连"); // 记录开始重连的信息
+                connect(); // 尝试重连
+                log.info("bitget ws 重连成功"); // 成功重连的日志
+                reconnectAttempts = 0; // 重连成功,重置重连次数
+            } catch (Exception e) {
+                reconnectAttempts++; // 增加重连尝试次数
+                log.warn("bitget ws 重连失败,尝试次数: " + reconnectAttempts, e); // 记录重连失败的警告信息
+                // 采用指数退避策略,增加重连间隔
+                long waitTime = Math.min(60, (long) Math.pow(2, reconnectAttempts)) * 1000; // 最大等待时间为 60 秒
+                scheduleReconnect(waitTime); // 调度下次重连
+            }
+        } else {
+            log.error("超过最大重连次数,停止重连"); // 超过最大重连次数
+            reconnecting = false; // 重连状态重置
+        }
+    }
+
+    private void scheduleReconnect(long waitTime) { // 接受等待时间参数
+        if (!executorService.isShutdown()) {
+            executorService.schedule(this::attemptReconnect, waitTime, TimeUnit.MILLISECONDS); // 调度重连
+        }
+    }
+
+    private void sendPing() {
+        try {
+            if (session != null) {
+                session.getBasicRemote().sendText(PING_MESSAGE); // 发送心跳消息
+            }
+        } catch (Exception e) {
+            log.error("发送心跳失败", e); // 记录发送心跳失败的错误
+        }
+    }
+
+    public String getParameter(String symbol) throws JsonProcessingException {
+        // 使用 ObjectMapper 构建 JSON 请求
+        Map<String, Object> jsonMap = new HashMap<>();
+        jsonMap.put("op", "subscribe"); // 设置操作类型
+        Map<String, Object> argsMap = new HashMap<>();
+        jsonMap.put("args", argsMap); // 参数放入请求
+
+        // 设置请求参数
+        argsMap.put("instType", "SPOT"); // 交易类型
+        argsMap.put("channel", "books15"); // 渠道类型
+        argsMap.put("instId", symbol); // 交易对符号
+
+        // 返回 JSON 字符串
+        return objectMapper.writeValueAsString(jsonMap); // 使用单一的 ObjectMapper 实例
+    }
+}
diff --git a/geteClient/src/main/java/org/example/geteclient/ThreadConfig/AsyncConfiguration.java b/geteClient/src/main/java/org/example/geteclient/ThreadConfig/AsyncConfiguration.java
new file mode 100644
index 0000000..22a71c3
--- /dev/null
+++ b/geteClient/src/main/java/org/example/geteclient/ThreadConfig/AsyncConfiguration.java
@@ -0,0 +1,35 @@
+package org.example.geteclient.ThreadConfig;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.util.concurrent.ThreadPoolExecutor;
+
+/**
+ * @program: dabaogp
+ * @description: 线程池配置
+ * @create: 2024-06-25 16:37
+ **/
+@Configuration
+public class AsyncConfiguration {
+
+    @Bean(name = "threadPoolTaskExecutor")
+    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
+        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+
+        executor.setCorePoolSize(100);    // 核心线程数, 根据需求进行调整
+        executor.setMaxPoolSize(150);    // 最大线程数, 适当设置以避免资源耗尽
+        executor.setQueueCapacity(200);    // 队列容量, 适当限制以避免请求堆积
+        executor.setKeepAliveSeconds(30);    // 线程空闲时的存活时间为30秒,减少系统开销
+        executor.setThreadNamePrefix("Thread-");    // 线程名称的前缀
+
+        // 使用 CallerRunsPolicy 拒绝策略,以减少任务被拒绝时带来的负担
+        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
+
+        // 初始化线程池,配置其他参数(不过可以根据需要添加)
+        executor.initialize(); // 明确初始化,提升代码可读性
+
+        return executor; // 返回配置好的线程池
+    }
+}
diff --git a/geteClient/src/main/java/org/example/geteclient/WsBean/MexcWsBean.java b/geteClient/src/main/java/org/example/geteclient/WsBean/MexcWsBean.java
new file mode 100644
index 0000000..bfa3bff
--- /dev/null
+++ b/geteClient/src/main/java/org/example/geteclient/WsBean/MexcWsBean.java
@@ -0,0 +1,68 @@
+package org.example.geteclient.WsBean;
+
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.util.EntityUtils;
+import org.example.geteclient.pojo.Currency;
+import org.example.geteclient.server.impl.CurrencySerivceImpl;
+import org.example.geteclient.wsClinet.GateClient;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.util.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * @ClassDescription: 客户端请求类
+ * @JdkVersion: 1.8
+ * @Created: 2023/8/31 16:13
+ */
+@Slf4j
+@Configuration
+public class MexcWsBean {
+
+    @Autowired
+    private CurrencySerivceImpl currencyService;
+
+    @Autowired
+    @Qualifier("threadPoolTaskExecutor")
+    private ThreadPoolTaskExecutor threadPoolTaskExecutor;
+
+    @Bean
+    public void gateWebsocketRunClientMap() {
+        List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "gate"));
+        if (!CollectionUtils.isEmpty(mexc)) {
+            int batchSize = 100; // 每个线程处理的数据量
+            int totalSize = mexc.size();
+            int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数
+
+            for (int i = 0; i < threadCount; i++) {
+                int fromIndex = i * batchSize;
+                int toIndex = Math.min(fromIndex + batchSize, totalSize);
+                List<Currency> sublist = mexc.subList(fromIndex, toIndex);
+
+                // 使用自定义线程池提交任务
+                threadPoolTaskExecutor.execute(new GateClient(sublist)::start);
+            }
+
+        }
+    }
+}
+
diff --git a/websocketClient/src/main/java/org/example/dao/CurrencyMapper.java b/geteClient/src/main/java/org/example/geteclient/dao/CurrencyMapper.java
similarity index 75%
rename from websocketClient/src/main/java/org/example/dao/CurrencyMapper.java
rename to geteClient/src/main/java/org/example/geteclient/dao/CurrencyMapper.java
index 86b645b..53c2206 100644
--- a/websocketClient/src/main/java/org/example/dao/CurrencyMapper.java
+++ b/geteClient/src/main/java/org/example/geteclient/dao/CurrencyMapper.java
@@ -1,8 +1,8 @@
-package org.example.dao;
+package org.example.geteclient.dao;
 
 import com.baomidou.mybatisplus.core.mapper.BaseMapper;
 import org.apache.ibatis.annotations.Mapper;
-import org.example.pojo.Currency;
+import org.example.geteclient.pojo.Currency;
 
 /**
  * @program: demo
diff --git a/websocketClient/src/main/java/org/example/pojo/Currency.java b/geteClient/src/main/java/org/example/geteclient/pojo/Currency.java
similarity index 92%
copy from websocketClient/src/main/java/org/example/pojo/Currency.java
copy to geteClient/src/main/java/org/example/geteclient/pojo/Currency.java
index 2e0e322..fedd6a6 100644
--- a/websocketClient/src/main/java/org/example/pojo/Currency.java
+++ b/geteClient/src/main/java/org/example/geteclient/pojo/Currency.java
@@ -1,4 +1,4 @@
-package org.example.pojo;
+package org.example.geteclient.pojo;
 
 import com.baomidou.mybatisplus.annotation.IdType;
 import com.baomidou.mybatisplus.annotation.TableId;
diff --git a/websocketClient/src/main/java/org/example/server/CurrencySerivce.java b/geteClient/src/main/java/org/example/geteclient/server/CurrencySerivce.java
similarity index 73%
rename from websocketClient/src/main/java/org/example/server/CurrencySerivce.java
rename to geteClient/src/main/java/org/example/geteclient/server/CurrencySerivce.java
index d50a81f..44a2d19 100644
--- a/websocketClient/src/main/java/org/example/server/CurrencySerivce.java
+++ b/geteClient/src/main/java/org/example/geteclient/server/CurrencySerivce.java
@@ -1,4 +1,4 @@
-package org.example.server;
+package org.example.geteclient.server;
 
 /**
  * @program: demo
diff --git a/websocketClient/src/main/java/org/example/server/impl/CurrencySerivceImpl.java b/geteClient/src/main/java/org/example/geteclient/server/impl/CurrencySerivceImpl.java
similarity index 66%
copy from websocketClient/src/main/java/org/example/server/impl/CurrencySerivceImpl.java
copy to geteClient/src/main/java/org/example/geteclient/server/impl/CurrencySerivceImpl.java
index 1b220f2..9405e5a 100644
--- a/websocketClient/src/main/java/org/example/server/impl/CurrencySerivceImpl.java
+++ b/geteClient/src/main/java/org/example/geteclient/server/impl/CurrencySerivceImpl.java
@@ -1,8 +1,8 @@
-package org.example.server.impl;
+package org.example.geteclient.server.impl;
 
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
-import org.example.dao.CurrencyMapper;
-import org.example.pojo.Currency;
+import org.example.geteclient.dao.CurrencyMapper;
+import org.example.geteclient.pojo.Currency;
 import org.springframework.stereotype.Service;
 
 /**
diff --git a/websocketClient/src/main/java/org/example/util/RedisUtil.java b/geteClient/src/main/java/org/example/geteclient/util/RedisUtil.java
similarity index 95%
copy from websocketClient/src/main/java/org/example/util/RedisUtil.java
copy to geteClient/src/main/java/org/example/geteclient/util/RedisUtil.java
index dab4e20..b074f3d 100644
--- a/websocketClient/src/main/java/org/example/util/RedisUtil.java
+++ b/geteClient/src/main/java/org/example/geteclient/util/RedisUtil.java
@@ -1,9 +1,10 @@
-package org.example.util;
+package org.example.geteclient.util;
 
-import redis.clients.jedis.*;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPoolConfig;
 
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 public class RedisUtil {
diff --git a/geteClient/src/main/java/org/example/geteclient/wsClinet/GateClient.java b/geteClient/src/main/java/org/example/geteclient/wsClinet/GateClient.java
new file mode 100644
index 0000000..94024f1
--- /dev/null
+++ b/geteClient/src/main/java/org/example/geteclient/wsClinet/GateClient.java
@@ -0,0 +1,205 @@
+package org.example.geteclient.wsClinet;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+import lombok.extern.slf4j.Slf4j;
+import org.example.geteclient.pojo.Currency;
+import org.example.geteclient.util.RedisUtil;
+import org.json.JSONException;
+
+import javax.websocket.*;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @program: demo
+ * @description: GateClient 用于与 Gate.io WebSocket API 进行交互
+ * @create: 2024-07-18 15:30
+ **/
+@ClientEndpoint
+@Slf4j
+public class GateClient {
+
+    private static final String WS_ENDPOINT = "wss://api.gateio.ws/ws/v4/"; // WebSocket 端点 URL
+    private static final long PING_INTERVAL = 20000; // Ping 消息发送间隔,单位毫秒
+
+    private final List<Currency> subscriptions; // 存储要订阅的货币列表
+    private final ScheduledExecutorService executorService; // 定时任务调度服务
+    private Session session; // WebSocket 连接会话
+
+    private final Object lock = new Object(); // 添加一个锁对象
+    private volatile boolean reconnecting = false; // 表示是否正在重连,使用 volatile 保证可见性
+
+    public GateClient(List<Currency> subscriptions) {
+        this.subscriptions = subscriptions; // 初始化订阅的货币
+        this.executorService = Executors.newScheduledThreadPool(1); // 创建一个定时任务调度线程池
+    }
+
+    public void start() {
+        try {
+            connect(); // 尝试连接 WebSocket
+            if (session == null) { // 如果连接失败,session 仍然为 null
+                log.info("无法在超时时间内连接到服务器。");
+                return; // 提前返回
+            }
+
+            // 定期发送 Ping 消息保持连接
+            executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS);
+
+            // 订阅消息
+            for (Currency subscription : subscriptions) {
+                String parameter = getParameter(subscription.getSymbol()); // 获取订阅参数
+                session.getBasicRemote().sendText(parameter); // 发送订阅请求
+            }
+
+            synchronized (this) { // 进入同步块以等待连接的激活
+                this.wait(); // 等待连接激活的通知
+            }
+
+        } catch (Exception e) {
+            log.error("gate ws 连接过程中发生异常: " + e.getMessage(), e); // 记录连接过程中发生的异常
+        } finally {
+            executorService.shutdown(); // 确保定时任务调度服务被关闭
+        }
+    }
+
+    private void connect() throws Exception {
+        WebSocketContainer container = ContainerProvider.getWebSocketContainer(); // 获取 WebSocket 容器
+        container.connectToServer(this, new URI(WS_ENDPOINT)); // 连接到 WebSocket 服务器
+    }
+
+    @OnOpen
+    public void onOpen(Session session) {
+        log.info("gate ws 已连接到服务器。"); // 连接成功日志
+        this.session = session; // 保存会话信息
+        synchronized (this) { // 进入同步块
+            this.notify(); // 通知等待的线程
+        }
+    }
+
+    private static final Gson gson = new Gson(); // 将 Gson 作为静态成员,避免重复实例化
+    private static final String RESULT_KEY = "result"; // 定义结果键的常量
+    private static final String BIDS_KEY = "bids"; // 定义 bids 的常量
+    private static final String ASKS_KEY = "asks"; // 定义 asks 的常量
+    private static final String S_KEY = "s"; // 定义 s 的常量
+
+    @OnMessage
+    public void onMessage(String message) {
+        try {
+            // 解析收到的消息为 Map
+            Map<String, Object> map = gson.fromJson(message, new TypeToken<Map<String, Object>>() {}.getType());
+            Object object = map.get(RESULT_KEY); // 获取结果对象
+            Map<String, Object> resultMap = gson.fromJson(gson.toJson(object), new TypeToken<Map<String, Object>>() {}.getType()); // 直接转换为 Map
+            // 检查结果是否有效,并整合检查
+            if (resultMap != null && resultMap.get(S_KEY) != null) {
+                HashMap<String, Object> hashMap = new HashMap<>(); // 创建新的 HashMap 保存 bids 和 asks
+
+                // 放入 bids 和 asks 数据
+                hashMap.put(BIDS_KEY, resultMap.get(BIDS_KEY)); // 放入 bids 数据
+                hashMap.put(ASKS_KEY, resultMap.get(ASKS_KEY)); // 放入 asks 数据
+
+                String key = "gate" + resultMap.get(S_KEY); // 生成 Redis 键
+                RedisUtil.set(key.replace("_", ""), gson.toJson(hashMap)); // 存入 Redis,使用 Gson 进行序列化
+            }
+        } catch (JsonSyntaxException e) {
+            log.error("JSON 解析异常:" + e.getMessage(), e); // 记录 JSON 解析异常
+        } catch (Exception e) { // 捕获其他可能的异常
+            log.error("处理消息时发生异常:" + e.getMessage(), e); // 记录异常
+        }
+    }
+
+
+    @OnClose
+    public void onClose() {
+        log.info("gate ws 连接已关闭,尝试重新连接..."); // 连接关闭日志
+        handleConnectionClosedOrError(); // 处理连接关闭或错误
+    }
+
+    @OnError
+    public void onError(Throwable throwable) {
+        log.error("gate ws 发生错误: " + throwable.getMessage(), throwable); // 记录错误日志
+        handleConnectionClosedOrError(); // 处理连接关闭或错误
+    }
+
+    private void handleConnectionClosedOrError() {
+        synchronized (lock) { // 进入同步块以防止并发重连
+            if (!reconnecting) { // 检查当前是否已经在重连
+                reconnecting = true; // 设置 reconnecting 为 true 表示开始重连
+                executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连
+            }
+        }
+    }
+
+    private void attemptReconnect() {
+        boolean doReconnect = true; // 是否进行重连的标志
+        try {
+            log.info("gate ws 开始重连"); // 开始重连日志
+            connect(); // 尝试重新连接
+            log.info("gate ws 重连成功"); // 重连成功日志
+        } catch (Exception e) {
+            log.error("gate ws 重连失败", e); // 重连失败记录日志
+            doReconnect = false; // 标记不再继续重连
+        } finally {
+            synchronized (lock) { // 进入同步块
+                if (doReconnect) {
+                    scheduleReconnect(); // 如果需要继续重连,调度重连任务
+                } else {
+                    reconnecting = false; // 重连结束,标记重连状态为 false
+                }
+            }
+        }
+    }
+
+    private void scheduleReconnect() {
+        if (!executorService.isShutdown()) { // 确保调度服务未关闭
+            executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS); // 重连调度,5秒后尝试重新连接
+        }
+    }
+
+    private void sendPing() {
+        try {
+            if (session != null) { // 检查会话是否存在
+                session.getBasicRemote().sendPing(ByteBuffer.wrap("Ping".getBytes())); // 发送 Ping 消息
+            }
+        } catch (Exception e) {
+            log.error("发送心跳失败", e); // 记录心跳发送失败日志
+        }
+    }
+
+    public String getParameter(String symbol) throws JsonProcessingException, JSONException {
+        // 替换USDT为_USDT
+        symbol = symbol.replaceAll("USDT", "_USDT"); // 替换货币符号中的 USDT
+
+        // 创建一个ObjectMapper实例
+        ObjectMapper mapper = new ObjectMapper(); // 创建 ObjectMapper 实例
+        // 获取当前时间的毫秒数
+        long currentTimeMillis = System.currentTimeMillis(); // 获取当前时间
+
+        // 定义常量
+        final String CHANNEL = "spot.order_book"; // 固定频道名称
+        final String EVENT_SUBSCRIBE = "subscribe"; // 订阅事件
+        final String EVENT_UNSUBSCRIBE = "unsubscribe"; // 取消订阅事件
+        final String[] PAYLOAD = new String[]{symbol, "20", "100ms"}; // 请求负载信息
+
+        // 使用Map构建JSON对象
+        Map<String, Object> jsonMap = new HashMap<>(); // 创建 Map 存放 JSON 内容
+        jsonMap.put("time", currentTimeMillis); // 将当前时间放入 Map
+        jsonMap.put("channel", CHANNEL); // 将频道信息放入 Map
+        jsonMap.put("event", EVENT_SUBSCRIBE); // 事件类型放入 Map
+        jsonMap.put("payload", Arrays.asList(PAYLOAD)); // 将负载数组转为 List 放入 Map
+
+        // 将Map转换为JSON字符串
+        String jsonString = mapper.writeValueAsString(jsonMap); // 转换为 JSON 字符串
+        return jsonString; // 返回 JSON 字符串
+    }
+}
diff --git a/kucoinClient/src/main/java/org/example/kucoinclient/ThreadConfig/AsyncConfiguration.java b/kucoinClient/src/main/java/org/example/kucoinclient/ThreadConfig/AsyncConfiguration.java
new file mode 100644
index 0000000..c2105e7
--- /dev/null
+++ b/kucoinClient/src/main/java/org/example/kucoinclient/ThreadConfig/AsyncConfiguration.java
@@ -0,0 +1,35 @@
+package org.example.kucoinclient.ThreadConfig;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.util.concurrent.ThreadPoolExecutor;
+
+/**
+ * @program: dabaogp
+ * @description: 线程池配置
+ * @create: 2024-06-25 16:37
+ **/
+@Configuration
+public class AsyncConfiguration {
+
+    @Bean(name = "threadPoolTaskExecutor")
+    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
+        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+
+        executor.setCorePoolSize(100);    // 核心线程数, 根据需求进行调整
+        executor.setMaxPoolSize(150);    // 最大线程数, 适当设置以避免资源耗尽
+        executor.setQueueCapacity(200);    // 队列容量, 适当限制以避免请求堆积
+        executor.setKeepAliveSeconds(30);    // 线程空闲时的存活时间为30秒,减少系统开销
+        executor.setThreadNamePrefix("Thread-");    // 线程名称的前缀
+
+        // 使用 CallerRunsPolicy 拒绝策略,以减少任务被拒绝时带来的负担
+        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
+
+        // 初始化线程池,配置其他参数(不过可以根据需要添加)
+        executor.initialize(); // 明确初始化,提升代码可读性
+
+        return executor; // 返回配置好的线程池
+    }
+}
diff --git a/kucoinClient/src/main/java/org/example/kucoinclient/WsBean/MexcWsBean.java b/kucoinClient/src/main/java/org/example/kucoinclient/WsBean/MexcWsBean.java
new file mode 100644
index 0000000..408184b
--- /dev/null
+++ b/kucoinClient/src/main/java/org/example/kucoinclient/WsBean/MexcWsBean.java
@@ -0,0 +1,84 @@
+package org.example.kucoinclient.WsBean;
+
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.util.EntityUtils;
+import org.example.kucoinclient.pojo.Currency;
+import org.example.kucoinclient.server.impl.CurrencySerivceImpl;
+import org.example.kucoinclient.wsClient.KucoinClient;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.util.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * @ClassDescription: 客户端请求类
+ * @JdkVersion: 1.8
+ * @Created: 2023/8/31 16:13
+ */
+@Slf4j
+@Configuration
+public class MexcWsBean {
+
+    @Autowired
+    private CurrencySerivceImpl currencyService;
+
+    @Autowired
+    @Qualifier("threadPoolTaskExecutor")
+    private ThreadPoolTaskExecutor threadPoolTaskExecutor;
+
+    @Bean
+    public void kucoinWebsocketRunClientMap() throws Exception {
+        List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "kucoin"));
+        if (!CollectionUtils.isEmpty(mexc)) {
+            String result = doPost();
+            JSONObject jsonObject = new JSONObject(result);
+            String token = jsonObject.getJSONObject("data").getString("token");
+            int batchSize = 100; // 每个线程处理的数据量
+            int totalSize = mexc.size();
+            int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数
+
+            for (int i = 0; i < threadCount; i++) {
+                int fromIndex = i * batchSize;
+                int toIndex = Math.min(fromIndex + batchSize, totalSize);
+                List<Currency> sublist = mexc.subList(fromIndex, toIndex);
+
+                // 使用自定义线程池提交任务
+                threadPoolTaskExecutor.execute(new KucoinClient(sublist,token)::start);
+            }
+
+        }
+    }
+
+    public static String doPost() throws Exception {
+        String url = "https://api.kucoin.com/api/v1/bullet-public";
+        HttpPost httpPost = new HttpPost(url);
+        DefaultHttpClient defaultHttpClient = new DefaultHttpClient();
+        List<NameValuePair> nvps = new ArrayList<NameValuePair>();
+        httpPost.setEntity(new UrlEncodedFormEntity(nvps));
+        HttpResponse response = defaultHttpClient.execute(httpPost);
+        HttpEntity respEntity = response.getEntity();
+        String text = EntityUtils.toString(respEntity, "UTF-8");
+        defaultHttpClient.getConnectionManager().shutdown();
+        return text;
+    }
+}
+
diff --git a/websocketClient/src/main/java/org/example/dao/CurrencyMapper.java b/kucoinClient/src/main/java/org/example/kucoinclient/dao/CurrencyMapper.java
similarity index 74%
copy from websocketClient/src/main/java/org/example/dao/CurrencyMapper.java
copy to kucoinClient/src/main/java/org/example/kucoinclient/dao/CurrencyMapper.java
index 86b645b..1626cd1 100644
--- a/websocketClient/src/main/java/org/example/dao/CurrencyMapper.java
+++ b/kucoinClient/src/main/java/org/example/kucoinclient/dao/CurrencyMapper.java
@@ -1,8 +1,8 @@
-package org.example.dao;
+package org.example.kucoinclient.dao;
 
 import com.baomidou.mybatisplus.core.mapper.BaseMapper;
 import org.apache.ibatis.annotations.Mapper;
-import org.example.pojo.Currency;
+import org.example.kucoinclient.pojo.Currency;
 
 /**
  * @program: demo
diff --git a/websocketClient/src/main/java/org/example/pojo/Currency.java b/kucoinClient/src/main/java/org/example/kucoinclient/pojo/Currency.java
similarity index 92%
rename from websocketClient/src/main/java/org/example/pojo/Currency.java
rename to kucoinClient/src/main/java/org/example/kucoinclient/pojo/Currency.java
index 2e0e322..ab6ac76 100644
--- a/websocketClient/src/main/java/org/example/pojo/Currency.java
+++ b/kucoinClient/src/main/java/org/example/kucoinclient/pojo/Currency.java
@@ -1,4 +1,4 @@
-package org.example.pojo;
+package org.example.kucoinclient.pojo;
 
 import com.baomidou.mybatisplus.annotation.IdType;
 import com.baomidou.mybatisplus.annotation.TableId;
diff --git a/websocketClient/src/main/java/org/example/server/CurrencySerivce.java b/kucoinClient/src/main/java/org/example/kucoinclient/server/CurrencySerivce.java
similarity index 72%
copy from websocketClient/src/main/java/org/example/server/CurrencySerivce.java
copy to kucoinClient/src/main/java/org/example/kucoinclient/server/CurrencySerivce.java
index d50a81f..084bf52 100644
--- a/websocketClient/src/main/java/org/example/server/CurrencySerivce.java
+++ b/kucoinClient/src/main/java/org/example/kucoinclient/server/CurrencySerivce.java
@@ -1,4 +1,4 @@
-package org.example.server;
+package org.example.kucoinclient.server;
 
 /**
  * @program: demo
diff --git a/websocketClient/src/main/java/org/example/server/impl/CurrencySerivceImpl.java b/kucoinClient/src/main/java/org/example/kucoinclient/server/impl/CurrencySerivceImpl.java
similarity index 65%
copy from websocketClient/src/main/java/org/example/server/impl/CurrencySerivceImpl.java
copy to kucoinClient/src/main/java/org/example/kucoinclient/server/impl/CurrencySerivceImpl.java
index 1b220f2..ac784a9 100644
--- a/websocketClient/src/main/java/org/example/server/impl/CurrencySerivceImpl.java
+++ b/kucoinClient/src/main/java/org/example/kucoinclient/server/impl/CurrencySerivceImpl.java
@@ -1,8 +1,8 @@
-package org.example.server.impl;
+package org.example.kucoinclient.server.impl;
 
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
-import org.example.dao.CurrencyMapper;
-import org.example.pojo.Currency;
+import org.example.kucoinclient.dao.CurrencyMapper;
+import org.example.kucoinclient.pojo.Currency;
 import org.springframework.stereotype.Service;
 
 /**
diff --git a/websocketClient/src/main/java/org/example/util/RedisUtil.java b/kucoinClient/src/main/java/org/example/kucoinclient/util/RedisUtil.java
similarity index 95%
copy from websocketClient/src/main/java/org/example/util/RedisUtil.java
copy to kucoinClient/src/main/java/org/example/kucoinclient/util/RedisUtil.java
index dab4e20..055be19 100644
--- a/websocketClient/src/main/java/org/example/util/RedisUtil.java
+++ b/kucoinClient/src/main/java/org/example/kucoinclient/util/RedisUtil.java
@@ -1,9 +1,10 @@
-package org.example.util;
+package org.example.kucoinclient.util;
 
-import redis.clients.jedis.*;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPoolConfig;
 
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 public class RedisUtil {
diff --git a/kucoinClient/src/main/java/org/example/kucoinclient/wsClient/KucoinClient.java b/kucoinClient/src/main/java/org/example/kucoinclient/wsClient/KucoinClient.java
new file mode 100644
index 0000000..18827f7
--- /dev/null
+++ b/kucoinClient/src/main/java/org/example/kucoinclient/wsClient/KucoinClient.java
@@ -0,0 +1,250 @@
+package org.example.kucoinclient.wsClient;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+import com.sun.istack.internal.NotNull;
+import lombok.extern.slf4j.Slf4j;
+import org.example.kucoinclient.pojo.Currency;
+import org.example.kucoinclient.util.RedisUtil;
+import org.json.JSONException;
+
+import javax.websocket.*;
+import java.io.UnsupportedEncodingException;
+import java.net.URI;
+import java.net.URLEncoder;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * @program: demo
+ * @description: Kucoin WebSocket 客户端
+ * @create: 2024-07-19 16:44
+ **/
+@ClientEndpoint
+@Slf4j
+public class KucoinClient {
+
+    private static final String WS_ENDPOINT = "wss://ws-api-spot.kucoin.com/spotMarket/level2Depth5:"; // WebSocket 端点
+    private static final String CHARSET = "UTF-8"; // 定义字符集常量,避免魔法字符串
+    private static final String PING_MESSAGE = "Ping"; // 心跳消息
+    private static final long PING_INTERVAL = 20000; // 心跳间隔
+    private static final int RECONNECT_DELAY = 5; // 重连间隔
+
+    private final List<Currency> subscriptions; // 订阅的货币列表
+    private final ScheduledExecutorService executorService; // 调度线程池
+    private Session session; // WebSocket会话
+    private String token; // 访问令牌
+    private final Object lock = new Object(); // 锁对象,用于控制重连
+    private volatile boolean reconnecting = false; // 表示当前是否在重连
+
+    private String id; // 当前会话的 ID
+
+    public KucoinClient(List<Currency> subscriptions, String token) {
+        this.subscriptions = subscriptions; // 保存订阅的货币列表
+        this.token = token; // 保存令牌
+        this.executorService = Executors.newScheduledThreadPool(1); // 创建调度线程池
+    }
+
+    public void start() {
+        try {
+            connect(); // 连接到 WebSocket 服务器
+            if (session == null) {
+                log.info("无法在超时时间内连接到服务器。"); // 输出连接失败日志
+                return; // 结束方法
+            }
+
+            // 定期发送心跳
+            executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS);
+
+            synchronized (this) { // 同步等待连接
+                this.wait(); // 当前线程等待
+            }
+
+        } catch (Exception e) {
+            log.error("kucoin ws 连接过程中发生异常: ", e); // 捕获并记录异常
+        } finally {
+            executorService.shutdown(); // 关闭调度线程池
+        }
+    }
+
+    private void connect() throws Exception {
+        WebSocketContainer container = ContainerProvider.getWebSocketContainer(); // 获取 WebSocket 容器
+        String url = generateWebSocketURL(); // 生成 WebSocket URL
+        container.connectToServer(this, new URI(url)); // 连接到 WebSocket 服务器
+        String parameter = subscription(); // 生成订阅消息
+        session.getBasicRemote().sendText(parameter); // 发送订阅消息
+    }
+
+    private String generateWebSocketURL() throws UnsupportedEncodingException {
+        String symbol = getSymbol(); // 获取符号
+        String url = WS_ENDPOINT + symbol + "?token=" + URLEncoder.encode(token, CHARSET); // 构建 WebSocket URL
+        return url; // 返回生成的 URL
+    }
+
+    @OnOpen
+    public void onOpen(Session session) {
+        log.info("kucoin ws 已连接到服务器。"); // 输出连接成功日志
+        this.session = session; // 保存当前会话
+        synchronized (this) { // 同步通知所有等待的线程
+            this.notify();
+        }
+    }
+
+    private static final Gson gson = new Gson(); // Gson 实例,负责 JSON 处理
+
+    @OnMessage
+    public void onMessage(String message) {
+        try {
+            Map<String, Object> map = parseMessage(message); // 解析消息
+            handleMessage(map); // 处理消息
+        } catch (JsonSyntaxException | JsonProcessingException e) {
+            log.error("JSON 解析异常:", e); // 捕获 JSON 解析异常
+        }
+    }
+
+    private Map<String, Object> parseMessage(String message) throws JsonSyntaxException {
+        return gson.fromJson(message, new TypeToken<Map<String, Object>>() {}.getType()); // 解析 JSON 消息
+    }
+
+    private void handleMessage(Map<String, Object> map) throws JsonProcessingException {
+        if (map.get("id") != null) {
+            this.id = map.get("id").toString(); // 设置会话 ID
+        }
+        if (map.get("data") != null) {
+            Object object = map.get("data"); // 获取数据内容
+            processData(map.get("topic").toString(), object); // 处理数据
+        }
+    }
+
+    private static final String PREFIX = "kucoin"; // 创建常量,方便以后修改和维护
+
+    private void processData(String topic, Object object) throws JsonProcessingException {
+        // 将数据解析为 Map
+        Map<String, Object> resultMap = null;
+        try {
+            resultMap = gson.fromJson(object.toString(), new TypeToken<Map<String, Object>>() {}.getType()); // 解析 JSON 对象
+        } catch (JsonSyntaxException e) {
+            log.error("JSON 解析失败: {}", e.getMessage()); // 输出 JSON 解析错误日志
+            return; // 结束方法执行
+        }
+
+        if (resultMap != null) {
+            // 创建线程安全的 HashMap
+            Map<String, Object> hashMap = new ConcurrentHashMap<>();
+            ObjectMapper mapper = new ObjectMapper(); // 创建 ObjectMapper 实例
+
+            // 空值检查,避免存储 null 值到 Redis
+            if (resultMap.get("bids") != null) {
+                hashMap.put("bids", resultMap.get("bids")); // 存储 bids
+            }
+            if (resultMap.get("asks") != null) {
+                hashMap.put("asks", resultMap.get("asks")); // 存储 asks
+            }
+
+            String symbol = extractSymbolFromTopic(topic); // 从 topic 提取符号
+            String key = PREFIX + symbol; // 创建 Redis 缓存键
+            try {
+                RedisUtil.set(key, mapper.writeValueAsString(hashMap)); // 存储到 Redis
+            } catch (JsonProcessingException e) {
+                log.error("将数据存入 Redis 时出错: {}", e.getMessage()); // 输出数据存储错误日志
+            }
+        } else {
+            log.error("topic--->存入redis失败"); // 输出处理失败日志
+        }
+    }
+
+
+    private String extractSymbolFromTopic(String topic) {
+        int index = topic.indexOf(":"); // 找到分隔符的位置
+        if (index != -1) { // 如果找到了分隔符
+            String substring = topic.substring(index + 1);
+            return substring.replaceAll("-", ""); // 返回去掉"-"的符号
+        }
+        return ""; // 未找到分隔符返回空
+    }
+
+    @OnClose
+    public void onClose() {
+        log.info("kucoin ws 连接已关闭,尝试重新连接..."); // 输出连接关闭日志
+        handleConnectionClosedOrError(); // 处理连接关闭或错误
+    }
+
+    @OnError
+    public void onError(Throwable throwable) {
+        log.error("kucoin ws 发生错误: ", throwable); // 输出错误日志
+        handleConnectionClosedOrError(); // 处理连接关闭或错误
+    }
+
+    private void handleConnectionClosedOrError() {
+        synchronized (lock) { // 同步块,确保线程安全
+            if (!reconnecting) {
+                reconnecting = true; // 状态标记为重连中
+                executorService.execute(this::attemptReconnect); // 执行重连操作
+            }
+        }
+    }
+
+    private void attemptReconnect() {
+        try {
+            log.info("kucoin ws 开始重连"); // 输出重连开始日志
+            connect(); // 尝试重新连接
+            log.info("kucoin ws 重连成功"); // 输出重连成功日志
+        } catch (Exception e) {
+            log.error("kucoin ws 重连失败: ", e); // 输出重连失败日志
+        } finally {
+            synchronized (lock) { // 同步块
+                reconnecting = false; // 状态标记为未重连
+                scheduleReconnect(); // 重新调度重连任务
+            }
+        }
+    }
+
+    private void scheduleReconnect() {
+        if (!executorService.isShutdown()) { // 检查线程池是否已经关闭
+            executorService.schedule(this::attemptReconnect, RECONNECT_DELAY, TimeUnit.SECONDS); // 调度重连
+        }
+    }
+
+    private void sendPing() {
+        try {
+            if (session != null) {
+                session.getBasicRemote().sendPing(ByteBuffer.wrap(PING_MESSAGE.getBytes(CHARSET))); // 发送心跳消息
+            }
+        } catch (Exception e) {
+            log.error("发送心跳失败:", e); // 捕获并记录心跳发送失败的异常
+        }
+    }
+
+    public String subscription() throws JsonProcessingException, JSONException {
+        String symbol = getSymbol(); // 获取当前货币符号
+
+        // 创建 Map 构建 JSON 对象
+        Map<String, Object> jsonMap = new HashMap<>();
+        jsonMap.put("id", id); // 会话 ID
+        jsonMap.put("type", "subscribe"); // 订阅类型
+        jsonMap.put("topic", "/spotMarket/level2Depth50:" + symbol); // 订阅的主题
+        jsonMap.put("privateChannel", false); // 是否私有通道
+        jsonMap.put("response", true); // 是否返回响应
+
+        ObjectMapper mapper = new ObjectMapper(); // 创建 ObjectMapper 实例
+        return mapper.writeValueAsString(jsonMap); // 返回 JSON 字符串
+    }
+
+    @NotNull
+    private String getSymbol() {
+        List<String> symbolList = subscriptions.stream()
+                .map(currency -> currency.getSymbol().replaceAll("USDT", "-USDT")) // 替换符号
+                .collect(Collectors.toList()); // 收集符号到列表
+        return String.join(",", symbolList); // 将符号列表转换为 字符串
+    }
+}
diff --git a/mexcClient/src/main/java/org/example/mexcclient/ThreadConfig/AsyncConfiguration.java b/mexcClient/src/main/java/org/example/mexcclient/ThreadConfig/AsyncConfiguration.java
new file mode 100644
index 0000000..61c5a76
--- /dev/null
+++ b/mexcClient/src/main/java/org/example/mexcclient/ThreadConfig/AsyncConfiguration.java
@@ -0,0 +1,35 @@
+package org.example.mexcclient.ThreadConfig;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+
+import java.util.concurrent.ThreadPoolExecutor;
+
+/**
+ * @program: dabaogp
+ * @description: 线程池配置
+ * @create: 2024-06-25 16:37
+ **/
+@Configuration
+public class AsyncConfiguration {
+
+    @Bean(name = "threadPoolTaskExecutor")
+    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
+        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
+
+        executor.setCorePoolSize(100);    // 核心线程数, 根据需求进行调整
+        executor.setMaxPoolSize(150);    // 最大线程数, 适当设置以避免资源耗尽
+        executor.setQueueCapacity(200);    // 队列容量, 适当限制以避免请求堆积
+        executor.setKeepAliveSeconds(30);    // 线程空闲时的存活时间为30秒,减少系统开销
+        executor.setThreadNamePrefix("Thread-");    // 线程名称的前缀
+
+        // 使用 CallerRunsPolicy 拒绝策略,以减少任务被拒绝时带来的负担
+        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
+
+        // 初始化线程池,配置其他参数(不过可以根据需要添加)
+        executor.initialize(); // 明确初始化,提升代码可读性
+
+        return executor; // 返回配置好的线程池
+    }
+}
diff --git a/mexcClient/src/main/java/org/example/mexcclient/WsBean/MexcWsBean.java b/mexcClient/src/main/java/org/example/mexcclient/WsBean/MexcWsBean.java
new file mode 100644
index 0000000..3adeebf
--- /dev/null
+++ b/mexcClient/src/main/java/org/example/mexcclient/WsBean/MexcWsBean.java
@@ -0,0 +1,71 @@
+package org.example.mexcclient.WsBean;
+
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
+import org.example.mexcclient.pojo.Currency;
+import org.example.mexcclient.server.impl.CurrencySerivceImpl;
+import org.example.mexcclient.wsClient.MexcClient;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.util.CollectionUtils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * @ClassDescription: 客户端请求类
+ * @JdkVersion: 1.8
+ * @Created: 2023/8/31 16:13
+ */
+@Slf4j
+@Configuration
+public class MexcWsBean {
+
+    @Autowired
+    private CurrencySerivceImpl currencyService;
+
+    @Autowired
+    @Qualifier("threadPoolTaskExecutor")
+    private ThreadPoolTaskExecutor threadPoolTaskExecutor;
+
+    @Bean
+    public void mexcWebsocketRunClientMap() {
+        List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "mexc"));
+        if (!CollectionUtils.isEmpty(mexc)) {
+            int batchSize = 30; // 每个线程处理的数据量
+            int totalSize = mexc.size();
+            int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数
+
+            for (int i = 0; i < threadCount; i++) {
+                int fromIndex = i * batchSize;
+                int toIndex = Math.min(fromIndex + batchSize, totalSize);
+                List<Currency> sublist = mexc.subList(fromIndex, toIndex);
+
+                // 使用自定义线程池提交任务
+                threadPoolTaskExecutor.execute(new MexcClient(sublist)::start);
+            }
+
+        }
+    }
+}
+
diff --git a/websocketClient/src/main/java/org/example/dao/CurrencyMapper.java b/mexcClient/src/main/java/org/example/mexcclient/dao/CurrencyMapper.java
similarity index 74%
copy from websocketClient/src/main/java/org/example/dao/CurrencyMapper.java
copy to mexcClient/src/main/java/org/example/mexcclient/dao/CurrencyMapper.java
index 86b645b..c22e4cb 100644
--- a/websocketClient/src/main/java/org/example/dao/CurrencyMapper.java
+++ b/mexcClient/src/main/java/org/example/mexcclient/dao/CurrencyMapper.java
@@ -1,8 +1,9 @@
-package org.example.dao;
+package org.example.mexcclient.dao;
+
 
 import com.baomidou.mybatisplus.core.mapper.BaseMapper;
 import org.apache.ibatis.annotations.Mapper;
-import org.example.pojo.Currency;
+import org.example.mexcclient.pojo.Currency;
 
 /**
  * @program: demo
diff --git a/websocketClient/src/main/java/org/example/pojo/Currency.java b/mexcClient/src/main/java/org/example/mexcclient/pojo/Currency.java
similarity index 92%
copy from websocketClient/src/main/java/org/example/pojo/Currency.java
copy to mexcClient/src/main/java/org/example/mexcclient/pojo/Currency.java
index 2e0e322..b714d95 100644
--- a/websocketClient/src/main/java/org/example/pojo/Currency.java
+++ b/mexcClient/src/main/java/org/example/mexcclient/pojo/Currency.java
@@ -1,4 +1,4 @@
-package org.example.pojo;
+package org.example.mexcclient.pojo;
 
 import com.baomidou.mybatisplus.annotation.IdType;
 import com.baomidou.mybatisplus.annotation.TableId;
diff --git a/websocketClient/src/main/java/org/example/server/CurrencySerivce.java b/mexcClient/src/main/java/org/example/mexcclient/server/CurrencySerivce.java
similarity index 73%
copy from websocketClient/src/main/java/org/example/server/CurrencySerivce.java
copy to mexcClient/src/main/java/org/example/mexcclient/server/CurrencySerivce.java
index d50a81f..45c1b59 100644
--- a/websocketClient/src/main/java/org/example/server/CurrencySerivce.java
+++ b/mexcClient/src/main/java/org/example/mexcclient/server/CurrencySerivce.java
@@ -1,4 +1,4 @@
-package org.example.server;
+package org.example.mexcclient.server;
 
 /**
  * @program: demo
diff --git a/websocketClient/src/main/java/org/example/server/impl/CurrencySerivceImpl.java b/mexcClient/src/main/java/org/example/mexcclient/server/impl/CurrencySerivceImpl.java
similarity index 66%
rename from websocketClient/src/main/java/org/example/server/impl/CurrencySerivceImpl.java
rename to mexcClient/src/main/java/org/example/mexcclient/server/impl/CurrencySerivceImpl.java
index 1b220f2..da4d984 100644
--- a/websocketClient/src/main/java/org/example/server/impl/CurrencySerivceImpl.java
+++ b/mexcClient/src/main/java/org/example/mexcclient/server/impl/CurrencySerivceImpl.java
@@ -1,8 +1,8 @@
-package org.example.server.impl;
+package org.example.mexcclient.server.impl;
 
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
-import org.example.dao.CurrencyMapper;
-import org.example.pojo.Currency;
+import org.example.mexcclient.dao.CurrencyMapper;
+import org.example.mexcclient.pojo.Currency;
 import org.springframework.stereotype.Service;
 
 /**
diff --git a/websocketClient/src/main/java/org/example/util/RedisUtil.java b/mexcClient/src/main/java/org/example/mexcclient/util/RedisUtil.java
similarity index 95%
copy from websocketClient/src/main/java/org/example/util/RedisUtil.java
copy to mexcClient/src/main/java/org/example/mexcclient/util/RedisUtil.java
index dab4e20..c033556 100644
--- a/websocketClient/src/main/java/org/example/util/RedisUtil.java
+++ b/mexcClient/src/main/java/org/example/mexcclient/util/RedisUtil.java
@@ -1,9 +1,10 @@
-package org.example.util;
+package org.example.mexcclient.util;
 
-import redis.clients.jedis.*;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPoolConfig;
 
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 public class RedisUtil {
diff --git a/websocketClient/src/main/java/org/example/wsClient/MexcClient.java b/mexcClient/src/main/java/org/example/mexcclient/wsClient/MexcClient.java
similarity index 96%
rename from websocketClient/src/main/java/org/example/wsClient/MexcClient.java
rename to mexcClient/src/main/java/org/example/mexcclient/wsClient/MexcClient.java
index 80352a8..8508ab9 100644
--- a/websocketClient/src/main/java/org/example/wsClient/MexcClient.java
+++ b/mexcClient/src/main/java/org/example/mexcclient/wsClient/MexcClient.java
@@ -1,4 +1,4 @@
-package org.example.wsClient;
+package org.example.mexcclient.wsClient;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -8,8 +8,8 @@
 import com.google.gson.Gson;
 import com.google.gson.JsonSyntaxException;
 import lombok.extern.slf4j.Slf4j;
-import org.example.pojo.Currency;
-import org.example.util.RedisUtil;
+import org.example.mexcclient.pojo.Currency;
+import org.example.mexcclient.util.RedisUtil;
 
 import javax.websocket.*;
 import java.io.IOException;
@@ -94,8 +94,6 @@
                 hashMap.put("asks",resultMap.get("asks"));
                 String key = "mexc" + map.get("s").toString();
                 RedisUtil.set(key, mapper.writeValueAsString(hashMap));
-            } else {
-                log.warn("消息不包含 's' 字段或解析失败:" + message);
             }
         } catch (JsonSyntaxException e) {
             log.error("JSON 解析异常:" + e.getMessage(), e);
diff --git a/pom.xml b/pom.xml
index c102e2c..0191b17 100644
--- a/pom.xml
+++ b/pom.xml
@@ -9,7 +9,6 @@
     <version>1.0-SNAPSHOT</version>
     <packaging>pom</packaging>
     <modules>
-        <module>websocketClient</module>
         <module>websocketSerivce</module>
     </modules>
     <parent>
diff --git a/websocketClient/pom.xml b/websocketClient/pom.xml
deleted file mode 100644
index 8167829..0000000
--- a/websocketClient/pom.xml
+++ /dev/null
@@ -1,137 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <groupId>org.example</groupId>
-        <artifactId>webSocketProject</artifactId>
-        <version>1.0-SNAPSHOT</version>
-    </parent>
-
-    <artifactId>websocketClient</artifactId>
-    <dependencies>
-        <dependency>
-            <groupId>com.google.guava</groupId>
-            <artifactId>guava</artifactId>
-            <version>20.0</version>
-        </dependency>
-        <dependency>
-            <groupId>redis.clients</groupId>
-            <artifactId>jedis</artifactId>
-            <version>3.7.0</version>
-        </dependency>
-        <dependency>
-            <groupId>com.baomidou</groupId>
-            <artifactId>mybatis-plus-boot-starter</artifactId>
-            <version>3.4.3.2</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.springframework.boot</groupId>
-            <artifactId>spring-boot-starter-web</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.java-websocket</groupId>
-            <artifactId>Java-WebSocket</artifactId>
-            <version>1.5.2</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.httpcomponents</groupId>
-            <artifactId>httpclient</artifactId>
-            <version>4.5.13</version>
-        </dependency>
-        <dependency>
-            <groupId>io.socket</groupId>
-            <artifactId>engine.io-client</artifactId>
-            <version>2.1.0</version>
-        </dependency>
-        <dependency>
-            <groupId>org.json</groupId>
-            <artifactId>json</artifactId>
-            <version>20090211</version>
-        </dependency>
-        <dependency>
-            <groupId>com.google.code.gson</groupId>
-            <artifactId>gson</artifactId>
-            <version>2.8.8</version>
-        </dependency>
-        <dependency>
-            <groupId>com.alibaba</groupId>
-            <artifactId>druid-spring-boot-starter</artifactId>
-            <version>1.2.11</version>
-        </dependency>
-        <dependency>
-            <groupId>mysql</groupId>
-            <artifactId>mysql-connector-java</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.mybatis.spring.boot</groupId>
-            <artifactId>mybatis-spring-boot-starter</artifactId>
-            <version>2.2.2</version>
-        </dependency>
-
-
-        <dependency>
-            <groupId>org.springframework.boot</groupId>
-            <artifactId>spring-boot-starter-websocket</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.springframework.boot</groupId>
-            <artifactId>spring-boot-starter-data-redis</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.springframework.boot</groupId>
-            <artifactId>spring-boot-starter</artifactId>
-        </dependency>
-
-
-        <dependency>
-            <groupId>org.springframework.boot</groupId>
-            <artifactId>spring-boot-starter-test</artifactId>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-pool2</artifactId>
-            <version>2.11.1</version>
-        </dependency>
-
-        <dependency>
-            <groupId>redis.clients</groupId>
-            <artifactId>jedis</artifactId>
-            <version>3.7.0</version>
-        </dependency>
-    </dependencies>
-    <properties>
-        <maven.compiler.source>8</maven.compiler.source>
-        <maven.compiler.target>8</maven.compiler.target>
-        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-    </properties>
-    <build>
-        <finalName>websocketClient</finalName>
-        <plugins>
-            <!-- 打包项目 mvn clean package -->
-            <plugin>
-                <groupId>org.springframework.boot</groupId>
-                <artifactId>spring-boot-maven-plugin</artifactId>
-                <!-- 指定main方法入口 -->
-                <configuration>
-                    <mainClass>org.example.WsClientApplication</mainClass>
-                </configuration>
-                <executions>
-                    <execution>
-                        <goals>
-                            <goal>repackage</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-
-</project>
\ No newline at end of file
diff --git a/websocketClient/src/main/java/org/example/ThreadConfig/AsyncConfiguration.java b/websocketClient/src/main/java/org/example/ThreadConfig/AsyncConfiguration.java
deleted file mode 100644
index bbbc919..0000000
--- a/websocketClient/src/main/java/org/example/ThreadConfig/AsyncConfiguration.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package org.example.ThreadConfig;
-
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-
-import java.util.concurrent.ThreadPoolExecutor;
-
-/**
- * @program: dabaogp
- * @description:
- * @create: 2024-06-25 16:37
- **/
-@Configuration
-public class AsyncConfiguration {
-
-    @Bean(name = "threadPoolTaskExecutor")
-    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
-        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
-        executor.setCorePoolSize(400);    //  核心线程数
-        executor.setMaxPoolSize(1200);    //  最大线程数
-        executor.setQueueCapacity(1500);    //  队列容量
-        executor.setKeepAliveSeconds(60);    //  线程空闲时的存活时间为60秒
-        executor.setThreadNamePrefix("MexcThread-");    //  线程名称的前缀
-        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());    //  使用  CallerRunsPolicy  拒绝策略
-        return executor;
-    }
-}
diff --git a/websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java b/websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java
deleted file mode 100644
index d6664e9..0000000
--- a/websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java
+++ /dev/null
@@ -1,171 +0,0 @@
-package org.example.WsBean;
-
-import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.reflect.TypeToken;
-import com.google.gson.Gson;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.http.HttpEntity;
-import org.apache.http.HttpResponse;
-import org.apache.http.NameValuePair;
-import org.apache.http.client.entity.UrlEncodedFormEntity;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.impl.client.DefaultHttpClient;
-import org.apache.http.message.BasicNameValuePair;
-import org.apache.http.util.EntityUtils;
-import org.example.pojo.Currency;
-import org.example.server.impl.CurrencySerivceImpl;
-import org.example.wsClient.BitgetClient;
-import org.example.wsClient.GateClient;
-import org.example.wsClient.KucoinClient;
-import org.example.wsClient.MexcClient;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
-import org.springframework.util.CollectionUtils;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-/**
- * @ClassDescription: 客户端请求类
- * @JdkVersion: 1.8
- * @Created: 2023/8/31 16:13
- */
-@Slf4j
-@Configuration
-public class MexcWsBean {
-
-    @Autowired
-    private CurrencySerivceImpl currencyService;
-
-    @Autowired
-    private ThreadPoolTaskExecutor threadPoolTaskExecutor;
-
-//    @Bean
-//    public void mexcWebsocketRunClientMap() {
-//        List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "mexc"));
-//        if (!CollectionUtils.isEmpty(mexc)) {
-//            int batchSize = 30; // 每个线程处理的数据量
-//            int totalSize = mexc.size();
-//            int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数
-//
-//            for (int i = 0; i < threadCount; i++) {
-//                int fromIndex = i * batchSize;
-//                int toIndex = Math.min(fromIndex + batchSize, totalSize);
-//                List<Currency> sublist = mexc.subList(fromIndex, toIndex);
-//
-//                // 使用自定义线程池提交任务
-//                threadPoolTaskExecutor.execute(new MexcClient(sublist)::start);
-//            }
-//
-//        }
-//    }
-
-//    @Bean
-//    public void gateWebsocketRunClientMap() {
-//        List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "gate"));
-//        if (!CollectionUtils.isEmpty(mexc)) {
-//            int batchSize = 100; // 每个线程处理的数据量
-//            int totalSize = mexc.size();
-//            int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数
-//
-//            for (int i = 0; i < threadCount; i++) {
-//                int fromIndex = i * batchSize;
-//                int toIndex = Math.min(fromIndex + batchSize, totalSize);
-//                List<Currency> sublist = mexc.subList(fromIndex, toIndex);
-//
-//                // 使用自定义线程池提交任务
-//                threadPoolTaskExecutor.execute(new GateClient(sublist)::start);
-//            }
-//
-//        }
-//    }
-
-    @Bean
-    public void bitgetWebsocketRunClientMap() throws JSONException, JsonProcessingException {
-        List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "bitget"));
-        if (!CollectionUtils.isEmpty(mexc)) {
-            int batchSize = 100; // 每个线程处理的数据量
-            int totalSize = mexc.size();
-            int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数
-
-            for (int i = 0; i < threadCount; i++) {
-                int fromIndex = i * batchSize;
-                int toIndex = Math.min(fromIndex + batchSize, totalSize);
-                List<Currency> sublist = mexc.subList(fromIndex, toIndex);
-                String parameter = getParameter(sublist);
-                // 使用自定义线程池提交任务
-                threadPoolTaskExecutor.execute(new BitgetClient(parameter)::start);
-            }
-
-        }
-    }
-//
-//    @Bean
-//    public void kucoinWebsocketRunClientMap() throws Exception {
-//        List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "kucoin"));
-//        if (!CollectionUtils.isEmpty(mexc)) {
-//            String result = doPost();
-//            JSONObject jsonObject = new JSONObject(result);
-//            String token = jsonObject.getJSONObject("data").getString("token");
-//            int batchSize = 100; // 每个线程处理的数据量
-//            int totalSize = mexc.size();
-//            int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数
-//
-//            for (int i = 0; i < threadCount; i++) {
-//                int fromIndex = i * batchSize;
-//                int toIndex = Math.min(fromIndex + batchSize, totalSize);
-//                List<Currency> sublist = mexc.subList(fromIndex, toIndex);
-//
-//                // 使用自定义线程池提交任务
-//                threadPoolTaskExecutor.execute(new KucoinClient(sublist,token)::start);
-//            }
-//
-//        }
-//    }
-
-    public static String doPost() throws Exception {
-        String url = "https://api.kucoin.com/api/v1/bullet-public";
-        HttpPost httpPost = new HttpPost(url);
-        DefaultHttpClient defaultHttpClient = new DefaultHttpClient();
-        List<NameValuePair> nvps = new ArrayList<NameValuePair>();
-        httpPost.setEntity(new UrlEncodedFormEntity(nvps));
-        HttpResponse response = defaultHttpClient.execute(httpPost);
-        HttpEntity respEntity = response.getEntity();
-        String text = EntityUtils.toString(respEntity, "UTF-8");
-        defaultHttpClient.getConnectionManager().shutdown();
-        return text;
-    }
-
-    public String getParameter(List<Currency> list) throws JsonProcessingException, JSONException {
-        // 创建一个ObjectMapper实例
-        ObjectMapper mapper = new ObjectMapper();
-        List<String> symbolList = list.stream().map(Currency::getSymbol).collect(Collectors.toList());
-        // 使用Map构建JSON对象
-        Map<String, Object> jsonMap = new HashMap<>();
-        jsonMap.put("op", "subscribe");
-        List<Map<String, String>> mapList = new ArrayList<>();
-        symbolList.forEach(f->{
-            Map<String, String> argsMap = new HashMap<>();
-            argsMap.put("instType", "SPOT");
-            argsMap.put("channel", "books15");
-            argsMap.put("instId", f);
-            mapList.add(argsMap);
-        });
-        jsonMap.put("args", mapList);
-
-        // 将Map转换为JSON字符串
-        String jsonString = mapper.writeValueAsString(jsonMap);
-        return jsonString;
-
-    }
-}
-
diff --git a/websocketClient/src/main/java/org/example/WsClientApplication.java b/websocketClient/src/main/java/org/example/WsClientApplication.java
deleted file mode 100644
index 54aa288..0000000
--- a/websocketClient/src/main/java/org/example/WsClientApplication.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package org.example;
-
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.scheduling.annotation.EnableScheduling;
-
-/**
- * @ClassDescription:
- * @JdkVersion: 1.8
- * @Created: 2023/8/31 16:09
- */
-@EnableScheduling
-@SpringBootApplication
-public class WsClientApplication {
-    public static void main(String[] args) {
-        SpringApplication.run(WsClientApplication.class, args);
-    }
-}
-
diff --git a/websocketClient/src/main/java/org/example/wsClient/BitgetClient.java b/websocketClient/src/main/java/org/example/wsClient/BitgetClient.java
deleted file mode 100644
index d7f06c8..0000000
--- a/websocketClient/src/main/java/org/example/wsClient/BitgetClient.java
+++ /dev/null
@@ -1,186 +0,0 @@
-package org.example.wsClient;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.reflect.TypeToken;
-import com.google.gson.Gson;
-import com.google.gson.JsonSyntaxException;
-import lombok.extern.slf4j.Slf4j;
-import org.example.pojo.Currency;
-import org.example.util.RedisUtil;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import javax.websocket.*;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-@ClientEndpoint
-@Slf4j
-public class BitgetClient {
-
-
-    private static final String WS_ENDPOINT = "wss://ws.bitget.com/v2/ws/public";
-    private static final long PING_INTERVAL = 20000;
-
-    private final String subscriptions;
-    private final ScheduledExecutorService executorService;
-    private Session session;
-
-    private final Object lock = new Object(); // 添加一个锁对象
-    private volatile boolean reconnecting = false; // 使用 volatile 关键字保证可见性
-
-    public BitgetClient(String subscriptions) {
-        this.subscriptions = subscriptions;
-        this.executorService = Executors.newScheduledThreadPool(1);
-    }
-
-    public void start() {
-        try {
-            connect();
-            if (session == null) {
-                log.info("无法在超时时间内连接到服务器。");
-                return;
-            }
-
-            executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS);
-
-            // 订阅消息
-            session.getBasicRemote().sendText(subscriptions);
-
-            synchronized (this) {
-                this.wait();
-            }
-
-        } catch (Exception e) {
-            log.error("bitget ws 连接过程中发生异常: " + e.getMessage(), e);
-        } finally {
-            executorService.shutdown();
-        }
-    }
-
-    private void connect() throws Exception {
-        WebSocketContainer container = ContainerProvider.getWebSocketContainer();
-        container.connectToServer(this, new URI(WS_ENDPOINT));
-    }
-
-    @OnOpen
-    public void onOpen(Session session) {
-        log.info("bitget ws 已连接到服务器。");
-        this.session = session;
-        synchronized (this) {
-            this.notify();
-        }
-    }
-    private static final Gson gson = new Gson(); // 将 Gson 作为静态成员或单例
-
-    @OnMessage
-    public void onMessage(String message) {
-        try {
-            JSONObject jsonObject = new JSONObject(message);
-            if (null != jsonObject && null != jsonObject.getJSONObject("arg").getString("instId") && null != jsonObject.get("data")) {
-                HashMap<String,Object> hashMap = new HashMap<>();
-                ObjectMapper mapper = new ObjectMapper();
-
-                hashMap.put("bids",jsonObject.getJSONObject("data").getString("bids"));
-                hashMap.put("asks",jsonObject.getJSONObject("data").getString("asks"));
-
-                String key = "bitget" + jsonObject.getJSONObject("arg").getString("instId");
-                RedisUtil.set(key.replace("_",""), mapper.writeValueAsString(hashMap));
-            }
-        } catch (JsonSyntaxException e) {
-            log.error("JSON 解析异常:" + e.getMessage(), e);
-        } catch (JsonProcessingException e) {
-            log.error("JSON 解析异常:" + e.getMessage(), e);
-        } catch (JSONException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @OnClose
-    public void onClose() {
-        log.info("bitget ws 连接已关闭,尝试重新连接...");
-        handleConnectionClosedOrError();
-    }
-
-    @OnError
-    public void onError(Throwable throwable) {
-        log.error("bitget ws 发生错误: " + throwable.getMessage(), throwable);
-        handleConnectionClosedOrError();
-    }
-
-    private void handleConnectionClosedOrError() {
-        synchronized (lock) {
-            if (!reconnecting) {
-                reconnecting = true; // 设置 reconnecting 为 true 表示开始重连
-                executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连
-            }
-        }
-    }
-
-    private void attemptReconnect() {
-        boolean doReconnect = true;
-        try {
-            log.info("bitget ws 开始重连");
-            connect(); // 假设 connect() 方法用于实际的连接逻辑
-            log.info("bitget ws 重连成功");
-        } catch (Exception e) {
-            log.error("bitget ws 重连失败", e);
-            // 连接失败时,可以根据具体情况决定是否继续重连
-            // 在这里假设总是继续尝试重连
-        } finally {
-            synchronized (lock) {
-                if (doReconnect) {
-                    scheduleReconnect(); // 如果需要继续重连,则重新调度重连任务
-                } else {
-                    reconnecting = false; // 重连结束后设置 reconnecting 为 false
-                }
-            }
-        }
-    }
-
-    private void scheduleReconnect() {
-        if (!executorService.isShutdown()) {
-            executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS);
-        }
-    }
-
-    private void sendPing() {
-        try {
-            if (session != null) {
-                session.getBasicRemote().sendText("ping"); // 发送心跳消息
-            }
-        } catch (Exception e) {
-            log.error("发送心跳失败", e);
-        }
-    }
-
-    public String getParameter(String symbol) throws JsonProcessingException, JSONException {
-        // 创建一个ObjectMapper实例
-        ObjectMapper mapper = new ObjectMapper();
-
-        // 使用Map构建JSON对象
-        Map<String, Object> jsonMap = new HashMap<>();
-        jsonMap.put("op", "subscribe");
-        Map<String, Object> argsMap = new HashMap<>();
-        jsonMap.put("args", argsMap);
-
-        argsMap.put("instType", "SPOT");
-        argsMap.put("channel", "books15");
-        argsMap.put("instId", symbol);
-
-        // 将Map转换为JSON字符串
-        String jsonString = mapper.writeValueAsString(jsonMap);
-        return jsonString;
-
-    }
-
-
-}
diff --git a/websocketClient/src/main/java/org/example/wsClient/GateClient.java b/websocketClient/src/main/java/org/example/wsClient/GateClient.java
deleted file mode 100644
index 466b1cb..0000000
--- a/websocketClient/src/main/java/org/example/wsClient/GateClient.java
+++ /dev/null
@@ -1,236 +0,0 @@
-package org.example.wsClient;
-
-import com.alibaba.druid.support.json.JSONUtils;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-import com.google.common.reflect.TypeToken;
-import com.google.gson.Gson;
-import com.google.gson.JsonSyntaxException;
-import lombok.extern.slf4j.Slf4j;
-import org.example.pojo.Currency;
-import org.example.util.RedisUtil;
-import org.json.JSONArray;
-import org.json.JSONException;
-import org.json.JSONObject;
-
-import javax.websocket.*;
-import java.math.BigDecimal;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.json.JSONObject;
-/**
- * @program: demo
- * @description:
- * @create: 2024-07-18 15:30
- **/
-@ClientEndpoint
-@Slf4j
-public class GateClient {
-
-    private static final String WS_ENDPOINT = "wss://api.gateio.ws/ws/v4/";
-    private static final long PING_INTERVAL = 20000;
-
-    private final List<Currency> subscriptions;
-    private final ScheduledExecutorService executorService;
-    private Session session;
-
-    private final Object lock = new Object(); // 添加一个锁对象
-    private volatile boolean reconnecting = false; // 使用 volatile 关键字保证可见性
-
-    public GateClient(List<Currency> subscriptions) {
-        this.subscriptions = subscriptions;
-        this.executorService = Executors.newScheduledThreadPool(1);
-    }
-
-    public void start() {
-        try {
-            connect();
-            if (session == null) {
-                log.info("无法在超时时间内连接到服务器。");
-                return;
-            }
-
-            executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS);
-
-            // 订阅消息
-            for (Currency subscription : subscriptions) {
-                String parameter = getParameter(subscription.getSymbol());
-                session.getBasicRemote().sendText(parameter);
-            }
-
-            synchronized (this) {
-                this.wait();
-            }
-
-        } catch (Exception e) {
-            log.error("gate ws 连接过程中发生异常: " + e.getMessage(), e);
-        } finally {
-            executorService.shutdown();
-        }
-    }
-
-    private void connect() throws Exception {
-        WebSocketContainer container = ContainerProvider.getWebSocketContainer();
-        container.connectToServer(this, new URI(WS_ENDPOINT));
-    }
-
-    @OnOpen
-    public void onOpen(Session session) {
-        log.info("gate ws 已连接到服务器。");
-        this.session = session;
-        synchronized (this) {
-            this.notify();
-        }
-    }
-    private static final Gson gson = new Gson(); // 将 Gson 作为静态成员或单例
-
-    @OnMessage
-    public void onMessage(String message) {
-        try {
-            Map<String, Object> map = gson.fromJson(message, new TypeToken<Map<String, Object>>() {}.getType());
-            Object object = map.get("result");
-            Map<String, Object> resultMap = gson.fromJson(object.toString(), new TypeToken<Map<String, Object>>() {}.getType());
-            if (null != resultMap && null != resultMap.get("s")) {
-                HashMap<String,Object> hashMap = new HashMap<>();
-                ObjectMapper mapper = new ObjectMapper();
-
-                hashMap.put("bids",resultMap.get("bids"));
-                hashMap.put("asks",resultMap.get("asks"));
-
-                String key = "gate" + resultMap.get("s");
-                RedisUtil.set(key.replace("_",""), mapper.writeValueAsString(hashMap));
-            }
-        } catch (JsonSyntaxException e) {
-            log.error("JSON 解析异常:" + e.getMessage(), e);
-        } catch (JsonProcessingException e) {
-            log.error("JSON 解析异常:" + e.getMessage(), e);
-        }
-    }
-
-    @OnClose
-    public void onClose() {
-        log.info("gate ws 连接已关闭,尝试重新连接...");
-        handleConnectionClosedOrError();
-    }
-
-    @OnError
-    public void onError(Throwable throwable) {
-        log.error("gate ws 发生错误: " + throwable.getMessage(), throwable);
-        handleConnectionClosedOrError();
-    }
-
-    private void handleConnectionClosedOrError() {
-        synchronized (lock) {
-            if (!reconnecting) {
-                reconnecting = true; // 设置 reconnecting 为 true 表示开始重连
-                executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连
-            }
-        }
-    }
-
-    private void attemptReconnect() {
-        boolean doReconnect = true;
-        try {
-            log.info("gate ws 开始重连");
-            connect(); // 假设 connect() 方法用于实际的连接逻辑
-            log.info("gate ws 重连成功");
-        } catch (Exception e) {
-            log.error("gate ws 重连失败", e);
-            // 连接失败时,可以根据具体情况决定是否继续重连
-            // 在这里假设总是继续尝试重连
-        } finally {
-            synchronized (lock) {
-                if (doReconnect) {
-                    scheduleReconnect(); // 如果需要继续重连,则重新调度重连任务
-                } else {
-                    reconnecting = false; // 重连结束后设置 reconnecting 为 false
-                }
-            }
-        }
-    }
-
-    private void scheduleReconnect() {
-        if (!executorService.isShutdown()) {
-            executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS);
-        }
-    }
-
-    private void sendPing() {
-        try {
-            if (session != null) {
-                session.getBasicRemote().sendPing(ByteBuffer.wrap("Ping".getBytes()));
-            }
-        } catch (Exception e) {
-            log.error("发送心跳失败", e);
-        }
-    }
-
-    public String getParameter(String symbol) throws JsonProcessingException, JSONException {
-        // 替换USDT为_USDT
-        symbol = symbol.replaceAll("USDT", "_USDT");
-
-        // 创建一个ObjectMapper实例
-        ObjectMapper mapper = new ObjectMapper();
-        // 获取当前时间的毫秒数
-        long currentTimeMillis = System.currentTimeMillis();
-
-        // 定义常量
-        final String CHANNEL = "spot.order_book";  // 固定频道
-        final String EVENT_SUBSCRIBE = "subscribe"; // 订阅事件
-        final String EVENT_UNSUBSCRIBE = "unsubscribe"; // 取消订阅事件
-        final String[] PAYLOAD = new String[] {symbol, "20", "100ms"}; // 负载信息
-
-        // 使用Map构建JSON对象
-        Map<String, Object> jsonMap = new HashMap<>(); // 创建Map用于存放JSON内容
-        jsonMap.put("time", currentTimeMillis); // 放入当前时间
-        jsonMap.put("channel", CHANNEL); // 放入频道
-        jsonMap.put("event", EVENT_SUBSCRIBE); // 放入事件类型
-        jsonMap.put("payload", Arrays.asList(PAYLOAD)); // 将数组转换为List并放入Map
-
-        // 将Map转换为JSON字符串
-        String jsonString = mapper.writeValueAsString(jsonMap); // 使用ObjectMapper转换
-        return jsonString; // 返回JSON字符串
-
-    }
-
-//    public static void main(String[] args) throws JSONException {
-//        // 从 resultMap 中获取 "bids" 对应的 JSON 数组
-//        JSONArray jsonArray = new JSONArray(resultMap.get("bids").toString()); // 将获取的 bids 转换成 JSON 数组
-//        List<List<String>> resultList = new ArrayList<>(); // 存放所有的内层列表
-//
-//        // 遍历 JSON 数组
-//        for (int i = 0; i < jsonArray.length(); i++) {
-//            JSONArray innerArray = jsonArray.getJSONArray(i); // 获取当前内层 JSON 数组
-//            List<String> innerList = new ArrayList<>(); // 存放当前内层数组的元素
-//
-//            // 遍历内层 JSON 数组
-//            for (int j = 0; j < innerArray.length(); j++) {
-//                innerList.add(innerArray.getString(j)); // 将元素添加到内层列表中
-//            }
-//
-//            resultList.add(innerList); // 将内层列表添加到结果列表中
-//        }
-//
-//        // 考虑去掉未使用的 dataList,下面的代码使用 resultList 而不是 dataList
-//        List<Map<String, String>> resultMapList = new ArrayList<>(); // 存放最终的映射结果
-//        for (List<String> entry : resultList) { // 遍历 resultList 中的每个内层列表
-//            // 确保每个内层列表有足够的元素,再进行映射
-//            if (entry.size() >= 2) { // 判断 entry 的大小,避免 IndexOutOfBoundsException
-//                Map<String, String> mapKey = new HashMap<>(); // 新建一个 Map 以存储键值对
-//                mapKey.put("p", entry.get(0)); // 将内层列表的第一个元素作为键 "p"
-//                mapKey.put("v", entry.get(1)); // 将内层列表的第二个元素作为键 "v"
-//                resultMapList.add(mapKey); // 将 map 添加到结果映射列表中
-//            }
-//        }
-//
-//    }
-
-
-
-}
diff --git a/websocketClient/src/main/java/org/example/wsClient/KucoinClient.java b/websocketClient/src/main/java/org/example/wsClient/KucoinClient.java
deleted file mode 100644
index 88ad686..0000000
--- a/websocketClient/src/main/java/org/example/wsClient/KucoinClient.java
+++ /dev/null
@@ -1,239 +0,0 @@
-package org.example.wsClient;
-
-import com.alibaba.druid.util.StringUtils;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.reflect.TypeToken;
-import com.google.gson.Gson;
-import com.google.gson.JsonSyntaxException;
-import lombok.extern.slf4j.Slf4j;
-import org.example.pojo.Currency;
-import org.example.util.RedisUtil;
-import org.jetbrains.annotations.NotNull;
-import org.json.JSONException;
-
-import java.io.*;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import javax.websocket.*;
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.net.URI;
-import java.net.URLEncoder;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-/**
- * @program: demo
- * @description:
- * @create: 2024-07-19 16:44
- **/
-@ClientEndpoint
-@Slf4j
-public class KucoinClient {
-
-    private static final String WS_ENDPOINT = "wss://ws-api-spot.kucoin.com/spotMarket/level2Depth5:";
-    private static final long PING_INTERVAL = 20000;
-
-    private final List<Currency> subscriptions;
-    private final ScheduledExecutorService executorService;
-    private Session session;
-    private String token;
-    private final Object lock = new Object(); // 添加一个锁对象
-    private volatile boolean reconnecting = false; // 使用 volatile 关键字保证可见性
-
-    private String id;
-
-    public KucoinClient(List<Currency> subscriptions,String token) {
-        this.subscriptions = subscriptions;
-        this.token = token;
-        this.executorService = Executors.newScheduledThreadPool(1);
-    }
-
-    public void start() {
-        try {
-            connect();
-            if (session == null) {
-                log.info("无法在超时时间内连接到服务器。");
-                return;
-            }
-
-            executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS);
-
-            synchronized (this) {
-                this.wait();
-            }
-
-        } catch (Exception e) {
-            log.error("kucoin ws 连接过程中发生异常: " + e.getMessage(), e);
-        } finally {
-            executorService.shutdown();
-        }
-    }
-
-    private void connect() throws Exception {
-        WebSocketContainer container = ContainerProvider.getWebSocketContainer();
-        String url = extracted();
-        if(!StringUtils.isEmpty(url)){
-            container.connectToServer(this, new URI(url));
-            // 订阅消息
-            String parameter = subscription();
-            session.getBasicRemote().sendText(parameter);
-
-        }
-    }
-
-    private static final String CHARSET = "UTF-8"; // 定义字符集常量,避免魔法字符串
-
-    private String extracted() throws UnsupportedEncodingException {
-        String symbol = getSymbol();
-        String url = WS_ENDPOINT + symbol + "?token=" + URLEncoder.encode(token, CHARSET);
-        return url;
-    }
-
-
-    @OnOpen
-    public void onOpen(Session session) {
-        log.info("kucoin ws 已连接到服务器。");
-        this.session = session;
-        synchronized (this) {
-            this.notify();
-        }
-    }
-    private static final Gson gson = new Gson(); // 将 Gson 作为静态成员或单例
-
-    @OnMessage
-    public void onMessage(String message) {
-        try {
-            Map<String, Object> map = gson.fromJson(message, new TypeToken<Map<String, Object>>() {}.getType());
-            if(null != map.get("id")){
-                this.id = map.get("id").toString();
-            }
-            if(null != map.get("data")){
-                Object object = map.get("data");
-                String topic = map.get("topic").toString();
-                Map<String, Object> resultMap = gson.fromJson(object.toString(), new TypeToken<Map<String, Object>>() {}.getType());
-                if (null != resultMap) {
-                    HashMap<String,Object> hashMap = new HashMap<>();
-                    ObjectMapper mapper = new ObjectMapper();
-                    hashMap.put("bids",resultMap.get("bids"));
-                    hashMap.put("asks",resultMap.get("asks"));
-
-                    int index = topic.indexOf(":"); // 找到逗号的位置
-                    if (index != -1) { // 如果找到了逗号
-                        String substring = topic.substring(index + 1);
-                        String symbol = substring.replaceAll("-", "");
-                        String key = "kucoin" + symbol;
-                        RedisUtil.set(key, mapper.writeValueAsString(hashMap));
-                    } else {
-                        // 处理未找到特定字符的情况
-                        log.error("topic--->存入redis失败");
-                    }
-                }
-            }
-        } catch (JsonSyntaxException e) {
-            log.error("JSON 解析异常:" + e.getMessage(), e);
-        } catch (JsonProcessingException e) {
-            log.error("JSON 解析异常:" + e.getMessage(), e);
-        }
-    }
-
-    @OnClose
-    public void onClose() {
-        log.info("kucoin ws 连接已关闭,尝试重新连接...");
-        handleConnectionClosedOrError();
-    }
-
-    @OnError
-    public void onError(Throwable throwable) {
-        log.error("kucoin ws 发生错误: " + throwable.getMessage(), throwable);
-        handleConnectionClosedOrError();
-    }
-
-    private void handleConnectionClosedOrError() {
-        synchronized (lock) {
-            if (!reconnecting) {
-                reconnecting = true; // 设置 reconnecting 为 true 表示开始重连
-                executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连
-            }
-        }
-    }
-
-    private void attemptReconnect() {
-        boolean doReconnect = true;
-        try {
-            log.info("kucoin ws 开始重连");
-            connect(); // 假设 connect() 方法用于实际的连接逻辑
-            log.info("kucoin ws 重连成功");
-        } catch (Exception e) {
-            log.error("kucoin ws 重连失败", e);
-            // 连接失败时,可以根据具体情况决定是否继续重连
-            // 在这里假设总是继续尝试重连
-        } finally {
-            synchronized (lock) {
-                if (doReconnect) {
-                    scheduleReconnect(); // 如果需要继续重连,则重新调度重连任务
-                } else {
-                    reconnecting = false; // 重连结束后设置 reconnecting 为 false
-                }
-            }
-        }
-    }
-
-    private void scheduleReconnect() {
-        if (!executorService.isShutdown()) {
-            executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS);
-        }
-    }
-
-    private void sendPing() {
-        try {
-            if (session != null) {
-                session.getBasicRemote().sendPing(ByteBuffer.wrap("Ping".getBytes()));
-            }
-        } catch (Exception e) {
-            log.error("发送心跳失败", e);
-        }
-    }
-
-    public String subscription() throws JsonProcessingException, JSONException {
-        String symbol = getSymbol();
-        // 创建一个ObjectMapper实例
-        ObjectMapper mapper = new ObjectMapper();
-
-        // 使用Map构建JSON对象
-        Map<String, Object> jsonMap = new HashMap<>();
-        jsonMap.put("id", id);
-        jsonMap.put("type", "subscribe");
-        jsonMap.put("topic", "/spotMarket/level2Depth50:"+symbol);
-        jsonMap.put("privateChannel", false);
-        jsonMap.put("response", true);
-
-        // 将Map转换为JSON字符串
-        String jsonString = mapper.writeValueAsString(jsonMap);
-        return jsonString;
-
-    }
-
-    @NotNull
-    private String getSymbol() {
-        String symbol;
-        List<String> symbolList = subscriptions.stream()
-                .map(currency -> currency.getSymbol().replaceAll("USDT", "-USDT"))
-                .collect(Collectors.toList());
-        symbol = String.join(",", symbolList);
-        return symbol;
-    }
-
-
-}
-
-
diff --git a/websocketClient/src/main/resources/application.yml b/websocketClient/src/main/resources/application.yml
deleted file mode 100644
index 9253a37..0000000
--- a/websocketClient/src/main/resources/application.yml
+++ /dev/null
@@ -1,75 +0,0 @@
-server:
-  port: 8095
-
-
-spring:
-  # redis 配置
-  redis:
-    # 地址
-    host: localhost
-    # 端口,默认为6379
-    port: 6379
-    # 数据库索引
-    database: 1
-    # 密码
-    password:
-    # 连接超时时间
-    timeout: 10s
-    lettuce:
-      pool:
-        # 连接池中的最小空闲连接
-        min-idle: 0
-        # 连接池中的最大空闲连接
-        max-idle: 99
-        # 连接池的最大数据库连接数
-        max-active: 99
-        # #连接池最大阻塞等待时间(使用负值表示没有限制)
-        max-wait: -1ms
-
-  datasource:
-    type: com.alibaba.druid.pool.DruidDataSource
-    driverClassName: com.mysql.cj.jdbc.Driver
-    url: jdbc:mysql://127.0.0.1:3306/demo?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
-    username: root
-    password: 123456
-
-    druid:
-      # 初始连接数
-      initialSize: 5
-      # 最小连接池数量
-      minIdle: 10
-      # 最大连接池数量
-      maxActive: 20
-      # 配置获取连接等待超时的时间
-      maxWait: 60000
-      # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
-      timeBetweenEvictionRunsMillis: 60000
-      # 配置一个连接在池中最小生存的时间,单位是毫秒
-      minEvictableIdleTimeMillis: 300000
-      # 配置一个连接在池中最大生存的时间,单位是毫秒
-      maxEvictableIdleTimeMillis: 900000
-      # 配置检测连接是否有效
-      validationQuery: SELECT 1 FROM DUAL
-      testWhileIdle: true
-      testOnBorrow: false
-      testOnReturn: false
-      webStatFilter:
-        enabled: true
-      statViewServlet:
-        enabled: true
-        # 设置白名单,不填则允许所有访问
-        allow:
-        url-pattern: /druid/*
-        # 控制台管理用户名和密码
-        login-username: Greysparrow
-        login-password: 123456
-      filter:
-        stat:
-          enabled: true
-          # 慢SQL记录
-          log-slow-sql: true
-          slow-sql-millis: 1000
-          merge-sql: true
-        wall:
-          config:
-            multi-statement-allow: false
diff --git a/websocketSerivce/pom.xml b/websocketSerivce/pom.xml
index 6b59c27..7e3fbed 100644
--- a/websocketSerivce/pom.xml
+++ b/websocketSerivce/pom.xml
@@ -19,6 +19,11 @@
     </properties>
 
     <dependencies>
+        <dependency>
+            <groupId>redis.clients</groupId>
+            <artifactId>jedis</artifactId>
+            <version>3.7.0</version>
+        </dependency>
         <!-- 阿里数据库连接池 -->
         <dependency>
             <groupId>com.alibaba</groupId>
diff --git a/websocketSerivce/src/main/java/org/example/WsServerApplication.java b/websocketSerivce/src/main/java/org/example/WsServerApplication.java
index 7f222f2..76c266b 100644
--- a/websocketSerivce/src/main/java/org/example/WsServerApplication.java
+++ b/websocketSerivce/src/main/java/org/example/WsServerApplication.java
@@ -1,8 +1,14 @@
 package org.example;
 
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import org.checkerframework.checker.units.qual.A;
+import org.example.server.CurrencySerivce;
+import org.example.server.impl.CurrencySerivceImpl;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
 import org.springframework.scheduling.annotation.EnableScheduling;
 import org.springframework.web.socket.config.annotation.EnableWebSocket;
 
@@ -19,5 +25,13 @@
     public static void main(String[] args) {
         SpringApplication.run(WsServerApplication.class, args);
     }
+
+    @Autowired
+    private CurrencySerivce currencySerivce;
+
+    @Bean
+    public void start() throws JsonProcessingException {
+        currencySerivce.start();
+    }
 }
 
diff --git a/websocketSerivce/src/main/java/org/example/server/CurrencySerivce.java b/websocketSerivce/src/main/java/org/example/server/CurrencySerivce.java
index d50a81f..2abdadc 100644
--- a/websocketSerivce/src/main/java/org/example/server/CurrencySerivce.java
+++ b/websocketSerivce/src/main/java/org/example/server/CurrencySerivce.java
@@ -1,9 +1,12 @@
 package org.example.server;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+
 /**
  * @program: demo
  * @description:
  * @create: 2024-07-16 15:23
  **/
 public interface CurrencySerivce {
+    void start() throws JsonProcessingException;
 }
diff --git a/websocketSerivce/src/main/java/org/example/server/impl/CurrencySerivceImpl.java b/websocketSerivce/src/main/java/org/example/server/impl/CurrencySerivceImpl.java
index 1b220f2..83616a9 100644
--- a/websocketSerivce/src/main/java/org/example/server/impl/CurrencySerivceImpl.java
+++ b/websocketSerivce/src/main/java/org/example/server/impl/CurrencySerivceImpl.java
@@ -1,9 +1,18 @@
 package org.example.server.impl;
 
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
 import org.example.dao.CurrencyMapper;
 import org.example.pojo.Currency;
+import org.example.server.CurrencySerivce;
+import org.example.util.RedisUtil;
 import org.springframework.stereotype.Service;
+
+import java.util.*;
 
 /**
  * @program: demo
@@ -11,5 +20,71 @@
  * @create: 2024-07-16 15:23
  **/
 @Service
-public class CurrencySerivceImpl extends ServiceImpl<CurrencyMapper, Currency> {
+public class CurrencySerivceImpl extends ServiceImpl<CurrencyMapper, Currency> implements CurrencySerivce {
+
+    private HashMap hashMap = new HashMap();
+    private static final Gson gson = new Gson();
+    private Set<String> keys;
+    @Override
+    public void start() throws JsonProcessingException {
+        Set<String> mexcSet = RedisUtil.keys("mexc");
+        Set<String> gateSet = RedisUtil.keys("gate");
+        Set<String> bitgetSet = RedisUtil.keys("bitget");
+        Set<String> kucoinSet = RedisUtil.keys("kucoin");
+
+        //这里做一个定时器,每10秒更新一次
+        keys = RedisUtil.keys("*");
+
+        HashMap<String,Map<String, Object>> mexcMap = new HashMap<>();
+        for (String key : mexcSet) {
+            String v = RedisUtil.get(key);
+            Map<String, Object> redisValueMap = gson.fromJson(v, new TypeToken<Map<String, Object>>() {}.getType());
+            mexcMap.put(key.replaceAll("mexc",""),redisValueMap);
+        }
+
+        HashMap<String,List<HashMap<String,String>>> asksHashMapList = new HashMap<>();
+        HashMap<String,Map<String, Object>> gateMap = new HashMap<>();
+        for (String key : gateSet) {
+            String v = RedisUtil.get(key);
+            Map<String, Object> redisValueMap = gson.fromJson(v, new TypeToken<Map<String, Object>>() {}.getType());
+            String asks = redisValueMap.get("asks").toString();
+            String bids = redisValueMap.get("bids").toString();
+
+            // 使用 Jackson 解析 JSON
+            ObjectMapper objectMapper = new ObjectMapper();
+            JsonNode asksNode = objectMapper.readTree(asks);
+            JsonNode bidsNode = objectMapper.readTree(bids);
+
+            // 将 "asks" 数组转换为 List<List<String>>
+            List<HashMap<String,String>> asksList = new ArrayList<>();
+            for (JsonNode arrayNode : asksNode) {
+                HashMap<String,String> asksMap = new HashMap<>();
+                asksMap.put("p",arrayNode.get(0).toString());
+                asksMap.put("v",arrayNode.get(1).toString());
+                asksList.add(asksMap);
+            }
+            for (JsonNode arrayNode : bidsNode) {
+                HashMap<String,String> asksMap = new HashMap<>();
+                asksMap.put("p",arrayNode.get(0).toString());
+                asksMap.put("v",arrayNode.get(1).toString());
+                asksList.add(asksMap);
+            }
+            gateMap.put(key.replaceAll("gate",""),redisValueMap);
+        }
+
+//        HashMap<String,Map<String, Object>> bitgetMap = new HashMap<>();
+//        for (String key : mexcSet) {
+//            String v = RedisUtil.get(key);
+//            Map<String, Object> redisValueMap = gson.fromJson(v, new TypeToken<Map<String, Object>>() {}.getType());
+//            mexcMap.put(key.replaceAll("bitget",""),redisValueMap);
+//        }
+//
+//        HashMap<String,Map<String, Object>> kucoinMap = new HashMap<>();
+//        for (String key : mexcSet) {
+//            String v = RedisUtil.get(key);
+//            Map<String, Object> redisValueMap = gson.fromJson(v, new TypeToken<Map<String, Object>>() {}.getType());
+//            mexcMap.put(key.replaceAll("kucoin",""),redisValueMap);
+//        }
+
+    }
 }
diff --git a/websocketClient/src/main/java/org/example/util/RedisUtil.java b/websocketSerivce/src/main/java/org/example/util/RedisUtil.java
similarity index 90%
copy from websocketClient/src/main/java/org/example/util/RedisUtil.java
copy to websocketSerivce/src/main/java/org/example/util/RedisUtil.java
index dab4e20..8fb6808 100644
--- a/websocketClient/src/main/java/org/example/util/RedisUtil.java
+++ b/websocketSerivce/src/main/java/org/example/util/RedisUtil.java
@@ -1,16 +1,18 @@
 package org.example.util;
 
-import redis.clients.jedis.*;
+import redis.clients.jedis.Jedis;
+import redis.clients.jedis.JedisPool;
+import redis.clients.jedis.JedisPoolConfig;
+import redis.clients.jedis.Protocol;
 
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 
 public class RedisUtil {
     private static JedisPool jedisPool;
 
     static {
-        jedisPool = new JedisPool(new JedisPoolConfig(), "localhost", 6379);
+        jedisPool = new JedisPool(new JedisPoolConfig(), "localhost", 6379, Protocol.DEFAULT_TIMEOUT, null, 0);
     }
 
     // 私有构造方法,防止实例化
@@ -28,6 +30,12 @@
         }
     }
 
+    public static Set<String> keys(String key) {
+        try (Jedis jedis = jedisPool.getResource()) {
+            return jedis.keys("*"+key+"*");
+        }
+    }
+
     public static void hset(String key, String field, String value) {
         try (Jedis jedis = jedisPool.getResource()) {
             jedis.hset(key, field, value);
diff --git a/websocketSerivce/src/main/resources/application.yml b/websocketSerivce/src/main/resources/application.yml
index bfad70d..191842b 100644
--- a/websocketSerivce/src/main/resources/application.yml
+++ b/websocketSerivce/src/main/resources/application.yml
@@ -1,5 +1,5 @@
 server:
-  port: 8091
+  port: 8080
 
 spring:
   datasource:

--
Gitblit v1.9.3