.idea/misc.xml
@@ -1,4 +1,3 @@ <?xml version="1.0" encoding="UTF-8"?> <project version="4"> <component name="ExternalStorageConfigurationManager" enabled="true" /> <component name="MavenProjectsManager"> websocketClient/pom.xml
@@ -12,6 +12,22 @@ <artifactId>websocketClient</artifactId> <dependencies> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>20.0</version> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.7.0</version> </dependency> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>3.4.3.2</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> @@ -40,6 +56,56 @@ <artifactId>gson</artifactId> <version>2.8.8</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid-spring-boot-starter</artifactId> <version>1.2.11</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <dependency> <groupId>org.mybatis.spring.boot</groupId> <artifactId>mybatis-spring-boot-starter</artifactId> <version>2.2.2</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <version>2.11.1</version> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.7.0</version> </dependency> </dependencies> <properties> <maven.compiler.source>8</maven.compiler.source> websocketClient/src/main/java/org/example/ThreadConfig/AsyncConfiguration.java
File was renamed from websocketClient/src/main/java/org/example/config/AsyncConfiguration.java @@ -1,4 +1,4 @@ package org.example.config; package org.example.ThreadConfig; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -17,11 +17,11 @@ @Bean(name = "threadPoolTaskExecutor") public ThreadPoolTaskExecutor threadPoolTaskExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(50); // 核心线程数 executor.setMaxPoolSize(100); // 最大线程数 executor.setQueueCapacity(300); // 队列容量 executor.setCorePoolSize(400); // 核心线程数 executor.setMaxPoolSize(1200); // 最大线程数 executor.setQueueCapacity(1500); // 队列容量 executor.setKeepAliveSeconds(60); // 线程空闲时的存活时间为60秒 executor.setThreadNamePrefix("MyThread-"); // 线程名称的前缀 executor.setThreadNamePrefix("MexcThread-"); // 线程名称的前缀 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); // 使用 CallerRunsPolicy 拒绝策略 return executor; } websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java
New file @@ -0,0 +1,51 @@
package org.example.WsBean;

import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.example.pojo.Currency;
import org.example.server.impl.CurrencySerivceImpl;
import org.example.wsClient.MexcClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.CollectionUtils;

import javax.annotation.PostConstruct;
import java.util.List;

/**
 * Client request class: starts the MEXC websocket clients when the application boots.
 *
 * <p>All {@link Currency} rows whose source is {@code "mexc"} are loaded, split into
 * batches of {@link #BATCH_SIZE} symbols, and each batch is handed to its own
 * {@link MexcClient}, whose {@code start()} method is submitted to the injected
 * task executor.
 *
 * @JdkVersion: 1.8
 * @Created: 2023/8/31 16:13
 */
@Slf4j
@Configuration
public class MexcWsBean {

    /** Number of symbols subscribed by a single {@link MexcClient}. */
    private static final int BATCH_SIZE = 30;

    @Autowired
    private CurrencySerivceImpl currencyService;

    @Autowired
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;

    /**
     * Loads the MEXC trading pairs and launches one websocket client per batch.
     *
     * <p>This is a startup hook, not a bean factory: it returns nothing, so it is
     * declared as a {@code @PostConstruct} callback instead of a {@code void}
     * {@code @Bean} method (which yields no usable bean).
     * NOTE(review): newer Spring Framework versions are expected to reject
     * {@code void} {@code @Bean} methods - confirm against the version in use.
     *
     * <p>Does nothing (apart from logging) when no MEXC symbols are stored.
     */
    @PostConstruct
    public void mexcWebsocketRunClientMap() {
        List<Currency> mexc = currencyService.list(
                new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "mexc"));
        if (CollectionUtils.isEmpty(mexc)) {
            log.warn("No currencies with source 'mexc' found; no MEXC websocket client started");
            return;
        }

        int totalSize = mexc.size();
        int clientCount = 0;
        // Step through the list one batch at a time; the last batch may be shorter.
        for (int fromIndex = 0; fromIndex < totalSize; fromIndex += BATCH_SIZE) {
            int toIndex = Math.min(fromIndex + BATCH_SIZE, totalSize);
            List<Currency> sublist = mexc.subList(fromIndex, toIndex);
            // Submit the client to the custom thread pool.
            threadPoolTaskExecutor.execute(new MexcClient(sublist)::start);
            clientCount++;
        }
        log.info("Submitted {} MEXC websocket client(s) for {} symbol(s)", clientCount, totalSize);
    }
}
websocketClient/src/main/java/org/example/constant/StockConstant.java
File was deleted websocketClient/src/main/java/org/example/controller/WsClientController.java
File was deleted websocketClient/src/main/java/org/example/dao/CurrencyMapper.java
New file @@ -0,0 +1,14 @@
package org.example.dao;

