trading-order-admin/src/main/java/com/yami/trading/admin/task/StockTask.java
@@ -7,8 +7,6 @@ import com.yami.trading.bean.data.domain.Realtime; import com.yami.trading.bean.item.domain.Item; import com.yami.trading.bean.model.StockRealTimeBean; import com.yami.trading.common.constants.RedisKeys; import com.yami.trading.common.util.RedisUtil; import com.yami.trading.huobi.data.DataCache; import com.yami.trading.huobi.websocket.constant.enums.EStockType; import com.yami.trading.service.item.ItemService; @@ -16,6 +14,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; @@ -30,7 +29,7 @@ @Component public class StockTask { public class StockTask implements CommandLineRunner { private static final Logger log = LoggerFactory.getLogger(StockTask.class); private final AtomicBoolean syncINStockData = new AtomicBoolean(false); @@ -124,43 +123,43 @@ continue; } Item item = stockList.stream() .filter(x -> x.getSymbol().equals(o.getSymbol()) && x.getStockCode().equals(o.getId())) .findFirst() .orElse(null); if (item == null) { item = new Item(); String name = StringUtils.trim(o.getName()); item.setEnName(name); item.setName(name); item.setSymbolFullName(name); item.setSymbol(o.getSymbol()); item.setSymbolData(o.getSymbol()); item.setPips(BigDecimal.valueOf(0.01).doubleValue()); item.setPipsAmount(BigDecimal.valueOf(0.02).doubleValue()); item.setAdjustmentValue(BigDecimal.ZERO.doubleValue()); item.setUnitAmount(BigDecimal.valueOf(1000).doubleValue()); item.setUnitFee(BigDecimal.valueOf(30).doubleValue()); item.setMarket("FOREVER"); item.setDecimals(2); item.setMultiple(BigDecimal.ZERO.doubleValue()); item.setBorrowingRate(BigDecimal.ZERO.doubleValue()); item.setDelFlag(0); item.setType(Item.US_STOCKS); item.setCategory(Item.US_STOCKS); 
item.setSorted("100"); item.setOpenCloseType(Item.US_STOCKS); item.setFake("0"); item.setShowStatus("1"); item.setTradeStatus("1"); item.setQuoteCurrency("USDT"); item.setCrawlStatus("default_active"); item.setStockCode(o.getId()); updateStockList.add(item); if (item != null) { //已有不添加 continue; } item = new Item(); String name = StringUtils.trim(o.getName()); item.setEnName(name); item.setName(name); item.setSymbolFullName(name); item.setSymbol(o.getSymbol()); item.setSymbolData(o.getSymbol()); item.setPips(BigDecimal.valueOf(0.01).doubleValue()); item.setPipsAmount(BigDecimal.valueOf(0.02).doubleValue()); item.setAdjustmentValue(BigDecimal.ZERO.doubleValue()); item.setUnitAmount(BigDecimal.valueOf(1000).doubleValue()); item.setUnitFee(BigDecimal.valueOf(30).doubleValue()); item.setMarket("FOREVER"); item.setDecimals(2); item.setMultiple(BigDecimal.ZERO.doubleValue()); item.setBorrowingRate(BigDecimal.ZERO.doubleValue()); item.setDelFlag(0); item.setType(Item.US_STOCKS); item.setCategory(Item.US_STOCKS); item.setSorted("100"); item.setOpenCloseType(Item.US_STOCKS); item.setFake("0"); item.setShowStatus("1"); item.setTradeStatus("1"); item.setQuoteCurrency("USDT"); item.setCrawlStatus("default_active"); item.setStockCode(o.getId()); updateStockList.add(item); Realtime realtime = new Realtime(); @@ -192,6 +191,68 @@ } }
    /**
     * Loads the full stock list of the given market from the remote quote API and
     * puts one {@link Realtime} snapshot per stock into {@link DataCache}.
     *
     * <p>Pages are requested until the number of loaded rows reaches the total
     * reported by the API. Paging stops early when a request fails or when a page
     * comes back without rows, so an over-reported total cannot spin this loop
     * forever. Symbols that already exist as items of type {@code Item.indices}
     * are not added. A quote that cannot be converted is logged and skipped so it
     * does not prevent the remaining stocks from being cached.
     *
     * @param eStockType market descriptor supplying the API base URL, country id and key
     */
    public void initAllStock(EStockType eStockType) {
        log.info("init US股票 数据 {}", eStockType.getCode());
        List<StockRealTimeBean> list = new ArrayList<>();
        // One parser instance is enough for every page.
        Gson gson = new Gson();
        int totleStock = 1;
        int page = 0;
        try {
            while (totleStock > list.size()) {
                try {
                    String result = HttpClientRequest.doGet(eStockType.stockUrl + "list?country_id=" + eStockType.getContryId() + "&size=100000&page=" + page + "&key=" + eStockType.stockKey);
                    ReponseBase reponseBase = gson.fromJson(result, ReponseBase.class);
                    if (reponseBase == null || reponseBase.getData() == null || reponseBase.getData().isEmpty()) {
                        // Without this guard the loop condition never changes and the API is hit endlessly.
                        log.warn("init stock data: page {} returned no rows, stop paging (loaded {} of {})",
                                page, list.size(), totleStock);
                        break;
                    }
                    list.addAll(reponseBase.getData());
                    page++;
                    totleStock = reponseBase.getTotal();
                } catch (Exception e) {
                    log.error("init stock data: request for page {} failed", page, e);
                    break;
                }
            }
            if (list.isEmpty()) {
                return;
            }
            List<Item> indicesList = itemService.list(new QueryWrapper<Item>()
                    .eq("type", Item.indices));
            for (StockRealTimeBean o : list) {
                boolean isIndex = indicesList.stream()
                        .anyMatch(x -> x.getSymbol().equals(o.getSymbol()));
                if (isIndex) {
                    // indices are not added
                    continue;
                }
                try {
                    Realtime realtime = toRealtime(o);
                    DataCache.putRealtime(realtime.getSymbol(), realtime);
                } catch (RuntimeException e) {
                    // A single malformed quote must not abort the whole initial load.
                    log.warn("init stock data: skip symbol {} because its quote cannot be converted", o.getSymbol(), e);
                }
            }
            log.info("init US股票 数据 成功");
        } catch (Exception e) {
            log.error("同步出错", e);
        }
    }

    /**
     * Copies one API row into a new {@link Realtime} snapshot.
     *
     * @param o row returned by the stock list API
     * @return the populated snapshot
     * @throws RuntimeException if a required field is missing or is not a valid number
     */
    private Realtime toRealtime(StockRealTimeBean o) {
        Realtime realtime = new Realtime();
        realtime.setUuid(o.getId());
        realtime.setSymbol(o.getSymbol());
        realtime.setName(o.getName());
        realtime.setClose(toDouble(o.getLast()));
        realtime.setLow(toDouble(o.getLow()));
        realtime.setHigh(toDouble(o.getHigh()));
        realtime.setOpen(toDouble(o.getOpen()));
        realtime.setPrevClose(toDouble(o.getPrevClose()));
        // The "000" suffix is appended exactly as before; presumably the API time is in seconds - TODO confirm.
        realtime.setTs(Long.valueOf(o.getTime() + "000"));
        realtime.setVolume(toDouble(o.getVolume()));
        realtime.setNetChange(toDouble(o.getChg()));
        // The three fields below were never trimmed in the original and their declared
        // types are not visible here, so they are converted exactly as before.
        realtime.setChangeRatio(new BigDecimal(o.getChgPct()).doubleValue());
        realtime.setType(o.getType());
        realtime.setBid(new BigDecimal(o.getBid()).doubleValue());
        realtime.setAsk(new BigDecimal(o.getAsk()).doubleValue());
        return realtime;
    }

    /** Parses a decimal string, ignoring surrounding whitespace, into a double. */
    private static double toDouble(String raw) {
        return new BigDecimal(raw.trim()).doubleValue();
    }
@@ -200,4 +261,9 @@ StockTask task = new StockTask(); task.loadAllStock(EStockType.US); } @Override public void run(String... args) throws Exception { initAllStock(EStockType.US); } } trading-order-admin/src/main/java/com/yami/trading/api/controller/ApiProjectBreedContorller.java
@@ -54,7 +54,7 @@ public Result<List<ProjectVariety>> getConstituentStockList(GetConstituentStockListModel model) { List<ProjectVariety> list = projectVarietyService.list(Wrappers.<ProjectVariety>query().lambda().eq(ProjectVariety::getTransactionPairsSymbol, model.getSymbol())); try { itemVisitFacade.updateOrInsertVisit(model.getSymbol()); //itemVisitFacade.updateOrInsertVisit(model.getSymbol()); }catch (Exception e){ log.error("updateOrInsertVisit {} 异常", model.getSymbol(), e); } trading-order-admin/src/main/java/com/yami/trading/api/controller/RealtimeController.java
@@ -54,6 +54,9 @@ @GetMapping("api/hobi!getRealtime.action") public Result<List<Realtime>> getRealtime(@RequestParam String symbol) { try { if (symbol == null || symbol.isEmpty()){ return Result.succeed(); } List<Realtime> data = this.dataService.realtime(symbol); data.forEach(d->{ Item bySymbol = itemService.findBySymbol(d.getSymbol()); @@ -289,6 +292,9 @@ List<Item> items = new ArrayList<>(); //按字符串排序 List<Item> itemList = itemService.cacheGetAll().stream().sorted(Comparator.comparing(Item::getSorted).reversed()).collect(Collectors.toList()); if (type != null && type.equalsIgnoreCase(Item.US_STOCKS)) { category = null; } //按数字排序 // List<Item> itemList = itemService.cacheGetAll().stream().sorted(Comparator.comparing(Item::getSorted, (x,y)->{ // int xInt = 0; @@ -303,13 +309,14 @@ // })).collect(Collectors.toList()); if (null != type && null != category) { String finalCategory = category; // 知名类 需要模糊查询 if ("prominent".equals(category)) { itemsTotal = new ArrayList<>(itemList).stream().filter(item -> item.getType().equals(type) && item.getCategory().contains(category)).collect(Collectors.toList()); && item.getCategory().contains(finalCategory)).collect(Collectors.toList()); } else { itemsTotal = new ArrayList<>(itemList).stream().filter(item -> item.getType().equals(type) && item.getCategory().equals(category)).collect(Collectors.toList()); && item.getCategory().equals(finalCategory)).collect(Collectors.toList()); } if (pageNo * pageSize - pageSize < itemsTotal.size()) { int pages = itemsTotal.size() % pageSize == 0 ? itemsTotal.size() / pageSize : itemsTotal.size() / pageSize + 1; trading-order-huobi/src/main/java/com/yami/trading/huobi/hobi/internal/HobiDataServiceImpl.java
@@ -540,10 +540,10 @@ List<MarketTicker> tickers = marketClient.getTickers(); tickers.stream().forEach(ticker -> { String symbol = ticker.getSymbol(); logger.info(">>>>>>>>正在更新tk 交易对:{}",symbol); if(!symbolArr.contains(symbol)){ return; } logger.info(">>>>>>>>正在更新tk 交易对:{}",symbol); DataCache.putMarketTicker(symbol,ticker); }); } trading-order-huobi/src/main/java/com/yami/trading/huobi/task/DepthPushJob.java
@@ -84,11 +84,18 @@ } for (String symbol : symbolSet) { DepthTimeObject depth = DataCache.getDepth(symbol); Item bySymbol = itemService.findBySymbol(symbol); // 新增:校验symbol是否为null或空字符串 if (symbol == null || symbol.trim().isEmpty()) { logger.warn("----> DepthPushJob.depthHandle symbol为空或无效,跳过处理"); continue; } Item bySymbol = itemService.findCaCheBySymbol(symbol); if (bySymbol == null) { logger.warn("---> DepthPushJob.depthHandle 当前 symbol:{} 没有对应的数据库记录", symbol); continue; } DepthTimeObject depth = DataCache.getDepth(symbol); if (depth == null) { dataService.depth(symbol); } trading-order-huobi/src/main/java/com/yami/trading/huobi/task/InitHandle.java
@@ -30,6 +30,7 @@ import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.Date; import java.util.List; @Slf4j @@ -111,90 +112,94 @@ @Override public void run(String... args) { // 初始化缓存 //loadCacheService.loadcache(); cryptosGetDataJob.start(); tipService.init(); futuresLoadCacheService.loadcache(); financeLoadCacheService.loadcache(); // 已经加了注解 try { // 初始化缓存 //loadCacheService.loadcache(); cryptosGetDataJob.start(); tipService.init(); futuresLoadCacheService.loadcache(); financeLoadCacheService.loadcache(); // 已经加了注解 // minerLoadCacheService.loadcache(); // todo 先注释观察报错 // todo 先注释观察报错 // futuresRecomConsumeServer.start(); contractOrderCalculationJob.start(); //启动huobi wss行情 contractOrderCalculationJob.start(); //启动huobi wss行情 log.info("开始Data初始化........"); List<Item> items = new ArrayList<>(itemService.listWithOutCache()); for (Item item : items) { AdjustmentValueCache.getCurrentValue().put(item.getSymbol(), item.getAdjustmentValue()); } for (Item item : items) { Realtime realtime = dataDBService.get(item.getSymbol()); if (realtime != null) { DataCache.putRealtime(item.getSymbol(), realtime); log.info("开始Data初始化........ 
{}"); List<Item> items = new ArrayList<>(itemService.listWithOutCache()); for (Item item : items) { AdjustmentValueCache.getCurrentValue().put(item.getSymbol(), item.getAdjustmentValue()); } } for (Item item : items) { if(Item.cryptos.equalsIgnoreCase(item.getType())){ List<Realtime> list = this.dataDBService.findRealtimeOneDay(item.getSymbol()); DataCache.putCryptosRealtimeHistory(item.getSymbol(), list); log.info("开始Data初始化1"); for (Item item : items) { Realtime realtime = dataDBService.get(item.getSymbol()); if (realtime != null) { DataCache.putRealtime(item.getSymbol(), realtime); } } log.info("开始Data初始化2"); for (Item item : items) { if(Item.cryptos.equalsIgnoreCase(item.getType())){ List<Realtime> list = this.dataDBService.findRealtimeOneDay(item.getSymbol()); DataCache.putCryptosRealtimeHistory(item.getSymbol(), list); } } log.info("开始Data初始化3"); tradingViewJob.start(); /** * 实时行情价格 */ // 极度证券 wss行情,包含 A股,美股,港股,台股 stockGetDataJob.start(); //外汇实时行情 forexGetDataJob.start(); log.info("开始Data初始化4"); //初始化雪球Cookies //xueQiuDataService.initXueQiuCookies(); klineLoadCache.loadCache(); // 高低修正 highLowHandleJob.start(); highLowHandleJob.bulidHighLow(); log.info("开始Data初始化5"); stockGetMarketJob.start(); //fakeSymbolGetDataJob.start(); // 实时数据批量保存线程 saveRealtimeServer.start(); klineCacheJob.start(); /** * 委托单处理线程启动 */ contractApplyOrderHandleJob.start(); /** * 持仓单盈亏计算线程启动 */ contractOrderCalculationService.setOrder_close_line(this.sysparaService.find("order_close_line").getBigDecimal()); contractOrderCalculationService.setOrder_close_line_type(this.sysparaService.find("order_close_line_type").getInteger()); log.info("开始Data初始化6"); // todo 做模块判断,后续打开 futuresOrderCalculationJob.start(); realtimePushJob.start(); depthPushJob.start(); tradePushJob.start(); dataFrequencyServer.start(); cleanDataJob.taskJob(); log.info("完成Data初始化。{}"); /** * 币币委托单处理线程启动 */ exchangeApplyOrderHandleJob.start(); exchangeLeverApplyOrderHandleJob.start(); //klineService.clean(); }catch (Exception 
e) { log.error("Data初始化异常{}" , e.getMessage()); } tradingViewJob.start(); /** * 实时行情价格 */ // 极度证券 wss行情,包含 A股,美股,港股,台股 stockGetDataJob.start(); //外汇实时行情 forexGetDataJob.start(); //初始化雪球Cookies //xueQiuDataService.initXueQiuCookies(); klineLoadCache.loadCache(); // 高低修正 highLowHandleJob.start(); highLowHandleJob.bulidHighLow(); stockGetMarketJob.start(); //fakeSymbolGetDataJob.start(); // 实时数据批量保存线程 saveRealtimeServer.start(); klineCacheJob.start(); /** * 委托单处理线程启动 */ contractApplyOrderHandleJob.start(); /** * 持仓单盈亏计算线程启动 */ contractOrderCalculationService.setOrder_close_line(this.sysparaService.find("order_close_line").getBigDecimal()); contractOrderCalculationService.setOrder_close_line_type(this.sysparaService.find("order_close_line_type").getInteger()); // todo 做模块判断,后续打开 futuresOrderCalculationJob.start(); realtimePushJob.start(); depthPushJob.start(); tradePushJob.start(); dataFrequencyServer.start(); cleanDataJob.taskJob(); log.info("完成Data初始化。"); /** * 币币委托单处理线程启动 */ exchangeApplyOrderHandleJob.start(); exchangeLeverApplyOrderHandleJob.start(); //klineService.clean(); } } trading-order-huobi/src/main/java/com/yami/trading/huobi/task/KlineLoadCache.java
@@ -18,6 +18,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.stream.Collectors; /** * Kline组件初始化加载缓存服务 @@ -40,6 +41,7 @@ public void loadCache() { List<Item> items = new ArrayList<>(itemService.listWithOutCache()); items = items.stream().filter(x -> !x.getType().equalsIgnoreCase(Item.US_STOCKS)).collect(Collectors.toList()); //过滤美股 log.info("--------------> KlineLoadCache.loadCache, item 集合长度为1:" + items.size()); // 加载调整值到内存 for (Item item : items) { trading-order-huobi/src/main/java/com/yami/trading/huobi/task/RealtimePushJob.java
@@ -73,6 +73,18 @@ } for (String symbol : symbolSet) { // 新增:校验symbol是否为null或空字符串 if (symbol == null || symbol.trim().isEmpty()) { log.warn("----> RealtimePushJob symbol为空或无效,跳过处理"); continue; } Item bySymbol = itemService.findCaCheBySymbol(symbol); if (bySymbol == null) { log.warn("---> RealtimePushJob 当前 symbol:{} 没有对应的数据库记录", symbol); continue; } Realtime realtimeData = DataCache.getRealtime(symbol); if (realtimeData == null) { log.error("realtimeHandle 获取{} 数据为空", symbol); @@ -99,7 +111,7 @@ } } } catch (Throwable e) { } catch (Exception e) { e.printStackTrace(); }