| websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java | ●●●●● patch | view | raw | blame | history | |
| websocketClient/src/main/java/org/example/wsClient/BitgetClient.java | ●●●●● patch | view | raw | blame | history | |
| websocketClient/src/main/java/org/example/wsClient/GateClient.java | ●●●●● patch | view | raw | blame | history |
websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java
@@ -1,6 +1,8 @@ package org.example.WsBean; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.reflect.TypeToken; import com.google.gson.Gson; import lombok.extern.slf4j.Slf4j; @@ -14,9 +16,11 @@ import org.apache.http.util.EntityUtils; import org.example.pojo.Currency; import org.example.server.impl.CurrencySerivceImpl; import org.example.wsClient.BitgetClient; import org.example.wsClient.GateClient; import org.example.wsClient.KucoinClient; import org.example.wsClient.MexcClient; import org.json.JSONException; import org.json.JSONObject; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; @@ -25,8 +29,10 @@ import org.springframework.util.CollectionUtils; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; /** * @ClassDescription: 客户端请求类 @@ -84,12 +90,9 @@ // } @Bean public void kucoinWebsocketRunClientMap() throws Exception { List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "kucoin")); public void bitgetWebsocketRunClientMap() throws JSONException, JsonProcessingException { List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "bitget")); if (!CollectionUtils.isEmpty(mexc)) { String result = doPost(); JSONObject jsonObject = new JSONObject(result); String token = jsonObject.getJSONObject("data").getString("token"); int batchSize = 100; // 每个线程处理的数据量 int totalSize = mexc.size(); int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数 @@ -98,13 +101,36 @@ int fromIndex = i * batchSize; int toIndex = Math.min(fromIndex + batchSize, totalSize); List<Currency> sublist = mexc.subList(fromIndex, toIndex); String parameter = getParameter(sublist); 
// 使用自定义线程池提交任务 threadPoolTaskExecutor.execute(new KucoinClient(sublist,token)::start); threadPoolTaskExecutor.execute(new BitgetClient(parameter)::start); } } } // // @Bean // public void kucoinWebsocketRunClientMap() throws Exception { // List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "kucoin")); // if (!CollectionUtils.isEmpty(mexc)) { // String result = doPost(); // JSONObject jsonObject = new JSONObject(result); // String token = jsonObject.getJSONObject("data").getString("token"); // int batchSize = 100; // 每个线程处理的数据量 // int totalSize = mexc.size(); // int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数 // // for (int i = 0; i < threadCount; i++) { // int fromIndex = i * batchSize; // int toIndex = Math.min(fromIndex + batchSize, totalSize); // List<Currency> sublist = mexc.subList(fromIndex, toIndex); // // // 使用自定义线程池提交任务 // threadPoolTaskExecutor.execute(new KucoinClient(sublist,token)::start); // } // // } // } public static String doPost() throws Exception { String url = "https://api.kucoin.com/api/v1/bullet-public"; @@ -118,5 +144,28 @@ defaultHttpClient.getConnectionManager().shutdown(); return text; } public String getParameter(List<Currency> list) throws JsonProcessingException, JSONException { // 创建一个ObjectMapper实例 ObjectMapper mapper = new ObjectMapper(); List<String> symbolList = list.stream().map(Currency::getSymbol).collect(Collectors.toList()); // 使用Map构建JSON对象 Map<String, Object> jsonMap = new HashMap<>(); jsonMap.put("op", "subscribe"); List<Map<String, String>> mapList = new ArrayList<>(); symbolList.forEach(f->{ Map<String, String> argsMap = new HashMap<>(); argsMap.put("instType", "SPOT"); argsMap.put("channel", "books15"); argsMap.put("instId", f); mapList.add(argsMap); }); jsonMap.put("args", mapList); // 将Map转换为JSON字符串 String jsonString = mapper.writeValueAsString(jsonMap); return jsonString; } } 
websocketClient/src/main/java/org/example/wsClient/BitgetClient.java
New file @@ -0,0 +1,186 @@
package org.example.wsClient;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import lombok.extern.slf4j.Slf4j;
import org.example.pojo.Currency;
import org.example.util.RedisUtil;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;

import javax.websocket.*;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * WebSocket client for the Bitget public endpoint.
 *
 * <p>{@link #start()} connects, sends the subscription payload given to the constructor, schedules
 * a periodic text {@code "ping"} and then blocks the calling thread until {@link #stop()} is called
 * or the thread is interrupted. Every order-book message is written to Redis under the key
 * {@code "bitget" + instId} (underscores removed) as a JSON object with {@code bids} and
 * {@code asks} entries.
 *
 * <p>When the connection closes or reports an error, a reconnect is attempted on the client's own
 * single-threaded scheduler; failed attempts are retried every
 * {@value #RECONNECT_DELAY_SECONDS} seconds, and the subscription is re-sent after each successful
 * reconnect.
 */
