From 4b8e10e605d28fc1b4ad3d33a6cf2bfbbea15bd5 Mon Sep 17 00:00:00 2001
From: zj <1772600164@qq.com>
Date: Fri, 19 Jul 2024 18:34:01 +0800
Subject: [PATCH] 1
---
websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java | 74 +++++++
websocketClient/src/main/java/org/example/wsClient/KucoinClient.java | 172 +++++++++++++++++
.idea/inspectionProfiles/Project_Default.xml | 1
websocketClient/src/main/java/org/example/util/RedisUtil.java | 4
websocketClient/src/main/java/org/example/wsClient/MexcClient.java | 108 ++++++---
websocketClient/src/main/java/org/example/wsClient/GateClient.java | 213 +++++++++++++++++++++
websocketClient/src/main/resources/application.yml | 4
7 files changed, 530 insertions(+), 46 deletions(-)
diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml
index 56d11c3..5db292a 100644
--- a/.idea/inspectionProfiles/Project_Default.xml
+++ b/.idea/inspectionProfiles/Project_Default.xml
@@ -2,6 +2,7 @@
<profile version="1.0">
<option name="myName" value="Project Default" />
<inspection_tool class="AliAccessStaticViaInstance" enabled="true" level="WARNING" enabled_by_default="true" />
+ <inspection_tool class="AliDeprecation" enabled="true" level="WARNING" enabled_by_default="true" />
<inspection_tool class="AlibabaAbstractClassShouldStartWithAbstractNaming" enabled="true" level="WARNING" enabled_by_default="true" />
<inspection_tool class="AlibabaAbstractMethodOrInterfaceMethodMustUseJavadoc" enabled="true" level="WARNING" enabled_by_default="true" />
<inspection_tool class="AlibabaAvoidApacheBeanUtilsCopy" enabled="true" level="WARNING" enabled_by_default="true" />
diff --git a/websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java b/websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java
index 0315b55..7d45771 100644
--- a/websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java
+++ b/websocketClient/src/main/java/org/example/WsBean/MexcWsBean.java
@@ -2,8 +2,18 @@
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.extern.slf4j.Slf4j;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.NameValuePair;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.DefaultHttpClient;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
import org.example.pojo.Currency;
import org.example.server.impl.CurrencySerivceImpl;
+import org.example.wsClient.GateClient;
+import org.example.wsClient.KucoinClient;
import org.example.wsClient.MexcClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
@@ -11,7 +21,9 @@
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.util.CollectionUtils;
+import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
/**
* @ClassDescription: 客户端请求类
@@ -28,11 +40,31 @@
@Autowired
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
+// @Bean
+// public void mexcWebsocketRunClientMap() {
+// List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "mexc"));
+// if (!CollectionUtils.isEmpty(mexc)) {
+// int batchSize = 30; // 每个线程处理的数据量
+// int totalSize = mexc.size();
+// int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数
+//
+// for (int i = 0; i < threadCount; i++) {
+// int fromIndex = i * batchSize;
+// int toIndex = Math.min(fromIndex + batchSize, totalSize);
+// List<Currency> sublist = mexc.subList(fromIndex, toIndex);
+//
+// // 使用自定义线程池提交任务
+// threadPoolTaskExecutor.execute(new MexcClient(sublist)::start);
+// }
+//
+// }
+// }
+
@Bean
- public void mexcWebsocketRunClientMap() {
- List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "mexc"));
+ public void gateWebsocketRunClientMap() {
+ List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "gate"));
if (!CollectionUtils.isEmpty(mexc)) {
- int batchSize = 30; // 每个线程处理的数据量
+ int batchSize = 100; // 每个线程处理的数据量
int totalSize = mexc.size();
int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数
@@ -42,10 +74,44 @@
List<Currency> sublist = mexc.subList(fromIndex, toIndex);
// 使用自定义线程池提交任务
- threadPoolTaskExecutor.execute(new MexcClient(sublist)::start);
+ threadPoolTaskExecutor.execute(new GateClient(sublist)::start);
}
}
}
+
+ @Bean
+ public void kucoinWebsocketRunClientMap() throws Exception {
+ List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "gate"));
+ if (!CollectionUtils.isEmpty(mexc)) {
+ String token = doPost();
+ int batchSize = 100; // 每个线程处理的数据量
+ int totalSize = mexc.size();
+ int threadCount = (int) Math.ceil((double) totalSize / batchSize); // 计算需要的线程数
+
+ for (int i = 0; i < threadCount; i++) {
+ int fromIndex = i * batchSize;
+ int toIndex = Math.min(fromIndex + batchSize, totalSize);
+ List<Currency> sublist = mexc.subList(fromIndex, toIndex);
+
+ // 使用自定义线程池提交任务
+ threadPoolTaskExecutor.execute(new KucoinClient(sublist,token)::start);
+ }
+
+ }
+ }
+
+ public static String doPost() throws Exception {
+ String url = "https://api.kucoin.com/api/v1/bullet-public";
+ HttpPost httpPost = new HttpPost(url);
+ DefaultHttpClient defaultHttpClient = new DefaultHttpClient();
+ List<NameValuePair> nvps = new ArrayList<NameValuePair>();
+ httpPost.setEntity(new UrlEncodedFormEntity(nvps));
+ HttpResponse response = defaultHttpClient.execute(httpPost);
+ HttpEntity respEntity = response.getEntity();
+ String text = EntityUtils.toString(respEntity, "UTF-8");
+ defaultHttpClient.getConnectionManager().shutdown();
+ return text;
+ }
}
diff --git a/websocketClient/src/main/java/org/example/util/RedisUtil.java b/websocketClient/src/main/java/org/example/util/RedisUtil.java
index 63361dd..dab4e20 100644
--- a/websocketClient/src/main/java/org/example/util/RedisUtil.java
+++ b/websocketClient/src/main/java/org/example/util/RedisUtil.java
@@ -9,11 +9,13 @@
public class RedisUtil {
private static JedisPool jedisPool;
- // 在静态代码块中初始化 JedisPool
static {
jedisPool = new JedisPool(new JedisPoolConfig(), "localhost", 6379);
}
+ // 私有构造方法,防止实例化
+ private RedisUtil() {}
+
public static void set(String key, String value) {
try (Jedis jedis = jedisPool.getResource()) {
jedis.set(key, value);
diff --git a/websocketClient/src/main/java/org/example/wsClient/GateClient.java b/websocketClient/src/main/java/org/example/wsClient/GateClient.java
new file mode 100644
index 0000000..729bf0f
--- /dev/null
+++ b/websocketClient/src/main/java/org/example/wsClient/GateClient.java
@@ -0,0 +1,213 @@
+package org.example.wsClient;
+
+import com.alibaba.druid.support.json.JSONUtils;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+import lombok.extern.slf4j.Slf4j;
+import org.example.pojo.Currency;
+import org.example.util.RedisUtil;
+import org.json.JSONException;
+import org.json.JSONObject;
+
+import javax.websocket.*;
+import java.math.BigDecimal;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.json.JSONObject;
+/**
+ * @program: demo
+ * @description:
+ * @create: 2024-07-18 15:30
+ **/
+@ClientEndpoint
+@Slf4j
+public class GateClient {
+
+ private static final String WS_ENDPOINT = "wss://api.gateio.ws/ws/v4/";
+ private static final long PING_INTERVAL = 20000;
+
+ private final List<Currency> subscriptions;
+ private final ScheduledExecutorService executorService;
+ private Session session;
+
+ private final Object lock = new Object(); // 添加一个锁对象
+ private volatile boolean reconnecting = false; // 使用 volatile 关键字保证可见性
+
+ public GateClient(List<Currency> subscriptions) {
+ this.subscriptions = subscriptions;
+ this.executorService = Executors.newScheduledThreadPool(1);
+ }
+
+ public void start() {
+ try {
+ connect();
+ if (session == null) {
+ log.info("无法在超时时间内连接到服务器。");
+ return;
+ }
+
+ executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS);
+
+ // 订阅消息
+ for (Currency subscription : subscriptions) {
+ String parameter = getParameter(subscription.getSymbol());
+ session.getBasicRemote().sendText(parameter);
+ }
+
+ synchronized (this) {
+ this.wait();
+ }
+
+ } catch (Exception e) {
+ log.error("gate ws 连接过程中发生异常: " + e.getMessage(), e);
+ } finally {
+ executorService.shutdown();
+ }
+ }
+
+ private void connect() throws Exception {
+ WebSocketContainer container = ContainerProvider.getWebSocketContainer();
+ container.connectToServer(this, new URI(WS_ENDPOINT));
+ }
+
+ @OnOpen
+ public void onOpen(Session session) {
+ log.info("gate ws 已连接到服务器。");
+ this.session = session;
+ synchronized (this) {
+ this.notify();
+ }
+ }
+ private static final Gson gson = new Gson(); // 将 Gson 作为静态成员或单例
+
+ @OnMessage
+ public void onMessage(String message) {
+ try {
+ Map<String, Object> map = gson.fromJson(message, new TypeToken<Map<String, Object>>() {}.getType());
+ Object object = map.get("result");
+ Map<String, Object> resultMap = gson.fromJson(object.toString(), new TypeToken<Map<String, Object>>() {}.getType());
+ if (null != resultMap && null != resultMap.get("s")) {
+ HashMap<String,Object> hashMap = new HashMap<>();
+ ObjectMapper mapper = new ObjectMapper();
+ hashMap.put("bids",resultMap.get("bids"));
+ hashMap.put("asks",resultMap.get("asks"));
+ String key = "gate" + resultMap.get("s");
+ if(resultMap.get("s").toString().replace("_","").equals("BTCUSDT")){
+ System.out.println(hashMap.toString());
+ }
+ RedisUtil.set(key.replace("_",""), mapper.writeValueAsString(hashMap));
+ }
+ } catch (JsonSyntaxException e) {
+ log.error("JSON 解析异常:" + e.getMessage(), e);
+ } catch (JsonProcessingException e) {
+ log.error("JSON 解析异常:" + e.getMessage(), e);
+ }
+ }
+
+ @OnClose
+ public void onClose() {
+ log.info("gate ws 连接已关闭,尝试重新连接...");
+ handleConnectionClosedOrError();
+ }
+
+ @OnError
+ public void onError(Throwable throwable) {
+ log.error("gate ws 发生错误: " + throwable.getMessage(), throwable);
+ handleConnectionClosedOrError();
+ }
+
+ private void handleConnectionClosedOrError() {
+ synchronized (lock) {
+ if (!reconnecting) {
+ reconnecting = true; // 设置 reconnecting 为 true 表示开始重连
+ executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连
+ }
+ }
+ }
+
+ private void attemptReconnect() {
+ boolean doReconnect = true;
+ try {
+ log.info("gate ws 开始重连");
+ connect(); // 假设 connect() 方法用于实际的连接逻辑
+ log.info("gate ws 重连成功");
+ } catch (Exception e) {
+ log.error("gate ws 重连失败", e);
+ // 连接失败时,可以根据具体情况决定是否继续重连
+ // 在这里假设总是继续尝试重连
+ } finally {
+ synchronized (lock) {
+ if (doReconnect) {
+ scheduleReconnect(); // 如果需要继续重连,则重新调度重连任务
+ } else {
+ reconnecting = false; // 重连结束后设置 reconnecting 为 false
+ }
+ }
+ }
+ }
+
+ private void scheduleReconnect() {
+ if (!executorService.isShutdown()) {
+ executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS);
+ }
+ }
+
+ private void sendPing() {
+ try {
+ if (session != null) {
+ session.getBasicRemote().sendPing(ByteBuffer.wrap("Ping".getBytes()));
+ }
+ } catch (Exception e) {
+ log.error("发送心跳失败", e);
+ }
+ }
+
+ public String getParameter(String symbol) throws JsonProcessingException, JSONException {
+ // 替换USDT为_USDT
+ symbol = symbol.replaceAll("USDT", "_USDT");
+
+ // 创建一个ObjectMapper实例
+ ObjectMapper mapper = new ObjectMapper();
+ // 获取当前时间的毫秒数
+ long currentTimeMillis = System.currentTimeMillis();
+
+ // 定义常量
+ final String CHANNEL = "spot.order_book"; // 固定频道
+ final String EVENT_SUBSCRIBE = "subscribe"; // 订阅事件
+ final String EVENT_UNSUBSCRIBE = "unsubscribe"; // 取消订阅事件
+ final String[] PAYLOAD = new String[] {symbol, "20", "100ms"}; // 负载信息
+
+ // 使用Map构建JSON对象
+ Map<String, Object> jsonMap = new HashMap<>(); // 创建Map用于存放JSON内容
+ jsonMap.put("time", currentTimeMillis); // 放入当前时间
+ jsonMap.put("channel", CHANNEL); // 放入频道
+ jsonMap.put("event", EVENT_SUBSCRIBE); // 放入事件类型
+ jsonMap.put("payload", Arrays.asList(PAYLOAD)); // 将数组转换为List并放入Map
+
+ // 将Map转换为JSON字符串
+ String jsonString = mapper.writeValueAsString(jsonMap); // 使用ObjectMapper转换
+ return jsonString; // 返回JSON字符串
+
+ }
+
+ public static void main(String[] args) {
+ String scientificNotation = "5.0E-5"; // 科学计数法字符串
+ BigDecimal bigDecimal = new BigDecimal(scientificNotation);
+
+ System.out.println("Scientific Notation: " + scientificNotation);
+ System.out.println("BigDecimal Value: " + bigDecimal);
+ }
+
+}
diff --git a/websocketClient/src/main/java/org/example/wsClient/KucoinClient.java b/websocketClient/src/main/java/org/example/wsClient/KucoinClient.java
new file mode 100644
index 0000000..2700dc6
--- /dev/null
+++ b/websocketClient/src/main/java/org/example/wsClient/KucoinClient.java
@@ -0,0 +1,172 @@
+package org.example.wsClient;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+import lombok.extern.slf4j.Slf4j;
+import org.example.pojo.Currency;
+import org.example.util.RedisUtil;
+
+import javax.websocket.*;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @program: demo
+ * @description:
+ * @create: 2024-07-19 16:44
+ **/
+@ClientEndpoint
+@Slf4j
+public class KucoinClient {
+
+ private static final String WS_ENDPOINT = "wss://api.kucoinio.ws/ws/v4/";
+ private static final long PING_INTERVAL = 20000;
+
+ private final List<Currency> subscriptions;
+ private final ScheduledExecutorService executorService;
+ private Session session;
+ private String token;
+ private final Object lock = new Object(); // 添加一个锁对象
+ private volatile boolean reconnecting = false; // 使用 volatile 关键字保证可见性
+
+ public KucoinClient(List<Currency> subscriptions,String token) {
+ this.subscriptions = subscriptions;
+ this.token = token;
+ this.executorService = Executors.newScheduledThreadPool(1);
+ }
+
+ public void start() {
+ try {
+ connect();
+ if (session == null) {
+ log.info("无法在超时时间内连接到服务器。");
+ return;
+ }
+
+ executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS);
+
+ // 订阅消息
+ for (Currency subscription : subscriptions) {
+ String parameter = getParameter(subscription.getSymbol());
+ session.getBasicRemote().sendText(parameter);
+ }
+
+ synchronized (this) {
+ this.wait();
+ }
+
+ } catch (Exception e) {
+ log.error("kucoin ws 连接过程中发生异常: " + e.getMessage(), e);
+ } finally {
+ executorService.shutdown();
+ }
+ }
+
+ private void connect() throws Exception {
+ WebSocketContainer container = ContainerProvider.getWebSocketContainer();
+ container.connectToServer(this, new URI(WS_ENDPOINT));
+ }
+
+ @OnOpen
+ public void onOpen(Session session) {
+ log.info("kucoin ws 已连接到服务器。");
+ this.session = session;
+ synchronized (this) {
+ this.notify();
+ }
+ }
+ private static final Gson gson = new Gson(); // 将 Gson 作为静态成员或单例
+
+ @OnMessage
+ public void onMessage(String message) {
+ try {
+ Map<String, Object> map = gson.fromJson(message, new TypeToken<Map<String, Object>>() {}.getType());
+ Object object = map.get("result");
+ Map<String, Object> resultMap = gson.fromJson(object.toString(), new TypeToken<Map<String, Object>>() {}.getType());
+ if (null != resultMap && null != resultMap.get("s")) {
+ HashMap<String,Object> hashMap = new HashMap<>();
+ ObjectMapper mapper = new ObjectMapper();
+ hashMap.put("bids",resultMap.get("bids"));
+ hashMap.put("asks",resultMap.get("asks"));
+ String key = "kucoin" + resultMap.get("s");
+ if(resultMap.get("s").toString().replace("_","").equals("BTCUSDT")){
+ System.out.println(hashMap.toString());
+ }
+ RedisUtil.set(key.replace("_",""), mapper.writeValueAsString(hashMap));
+ }
+ } catch (JsonSyntaxException e) {
+ log.error("JSON 解析异常:" + e.getMessage(), e);
+ } catch (JsonProcessingException e) {
+ log.error("JSON 解析异常:" + e.getMessage(), e);
+ }
+ }
+
+ @OnClose
+ public void onClose() {
+ log.info("kucoin ws 连接已关闭,尝试重新连接...");
+ handleConnectionClosedOrError();
+ }
+
+ @OnError
+ public void onError(Throwable throwable) {
+ log.error("kucoin ws 发生错误: " + throwable.getMessage(), throwable);
+ handleConnectionClosedOrError();
+ }
+
+ private void handleConnectionClosedOrError() {
+ synchronized (lock) {
+ if (!reconnecting) {
+ reconnecting = true; // 设置 reconnecting 为 true 表示开始重连
+ executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连
+ }
+ }
+ }
+
+ private void attemptReconnect() {
+ boolean doReconnect = true;
+ try {
+ log.info("kucoin ws 开始重连");
+ connect(); // 假设 connect() 方法用于实际的连接逻辑
+ log.info("kucoin ws 重连成功");
+ } catch (Exception e) {
+ log.error("kucoin ws 重连失败", e);
+ // 连接失败时,可以根据具体情况决定是否继续重连
+ // 在这里假设总是继续尝试重连
+ } finally {
+ synchronized (lock) {
+ if (doReconnect) {
+ scheduleReconnect(); // 如果需要继续重连,则重新调度重连任务
+ } else {
+ reconnecting = false; // 重连结束后设置 reconnecting 为 false
+ }
+ }
+ }
+ }
+
+ private void scheduleReconnect() {
+ if (!executorService.isShutdown()) {
+ executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS);
+ }
+ }
+
+ private void sendPing() {
+ try {
+ if (session != null) {
+ session.getBasicRemote().sendPing(ByteBuffer.wrap("Ping".getBytes()));
+ }
+ } catch (Exception e) {
+ log.error("发送心跳失败", e);
+ }
+ }
+}
+
+
diff --git a/websocketClient/src/main/java/org/example/wsClient/MexcClient.java b/websocketClient/src/main/java/org/example/wsClient/MexcClient.java
index 6dceb87..80352a8 100644
--- a/websocketClient/src/main/java/org/example/wsClient/MexcClient.java
+++ b/websocketClient/src/main/java/org/example/wsClient/MexcClient.java
@@ -4,7 +4,10 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;
+import com.google.gson.JsonSyntaxException;
+import lombok.extern.slf4j.Slf4j;
import org.example.pojo.Currency;
import org.example.util.RedisUtil;
@@ -13,20 +16,23 @@
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.ByteBuffer;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
@ClientEndpoint
+@Slf4j
public class MexcClient {
- private static final String WS_ENDPOINT = "wss://wbs.mexc.com/ws"; // 实际WebSocket服务器地址
- private static final long PING_INTERVAL = 20000; // 每20秒发送一次ping
- private static final int MAX_BACKOFF_TIME = 60000; // 最大重连时间
+ private static final String WS_ENDPOINT = "wss://wbs.mexc.com/ws";
+ private static final long PING_INTERVAL = 20000;
private final List<Currency> subscriptions;
private final ScheduledExecutorService executorService;
private Session session;
- private boolean reconnecting = false;
+
+ private final Object lock = new Object(); // 添加一个锁对象
+ private volatile boolean reconnecting = false; // 使用 volatile 关键字保证可见性
public MexcClient(List<Currency> subscriptions) {
this.subscriptions = subscriptions;
@@ -37,7 +43,7 @@
try {
connect();
if (session == null) {
- System.err.println("无法在超时时间内连接到服务器。");
+ log.info("无法在超时时间内连接到服务器。");
return;
}
@@ -54,8 +60,7 @@
}
} catch (Exception e) {
- System.err.println("连接过程中发生异常: " + e.getMessage());
- e.printStackTrace();
+ log.error("mexc ws 连接过程中发生异常: " + e.getMessage(), e);
} finally {
executorService.shutdown();
}
@@ -68,59 +73,84 @@
@OnOpen
public void onOpen(Session session) {
- System.out.println("已连接到服务器。");
+ log.info("mexc ws 已连接到服务器。");
this.session = session;
synchronized (this) {
this.notify();
}
}
+ private static final Gson gson = new Gson(); // 将 Gson 作为静态成员或单例
@OnMessage
public void onMessage(String message) {
- Gson gson = new Gson();
- Map map = gson.fromJson(message, Map.class);
- if (map != null && map.containsKey("s")) {
- RedisUtil.set("mexc" + map.get("s").toString(), message);
+ try {
+ Map<String, Object> map = gson.fromJson(message, new TypeToken<Map<String, Object>>() {}.getType());
+ if (map != null && map.containsKey("s")) {
+ Object object = map.get("d");
+ Map<String, Object> resultMap = gson.fromJson(object.toString(), new TypeToken<Map<String, Object>>() {}.getType());
+ HashMap<String,Object> hashMap = new HashMap<>();
+ ObjectMapper mapper = new ObjectMapper();
+ hashMap.put("bids",resultMap.get("bids"));
+ hashMap.put("asks",resultMap.get("asks"));
+ String key = "mexc" + map.get("s").toString();
+ RedisUtil.set(key, mapper.writeValueAsString(hashMap));
+ } else {
+ log.warn("消息不包含 's' 字段或解析失败:" + message);
+ }
+ } catch (JsonSyntaxException e) {
+ log.error("JSON 解析异常:" + e.getMessage(), e);
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
}
- // 没有过滤撤单的数据
}
+
@OnClose
public void onClose() {
- System.out.println("连接已关闭,尝试重新连接...");
- session = null;
- if (!reconnecting) {
- reconnect();
- }
+ log.info("mexc ws 连接已关闭,尝试重新连接...");
+ handleConnectionClosedOrError();
}
@OnError
public void onError(Throwable throwable) {
- System.err.println("发生错误: " + throwable.getMessage());
- if (!reconnecting) {
- reconnect();
- }
+ log.error("mexc ws 发生错误: " + throwable.getMessage(), throwable);
+ handleConnectionClosedOrError();
}
- private void reconnect() {
- if (reconnecting) {
- return;
- }
- reconnecting = true;
- executorService.schedule(() -> {
- try {
- connect();
- reconnecting = false;
- } catch (Exception e) {
- e.printStackTrace();
- reconnect();
+ private void handleConnectionClosedOrError() {
+ synchronized (lock) {
+ if (!reconnecting) {
+ reconnecting = true; // 设置 reconnecting 为 true 表示开始重连
+ executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连
}
- }, calculateBackoffTime(), TimeUnit.MILLISECONDS);
+ }
}
- private long calculateBackoffTime() {
- // 实现退避策略,例如指数退避
- return 5000; // 例子:5秒
+ private void attemptReconnect() {
+ boolean doReconnect = true;
+ try {
+ log.info("mexc ws 开始重连");
+ connect(); // 假设 connect() 方法用于实际的连接逻辑
+ log.info("mexc ws 重连成功");
+ } catch (Exception e) {
+ log.error("mexc ws 重连失败", e);
+ // 连接失败时,可以根据具体情况决定是否继续重连
+ // 在这里假设总是继续尝试重连
+ } finally {
+ synchronized (lock) {
+ if (doReconnect) {
+ scheduleReconnect(); // 如果需要继续重连,则重新调度重连任务
+ } else {
+ reconnecting = false; // 重连结束后设置 reconnecting 为 false
+ }
+ }
+ }
+ }
+
+ private void scheduleReconnect() {
+ if (!executorService.isShutdown()) {
+ executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS);
+ }
}
private void sendPing() {
@@ -129,7 +159,7 @@
session.getBasicRemote().sendPing(ByteBuffer.wrap("Ping".getBytes()));
}
} catch (Exception e) {
- e.printStackTrace();
+ log.error("发送心跳失败", e);
}
}
diff --git a/websocketClient/src/main/resources/application.yml b/websocketClient/src/main/resources/application.yml
index 8b50501..9253a37 100644
--- a/websocketClient/src/main/resources/application.yml
+++ b/websocketClient/src/main/resources/application.yml
@@ -1,5 +1,5 @@
server:
- port: 8090
+ port: 8095
spring:
@@ -10,7 +10,7 @@
# 端口,默认为6379
port: 6379
# 数据库索引
- database: 0
+ database: 1
# 密码
password:
# 连接超时时间
--
Gitblit v1.9.3