From 378b31595f7eebdf46149fa2052cec41f7ce9565 Mon Sep 17 00:00:00 2001
From: zj <1772600164@qq.com>
Date: Sat, 20 Jul 2024 18:06:25 +0800
Subject: [PATCH] 1
---
websocketClient/src/main/java/org/example/wsClient/KucoinClient.java | 105 +++++++++++++++++++++++++++++++++++++++++++---------
1 files changed, 86 insertions(+), 19 deletions(-)
diff --git a/websocketClient/src/main/java/org/example/wsClient/KucoinClient.java b/websocketClient/src/main/java/org/example/wsClient/KucoinClient.java
index 2700dc6..88ad686 100644
--- a/websocketClient/src/main/java/org/example/wsClient/KucoinClient.java
+++ b/websocketClient/src/main/java/org/example/wsClient/KucoinClient.java
@@ -1,5 +1,6 @@
package org.example.wsClient;
+import com.alibaba.druid.util.StringUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.reflect.TypeToken;
@@ -8,16 +9,27 @@
import lombok.extern.slf4j.Slf4j;
import org.example.pojo.Currency;
import org.example.util.RedisUtil;
+import org.jetbrains.annotations.NotNull;
+import org.json.JSONException;
+import java.io.*;
+import java.net.HttpURLConnection;
+import java.net.URL;
import javax.websocket.*;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
import java.net.URI;
+import java.net.URLEncoder;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
/**
* @program: demo
@@ -28,7 +40,7 @@
@Slf4j
public class KucoinClient {
- private static final String WS_ENDPOINT = "wss://api.kucoinio.ws/ws/v4/";
+ private static final String WS_ENDPOINT = "wss://ws-api-spot.kucoin.com/spotMarket/level2Depth5:";
private static final long PING_INTERVAL = 20000;
private final List<Currency> subscriptions;
@@ -37,6 +49,8 @@
private String token;
private final Object lock = new Object(); // 添加一个锁对象
private volatile boolean reconnecting = false; // 使用 volatile 关键字保证可见性
+
+ private String id;
public KucoinClient(List<Currency> subscriptions,String token) {
this.subscriptions = subscriptions;
@@ -54,12 +68,6 @@
executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS);
- // 订阅消息
- for (Currency subscription : subscriptions) {
- String parameter = getParameter(subscription.getSymbol());
- session.getBasicRemote().sendText(parameter);
- }
-
synchronized (this) {
this.wait();
}
@@ -73,8 +81,24 @@
private void connect() throws Exception {
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
- container.connectToServer(this, new URI(WS_ENDPOINT));
+ String url = extracted();
+ if(!StringUtils.isEmpty(url)){
+ container.connectToServer(this, new URI(url));
+ // 订阅消息
+ String parameter = subscription();
+ session.getBasicRemote().sendText(parameter);
+
+ }
}
+
+ private static final String CHARSET = "UTF-8"; // 定义字符集常量,避免魔法字符串
+
+ private String extracted() throws UnsupportedEncodingException {
+ String symbol = getSymbol();
+ String url = WS_ENDPOINT + symbol + "?token=" + URLEncoder.encode(token, CHARSET);
+ return url;
+ }
+
@OnOpen
public void onOpen(Session session) {
@@ -90,18 +114,30 @@
public void onMessage(String message) {
try {
Map<String, Object> map = gson.fromJson(message, new TypeToken<Map<String, Object>>() {}.getType());
- Object object = map.get("result");
- Map<String, Object> resultMap = gson.fromJson(object.toString(), new TypeToken<Map<String, Object>>() {}.getType());
- if (null != resultMap && null != resultMap.get("s")) {
- HashMap<String,Object> hashMap = new HashMap<>();
- ObjectMapper mapper = new ObjectMapper();
- hashMap.put("bids",resultMap.get("bids"));
- hashMap.put("asks",resultMap.get("asks"));
- String key = "kucoin" + resultMap.get("s");
- if(resultMap.get("s").toString().replace("_","").equals("BTCUSDT")){
- System.out.println(hashMap.toString());
+ if(null != map.get("id")){
+ this.id = map.get("id").toString();
+ }
+ if(null != map.get("data")){
+ Object object = map.get("data");
+ String topic = map.get("topic").toString();
+ Map<String, Object> resultMap = gson.fromJson(object.toString(), new TypeToken<Map<String, Object>>() {}.getType());
+ if (null != resultMap) {
+ HashMap<String,Object> hashMap = new HashMap<>();
+ ObjectMapper mapper = new ObjectMapper();
+ hashMap.put("bids",resultMap.get("bids"));
+ hashMap.put("asks",resultMap.get("asks"));
+
+ int index = topic.indexOf(":"); // 找到逗号的位置
+ if (index != -1) { // 如果找到了逗号
+ String substring = topic.substring(index + 1);
+ String symbol = substring.replaceAll("-", "");
+ String key = "kucoin" + symbol;
+ RedisUtil.set(key, mapper.writeValueAsString(hashMap));
+ } else {
+ // 处理未找到特定字符的情况
+ log.error("topic--->存入redis失败");
+ }
}
- RedisUtil.set(key.replace("_",""), mapper.writeValueAsString(hashMap));
}
} catch (JsonSyntaxException e) {
log.error("JSON 解析异常:" + e.getMessage(), e);
@@ -167,6 +203,37 @@
log.error("发送心跳失败", e);
}
}
+
+ public String subscription() throws JsonProcessingException, JSONException {
+ String symbol = getSymbol();
+ // 创建一个ObjectMapper实例
+ ObjectMapper mapper = new ObjectMapper();
+
+ // 使用Map构建JSON对象
+ Map<String, Object> jsonMap = new HashMap<>();
+ jsonMap.put("id", id);
+ jsonMap.put("type", "subscribe");
+ jsonMap.put("topic", "/spotMarket/level2Depth50:"+symbol);
+ jsonMap.put("privateChannel", false);
+ jsonMap.put("response", true);
+
+ // 将Map转换为JSON字符串
+ String jsonString = mapper.writeValueAsString(jsonMap);
+ return jsonString;
+
+ }
+
+ @NotNull
+ private String getSymbol() {
+ String symbol;
+ List<String> symbolList = subscriptions.stream()
+ .map(currency -> currency.getSymbol().replaceAll("USDT", "-USDT"))
+ .collect(Collectors.toList());
+ symbol = String.join(",", symbolList);
+ return symbol;
+ }
+
+
}
--
Gitblit v1.9.3