1
zj
2024-07-20 378b31595f7eebdf46149fa2052cec41f7ce9565
1
2 files modified
152 ■■■■ changed files
websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java 47 ●●●●● patch | view | raw | blame | history
websocketClient/src/main/java/org/example/wsClient/KucoinClient.java 105 ●●●● patch | view | raw | blame | history
websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java
@@ -1,6 +1,8 @@
package org.example.WsBean;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
@@ -15,6 +17,7 @@
import org.example.wsClient.GateClient;
import org.example.wsClient.KucoinClient;
import org.example.wsClient.MexcClient;
import org.json.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@@ -60,31 +63,33 @@
//        }
//    }
    @Bean
    public void gateWebsocketRunClientMap() {
        List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "gate"));
        if (!CollectionUtils.isEmpty(mexc)) {
            int batchSize = 100; // 每个线程处理的数据量
            int totalSize = mexc.size();
            int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数
            for (int i = 0; i < threadCount; i++) {
                int fromIndex = i * batchSize;
                int toIndex = Math.min(fromIndex + batchSize, totalSize);
                List<Currency> sublist = mexc.subList(fromIndex, toIndex);
                // 使用自定义线程池提交任务
                threadPoolTaskExecutor.execute(new GateClient(sublist)::start);
            }
        }
    }
