package com.nq.ws;
|
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.google.gson.Gson;
|
import com.google.gson.reflect.TypeToken;
|
import com.nq.enums.EStockType;
|
import com.nq.pojo.StockRealTimeBean;
|
import com.nq.service.IMandatoryLiquidationService;
|
import com.nq.service.impl.MandatoryLiquidationService;
|
import com.nq.utils.ApplicationContextRegisterUtil;
|
import com.nq.utils.redis.RedisKeyUtil;
|
import lombok.extern.slf4j.Slf4j;
|
import org.apache.http.HttpResponse;
|
import org.apache.http.client.HttpClient;
|
import org.apache.http.client.entity.UrlEncodedFormEntity;
|
import org.apache.http.client.methods.HttpPost;
|
import org.apache.http.impl.client.HttpClients;
|
import org.apache.http.message.BasicNameValuePair;
|
import org.java_websocket.client.WebSocketClient;
|
import org.java_websocket.handshake.ServerHandshake;
|
import org.springframework.context.ApplicationContext;
|
|
import java.io.IOException;
|
import java.lang.reflect.Type;
|
import java.net.URI;
|
import java.nio.charset.StandardCharsets;
|
import java.util.*;
|
|
@Slf4j
|
public class WebsocketRunClient extends WebSocketClient {
|
|
private EStockType eStockType;
|
|
public WebsocketRunClient(URI serverUri, EStockType eStockType) {
|
// 修改为新的WebSocket服务器地址
|
super(URI.create("wss://usws.yanshiz.com/websocket-server"));
|
this.eStockType = eStockType;
|
}
|
|
|
@Override
|
public void onOpen(ServerHandshake serverHandshake) {
|
log.info("WebSocket连接已建立,连接到: wss://usws.yanshiz.com/websocket-server");
|
// 发送身份验证消息
|
send(("key:"+ eStockType.getStockKey()+":"+eStockType.getContryId()).getBytes());
|
Timer heartbeatTimer;
|
// 启动心跳定时器
|
heartbeatTimer = new Timer();
|
heartbeatTimer.schedule(new TimerTask() {
|
@Override
|
public void run() {
|
if (isOpen()) {
|
send(("key:"+ eStockType.getStockKey()+":"+eStockType.getContryId()).getBytes());
|
}
|
}
|
}, 0, 3000); // 每3秒发送一次心跳消息
|
}
|
|
@Override
|
public void onMessage(String s) {
|
if(!s.equals("pong") && !s.equals("身份验证成功")){
|
try {
|
Map<String, String> stringObjectMap = jsonToMap(s);
|
StockRealTimeBean stockRealTimeBean = new StockRealTimeBean();
|
stockRealTimeBean.setPid(stringObjectMap.get("Id"));
|
stockRealTimeBean.setLast(stringObjectMap.get("Last"));
|
stockRealTimeBean.setBid(stringObjectMap.get("Bid"));
|
stockRealTimeBean.setAsk(stringObjectMap.get("Ask"));
|
stockRealTimeBean.setHigh(stringObjectMap.get("High"));
|
stockRealTimeBean.setLow(stringObjectMap.get("Low"));
|
stockRealTimeBean.setPc(stringObjectMap.get("Chg"));
|
stockRealTimeBean.setPcp(stringObjectMap.get("ChgPct") + "%");
|
stockRealTimeBean.setTime(stringObjectMap.get("Time"));
|
|
RedisKeyUtil.setCacheRealTimeStock(EStockType.US, stockRealTimeBean);
|
|
ObjectMapper objectMapper = new ObjectMapper();
|
if(!stockRealTimeBean.getPcp().contains("-")){
|
stockRealTimeBean.setPcp("+" + stringObjectMap.get("ChgPct") + "%");
|
}
|
|
StockRealTimeBean stockDetailBean = new Gson().fromJson(s, StockRealTimeBean.class);
|
RedisKeyUtil.setCacheRealTimeStock(EStockType.US, stockDetailBean);
|
|
} catch (Exception e) {
|
log.error("处理WebSocket消息时发生错误: {}", e.getMessage(), e);
|
}
|
|
} else {
|
log.info("WebSocket心跳或认证响应: {}", s);
|
}
|
}
|
|
public static Map<String, String> jsonToMap(String json) {
|
Gson gson = new Gson();
|
Type type = new TypeToken<Map<String, String>>(){}.getType();
|
return gson.fromJson(json, type);
|
}
|
|
@Override
|
public void onClose(int i, String s, boolean b) {
|
log.info("WebSocket连接已关闭,代码: {}, 原因: {}, 远程关闭: {}", i, s, b);
|
}
|
|
@Override
|
public void onError(Exception e) {
|
log.error("WebSocket连接发生错误: {}", e.getMessage(), e);
|
}
|
}
|