From 6e85d12eab8accba3dd6731d9273388bd7f8d68c Mon Sep 17 00:00:00 2001
From: zyy <zyy@email.com>
Date: Fri, 14 Nov 2025 18:26:25 +0800
Subject: [PATCH] 2

---
 trading-order-admin/src/main/java/com/yami/trading/admin/task/StockTask.java |  124 +++++++++++++++++++++++++++++++++++++----
 1 files changed, 111 insertions(+), 13 deletions(-)

diff --git a/trading-order-admin/src/main/java/com/yami/trading/admin/task/StockTask.java b/trading-order-admin/src/main/java/com/yami/trading/admin/task/StockTask.java
index c975b65..0fef3fb 100644
--- a/trading-order-admin/src/main/java/com/yami/trading/admin/task/StockTask.java
+++ b/trading-order-admin/src/main/java/com/yami/trading/admin/task/StockTask.java
@@ -2,13 +2,12 @@
 
 import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
 import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
 import com.yami.trading.admin.util.us.HttpClientRequest;
 import com.yami.trading.admin.util.us.ReponseBase;
 import com.yami.trading.bean.data.domain.Realtime;
 import com.yami.trading.bean.item.domain.Item;
 import com.yami.trading.bean.model.StockRealTimeBean;
-import com.yami.trading.common.constants.RedisKeys;
-import com.yami.trading.common.util.RedisUtil;
 import com.yami.trading.huobi.data.DataCache;
 import com.yami.trading.huobi.websocket.constant.enums.EStockType;
 import com.yami.trading.service.item.ItemService;
@@ -16,10 +15,12 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.CommandLineRunner;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 import org.springframework.stereotype.Component;
 
+import java.lang.reflect.Type;
 import java.math.BigDecimal;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
@@ -30,7 +31,7 @@
 
 
 @Component
