package org.example.bitgetsclient.wsClient;
|
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.databind.JsonNode;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.google.common.reflect.TypeToken;
|
import com.google.gson.Gson;
|
import com.google.gson.JsonSyntaxException;
|
import lombok.extern.slf4j.Slf4j;
|
import org.example.bitgetsclient.util.RedisUtil;
|
import org.json.JSONException;
|
import org.springframework.context.annotation.Bean;
|
import org.springframework.util.CollectionUtils;
|
|
import javax.websocket.*;
|
import java.io.IOException;
|
import java.lang.reflect.Type;
|
import java.math.BigDecimal;
|
import java.net.URI;
|
import java.nio.ByteBuffer;
|
import java.util.Arrays;
|
import java.util.HashMap;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.concurrent.Executors;
|
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.TimeUnit;
|
|
@ClientEndpoint
|
@Slf4j
|
public class BitgetClient {
|
|
private static final String WS_ENDPOINT = "wss://ws.bitget.com/v2/ws/public"; // WebSocket 接口地址
|
private static final long PING_INTERVAL = 20000; // 心跳间隔,单位毫秒
|
private static final String PING_MESSAGE = "ping"; // 心跳消息
|
private static final int MAX_RECONNECT_ATTEMPTS = 5; // 最大重连次数
|
private final String subscriptions; // 订阅内容
|
private final ScheduledExecutorService executorService; // 定义调度任务执行服务
|
private Session session; // WebSocket 会话
|
|
private final Object lock = new Object(); // 使用锁对象以确保线程安全
|
private boolean reconnecting = false; // 重连状态
|
private int reconnectAttempts = 0; // 当前重连次数
|
|
// 静态单例 ObjectMapper 实例,避免重复创建
|
private static final ObjectMapper objectMapper = new ObjectMapper();
|
private static final Gson gson = new Gson(); // 静态 Gson 实例
|
|
// 构造方法,初始化订阅内容和调度服务
|
public BitgetClient(String subscriptions) {
|
this.subscriptions = subscriptions; // 初始化订阅内容
|
this.executorService = Executors.newScheduledThreadPool(1); // 初始化调度线程池
|
}
|
|
|
|
public void start() {
|
try {
|
connect(); // 尝试连接
|
if (session == null) {
|
log.info("无法在超时内连接到服务器。"); // 记录连接失败的信息
|
return; // 如果连接失败,直接返回
|
}
|
|
// 定期发送心跳
|
executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS);
|
// 发送订阅消息
|
session.getBasicRemote().sendText(subscriptions); // 发送订阅信息
|
|
synchronized (this) {
|
this.wait(); // 等待 WebSocket 消息到来
|
}
|
|
} catch (InterruptedException e) {
|
Thread.currentThread().interrupt(); // 恢复中断状态
|
log.error("线程被中断: " + e.getMessage(), e); // 记录线程中断的错误
|
} catch (Exception e) {
|
log.error("bitget ws 连接过程中发生异常: " + e.getMessage(), e); // 记录连接过程中发生的异常
|
} finally {
|
executorService.shutdownNow(); // 尝试立即关闭调度服务
|
}
|
}
|
|
private void connect() throws Exception {
|
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
|
container.connectToServer(this, new URI(WS_ENDPOINT)); // 连接 WebSocket 服务
|
}
|
|
@OnOpen
|
public void onOpen(Session session) {
|
log.info("bitget ws 已连接到服务器。"); // 记录连接成功的信息
|
this.session = session; // 存储会话
|
synchronized (this) {
|
this.notify(); // 通知等待的线程
|
}
|
reconnectAttempts = 0; // 连接成功重置重连次数
|
}
|
|
@OnMessage
|
public void onMessage(String message) {
|
try {
|
// 将消息解析为 Map
|
Map<String, Object> map = parseMessage(message); // 调用共用方法解析
|
if (map != null && map.containsKey("arg") && map.containsKey("data")) { // 确保 map 不为 null 且包含必要的键
|
JsonNode dataNode = getDataNode(message); // 获取数据节点
|
|
// 确保 dataNode 不为 null,以避免空指针异常
|
if (dataNode != null) {
|
Map<String, Object> hashMap = new HashMap<>(); // 变量命名更具描述性
|
|
String bids = dataNode.get("bids").toString();
|
String asks = dataNode.get("asks").toString();
|
|
Type listType = new TypeToken<List<List<String>>>(){}.getType();
|
List<List<String>> bidsList = gson.fromJson(bids, listType);
|
List<List<String>> asksList = gson.fromJson(asks, listType);
|
|
if(!bidsList.isEmpty() && !bidsList.get(0).isEmpty() && !asksList.isEmpty() && !asksList.get(0).isEmpty()){
|
if (!bidsList.isEmpty() && !bidsList.get(0).isEmpty()) {
|
List<String> bidsStringList = bidsList.get(0);
|
HashMap<String, Object> pvMap = new HashMap<>(); // 创建新的 HashMap 保存 bids 和 asks
|
pvMap.put("p", new BigDecimal(bidsStringList.get(0)).toPlainString());
|
pvMap.put("v", new BigDecimal(bidsStringList.get(1)).toPlainString());
|
hashMap.put("bids", pvMap); // 获取并存储 bids
|
}
|
|
if (!asksList.isEmpty() && !asksList.get(0).isEmpty()) {
|
List<String> asksStringList = asksList.get(0); // 修正此处为 asksList
|
HashMap<String, Object> pvMap = new HashMap<>(); // 创建新的 HashMap 保存 bids 和 asks
|
pvMap.put("p", new BigDecimal(asksStringList.get(0)).toPlainString());
|
pvMap.put("v", new BigDecimal(asksStringList.get(1)).toPlainString());
|
hashMap.put("asks", pvMap); // 获取并存储 asks
|
}
|
|
Map<String, Object> argMap = gson.fromJson(map.get("arg").toString(), new TypeToken<Map<String, Object>>() {}.getType()); // 解析 arg 为 Map
|
String key = "bitget" + argMap.get("instId"); // 构建 Redis 存储的键
|
|
// 存储到 Redis,使用 ObjectMapper 转换为 JSON 字符串
|
String jsonData = objectMapper.writeValueAsString(hashMap); // 先将 HashMap 转换为 JSON 字符串
|
RedisUtil.set(key, jsonData); // 存储数据
|
}
|
}
|
}
|
} catch (JsonSyntaxException e) {
|
log.error("JSON 解析错误: " + e.getMessage(), e); // 记录 JSON 解析错误
|
} catch (IOException e) {
|
log.error("对象转换时发生 I/O 异常: " + e.getMessage(), e); // 记录对象转换 I/O 异常
|
} catch (Exception e) {
|
log.error("处理消息时发生异常: " + e.getMessage(), e); // 记录处理消息时发生的异常
|
}
|
}
|
|
|
// 解析消息的共用方法
|
private Map<String, Object> parseMessage(String message) {
|
if(!message.equals("pong")){
|
return gson.fromJson(message, new TypeToken<Map<String, Object>>() {}.getType()); // 将消息解析为 Map
|
}
|
return null;
|
}
|
|
// 获取数据节点的共用方法
|
private JsonNode getDataNode(String message) throws JsonProcessingException {
|
JsonNode jsonNode = objectMapper.readTree(message); // 使用静态 ObjectMapper,避免重复创建
|
return jsonNode.get("data").get(0); // 获取第一个数据节点
|
}
|
|
@OnClose
|
public void onClose() {
|
log.info("bitget ws 连接已关闭,尝试重新连接..."); // 记录连接关闭的信息
|
handleConnectionClosedOrError(); // 处理连接关闭事件
|
}
|
|
@OnError
|
public void onError(Throwable throwable) {
|
log.error("bitget ws 发生错误: " + throwable.getMessage(), throwable); // 记录错误信息
|
handleConnectionClosedOrError(); // 处理错误事件
|
}
|
|
private void handleConnectionClosedOrError() {
|
synchronized (lock) {
|
if (!reconnecting) {
|
reconnecting = true; // 开始重连
|
executorService.execute(this::attemptReconnect); // 执行重连操作
|
}
|
}
|
}
|
|
private void attemptReconnect() {
|
if (reconnectAttempts < MAX_RECONNECT_ATTEMPTS) { // 检查重连次数是否超过限制
|
try {
|
log.info("bitget ws 开始重连"); // 记录开始重连的信息
|
connect(); // 尝试重连
|
log.info("bitget ws 重连成功"); // 成功重连的日志
|
reconnectAttempts = 0; // 重连成功,重置重连次数
|
} catch (Exception e) {
|
reconnectAttempts++; // 增加重连尝试次数
|
log.warn("bitget ws 重连失败,尝试次数: " + reconnectAttempts, e); // 记录重连失败的警告信息
|
// 采用指数退避策略,增加重连间隔
|
long waitTime = Math.min(60, (long) Math.pow(2, reconnectAttempts)) * 1000; // 最大等待时间为 60 秒
|
scheduleReconnect(waitTime); // 调度下次重连
|
}
|
} else {
|
log.error("超过最大重连次数,停止重连"); // 超过最大重连次数
|
reconnecting = false; // 重连状态重置
|
}
|
}
|
|
private void scheduleReconnect(long waitTime) { // 接受等待时间参数
|
if (!executorService.isShutdown()) {
|
executorService.schedule(this::attemptReconnect, waitTime, TimeUnit.MILLISECONDS); // 调度重连
|
}
|
}
|
|
private void sendPing() {
|
try {
|
if (session != null) {
|
session.getBasicRemote().sendText(PING_MESSAGE); // 发送心跳消息
|
}
|
} catch (Exception e) {
|
log.error("发送心跳失败", e); // 记录发送心跳失败的错误
|
}
|
}
|
|
public String getParameter(String symbol) throws JsonProcessingException {
|
// 使用 ObjectMapper 构建 JSON 请求
|
Map<String, Object> jsonMap = new HashMap<>();
|
jsonMap.put("op", "subscribe"); // 设置操作类型
|
Map<String, Object> argsMap = new HashMap<>();
|
jsonMap.put("args", argsMap); // 参数放入请求
|
|
// 设置请求参数
|
argsMap.put("instType", "SPOT"); // 交易类型
|
argsMap.put("channel", "books15"); // 渠道类型
|
argsMap.put("instId", symbol); // 交易对符号
|
|
// 返回 JSON 字符串
|
return objectMapper.writeValueAsString(jsonMap); // 使用单一的 ObjectMapper 实例
|
}
|
}
|