import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
import org.example.pojo.Currency;

/**
 * MyBatis mapper for {@link Currency} entities.
 *
 * <p>Declares no methods of its own; everything it offers comes from the
 * {@link BaseMapper} it extends.
 *
 * @program: demo
 * @create: 2024-07-15 17:45
 **/
@Mapper
public interface CurrencyMapper extends BaseMapper<Currency> {
}
websocketClient/src/main/java/org/example/pojo/Currency.java
New file @@ -0,0 +1,25 @@
package org.example.pojo;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import lombok.Data;

/**
 * A trading pair.
 *
 * <p>Accessors, {@code equals}/{@code hashCode} and {@code toString} are generated
 * by Lombok's {@code @Data}.
 *
 * @program: demo
 * @create: 2024-07-15 17:41
 **/
@Data
public class Currency {

    /**
     * Primary key, mapped to column {@code id}.
     * NOTE(review): {@code IdType.AUTO} on a {@code String} field - confirm the
     * column type matches.
     */
    @TableId(value = "id", type = IdType.AUTO)
    private String id;

    /** Trading pair symbol. */
    private String symbol;

    /** Base (traded) asset. */
    private String baseAsset;

    /** Quote (pricing) asset. */
    private String quoteAsset;

    /** Source of the pair. */
    private String source;
}
websocketClient/src/main/java/org/example/server/CurrencySerivce.java
New file @@ -0,0 +1,9 @@
package org.example.server;

/**
 * Service contract for currency (trading pair) operations.
 *
 * <p>Currently declares no methods.
 *
 * @program: demo
 * @create: 2024-07-16 15:23
 **/
public interface CurrencySerivce {
}
websocketClient/src/main/java/org/example/server/impl/CurrencySerivceImpl.java
New file @@ -0,0 +1,15 @@
package org.example.server.impl;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.example.dao.CurrencyMapper;
import org.example.pojo.Currency;
import org.springframework.stereotype.Service;

/**
 * Spring service for {@link Currency} entities, backed by {@link CurrencyMapper}.
 *
 * <p>Adds nothing of its own; all operations are inherited from
 * {@link ServiceImpl}.
 *
 * @program: demo
 * @create: 2024-07-16 15:23
 **/
@Service
public class CurrencySerivceImpl extends ServiceImpl<CurrencyMapper, Currency> {
}
websocketClient/src/main/java/org/example/server/wcServer.java
File was deleted websocketClient/src/main/java/org/example/util/RedisUtil.java
New file @@ -0,0 +1,119 @@
package org.example.util;

import redis.clients.jedis.*;

import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Static convenience wrappers around a shared {@link JedisPool}.
 *
 * <p>Every operation borrows a connection from the pool and returns it when the
 * call finishes (try-with-resources). The pool targets {@code localhost:6379}
 * with a default {@link JedisPoolConfig}; these values are hard-coded here.
 *
 * <p>The class only holds static members and cannot be instantiated.
 */
public final class RedisUtil {

    /** Shared connection pool, created once when the class is loaded. */
    private static final JedisPool JEDIS_POOL =
            new JedisPool(new JedisPoolConfig(), "localhost", 6379);

    private RedisUtil() {
        // Utility class: static access only.
    }

    /**
     * Stores a string value.
     *
     * @param key   key to write
     * @param value value to store under {@code key}
     */
    public static void set(String key, String value) {
        try (Jedis jedis = JEDIS_POOL.getResource()) {
            jedis.set(key, value);
        }
    }

    /**
     * Reads a string value.
     *
     * @param key key to read
     * @return whatever {@code Jedis.get} returns for {@code key}
     */
    public static String get(String key) {
        try (Jedis jedis = JEDIS_POOL.getResource()) {
            return jedis.get(key);
        }
    }

    /**
     * Sets one field of a hash.
     *
     * @param key   hash key
     * @param field field name inside the hash
     * @param value value to store in {@code field}
     */
    public static void hset(String key, String field, String value) {
        try (Jedis jedis = JEDIS_POOL.getResource()) {
            jedis.hset(key, field, value);
        }
    }