//    @Bean
//    public void gateWebsocketRunClientMap() {
//        List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "gate"));
//        if (!CollectionUtils.isEmpty(mexc)) {
//            int batchSize = 100; // 每个线程处理的数据量
//            int totalSize = mexc.size();
//            int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数
//
//            for (int i = 0; i < threadCount; i++) {
//                int fromIndex = i * batchSize;
//                int toIndex = Math.min(fromIndex + batchSize, totalSize);
//                List<Currency> sublist = mexc.subList(fromIndex, toIndex);
//
//                // 使用自定义线程池提交任务
//                threadPoolTaskExecutor.execute(new GateClient(sublist)::start);
//            }
//
//        }
//    }
    @Bean
    public void kucoinWebsocketRunClientMap() throws Exception {
        List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "gate"));
        List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "kucoin"));
        if (!CollectionUtils.isEmpty(mexc)) {
            String token = doPost();
            String result = doPost();
            JSONObject jsonObject = new JSONObject(result);
            String token = jsonObject.getJSONObject("data").getString("token");
            int batchSize = 100; // 每个线程处理的数据量
            int totalSize = mexc.size();
            int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数
websocketClient/src/main/java/org/example/wsClient/KucoinClient.java
@@ -1,5 +1,6 @@
package org.example.wsClient;
import com.alibaba.druid.util.StringUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.reflect.TypeToken;
@@ -8,16 +9,27 @@
import lombok.extern.slf4j.Slf4j;
import org.example.pojo.Currency;
import org.example.util.RedisUtil;
import org.jetbrains.annotations.NotNull;
import org.json.JSONException;
import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import javax.websocket.*;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URLEncoder;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
 * @program: demo
@@ -28,7 +40,7 @@
@Slf4j
public class KucoinClient {
    private static final String WS_ENDPOINT = "wss://api.kucoinio.ws/ws/v4/";
    private static final String WS_ENDPOINT = "wss://ws-api-spot.kucoin.com/spotMarket/level2Depth5:";
    private static final long PING_INTERVAL = 20000;
    private final List<Currency> subscriptions;
@@ -37,6 +49,8 @@
    private String token;
    private final Object lock = new Object(); // 添加一个锁对象
    private volatile boolean reconnecting = false; // 使用 volatile 关键字保证可见性
    private String id;
    public KucoinClient(List<Currency> subscriptions,String token) {
        this.subscriptions = subscriptions;
@@ -54,12 +68,6 @@
            executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS);
            // 订阅消息
            for (Currency subscription : subscriptions) {
                String parameter = getParameter(subscription.getSymbol());
                session.getBasicRemote().sendText(parameter);
            }
            synchronized (this) {
                this.wait();
            }
@@ -73,8 +81,24 @@
    private void connect() throws Exception {
        WebSocketContainer container = ContainerProvider.getWebSocketContainer();
        container.connectToServer(this, new URI(WS_ENDPOINT));
        String url = extracted();
        if(!StringUtils.isEmpty(url)){
            container.connectToServer(this, new URI(url));
            // 订阅消息
            String parameter = subscription();
            session.getBasicRemote().sendText(parameter);
        }
    }
    private static final String CHARSET = "UTF-8"; // 定义字符集常量,避免魔法字符串
    private String extracted() throws UnsupportedEncodingException {
        String symbol = getSymbol();
        String url = WS_ENDPOINT + symbol + "?token=" + URLEncoder.encode(token, CHARSET);
        return url;
    }
    @OnOpen
    public void onOpen(Session session) {
@@ -90,18 +114,30 @@
    public void onMessage(String message) {
        try {
            Map<String, Object> map = gson.fromJson(message, new TypeToken<Map<String, Object>>() {}.getType());
            Object object = map.get("result");
            Map<String, Object> resultMap = gson.fromJson(object.toString(), new TypeToken<Map<String, Object>>() {}.getType());
            if (null != resultMap && null != resultMap.get("s")) {
                HashMap<String,Object> hashMap = new HashMap<>();
                ObjectMapper mapper = new ObjectMapper();
                hashMap.put("bids",resultMap.get("bids"));
                hashMap.put("asks",resultMap.get("asks"));
                String key = "kucoin" + resultMap.get("s");
                if(resultMap.get("s").toString().replace("_","").equals("BTCUSDT")){
                    System.out.println(hashMap.toString());
            if(null != map.get("id")){
                this.id = map.get("id").toString();
            }
            if(null != map.get("data")){
                Object object = map.get("data");
                String topic = map.get("topic").toString();
                Map<String, Object> resultMap = gson.fromJson(object.toString(), new TypeToken<Map<String, Object>>() {}.getType());
                if (null != resultMap) {
                    HashMap<String,Object> hashMap = new HashMap<>();
                    ObjectMapper mapper = new ObjectMapper();
                    hashMap.put("bids",resultMap.get("bids"));
                    hashMap.put("asks",resultMap.get("asks"));
                    int index = topic.indexOf(":"); // 找到逗号的位置
                    if (index != -1) { // 如果找到了逗号
                        String substring = topic.substring(index + 1);
                        String symbol = substring.replaceAll("-", "");
                        String key = "kucoin" + symbol;
                        RedisUtil.set(key, mapper.writeValueAsString(hashMap));
                    } else {
                        // 处理未找到特定字符的情况
                        log.error("topic--->存入redis失败");
                    }
                }
                RedisUtil.set(key.replace("_",""), mapper.writeValueAsString(hashMap));
            }
        } catch (JsonSyntaxException e) {
            log.error("JSON 解析异常:" + e.getMessage(), e);
@@ -167,6 +203,37 @@
            log.error("发送心跳失败", e);
        }
    }
    public String subscription() throws JsonProcessingException, JSONException {
        String symbol = getSymbol();
        // 创建一个ObjectMapper实例
        ObjectMapper mapper = new ObjectMapper();
        // 使用Map构建JSON对象
        Map<String, Object> jsonMap = new HashMap<>();
        jsonMap.put("id", id);
        jsonMap.put("type", "subscribe");
        jsonMap.put("topic", "/spotMarket/level2Depth50:"+symbol);
        jsonMap.put("privateChannel", false);
        jsonMap.put("response", true);
        // 将Map转换为JSON字符串
        String jsonString = mapper.writeValueAsString(jsonMap);
        return jsonString;
    }
    @NotNull
    private String getSymbol() {
        String symbol;
        List<String> symbolList = subscriptions.stream()
                .map(currency -> currency.getSymbol().replaceAll("USDT", "-USDT"))
                .collect(Collectors.toList());
        symbol = String.join(",", symbolList);
        return symbol;
    }
}