1
zj
2024-10-17 708e35e4df613836db1289820d82ab0377bf0442
1
2 files modified
110 ■■■■ changed files
websocketSerivce/src/main/java/org/example/ThreadConfig/MarkConfiguration.java 3 ●●●● patch | view | raw | blame | history
websocketSerivce/src/main/java/org/example/websocket/server/WsServer.java 107 ●●●● patch | view | raw | blame | history
websocketSerivce/src/main/java/org/example/ThreadConfig/MarkConfiguration.java
@@ -4,6 +4,7 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import javax.annotation.PreDestroy;
import java.util.concurrent.ThreadPoolExecutor;
/**
@@ -22,7 +23,7 @@
        executor.setMaxPoolSize(150);    // 最大线程数, 适当设置以避免资源耗尽
        executor.setQueueCapacity(200);    // 队列容量, 适当限制以避免请求堆积
        executor.setKeepAliveSeconds(30);    // 线程空闲时的存活时间为30秒,减少系统开销
        executor.setThreadNamePrefix("Thread-");    // 线程名称的前缀
        executor.setThreadNamePrefix("wsThread-");    // 线程名称的前缀
        // 使用 CallerRunsPolicy 拒绝策略,以减少任务被拒绝时带来的负担
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
websocketSerivce/src/main/java/org/example/websocket/server/WsServer.java
@@ -52,7 +52,7 @@
    private static final Map<String, WsBo> threadLocalData = new ConcurrentHashMap<>();
    @Autowired
    @Qualifier("threadPoolTaskExecutor")
    @Qualifier("markthreadPoolTaskExecutor")
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;
    // 定义常量:任务检查的超时时间(秒)
@@ -94,7 +94,6 @@
    /**
     * Error callback (annotated {@code @OnError}): calls {@link #onClose()},
     * decrements {@code onlineCount}, and records the failure.
     *
     * NOTE(review): the hunk header for this method (-94,7 +94,6) says the
     * commit removed one of the lines shown here, but this view does not mark
     * which one. Presumably it is the explicit {@code onlineCount} decrement
     * (if {@code onClose()} already adjusts the counter) - confirm against the
     * repository.
     *
     * @param session   the session passed to the callback; not read here
     * @param throwable the reported error; only its message is written to the
     *                  log, the stack trace is printed to standard error
     */
    @OnError
    public void onError(Session session, @NonNull Throwable throwable) {
        onClose();
        onlineCount.decrementAndGet();
        log.error("连接发生报错: {}", throwable.getMessage());
        throwable.printStackTrace();
    }
@@ -146,36 +145,81 @@
        return sessionLocks.get(sessionId);
    }
