1
zj
2024-08-05 a522f170fe22ffeb03e2b6bc81ae40243fb4cecb
websocketSerivce/src/main/java/org/example/websocket/server/WsServer.java
@@ -1,5 +1,6 @@
package org.example.websocket.server;
import cn.hutool.core.util.NumberUtil;
import cn.hutool.json.JSONUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
@@ -9,6 +10,7 @@
import com.google.gson.GsonBuilder;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.example.pojo.ConfigCurrency;
import org.example.pojo.MarketDataOut;
import org.example.pojo.bo.WsBo;
@@ -143,6 +145,8 @@
                        try {
                            schedulePushMessage(session, message);
                        } catch (Exception e) {
                            e.printStackTrace();
                            closeSession(session, "发送消息异常,断开链接");
                            log.error("发送消息时出现异常: {}", e.getMessage());
                        } finally {
                            sessionLock.unlock();
@@ -172,6 +176,7 @@
            long currentTime = System.currentTimeMillis();
            long lastMessageTime = lastMessageTimeMap.getOrDefault(session, 0L);
            int time = wsBo.getTime();
            message = megFiltration(wsBo,message);
            if (currentTime - lastMessageTime >= time * 1000) {
                // 时间间隔达到要求,可以发送消息
@@ -179,14 +184,14 @@
                lastMessageTimeMap.put(session, currentTime); // 更新最后发送时间
            } else {
                // 时间间隔未达到,不发送消息,可以记录日志或者其他操作
                log.info("距离上次发送消息时间未达到指定间隔,不发送消息。");
//                log.info("距离上次发送消息时间未达到指定间隔,不发送消息。");
            }
        }
    }
    private static final Gson gson = new Gson();
    private String megFiltration(WsBo wsBo,String message) throws JsonProcessingException {
        List<MarketDataOut> redisValueMap = gson.fromJson(message, new TypeToken<List<MarketDataOut>>() {}.getType());
        Map<String,Object> map = new HashMap<>();
        String key = "config_";
        String value = RedisUtil.get(key + wsBo.getUserId());
        List<ConfigCurrency> currencies = null;
@@ -203,7 +208,7 @@
        //查询币种
        if(null != wsBo.getCurrency()){
        if(StringUtils.isNotEmpty(wsBo.getCurrency())){
            redisValueMap = redisValueMap.stream()
                    .filter(data -> wsBo.getCurrency().equals(data.getBaseAsset()))
                    .collect(Collectors.toList());
@@ -242,10 +247,36 @@
                    .filter(data -> list.contains(data.getBuyAndSell()))
                    .forEach(data -> data.setMarker(true));
        }
        map.put("current",wsBo.getCurrent());
        map.put("sizes",wsBo.getSizes());
        map.put("total",redisValueMap.size());
        sortBySpread(redisValueMap);
        Integer current = 0;
        if(wsBo.getCurrent() != 1){
            current = (wsBo.getCurrent() - 1) * wsBo.getSizes();
        }
        // 确保 startIndex 在有效范围内
        current = Math.min(current, redisValueMap.size());
        // 计算子列表的结束索引
        int endIndex = Math.min(current + wsBo.getSizes(), redisValueMap.size());
        // 根据计算出的索引获取子列表
        redisValueMap = redisValueMap.subList(current, endIndex);
        map.put("data",redisValueMap);
        Gson gson = new GsonBuilder().setPrettyPrinting().create();
        String json = gson.toJson(redisValueMap);
        String json = gson.toJson(map);
        return json;
    }
    public static void sortBySpread(List<MarketDataOut> marketDataList) {
        Collections.sort(marketDataList, new Comparator<MarketDataOut>() {
            @Override
            public int compare(MarketDataOut a, MarketDataOut b) {
                BigDecimal spreadA = new BigDecimal(a.getSpread());
                BigDecimal spreadB = new BigDecimal(b.getSpread());
                return spreadB.compareTo(spreadA);
            }
        });
    }
    private void pushMessage(Session session, String message) {
        try {