From 708e35e4df613836db1289820d82ab0377bf0442 Mon Sep 17 00:00:00 2001
From: zj <1772600164@qq.com>
Date: Thu, 17 Oct 2024 22:25:27 +0800
Subject: [PATCH] 1
---
websocketSerivce/src/main/java/org/example/websocket/server/WsServer.java | 107 +++++++++++++++++++++++++++++++++++++++++------------
1 files changed, 82 insertions(+), 25 deletions(-)
diff --git a/websocketSerivce/src/main/java/org/example/websocket/server/WsServer.java b/websocketSerivce/src/main/java/org/example/websocket/server/WsServer.java
index 4b02b1a..eea67d1 100644
--- a/websocketSerivce/src/main/java/org/example/websocket/server/WsServer.java
+++ b/websocketSerivce/src/main/java/org/example/websocket/server/WsServer.java
@@ -17,10 +17,13 @@
import org.example.util.RedisUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.annotation.Bean;
+import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
+import javax.annotation.PreDestroy;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
@@ -49,7 +52,7 @@
private static final Map<String, WsBo> threadLocalData = new ConcurrentHashMap<>();
@Autowired
- @Qualifier("threadPoolTaskExecutor")
+ @Qualifier("markthreadPoolTaskExecutor")
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
// 定义常量:任务检查的超时时间(秒)
@@ -142,36 +145,81 @@
return sessionLocks.get(sessionId);
}
+// public void sendMessageToAll(String message) {
+// List<CompletableFuture<Void>> futures = new ArrayList<>();
+// wsServers.forEach(ws -> {
+// futures.add(CompletableFuture.runAsync(() -> {
+// try {
+// Session session = ws.session;
+// if (session != null && session.isOpen()) {
+// Lock sessionLock = getSessionLock(session.getId());
+// sessionLock.lock();
+// try {
+// schedulePushMessage(session, message);
+// } catch (Exception e) {
+// e.printStackTrace();
+// closeSession(session, "发送消息异常,断开链接");
+// log.error("发送消息时出现异常: {}", e.getMessage());
+// } finally {
+// sessionLock.unlock();
+// }
+// } else {
+// closeSession(session, "会话不存在或已关闭");
+// log.error("会话不存在或已关闭,无法发送消息");
+// }
+// } catch (Exception e) {
+// log.error("处理消息失败: {}", e.getMessage());
+// }
+// }, threadPoolTaskExecutor));
+// });
+//
+// // 等待所有任务执行完成
+// CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
+// }
+
public void sendMessageToAll(String message) {
List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+ // 收集所有活动的会话
+ List<Session> activeSessions = new ArrayList<>();
wsServers.forEach(ws -> {
+ Session session = ws.session;
+ if (session != null && session.isOpen()) {
+ activeSessions.add(session);
+ } else {
+ closeSession(session, "会话不存在或已关闭");
+ log.error("会话不存在或已关闭,无法发送消息");
+ }
+ });
+
+ // 并发处理所有活动的会话
+ activeSessions.forEach(session -> {
futures.add(CompletableFuture.runAsync(() -> {
+ Lock sessionLock = getSessionLock(session.getId());
+ sessionLock.lock();
try {
- Session session = ws.session;
- if (session != null && session.isOpen()) {
- Lock sessionLock = getSessionLock(session.getId());
- sessionLock.lock();
- try {
- schedulePushMessage(session, message);
- } catch (Exception e) {
- e.printStackTrace();
- closeSession(session, "发送消息异常,断开链接");
- log.error("发送消息时出现异常: {}", e.getMessage());
- } finally {
- sessionLock.unlock();
- }
- } else {
- closeSession(session, "会话不存在或已关闭");
- log.error("会话不存在或已关闭,无法发送消息");
- }
+ schedulePushMessage(session, message);
} catch (Exception e) {
- log.error("处理消息失败: {}", e.getMessage());
+ log.error("发送消息时出现异常: {}", e.getMessage());
+ closeSession(session, "发送消息异常,断开链接");
+ } finally {
+ sessionLock.unlock();
}
}, threadPoolTaskExecutor));
});
- // 等待所有任务执行完成
- CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
+ CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
+ try {
+ allFutures.join(); // 等待所有任务完成
+ } finally {
+ futures.clear(); // 清理未使用的 futures 列表
+ }
+ }
+
+
+ @PreDestroy
+ public void shutdownExecutor() {
+ threadPoolTaskExecutor.shutdown();
}
private WsBo getWsBoForSession(String sessionId) {
@@ -213,6 +261,9 @@
}
}
}
+
+
+
private static final Gson gson = new Gson();
private String megFiltration(WsBo wsBo,String message) throws JsonProcessingException {
List<MarketDataOut> redisValueMap = gson.fromJson(message, new TypeToken<List<MarketDataOut>>() {}.getType());
@@ -348,10 +399,16 @@
}
}
- // 关闭会话的方法
private void closeSession(Session session, String reason) {
- wsServers.remove(this);
- log.info(reason);
- onClose();
+ try {
+ if (session != null && session.isOpen()) {
+ wsServers.remove(this);
+ log.info(reason);
+ session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, reason));
+ }
+ } catch (IOException e) {
+ log.error("关闭会话时出现异常: {}", e.getMessage());
+ }
}
+
}
--
Gitblit v1.9.3