    /**
     * Reads one field of a hash.
     *
     * @param key   hash key
     * @param field field name inside the hash
     * @return whatever {@code Jedis.hget} returns for the field
     */
    public static String hget(String key, String field) {
        try (Jedis jedis = JEDIS_POOL.getResource()) {
            return jedis.hget(key, field);
        }
    }

    /**
     * Pushes values onto the head of a list.
     *
     * @param key    list key
     * @param values values to push
     */
    public static void lpush(String key, String... values) {
        try (Jedis jedis = JEDIS_POOL.getResource()) {
            jedis.lpush(key, values);
        }
    }

    /**
     * Reads a range of a list.
     *
     * @param key   list key
     * @param start first index of the range
     * @param end   last index of the range
     * @return the elements returned by {@code Jedis.lrange}
     */
    public static List<String> lrange(String key, long start, long end) {
        try (Jedis jedis = JEDIS_POOL.getResource()) {
            return jedis.lrange(key, start, end);
        }
    }

    /**
     * Adds members to a set.
     *
     * @param key     set key
     * @param members members to add
     */
    public static void sadd(String key, String... members) {
        try (Jedis jedis = JEDIS_POOL.getResource()) {
            jedis.sadd(key, members);
        }
    }

    /**
     * Reads all members of a set.
     *
     * @param key set key
     * @return the members returned by {@code Jedis.smembers}
     */
    public static Set<String> smembers(String key) {
        try (Jedis jedis = JEDIS_POOL.getResource()) {
            return jedis.smembers(key);
        }
    }

    /**
     * Adds a member with a score to a sorted set.
     *
     * @param key    sorted-set key
     * @param score  score of the member
     * @param member member to add
     */
    public static void zadd(String key, double score, String member) {
        try (Jedis jedis = JEDIS_POOL.getResource()) {
            jedis.zadd(key, score, member);
        }
    }

    /**
     * Reads a range of a sorted set.
     *
     * @param key   sorted-set key
     * @param start first index of the range
     * @param end   last index of the range
     * @return the members returned by {@code Jedis.zrange}
     */
    public static Set<String> zrange(String key, long start, long end) {
        try (Jedis jedis = JEDIS_POOL.getResource()) {
            return jedis.zrange(key, start, end);
        }
    }

    /**
     * Deletes a key.
     *
     * @param key key to delete
     */
    public static void delete(String key) {
        try (Jedis jedis = JEDIS_POOL.getResource()) {
            jedis.del(key);
        }
    }

    /** Closes the shared pool; no other method of this class is usable afterwards. */
    public static void disconnect() {
        JEDIS_POOL.close();
    }

    /**
     * Example usage: writes, reads back and deletes a few demo keys, then closes
     * the pool.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // Call the static helpers directly.
        RedisUtil.set("mykey", "myvalue");
        String value = RedisUtil.get("mykey");
        System.out.println("Value for 'mykey': " + value);

        RedisUtil.hset("user:1", "name", "Alice");
        String userName = RedisUtil.hget("user:1", "name");
        System.out.println("Name for 'user:1': " + userName);

        RedisUtil.lpush("mylist", "element1", "element2", "element3");
        List<String> listValues = RedisUtil.lrange("mylist", 0, -1);
        System.out.println("Values in 'mylist': " + listValues);

        RedisUtil.sadd("myset", "member1", "member2", "member3");
        Set<String> setMembers = RedisUtil.smembers("myset");
        System.out.println("Members in 'myset': " + setMembers);

        RedisUtil.zadd("myzset", 1.0, "member1");
        RedisUtil.zadd("myzset", 2.0, "member2");
        Set<String> zsetMembers = RedisUtil.zrange("myzset", 0, -1);
        System.out.println("Members in 'myzset': " + zsetMembers);

        RedisUtil.delete("mykey");
        RedisUtil.delete("user:1");
        RedisUtil.delete("mylist");
        RedisUtil.delete("myset");
        RedisUtil.delete("myzset");

        RedisUtil.disconnect();
    }
}
websocketClient/src/main/java/org/example/wsClient/MexcClient.java
New file @@ -0,0 +1,148 @@
package org.example.wsClient;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import org.example.pojo.Currency;
import org.example.util.RedisUtil;

import javax.websocket.*;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Websocket client that subscribes to depth updates for a list of symbols and
 * stores every received message that carries an {@code "s"} field in Redis under
 * the key {@code "mexc" + s}.
 *
 * <p>{@link #start()} connects, subscribes, schedules a periodic ping and then
 * blocks the calling thread until {@link #stop()} is called or the thread is
 * interrupted. When the connection is closed or reports an error, a reconnect is
 * scheduled on the client's own single-threaded scheduler with a capped
 * exponential backoff; every successful (re)connect re-sends all subscriptions.
 *
 * <p>State shared between the caller thread, the websocket container callbacks
 * and the scheduler thread is held in {@code volatile}/atomic fields.
 */
@ClientEndpoint
public class MexcClient {
    private static final String WS_ENDPOINT = "wss://wbs.mexc.com/ws"; // websocket server address
    private static final long PING_INTERVAL = 20000; // send a ping every 20 seconds
    private static final long BASE_BACKOFF_TIME = 5000; // delay before the first reconnect attempt
    private static final int MAX_BACKOFF_TIME = 60000; // upper bound for the reconnect delay
    // Largest useful doubling count: 5000 << 4 already exceeds MAX_BACKOFF_TIME.
    private static final int MAX_BACKOFF_SHIFT = 4;

