1
zj
2024-07-20 378b31595f7eebdf46149fa2052cec41f7ce9565
websocketClient/src/main/java/org/example/wsClient/KucoinClient.java
@@ -1,5 +1,6 @@
package org.example.wsClient;
import com.alibaba.druid.util.StringUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.reflect.TypeToken;
@@ -8,16 +9,27 @@
import lombok.extern.slf4j.Slf4j;
import org.example.pojo.Currency;
import org.example.util.RedisUtil;
import org.jetbrains.annotations.NotNull;
import org.json.JSONException;
import java.io.*;
import java.net.HttpURLConnection;
import java.net.URL;
import javax.websocket.*;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URLEncoder;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
 * @program: demo
@@ -28,7 +40,7 @@
@Slf4j
public class KucoinClient {
    private static final String WS_ENDPOINT = "wss://api.kucoinio.ws/ws/v4/";
    private static final String WS_ENDPOINT = "wss://ws-api-spot.kucoin.com/spotMarket/level2Depth5:";
    private static final long PING_INTERVAL = 20000;
    private final List<Currency> subscriptions;
@@ -37,6 +49,8 @@
    private String token;
    private final Object lock = new Object(); // 添加一个锁对象
    private volatile boolean reconnecting = false; // 使用 volatile 关键字保证可见性
    private String id;
    public KucoinClient(List<Currency> subscriptions,String token) {
        this.subscriptions = subscriptions;
@@ -54,12 +68,6 @@
            executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS);
            // 订阅消息
            for (Currency subscription : subscriptions) {
                String parameter = getParameter(subscription.getSymbol());
                session.getBasicRemote().sendText(parameter);
            }
            synchronized (this) {
                this.wait();
            }
@@ -73,8 +81,24 @@
    private void connect() throws Exception {
        WebSocketContainer container = ContainerProvider.getWebSocketContainer();
        container.connectToServer(this, new URI(WS_ENDPOINT));
        String url = extracted();
        if(!StringUtils.isEmpty(url)){
            container.connectToServer(this, new URI(url));
            // 订阅消息
            String parameter = subscription();
            session.getBasicRemote().sendText(parameter);
        }
    }
    private static final String CHARSET = "UTF-8"; // 定义字符集常量,避免魔法字符串
    private String extracted() throws UnsupportedEncodingException {
        String symbol = getSymbol();
        String url = WS_ENDPOINT + symbol + "?token=" + URLEncoder.encode(token, CHARSET);
        return url;
    }
    @OnOpen
    public void onOpen(Session session) {
@@ -90,18 +114,30 @@
    public void onMessage(String message) {
        try {
            Map<String, Object> map = gson.fromJson(message, new TypeToken<Map<String, Object>>() {}.getType());
            Object object = map.get("result");
            Map<String, Object> resultMap = gson.fromJson(object.toString(), new TypeToken<Map<String, Object>>() {}.getType());
            if (null != resultMap && null != resultMap.get("s")) {
                HashMap<String,Object> hashMap = new HashMap<>();
                ObjectMapper mapper = new ObjectMapper();
                hashMap.put("bids",resultMap.get("bids"));
                hashMap.put("asks",resultMap.get("asks"));
                String key = "kucoin" + resultMap.get("s");
                if(resultMap.get("s").toString().replace("_","").equals("BTCUSDT")){
                    System.out.println(hashMap.toString());
            if(null != map.get("id")){
                this.id = map.get("id").toString();
            }
            if(null != map.get("data")){
                Object object = map.get("data");
                String topic = map.get("topic").toString();
                Map<String, Object> resultMap = gson.fromJson(object.toString(), new TypeToken<Map<String, Object>>() {}.getType());
                if (null != resultMap) {
                    HashMap<String,Object> hashMap = new HashMap<>();
                    ObjectMapper mapper = new ObjectMapper();
                    hashMap.put("bids",resultMap.get("bids"));
                    hashMap.put("asks",resultMap.get("asks"));
                    int index = topic.indexOf(":"); // 找到逗号的位置
                    if (index != -1) { // 如果找到了逗号
                        String substring = topic.substring(index + 1);
                        String symbol = substring.replaceAll("-", "");
                        String key = "kucoin" + symbol;
                        RedisUtil.set(key, mapper.writeValueAsString(hashMap));
                    } else {
                        // 处理未找到特定字符的情况
                        log.error("topic--->存入redis失败");
                    }
                }
                RedisUtil.set(key.replace("_",""), mapper.writeValueAsString(hashMap));
            }
        } catch (JsonSyntaxException e) {
            log.error("JSON 解析异常:" + e.getMessage(), e);
@@ -167,6 +203,37 @@
            log.error("发送心跳失败", e);
        }
    }
    public String subscription() throws JsonProcessingException, JSONException {
        String symbol = getSymbol();
        // 创建一个ObjectMapper实例
        ObjectMapper mapper = new ObjectMapper();
        // 使用Map构建JSON对象
        Map<String, Object> jsonMap = new HashMap<>();
        jsonMap.put("id", id);
        jsonMap.put("type", "subscribe");
        jsonMap.put("topic", "/spotMarket/level2Depth50:"+symbol);
        jsonMap.put("privateChannel", false);
        jsonMap.put("response", true);
        // 将Map转换为JSON字符串
        String jsonString = mapper.writeValueAsString(jsonMap);
        return jsonString;
    }
    @NotNull
    private String getSymbol() {
        String symbol;
        List<String> symbolList = subscriptions.stream()
                .map(currency -> currency.getSymbol().replaceAll("USDT", "-USDT"))
                .collect(Collectors.toList());
        symbol = String.join(",", symbolList);
        return symbol;
    }
}