| | |
| | | import com.google.gson.reflect.TypeToken; |
| | | import lombok.NonNull; |
| | | import lombok.extern.slf4j.Slf4j; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.beans.factory.annotation.Qualifier; |
| | | import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; |
| | | import org.springframework.stereotype.Component; |
| | | import org.springframework.web.bind.annotation.PostMapping; |
| | | import org.springframework.web.bind.annotation.RequestBody; |
| | |
| | | import javax.websocket.server.ServerEndpoint; |
| | | import java.io.IOException; |
| | | import java.lang.reflect.Type; |
| | | import java.text.SimpleDateFormat; |
| | | import java.util.Date; |
| | | import java.util.HashMap; |
| | | import java.util.Map; |
| | | import java.util.concurrent.CopyOnWriteArrayList; |
| | | import java.util.concurrent.*; |
| | | import java.util.concurrent.atomic.AtomicInteger; |
| | | import java.util.concurrent.locks.Lock; |
| | | import java.util.concurrent.locks.ReentrantLock; |
| | | |
| | | /** |
| | | * @ClassDescription: websocket服务端 |
| | |
| | | |
| | | private Session session; |
| | | private static AtomicInteger onlineCount = new AtomicInteger(0); |
| | | private static CopyOnWriteArrayList<WsServer> wsServers = new CopyOnWriteArrayList<>(); |
| | | private static CopyOnWriteArraySet<WsServer> wsServers = new CopyOnWriteArraySet<>(); |
| | | @Autowired |
| | | @Qualifier("threadPoolTaskExecutor") |
| | | private ThreadPoolTaskExecutor threadPoolTaskExecutor; |
| | | |
| | | @OnOpen |
| | | public void onOpen(Session session) { |
| | |
| | | log.info("服务端断开连接,当前连接的客户端数量为:{}", count); |
| | | } |
| | | |
| | | @OnMessage |
| | | public void sendMessage(String message) throws IOException { |
| | | private Map<String, Lock> sessionLocks = new ConcurrentHashMap<>(); |
| | | |
| | | private Lock getSessionLock(String sessionId) { |
| | | sessionLocks.putIfAbsent(sessionId, new ReentrantLock()); |
| | | return sessionLocks.get(sessionId); |
| | | } |
| | | |
| | | public void sendMessageToAll(String message) { |
| | | Map<String, Object> map = jsonToMap(message); |
| | | if(map.get("pid").equals("00000001")){ |
| | | if (map.get("pid").equals("00000001")) { |
| | | System.out.println(message); |
| | | } |
| | | try { |
| | | if (session.isOpen()) { |
| | | this.session.getBasicRemote().sendText(message); |
| | | } |
| | | } catch (IOException e) { |
| | | throw new IOException("消息发送失败", e); |
| | | } |
| | | } |
| | | |
| | | public void sendMessageToAll(String message) throws IOException { |
| | | for (WsServer wsServer : wsServers) { |
| | | wsServer.sendMessage(message); |
| | | } |
| | | } |
| | | |
| | | |
| | | @PostMapping("/send2AllC") |
| | | public void sendMessageToAll1(@RequestBody String message) throws IOException { |
| | | CopyOnWriteArrayList<WsServer> ws = wsServers; |
| | | for (WsServer wsServer : ws){ |
| | | wsServer.sendMessage(message); |
| | | wsServers.forEach(ws -> { |
| | | threadPoolTaskExecutor.execute(() -> { |
| | | Session session = ws.session; |
| | | if (session != null && session.isOpen()) { |
| | | Lock sessionLock = getSessionLock(session.getId()); |
| | | sessionLock.lock(); |
| | | try { |
| | | synchronized (session){ |
| | | session.getAsyncRemote().sendText(message); |
| | | } |
| | | } catch (Exception e) { |
| | | log.error("发送消息时出现异常: " + e.getMessage()); |
| | | } finally { |
| | | sessionLock.unlock(); |
| | | } |
| | | } else { |
| | | log.error("会话不存在或已关闭,无法发送消息"); |
| | | } |
| | | }); |
| | | }); |
| | | } catch (Exception e) { |
| | | log.error("发送消息时出现异常: " + e.getMessage()); |
| | | } |
| | | } |
| | | |
| | | public static Map<String, Object> jsonToMap(String json) { |
| | | Gson gson = new Gson(); |
| | | Type type = new TypeToken<Map<String, Object>>(){}.getType(); |
| | | Type type = new TypeToken<Map<String, Object>>() { |
| | | }.getType(); |
| | | return gson.fromJson(json, type); |
| | | } |
| | | |
| | | } |
| | | |