1
zj
2024-10-17 708e35e4df613836db1289820d82ab0377bf0442
websocketSerivce/src/main/java/org/example/websocket/server/WsServer.java
@@ -52,7 +52,7 @@
    private static final Map<String, WsBo> threadLocalData = new ConcurrentHashMap<>();
    @Autowired
    @Qualifier("threadPoolTaskExecutor")
    @Qualifier("markthreadPoolTaskExecutor")
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;
    // 定义常量:任务检查的超时时间(秒)
@@ -94,7 +94,6 @@
    @OnError
    public void onError(Session session, @NonNull Throwable throwable) {
        onClose();
        onlineCount.decrementAndGet();
        log.error("连接发生报错: {}", throwable.getMessage());
        throwable.printStackTrace();
    }
@@ -146,36 +145,81 @@
        return sessionLocks.get(sessionId);
    }
//    public void sendMessageToAll(String message) {
//        List<CompletableFuture<Void>> futures = new ArrayList<>();
//        wsServers.forEach(ws -> {
//            futures.add(CompletableFuture.runAsync(() -> {
//                try {
//                    Session session = ws.session;
//                    if (session != null && session.isOpen()) {
//                        Lock sessionLock = getSessionLock(session.getId());
//                        sessionLock.lock();
//                        try {
//                            schedulePushMessage(session, message);
//                        } catch (Exception e) {
//                            e.printStackTrace();
//                            closeSession(session, "发送消息异常,断开链接");
//                            log.error("发送消息时出现异常: {}", e.getMessage());
//                        } finally {
//                            sessionLock.unlock();
//                        }
//                    } else {
//                        closeSession(session, "会话不存在或已关闭");
//                        log.error("会话不存在或已关闭,无法发送消息");
//                    }
//                } catch (Exception e) {
//                    log.error("处理消息失败: {}", e.getMessage());
//                }
//            }, threadPoolTaskExecutor));
//        });
//
//        // 等待所有任务执行完成
//        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
//    }
    public void sendMessageToAll(String message) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        // 收集所有活动的会话
        List<Session> activeSessions = new ArrayList<>();
        wsServers.forEach(ws -> {
            Session session = ws.session;
            if (session != null && session.isOpen()) {
                activeSessions.add(session);
            } else {
                closeSession(session, "会话不存在或已关闭");
                log.error("会话不存在或已关闭,无法发送消息");
            }
        });
        // 并发处理所有活动的会话
        activeSessions.forEach(session -> {
            futures.add(CompletableFuture.runAsync(() -> {
                Lock sessionLock = getSessionLock(session.getId());
                sessionLock.lock();
                try {
                    Session session = ws.session;
                    if (session != null && session.isOpen()) {
                        Lock sessionLock = getSessionLock(session.getId());
                        sessionLock.lock();
                        try {
                            schedulePushMessage(session, message);
                        } catch (Exception e) {
                            e.printStackTrace();
                            closeSession(session, "发送消息异常,断开链接");
                            log.error("发送消息时出现异常: {}", e.getMessage());
                        } finally {
                            sessionLock.unlock();
                        }
                    } else {
                        closeSession(session, "会话不存在或已关闭");
                        log.error("会话不存在或已关闭,无法发送消息");
                    }
                    schedulePushMessage(session, message);
                } catch (Exception e) {
                    log.error("处理消息失败: {}", e.getMessage());
                    log.error("发送消息时出现异常: {}", e.getMessage());
                    closeSession(session, "发送消息异常,断开链接");
                } finally {
                    sessionLock.unlock();
                }
            }, threadPoolTaskExecutor));
        });
        // 等待所有任务执行完成
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        try {
            allFutures.join(); // 等待所有任务完成
        } finally {
            futures.clear(); // 清理未使用的 futures 列表
        }
    }
    @PreDestroy
    public void shutdownExecutor() {
        threadPoolTaskExecutor.shutdown();
    }
    private WsBo getWsBoForSession(String sessionId) {
@@ -218,10 +262,7 @@
        }
    }
    @PreDestroy
    public void shutdownExecutor() {
        threadPoolTaskExecutor.shutdown();
    }
    private static final Gson gson = new Gson();
    private String megFiltration(WsBo wsBo,String message) throws JsonProcessingException {
@@ -358,10 +399,16 @@
        }
    }
    // 关闭会话的方法
    private void closeSession(Session session, String reason) {
        wsServers.remove(this);
        log.info(reason);
        onClose();
        try {
            if (session != null && session.isOpen()) {
                wsServers.remove(this);
                log.info(reason);
                session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, reason));
            }
        } catch (IOException e) {
            log.error("关闭会话时出现异常: {}", e.getMessage());
        }
    }
}