    // Both are reused across calls instead of being re-created per message.
    private static final Gson GSON = new Gson();
    private static final ObjectMapper MAPPER = new ObjectMapper();

    private final List<Currency> subscriptions;
    private final ScheduledExecutorService executorService;
    /** True while a reconnect attempt is pending or running. */
    private final AtomicBoolean reconnecting = new AtomicBoolean(false);
    /** Consecutive failed reconnect attempts; drives the backoff delay. */
    private final AtomicInteger failedReconnects = new AtomicInteger();
    /** Released by {@link #stop()}; {@link #start()} parks on it. */
    private final CountDownLatch stopSignal = new CountDownLatch(1);
    private volatile Session session;

    /**
     * @param subscriptions currencies whose symbols are subscribed on every connect
     */
    public MexcClient(List<Currency> subscriptions) {
        this.subscriptions = subscriptions;
        this.executorService = Executors.newScheduledThreadPool(1);
    }

    /**
     * Connects, subscribes, starts the ping schedule and blocks until
     * {@link #stop()} is called or the calling thread is interrupted.
     *
     * <p>If the initial connection fails the error is printed and the method
     * returns. The scheduler is shut down when the method exits.
     */
    public void start() {
        try {
            connect();
            if (session == null) {
                System.err.println("无法在超时时间内连接到服务器。");
                return;
            }

            executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS);

            // Park on a dedicated latch rather than on wait()/notify(): onOpen of a
            // later reconnect must not wake this thread and shut the scheduler down.
            stopSignal.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            System.err.println("连接过程中发生异常: " + e.getMessage());
        } catch (Exception e) {
            System.err.println("连接过程中发生异常: " + e.getMessage());
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }

    /**
     * Releases the thread blocked in {@link #start()}, suppresses further
     * reconnects and closes the current session if there is one.
     */
    public void stop() {
        stopSignal.countDown();
        Session current = session;
        if (current != null) {
            try {
                current.close();
            } catch (IOException e) {
                System.err.println("关闭连接时发生异常: " + e.getMessage());
            }
        }
    }

    /**
     * Opens a connection to {@link #WS_ENDPOINT} and sends all subscriptions on it.
     * Used for the initial connect and for every reconnect, so a re-established
     * session is subscribed as well.
     */
    private void connect() throws Exception {
        WebSocketContainer container = ContainerProvider.getWebSocketContainer();
        Session opened = container.connectToServer(this, new URI(WS_ENDPOINT));
        subscribeAll(opened);
    }

    /** Sends one subscription request per configured currency on {@code target}. */
    private void subscribeAll(Session target) throws IOException {
        for (Currency subscription : subscriptions) {
            target.getBasicRemote().sendText(getParameter(subscription.getSymbol()));
        }
    }

    /** Records the newly opened session. */
    @OnOpen
    public void onOpen(Session session) {
        System.out.println("已连接到服务器。");
        this.session = session;
    }

    /**
     * Stores the raw message in Redis under {@code "mexc" + s} when the message is
     * a JSON object with a non-null {@code "s"} field; other messages are ignored.
     * Cancel-order data is not filtered out.
     *
     * @param message text frame received from the server
     */
    @OnMessage
    public void onMessage(String message) {
        Map<?, ?> payload;
        try {
            payload = GSON.fromJson(message, Map.class);
        } catch (JsonSyntaxException e) {
            // Not a JSON object: nothing to store. Handling it here keeps a bad
            // payload from being routed to onError and triggering a reconnect.
            System.err.println("忽略无法解析的消息: " + message);
            return;
        }
        if (payload == null) {
            return;
        }
        Object symbol = payload.get("s");
        if (symbol != null) {
            RedisUtil.set("mexc" + symbol, message);
        }
    }

