package com.nq.ws; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import com.nq.enums.EStockType; import com.nq.pojo.StockRealTimeBean; import com.nq.service.IMandatoryLiquidationService; import com.nq.service.impl.MandatoryLiquidationService; import com.nq.utils.ApplicationContextRegisterUtil; import com.nq.utils.redis.RedisKeyUtil; import lombok.extern.slf4j.Slf4j; import org.java_websocket.client.WebSocketClient; import org.java_websocket.handshake.ServerHandshake; import org.springframework.context.ApplicationContext; import java.lang.reflect.Type; import java.net.URI; import java.util.HashMap; import java.util.Map; @Slf4j public class WebsocketRunClient extends WebSocketClient { private EStockType eStockType; public WebsocketRunClient(URI serverUri, EStockType eStockType) { super(serverUri); this.eStockType = eStockType; } @Override public void onOpen(ServerHandshake serverHandshake) { log.info("WebSocket连接已建立,服务器: {}", getURI()); // 根据新服务器的协议格式发送认证或订阅消息 String subscribeMessage = buildSubscribeMessage(); send(subscribeMessage); log.info("已发送订阅消息: {}", subscribeMessage); } @Override public void onMessage(String message) { try { log.debug("收到WebSocket消息: {}", message); ApplicationContext act = ApplicationContextRegisterUtil.getApplicationContext(); MandatoryLiquidationService liquidationService = (MandatoryLiquidationService) act.getBean(IMandatoryLiquidationService.class); // 根据新服务器的数据格式解析消息 StockRealTimeBean stockDetailBean = parseMessage(message); if (stockDetailBean != null) { liquidationService.RealTimeDataProcess(eStockType, stockDetailBean); RedisKeyUtil.setCacheRealTimeStock(eStockType, stockDetailBean); } } catch (Exception e) { log.error("处理WebSocket消息时发生错误: {}", e.getMessage(), e); } } @Override public void onClose(int code, String reason, boolean remote) { log.info("WebSocket连接关闭 - 代码: {}, 原因: {}, 远程关闭: {}", code, reason, remote); log.info("股票类型: {}", eStockType); } @Override public void onError(Exception e) { log.error("WebSocket连接错误: {}", e.getMessage(), e); } /** * 构建订阅消息 - 根据新服务器的协议格式调整 */ private String buildSubscribeMessage() { // 根据新服务器的API文档调整消息格式 // 示例格式,需要根据实际API调整 // 使用 HashMap Map subscribeMsg = new HashMap<>(); subscribeMsg.put("action", "subscribe"); subscribeMsg.put("key", eStockType.getStockKey()); subscribeMsg.put("country", eStockType.getContryId()); subscribeMsg.put("type", "stock_realtime"); return new Gson().toJson(subscribeMsg); } /** * 解析消息 - 根据新服务器的数据格式调整 */ private StockRealTimeBean parseMessage(String message) { try { // 首先将消息解析为Map查看结构 Map messageMap = jsonToMap(message); log.debug("消息结构: {}", messageMap); // 根据实际的数据结构进行转换 // 这里需要根据新服务器的数据格式调整解析逻辑 // 如果是直接符合StockRealTimeBean结构的消息 return new Gson().fromJson(message, StockRealTimeBean.class); } catch (Exception e) { log.error("解析消息失败: {}, 原始消息: {}", e.getMessage(), message); return null; } } public static Map jsonToMap(String json) { Gson gson = new Gson(); Type type = new TypeToken>(){}.getType(); return gson.fromJson(json, type); } }