From f07b9f6d18cd85123a01a0c11a76328427b30034 Mon Sep 17 00:00:00 2001
From: zj <1772600164@qq.com>
Date: Fri, 11 Oct 2024 23:27:55 +0800
Subject: [PATCH] Merge remote-tracking branch 'origin/master'
---
websocketSerivce/src/main/java/org/example/websocket/server/WsServer.java | 24 +++++++++++++++---------
1 files changed, 15 insertions(+), 9 deletions(-)
diff --git a/websocketSerivce/src/main/java/org/example/websocket/server/WsServer.java b/websocketSerivce/src/main/java/org/example/websocket/server/WsServer.java
index 89459d6..17d4de6 100644
--- a/websocketSerivce/src/main/java/org/example/websocket/server/WsServer.java
+++ b/websocketSerivce/src/main/java/org/example/websocket/server/WsServer.java
@@ -17,10 +17,13 @@
import org.example.util.RedisUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.annotation.Bean;
+import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
+import javax.annotation.PreDestroy;
import javax.websocket.*;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
@@ -90,8 +93,8 @@
@OnError
public void onError(Session session, @NonNull Throwable throwable) {
- threadLocalData.remove(session.getId());
- wsServers.remove(this);
+ onClose();
+ onlineCount.decrementAndGet();
log.error("连接发生报错: {}", throwable.getMessage());
throwable.printStackTrace();
}
@@ -214,6 +217,12 @@
}
}
}
+
+ @PreDestroy
+ public void shutdownExecutor() {
+ threadPoolTaskExecutor.shutdown();
+ }
+
private static final Gson gson = new Gson();
private String megFiltration(WsBo wsBo,String message) throws JsonProcessingException {
List<MarketDataOut> redisValueMap = gson.fromJson(message, new TypeToken<List<MarketDataOut>>() {}.getType());
@@ -227,8 +236,10 @@
}
if (!CollectionUtils.isEmpty(currencies)) {
Set<String> filtrationSet = currencies.stream()
- .map(f -> f.getCurrency() + f.getBuy() + f.getSell())
+ .map(f -> f.getCurrency() + f.getBuy() + f.getSell()) //组合过滤 ,暂时不使用,直接过滤整个币种
+// .map(f -> f.getCurrency())
.collect(Collectors.toSet());
+// redisValueMap.removeIf(data -> filtrationSet.contains(data.getBaseAsset()));
redisValueMap.removeIf(data -> filtrationSet.contains(data.getBuyAndSell()));
}
@@ -349,13 +360,8 @@
// 关闭会话的方法
private void closeSession(Session session, String reason) {
- try {
- threadLocalData.remove(session.getId());
- session.close(new CloseReason(CloseReason.CloseCodes.UNEXPECTED_CONDITION, reason));
- } catch (IOException e) {
- log.error("强制断开连接----异常: {}", e.getMessage());
- }
wsServers.remove(this);
log.info(reason);
+ onClose();
}
}
--
Gitblit v1.9.3