1
zj
2024-07-18 bf47f0de2c4b4ca35ce08c7abd2b5b2477b31dac
1
7 files modified
1 files renamed
7 files added
3 files deleted
693 ■■■■ changed files
.idea/misc.xml 1 ●●●● patch | view | raw | blame | history
websocketClient/pom.xml 66 ●●●●● patch | view | raw | blame | history
websocketClient/src/main/java/org/example/ThreadConfig/AsyncConfiguration.java 10 ●●●● patch | view | raw | blame | history
websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java 51 ●●●●● patch | view | raw | blame | history
websocketClient/src/main/java/org/example/constant/StockConstant.java 28 ●●●●● patch | view | raw | blame | history
websocketClient/src/main/java/org/example/controller/WsClientController.java 48 ●●●●● patch | view | raw | blame | history
websocketClient/src/main/java/org/example/dao/CurrencyMapper.java 14 ●●●●● patch | view | raw | blame | history
websocketClient/src/main/java/org/example/pojo/Currency.java 25 ●●●●● patch | view | raw | blame | history
websocketClient/src/main/java/org/example/server/CurrencySerivce.java 9 ●●●●● patch | view | raw | blame | history
websocketClient/src/main/java/org/example/server/impl/CurrencySerivceImpl.java 15 ●●●●● patch | view | raw | blame | history
websocketClient/src/main/java/org/example/server/wcServer.java 69 ●●●●● patch | view | raw | blame | history
websocketClient/src/main/java/org/example/util/RedisUtil.java 119 ●●●●● patch | view | raw | blame | history
websocketClient/src/main/java/org/example/wsClient/MexcClient.java 148 ●●●●● patch | view | raw | blame | history
websocketClient/src/main/resources/application.yml 74 ●●●●● patch | view | raw | blame | history
websocketSerivce/src/main/java/org/example/task/BitgetStock.java 4 ●●● patch | view | raw | blame | history
websocketSerivce/src/main/java/org/example/task/GateStock.java 4 ●●● patch | view | raw | blame | history
websocketSerivce/src/main/java/org/example/task/KucoinStock.java 4 ●●● patch | view | raw | blame | history
websocketSerivce/src/main/java/org/example/task/MexcStock.java 4 ●●● patch | view | raw | blame | history
.idea/misc.xml
@@ -1,4 +1,3 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
  <component name="ExternalStorageConfigurationManager" enabled="true" />
  <component name="MavenProjectsManager">
websocketClient/pom.xml
@@ -12,6 +12,22 @@
    <artifactId>websocketClient</artifactId>
    <dependencies>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>20.0</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.7.0</version>
        </dependency>
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>3.4.3.2</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
@@ -40,6 +56,56 @@
            <artifactId>gson</artifactId>
            <version>2.8.8</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid-spring-boot-starter</artifactId>
            <version>1.2.11</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </dependency>
        <dependency>
            <groupId>org.mybatis.spring.boot</groupId>
            <artifactId>mybatis-spring-boot-starter</artifactId>
            <version>2.2.2</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.11.1</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>3.7.0</version>
        </dependency>
    </dependencies>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
websocketClient/src/main/java/org/example/ThreadConfig/AsyncConfiguration.java
File was renamed from websocketClient/src/main/java/org/example/config/AsyncConfiguration.java
@@ -1,4 +1,4 @@
package org.example.config;
package org.example.ThreadConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -17,11 +17,11 @@
    /**
     * Creates the {@link ThreadPoolTaskExecutor} registered under the bean name
     * {@code threadPoolTaskExecutor}.
     *
     * <p>NOTE(review): as shown here the pool-size, queue-capacity and thread-name-prefix
     * setters are each invoked twice; the value passed to the later call is the one left in
     * effect. This looks like superseded lines left next to their replacements — confirm.
     *
     * @return the configured executor
     */
    @Bean(name = "threadPoolTaskExecutor")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(50);    //  core pool size
        executor.setMaxPoolSize(100);    //  maximum pool size
        executor.setQueueCapacity(300);    //  queue capacity
        executor.setCorePoolSize(400);    //  core pool size
        executor.setMaxPoolSize(1200);    //  maximum pool size
        executor.setQueueCapacity(1500);    //  queue capacity
        executor.setKeepAliveSeconds(60);    //  idle threads are kept alive for 60 seconds
        executor.setThreadNamePrefix("MyThread-");    //  thread name prefix
        executor.setThreadNamePrefix("MexcThread-");    //  thread name prefix
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());    //  rejected tasks run on the submitting thread (CallerRunsPolicy)
        return executor;
    }
websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java
New file
@@ -0,0 +1,51 @@
package org.example.WsBean;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
import org.example.pojo.Currency;
import org.example.server.impl.CurrencySerivceImpl;
import org.example.wsClient.MexcClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.CollectionUtils;
import java.util.List;
/**
 * @ClassDescription: 客户端请求类
 * @JdkVersion: 1.8
 * @Created: 2023/8/31 16:13
 */
@Slf4j
@Configuration
public class MexcWsBean {
    @Autowired
    private CurrencySerivceImpl currencyService;
    @Autowired
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;
    @Bean
    public void mexcWebsocketRunClientMap() {
        List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "mexc"));
        if (!CollectionUtils.isEmpty(mexc)) {
            int batchSize = 30; // 每个线程处理的数据量
            int totalSize = mexc.size();
            int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数
            for (int i = 0; i < threadCount; i++) {
                int fromIndex = i * batchSize;
                int toIndex = Math.min(fromIndex + batchSize, totalSize);
                List<Currency> sublist = mexc.subList(fromIndex, toIndex);
                // 使用自定义线程池提交任务
                threadPoolTaskExecutor.execute(new MexcClient(sublist)::start);
            }
        }
    }
}
websocketClient/src/main/java/org/example/constant/StockConstant.java
File was deleted
websocketClient/src/main/java/org/example/controller/WsClientController.java
File was deleted
websocketClient/src/main/java/org/example/dao/CurrencyMapper.java
New file
@@ -0,0 +1,14 @@
package org.example.dao;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
import org.example.pojo.Currency;
/**
 * MyBatis mapper for {@link Currency} entities.
 *
 * <p>Declares no methods of its own; everything callable on it is inherited from the
 * MyBatis-Plus {@link BaseMapper}.
 *
 * @program: demo
 * @description: Mapper interface for the Currency entity
 * @create: 2024-07-15 17:45
 **/
@Mapper
public interface CurrencyMapper extends BaseMapper<Currency> {
}
websocketClient/src/main/java/org/example/pojo/Currency.java
New file
@@ -0,0 +1,25 @@
package org.example.pojo;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import lombok.Data;
/**
 * Entity describing one trading pair. Accessors, {@code equals}, {@code hashCode} and
 * {@code toString} are generated by Lombok's {@code @Data}.
 *
 * @program: demo
 * @description: Trading pair
 * @create: 2024-07-15 17:41
 **/
@Data
public class Currency {
    // Primary key, mapped to column "id".
    // NOTE(review): IdType.AUTO is combined with a String field — confirm this matches the
    // column type and the intended key-generation strategy.
    @TableId(value = "id",type = IdType.AUTO)
    private String id;
    // Trading pair symbol
    private String symbol;
    // Base (traded) asset
    private String baseAsset;
    // Quote (pricing) asset
    private String quoteAsset;
    // Source the pair comes from (presumably the exchange name — verify against callers)
    private String source;
}
websocketClient/src/main/java/org/example/server/CurrencySerivce.java
New file
@@ -0,0 +1,9 @@
package org.example.server;
/**
 * Service interface for currency (trading pair) operations. It currently declares no
 * methods.
 *
 * @program: demo
 * @description: Currency service interface (empty placeholder)
 * @create: 2024-07-16 15:23
 **/
public interface CurrencySerivce {
}
websocketClient/src/main/java/org/example/server/impl/CurrencySerivceImpl.java
New file
@@ -0,0 +1,15 @@
package org.example.server.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import org.example.dao.CurrencyMapper;
import org.example.pojo.Currency;
import org.springframework.stereotype.Service;
/**
 * Spring service for {@link Currency} entities, backed by {@link CurrencyMapper}.
 *
 * <p>Adds no methods of its own; all behaviour is inherited from the MyBatis-Plus
 * {@link ServiceImpl}. The class declaration does not list any interface it implements.
 *
 * @program: demo
 * @description: Currency service implementation
 * @create: 2024-07-16 15:23
 **/
