From 708e35e4df613836db1289820d82ab0377bf0442 Mon Sep 17 00:00:00 2001
From: zj <1772600164@qq.com>
Date: Thu, 17 Oct 2024 22:25:27 +0800
Subject: [PATCH] 1
---
websocketSerivce/src/main/java/org/example/ThreadConfig/MarkConfiguration.java | 3 +
websocketSerivce/src/main/java/org/example/websocket/server/WsServer.java | 107 ++++++++++++++++++++++++++++++++++++++---------------
2 files changed, 79 insertions(+), 31 deletions(-)
diff --git a/websocketSerivce/src/main/java/org/example/ThreadConfig/MarkConfiguration.java b/websocketSerivce/src/main/java/org/example/ThreadConfig/MarkConfiguration.java
index 7cd5d54..050cdd3 100644
--- a/websocketSerivce/src/main/java/org/example/ThreadConfig/MarkConfiguration.java
+++ b/websocketSerivce/src/main/java/org/example/ThreadConfig/MarkConfiguration.java
@@ -4,6 +4,7 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
+import javax.annotation.PreDestroy;
import java.util.concurrent.ThreadPoolExecutor;
/**
@@ -22,7 +23,7 @@
executor.setMaxPoolSize(150); // 最大线程数, 适当设置以避免资源耗尽
executor.setQueueCapacity(200); // 队列容量, 适当限制以避免请求堆积
executor.setKeepAliveSeconds(30); // 线程空闲时的存活时间为30秒,减少系统开销
- executor.setThreadNamePrefix("Thread-"); // 线程名称的前缀
+ executor.setThreadNamePrefix("wsThread-"); // 线程名称的前缀
// 使用 CallerRunsPolicy 拒绝策略,以减少任务被拒绝时带来的负担
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
diff --git a/websocketSerivce/src/main/java/org/example/websocket/server/WsServer.java b/websocketSerivce/src/main/java/org/example/websocket/server/WsServer.java
index 17d4de6..eea67d1 100644
--- a/websocketSerivce/src/main/java/org/example/websocket/server/WsServer.java
+++ b/websocketSerivce/src/main/java/org/example/websocket/server/WsServer.java
@@ -52,7 +52,7 @@
private static final Map<String, WsBo> threadLocalData = new ConcurrentHashMap<>();
@Autowired
- @Qualifier("threadPoolTaskExecutor")
+ @Qualifier("markthreadPoolTaskExecutor")
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
// 定义常量:任务检查的超时时间(秒)
@@ -94,7 +94,6 @@
@OnError
public void onError(Session session, @NonNull Throwable throwable) {
onClose();
- onlineCount.decrementAndGet();
log.error("连接发生报错: {}", throwable.getMessage());
throwable.printStackTrace();
}
@@ -146,36 +145,81 @@
return sessionLocks.get(sessionId);
}
+// public void sendMessageToAll(String message) {
+// List<CompletableFuture<Void>> futures = new ArrayList<>();
+// wsServers.forEach(ws -> {
+// futures.add(CompletableFuture.runAsync(() -> {
+// try {
+// Session session = ws.session;
+// if (session != null && session.isOpen()) {
+// Lock sessionLock = getSessionLock(session.getId());
+// sessionLock.lock();
+// try {
+// schedulePushMessage(session, message);
+// } catch (Exception e) {
+// e.printStackTrace();
+// closeSession(session, "发送消息异常,断开链接");
+// log.error("发送消息时出现异常: {}", e.getMessage());
+// } finally {
+// sessionLock.unlock();
+// }
+// } else {
+// closeSession(session, "会话不存在或已关闭");
+// log.error("会话不存在或已关闭,无法发送消息");
+// }
+// } catch (Exception e) {
+// log.error("处理消息失败: {}", e.getMessage());
+// }
+// }, threadPoolTaskExecutor));
+// });
+//
+// // 等待所有任务执行完成
+// CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
+// }
+
public void sendMessageToAll(String message) {
List<CompletableFuture<Void>> futures = new ArrayList<>();
+
+ // 收集所有活动的会话
+ List<Session> activeSessions = new ArrayList<>();
wsServers.forEach(ws -> {
+ Session session = ws.session;
+ if (session != null && session.isOpen()) {
+ activeSessions.add(session);
+ } else {
+ closeSession(session, "会话不存在或已关闭");
+ log.error("会话不存在或已关闭,无法发送消息");
+ }
+ });
+
+ // 并发处理所有活动的会话
+ activeSessions.forEach(session -> {
futures.add(CompletableFuture.runAsync(() -> {
+ Lock sessionLock = getSessionLock(session.getId());
+ sessionLock.lock();
try {
- Session session = ws.session;
- if (session != null && session.isOpen()) {
- Lock sessionLock = getSessionLock(session.getId());
- sessionLock.lock();
- try {
- schedulePushMessage(session, message);
- } catch (Exception e) {
- e.printStackTrace();
- closeSession(session, "发送消息异常,断开链接");
- log.error("发送消息时出现异常: {}", e.getMessage());
- } finally {
- sessionLock.unlock();
- }
- } else {
- closeSession(session, "会话不存在或已关闭");
- log.error("会话不存在或已关闭,无法发送消息");
- }
+ schedulePushMessage(session, message);
} catch (Exception e) {
- log.error("处理消息失败: {}", e.getMessage());
+ log.error("发送消息时出现异常: {}", e.getMessage());
+ closeSession(session, "发送消息异常,断开链接");
+ } finally {
+ sessionLock.unlock();
}
}, threadPoolTaskExecutor));
});
- // 等待所有任务执行完成
- CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
+ CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
+ try {
+ allFutures.join(); // 等待所有任务完成
+ } finally {
+ futures.clear(); // 清理未使用的 futures 列表
+ }
+ }
+
+
+ @PreDestroy
+ public void shutdownExecutor() {
+ threadPoolTaskExecutor.shutdown();
}
private WsBo getWsBoForSession(String sessionId) {
@@ -218,10 +262,7 @@
}
}
- @PreDestroy
- public void shutdownExecutor() {
- threadPoolTaskExecutor.shutdown();
- }
+
private static final Gson gson = new Gson();
private String megFiltration(WsBo wsBo,String message) throws JsonProcessingException {
@@ -358,10 +399,16 @@
}
}
- // 关闭会话的方法
private void closeSession(Session session, String reason) {
- wsServers.remove(this);
- log.info(reason);
- onClose();
+ try {
+ if (session != null && session.isOpen()) {
+ wsServers.remove(this);
+ log.info(reason);
+ session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, reason));
+ }
+ } catch (IOException e) {
+ log.error("关闭会话时出现异常: {}", e.getMessage());
+ }
}
+
}
--
Gitblit v1.9.3