package org.example.wsClient;
|
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.google.common.reflect.TypeToken;
|
import com.google.gson.Gson;
|
import com.google.gson.JsonSyntaxException;
|
import lombok.extern.slf4j.Slf4j;
|
import org.example.pojo.Currency;
|
import org.example.util.RedisUtil;
|
import org.json.JSONException;
|
import org.json.JSONObject;
|
|
import javax.websocket.*;
|
import java.net.URI;
|
import java.nio.ByteBuffer;
|
import java.util.Arrays;
|
import java.util.HashMap;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.concurrent.Executors;
|
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.TimeUnit;
|
|
@ClientEndpoint
|
@Slf4j
|
public class BitgetClient {
|
|
|
private static final String WS_ENDPOINT = "wss://ws.bitget.com/v2/ws/public";
|
private static final long PING_INTERVAL = 20000;
|
|
private final String subscriptions;
|
private final ScheduledExecutorService executorService;
|
private Session session;
|
|
private final Object lock = new Object(); // 添加一个锁对象
|
private volatile boolean reconnecting = false; // 使用 volatile 关键字保证可见性
|
|
public BitgetClient(String subscriptions) {
|
this.subscriptions = subscriptions;
|
this.executorService = Executors.newScheduledThreadPool(1);
|
}
|
|
public void start() {
|
try {
|
connect();
|
if (session == null) {
|
log.info("无法在超时时间内连接到服务器。");
|
return;
|
}
|
|
executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS);
|
|
// 订阅消息
|
session.getBasicRemote().sendText(subscriptions);
|
|
synchronized (this) {
|
this.wait();
|
}
|
|
} catch (Exception e) {
|
log.error("bitget ws 连接过程中发生异常: " + e.getMessage(), e);
|
} finally {
|
executorService.shutdown();
|
}
|
}
|
|
private void connect() throws Exception {
|
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
|
container.connectToServer(this, new URI(WS_ENDPOINT));
|
}
|
|
@OnOpen
|
public void onOpen(Session session) {
|
log.info("bitget ws 已连接到服务器。");
|
this.session = session;
|
synchronized (this) {
|
this.notify();
|
}
|
}
|
private static final Gson gson = new Gson(); // 将 Gson 作为静态成员或单例
|
|
@OnMessage
|
public void onMessage(String message) {
|
try {
|
JSONObject jsonObject = new JSONObject(message);
|
if (null != jsonObject && null != jsonObject.getJSONObject("arg").getString("instId") && null != jsonObject.get("data")) {
|
HashMap<String,Object> hashMap = new HashMap<>();
|
ObjectMapper mapper = new ObjectMapper();
|
|
hashMap.put("bids",jsonObject.getJSONObject("data").getString("bids"));
|
hashMap.put("asks",jsonObject.getJSONObject("data").getString("asks"));
|
|
String key = "bitget" + jsonObject.getJSONObject("arg").getString("instId");
|
RedisUtil.set(key.replace("_",""), mapper.writeValueAsString(hashMap));
|
}
|
} catch (JsonSyntaxException e) {
|
log.error("JSON 解析异常:" + e.getMessage(), e);
|
} catch (JsonProcessingException e) {
|
log.error("JSON 解析异常:" + e.getMessage(), e);
|
} catch (JSONException e) {
|
throw new RuntimeException(e);
|
}
|
}
|
|
@OnClose
|
public void onClose() {
|
log.info("bitget ws 连接已关闭,尝试重新连接...");
|
handleConnectionClosedOrError();
|
}
|
|
@OnError
|
public void onError(Throwable throwable) {
|
log.error("bitget ws 发生错误: " + throwable.getMessage(), throwable);
|
handleConnectionClosedOrError();
|
}
|
|
private void handleConnectionClosedOrError() {
|
synchronized (lock) {
|
if (!reconnecting) {
|
reconnecting = true; // 设置 reconnecting 为 true 表示开始重连
|
executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连
|
}
|
}
|
}
|
|
private void attemptReconnect() {
|
boolean doReconnect = true;
|
try {
|
log.info("bitget ws 开始重连");
|
connect(); // 假设 connect() 方法用于实际的连接逻辑
|
log.info("bitget ws 重连成功");
|
} catch (Exception e) {
|
log.error("bitget ws 重连失败", e);
|
// 连接失败时,可以根据具体情况决定是否继续重连
|
// 在这里假设总是继续尝试重连
|
} finally {
|
synchronized (lock) {
|
if (doReconnect) {
|
scheduleReconnect(); // 如果需要继续重连,则重新调度重连任务
|
} else {
|
reconnecting = false; // 重连结束后设置 reconnecting 为 false
|
}
|
}
|
}
|
}
|
|
private void scheduleReconnect() {
|
if (!executorService.isShutdown()) {
|
executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS);
|
}
|
}
|
|
private void sendPing() {
|
try {
|
if (session != null) {
|
session.getBasicRemote().sendText("ping"); // 发送心跳消息
|
}
|
} catch (Exception e) {
|
log.error("发送心跳失败", e);
|
}
|
}
|
|
public String getParameter(String symbol) throws JsonProcessingException, JSONException {
|
// 创建一个ObjectMapper实例
|
ObjectMapper mapper = new ObjectMapper();
|
|
// 使用Map构建JSON对象
|
Map<String, Object> jsonMap = new HashMap<>();
|
jsonMap.put("op", "subscribe");
|
Map<String, Object> argsMap = new HashMap<>();
|
jsonMap.put("args", argsMap);
|
|
argsMap.put("instType", "SPOT");
|
argsMap.put("channel", "books15");
|
argsMap.put("instId", symbol);
|
|
// 将Map转换为JSON字符串
|
String jsonString = mapper.writeValueAsString(jsonMap);
|
return jsonString;
|
|
}
|
|
|
}
|