1
zj
2024-10-21 3a4ca606fad5d286e8b0de99f39ffbea8ef3cc21
1
3 files modified
102 ■■■■■ changed files
mexcClient/src/main/java/org/example/mexcclient/WsBean/MexcWsBean.java 2 ●●● patch | view | raw | blame | history
websocketSerivce/src/main/java/org/example/ThreadConfig/MarkConfiguration.java 29 ●●●● patch | view | raw | blame | history
websocketSerivce/src/main/java/org/example/websocket/server/WsServer.java 71 ●●●● patch | view | raw | blame | history
mexcClient/src/main/java/org/example/mexcclient/WsBean/MexcWsBean.java
@@ -53,7 +53,7 @@
    @Bean
    public void mexcWebsocketRunClientMap() throws InterruptedException {
        List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "mexc"));
        List<Currency> mexc = currencyService.list(new LambdaQueryWrapper<Currency>().eq(Currency::getSource, "mexc").eq(Currency::getBaseAsset,"USBT"));
        if (!CollectionUtils.isEmpty(mexc)) {
            int batchSize = 30; // 每个线程处理的数据量
            int totalSize = mexc.size();
websocketSerivce/src/main/java/org/example/ThreadConfig/MarkConfiguration.java
@@ -1,5 +1,6 @@
package org.example.ThreadConfig;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@@ -12,25 +13,25 @@
 * @description: 线程池配置
 * @create: 2024-06-25 16:37
 **/
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import javax.annotation.PreDestroy;
@Configuration
@EnableAsync
public class MarkConfiguration {
    @Bean(name = "markthreadPoolTaskExecutor")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(100);    // 核心线程数, 根据需求进行调整
        executor.setMaxPoolSize(150);    // 最大线程数, 适当设置以避免资源耗尽
        executor.setQueueCapacity(200);    // 队列容量, 适当限制以避免请求堆积
        executor.setKeepAliveSeconds(30);    // 线程空闲时的存活时间为30秒,减少系统开销
        executor.setThreadNamePrefix("wsThread-");    // 线程名称的前缀
        // 使用 CallerRunsPolicy 拒绝策略,以减少任务被拒绝时带来的负担
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 初始化线程池,配置其他参数(不过可以根据需要添加)
        executor.initialize(); // 明确初始化,提升代码可读性
        return executor; // 返回配置好的线程池
        executor.setCorePoolSize(100);
        executor.setMaxPoolSize(150);
        executor.setQueueCapacity(200);
        executor.setKeepAliveSeconds(30);
        executor.setThreadNamePrefix("wsThread-");
        executor.initialize();
        return executor;
    }
}
websocketSerivce/src/main/java/org/example/websocket/server/WsServer.java
@@ -176,9 +176,9 @@
//        // 等待所有任务执行完成
//        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
//    }
    public void sendMessageToAll(String message) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        List<Future<Void>> futures = new ArrayList<>();
        ExecutorService executorService = Executors.newFixedThreadPool(100); // 使用固定大小的线程池
        // 收集所有活动的会话
        List<Session> activeSessions = new ArrayList<>();
@@ -193,33 +193,66 @@
        });
        // 并发处理所有活动的会话
        activeSessions.forEach(session -> {
            futures.add(CompletableFuture.runAsync(() -> {
        for (Session session : activeSessions) {
            Future<Void> future = executorService.submit(() -> {
                Lock sessionLock = getSessionLock(session.getId());
                sessionLock.lock();
                try {
                    schedulePushMessage(session, message);
                } catch (Exception e) {
                    log.error("发送消息时出现异常: {}", e.getMessage());
                    closeSession(session, "发送消息异常,断开链接");
                } finally {
                    sessionLock.unlock();
                    if (sessionLock.tryLock(100, TimeUnit.MILLISECONDS)) {
                        try {
                            schedulePushMessage(session, message);
                        } finally {
                            sessionLock.unlock(); // 确保锁只在这里释放
                        }
                    } else {
                        log.error("无法获取锁,放弃对会话 {} 的操作", session.getId());
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // 重新设置中断状态
                    log.error("线程被中断: {}", e.getMessage());
                } catch (JsonProcessingException e) {
                    log.error("JSON处理异常: {}", e.getMessage());
                }
            }, threadPoolTaskExecutor));
        });
                return null; // 需要返回一个值,Void 类型的
            });
        CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        try {
            allFutures.join(); // 等待所有任务完成
        } finally {
            futures.clear(); // 清理未使用的 futures 列表
            futures.add(future);
        }
        // 等待所有任务完成或超时
        for (Future<Void> future : futures) {
            try {
                future.get(60, TimeUnit.SECONDS); // 指定超时时间
            } catch (TimeoutException e) {
                log.error("某个任务超时,可能未能完成: {}", e.getMessage());
                // 这里可以选择是否取消该任务
                future.cancel(true); // 取消任务,设置为 true 则会中断正在执行的线程
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // 重新设置中断状态
                log.error("线程被中断: {}", e.getMessage());
            } catch (ExecutionException e) {
                log.error("处理会话时发生异常: {}", e.getCause().getMessage());
            }
        }
        // 关闭线程池
        executorService.shutdown();
    }
    @PreDestroy
    public void shutdownExecutor() {
        threadPoolTaskExecutor.shutdown();
        threadPoolTaskExecutor.shutdown(); // 先关闭线程池
        try {
            // 等待现有任务在指定时间内完成
            if (!threadPoolTaskExecutor.getThreadPoolExecutor().awaitTermination(60, TimeUnit.SECONDS)) {
                // 如果未能完成,则强制关闭
                threadPoolTaskExecutor.getThreadPoolExecutor().shutdownNow();
            }
        } catch (InterruptedException e) {
            // 如果当前线程被中断,则强制关闭
            threadPoolTaskExecutor.getThreadPoolExecutor().shutdownNow();
            Thread.currentThread().interrupt(); // 还原中断状态
        }
    }
    private WsBo getWsBoForSession(String sessionId) {