1
zyy
2025-11-07 865464c7b3aa65eba7469a256cfd01af64ba581f
trading-order-admin/src/main/java/com/yami/trading/admin/task/StockTask.java
@@ -2,13 +2,12 @@
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import com.yami.trading.admin.util.us.HttpClientRequest;
import com.yami.trading.admin.util.us.ReponseBase;
import com.yami.trading.bean.data.domain.Realtime;
import com.yami.trading.bean.item.domain.Item;
import com.yami.trading.bean.model.StockRealTimeBean;
import com.yami.trading.common.constants.RedisKeys;
import com.yami.trading.common.util.RedisUtil;
import com.yami.trading.huobi.data.DataCache;
import com.yami.trading.huobi.websocket.constant.enums.EStockType;
import com.yami.trading.service.item.ItemService;
@@ -16,10 +15,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;
import java.lang.reflect.Type;
import java.math.BigDecimal;
import java.util.*;
import java.util.concurrent.CompletableFuture;
@@ -30,7 +31,7 @@
@Component
public class StockTask {
public class StockTask implements CommandLineRunner {
    private static final Logger log = LoggerFactory.getLogger(StockTask.class);
    private final AtomicBoolean syncINStockData = new AtomicBoolean(false);
@@ -46,7 +47,7 @@
    /**
     * 同步系统所需要的股票
     */
    @Scheduled(cron = "0 0/5 * * * ?")
    @Scheduled(cron = "0 0/30 * * * ?")
    public void syncINStockData() {
        if (syncINStockData.get()) { // 判断任务是否在处理中
@@ -78,6 +79,8 @@
            }
        }
    }
    /**
     * 加载所有股票数据
@@ -111,23 +114,23 @@
                    .eq("type", Item.indices));
            log.info("同步股票 已有数据 {} 获取数据 {}", stockList.size(), list.size());
            System.out.println(stockList);
            List<Item> updateStockList = new ArrayList<>();
            for (StockRealTimeBean o : list) {
                //System.out.println(o);
                Item indices = indicesList.stream()
                        .filter(x -> x.getSymbol().equals(o.getSymbol()))
                        .findFirst()
                        .orElse(null);
                if (indices != null) {  //指数不添加
                    continue;
                }
                Item item = stockList.stream()
                        .filter(x -> x.getSymbol().equals(o.getSymbol()) &&
                                x.getStockCode().equals(o.getId()))
                        .findFirst()
                        .orElse(null);
                if (item != null) {  //已有不添加
                    continue;
                }
                item = indicesList.stream()
                        .filter(x -> x.getSymbol().equals(o.getSymbol()))
                        .findFirst()
                        .orElse(null);
                if (item != null) {  //已有不添加
                if (item != null) {    //已有不添加
                    continue;
                }
@@ -161,7 +164,6 @@
                updateStockList.add(item);
                Realtime realtime = new Realtime();
                realtime.setUuid(o.getId());
                realtime.setSymbol(o.getSymbol());
@@ -191,8 +193,83 @@
        }
    }
    /**
     * 初始化更新价格
     */
    public void initAllStock(EStockType eStockType) {
        log.info("init US股票 数据 {}", eStockType.getCode());
        List<StockRealTimeBean> list = new ArrayList<>();
        try {
            Map<String, Object> paramMap = new HashMap<>();
            List<Item> stockList = itemService.list(new QueryWrapper<Item>()
                    .eq("type", Item.US_STOCKS));
            int batchSize = 5000; // 每批5000条
            int totalSize = stockList.size(); // 总条数
            List<String> pids = new ArrayList<>();
            // 循环分批处理
            for (int i = 0; i < totalSize; i += batchSize) {
                // 计算当前批次的结束索引(避免最后一批不足5000条时越界)
                int endIndex = Math.min(i + batchSize, totalSize);
                // 截取当前批次的子列表([i, endIndex),左闭右开)
                List<Item> batchList = stockList.subList(i, endIndex);
                // 拼接当前批次的pid(逗号分隔的stockCode)
                String batchPid = batchList.stream()
                        .map(item -> String.valueOf(item.getStockCode())) // 提取stockCode并转字符串
                        .collect(Collectors.joining(",")); // 拼接
                pids.add(batchPid);
            }
            pids.forEach(pid -> {
                paramMap.put("pid", pid);
                String result = HttpClientRequest.doPost(eStockType.stockUrl + "stock?" + "key=" + eStockType.stockKey, paramMap);
                // 定义List<StockRealTimeBean>的类型(通过TypeToken保留泛型信息)
                Type type = new TypeToken<List<StockRealTimeBean>>() {
                }.getType();
                // 解析为List<StockRealTimeBean>
                List<StockRealTimeBean> stockRealTimeBeanList = new Gson().fromJson(result, type);
                list.addAll(stockRealTimeBeanList);
            });
            stockList.forEach(item -> {
                StockRealTimeBean o = list.stream().filter(x -> x.getId() != null &&
                        x.getId().equals(item.getStockCode())).findFirst().orElse(null);
                if (o != null) {
                    Realtime realtime = new Realtime();
                    realtime.setUuid(item.getStockCode());
                    realtime.setSymbol(item.getSymbol());
                    realtime.setName(item.getName());
                    realtime.setClose(new BigDecimal(o.getLast().trim()).doubleValue());
                    realtime.setLow(new BigDecimal(o.getLow().trim()).doubleValue());
                    realtime.setHigh(new BigDecimal(o.getHigh().trim()).doubleValue());
                    realtime.setOpen(new BigDecimal(o.getOpen().trim()).doubleValue());
                    realtime.setPrevClose(new BigDecimal(o.getPrevClose().trim()).doubleValue());
                    realtime.setTs(Long.valueOf(o.getTime() + "000"));
                    realtime.setVolume(new BigDecimal(o.getVolume().trim()).doubleValue());
                    realtime.setNetChange(new BigDecimal(o.getChg().trim()).doubleValue());
                    realtime.setChangeRatio(new BigDecimal(o.getChgPct()).doubleValue());
                    realtime.setType(o.getType());
                    realtime.setBid(new BigDecimal(o.getBid()).doubleValue());
                    realtime.setAsk(new BigDecimal(o.getAsk()).doubleValue());
                    DataCache.putRealtime(realtime.getSymbol(), realtime);
                }
            });
            log.info("init US股票 数据 成功");
        } catch (Exception e) {
            log.error(e.getMessage(), e);
        }
    }
    public static void main(String[] args) {
        StockTask task = new StockTask();
        task.loadAllStock(EStockType.US);
        task.initAllStock(EStockType.US);
    }
    @Override
    public void run(String... args) throws Exception {
        initAllStock(EStockType.US);
    }
}