trading-order-admin/src/main/java/com/yami/trading/admin/task/StockTask.java
New file @@ -0,0 +1,177 @@ package com.yami.trading.admin.task; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.google.gson.Gson; import com.yami.trading.admin.util.us.HttpClientRequest; import com.yami.trading.admin.util.us.ReponseBase; import com.yami.trading.bean.data.domain.Realtime; import com.yami.trading.bean.item.domain.Item; import com.yami.trading.bean.model.StockRealTimeBean; import com.yami.trading.common.constants.RedisKeys; import com.yami.trading.common.util.RedisUtil; import com.yami.trading.huobi.data.DataCache; import com.yami.trading.huobi.websocket.constant.enums.EStockType; import com.yami.trading.service.item.ItemService; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import java.math.BigDecimal; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; @Component public class StockTask { private static final Logger log = LoggerFactory.getLogger(StockTask.class); private final AtomicBoolean syncINStockData = new AtomicBoolean(false); private final Lock syncINStockDataLock = new ReentrantLock(); @Autowired private ThreadPoolTaskExecutor taskExecutor; @Autowired ItemService itemService; /** * 同步系统所需要的股票 */ @Scheduled(cron = "0 0/5 * * * ?") public void syncINStockData() { if (syncINStockData.get()) { // 判断任务是否在处理中 return; } if (syncINStockDataLock.tryLock()) { try { syncINStockData.set(true); // 1. 定义需要处理的所有股票类型(集中管理,新增类型只需添加到列表) List<EStockType> stockTypes = Arrays.asList( EStockType.US ); // 2. 
批量创建所有异步任务 List<CompletableFuture<Void>> futures = new ArrayList<>(); for (EStockType type : stockTypes) { // 添加loadAllStock任务 futures.add(CompletableFuture.runAsync(() -> loadAllStock(type), taskExecutor)); } // 3. 等待所有任务完成(将List转换为数组传入allOf) CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); } catch (Exception e) { log.error("同步股票数据出错", e); } finally { syncINStockDataLock.unlock(); syncINStockData.set(false); } } } /** * 加载所有股票数据 */ public void loadAllStock(EStockType eStockType) { log.info("同步股票 数据 {}", eStockType.getCode()); List<StockRealTimeBean> list = new ArrayList<>(); int totleStock = 1; int page = 0; try { while (totleStock > list.size()) { try { String result = HttpClientRequest.doGet(eStockType.stockUrl + "list?country_id=" + eStockType.getContryId() + "&size=100000&page=" + page + "&key=" + eStockType.stockKey); ReponseBase reponseBase = new Gson().fromJson(result, ReponseBase.class); list.addAll(reponseBase.getData()); page++; totleStock = reponseBase.getTotal(); } catch (Exception e) { e.printStackTrace(); break; } } if (list.isEmpty()) { return; } List<String> stockCodeList = list.stream().map(StockRealTimeBean::getSymbol).collect(Collectors.toList()); List<Item> stockList = itemService.list(new QueryWrapper<Item>().in("symbol", stockCodeList)); List<Item> updateStockList = new ArrayList<>(); for (StockRealTimeBean o : list) { //System.out.println(o); Item item = stockList.stream() .filter(x -> x.getSymbol().equals(o.getSymbol())) .findFirst() .orElse(null); if (item == null) { item = new Item(); } String name = StringUtils.trim(o.getName()); item.setEnName(name); item.setName(name); item.setSymbolFullName(name); item.setSymbol(o.getSymbol()); item.setSymbolData(o.getSymbol()); item.setPips(BigDecimal.valueOf(0.01).doubleValue()); item.setPipsAmount(BigDecimal.valueOf(0.02).doubleValue()); item.setAdjustmentValue(BigDecimal.ZERO.doubleValue()); item.setUnitAmount(BigDecimal.valueOf(1000).doubleValue()); 
item.setUnitFee(BigDecimal.valueOf(30).doubleValue()); item.setMarket("FOREVER"); item.setDecimals(2); item.setMultiple(BigDecimal.ZERO.doubleValue()); item.setBorrowingRate(BigDecimal.ZERO.doubleValue()); item.setDelFlag(0); item.setType(Item.US_STOCKS); item.setCategory(Item.US_STOCKS); item.setSorted("100"); item.setOpenCloseType(Item.US_STOCKS); item.setFake("0"); item.setShowStatus("1"); item.setTradeStatus("1"); item.setQuoteCurrency("USDT"); updateStockList.add(item); Realtime realtime = new Realtime(); realtime.setUuid(o.getId()); realtime.setSymbol(o.getSymbol()); realtime.setName(o.getName()); realtime.setClose(new BigDecimal(o.getLast().trim()).doubleValue()); realtime.setLow(new BigDecimal(o.getLow().trim()).doubleValue()); realtime.setHigh(new BigDecimal(o.getHigh().trim()).doubleValue()); realtime.setOpen(new BigDecimal(o.getOpen().trim()).doubleValue()); realtime.setPrevClose(new BigDecimal(o.getPrevClose().trim()).doubleValue()); realtime.setTs(Long.valueOf(o.getTime() + "000")); realtime.setVolume(new BigDecimal(o.getVolume().trim()).doubleValue()); realtime.setNetChange(new BigDecimal(o.getChg().trim()).doubleValue()); realtime.setChangeRatio(new BigDecimal(o.getChgPct()).doubleValue()); realtime.setType(o.getType()); realtime.setBid(new BigDecimal(o.getBid()).doubleValue()); realtime.setAsk(new BigDecimal(o.getAsk()).doubleValue()); DataCache.putRealtime(realtime.getSymbol(), realtime); } itemService.saveOrUpdateBatch(updateStockList); log.info("同步股票 数据 成功 {} 总共同步数据 {}", eStockType.getCode(), list.size()); } catch ( Exception e) { log.error("同步出错", e); } } public static void main(String[] args) { StockTask task = new StockTask(); task.loadAllStock(EStockType.US); } } trading-order-admin/src/main/java/com/yami/trading/admin/util/us/HttpClientRequest.java
New file @@ -0,0 +1,186 @@ package com.yami.trading.admin.util.us; import com.google.gson.Gson; import lombok.extern.slf4j.Slf4j; import org.apache.http.HttpEntity; import org.apache.http.NameValuePair; import org.apache.http.client.ClientProtocolException; import org.apache.http.client.config.RequestConfig; import org.apache.http.client.entity.UrlEncodedFormEntity; import org.apache.http.client.methods.CloseableHttpResponse; import org.apache.http.client.methods.HttpGet; import org.apache.http.client.methods.HttpPost; import org.apache.http.entity.StringEntity; import org.apache.http.impl.client.CloseableHttpClient; import org.apache.http.impl.client.HttpClients; import org.apache.http.message.BasicNameValuePair; import org.apache.http.util.EntityUtils; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.*; @Slf4j public class HttpClientRequest { public static String doGet(String url) { CloseableHttpClient httpClient = null; CloseableHttpResponse response = null; String result = ""; try { httpClient = HttpClients.createDefault(); HttpGet httpGet = new HttpGet(url); httpGet.setHeader("Authorization", "Bearer da3efcbf-0845-4fe3-8aba-ee040be542c0"); httpGet.setHeader("Referer","https://quotes.sina.cn/hs/company/quotes/view/sz399001?vt=4&cid=76524&node_id=76524&autocallup=no&isfromsina=yes"); //cookie httpGet.setHeader("Cookie", "xq_a_token=d269ad4aee7ece063038900846f9541a7d0ead07"); RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(35000).setConnectionRequestTimeout(35000).setSocketTimeout(60000).build(); httpGet.setConfig(requestConfig); response = httpClient.execute(httpGet); HttpEntity entity = response.getEntity(); result = EntityUtils.toString(entity); } catch (ClientProtocolException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } finally { if (null != response) { try { response.close(); } catch (IOException e) { e.printStackTrace(); } } if (null != httpClient) { try { 
httpClient.close(); } catch (IOException e) { e.printStackTrace(); } } } return result; } public static String doPost(String url, Map<String, Object> paramMap) { CloseableHttpClient httpClient = null; CloseableHttpResponse httpResponse = null; String result = ""; httpClient = HttpClients.createDefault(); HttpPost httpPost = new HttpPost(url); RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(35000).setConnectionRequestTimeout(35000).setSocketTimeout(60000).build(); httpPost.setConfig(requestConfig); httpPost.addHeader("Content-Type", "application/x-www-form-urlencoded"); if (null != paramMap && paramMap.size() > 0) { List<NameValuePair> nvps = new ArrayList<NameValuePair>(); Set<Map.Entry<String, Object>> entrySet = paramMap.entrySet(); Iterator<Map.Entry<String, Object>> iterator = entrySet.iterator(); while (iterator.hasNext()) { Map.Entry<String, Object> mapEntry = (Map.Entry) iterator.next(); nvps.add(new BasicNameValuePair((String) mapEntry.getKey(), mapEntry.getValue().toString())); } try { httpPost.setEntity(new UrlEncodedFormEntity(nvps, "UTF-8")); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } try { httpResponse = httpClient.execute(httpPost); HttpEntity entity = httpResponse.getEntity(); result = EntityUtils.toString(entity); } catch (ClientProtocolException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } finally { if (null != httpResponse) { try { httpResponse.close(); } catch (IOException e) { e.printStackTrace(); } } if (null != httpClient) { try { httpClient.close(); } catch (IOException e) { e.printStackTrace(); } } } return result; } public static String doPostJson(String url, Map<String, String> paramMap) { CloseableHttpClient httpClient = null; CloseableHttpResponse httpResponse = null; String result = ""; httpClient = HttpClients.createDefault(); HttpPost httpPost = new HttpPost(url); RequestConfig requestConfig = 
RequestConfig.custom().setConnectTimeout(35000).setConnectionRequestTimeout(35000).setSocketTimeout(60000).build(); httpPost.setConfig(requestConfig); httpPost.addHeader("Content-Type", "application/json"); StringEntity reqEntity = new StringEntity(new Gson().toJson(paramMap),"utf-8"); httpPost.setEntity(reqEntity); try { httpResponse = httpClient.execute(httpPost); HttpEntity entity = httpResponse.getEntity(); result = EntityUtils.toString(entity); } catch (ClientProtocolException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } finally { if (null != httpResponse) { try { httpResponse.close(); } catch (IOException e) { e.printStackTrace(); } } if (null != httpClient) { try { httpClient.close(); } catch (IOException e) { e.printStackTrace(); } } } return result; } } trading-order-admin/src/main/java/com/yami/trading/admin/util/us/ReponseBase.java
New file @@ -0,0 +1,47 @@ package com.yami.trading.admin.util.us; import com.yami.trading.bean.model.StockRealTimeBean; import java.util.List; public class ReponseBase { private List<StockRealTimeBean> data; private int page; private int pageSize; private int total; public List<StockRealTimeBean> getData() { return data; } public void setData(List<StockRealTimeBean> data) { this.data = data; } public int getPage() { return page; } public void setPage(int page) { this.page = page; } public int getPageSize() { return pageSize; } public void setPageSize(int pageSize) { this.pageSize = pageSize; } public int getTotal() { return total; } public void setTotal(int total) { this.total = total; } } trading-order-admin/src/main/java/com/yami/trading/api/controller/ApiItemController.java
@@ -272,7 +272,7 @@ // 倒序吗? TODO queryWrapper.orderByDesc("sorted"); long current = itemQuery.getCurrent() == 0 ? 1 : itemQuery.getCurrent(); long size = itemQuery.getSize() == 0 ? 1000 : itemQuery.getSize(); long size = itemQuery.getSize() == 0 ? 50 : itemQuery.getSize(); Page<Item> page = new Page<>(current, size); IPage<Item> result = itemService.page(page, queryWrapper); List<Item> records = result.getRecords(); trading-order-admin/src/main/resources/application-dev.yml
@@ -130,9 +130,9 @@ config: timezone: # 配置当前盘口存储数据使用的时区 record: GMT+8 record: America/New_York # 配置当前盘口展示数据使用的时区 show: GMT+8 show: America/New_York sign: encryption-key: d78585e683ed11eaa13f0242ac110003 trading-order-bean/src/main/java/com/yami/trading/bean/data/domain/Realtime.java
@@ -92,6 +92,10 @@ */ private String type; @TableField(exist = false) @ApiModelProperty("昨日收盘价") private double prevClose; /** * 涨跌幅 */ trading-order-bean/src/main/java/com/yami/trading/bean/model/StockRealTimeBean.java
New file @@ -0,0 +1,26 @@ package com.yami.trading.bean.model; import lombok.Data; @Data public class StockRealTimeBean { private String Id; // 股票Id 也是股票的pid private String Symbol; // 股票编码 private String Name; // 股票名称 private String Last; // 股票最新价格 private String Low; // 最低 private String High; // 最高 private String Open; // 今开 private String PrevClose; // 昨收 private String Time; // 价格更新时间 private String Volume; // 交易量 private String Chg; // 涨幅 private String ChgPct; // 涨幅率 private String type; // 股票所在的交易所 private String Ratio; // 市盈率 private String MarketCap; // 市值 private String Eps; // 每股收益 private String Bid; // 买进价 private String Ask; // 卖出价 } trading-order-common/src/main/java/com/yami/trading/common/serializer/redis/KryoRedisSerializer.java
@@ -67,6 +67,7 @@ try (Input input = new Input(bytes)) { return (T) kryo.readClassAndObject(input); } catch (Exception e) { e.printStackTrace(); log.error(e.getMessage(), e); } return null; trading-order-huobi/src/main/java/com/yami/trading/huobi/data/job/StockGetDataJob.java
@@ -123,7 +123,7 @@ } if(MarketOpenChecker.isMarketOpen(Item.US_STOCKS, 30)){ //美股 this.realtimeHandleTradingViewUsStock(); //this.realtimeHandleTradingViewUsStock(); //this.realtimeHandleXueQiu(usStockRemarks); //美股ETF @@ -133,7 +133,7 @@ this.realtimeHandleTradingViewUsEtf(); //美股 this.realtimeHandleTradingViewUsStock(); //this.realtimeHandleTradingViewUsStock(); //if(MarketOpenChecker.isMarketOpen(Item.TW_STOCKS, 30)){ // this.realtimeHandleTW(twStockRemarks); trading-order-huobi/src/main/java/com/yami/trading/huobi/jsws/WebSocketClientBeanConfig.java
New file @@ -0,0 +1,61 @@ package com.yami.trading.huobi.jsws; import com.yami.trading.huobi.websocket.constant.enums.EStockType; import lombok.extern.slf4j.Slf4j; import org.java_websocket.client.WebSocketClient; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import java.net.URI; import java.util.HashMap; import java.util.Map; @Slf4j @Component public class WebSocketClientBeanConfig { @Bean public Map<String, WebSocketClient> websocketRunClientMap() { Map<String, WebSocketClient> retMap = new HashMap<>(1); try { WebsocketRunClient websocketRunClient = new WebsocketRunClient(new URI(EStockType.US.getWsUrl()), EStockType.US); websocketRunClient.connect(); websocketRunClient.setConnectionLostTimeout(0); startHeartbeatThread(websocketRunClient); retMap.put(EStockType.US.getStockKey(), websocketRunClient); } catch (Exception e) { log.error("WebsocketRunClient 异常: {}", e.getMessage()); } return retMap; } private void startHeartbeatThread(WebSocketClient client) { new Thread(() -> { while (true) { try { Thread.sleep(8000); if (client.isOpen()) { // 先检查连接状态 client.send("heartbeat".getBytes()); } else { client.reconnect(); client.setConnectionLostTimeout(0); } } catch (Exception e) { log.error("心跳线程异常, 尝试重连: {}", e.getMessage()); try { client.reconnect(); client.setConnectionLostTimeout(0); } catch (Exception re) { log.error("重连失败: {}", re.getMessage()); } } } }).start(); } } trading-order-huobi/src/main/java/com/yami/trading/huobi/jsws/WebsocketRunClient.java
New file @@ -0,0 +1,120 @@ package com.yami.trading.huobi.jsws; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import com.yami.trading.bean.data.domain.Realtime; import com.yami.trading.huobi.data.DataCache; import com.yami.trading.huobi.websocket.constant.enums.EStockType; import lombok.extern.slf4j.Slf4j; import org.java_websocket.client.WebSocketClient; import org.java_websocket.handshake.ServerHandshake; import java.lang.reflect.Type; import java.math.BigDecimal; import java.math.RoundingMode; import java.net.URI; import java.util.*; @Slf4j public class WebsocketRunClient extends WebSocketClient { private EStockType eStockType; public WebsocketRunClient(URI serverUri, EStockType eStockType) { // 修改为新的WebSocket服务器地址 super(URI.create("wss://usws.yanshiz.com/websocket-server")); this.eStockType = eStockType; } @Override public void onOpen(ServerHandshake serverHandshake) { log.info("WebSocket连接已建立,连接到: wss://usws.yanshiz.com/websocket-server"); // 发送身份验证消息 send(("key:"+ eStockType.getStockKey()+":"+eStockType.getContryId()).getBytes()); Timer heartbeatTimer; // 启动心跳定时器 heartbeatTimer = new Timer(); heartbeatTimer.schedule(new TimerTask() { @Override public void run() { if (isOpen()) { //send(("key:"+ eStockType.getStockKey()+":"+eStockType.getContryId()).getBytes()); send("heartbeat"); } } }, 0, 3000); // 每3秒发送一次心跳消息 } @Override public void onMessage(String s) { if(!s.equals("pong") && !s.equals("身份验证成功")){ try { Map<String, String> stringObjectMap = jsonToMap(s); Realtime realtime = new Realtime(); realtime.setUuid(stringObjectMap.get("pid")); realtime.setSymbol(stringObjectMap.get("symbol")); realtime.setClose(new BigDecimal(stringObjectMap.get("last")).doubleValue()); realtime.setLow(new BigDecimal(stringObjectMap.get("low")).doubleValue()); realtime.setHigh(new BigDecimal(stringObjectMap.get("high")).doubleValue()); realtime.setOpen(new BigDecimal(stringObjectMap.get("open")).doubleValue()); realtime.setPrevClose(new 
BigDecimal(stringObjectMap.get("prevClose")).doubleValue()); realtime.setTs(Long.valueOf(stringObjectMap.get("time") + "000")); realtime.setNetChange(new BigDecimal(stringObjectMap.get("pc")).doubleValue()); realtime.setChangeRatio(parsePercent(stringObjectMap.get("pcp"))); realtime.setBid(new BigDecimal(stringObjectMap.get("bid")).doubleValue()); realtime.setAsk(new BigDecimal(stringObjectMap.get("ask")).doubleValue()); DataCache.putRealtime(realtime.getSymbol(), realtime); } catch (Exception e) { log.error("处理WebSocket消息时发生错误: {}", e.getMessage(), e); } } else { log.info("WebSocket心跳或认证响应: {}", s); } } public static Map<String, String> jsonToMap(String json) { Gson gson = new Gson(); Type type = new TypeToken<Map<String, String>>(){}.getType(); return gson.fromJson(json, type); } @Override public void onClose(int i, String s, boolean b) { log.info("WebSocket连接已关闭,代码: {}, 原因: {}, 远程关闭: {}", i, s, b); } @Override public void onError(Exception e) { log.error("WebSocket连接发生错误: {}", e.getMessage(), e); } /** * 将带正负号的百分比字符串转换为double(返回小数形式,如+37.69% → 0.3769) * @param percentStr 百分比字符串(如"+37.69%"、"-5.2%") * @return 转换后的double值 * @throws IllegalArgumentException 若输入格式无效 */ public static double parsePercent(String percentStr) { if (percentStr == null || percentStr.trim().isEmpty()) { throw new IllegalArgumentException("输入字符串不能为空"); } // 1. 去除百分号并清理空格 String numStr = percentStr.replace("%", "").trim(); try { // 2. 转换为double(支持正负号) double value = Double.parseDouble(numStr); // 3. 转换为小数(百分比 → 小数) double decimal = value / 100.0; // 4. 四舍五入保留4位小数(使用BigDecimal确保精度) return new BigDecimal(decimal) .setScale(4, RoundingMode.HALF_UP) // HALF_UP:四舍五入模式 .doubleValue(); } catch (NumberFormatException e) { throw new IllegalArgumentException("无效的百分比格式:" + percentStr, e); } } } trading-order-huobi/src/main/java/com/yami/trading/huobi/websocket/constant/enums/EStockType.java
/**
 * Stock markets supported by the quote integration, together with the provider settings of
 * each market (REST base URL, WebSocket URL, API key) and its currency code and sign.
 *
 * <p>NOTE(review): the provider API key is hard-coded here; it should come from configuration.
 * Several fields are public and mutable through setters; that is kept for compatibility with
 * existing callers.
 */
public enum EStockType {

    US("US", "美国股票", "5", "http://api-us-v2.js-stock.top/", "ws://api-us-v2-ws.js-stock.top",
            "gAFCeL8vaTo7slHJFogb", "USD", "$"),
    ;

    private final String code;
    private final String typeDesc;
    /** Country id sent to the provider (spelling kept for compatibility). */
    public String contryId;
    public String stockUrl;
    public String wsUrl;
    public String stockKey;
    /** Currency code, e.g. {@code USD}. */
    private String symbol;
    /** Currency sign, e.g. {@code $}. */
    private String symbol1;

    EStockType(String code, String typeDesc, String contryId, String stockUrl, String wsUrl,
               String stockKey, String symbol, String symbol1) {
        this.code = code;
        this.typeDesc = typeDesc;
        this.contryId = contryId;
        this.stockUrl = stockUrl;
        this.wsUrl = wsUrl;
        this.stockKey = stockKey;
        this.symbol = symbol;
        this.symbol1 = symbol1;
    }

    /** @return the market used when none is specified or a code is unknown */
    public static EStockType getDefault() {
        return US;
    }

    /**
     * Looks a market up by its code.
     *
     * @param code market code, may be null
     * @return the matching market, or {@link #getDefault()} when no market has that code
     */
    public static EStockType getEStockTypeByCode(String code) {
        for (EStockType type : values()) {
            if (type.getCode().equals(code)) {
                return type;
            }
        }
        return getDefault();
    }

    /**
     * Tells whether a market with the given code is defined.
     *
     * @param code market code, may be null
     * @return {@code true} if some constant has exactly this code
     */
    public static boolean isExistByCode(String code) {
        for (EStockType type : values()) {
            if (type.getCode().equals(code)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Looks a market up by its currency code.
     *
     * @param symbol currency code such as {@code USD}, may be null
     * @return the matching market, or {@code null} when no market uses that currency
     */
    public static EStockType getEStockTypeBySymbol(String symbol) {
        if (symbol == null) {
            return null;
        }
        for (EStockType type : values()) {
            if (symbol.equals(type.getSymbol())) {
                return type;
            }
        }
        return null;
    }

    public String getContryId() {
        return contryId;
    }

    public String getStockUrl() {
        return stockUrl;
    }

    public String getStockKey() {
        return stockKey;
    }

    public String getWsUrl() {
        return wsUrl;
    }

    public void setWsUrl(String wsUrl) {
        this.wsUrl = wsUrl;
    }

    public String getCode() {
        return code;
    }

    public String getSymbol() {
        return symbol;
    }

    public void setSymbol(String symbol) {
        this.symbol = symbol;
    }

    public String getSymbol1() {
        return symbol1;
    }

    public void setSymbol1(String symbol1) {
        this.symbol1 = symbol1;
    }

    public String getTypeDesc() {
        return typeDesc;
    }
}