@Service
public class CurrencySerivceImpl extends ServiceImpl<CurrencyMapper, Currency> {
}
websocketClient/src/main/java/org/example/server/wcServer.java
File was deleted
websocketClient/src/main/java/org/example/util/RedisUtil.java
New file
@@ -0,0 +1,119 @@
package org.example.util;
import redis.clients.jedis.*;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class RedisUtil {
    private static JedisPool jedisPool;
    // 在静态代码块中初始化 JedisPool
    static {
        jedisPool = new JedisPool(new JedisPoolConfig(), "localhost", 6379);
    }
    public static void set(String key, String value) {
        try (Jedis jedis = jedisPool.getResource()) {
            jedis.set(key, value);
        }
    }
    public static String get(String key) {
        try (Jedis jedis = jedisPool.getResource()) {
            return jedis.get(key);
        }
    }
    public static void hset(String key, String field, String value) {
        try (Jedis jedis = jedisPool.getResource()) {
            jedis.hset(key, field, value);
        }
    }
    public static String hget(String key, String field) {
        try (Jedis jedis = jedisPool.getResource()) {
            return jedis.hget(key, field);
        }
    }
    public static void lpush(String key, String... values) {
        try (Jedis jedis = jedisPool.getResource()) {
            jedis.lpush(key, values);
        }
    }
    public static List<String> lrange(String key, long start, long end) {
        try (Jedis jedis = jedisPool.getResource()) {
            return jedis.lrange(key, start, end);
        }
    }
    public static void sadd(String key, String... members) {
        try (Jedis jedis = jedisPool.getResource()) {
            jedis.sadd(key, members);
        }
    }
    public static Set<String> smembers(String key) {
        try (Jedis jedis = jedisPool.getResource()) {
            return jedis.smembers(key);
        }
    }
    public static void zadd(String key, double score, String member) {
        try (Jedis jedis = jedisPool.getResource()) {
            jedis.zadd(key, score, member);
        }
    }
    public static Set<String> zrange(String key, long start, long end) {
        try (Jedis jedis = jedisPool.getResource()) {
            return jedis.zrange(key, start, end);
        }
    }
    public static void delete(String key) {
        try (Jedis jedis = jedisPool.getResource()) {
            jedis.del(key);
        }
    }
    public static void disconnect() {
        jedisPool.close();
    }
    // 示例用法
    public static void main(String[] args) {
        // 直接使用 RedisUtil 的静态方法
        RedisUtil.set("mykey", "myvalue");
        String value = RedisUtil.get("mykey");
        System.out.println("Value for 'mykey': " + value);
        RedisUtil.hset("user:1", "name", "Alice");
        String userName = RedisUtil.hget("user:1", "name");
        System.out.println("Name for 'user:1': " + userName);
        RedisUtil.lpush("mylist", "element1", "element2", "element3");
        List<String> listValues = RedisUtil.lrange("mylist", 0, -1);
        System.out.println("Values in 'mylist': " + listValues);
        RedisUtil.sadd("myset", "member1", "member2", "member3");
        Set<String> setMembers = RedisUtil.smembers("myset");
        System.out.println("Members in 'myset': " + setMembers);
        RedisUtil.zadd("myzset", 1.0, "member1");
        RedisUtil.zadd("myzset", 2.0, "member2");
        Set<String> zsetMembers = RedisUtil.zrange("myzset", 0, -1);
        System.out.println("Members in 'myzset': " + zsetMembers);
        RedisUtil.delete("mykey");
        RedisUtil.delete("user:1");
        RedisUtil.delete("mylist");
        RedisUtil.delete("myset");
        RedisUtil.delete("myzset");
        RedisUtil.disconnect();
    }
}
websocketClient/src/main/java/org/example/wsClient/MexcClient.java
New file
@@ -0,0 +1,148 @@
package org.example.wsClient;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.gson.Gson;
import org.example.pojo.Currency;
import org.example.util.RedisUtil;
import javax.websocket.*;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
@ClientEndpoint
public class MexcClient {
    private static final String WS_ENDPOINT = "wss://wbs.mexc.com/ws"; // 实际WebSocket服务器地址
    private static final long PING_INTERVAL = 20000; // 每20秒发送一次ping
    private static final int MAX_BACKOFF_TIME = 60000; // 最大重连时间
    private final List<Currency> subscriptions;
    private final ScheduledExecutorService executorService;
    private Session session;
    private boolean reconnecting = false;
    public MexcClient(List<Currency> subscriptions) {
        this.subscriptions = subscriptions;
        this.executorService = Executors.newScheduledThreadPool(1);
    }
    public void start() {
        try {
            connect();
            if (session == null) {
                System.err.println("无法在超时时间内连接到服务器。");
                return;
            }
            executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS);
            // 订阅消息
            for (Currency subscription : subscriptions) {
                String parameter = getParameter(subscription.getSymbol());
                session.getBasicRemote().sendText(parameter);
            }
            synchronized (this) {
                this.wait();
            }
        } catch (Exception e) {
            System.err.println("连接过程中发生异常: " + e.getMessage());
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }
    private void connect() throws Exception {
        WebSocketContainer container = ContainerProvider.getWebSocketContainer();
        container.connectToServer(this, new URI(WS_ENDPOINT));
    }
    @OnOpen
    public void onOpen(Session session) {
        System.out.println("已连接到服务器。");
        this.session = session;
        synchronized (this) {
            this.notify();
        }
    }
    @OnMessage
    public void onMessage(String message) {
        Gson gson = new Gson();
        Map map = gson.fromJson(message, Map.class);
        if (map != null && map.containsKey("s")) {
            RedisUtil.set("mexc" + map.get("s").toString(), message);
        }
        // 没有过滤撤单的数据
    }
    @OnClose
    public void onClose() {
        System.out.println("连接已关闭,尝试重新连接...");
        session = null;
        if (!reconnecting) {
            reconnect();
        }
    }
    @OnError
    public void onError(Throwable throwable) {
        System.err.println("发生错误: " + throwable.getMessage());
        if (!reconnecting) {
            reconnect();
        }
    }
    private void reconnect() {
        if (reconnecting) {
            return;
        }
        reconnecting = true;
        executorService.schedule(() -> {
            try {
                connect();
                reconnecting = false;
            } catch (Exception e) {
                e.printStackTrace();
                reconnect();
            }
        }, calculateBackoffTime(), TimeUnit.MILLISECONDS);
    }
    private long calculateBackoffTime() {
        // 实现退避策略,例如指数退避
        return 5000; // 例子:5秒
    }
    private void sendPing() {
        try {
            if (session != null) {
                session.getBasicRemote().sendPing(ByteBuffer.wrap("Ping".getBytes()));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    public String getParameter(String symbol) throws JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper();
        ObjectNode root = mapper.createObjectNode();
        root.put("method", "SUBSCRIPTION");
        ArrayNode paramsArray = mapper.createArrayNode();
        String customParam = String.format("spot@public.limit.depth.v3.api@%s@20", symbol);
        paramsArray.add(customParam);
        root.set("params", paramsArray);
        return mapper.writerWithDefaultPrettyPrinter().writeValueAsString(root);
    }
}
websocketClient/src/main/resources/application.yml
@@ -1,3 +1,75 @@
server:
  port: 8090
spring:
  # Redis configuration
  redis:
    # Host
    host: localhost
    # Port (default 6379)
    port: 6379
    # Database index
    database: 0
    # Password
    password:
    # Connection timeout
    timeout: 10s
    lettuce:
      pool:
        # Minimum number of idle connections in the pool
        min-idle: 0
        # Maximum number of idle connections in the pool
        max-idle: 99
        # Maximum number of connections in the pool
        max-active: 99
        # Maximum time to block waiting for a connection (a negative value means no limit)
        max-wait: -1ms
  datasource:
    type: com.alibaba.druid.pool.DruidDataSource
    driverClassName: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/demo?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=true&serverTimezone=GMT%2B8
    username: root
    # NOTE(review): plaintext database credential committed with the configuration —
    # consider supplying it from the environment instead.
    password: 123456
    druid:
      # Initial number of connections
      initialSize: 5
      # Minimum number of connections kept in the pool
      minIdle: 10
      # Maximum number of connections in the pool
      maxActive: 20
      # Maximum time to wait when acquiring a connection
      maxWait: 60000
      # Interval between checks for idle connections that should be closed, in milliseconds
      timeBetweenEvictionRunsMillis: 60000
      # Minimum time a connection stays in the pool, in milliseconds
      minEvictableIdleTimeMillis: 300000
      # Maximum time a connection stays in the pool, in milliseconds
      maxEvictableIdleTimeMillis: 900000
      # Query used to check whether a connection is still valid
      validationQuery: SELECT 1 FROM DUAL
      testWhileIdle: true
      testOnBorrow: false
      testOnReturn: false
      webStatFilter:
        enabled: true
      statViewServlet:
        enabled: true
        # Whitelist; when left empty all access is allowed
        allow:
        url-pattern: /druid/*
        # User name and password for the management console
        # NOTE(review): console credentials are stored in plaintext and the whitelist above
        # is empty — confirm this is not used outside local development.
        login-username: Greysparrow
        login-password: 123456
      filter:
        stat:
          enabled: true
          # Slow SQL logging
          log-slow-sql: true
          slow-sql-millis: 1000
          merge-sql: true
        wall:
          config:
            multi-statement-allow: false
websocketSerivce/src/main/java/org/example/task/BitgetStock.java
@@ -1,6 +1,7 @@
package org.example.task;
import com.alibaba.fastjson.JSON;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -54,6 +55,7 @@
     * 同步bitget交易所交易对
     */
    @Scheduled(cron = "0 0/45 * * * ?")
//    @Scheduled(cron = "0/10 * * * * ?")
    public void syncCurrency() {
        //  使用Lock来确保同步
        syncCurrencyLock.lock();
@@ -85,7 +87,7 @@
            getList.parallelStream().forEach(person -> person.setSymbol(StringUtils.remove(person.getSymbol(), "-")));
            //  获取数据库中已有的symbol列表
            List<Currency> dbList = currencyService.list();
            List<Currency> dbList = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource,"bitget"));
            Set<String> symbolSet = dbList.stream().map(Currency::getSymbol).collect(Collectors.toSet());
            //  比对接口返回的数据和数据库中已有的数据,找出新增的数据
websocketSerivce/src/main/java/org/example/task/GateStock.java
@@ -1,5 +1,6 @@
package org.example.task;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -44,6 +45,7 @@
     * 同步gate交易所交易对
     */
    @Scheduled(cron = "0 0/40 * * * ?")
//    @Scheduled(cron = "0/10 * * * * ?")
    public void syncCurrency() {
        //  使用Lock来确保同步
        syncCurrencyLock.lock();
@@ -72,7 +74,7 @@
            getList.parallelStream().forEach(person -> person.setId(StringUtils.remove(person.getId(), "_")));
            //  获取数据库中已有的symbol列表
            List<Currency> dbList = currencyService.list();
            List<Currency> dbList = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "gate"));
            Set<String> symbolSet = dbList.stream().map(Currency::getSymbol).collect(Collectors.toSet());
            //  比对接口返回的数据和数据库中已有的数据,找出新增的数据