//    public void sendMessageToAll(String message) {
//        List<CompletableFuture<Void>> futures = new ArrayList<>();
//        wsServers.forEach(ws -> {
//            futures.add(CompletableFuture.runAsync(() -> {
//                try {
//                    Session session = ws.session;
//                    if (session != null && session.isOpen()) {
//                        Lock sessionLock = getSessionLock(session.getId());
//                        sessionLock.lock();
//                        try {
//                            schedulePushMessage(session, message);
//                        } catch (Exception e) {
//                            e.printStackTrace();
//                            closeSession(session, "发送消息异常,断开链接");
//                            log.error("发送消息时出现异常: {}", e.getMessage());
//                        } finally {
//                            sessionLock.unlock();
//                        }
//                    } else {
//                        closeSession(session, "会话不存在或已关闭");
//                        log.error("会话不存在或已关闭,无法发送消息");
//                    }
//                } catch (Exception e) {
//                    log.error("处理消息失败: {}", e.getMessage());
//                }
//            }, threadPoolTaskExecutor));
//        });
//
//        // 等待所有任务执行完成
//        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
//    }
    /**
     * Sends {@code message} to the sessions held by the entries of
     * {@code wsServers}. Each open session is handed to
     * {@code schedulePushMessage} in its own task on
     * {@code threadPoolTaskExecutor}, under that session's lock from
     * {@code getSessionLock}; entries whose session is null or closed are
     * passed to {@code closeSession} instead. The method blocks until every
     * submitted task has finished.
     *
     * NOTE(review): this span comes from a commit diff view and shows removed
     * and added lines together without markers. As displayed it is not
     * compilable: {@code session} and {@code sessionLock} are each declared
     * twice in the task body, and {@code ws} is referenced outside the lambda
     * that binds it. The nested lock/try block, the "处理消息失败" log line and the
     * bare {@code allOf(...).join()} match the commented-out earlier version of
     * this method, so they are presumably the removed lines - confirm against
     * the repository before relying on this text.
     *
     * @param message the payload passed to {@code schedulePushMessage} for each
     *                session
     */
    public void sendMessageToAll(String message) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        // Collect all active sessions
        List<Session> activeSessions = new ArrayList<>();
        wsServers.forEach(ws -> {
            Session session = ws.session;
            if (session != null && session.isOpen()) {
                activeSessions.add(session);
            } else {
                closeSession(session, "会话不存在或已关闭");
                log.error("会话不存在或已关闭,无法发送消息");
            }
        });
        // Process all active sessions concurrently
        activeSessions.forEach(session -> {
            futures.add(CompletableFuture.runAsync(() -> {
                // Take the per-session lock before the send is attempted
                Lock sessionLock = getSessionLock(session.getId());
                sessionLock.lock();
                try {
                    Session session = ws.session;
                    if (session != null && session.isOpen()) {
                        Lock sessionLock = getSessionLock(session.getId());
                        sessionLock.lock();
                        try {
                            schedulePushMessage(session, message);
                        } catch (Exception e) {
                            e.printStackTrace();
                            closeSession(session, "发送消息异常,断开链接");
                            log.error("发送消息时出现异常: {}", e.getMessage());
                        } finally {
                            sessionLock.unlock();
                        }
                    } else {
                        closeSession(session, "会话不存在或已关闭");
                        log.error("会话不存在或已关闭,无法发送消息");
                    }
                    schedulePushMessage(session, message);
                } catch (Exception e) {
                    log.error("处理消息失败: {}", e.getMessage());
                    log.error("发送消息时出现异常: {}", e.getMessage());
                    closeSession(session, "发送消息异常,断开链接");
                } finally {
                    // Always release the lock, whether or not the send failed
                    sessionLock.unlock();
                }
            }, threadPoolTaskExecutor));
        });
        // Wait for all tasks to finish
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        try {
            allFutures.join(); // wait for all tasks to complete
        } finally {
            futures.clear(); // clear the futures list, which is no longer needed
        }
    }
    /**
     * Shuts down {@code threadPoolTaskExecutor}; annotated {@code @PreDestroy}.
     *
     * NOTE(review): the executor is injected by qualifier rather than created
     * here, so it may be shared with other beans - confirm this class should
     * be the one to shut it down. This method is shown twice in this diff view
     * (the commit appears to move it); the real file should hold one copy.
     */
    @PreDestroy
    public void shutdownExecutor() {
        threadPoolTaskExecutor.shutdown();
    }
    private WsBo getWsBoForSession(String sessionId) {
@@ -218,10 +262,7 @@
        }
    }
    /**
     * Shuts down {@code threadPoolTaskExecutor}; annotated {@code @PreDestroy}.
     *
     * NOTE(review): an identical method is shown earlier in this diff view and
     * the hunk header here (-218,10 +262,7) reports fewer lines after the
     * change, so this copy is presumably the one the commit removed - confirm.
     */
    @PreDestroy
    public void shutdownExecutor() {
        threadPoolTaskExecutor.shutdown();
    }
    private static final Gson gson = new Gson();
    private String megFiltration(WsBo wsBo,String message) throws JsonProcessingException {
@@ -358,10 +399,16 @@
        }
    }
    /**
     * Closes a session: removes this instance from {@code wsServers}, logs
     * {@code reason}, calls {@code onClose()}, and - if {@code session} is
     * non-null and still open - closes it with {@code NORMAL_CLOSURE} and the
     * same reason. An {@link IOException} from the close is logged, not
     * rethrown.
     *
     * NOTE(review): the cleanup acts on {@code this}, not on whichever instance
     * owns {@code session}. {@code sendMessageToAll} passes sessions taken from
     * other {@code wsServers} entries, so the wrong entry may be removed -
     * confirm the intended behavior.
     *
     * NOTE(review): {@code wsServers.remove(this)} and {@code log.info(reason)}
     * each appear twice below. This view merges removed and added diff lines,
     * so one pair is presumably the pre-change copy - confirm.
     *
     * @param session the session to close; may be null or already closed, in
     *                which case no close call is made
     * @param reason  text written to the log and used as the close-reason phrase
     */
    private void closeSession(Session session, String reason) {
        wsServers.remove(this);
        log.info(reason);
        onClose();
        try {
            if (session != null && session.isOpen()) {
                wsServers.remove(this);
                log.info(reason);
                session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE, reason));
            }
        } catch (IOException e) {
            log.error("关闭会话时出现异常: {}", e.getMessage());
        }
    }
}