package org.example.wsClient;
|
|
import com.alibaba.druid.support.json.JSONUtils;
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.node.ArrayNode;
|
import com.fasterxml.jackson.databind.node.ObjectNode;
|
import com.google.common.reflect.TypeToken;
|
import com.google.gson.Gson;
|
import com.google.gson.JsonSyntaxException;
|
import lombok.extern.slf4j.Slf4j;
|
import org.example.pojo.Currency;
|
import org.example.util.RedisUtil;
|
import org.json.JSONArray;
|
import org.json.JSONException;
|
import org.json.JSONObject;
|
|
import javax.websocket.*;
|
import java.math.BigDecimal;
|
import java.net.URI;
|
import java.nio.ByteBuffer;
|
import java.util.*;
|
import java.util.concurrent.Executors;
|
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.TimeUnit;
|
import org.json.JSONObject;
|
/**
|
* @program: demo
|
* @description:
|
* @create: 2024-07-18 15:30
|
**/
|
@ClientEndpoint
|
@Slf4j
|
public class GateClient {
|
|
private static final String WS_ENDPOINT = "wss://api.gateio.ws/ws/v4/";
|
private static final long PING_INTERVAL = 20000;
|
|
private final List<Currency> subscriptions;
|
private final ScheduledExecutorService executorService;
|
private Session session;
|
|
private final Object lock = new Object(); // 添加一个锁对象
|
private volatile boolean reconnecting = false; // 使用 volatile 关键字保证可见性
|
|
public GateClient(List<Currency> subscriptions) {
|
this.subscriptions = subscriptions;
|
this.executorService = Executors.newScheduledThreadPool(1);
|
}
|
|
public void start() {
|
try {
|
connect();
|
if (session == null) {
|
log.info("无法在超时时间内连接到服务器。");
|
return;
|
}
|
|
executorService.scheduleAtFixedRate(this::sendPing, PING_INTERVAL, PING_INTERVAL, TimeUnit.MILLISECONDS);
|
|
// 订阅消息
|
for (Currency subscription : subscriptions) {
|
String parameter = getParameter(subscription.getSymbol());
|
session.getBasicRemote().sendText(parameter);
|
}
|
|
synchronized (this) {
|
this.wait();
|
}
|
|
} catch (Exception e) {
|
log.error("gate ws 连接过程中发生异常: " + e.getMessage(), e);
|
} finally {
|
executorService.shutdown();
|
}
|
}
|
|
private void connect() throws Exception {
|
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
|
container.connectToServer(this, new URI(WS_ENDPOINT));
|
}
|
|
@OnOpen
|
public void onOpen(Session session) {
|
log.info("gate ws 已连接到服务器。");
|
this.session = session;
|
synchronized (this) {
|
this.notify();
|
}
|
}
|
private static final Gson gson = new Gson(); // 将 Gson 作为静态成员或单例
|
|
@OnMessage
|
public void onMessage(String message) {
|
try {
|
Map<String, Object> map = gson.fromJson(message, new TypeToken<Map<String, Object>>() {}.getType());
|
Object object = map.get("result");
|
Map<String, Object> resultMap = gson.fromJson(object.toString(), new TypeToken<Map<String, Object>>() {}.getType());
|
if (null != resultMap && null != resultMap.get("s")) {
|
HashMap<String,Object> hashMap = new HashMap<>();
|
ObjectMapper mapper = new ObjectMapper();
|
|
hashMap.put("bids",resultMap.get("bids"));
|
hashMap.put("asks",resultMap.get("asks"));
|
|
String key = "gate" + resultMap.get("s");
|
RedisUtil.set(key.replace("_",""), mapper.writeValueAsString(hashMap));
|
}
|
} catch (JsonSyntaxException e) {
|
log.error("JSON 解析异常:" + e.getMessage(), e);
|
} catch (JsonProcessingException e) {
|
log.error("JSON 解析异常:" + e.getMessage(), e);
|
}
|
}
|
|
@OnClose
|
public void onClose() {
|
log.info("gate ws 连接已关闭,尝试重新连接...");
|
handleConnectionClosedOrError();
|
}
|
|
@OnError
|
public void onError(Throwable throwable) {
|
log.error("gate ws 发生错误: " + throwable.getMessage(), throwable);
|
handleConnectionClosedOrError();
|
}
|
|
private void handleConnectionClosedOrError() {
|
synchronized (lock) {
|
if (!reconnecting) {
|
reconnecting = true; // 设置 reconnecting 为 true 表示开始重连
|
executorService.execute(this::attemptReconnect); // 使用 execute 方法立即执行重连
|
}
|
}
|
}
|
|
private void attemptReconnect() {
|
boolean doReconnect = true;
|
try {
|
log.info("gate ws 开始重连");
|
connect(); // 假设 connect() 方法用于实际的连接逻辑
|
log.info("gate ws 重连成功");
|
} catch (Exception e) {
|
log.error("gate ws 重连失败", e);
|
// 连接失败时,可以根据具体情况决定是否继续重连
|
// 在这里假设总是继续尝试重连
|
} finally {
|
synchronized (lock) {
|
if (doReconnect) {
|
scheduleReconnect(); // 如果需要继续重连,则重新调度重连任务
|
} else {
|
reconnecting = false; // 重连结束后设置 reconnecting 为 false
|
}
|
}
|
}
|
}
|
|
private void scheduleReconnect() {
|
if (!executorService.isShutdown()) {
|
executorService.schedule(this::attemptReconnect, 5, TimeUnit.SECONDS);
|
}
|
}
|
|
private void sendPing() {
|
try {
|
if (session != null) {
|
session.getBasicRemote().sendPing(ByteBuffer.wrap("Ping".getBytes()));
|
}
|
} catch (Exception e) {
|
log.error("发送心跳失败", e);
|
}
|
}
|
|
public String getParameter(String symbol) throws JsonProcessingException, JSONException {
|
// 替换USDT为_USDT
|
symbol = symbol.replaceAll("USDT", "_USDT");
|
|
// 创建一个ObjectMapper实例
|
ObjectMapper mapper = new ObjectMapper();
|
// 获取当前时间的毫秒数
|
long currentTimeMillis = System.currentTimeMillis();
|
|
// 定义常量
|
final String CHANNEL = "spot.order_book"; // 固定频道
|
final String EVENT_SUBSCRIBE = "subscribe"; // 订阅事件
|
final String EVENT_UNSUBSCRIBE = "unsubscribe"; // 取消订阅事件
|
final String[] PAYLOAD = new String[] {symbol, "20", "100ms"}; // 负载信息
|
|
// 使用Map构建JSON对象
|
Map<String, Object> jsonMap = new HashMap<>(); // 创建Map用于存放JSON内容
|
jsonMap.put("time", currentTimeMillis); // 放入当前时间
|
jsonMap.put("channel", CHANNEL); // 放入频道
|
jsonMap.put("event", EVENT_SUBSCRIBE); // 放入事件类型
|
jsonMap.put("payload", Arrays.asList(PAYLOAD)); // 将数组转换为List并放入Map
|
|
// 将Map转换为JSON字符串
|
String jsonString = mapper.writeValueAsString(jsonMap); // 使用ObjectMapper转换
|
return jsonString; // 返回JSON字符串
|
|
}
|
|
// public static void main(String[] args) throws JSONException {
|
// // 从 resultMap 中获取 "bids" 对应的 JSON 数组
|
// JSONArray jsonArray = new JSONArray(resultMap.get("bids").toString()); // 将获取的 bids 转换成 JSON 数组
|
// List<List<String>> resultList = new ArrayList<>(); // 存放所有的内层列表
|
//
|
// // 遍历 JSON 数组
|
// for (int i = 0; i < jsonArray.length(); i++) {
|
// JSONArray innerArray = jsonArray.getJSONArray(i); // 获取当前内层 JSON 数组
|
// List<String> innerList = new ArrayList<>(); // 存放当前内层数组的元素
|
//
|
// // 遍历内层 JSON 数组
|
// for (int j = 0; j < innerArray.length(); j++) {
|
// innerList.add(innerArray.getString(j)); // 将元素添加到内层列表中
|
// }
|
//
|
// resultList.add(innerList); // 将内层列表添加到结果列表中
|
// }
|
//
|
// // 考虑去掉未使用的 dataList,下面的代码使用 resultList 而不是 dataList
|
// List<Map<String, String>> resultMapList = new ArrayList<>(); // 存放最终的映射结果
|
// for (List<String> entry : resultList) { // 遍历 resultList 中的每个内层列表
|
// // 确保每个内层列表有足够的元素,再进行映射
|
// if (entry.size() >= 2) { // 判断 entry 的大小,避免 IndexOutOfBoundsException
|
// Map<String, String> mapKey = new HashMap<>(); // 新建一个 Map 以存储键值对
|
// mapKey.put("p", entry.get(0)); // 将内层列表的第一个元素作为键 "p"
|
// mapKey.put("v", entry.get(1)); // 将内层列表的第二个元素作为键 "v"
|
// resultMapList.add(mapKey); // 将 map 添加到结果映射列表中
|
// }
|
// }
|
//
|
// }
|
|
|
|
}
|