From bf47f0de2c4b4ca35ce08c7abd2b5b2477b31dac Mon Sep 17 00:00:00 2001
From: zj <1772600164@qq.com>
Date: Thu, 18 Jul 2024 03:20:57 +0800
Subject: [PATCH] 1
---
websocketClient/src/main/java/org/example/pojo/Currency.java | 25 ++
websocketClient/src/main/resources/application.yml | 74 ++++++
websocketClient/src/main/java/org/example/server/impl/CurrencySerivceImpl.java | 15 +
websocketSerivce/src/main/java/org/example/task/GateStock.java | 4
.idea/misc.xml | 1
websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java | 51 ++++
/dev/null | 69 ------
websocketClient/src/main/java/org/example/util/RedisUtil.java | 119 ++++++++++
websocketSerivce/src/main/java/org/example/task/BitgetStock.java | 4
websocketClient/src/main/java/org/example/wsClient/MexcClient.java | 148 +++++++++++++
websocketClient/pom.xml | 66 ++++++
websocketClient/src/main/java/org/example/dao/CurrencyMapper.java | 14 +
websocketSerivce/src/main/java/org/example/task/MexcStock.java | 4
websocketClient/src/main/java/org/example/ThreadConfig/AsyncConfiguration.java | 10
websocketClient/src/main/java/org/example/server/CurrencySerivce.java | 9
websocketSerivce/src/main/java/org/example/task/KucoinStock.java | 4
16 files changed, 537 insertions(+), 80 deletions(-)
diff --git a/.idea/misc.xml b/.idea/misc.xml
index 4b661a5..f0d7b15 100644
--- a/.idea/misc.xml
+++ b/.idea/misc.xml
@@ -1,4 +1,3 @@
-<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ExternalStorageConfigurationManager" enabled="true" />
<component name="MavenProjectsManager">
diff --git a/websocketClient/pom.xml b/websocketClient/pom.xml
index 31b848e..8167829 100644
--- a/websocketClient/pom.xml
+++ b/websocketClient/pom.xml
@@ -12,6 +12,22 @@
<artifactId>websocketClient</artifactId>
<dependencies>
<dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>20.0</version>
+ </dependency>
+ <dependency>
+ <groupId>redis.clients</groupId>
+ <artifactId>jedis</artifactId>
+ <version>3.7.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.baomidou</groupId>
+ <artifactId>mybatis-plus-boot-starter</artifactId>
+ <version>3.4.3.2</version>
+ </dependency>
+
+ <dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
@@ -40,6 +56,56 @@
<artifactId>gson</artifactId>
<version>2.8.8</version>
</dependency>
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>druid-spring-boot-starter</artifactId>
+ <version>1.2.11</version>
+ </dependency>
+ <dependency>
+ <groupId>mysql</groupId>
+ <artifactId>mysql-connector-java</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.mybatis.spring.boot</groupId>
+ <artifactId>mybatis-spring-boot-starter</artifactId>
+ <version>2.2.2</version>
+ </dependency>
+
+
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-websocket</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-data-redis</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter</artifactId>
+ </dependency>
+
+
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-pool2</artifactId>
+ <version>2.11.1</version>
+ </dependency>
+
+ <dependency>
+ <groupId>redis.clients</groupId>
+ <artifactId>jedis</artifactId>
+ <version>3.7.0</version>
+ </dependency>
</dependencies>
<properties>
<maven.compiler.source>8</maven.compiler.source>
diff --git a/websocketClient/src/main/java/org/example/config/AsyncConfiguration.java b/websocketClient/src/main/java/org/example/ThreadConfig/AsyncConfiguration.java
similarity index 72%
rename from websocketClient/src/main/java/org/example/config/AsyncConfiguration.java
rename to websocketClient/src/main/java/org/example/ThreadConfig/AsyncConfiguration.java
index f77bada..bbbc919 100644
--- a/websocketClient/src/main/java/org/example/config/AsyncConfiguration.java
+++ b/websocketClient/src/main/java/org/example/ThreadConfig/AsyncConfiguration.java
@@ -1,4 +1,4 @@
-package org.example.config;
+package org.example.ThreadConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -17,11 +17,11 @@
@Bean(name = "threadPoolTaskExecutor")
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
- executor.setCorePoolSize(50); // 核心线程数
- executor.setMaxPoolSize(100); // 最大线程数
- executor.setQueueCapacity(300); // 队列容量
+ executor.setCorePoolSize(400); // 核心线程数
+ executor.setMaxPoolSize(1200); // 最大线程数
+ executor.setQueueCapacity(1500); // 队列容量
executor.setKeepAliveSeconds(60); // 线程空闲时的存活时间为60秒
- executor.setThreadNamePrefix("MyThread-"); // 线程名称的前缀
+ executor.setThreadNamePrefix("MexcThread-"); // 线程名称的前缀
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 使用 CallerRunsPolicy 拒绝策略
return executor;
}
diff --git a/websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java b/websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java
new file mode 100644
index 0000000..0315b55
--- /dev/null
+++ b/websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java
@@ -0,0 +1,51 @@
+package org.example.WsBean;
+
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import lombok.extern.slf4j.Slf4j;
+import org.example.pojo.Currency;
+import org.example.server.impl.CurrencySerivceImpl;
+import org.example.wsClient.MexcClient;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import org.springframework.util.CollectionUtils;
+
+import java.util.List;
+
+/**
+ * @ClassDescription: 客户端请求类
+ * @JdkVersion: 1.8
+ * @Created: 2023/8/31 16:13
+ */
+@Slf4j
+@Configuration
+public class MexcWsBean {
+
+ @Autowired
+ private CurrencySerivceImpl currencyService;
+
+ @Autowired
+ private ThreadPoolTaskExecutor threadPoolTaskExecutor;
+
+ @Bean
+ public void mexcWebsocketRunClientMap() {
+ List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "mexc"));
+ if (!CollectionUtils.isEmpty(mexc)) {
+ int batchSize = 30; // 每个线程处理的数据量
+ int totalSize = mexc.size();
+ int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数
+
+ for (int i = 0; i < threadCount; i++) {
+ int fromIndex = i * batchSize;
+ int toIndex = Math.min(fromIndex + batchSize, totalSize);
+ List<Currency> sublist = mexc.subList(fromIndex, toIndex);
+
+ // 使用自定义线程池提交任务
+ threadPoolTaskExecutor.execute(new MexcClient(sublist)::start);
+ }
+
+ }
+ }
+}
+
diff --git a/websocketClient/src/main/java/org/example/constant/StockConstant.java b/websocketClient/src/main/java/org/example/constant/StockConstant.java
deleted file mode 100644
index 7ca7395..0000000
--- a/websocketClient/src/main/java/org/example/constant/StockConstant.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package org.example.constant;
-
-/**
- * 股票长量配置
- * */
-public class StockConstant {
-
- public static String HTTP_API = "http://api-in-2.js-stock.top/";
-
- public static String IO_HTTP_API = "http://api-in-2-socket.js-stock.top";
-
- public static String HTTP_F_API = "http://api-v1-f.js-stock.top/";
-
-
- public static String WS_URL = "ws://api-in-2-ws.js-stock.top";
-
- public static String KEY = "eVKtHt7aG4m6ozwWL9qG";
-
-
- public static String US_API_URL = "http://api-us.js-stock.top/";
-
- public static String US_KEY = "jZFrku4RGQjP87Hmq5tm";
-
-
- public static String JP_HTTP_API = "http://api-jp.js-stock.top/";
- public static String JP_WS_URL = "ws://api-jp-ws.js-stock.top";
- public static String JP_KEY = "glcd4SAiD6oXExIDOtUJ";
-}
diff --git a/websocketClient/src/main/java/org/example/controller/WsClientController.java b/websocketClient/src/main/java/org/example/controller/WsClientController.java
deleted file mode 100644
index c30defe..0000000
--- a/websocketClient/src/main/java/org/example/controller/WsClientController.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package org.example.controller;
-
-import lombok.extern.slf4j.Slf4j;
-import org.example.server.wcServer;
-import org.java_websocket.client.WebSocketClient;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-import java.net.URI;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * @ClassDescription: 客户端请求类
- * @JdkVersion: 1.8
- * @Created: 2023/8/31 16:13
- */
-@Slf4j
-@Configuration
-public class WsClientController {
-
- @Bean
- public Map<String, WebSocketClient> websocketRunClientMap() {
-
- Map<String, WebSocketClient> retMap = new HashMap<>(2);
- try {
- wcServer websocketRunClient = new wcServer(new URI("wss://api.gateio.ws/ws/v4/"));
- websocketRunClient.connect();
- websocketRunClient.setConnectionLostTimeout(0);
- new Thread(() -> {
- while (true) {
- try {
- Thread.sleep(8000);
- websocketRunClient.send("heartbeat".getBytes());
- } catch (Exception e) {
- websocketRunClient.reconnect();
- websocketRunClient.setConnectionLostTimeout(0);
- }
- }
- }).start();
- } catch (Exception e) {
- }
- return retMap;
- }
-
-
-}
-
diff --git a/websocketClient/src/main/java/org/example/dao/CurrencyMapper.java b/websocketClient/src/main/java/org/example/dao/CurrencyMapper.java
new file mode 100644
index 0000000..86b645b
--- /dev/null
+++ b/websocketClient/src/main/java/org/example/dao/CurrencyMapper.java
@@ -0,0 +1,14 @@
+package org.example.dao;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+import org.apache.ibatis.annotations.Mapper;
+import org.example.pojo.Currency;
+
+/**
+ * @program: demo
+ * @description:
+ * @create: 2024-07-15 17:45
+ **/
+@Mapper
+public interface CurrencyMapper extends BaseMapper<Currency> {
+}
diff --git a/websocketClient/src/main/java/org/example/pojo/Currency.java b/websocketClient/src/main/java/org/example/pojo/Currency.java
new file mode 100644
index 0000000..2e0e322
--- /dev/null
+++ b/websocketClient/src/main/java/org/example/pojo/Currency.java
@@ -0,0 +1,25 @@
+package org.example.pojo;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableId;
+import lombok.Data;
+
+/**
+ * @program: demo
+ * @description: 交易对
+ * @create: 2024-07-15 17:41
+ **/
+@Data
+public class Currency {
+ @TableId(value = "id",type = IdType.AUTO)
+ private String id;
+ //交易对
+ private String symbol;
+ //交易币
+ private String baseAsset;
+ //计价币
+ private String quoteAsset;
+ //来源
+ private String source;
+
+}
diff --git a/websocketClient/src/main/java/org/example/server/CurrencySerivce.java b/websocketClient/src/main/java/org/example/server/CurrencySerivce.java
new file mode 100644
index 0000000..d50a81f
--- /dev/null
+++ b/websocketClient/src/main/java/org/example/server/CurrencySerivce.java
@@ -0,0 +1,9 @@
+package org.example.server;
+
+/**
+ * @program: demo
+ * @description:
+ * @create: 2024-07-16 15:23
+ **/
+public interface CurrencySerivce {
+}
diff --git a/websocketClient/src/main/java/org/example/server/impl/CurrencySerivceImpl.java b/websocketClient/src/main/java/org/example/server/impl/CurrencySerivceImpl.java
new file mode 100644
index 0000000..1b220f2
--- /dev/null
+++ b/websocketClient/src/main/java/org/example/server/impl/CurrencySerivceImpl.java
@@ -0,0 +1,15 @@
+package org.example.server.impl;
+
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import org.example.dao.CurrencyMapper;
+import org.example.pojo.Currency;
+import org.springframework.stereotype.Service;
+
+/**
+ * @program: demo
+ * @description:
+ * @create: 2024-07-16 15:23
+ **/
+@Service
+public class CurrencySerivceImpl extends ServiceImpl<CurrencyMapper, Currency> {
+}
diff --git a/websocketClient/src/main/java/org/example/server/wcServer.java b/websocketClient/src/main/java/org/example/server/wcServer.java
deleted file mode 100644
index adeaf59..0000000
--- a/websocketClient/src/main/java/org/example/server/wcServer.java
+++ /dev/null
@@ -1,69 +0,0 @@
-package org.example.server;
-
-import org.apache.http.HttpResponse;
-import org.apache.http.client.HttpClient;
-import org.apache.http.client.entity.UrlEncodedFormEntity;
-import org.apache.http.client.methods.HttpPost;
-import org.apache.http.impl.client.HttpClients;
-import org.apache.http.message.BasicNameValuePair;
-import org.example.constant.StockConstant;
-import org.java_websocket.WebSocket;
-import org.java_websocket.client.WebSocketClient;
-import org.java_websocket.handshake.ServerHandshake;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import javax.websocket.ClientEndpoint;
-import javax.websocket.ContainerProvider;
-import javax.websocket.Session;
-import javax.websocket.WebSocketContainer;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.HttpURLConnection;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.URL;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * @program: webSocketProject
- * @description:
- * @create: 2024-03-26 16:11
- **/
-@ClientEndpoint
-public class wcServer extends WebSocketClient {
-
- public wcServer(URI serverUri) throws URISyntaxException {
- super(serverUri);
- }
-
- @Override
- public void onOpen(ServerHandshake shake) {
- long timestamp = System.currentTimeMillis() / 1000;
- String payload = "{\"time\": " + timestamp + ", \"channel\": \"spot.order_book\", \"event\": \"subscribe\", \"payload\": [\"BTC_USDT\", \"5\", \"100ms\"]}";
- send(payload);
- }
-
- @Override
- public void onMessage(String paramString) {
- send(paramString);
- }
-
- @Override
- public void onClose(int paramInt, String paramString, boolean paramBoolean) {
- System.out.println("关闭...");
- }
-
- @Override
- public void onError(Exception e) {
- System.out.println("异常" + e);
-
- }
-
- @Override
- public void send(String message) {
- }
-
-}
diff --git a/websocketClient/src/main/java/org/example/util/RedisUtil.java b/websocketClient/src/main/java/org/example/util/RedisUtil.java
new file mode 100644
index 0000000..63361dd
--- /dev/null
+++ b/websocketClient/src/main/java/org/example/util/RedisUtil.java
@@ -0,0 +1,119 @@
+package org.example.util;
+
+import redis.clients.jedis.*;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class RedisUtil {
+ private static JedisPool jedisPool;
+
+ // 在静态代码块中初始化 JedisPool
+ static {
+ jedisPool = new JedisPool(new JedisPoolConfig(), "localhost", 6379);
+ }
+
+ public static void set(String key, String value) {
+ try (Jedis jedis = jedisPool.getResource()) {
+ jedis.set(key, value);
+ }
+ }
+
+ public static String get(String key) {
+ try (Jedis jedis = jedisPool.getResource()) {
+ return jedis.get(key);
+ }
+ }
+
+ public static void hset(String key, String field, String value) {
+ try (Jedis jedis = jedisPool.getResource()) {
+ jedis.hset(key, field, value);
+ }
+ }
+
+ public static String hget(String key, String field) {
+ try (Jedis jedis = jedisPool.getResource()) {
+ return jedis.hget(key, field);
+ }
+ }
+
+ public static void lpush(String key, String... values) {
+ try (Jedis jedis = jedisPool.getResource()) {
+ jedis.lpush(key, values);
+ }
+ }
+
+ public static List<String> lrange(String key, long start, long end) {
+ try (Jedis jedis = jedisPool.getResource()) {
+ return jedis.lrange(key, start, end);
+ }
+ }
+
+ public static void sadd(String key, String... members) {
+ try (Jedis jedis = jedisPool.getResource()) {
+ jedis.sadd(key, members);
+ }
+ }
+
+ public static Set<String> smembers(String key) {
+ try (Jedis jedis = jedisPool.getResource()) {
+ return jedis.smembers(key);
+ }
+ }
+
+ public static void zadd(String key, double score, String member) {
+ try (Jedis jedis = jedisPool.getResource()) {
+ jedis.zadd(key, score, member);
+ }
+ }
+
+ public static Set<String> zrange(String key, long start, long end) {
+ try (Jedis jedis = jedisPool.getResource()) {
+ return jedis.zrange(key, start, end);
+ }
+ }
+
+ public static void delete(String key) {
+ try (Jedis jedis = jedisPool.getResource()) {
+ jedis.del(key);
+ }
+ }
+
+ public static void disconnect() {
+ jedisPool.close();
+ }
+
+ // 示例用法
+ public static void main(String[] args) {
+ // 直接使用 RedisUtil 的静态方法
+ RedisUtil.set("mykey", "myvalue");
+ String value = RedisUtil.get("mykey");
+ System.out.println("Value for 'mykey': " + value);
+
+ RedisUtil.hset("user:1", "name", "Alice");
+ String userName = RedisUtil.hget("user:1", "name");
+ System.out.println("Name for 'user:1': " + userName);
+
+ RedisUtil.lpush("mylist", "element1", "element2", "element3");
+ List<String> listValues = RedisUtil.lrange("mylist", 0, -1);
+ System.out.println("Values in 'mylist': " + listValues);
+
+ RedisUtil.sadd("myset", "member1", "member2", "member3");
+ Set<String> setMembers = RedisUtil.smembers("myset");
+ System.out.println("Members in 'myset': " + setMembers);
+
+ RedisUtil.zadd("myzset", 1.0, "member1");
+ RedisUtil.zadd("myzset", 2.0, "member2");
+ Set<String> zsetMembers = RedisUtil.zrange("myzset", 0, -1);
+ System.out.println("Members in 'myzset': " + zsetMembers);
+
+ RedisUtil.delete("mykey");
+ RedisUtil.delete("user:1");
+ RedisUtil.delete("mylist");
+ RedisUtil.delete("myset");
+ RedisUtil.delete("myzset");
+
+ RedisUtil.disconnect();
+ }
+}
diff --git a/websocketClient/src/main/java/org/example/wsClient/MexcClient.java b/websocketClient/src/main/java/org/example/wsClient/MexcClient.java
new file mode 100644
index 0000000..6dceb87
--- /dev/null
+++ b/websocketClient/src/main/java/org/example/wsClient/MexcClient.java
@@ -0,0 +1,148 @@
+package org.example.wsClient;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.gson.Gson;
+import org.example.pojo.Currency;
+import org.example.util.RedisUtil;
+
+import javax.websocket.*;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.*;
+
+@ClientEndpoint
+public class MexcClient {
+ private static final String WS_ENDPOINT = "wss://wbs.mexc.com/ws"; // 实际WebSocket服务器地址
+ private static final long PING_INTERVAL = 20000; // 每20秒发送一次ping
+ private static final int MAX_BACKOFF_TIME = 60000; // 最大重连时间
+
+ private final List<Currency> subscriptions;
+ private final ScheduledExecutorService executorService;
+ private Session session;
+ private boolean reconnecting = false;
+
+ public MexcClient(List<Currency> subscriptions) {
+ this.subscriptions = subscriptions;
+ this.executorService = Executors.newScheduledThreadPool(1);
+ }
+
+ public void start() {
+ try {
+ connect();
+ if (session == null) {
+ System.err.println("无法在超时时间内连接到服务器。");
+ return;
+ }
+
+ executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS);
+
+ // 订阅消息
+ for (Currency subscription : subscriptions) {
+ String parameter = getParameter(subscription.getSymbol());
+ session.getBasicRemote().sendText(parameter);
+ }
+
+ synchronized (this) {
+ this.wait();
+ }
+
+ } catch (Exception e) {
+ System.err.println("连接过程中发生异常: " + e.getMessage());
+ e.printStackTrace();
+ } finally {
+ executorService.shutdown();
+ }
+ }
+
+ private void connect() throws Exception {
+ WebSocketContainer container = ContainerProvider.getWebSocketContainer();
+ container.connectToServer(this, new URI(WS_ENDPOINT));
+ }
+
+ @OnOpen
+ public void onOpen(Session session) {
+ System.out.println("已连接到服务器。");
+ this.session = session;
+ synchronized (this) {
+ this.notify();
+ }
+ }
+
+ @OnMessage
+ public void onMessage(String message) {
+ Gson gson = new Gson();
+ Map map = gson.fromJson(message, Map.class);
+ if (map != null && map.containsKey("s")) {
+ RedisUtil.set("mexc" + map.get("s").toString(), message);
+ }
+ // 没有过滤撤单的数据
+ }
+
+ @OnClose
+ public void onClose() {
+ System.out.println("连接已关闭,尝试重新连接...");
+ session = null;
+ if (!reconnecting) {
+ reconnect();
+ }
+ }
+
+ @OnError
+ public void onError(Throwable throwable) {
+ System.err.println("发生错误: " + throwable.getMessage());
+ if (!reconnecting) {
+ reconnect();
+ }
+ }
+
+ private void reconnect() {
+ if (reconnecting) {
+ return;
+ }
+ reconnecting = true;
+ executorService.schedule(() -> {
+ try {
+ connect();
+ reconnecting = false;
+ } catch (Exception e) {
+ e.printStackTrace();
+ reconnect();
+ }
+ }, calculateBackoffTime(), TimeUnit.MILLISECONDS);
+ }
+
+ private long calculateBackoffTime() {
+ // 实现退避策略,例如指数退避
+ return 5000; // 例子:5秒
+ }
+
+ private void sendPing() {
+ try {
+ if (session != null) {
+ session.getBasicRemote().sendPing(ByteBuffer.wrap("Ping".getBytes()));
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ public String getParameter(String symbol) throws JsonProcessingException {
+ ObjectMapper mapper = new ObjectMapper();
+ ObjectNode root = mapper.createObjectNode();
+
+ root.put("method", "SUBSCRIPTION");
+ ArrayNode paramsArray = mapper.createArrayNode();
+ String customParam = String.format("spot@public.limit.depth.v3.api@%s@20", symbol);
+ paramsArray.add(customParam);
+ root.set("params", paramsArray);
+
+ return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(root);
+ }
+}
diff --git a/websocketClient/src/main/resources/application.yml b/websocketClient/src/main/resources/application.yml
index b56c205..8b50501 100644
--- a/websocketClient/src/main/resources/application.yml
+++ b/websocketClient/src/main/resources/application.yml
@@ -1,3 +1,75 @@
server:
port: 8090
-
\ No newline at end of file
+
+
+spring:
+ # redis 配置
+ redis:
+ # 地址
+ host: localhost
+ # 端口,默认为6379
+ port: 6379
+ # 数据库索引
+ database: 0
+ # 密码
+ password:
+ # 连接超时时间
+ timeout: 10s
+ lettuce:
+ pool:
+ # 连接池中的最小空闲连接
+ min-idle: 0
+ # 连接池中的最大空闲连接
+ max-idle: 99
+ # 连接池的最大数据库连接数
+ max-active: 99
+ # #连接池最大阻塞等待时间(使用负值表示没有限制)
+ max-wait: -1ms
+
+ datasource:
+ type: com.alibaba.druid.pool.DruidDataSource
+ driverClassName: com.mysql.cj.jdbc.Driver
+ url: jdbc:mysql://127.0.0.1:3306/demo?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
+ username: root
+ password: 123456
+
+ druid:
+ # 初始连接数
+ initialSize: 5
+ # 最小连接池数量
+ minIdle: 10
+ # 最大连接池数量
+ maxActive: 20
+ # 配置获取连接等待超时的时间
+ maxWait: 60000
+ # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒
+ timeBetweenEvictionRunsMillis: 60000
+ # 配置一个连接在池中最小生存的时间,单位是毫秒
+ minEvictableIdleTimeMillis: 300000
+ # 配置一个连接在池中最大生存的时间,单位是毫秒
+ maxEvictableIdleTimeMillis: 900000
+ # 配置检测连接是否有效
+ validationQuery: SELECT 1 FROM DUAL
+ testWhileIdle: true
+ testOnBorrow: false
+ testOnReturn: false
+ webStatFilter:
+ enabled: true
+ statViewServlet:
+ enabled: true
+ # 设置白名单,不填则允许所有访问
+ allow:
+ url-pattern: /druid/*
+ # 控制台管理用户名和密码
+ login-username: Greysparrow
+ login-password: 123456
+ filter:
+ stat:
+ enabled: true
+ # 慢SQL记录
+ log-slow-sql: true
+ slow-sql-millis: 1000
+ merge-sql: true
+ wall:
+ config:
+ multi-statement-allow: false
diff --git a/websocketSerivce/src/main/java/org/example/task/BitgetStock.java b/websocketSerivce/src/main/java/org/example/task/BitgetStock.java
index fba2768..58375d1 100644
--- a/websocketSerivce/src/main/java/org/example/task/BitgetStock.java
+++ b/websocketSerivce/src/main/java/org/example/task/BitgetStock.java
@@ -1,6 +1,7 @@
package org.example.task;
import com.alibaba.fastjson.JSON;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -54,6 +55,7 @@
* 同步bitget交易所交易对
*/
@Scheduled(cron = "0 0/45 * * * ?")
+// @Scheduled(cron = "0/10 * * * * ?")
public void syncCurrency() {
// 使用Lock来确保同步
syncCurrencyLock.lock();
@@ -85,7 +87,7 @@
getList.parallelStream().forEach(person -> person.setSymbol(StringUtils.remove(person.getSymbol(), "-")));
// 获取数据库中已有的symbol列表
- List<Currency> dbList = currencyService.list();
+ List<Currency> dbList = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource,"bitget"));
Set<String> symbolSet = dbList.stream().map(Currency::getSymbol).collect(Collectors.toSet());
// 比对接口返回的数据和数据库中已有的数据,找出新增的数据
diff --git a/websocketSerivce/src/main/java/org/example/task/GateStock.java b/websocketSerivce/src/main/java/org/example/task/GateStock.java
index b8b29a4..402ece4 100644
--- a/websocketSerivce/src/main/java/org/example/task/GateStock.java
+++ b/websocketSerivce/src/main/java/org/example/task/GateStock.java
@@ -1,5 +1,6 @@
package org.example.task;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -44,6 +45,7 @@
* 同步gate交易所交易对
*/
@Scheduled(cron = "0 0/40 * * * ?")
+// @Scheduled(cron = "0/10 * * * * ?")
public void syncCurrency() {
// 使用Lock来确保同步
syncCurrencyLock.lock();
@@ -72,7 +74,7 @@
getList.parallelStream().forEach(person -> person.setId(StringUtils.remove(person.getId(), "_")));
// 获取数据库中已有的symbol列表
- List<Currency> dbList = currencyService.list();
+ List<Currency> dbList = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "gate"));
Set<String> symbolSet = dbList.stream().map(Currency::getSymbol).collect(Collectors.toSet());
// 比对接口返回的数据和数据库中已有的数据,找出新增的数据
diff --git a/websocketSerivce/src/main/java/org/example/task/KucoinStock.java b/websocketSerivce/src/main/java/org/example/task/KucoinStock.java
index 9e6d6f5..a5efa8f 100644
--- a/websocketSerivce/src/main/java/org/example/task/KucoinStock.java
+++ b/websocketSerivce/src/main/java/org/example/task/KucoinStock.java
@@ -1,5 +1,6 @@
package org.example.task;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -41,6 +42,7 @@
* 同步kucoin交易所交易对
*/
@Scheduled(cron = "0 0/35 * * * ?")
+// @Scheduled(cron = "0/10 * * * * ?")
public void syncCurrency() {
// 使用Lock来确保同步
syncCurrencyLock.lock();
@@ -72,7 +74,7 @@
getList.parallelStream().forEach(person -> person.setSymbol(StringUtils.remove(person.getSymbol(), "-")));
// 获取数据库中已有的symbol列表
- List<Currency> dbList = currencyService.list();
+ List<Currency> dbList = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "kucoin"));
Set<String> symbolSet = dbList.stream().map(Currency::getSymbol).collect(Collectors.toSet());
// 比对接口返回的数据和数据库中已有的数据,找出新增的数据
diff --git a/websocketSerivce/src/main/java/org/example/task/MexcStock.java b/websocketSerivce/src/main/java/org/example/task/MexcStock.java
index d4351e2..43db090 100644
--- a/websocketSerivce/src/main/java/org/example/task/MexcStock.java
+++ b/websocketSerivce/src/main/java/org/example/task/MexcStock.java
@@ -1,5 +1,6 @@
package org.example.task;
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -40,6 +41,7 @@
* 同步mexc交易所交易对
*/
@Scheduled(cron = "0 0/30 * * * ?")
+// @Scheduled(cron = "0/10 * * * * ?")
public void syncCurrency() {
// 使用Lock来确保同步
syncCurrencyLock.lock();
@@ -74,7 +76,7 @@
}.getType());
// 获取数据库中已有的symbol列表
- List<Currency> dbList = currencyService.list();
+ List<Currency> dbList = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "mexc"));
Set<String> symbolSet = dbList.stream().map(Currency::getSymbol).collect(Collectors.toSet());
// 比对接口返回的数据和数据库中已有的数据,找出新增的数据
--
Gitblit v1.9.3