@ClientEndpoint
@Slf4j
public class BitgetClient {

    private static final String WS_ENDPOINT = "wss://ws.bitget.com/v2/ws/public";
    /** Heartbeat period in milliseconds. */
    private static final long PING_INTERVAL = 20000;
    /** Delay between two failed reconnect attempts. */
    private static final long RECONNECT_DELAY_SECONDS = 5;
    /** Shared mapper: ObjectMapper is thread-safe once configured and costly to create. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    private final String subscriptions;
    private final ScheduledExecutorService executorService;
    /** Released by {@link #stop()}; {@link #start()} blocks on it. */
    private final CountDownLatch stopSignal = new CountDownLatch(1);
    /** Guards {@link #reconnecting}. */
    private final Object lock = new Object();

    /** Written by the container thread in {@link #onOpen}, read by the scheduler thread. */
    private volatile Session session;
    /** True while a reconnect is in flight, so that close + error do not start two of them. */
    private volatile boolean reconnecting = false;

    /**
     * @param subscriptions the JSON subscribe request sent after every successful connect
     */
    public BitgetClient(String subscriptions) {
        this.subscriptions = subscriptions;
        this.executorService = Executors.newScheduledThreadPool(1);
    }

    /**
     * Connects, subscribes, starts the heartbeat and blocks until {@link #stop()} is called or the
     * calling thread is interrupted. On exit the scheduler is shut down and the session closed.
     */
    public void start() {
        try {
            connect();
            if (session == null) {
                log.info("无法在超时时间内连接到服务器。");
                return;
            }
            executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS);
            subscribe();
            // A dedicated latch instead of wait()/notify() on `this`: a reconnect's onOpen must
            // not be able to wake this thread and tear the scheduler down.
            stopSignal.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.info("bitget ws client interrupted, shutting down");
        } catch (Exception e) {
            log.error("bitget ws 连接过程中发生异常: " + e.getMessage(), e);
        } finally {
            // Shut the scheduler down first so the close callback below cannot start a reconnect.
            executorService.shutdown();
            closeSessionQuietly();
        }
    }

    /** Releases the thread blocked in {@link #start()}, which then shuts the client down. */
    public void stop() {
        stopSignal.countDown();
    }

    private void connect() throws Exception {
        WebSocketContainer container = ContainerProvider.getWebSocketContainer();
        container.connectToServer(this, new URI(WS_ENDPOINT));
    }

    /** Sends the subscribe request on the current session. */
    private void subscribe() throws IOException {
        Session current = session;
        if (current == null) {
            throw new IllegalStateException("bitget ws session is not open");
        }
        current.getBasicRemote().sendText(subscriptions);
    }

    @OnOpen
    public void onOpen(Session session) {
        log.info("bitget ws 已连接到服务器。");
        this.session = session;
    }

    /**
     * Stores the bids/asks of an order-book message in Redis.
     *
     * <p>Frames that are not order-book pushes (the {@code "pong"} heartbeat reply, messages
     * without {@code arg}/{@code data}) are ignored. Nothing is rethrown from here: an exception
     * escaping a message handler is reported to {@link #onError}, which would force a reconnect
     * for a perfectly healthy connection.
     */
    @OnMessage
    public void onMessage(String message) {
        if (message == null || "pong".equals(message.trim())) {
            return;
        }
        try {
            JSONObject payload = new JSONObject(message);
            JSONObject arg = payload.optJSONObject("arg");
            JSONObject book = extractBook(payload.opt("data"));
            if (arg == null || book == null) {
                return;
            }
            String instId = arg.optString("instId", "");
            if (instId.isEmpty() || book.isNull("bids") || book.isNull("asks")) {
                return;
            }
            // The levels are stored as their JSON text, i.e. a string inside the cached object.
            Map<String, Object> snapshot = new HashMap<>();
            snapshot.put("bids", String.valueOf(book.opt("bids")));
            snapshot.put("asks", String.valueOf(book.opt("asks")));
            String key = "bitget" + instId;
            RedisUtil.set(key.replace("_", ""), MAPPER.writeValueAsString(snapshot));
        } catch (JsonProcessingException | JSONException e) {
            log.error("JSON 解析异常:" + e.getMessage(), e);
        }
    }

    /**
     * Returns the order-book object from a message's {@code data} field, or {@code null}.
     * NOTE(review): Bitget's v2 push format appears to wrap the book in a one-element array;
     * a bare object is accepted as well. Confirm against the exchange documentation.
     */
    private static JSONObject extractBook(Object data) {
        if (data instanceof JSONObject) {
            return (JSONObject) data;
        }
        if (data instanceof JSONArray) {
            return ((JSONArray) data).optJSONObject(0);
        }
        return null;
    }

    @OnClose
    public void onClose() {
        log.info("bitget ws 连接已关闭,尝试重新连接...");
        handleConnectionClosedOrError();
    }

    @OnError
    public void onError(Throwable throwable) {
        log.error("bitget ws 发生错误: " + throwable.getMessage(), throwable);
        handleConnectionClosedOrError();
    }

    /** Starts a reconnect unless one is already running or the client has been shut down. */
    private void handleConnectionClosedOrError() {
        synchronized (lock) {
            // execute() on a shut-down executor would throw RejectedExecutionException.
            if (reconnecting || executorService.isShutdown()) {
                return;
            }
            reconnecting = true;
            executorService.execute(this::attemptReconnect);
        }
    }

    /**
     * Performs one reconnect attempt: connect, re-subscribe, and on success clear the
     * {@link #reconnecting} flag; on failure schedule the next attempt.
     */
    private void attemptReconnect() {
        if (executorService.isShutdown()) {
            return; // a delayed retry may still fire after shutdown()
        }
        boolean connected = false;
        try {
            log.info("bitget ws 开始重连");
            connect();
            subscribe(); // a fresh connection carries no subscriptions
            connected = isSessionOpen();
            if (connected) {
                log.info("bitget ws 重连成功");
            }
        } catch (Exception e) {
            log.error("bitget ws 重连失败", e);
        } finally {
            synchronized (lock) {
                if (connected) {
                    reconnecting = false;
                } else {
                    scheduleReconnect();
                }
            }
        }
    }

    private void scheduleReconnect() {
        if (!executorService.isShutdown()) {
            executorService.schedule(this::attemptReconnect, RECONNECT_DELAY_SECONDS, TimeUnit.SECONDS);
        }
    }

    private boolean isSessionOpen() {
        Session current = session;
        return current != null && current.isOpen();
    }

    /** Sends the text heartbeat; failures are logged and left to the close/error callbacks. */
    private void sendPing() {
        try {
            Session current = session;
            if (current != null && current.isOpen()) {
                current.getBasicRemote().sendText("ping");
            }
        } catch (Exception e) {
            log.error("发送心跳失败", e);
        }
    }

    private void closeSessionQuietly() {
        Session current = session;
        if (current == null || !current.isOpen()) {
            return;
        }
        try {
            current.close();
        } catch (IOException e) {
            log.debug("bitget ws close failed", e);
        }
    }

    /**
     * Builds the subscribe request for the {@code books15} channel of a single spot symbol.
     *
     * <p>{@code args} is emitted as a one-element list, the same shape as the multi-symbol
     * payload built by {@code MexcWsBean.getParameter}.
     *
     * @param symbol the instrument id, used as {@code instId}
     * @return the request serialized as a JSON string
     * @throws JsonProcessingException if the request cannot be serialized
     */
    public String getParameter(String symbol) throws JsonProcessingException, JSONException {
        Map<String, Object> arg = new HashMap<>();
        arg.put("instType", "SPOT");
        arg.put("channel", "books15");
        arg.put("instId", symbol);

        Map<String, Object> request = new HashMap<>();
        request.put("op", "subscribe");
        request.put("args", Collections.singletonList(arg));
        return MAPPER.writeValueAsString(request);
    }
}
websocketClient/src/main/java/org/example/wsClient/GateClient.java
@@ -11,6 +11,7 @@ import lombok.extern.slf4j.Slf4j; import org.example.pojo.Currency; import org.example.util.RedisUtil; import org.json.JSONArray; import org.json.JSONException; import org.json.JSONObject; @@ -18,10 +19,7 @@ import java.math.BigDecimal; import java.net.URI; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.*; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -101,12 +99,11 @@ if (null != resultMap && null != resultMap.get("s")) { HashMap<String,Object> hashMap = new HashMap<>(); ObjectMapper mapper = new ObjectMapper(); hashMap.put("bids",resultMap.get("bids")); hashMap.put("asks",resultMap.get("asks")); String key = "gate" + resultMap.get("s"); if(resultMap.get("s").toString().replace("_","").equals("BTCUSDT")){ System.out.println(hashMap.toString()); } RedisUtil.set(key.replace("_",""), mapper.writeValueAsString(hashMap)); } } catch (JsonSyntaxException e) { @@ -202,12 +199,38 @@ } public static void main(String[] args) { String scientificNotation = "5.0E-5"; // 科学计数法字符串 BigDecimal bigDecimal = new BigDecimal(scientificNotation); // public static void main(String[] args) throws JSONException { // // 从 resultMap 中获取 "bids" 对应的 JSON 数组 // JSONArray jsonArray = new JSONArray(resultMap.get("bids").toString()); // 将获取的 bids 转换成 JSON 数组 // List<List<String>> resultList = new ArrayList<>(); // 存放所有的内层列表 // // // 遍历 JSON 数组 // for (int i = 0; i < jsonArray.length(); i++) { // JSONArray innerArray = jsonArray.getJSONArray(i); // 获取当前内层 JSON 数组 // List<String> innerList = new ArrayList<>(); // 存放当前内层数组的元素 // // // 遍历内层 JSON 数组 // for (int j = 0; j < innerArray.length(); j++) { // innerList.add(innerArray.getString(j)); // 将元素添加到内层列表中 // } // // resultList.add(innerList); // 将内层列表添加到结果列表中 // } // // // 考虑去掉未使用的 dataList,下面的代码使用 resultList 而不是 dataList // List<Map<String, String>> 
resultMapList = new ArrayList<>(); // 存放最终的映射结果 // for (List<String> entry : resultList) { // 遍历 resultList 中的每个内层列表 // // 确保每个内层列表有足够的元素,再进行映射 // if (entry.size() >= 2) { // 判断 entry 的大小,避免 IndexOutOfBoundsException // Map<String, String> mapKey = new HashMap<>(); // 新建一个 Map 以存储键值对 // mapKey.put("p", entry.get(0)); // 将内层列表的第一个元素作为键 "p" // mapKey.put("v", entry.get(1)); // 将内层列表的第二个元素作为键 "v" // resultMapList.add(mapKey); // 将 map 添加到结果映射列表中 // } // } // // } System.out.println("Scientific Notation: " + scientificNotation); System.out.println("BigDecimal Value: " + bigDecimal); } }