| .idea/inspectionProfiles/Project_Default.xml | ●●●●● patch | view | raw | blame | history | |
| websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java | ●●●●● patch | view | raw | blame | history | |
| websocketClient/src/main/java/org/example/util/RedisUtil.java | ●●●●● patch | view | raw | blame | history | |
| websocketClient/src/main/java/org/example/wsClient/GateClient.java | ●●●●● patch | view | raw | blame | history | |
| websocketClient/src/main/java/org/example/wsClient/KucoinClient.java | ●●●●● patch | view | raw | blame | history | |
| websocketClient/src/main/java/org/example/wsClient/MexcClient.java | ●●●●● patch | view | raw | blame | history | |
| websocketClient/src/main/resources/application.yml | ●●●●● patch | view | raw | blame | history |
.idea/inspectionProfiles/Project_Default.xml
@@ -2,6 +2,7 @@ <profile version="1.0"> <option name="myName" value="Project Default" /> <inspection_tool class="AliAccessStaticViaInstance" enabled="true" level="WARNING" enabled_by_default="true" /> <inspection_tool class="AliDeprecation" enabled="true" level="WARNING" enabled_by_default="true" /> <inspection_tool class="AlibabaAbstractClassShouldStartWithAbstractNaming" enabled="true" level="WARNING" enabled_by_default="true" /> <inspection_tool class="AlibabaAbstractMethodOrInterfaceMethodMustUseJavadoc" enabled="true" level="WARNING" enabled_by_default="true" /> <inspection_tool class="AlibabaAvoidApacheBeanUtilsCopy" enabled="true" level="WARNING" enabled_by_default="true" /> websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java
@@ -2,8 +2,18 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import lombok.extern.slf4j.Slf4j; import org.apache.http.HttpEntity; import org.apache.http.HttpResponse; import org.apache.http.NameValuePair; import org.apache.http.client.entity.UrlEncodedFormEntity; import org.apache.http.client.methods.HttpPost; import org.apache.http.impl.client.DefaultHttpClient; import org.apache.http.message.BasicNameValuePair; import org.apache.http.util.EntityUtils; import org.example.pojo.Currency; import org.example.server.impl.CurrencySerivceImpl; import org.example.wsClient.GateClient; import org.example.wsClient.KucoinClient; import org.example.wsClient.MexcClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; @@ -11,7 +21,9 @@ import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.util.CollectionUtils; import java.util.ArrayList; import java.util.List; import java.util.Map; /** * @ClassDescription: 客户端请求类 @@ -28,11 +40,31 @@ @Autowired private ThreadPoolTaskExecutor threadPoolTaskExecutor; // @Bean // public void mexcWebsocketRunClientMap() { // List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "mexc")); // if (!CollectionUtils.isEmpty(mexc)) { // int batchSize = 30; // 每个线程处理的数据量 // int totalSize = mexc.size(); // int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数 // // for (int i = 0; i < threadCount; i++) { // int fromIndex = i * batchSize; // int toIndex = Math.min(fromIndex + batchSize, totalSize); // List<Currency> sublist = mexc.subList(fromIndex, toIndex); // // // 使用自定义线程池提交任务 // threadPoolTaskExecutor.execute(new MexcClient(sublist)::start); // } // // } // } @Bean public void mexcWebsocketRunClientMap() { List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "mexc")); public void gateWebsocketRunClientMap() { List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "gate")); if (!CollectionUtils.isEmpty(mexc)) { int batchSize = 30; // 每个线程处理的数据量 int batchSize = 100; // 每个线程处理的数据量 int totalSize = mexc.size(); int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数 @@ -42,10 +74,44 @@ List<Currency> sublist = mexc.subList(fromIndex, toIndex); // 使用自定义线程池提交任务 threadPoolTaskExecutor.execute(new MexcClient(sublist)::start); threadPoolTaskExecutor.execute(new GateClient(sublist)::start); } } } @Bean public void kucoinWebsocketRunClientMap() throws Exception { List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "gate")); if (!CollectionUtils.isEmpty(mexc)) { String token = doPost(); int batchSize = 100; // 每个线程处理的数据量 int totalSize = mexc.size(); int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数 for (int i = 0; i < threadCount; i++) { int fromIndex = i * batchSize; int toIndex = Math.min(fromIndex + batchSize, totalSize); List<Currency> sublist = mexc.subList(fromIndex, toIndex); // 使用自定义线程池提交任务 threadPoolTaskExecutor.execute(new KucoinClient(sublist,token)::start); } } } public static String doPost() throws Exception { String url = "https://api.kucoin.com/api/v1/bullet-public"; HttpPost httpPost = new HttpPost(url); DefaultHttpClient defaultHttpClient = new DefaultHttpClient(); List<NameValuePair> nvps = new ArrayList<NameValuePair>(); httpPost.setEntity(new UrlEncodedFormEntity(nvps)); HttpResponse response = defaultHttpClient.execute(httpPost); HttpEntity respEntity = response.getEntity(); String text = EntityUtils.toString(respEntity, "UTF-8"); defaultHttpClient.getConnectionManager().shutdown(); return text; } } websocketClient/src/main/java/org/example/util/RedisUtil.java
@@ -9,11 +9,13 @@ public class RedisUtil { private static JedisPool jedisPool; // 在静态代码块中初始化 JedisPool static { jedisPool = new JedisPool(new JedisPoolConfig(), "localhost", 6379); } // 私有构造方法,防止实例化 private RedisUtil() {} public static void set(String key, String value) { try (Jedis jedis = jedisPool.getResource()) { jedis.set(key, value); websocketClient/src/main/java/org/example/wsClient/GateClient.java
New file @@ -0,0 +1,213 @@ package org.example.wsClient; import com.alibaba.druid.support.json.JSONUtils; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.reflect.TypeToken; import com.google.gson.Gson; import com.google.gson.JsonSyntaxException; import lombok.extern.slf4j.Slf4j; import org.example.pojo.Currency; import org.example.util.RedisUtil; import org.json.JSONException; import org.json.JSONObject; import javax.websocket.*; import java.math.BigDecimal; import java.net.URI; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.json.JSONObject; /** * @program: demo * @description: * @create: 2024-07-18 15:30 **/ @ClientEndpoint @Slf4j public class GateClient { private static final String WS_ENDPOINT = "wss://api.gateio.ws/ws/v4/"; private static final long PING_INTERVAL = 20000; private final List<Currency> subscriptions; private final ScheduledExecutorService executorService; private Session session; private final Object lock = new Object(); // 添加一个锁对象 private volatile boolean reconnecting = false; // 使用 volatile 关键字保证可见性 public GateClient(List<Currency> subscriptions) { this.subscriptions = subscriptions; this.executorService = Executors.newScheduledThreadPool(1); } public void start() { try { connect(); if (session == null) { log.info("无法在超时时间内连接到服务器。"); return; } executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS); // 订阅消息 for (Currency subscription : subscriptions) { String parameter = getParameter(subscription.getSymbol()); session.getBasicRemote().sendText(parameter); } synchronized (this) { this.wait(); } } catch (Exception e) { log.error("gate ws 连接过程中发生异常: " + e.getMessage(), e); } finally { executorService.shutdown(); } } private void connect() throws Exception { WebSocketContainer container = ContainerProvider.getWebSocketContainer(); container.connectToServer(this, new URI(WS_ENDPOINT)); } @OnOpen public void onOpen(Session session) { log.info("gate ws 已连接到服务器。"); this.session = session; synchronized (this) { this.notify(); } } private static final Gson gson = new Gson(); // 将 Gson 作为静态成员或单例 @OnMessage public void onMessage(String message) { try { Map<String, Object> map = gson.fromJson(message, new TypeToken<Map<String, Object>>() {}.getType()); Object object = map.get("result"); Map<String, Object> resultMap = gson.fromJson(object.toString(), new TypeToken<Map<String, Object>>() {}.getType()); if (null != resultMap && null != resultMap.get("s")) { HashMap<String,Object> hashMap = new HashMap<>(); ObjectMapper mapper = new ObjectMapper(); hashMap.put("bids",resultMap.get("bids")); hashMap.put("asks",resultMap.get("asks")); String key = "gate" + resultMap.get("s"); if(resultMap.get("s").toString().replace("_","").equals("BTCUSDT")){ System.out.println(hashMap.toString()); } RedisUtil.set(key.replace("_",""), mapper.writeValueAsString(hashMap)); } } catch (JsonSyntaxException e) { log.error("JSON 解析异常:" + e.getMessage(), e); } catch (JsonProcessingException e) { log.error("JSON 解析异常:" + e.getMessage(), e); } } @OnClose public void onClose() { log.info("gate ws 连接已关闭,尝试重新连接..."); handleConnectionClosedOrError(); } @OnError public void onError(Throwable throwable) { log.error("gate ws 发生错误: " + throwable.getMessage(), throwable); handleConnectionClosedOrError(); } private void handleConnectionClosedOrError() { synchronized (lock) { if (!reconnecting) { reconnecting = true; // 设置 reconnecting 为 true 表示开始重连 executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连 } } } private void attemptReconnect() { boolean doReconnect = true; try { log.info("gate ws 开始重连"); connect(); // 假设 connect() 方法用于实际的连接逻辑 log.info("gate ws 重连成功"); } catch (Exception e) { log.error("gate ws 重连失败", e); // 连接失败时,可以根据具体情况决定是否继续重连 // 在这里假设总是继续尝试重连 } finally { synchronized (lock) { if (doReconnect) { scheduleReconnect(); // 如果需要继续重连,则重新调度重连任务 } else { reconnecting = false; // 重连结束后设置 reconnecting 为 false } } } } private void scheduleReconnect() { if (!executorService.isShutdown()) { executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS); } } private void sendPing() { try { if (session != null) { session.getBasicRemote().sendPing(ByteBuffer.wrap("Ping".getBytes())); } } catch (Exception e) { log.error("发送心跳失败", e); } } public String getParameter(String symbol) throws JsonProcessingException, JSONException { // 替换USDT为_USDT symbol = symbol.replaceAll("USDT", "_USDT"); // 创建一个ObjectMapper实例 ObjectMapper mapper = new ObjectMapper(); // 获取当前时间的毫秒数 long currentTimeMillis = System.currentTimeMillis(); // 定义常量 final String CHANNEL = "spot.order_book"; // 固定频道 final String EVENT_SUBSCRIBE = "subscribe"; // 订阅事件 final String EVENT_UNSUBSCRIBE = "unsubscribe"; // 取消订阅事件 final String[] PAYLOAD = new String[] {symbol, "20", "100ms"}; // 负载信息 // 使用Map构建JSON对象 Map<String, Object> jsonMap = new HashMap<>(); // 创建Map用于存放JSON内容 jsonMap.put("time", currentTimeMillis); // 放入当前时间 jsonMap.put("channel", CHANNEL); // 放入频道 jsonMap.put("event", EVENT_SUBSCRIBE); // 放入事件类型 jsonMap.put("payload", Arrays.asList(PAYLOAD)); // 将数组转换为List并放入Map // 将Map转换为JSON字符串 String jsonString = mapper.writeValueAsString(jsonMap); // 使用ObjectMapper转换 return jsonString; // 返回JSON字符串 } public static void main(String[] args) { String scientificNotation = "5.0E-5"; // 科学计数法字符串 BigDecimal bigDecimal = new BigDecimal(scientificNotation); System.out.println("Scientific Notation: " + scientificNotation); System.out.println("BigDecimal Value: " + bigDecimal); } } websocketClient/src/main/java/org/example/wsClient/KucoinClient.java
New file @@ -0,0 +1,172 @@ package org.example.wsClient; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.reflect.TypeToken; import com.google.gson.Gson; import com.google.gson.JsonSyntaxException; import lombok.extern.slf4j.Slf4j; import org.example.pojo.Currency; import org.example.util.RedisUtil; import javax.websocket.*; import java.net.URI; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; /** * @program: demo * @description: * @create: 2024-07-19 16:44 **/ @ClientEndpoint @Slf4j public class KucoinClient { private static final String WS_ENDPOINT = "wss://api.kucoinio.ws/ws/v4/"; private static final long PING_INTERVAL = 20000; private final List<Currency> subscriptions; private final ScheduledExecutorService executorService; private Session session; private String token; private final Object lock = new Object(); // 添加一个锁对象 private volatile boolean reconnecting = false; // 使用 volatile 关键字保证可见性 public KucoinClient(List<Currency> subscriptions,String token) { this.subscriptions = subscriptions; this.token = token; this.executorService = Executors.newScheduledThreadPool(1); } public void start() { try { connect(); if (session == null) { log.info("无法在超时时间内连接到服务器。"); return; } executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS); // 订阅消息 for (Currency subscription : subscriptions) { String parameter = getParameter(subscription.getSymbol()); session.getBasicRemote().sendText(parameter); } synchronized (this) { this.wait(); } } catch (Exception e) { log.error("kucoin ws 连接过程中发生异常: " + e.getMessage(), e); } finally { executorService.shutdown(); } } private void connect() throws Exception { WebSocketContainer container = ContainerProvider.getWebSocketContainer(); container.connectToServer(this, new URI(WS_ENDPOINT)); } @OnOpen public void onOpen(Session session) { log.info("kucoin ws 已连接到服务器。"); this.session = session; synchronized (this) { this.notify(); } } private static final Gson gson = new Gson(); // 将 Gson 作为静态成员或单例 @OnMessage public void onMessage(String message) { try { Map<String, Object> map = gson.fromJson(message, new TypeToken<Map<String, Object>>() {}.getType()); Object object = map.get("result"); Map<String, Object> resultMap = gson.fromJson(object.toString(), new TypeToken<Map<String, Object>>() {}.getType()); if (null != resultMap && null != resultMap.get("s")) { HashMap<String,Object> hashMap = new HashMap<>(); ObjectMapper mapper = new ObjectMapper(); hashMap.put("bids",resultMap.get("bids")); hashMap.put("asks",resultMap.get("asks")); String key = "kucoin" + resultMap.get("s"); if(resultMap.get("s").toString().replace("_","").equals("BTCUSDT")){ System.out.println(hashMap.toString()); } RedisUtil.set(key.replace("_",""), mapper.writeValueAsString(hashMap)); } } catch (JsonSyntaxException e) { log.error("JSON 解析异常:" + e.getMessage(), e); } catch (JsonProcessingException e) { log.error("JSON 解析异常:" + e.getMessage(), e); } } @OnClose public void onClose() { log.info("kucoin ws 连接已关闭,尝试重新连接..."); handleConnectionClosedOrError(); } @OnError public void onError(Throwable throwable) { log.error("kucoin ws 发生错误: " + throwable.getMessage(), throwable); handleConnectionClosedOrError(); } private void handleConnectionClosedOrError() { synchronized (lock) { if (!reconnecting) { reconnecting = true; // 设置 reconnecting 为 true 表示开始重连 executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连 } } } private void attemptReconnect() { boolean doReconnect = true; try { log.info("kucoin ws 开始重连"); connect(); // 假设 connect() 方法用于实际的连接逻辑 log.info("kucoin ws 重连成功"); } catch (Exception e) { log.error("kucoin ws 重连失败", e); // 连接失败时,可以根据具体情况决定是否继续重连 // 在这里假设总是继续尝试重连 } finally { synchronized (lock) { if (doReconnect) { scheduleReconnect(); // 如果需要继续重连,则重新调度重连任务 } else { reconnecting = false; // 重连结束后设置 reconnecting 为 false } } } } private void scheduleReconnect() { if (!executorService.isShutdown()) { executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS); } } private void sendPing() { try { if (session != null) { session.getBasicRemote().sendPing(ByteBuffer.wrap("Ping".getBytes())); } } catch (Exception e) { log.error("发送心跳失败", e); } } } websocketClient/src/main/java/org/example/wsClient/MexcClient.java
@@ -4,7 +4,10 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.reflect.TypeToken; import com.google.gson.Gson; import com.google.gson.JsonSyntaxException; import lombok.extern.slf4j.Slf4j; import org.example.pojo.Currency; import org.example.util.RedisUtil; @@ -13,20 +16,23 @@ import java.io.UncheckedIOException; import java.net.URI; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.*; @ClientEndpoint @Slf4j public class MexcClient { private static final String WS_ENDPOINT = "wss://wbs.mexc.com/ws"; // 实际WebSocket服务器地址 private static final long PING_INTERVAL = 20000; // 每20秒发送一次ping private static final int MAX_BACKOFF_TIME = 60000; // 最大重连时间 private static final String WS_ENDPOINT = "wss://wbs.mexc.com/ws"; private static final long PING_INTERVAL = 20000; private final List<Currency> subscriptions; private final ScheduledExecutorService executorService; private Session session; private boolean reconnecting = false; private final Object lock = new Object(); // 添加一个锁对象 private volatile boolean reconnecting = false; // 使用 volatile 关键字保证可见性 public MexcClient(List<Currency> subscriptions) { this.subscriptions = subscriptions; @@ -37,7 +43,7 @@ try { connect(); if (session == null) { System.err.println("无法在超时时间内连接到服务器。"); log.info("无法在超时时间内连接到服务器。"); return; } @@ -54,8 +60,7 @@ } } catch (Exception e) { System.err.println("连接过程中发生异常: " + e.getMessage()); e.printStackTrace(); log.error("mexc ws 连接过程中发生异常: " + e.getMessage(), e); } finally { executorService.shutdown(); } @@ -68,59 +73,84 @@ @OnOpen public void onOpen(Session session) { System.out.println("已连接到服务器。"); log.info("mexc ws 已连接到服务器。"); this.session = session; synchronized (this) { this.notify(); } } private static final Gson gson = new Gson(); // 将 Gson 作为静态成员或单例 @OnMessage public void onMessage(String message) { Gson gson = new Gson(); Map map = gson.fromJson(message, Map.class); if (map != null && map.containsKey("s")) { RedisUtil.set("mexc" + map.get("s").toString(), message); try { Map<String, Object> map = gson.fromJson(message, new TypeToken<Map<String, Object>>() {}.getType()); if (map != null && map.containsKey("s")) { Object object = map.get("d"); Map<String, Object> resultMap = gson.fromJson(object.toString(), new TypeToken<Map<String, Object>>() {}.getType()); HashMap<String,Object> hashMap = new HashMap<>(); ObjectMapper mapper = new ObjectMapper(); hashMap.put("bids",resultMap.get("bids")); hashMap.put("asks",resultMap.get("asks")); String key = "mexc" + map.get("s").toString(); RedisUtil.set(key, mapper.writeValueAsString(hashMap)); } else { log.warn("消息不包含 's' 字段或解析失败:" + message); } } catch (JsonSyntaxException e) { log.error("JSON 解析异常:" + e.getMessage(), e); } catch (JsonProcessingException e) { throw new RuntimeException(e); } // 没有过滤撤单的数据 } @OnClose public void onClose() { System.out.println("连接已关闭,尝试重新连接..."); session = null; if (!reconnecting) { reconnect(); } log.info("mexc ws 连接已关闭,尝试重新连接..."); handleConnectionClosedOrError(); } @OnError public void onError(Throwable throwable) { System.err.println("发生错误: " + throwable.getMessage()); if (!reconnecting) { reconnect(); } log.error("mexc ws 发生错误: " + throwable.getMessage(), throwable); handleConnectionClosedOrError(); } private void reconnect() { if (reconnecting) { return; } reconnecting = true; executorService.schedule(() -> { try { connect(); reconnecting = false; } catch (Exception e) { e.printStackTrace(); reconnect(); private void handleConnectionClosedOrError() { synchronized (lock) { if (!reconnecting) { reconnecting = true; // 设置 reconnecting 为 true 表示开始重连 executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连 } }, calculateBackoffTime(), TimeUnit.MILLISECONDS); } } private long calculateBackoffTime() { // 实现退避策略,例如指数退避 return 5000; // 例子:5秒 private void attemptReconnect() { boolean doReconnect = true; try { log.info("mexc ws 开始重连"); connect(); // 假设 connect() 方法用于实际的连接逻辑 log.info("mexc ws 重连成功"); } catch (Exception e) { log.error("mexc ws 重连失败", e); // 连接失败时,可以根据具体情况决定是否继续重连 // 在这里假设总是继续尝试重连 } finally { synchronized (lock) { if (doReconnect) { scheduleReconnect(); // 如果需要继续重连,则重新调度重连任务 } else { reconnecting = false; // 重连结束后设置 reconnecting 为 false } } } } private void scheduleReconnect() { if (!executorService.isShutdown()) { executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS); } } private void sendPing() { @@ -129,7 +159,7 @@ session.getBasicRemote().sendPing(ByteBuffer.wrap("Ping".getBytes())); } } catch (Exception e) { e.printStackTrace(); log.error("发送心跳失败", e); } } websocketClient/src/main/resources/application.yml
@@ -1,5 +1,5 @@ server: port: 8090 port: 8095 spring: @@ -10,7 +10,7 @@ # 端口,默认为6379 port: 6379 # 数据库索引 database: 0 database: 1 # 密码 password: # 连接超时时间