package org.example.geteclient.wsClinet;
|
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.google.common.reflect.TypeToken;
|
import com.google.gson.Gson;
|
import com.google.gson.JsonSyntaxException;
|
import lombok.extern.slf4j.Slf4j;
|
import org.example.geteclient.pojo.Currency;
|
import org.example.geteclient.util.RedisUtil;
|
import org.json.JSONException;
|
|
import javax.websocket.*;
|
import java.net.URI;
|
import java.nio.ByteBuffer;
|
import java.util.Arrays;
|
import java.util.HashMap;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.concurrent.Executors;
|
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.TimeUnit;
|
|
/**
|
* @program: demo
|
* @description: GateClient 用于与 Gate.io WebSocket API 进行交互
|
* @create: 2024-07-18 15:30
|
**/
|
@ClientEndpoint
|
@Slf4j
|
public class GateClient {
|
|
private static final String WS_ENDPOINT = "wss://api.gateio.ws/ws/v4/"; // WebSocket 端点 URL
|
private static final long PING_INTERVAL = 20000; // Ping 消息发送间隔,单位毫秒
|
|
private final List<Currency> subscriptions; // 存储要订阅的货币列表
|
private final ScheduledExecutorService executorService; // 定时任务调度服务
|
private Session session; // WebSocket 连接会话
|
|
private final Object lock = new Object(); // 添加一个锁对象
|
private volatile boolean reconnecting = false; // 表示是否正在重连,使用 volatile 保证可见性
|
|
public GateClient(List<Currency> subscriptions) {
|
this.subscriptions = subscriptions; // 初始化订阅的货币
|
this.executorService = Executors.newScheduledThreadPool(1); // 创建一个定时任务调度线程池
|
}
|
|
public void start() {
|
try {
|
connect(); // 尝试连接 WebSocket
|
if (session == null) { // 如果连接失败,session 仍然为 null
|
log.info("无法在超时时间内连接到服务器。");
|
return; // 提前返回
|
}
|
|
// 定期发送 Ping 消息保持连接
|
executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS);
|
|
// 订阅消息
|
for (Currency subscription : subscriptions) {
|
String parameter = getParameter(subscription.getSymbol()); // 获取订阅参数
|
session.getBasicRemote().sendText(parameter); // 发送订阅请求
|
}
|
|
synchronized (this) { // 进入同步块以等待连接的激活
|
this.wait(); // 等待连接激活的通知
|
}
|
|
} catch (Exception e) {
|
log.error("gate ws 连接过程中发生异常: " + e.getMessage(), e); // 记录连接过程中发生的异常
|
} finally {
|
executorService.shutdown(); // 确保定时任务调度服务被关闭
|
}
|
}
|
|
private void connect() throws Exception {
|
WebSocketContainer container = ContainerProvider.getWebSocketContainer(); // 获取 WebSocket 容器
|
container.connectToServer(this, new URI(WS_ENDPOINT)); // 连接到 WebSocket 服务器
|
}
|
|
@OnOpen
|
public void onOpen(Session session) {
|
log.info("gate ws 已连接到服务器。"); // 连接成功日志
|
this.session = session; // 保存会话信息
|
synchronized (this) { // 进入同步块
|
this.notify(); // 通知等待的线程
|
}
|
}
|
|
private static final Gson gson = new Gson(); // 将 Gson 作为静态成员,避免重复实例化
|
private static final String RESULT_KEY = "result"; // 定义结果键的常量
|
private static final String BIDS_KEY = "bids"; // 定义 bids 的常量
|
private static final String ASKS_KEY = "asks"; // 定义 asks 的常量
|
private static final String S_KEY = "s"; // 定义 s 的常量
|
|
@OnMessage
|
public void onMessage(String message) {
|
try {
|
// 解析收到的消息为 Map
|
Map<String, Object> map = gson.fromJson(message, new TypeToken<Map<String, Object>>() {}.getType());
|
Object object = map.get(RESULT_KEY); // 获取结果对象
|
Map<String, Object> resultMap = gson.fromJson(gson.toJson(object), new TypeToken<Map<String, Object>>() {}.getType()); // 直接转换为 Map
|
// 检查结果是否有效,并整合检查
|
if (resultMap != null && resultMap.get(S_KEY) != null) {
|
HashMap<String, Object> hashMap = new HashMap<>(); // 创建新的 HashMap 保存 bids 和 asks
|
|
// 放入 bids 和 asks 数据
|
hashMap.put(BIDS_KEY, resultMap.get(BIDS_KEY)); // 放入 bids 数据
|
hashMap.put(ASKS_KEY, resultMap.get(ASKS_KEY)); // 放入 asks 数据
|
|
String key = "gate" + resultMap.get(S_KEY); // 生成 Redis 键
|
RedisUtil.set(key.replace("_", ""), gson.toJson(hashMap)); // 存入 Redis,使用 Gson 进行序列化
|
}
|
} catch (JsonSyntaxException e) {
|
log.error("JSON 解析异常:" + e.getMessage(), e); // 记录 JSON 解析异常
|
} catch (Exception e) { // 捕获其他可能的异常
|
log.error("处理消息时发生异常:" + e.getMessage(), e); // 记录异常
|
}
|
}
|
|
|
@OnClose
|
public void onClose() {
|
log.info("gate ws 连接已关闭,尝试重新连接..."); // 连接关闭日志
|
handleConnectionClosedOrError(); // 处理连接关闭或错误
|
}
|
|
@OnError
|
public void onError(Throwable throwable) {
|
log.error("gate ws 发生错误: " + throwable.getMessage(), throwable); // 记录错误日志
|
handleConnectionClosedOrError(); // 处理连接关闭或错误
|
}
|
|
private void handleConnectionClosedOrError() {
|
synchronized (lock) { // 进入同步块以防止并发重连
|
if (!reconnecting) { // 检查当前是否已经在重连
|
reconnecting = true; // 设置 reconnecting 为 true 表示开始重连
|
executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连
|
}
|
}
|
}
|
|
private void attemptReconnect() {
|
boolean doReconnect = true; // 是否进行重连的标志
|
try {
|
log.info("gate ws 开始重连"); // 开始重连日志
|
connect(); // 尝试重新连接
|
log.info("gate ws 重连成功"); // 重连成功日志
|
} catch (Exception e) {
|
log.error("gate ws 重连失败", e); // 重连失败记录日志
|
doReconnect = false; // 标记不再继续重连
|
} finally {
|
synchronized (lock) { // 进入同步块
|
if (doReconnect) {
|
scheduleReconnect(); // 如果需要继续重连,调度重连任务
|
} else {
|
reconnecting = false; // 重连结束,标记重连状态为 false
|
}
|
}
|
}
|
}
|
|
private void scheduleReconnect() {
|
if (!executorService.isShutdown()) { // 确保调度服务未关闭
|
executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS); // 重连调度,5秒后尝试重新连接
|
}
|
}
|
|
private void sendPing() {
|
try {
|
if (session != null) { // 检查会话是否存在
|
session.getBasicRemote().sendPing(ByteBuffer.wrap("Ping".getBytes())); // 发送 Ping 消息
|
}
|
} catch (Exception e) {
|
log.error("发送心跳失败", e); // 记录心跳发送失败日志
|
}
|
}
|
|
public String getParameter(String symbol) throws JsonProcessingException, JSONException {
|
// 替换USDT为_USDT
|
symbol = symbol.replaceAll("USDT", "_USDT"); // 替换货币符号中的 USDT
|
|
// 创建一个ObjectMapper实例
|
ObjectMapper mapper = new ObjectMapper(); // 创建 ObjectMapper 实例
|
// 获取当前时间的毫秒数
|
long currentTimeMillis = System.currentTimeMillis(); // 获取当前时间
|
|
// 定义常量
|
final String CHANNEL = "spot.order_book"; // 固定频道名称
|
final String EVENT_SUBSCRIBE = "subscribe"; // 订阅事件
|
final String EVENT_UNSUBSCRIBE = "unsubscribe"; // 取消订阅事件
|
final String[] PAYLOAD = new String[]{symbol, "20", "100ms"}; // 请求负载信息
|
|
// 使用Map构建JSON对象
|
Map<String, Object> jsonMap = new HashMap<>(); // 创建 Map 存放 JSON 内容
|
jsonMap.put("time", currentTimeMillis); // 将当前时间放入 Map
|
jsonMap.put("channel", CHANNEL); // 将频道信息放入 Map
|
jsonMap.put("event", EVENT_SUBSCRIBE); // 事件类型放入 Map
|
jsonMap.put("payload", Arrays.asList(PAYLOAD)); // 将负载数组转为 List 放入 Map
|
|
// 将Map转换为JSON字符串
|
String jsonString = mapper.writeValueAsString(jsonMap); // 转换为 JSON 字符串
|
return jsonString; // 返回 JSON 字符串
|
}
|
}
|