| | |
| | | package org.example.wsClient; |
| | | |
| | | import com.alibaba.druid.util.StringUtils; |
| | | import com.fasterxml.jackson.core.JsonProcessingException; |
| | | import com.fasterxml.jackson.databind.ObjectMapper; |
| | | import com.google.common.reflect.TypeToken; |
| | |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.example.pojo.Currency; |
| | | import org.example.util.RedisUtil; |
| | | import org.jetbrains.annotations.NotNull; |
| | | import org.json.JSONException; |
| | | |
| | | import java.io.*; |
| | | import java.net.HttpURLConnection; |
| | | import java.net.URL; |
| | | import javax.websocket.*; |
| | | import java.io.BufferedReader; |
| | | import java.io.IOException; |
| | | import java.io.InputStreamReader; |
| | | import java.net.URI; |
| | | import java.net.URLEncoder; |
| | | import java.nio.ByteBuffer; |
| | | import java.util.Arrays; |
| | | import java.util.HashMap; |
| | | import java.util.List; |
| | | import java.util.Map; |
| | | import java.util.concurrent.Executors; |
| | | import java.util.concurrent.ScheduledExecutorService; |
| | | import java.util.concurrent.TimeUnit; |
| | | import java.util.stream.Collectors; |
| | | |
/**
 * @program: demo
 */
| | | @Slf4j |
| | | public class KucoinClient { |
| | | |
| | | private static final String WS_ENDPOINT = "wss://api.kucoinio.ws/ws/v4/"; |
| | | private static final String WS_ENDPOINT = "wss://ws-api-spot.kucoin.com/spotMarket/level2Depth5:"; |
| | | private static final long PING_INTERVAL = 20000; |
| | | |
| | | private final List<Currency> subscriptions; |
| | |
| | | private String token; |
| | | private final Object lock = new Object(); // 添加一个锁对象 |
| | | private volatile boolean reconnecting = false; // 使用 volatile 关键字保证可见性 |
| | | |
| | | private String id; |
| | | |
| | | public KucoinClient(List<Currency> subscriptions,String token) { |
| | | this.subscriptions = subscriptions; |
| | |
| | | |
| | | executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS); |
| | | |
| | | // 订阅消息 |
| | | for (Currency subscription : subscriptions) { |
| | | String parameter = getParameter(subscription.getSymbol()); |
| | | session.getBasicRemote().sendText(parameter); |
| | | } |
| | | |
| | | synchronized (this) { |
| | | this.wait(); |
| | | } |
| | |
| | | |
| | | private void connect() throws Exception { |
| | | WebSocketContainer container = ContainerProvider.getWebSocketContainer(); |
| | | container.connectToServer(this, new URI(WS_ENDPOINT)); |
| | | String url = extracted(); |
| | | if(!StringUtils.isEmpty(url)){ |
| | | container.connectToServer(this, new URI(url)); |
| | | // 订阅消息 |
| | | String parameter = subscription(); |
| | | session.getBasicRemote().sendText(parameter); |
| | | |
| | | } |
| | | } |
| | | |
| | | private static final String CHARSET = "UTF-8"; // 定义字符集常量,避免魔法字符串 |
| | | |
| | | private String extracted() throws UnsupportedEncodingException { |
| | | String symbol = getSymbol(); |
| | | String url = WS_ENDPOINT + symbol + "?token=" + URLEncoder.encode(token, CHARSET); |
| | | return url; |
| | | } |
| | | |
| | | |
| | | @OnOpen |
| | | public void onOpen(Session session) { |
| | |
| | | public void onMessage(String message) { |
| | | try { |
| | | Map<String, Object> map = gson.fromJson(message, new TypeToken<Map<String, Object>>() {}.getType()); |
| | | Object object = map.get("result"); |
| | | Map<String, Object> resultMap = gson.fromJson(object.toString(), new TypeToken<Map<String, Object>>() {}.getType()); |
| | | if (null != resultMap && null != resultMap.get("s")) { |
| | | HashMap<String,Object> hashMap = new HashMap<>(); |
| | | ObjectMapper mapper = new ObjectMapper(); |
| | | hashMap.put("bids",resultMap.get("bids")); |
| | | hashMap.put("asks",resultMap.get("asks")); |
| | | String key = "kucoin" + resultMap.get("s"); |
| | | if(resultMap.get("s").toString().replace("_","").equals("BTCUSDT")){ |
| | | System.out.println(hashMap.toString()); |
| | | if(null != map.get("id")){ |
| | | this.id = map.get("id").toString(); |
| | | } |
| | | if(null != map.get("data")){ |
| | | Object object = map.get("data"); |
| | | String topic = map.get("topic").toString(); |
| | | Map<String, Object> resultMap = gson.fromJson(object.toString(), new TypeToken<Map<String, Object>>() {}.getType()); |
| | | if (null != resultMap) { |
| | | HashMap<String,Object> hashMap = new HashMap<>(); |
| | | ObjectMapper mapper = new ObjectMapper(); |
| | | hashMap.put("bids",resultMap.get("bids")); |
| | | hashMap.put("asks",resultMap.get("asks")); |
| | | |
| | | int index = topic.indexOf(":"); // 找到逗号的位置 |
| | | if (index != -1) { // 如果找到了逗号 |
| | | String substring = topic.substring(index + 1); |
| | | String symbol = substring.replaceAll("-", ""); |
| | | String key = "kucoin" + symbol; |
| | | RedisUtil.set(key, mapper.writeValueAsString(hashMap)); |
| | | } else { |
| | | // 处理未找到特定字符的情况 |
| | | log.error("topic--->存入redis失败"); |
| | | } |
| | | } |
| | | RedisUtil.set(key.replace("_",""), mapper.writeValueAsString(hashMap)); |
| | | } |
| | | } catch (JsonSyntaxException e) { |
| | | log.error("JSON 解析异常:" + e.getMessage(), e); |
| | |
| | | log.error("发送心跳失败", e); |
| | | } |
| | | } |
| | | |
| | | public String subscription() throws JsonProcessingException, JSONException { |
| | | String symbol = getSymbol(); |
| | | // 创建一个ObjectMapper实例 |
| | | ObjectMapper mapper = new ObjectMapper(); |
| | | |
| | | // 使用Map构建JSON对象 |
| | | Map<String, Object> jsonMap = new HashMap<>(); |
| | | jsonMap.put("id", id); |
| | | jsonMap.put("type", "subscribe"); |
| | | jsonMap.put("topic", "/spotMarket/level2Depth50:"+symbol); |
| | | jsonMap.put("privateChannel", false); |
| | | jsonMap.put("response", true); |
| | | |
| | | // 将Map转换为JSON字符串 |
| | | String jsonString = mapper.writeValueAsString(jsonMap); |
| | | return jsonString; |
| | | |
| | | } |
| | | |
| | | @NotNull |
| | | private String getSymbol() { |
| | | String symbol; |
| | | List<String> symbolList = subscriptions.stream() |
| | | .map(currency -> currency.getSymbol().replaceAll("USDT", "-USDT")) |
| | | .collect(Collectors.toList()); |
| | | symbol = String.join(",", symbolList); |
| | | return symbol; |
| | | } |
| | | |
| | | |
| | | } |
| | | |
| | | |