    /** Clears the session and schedules a reconnect. */
    @OnClose
    public void onClose() {
        System.out.println("连接已关闭,尝试重新连接...");
        session = null;
        reconnect();
    }

    /**
     * Prints the error and schedules a reconnect.
     * NOTE(review): if the container reports an error while the session stays
     * open, the old session is not closed before reconnecting - confirm whether
     * that can happen with the container in use.
     *
     * @param throwable error reported by the websocket container
     */
    @OnError
    public void onError(Throwable throwable) {
        System.err.println("发生错误: " + throwable.getMessage());
        reconnect();
    }

    /** Starts a reconnect cycle unless one is already in progress or the client was stopped. */
    private void reconnect() {
        if (stopSignal.getCount() == 0 || !reconnecting.compareAndSet(false, true)) {
            return;
        }
        scheduleReconnectAttempt();
    }

    /**
     * Schedules one reconnect attempt after the current backoff delay. A failed
     * attempt reschedules itself directly; it must not go through
     * {@link #reconnect()}, whose in-progress guard would drop the retry.
     */
    private void scheduleReconnectAttempt() {
        try {
            executorService.schedule(() -> {
                if (stopSignal.getCount() == 0) {
                    reconnecting.set(false);
                    return;
                }
                try {
                    connect();
                    failedReconnects.set(0);
                    reconnecting.set(false);
                } catch (Exception e) {
                    e.printStackTrace();
                    failedReconnects.incrementAndGet();
                    scheduleReconnectAttempt();
                }
            }, calculateBackoffTime(), TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            // The scheduler has been shut down (start() returned): give up quietly.
            reconnecting.set(false);
        }
    }

    /**
     * Exponential backoff: {@link #BASE_BACKOFF_TIME} doubled once per consecutive
     * failed attempt, capped at {@link #MAX_BACKOFF_TIME}.
     *
     * @return delay in milliseconds before the next reconnect attempt
     */
    private long calculateBackoffTime() {
        int shift = Math.min(failedReconnects.get(), MAX_BACKOFF_SHIFT);
        return Math.min(BASE_BACKOFF_TIME << shift, MAX_BACKOFF_TIME);
    }

    /** Sends a websocket ping on the current session, if any; failures are only logged. */
    private void sendPing() {
        try {
            Session current = session;
            if (current != null) {
                current.getBasicRemote().sendPing(ByteBuffer.wrap("Ping".getBytes(StandardCharsets.UTF_8)));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Builds the subscription request for one symbol: a JSON object with
     * {@code "method": "SUBSCRIPTION"} and a one-element {@code "params"} array
     * containing {@code spot@public.limit.depth.v3.api@<symbol>@20}.
     *
     * @param symbol trading pair symbol to subscribe to
     * @return the request as pretty-printed JSON text
     * @throws JsonProcessingException if the JSON cannot be serialized
     */
    public String getParameter(String symbol) throws JsonProcessingException {
        ObjectNode root = MAPPER.createObjectNode();
        root.put("method", "SUBSCRIPTION");

        ArrayNode paramsArray = MAPPER.createArrayNode();
        paramsArray.add(String.format("spot@public.limit.depth.v3.api@%s@20", symbol));
        root.set("params", paramsArray);

        return MAPPER.writerWithDefaultPrettyPrinter().writeValueAsString(root);
    }
}
websocketClient/src/main/resources/application.yml
@@ -1,3 +1,75 @@ server: port: 8090 spring: # redis 配置 redis: # 地址 host: localhost # 端口,默认为6379 port: 6379 # 数据库索引 database: 0 # 密码 password: # 连接超时时间 timeout: 10s lettuce: pool: # 连接池中的最小空闲连接 min-idle: 0 # 连接池中的最大空闲连接 max-idle: 99 # 连接池的最大数据库连接数 max-active: 99 # #连接池最大阻塞等待时间(使用负值表示没有限制) max-wait: -1ms datasource: type: com.alibaba.druid.pool.DruidDataSource driverClassName: com.mysql.cj.jdbc.Driver url: jdbc:mysql://127.0.0.1:3306/demo?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8 username: root password: 123456 druid: # 初始连接数 initialSize: 5 # 最小连接池数量 minIdle: 10 # 最大连接池数量 maxActive: 20 # 配置获取连接等待超时的时间 maxWait: 60000 # 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 timeBetweenEvictionRunsMillis: 60000 # 配置一个连接在池中最小生存的时间,单位是毫秒 minEvictableIdleTimeMillis: 300000 # 配置一个连接在池中最大生存的时间,单位是毫秒 maxEvictableIdleTimeMillis: 900000 # 配置检测连接是否有效 validationQuery: SELECT 1 FROM DUAL testWhileIdle: true testOnBorrow: false testOnReturn: false webStatFilter: enabled: true statViewServlet: enabled: true # 设置白名单,不填则允许所有访问 allow: url-pattern: /druid/* # 控制台管理用户名和密码 login-username: Greysparrow login-password: 123456 filter: stat: enabled: true # 慢SQL记录 log-slow-sql: true slow-sql-millis: 1000 merge-sql: true wall: config: multi-statement-allow: false websocketSerivce/src/main/java/org/example/task/BitgetStock.java
@@ -1,6 +1,7 @@ package org.example.task; import com.alibaba.fastjson.JSON; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; @@ -54,6 +55,7 @@ * 同步bitget交易所交易对 */ @Scheduled(cron = "0 0/45 * * * ?") // @Scheduled(cron = "0/10 * * * * ?") public void syncCurrency() { // 使用Lock来确保同步 syncCurrencyLock.lock(); @@ -85,7 +87,7 @@ getList.parallelStream().forEach(person -> person.setSymbol(StringUtils.remove(person.getSymbol(), "-"))); // 获取数据库中已有的symbol列表 List<Currency> dbList = currencyService.list(); List<Currency> dbList = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource,"bitget")); Set<String> symbolSet = dbList.stream().map(Currency::getSymbol).collect(Collectors.toSet()); // 比对接口返回的数据和数据库中已有的数据,找出新增的数据 websocketSerivce/src/main/java/org/example/task/GateStock.java
@@ -1,5 +1,6 @@ package org.example.task; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; @@ -44,6 +45,7 @@ * 同步gate交易所交易对 */ @Scheduled(cron = "0 0/40 * * * ?") // @Scheduled(cron = "0/10 * * * * ?") public void syncCurrency() { // 使用Lock来确保同步 syncCurrencyLock.lock(); @@ -72,7 +74,7 @@ getList.parallelStream().forEach(person -> person.setId(StringUtils.remove(person.getId(), "_"))); // 获取数据库中已有的symbol列表 List<Currency> dbList = currencyService.list(); List<Currency> dbList = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "gate")); Set<String> symbolSet = dbList.stream().map(Currency::getSymbol).collect(Collectors.toSet()); // 比对接口返回的数据和数据库中已有的数据,找出新增的数据 websocketSerivce/src/main/java/org/example/task/KucoinStock.java
@@ -1,5 +1,6 @@ package org.example.task; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; @@ -41,6 +42,7 @@ * 同步kucoin交易所交易对 */ @Scheduled(cron = "0 0/35 * * * ?") // @Scheduled(cron = "0/10 * * * * ?") public void syncCurrency() { // 使用Lock来确保同步 syncCurrencyLock.lock(); @@ -72,7 +74,7 @@ getList.parallelStream().forEach(person -> person.setSymbol(StringUtils.remove(person.getSymbol(), "-"))); // 获取数据库中已有的symbol列表 List<Currency> dbList = currencyService.list(); List<Currency> dbList = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "kucoin")); Set<String> symbolSet = dbList.stream().map(Currency::getSymbol).collect(Collectors.toSet()); // 比对接口返回的数据和数据库中已有的数据,找出新增的数据 websocketSerivce/src/main/java/org/example/task/MexcStock.java
@@ -1,5 +1,6 @@ package org.example.task; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.toolkit.CollectionUtils; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; @@ -40,6 +41,7 @@ * 同步mexc交易所交易对 */ @Scheduled(cron = "0 0/30 * * * ?") // @Scheduled(cron = "0/10 * * * * ?") public void syncCurrency() { // 使用Lock来确保同步 syncCurrencyLock.lock(); @@ -74,7 +76,7 @@ }.getType()); // 获取数据库中已有的symbol列表 List<Currency> dbList = currencyService.list(); List<Currency> dbList = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "mexc")); Set<String> symbolSet = dbList.stream().map(Currency::getSymbol).collect(Collectors.toSet()); // 比对接口返回的数据和数据库中已有的数据,找出新增的数据