1
zj
2024-06-28 42c55aa0fb937c45d28f02bed25dd93888d5a6e0
1
1 files modified
2 files added
130 ■■■■ changed files
websocketClient/src/main/java/org/example/config/AsyncConfiguration.java 28 ●●●●● patch | view | raw | blame | history
websocketSerivce/src/main/java/org/example/websocket/config/AsyncConfiguration.java 28 ●●●●● patch | view | raw | blame | history
websocketSerivce/src/main/java/org/example/websocket/server/WsServer.java 74 ●●●●● patch | view | raw | blame | history
websocketClient/src/main/java/org/example/config/AsyncConfiguration.java
New file
@@ -0,0 +1,28 @@
package org.example.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * @program: dabaogp
 * @description:
 * @create: 2024-06-25 16:37
 **/
@Configuration
public class AsyncConfiguration {
    @Bean(name = "threadPoolTaskExecutor")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(50);    //  核心线程数
        executor.setMaxPoolSize(100);    //  最大线程数
        executor.setQueueCapacity(300);    //  队列容量
        executor.setKeepAliveSeconds(60);    //  线程空闲时的存活时间为60秒
        executor.setThreadNamePrefix("MyThread-");    //  线程名称的前缀
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());    //  使用  CallerRunsPolicy  拒绝策略
        return executor;
    }
}
websocketSerivce/src/main/java/org/example/websocket/config/AsyncConfiguration.java
New file
@@ -0,0 +1,28 @@
package org.example.websocket.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/**
 * @program: dabaogp
 * @description:
 * @create: 2024-06-25 16:37
 **/
@Configuration
public class AsyncConfiguration {
    @Bean(name = "threadPoolTaskExecutor")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);    //  核心线程数
        executor.setMaxPoolSize(100);    //  最大线程数
        executor.setQueueCapacity(300);    //  队列容量
        executor.setKeepAliveSeconds(60);    //  线程空闲时的存活时间为60秒
        executor.setThreadNamePrefix("MyThread-");    //  线程名称的前缀
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());    //  使用  CallerRunsPolicy  拒绝策略
        return executor;
    }
}
websocketSerivce/src/main/java/org/example/websocket/server/WsServer.java
@@ -4,6 +4,9 @@
import com.google.gson.reflect.TypeToken;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
@@ -12,9 +15,14 @@
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.lang.reflect.Type;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * @ClassDescription: websocket服务端
@@ -28,7 +36,10 @@
    private Session session;
    private static AtomicInteger onlineCount = new AtomicInteger(0);
    private static CopyOnWriteArrayList<WsServer> wsServers = new CopyOnWriteArrayList<>();
    private static CopyOnWriteArraySet<WsServer> wsServers = new CopyOnWriteArraySet<>();
    @Autowired
    @Qualifier("threadPoolTaskExecutor")
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;
    @OnOpen
    public void onOpen(Session session) {
@@ -51,41 +62,48 @@
        log.info("服务端断开连接,当前连接的客户端数量为:{}", count);
    }
    @OnMessage
    public void sendMessage(String message) throws IOException {
    private Map<String, Lock> sessionLocks = new ConcurrentHashMap<>();
    private Lock getSessionLock(String sessionId) {
        sessionLocks.putIfAbsent(sessionId, new ReentrantLock());
        return sessionLocks.get(sessionId);
    }
    public void sendMessageToAll(String message) {
        Map<String, Object> map = jsonToMap(message);
        if(map.get("pid").equals("00000001")){
        if (map.get("pid").equals("00000001")) {
            System.out.println(message);
        }
        try {
            if (session.isOpen()) {
                this.session.getBasicRemote().sendText(message);
            }
        } catch (IOException e) {
            throw new IOException("消息发送失败", e);
        }
    }
    public void sendMessageToAll(String message) throws IOException {
        for (WsServer wsServer : wsServers) {
            wsServer.sendMessage(message);
        }
    }
    @PostMapping("/send2AllC")
    public void sendMessageToAll1(@RequestBody String message) throws  IOException {
        CopyOnWriteArrayList<WsServer> ws = wsServers;
        for (WsServer wsServer : ws){
            wsServer.sendMessage(message);
            wsServers.forEach(ws -> {
                threadPoolTaskExecutor.execute(() -> {
                    Session session = ws.session;
                    if (session != null && session.isOpen()) {
                        Lock sessionLock = getSessionLock(session.getId());
                        sessionLock.lock();
                        try {
                            synchronized (session){
                                session.getAsyncRemote().sendText(message);
                            }
                        } catch (Exception e) {
                            log.error("发送消息时出现异常: " + e.getMessage());
                        } finally {
                            sessionLock.unlock();
                        }
                    } else {
                        log.error("会话不存在或已关闭,无法发送消息");
                    }
                });
            });
        } catch (Exception e) {
            log.error("发送消息时出现异常: " + e.getMessage());
        }
    }
    /**
     * Parses a JSON string into a {@code Map<String, Object>} using Gson.
     *
     * <p>A new {@code Gson} instance is created on every call, and the target type is
     * captured with an anonymous {@code TypeToken} subclass so the generic map type is
     * passed to {@code fromJson}.
     *
     * <p>NOTE(review): the two consecutive {@code Type type} declarations below look like
     * the before/after forms of a formatting-only change shown together in this listing;
     * confirm that only one of them exists in the actual file.
     *
     * @param json the JSON text to parse
     * @return the value returned by {@code Gson.fromJson} for the map type
     */
    public static Map<String, Object> jsonToMap(String json) {
        Gson gson = new Gson();
        Type type = new TypeToken<Map<String, Object>>(){}.getType();
        Type type = new TypeToken<Map<String, Object>>() {
        }.getType();
        return gson.fromJson(json, type);
    }
}