zyy
2025-07-24 ec72fc1827d3e46a99d302dd3c9ca61411c70943
src/main/java/com/nq/ws/WebsocketRunClient.java
@@ -6,9 +6,6 @@
import com.google.gson.reflect.TypeToken;
import com.nq.enums.EStockType;
import com.nq.pojo.StockRealTimeBean;
import com.nq.service.IMandatoryLiquidationService;
import com.nq.service.impl.MandatoryLiquidationService;
import com.nq.utils.ApplicationContextRegisterUtil;
import com.nq.utils.redis.RedisKeyUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpResponse;
@@ -49,43 +46,51 @@
    @Override
    public void onOpen(ServerHandshake serverHandshake) {
        send(("key:"+ eStockType.getStockKey()+":"+eStockType.getContryId()).getBytes());
//        Timer heartbeatTimer;
//        // 启动心跳定时器
//        heartbeatTimer = new Timer();
//        heartbeatTimer.schedule(new TimerTask() {
//            @Override
//            public void run() {
//                send(("key:"+ eStockType.getStockKey()+":"+eStockType.getContryId()).getBytes());
//            }
//        }, 0, 3000); // 每3秒发送一次心跳消息
        Timer heartbeatTimer;
        // 启动心跳定时器
        heartbeatTimer = new Timer();
        heartbeatTimer.schedule(new TimerTask() {
            @Override
            public void run() {
                if (isOpen()) {
                    send(("key:"+ eStockType.getStockKey()+":"+eStockType.getContryId()).getBytes());
                }
            }
        }, 0, 3000); // 每3秒发送一次心跳消息
    }
    @Override
    public void onMessage(String s) {
        if(!s.equals("pong") && !s.equals("身份验证成功")){
            Map<String, String> stringObjectMap = jsonToMap(s);
            StockRealTimeBean stockRealTimeBean = new StockRealTimeBean();
            stockRealTimeBean.setPid(stringObjectMap.get("Id").toString());
            stockRealTimeBean.setLast(stringObjectMap.get("Last").toString());
            stockRealTimeBean.setBid(stringObjectMap.get("Bid").toString());
            stockRealTimeBean.setAsk(stringObjectMap.get("Ask").toString());
            stockRealTimeBean.setHigh(stringObjectMap.get("High").toString());
            stockRealTimeBean.setLow(stringObjectMap.get("Low").toString());
            stockRealTimeBean.setPc(stringObjectMap.get("Chg").toString());
            stockRealTimeBean.setPcp(stringObjectMap.get("ChgPct").toString()+"%");
            stockRealTimeBean.setTime(stringObjectMap.get("Time").toString());
            RedisKeyUtil.setCacheRealTimeStock(EStockType.IN,stockRealTimeBean);
            ObjectMapper objectMapper = new ObjectMapper();
            try {
                if(!stockRealTimeBean.getPcp().contains("-")){
                    stockRealTimeBean.setPcp("+"+stringObjectMap.get("ChgPct").toString()+"%");
                }
                String json = objectMapper.writeValueAsString(stockRealTimeBean);
                sendLoca(json);
            } catch (JsonProcessingException e) {
                throw new RuntimeException(e);
    public void onMessage(String message) {
        if (message.contains("身份验证成功") || message.contains("pong") || message.contains("身份验证失败") || message.contains("ws连接点只能有一个")) {
            System.out.println("us" + message);
            return;
        }
        System.out.println("us2" + message);
        Map<String, String> stringObjectMap = jsonToMap(message);
        StockRealTimeBean stockRealTimeBean = new StockRealTimeBean();
        stockRealTimeBean.setPid(stringObjectMap.get("Id").toString());
        stockRealTimeBean.setLast(stringObjectMap.get("Last").toString());
        stockRealTimeBean.setBid(stringObjectMap.get("Bid").toString());
        stockRealTimeBean.setAsk(stringObjectMap.get("Ask").toString());
        stockRealTimeBean.setHigh(stringObjectMap.get("High").toString());
        stockRealTimeBean.setLow(stringObjectMap.get("Low").toString());
        stockRealTimeBean.setPc(stringObjectMap.get("Chg").toString());
        stockRealTimeBean.setPcp(stringObjectMap.get("ChgPct").toString()+"%");
        stockRealTimeBean.setTime(stringObjectMap.get("Time").toString());
        RedisKeyUtil.setCacheRealTimeStock(EStockType.US,stockRealTimeBean);
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            if(!stockRealTimeBean.getPcp().contains("-")){
                stockRealTimeBean.setPcp("+"+stringObjectMap.get("ChgPct").toString()+"%");
            }
            String json = objectMapper.writeValueAsString(stockRealTimeBean);
            sendLoca(json);
            StockRealTimeBean stockDetailBean =  new Gson().fromJson(message, StockRealTimeBean.class);
            System.out.println(stockDetailBean);
            RedisKeyUtil.setCacheRealTimeStock(EStockType.US,stockDetailBean);
        } catch (JsonProcessingException e) {
            log.error("websocket 美国股票 消息错误:{}", e.getMessage());
        }
    }
@@ -97,12 +102,12 @@
    @Override
    public void onClose(int i, String s, boolean b) {
        log.info("websocket  印度股票  关闭"+1);
        log.info("websocket 美国股票 关闭 {} ", i);
    }
    @Override
    public void onError(Exception e) {
        log.info("websocket 错误");
        log.info("websocket 美国股票 错误{}", e.getMessage());
    }
    public void sendLoca(String message) {