websocketSerivce/src/main/java/org/example/task/KucoinStock.java
@@ -1,5 +1,6 @@
package org.example.task;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -41,6 +42,7 @@
     * 同步kucoin交易所交易对
     */
    @Scheduled(cron = "0 0/35 * * * ?")
//    @Scheduled(cron = "0/10 * * * * ?")
    public void syncCurrency() {
        //  使用Lock来确保同步
        syncCurrencyLock.lock();
@@ -72,7 +74,7 @@
            getList.parallelStream().forEach(person -> person.setSymbol(StringUtils.remove(person.getSymbol(), "-")));
            //  获取数据库中已有的symbol列表
            List<Currency> dbList = currencyService.list();
            List<Currency> dbList = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "kucoin"));
            Set<String> symbolSet = dbList.stream().map(Currency::getSymbol).collect(Collectors.toSet());
            //  比对接口返回的数据和数据库中已有的数据,找出新增的数据
websocketSerivce/src/main/java/org/example/task/MexcStock.java
@@ -1,5 +1,6 @@
package org.example.task;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -40,6 +41,7 @@
     * 同步mexc交易所交易对
     */
    @Scheduled(cron = "0 0/30 * * * ?")
//    @Scheduled(cron = "0/10 * * * * ?")
    public void syncCurrency() {
        //  使用Lock来确保同步
        syncCurrencyLock.lock();
@@ -74,7 +76,7 @@
            }.getType());
            //  获取数据库中已有的symbol列表
            List<Currency> dbList = currencyService.list();
            List<Currency> dbList = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "mexc"));
            Set<String> symbolSet = dbList.stream().map(Currency::getSymbol).collect(Collectors.toSet());
            //  比对接口返回的数据和数据库中已有的数据,找出新增的数据