-public class StockTask {
+public class StockTask implements CommandLineRunner {
     private static final Logger log = LoggerFactory.getLogger(StockTask.class);
 
     private final AtomicBoolean syncINStockData = new AtomicBoolean(false);
@@ -46,7 +47,7 @@
     /**
      * 同步系统所需要的股票
      */
-    @Scheduled(cron = "0 0/5 * * * ?")
+    @Scheduled(cron = "0 0/30 * * * ?")
     public void syncINStockData() {
 
         if (syncINStockData.get()) { // 判断任务是否在处理中
@@ -79,6 +80,8 @@
         }
     }
 
+
+
     /**
      * 加载所有股票数据
      */
@@ -103,18 +106,35 @@
             if (list.isEmpty()) {
                 return;
             }
-            List<String> stockCodeList = list.stream().map(StockRealTimeBean::getSymbol).collect(Collectors.toList());
-            List<Item> stockList = itemService.list(new QueryWrapper<Item>().in("symbol", stockCodeList));
+            List<String> stockCodeList = list.stream().map(StockRealTimeBean::getId).collect(Collectors.toList());
+            List<Item> stockList = itemService.list(new QueryWrapper<Item>()
+                            .eq("type", Item.US_STOCKS).in("stock_code", stockCodeList));
+
+            List<Item> indicesList = itemService.list(new QueryWrapper<Item>()
+                    .eq("type", Item.indices));
+
+            log.info("同步股票 已有数据 {} 获取数据 {}", stockList.size(), list.size());
             List<Item> updateStockList = new ArrayList<>();
             for (StockRealTimeBean o : list) {
                 //System.out.println(o);
-                Item item = stockList.stream()
+                Item indices = indicesList.stream()
                         .filter(x -> x.getSymbol().equals(o.getSymbol()))
                         .findFirst()
                         .orElse(null);
-                if (item == null) {
-                    item = new Item();
+                if (indices != null) {  //指数不添加
+                    continue;
                 }
+
+                Item item = stockList.stream()
+                        .filter(x -> x.getSymbol().equals(o.getSymbol()) &&
+                                x.getStockCode().equals(o.getId()))
+                        .findFirst()
+                        .orElse(null);
+                if (item != null) {    //已有不添加
+                    continue;
+                }
+
+                item = new Item();
                 String name = StringUtils.trim(o.getName());
                 item.setEnName(name);
                 item.setName(name);
@@ -139,9 +159,10 @@
                 item.setShowStatus("1");
                 item.setTradeStatus("1");
                 item.setQuoteCurrency("USDT");
+                item.setCrawlStatus("default_active");
+                item.setStockCode(o.getId());
 
                 updateStockList.add(item);
-
 
                 Realtime realtime = new Realtime();
                 realtime.setUuid(o.getId());
@@ -162,16 +183,93 @@
 
                 DataCache.putRealtime(realtime.getSymbol(), realtime);
             }
-            itemService.saveOrUpdateBatch(updateStockList);
-            log.info("同步股票 数据 成功 {}  总共同步数据 {}", eStockType.getCode(), list.size());
+            if (!updateStockList.isEmpty()) {
+                itemService.saveOrUpdateBatch(updateStockList);
+            }
+            log.info("同步股票 数据 成功 {}  总共同步数据 {}", eStockType.getCode(), updateStockList.size());
         } catch (
                 Exception e) {
             log.error("同步出错", e);
         }
     }
 
+    /**
+     * 初始化更新价格
+     */
+    public void initAllStock(EStockType eStockType) {
+        log.info("init US股票 数据 {}", eStockType.getCode());
+        List<StockRealTimeBean> list = new ArrayList<>();
+        try {
+            Map<String, Object> paramMap = new HashMap<>();
+            List<Item> stockList = itemService.list(new QueryWrapper<Item>()
+                    .eq("type", Item.US_STOCKS));
+            int batchSize = 5000; // 每批5000条
+            int totalSize = stockList.size(); // 总条数
+
+            List<String> pids = new ArrayList<>();
+            // 循环分批处理
+            for (int i = 0; i < totalSize; i += batchSize) {
+                // 计算当前批次的结束索引(避免最后一批不足5000条时越界)
+                int endIndex = Math.min(i + batchSize, totalSize);
+
+                // 截取当前批次的子列表([i, endIndex),左闭右开)
+                List<Item> batchList = stockList.subList(i, endIndex);
+
+                // 拼接当前批次的pid(逗号分隔的stockCode)
+                String batchPid = batchList.stream()
+                        .map(item -> String.valueOf(item.getStockCode())) // 提取stockCode并转字符串
+                        .collect(Collectors.joining(",")); // 拼接
+
+                pids.add(batchPid);
+            }
+            pids.forEach(pid -> {
+                paramMap.put("pid", pid);
+                String result = HttpClientRequest.doPost(eStockType.stockUrl + "stock?" + "key=" + eStockType.stockKey, paramMap);
+                // 定义List<StockRealTimeBean>的类型(通过TypeToken保留泛型信息)
+                Type type = new TypeToken<List<StockRealTimeBean>>() {
+                }.getType();
+                // 解析为List<StockRealTimeBean>
+                List<StockRealTimeBean> stockRealTimeBeanList = new Gson().fromJson(result, type);
+                list.addAll(stockRealTimeBeanList);
+            });
+
+            stockList.forEach(item -> {
+                StockRealTimeBean o = list.stream().filter(x -> x.getId() != null &&
+                        x.getId().equals(item.getStockCode())).findFirst().orElse(null);
+                if (o != null) {
+                    Realtime realtime = new Realtime();
+                    realtime.setUuid(item.getStockCode());
+                    realtime.setSymbol(item.getSymbol());
+                    realtime.setName(item.getName());
+                    realtime.setClose(new BigDecimal(o.getLast().trim()).doubleValue());
+                    realtime.setLow(new BigDecimal(o.getLow().trim()).doubleValue());
+                    realtime.setHigh(new BigDecimal(o.getHigh().trim()).doubleValue());
+                    realtime.setOpen(new BigDecimal(o.getOpen().trim()).doubleValue());
+                    realtime.setPrevClose(new BigDecimal(o.getPrevClose().trim()).doubleValue());
+                    realtime.setTs(Long.valueOf(o.getTime() + "000"));
+                    realtime.setVolume(new BigDecimal(o.getVolume().trim()).doubleValue());
+                    realtime.setNetChange(new BigDecimal(o.getChg().trim()).doubleValue());
+                    realtime.setChangeRatio(new BigDecimal(o.getChgPct()).doubleValue());
+                    realtime.setType(o.getType());
+                    realtime.setBid(new BigDecimal(o.getBid()).doubleValue());
+                    realtime.setAsk(new BigDecimal(o.getAsk()).doubleValue());
+                    DataCache.putRealtime(realtime.getSymbol(), realtime);
+                }
+            });
+
+            log.info("init US股票 数据 成功");
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+        }
+    }
+
     public static void main(String[] args) {
         StockTask task = new StockTask();
-        task.loadAllStock(EStockType.US);
+        task.initAllStock(EStockType.US);
+    }
+
+    @Override
+    public void run(String... args) throws Exception {
+        initAllStock(EStockType.US);
     }
 }

--
Gitblit v1.9.3