package com.nq.ws;
|
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.google.gson.Gson;
|
import com.google.gson.reflect.TypeToken;
|
import com.nq.enums.EStockType;
|
import com.nq.pojo.StockRealTimeBean;
|
import com.nq.utils.redis.RedisKeyUtil;
|
import lombok.extern.slf4j.Slf4j;
|
import org.apache.http.client.config.RequestConfig;
|
import org.apache.http.client.entity.UrlEncodedFormEntity;
|
import org.apache.http.client.methods.CloseableHttpResponse;
|
import org.apache.http.client.methods.HttpPost;
|
import org.apache.http.impl.client.CloseableHttpClient;
|
import org.apache.http.impl.client.HttpClients;
|
import org.apache.http.message.BasicNameValuePair;
|
import org.apache.http.util.EntityUtils;
|
import org.java_websocket.client.WebSocketClient;
|
import org.java_websocket.handshake.ServerHandshake;
|
|
import java.io.IOException;
|
import java.lang.reflect.Type;
|
import java.net.URI;
|
import java.nio.charset.StandardCharsets;
|
import java.util.ArrayList;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.Timer;
|
import java.util.TimerTask;
|
|
@Slf4j
|
public class WebsocketRunClient extends WebSocketClient {
|
|
private static final int HTTP_CONNECT_TIMEOUT_MS = 2000;
|
private static final int HTTP_SOCKET_TIMEOUT_MS = 3000;
|
|
private static final CloseableHttpClient HTTP_CLIENT = HttpClients.custom()
|
.setDefaultRequestConfig(RequestConfig.custom()
|
.setConnectTimeout(HTTP_CONNECT_TIMEOUT_MS)
|
.setConnectionRequestTimeout(HTTP_CONNECT_TIMEOUT_MS)
|
.setSocketTimeout(HTTP_SOCKET_TIMEOUT_MS)
|
.build())
|
.setMaxConnTotal(20)
|
.setMaxConnPerRoute(10)
|
.build();
|
|
private final EStockType eStockType;
|
private Timer heartbeatTimer;
|
private volatile boolean localNotifyEnabled = true;
|
|
public WebsocketRunClient(URI serverUri, EStockType eStockType) {
|
super(serverUri);
|
this.eStockType = eStockType;
|
}
|
|
private void startHeartbeat() {
|
stopHeartbeat();
|
heartbeatTimer = new Timer("ws-heartbeat-" + eStockType.getCode(), true);
|
heartbeatTimer.schedule(new TimerTask() {
|
@Override
|
public void run() {
|
if (!isOpen()) {
|
return;
|
}
|
try {
|
sendAuthKey();
|
} catch (Exception e) {
|
log.warn("websocket {} 心跳失败: {}", eStockType.getCode(), e.getMessage());
|
}
|
}
|
}, 0, 3000);
|
}
|
|
private void stopHeartbeat() {
|
if (heartbeatTimer != null) {
|
heartbeatTimer.cancel();
|
heartbeatTimer = null;
|
}
|
}
|
|
private void sendAuthKey() {
|
send(("key:" + eStockType.getStockKey() + ":" + eStockType.getContryId()).getBytes());
|
}
|
|
@Override
|
public void onOpen(ServerHandshake serverHandshake) {
|
sendAuthKey();
|
startHeartbeat();
|
}
|
|
@Override
|
public void onMessage(String s) {
|
if ("pong".equals(s) || "身份验证成功".equals(s)) {
|
log.debug("websocket {} 握手/心跳: {}", eStockType.getCode(), s);
|
return;
|
}
|
try {
|
Map<String, String> stringObjectMap = jsonToMap(s);
|
StockRealTimeBean stockRealTimeBean = new StockRealTimeBean();
|
stockRealTimeBean.setPid(stringObjectMap.get("Id"));
|
stockRealTimeBean.setLast(stringObjectMap.get("Last"));
|
stockRealTimeBean.setBid(stringObjectMap.get("Bid"));
|
stockRealTimeBean.setAsk(stringObjectMap.get("Ask"));
|
stockRealTimeBean.setHigh(stringObjectMap.get("High"));
|
stockRealTimeBean.setLow(stringObjectMap.get("Low"));
|
stockRealTimeBean.setPc(stringObjectMap.get("Chg"));
|
stockRealTimeBean.setPcp(stringObjectMap.get("ChgPct") + "%");
|
stockRealTimeBean.setTime(stringObjectMap.get("Time"));
|
RedisKeyUtil.setCacheRealTimeStock(EStockType.IN, stockRealTimeBean);
|
|
if (!stockRealTimeBean.getPcp().contains("-")) {
|
stockRealTimeBean.setPcp("+" + stringObjectMap.get("ChgPct") + "%");
|
}
|
if (localNotifyEnabled) {
|
ObjectMapper objectMapper = new ObjectMapper();
|
sendLocal(objectMapper.writeValueAsString(stockRealTimeBean));
|
}
|
StockRealTimeBean stockDetailBean = new Gson().fromJson(s, StockRealTimeBean.class);
|
RedisKeyUtil.setCacheRealTimeStock(EStockType.IN, stockDetailBean);
|
} catch (Exception e) {
|
log.warn("websocket {} 消息处理失败: {}", eStockType.getCode(), e.getMessage());
|
}
|
}
|
|
public static Map<String, String> jsonToMap(String json) {
|
Gson gson = new Gson();
|
Type type = new TypeToken<Map<String, String>>() {
|
}.getType();
|
return gson.fromJson(json, type);
|
}
|
|
@Override
|
public void onClose(int i, String s, boolean b) {
|
stopHeartbeat();
|
log.info("websocket {} 连接关闭 code={} reason={} remote={}", eStockType.getCode(), i, s, b);
|
}
|
|
@Override
|
public void onError(Exception e) {
|
log.warn("websocket {} 错误: {}", eStockType.getCode(), e.getMessage());
|
}
|
|
/**
|
* 推送本地通知服务;必须 consume 响应体,否则 HttpClient 连接池会泄漏。
|
*/
|
public void sendLocal(String message) {
|
HttpPost httpPost = new HttpPost("http://127.0.0.1:8001/api/sendNotification");
|
CloseableHttpResponse response = null;
|
try {
|
List<BasicNameValuePair> params = new ArrayList<>();
|
params.add(new BasicNameValuePair("message", message));
|
httpPost.setEntity(new UrlEncodedFormEntity(params, StandardCharsets.UTF_8));
|
response = HTTP_CLIENT.execute(httpPost);
|
EntityUtils.consume(response.getEntity());
|
} catch (IOException e) {
|
localNotifyEnabled = false;
|
log.warn("本地通知服务不可用,已暂停推送(127.0.0.1:8001): {}", e.getMessage());
|
} finally {
|
if (response != null) {
|
try {
|
response.close();
|
} catch (IOException ignored) {
|
}
|
}
|
}
|
}
|
}
|