package com.yami.trading.huobi.wss;
|
|
import cn.hutool.http.Header;
|
import cn.hutool.http.HttpRequest;
|
import com.alibaba.fastjson.JSON;
|
import com.alibaba.fastjson.JSONObject;
|
import com.yami.trading.bean.item.domain.Item;
|
import lombok.extern.slf4j.Slf4j;
|
import org.apache.commons.lang3.StringUtils;
|
import org.java_websocket.handshake.ServerHandshake;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Component;
|
import org.springframework.web.socket.client.WebSocketClient;
|
|
import java.net.URI;
|
import java.util.Arrays;
|
import java.util.List;
|
import java.util.Objects;
|
import java.util.Random;
|
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.Executors;
|
import java.util.stream.Collectors;
|
|
/**
|
* 极度证券 数据源
|
* https://m.jdnx.com/
|
* @author 哪吒
|
*/
|
@Slf4j
|
@Component
|
public class JiduWebSocket {
|
|
public static final List<String> JD_WSS = Arrays.asList(
|
"wss://stream.jdnx3.com",
|
"wss://stream.jdnx4.com",
|
"wss://stream.jdnx5.com",
|
"wss://stream.jdnx6.com",
|
"wss://stream.jdnx7.com",
|
"wss://stream.jdnx8.com",
|
"wss://stream.jdnx9.com",
|
"wss://stream.jdnx10.com"
|
);
|
|
/**
|
* 获取订阅token
|
*/
|
public static final String JD_TOKEN = "https://api.jdnx6.com/Public/openSubscribe";
|
|
/**
|
* 更新订阅产品 [配合前端更新,传wss token]
|
*/
|
public static final String JD_PRODUCT = "https://api.jdnx7.com/Public/updateSubscribe";
|
|
/**
|
* 全局token key
|
*/
|
public static final String WSS_TOKEN_KEY = "JD_WSS_TOKEN";
|
|
private ExecutorService wssTask = Executors.newCachedThreadPool();
|
|
/*使用线程安全的 ConcurrentMap 来记录已启动的 token*/
|
private ConcurrentMap<String, Boolean> startedTokens = new ConcurrentHashMap<>();
|
|
/**
|
* 各个 行情的token值
|
*/
|
private ConcurrentMap<String, String> tokens = new ConcurrentHashMap<>();
|
|
/**
|
* 启动行情
|
* @param symbols
|
*/
|
public void startWss(List<String> symbols, String type){
|
Random random = new Random();
|
String wssUrl = JD_WSS.get(random.nextInt(JD_WSS.size()));
|
String token = socketToken();
|
tokens.put(type,token);
|
if (Item.HK_STOCKS.equalsIgnoreCase(type)) {
|
//处理symbol对
|
symbols = symbols.stream()
|
.map(val -> val + ".HK")
|
.collect(Collectors.toList());
|
}
|
updateSubscribe(symbols,token);
|
if(Objects.nonNull(token)){
|
//开始连接
|
wssTask.execute(() -> {
|
log.info("【极度证券】 {} 行情开启成功!",type);
|
connectionWss(wssUrl + "/Quotation/" + token);
|
});
|
}
|
}
|
|
private void connectionWss(String url){
|
try {
|
JiduWebSocketClient webSocketClient = new JiduWebSocketClient(new URI(url));
|
webSocketClient.connect();
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
}
|
|
/**
|
* 开启ws连接
|
* @param token
|
*/
|
public void startWs(String token){
|
//如果这个用户token 启动了wss 无需再开启
|
if (startedTokens.putIfAbsent(token, Boolean.TRUE) != null) {
|
return;
|
}
|
Random random = new Random();
|
String wssUrl = JD_WSS.get(random.nextInt(JD_WSS.size()));
|
if(Objects.nonNull(token)){
|
//开始连接
|
wssTask.execute(() -> {
|
connectionWss(wssUrl + "/Quotation/" + token,token);
|
});
|
}
|
}
|
|
private void connectionWss(String url, String token){
|
try {
|
JiduWebSocketClient webSocketClient = new JiduWebSocketClient(new URI(url));
|
webSocketClient.connect();
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
}
|
|
private void updateSubscribeOne(String symbol, String token){
|
updateSubscribe(Arrays.asList(symbol),token);
|
}
|
private void updateSubscribe(List<String> symbols, String token){
|
String body = HttpRequest
|
.post(JD_PRODUCT)
|
.form("symbols", JSON.toJSONString(symbols))
|
.header(Header.AUTHORIZATION,"Bearer " + token)
|
.execute()
|
.body();
|
JSONObject bodyJson = JSON.parseObject(body);
|
if(bodyJson.getInteger("code").intValue() == 200){
|
System.out.println("【极度证券】产品订阅设置为:" + JSON.toJSONString(symbols));
|
}
|
}
|
|
/**
|
* 根据用户指定获取的token 更新产品订阅
|
* @param token
|
* @param symbols
|
*/
|
public boolean updateSubscribe(String token, List<String> symbols){
|
String body = HttpRequest
|
.post(JD_PRODUCT)
|
.form("symbols", JSON.toJSONString(symbols))
|
.header(Header.AUTHORIZATION,"Bearer " + token)
|
.execute()
|
.body();
|
JSONObject bodyJson = JSON.parseObject(body);
|
if(bodyJson.getInteger("code").intValue() == 200){
|
System.out.println("【极度证券】产品订阅设置为:" + JSON.toJSONString(symbols));
|
return true;
|
}
|
return false;
|
}
|
|
|
public String socketToken(){
|
String body = HttpRequest.post(JD_TOKEN).execute().body();
|
JSONObject bodyJson = JSON.parseObject(body);
|
if(bodyJson.getInteger("code").intValue() == 200){
|
String token = bodyJson.getJSONObject("data").getString("token");
|
System.out.println("成功获取到【极度socketToken】:" + token);
|
return token;
|
} else {
|
System.out.println("获取【极度token报错】 :" + body);
|
}
|
return null;
|
}
|
|
|
}
|