| | |
| | | |
| | | import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; |
| | | import com.google.gson.Gson; |
| | | import com.google.gson.reflect.TypeToken; |
| | | import com.yami.trading.admin.util.us.HttpClientRequest; |
| | | import com.yami.trading.admin.util.us.ReponseBase; |
| | | import com.yami.trading.bean.data.domain.Realtime; |
| | | import com.yami.trading.bean.item.domain.Item; |
| | | import com.yami.trading.bean.model.StockRealTimeBean; |
| | | import com.yami.trading.common.constants.RedisKeys; |
| | | import com.yami.trading.common.util.RedisUtil; |
| | | import com.yami.trading.huobi.data.DataCache; |
| | | import com.yami.trading.huobi.websocket.constant.enums.EStockType; |
| | | import com.yami.trading.service.item.ItemService; |
| | |
| | | import org.slf4j.Logger; |
| | | import org.slf4j.LoggerFactory; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.boot.CommandLineRunner; |
| | | import org.springframework.scheduling.annotation.Scheduled; |
| | | import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; |
| | | import org.springframework.stereotype.Component; |
| | | |
| | | import java.lang.reflect.Type; |
| | | import java.math.BigDecimal; |
| | | import java.util.*; |
| | | import java.util.concurrent.CompletableFuture; |
| | |
| | | |
| | | |
| | | @Component |
| | | public class StockTask { |
| | | public class StockTask implements CommandLineRunner { |
| | | private static final Logger log = LoggerFactory.getLogger(StockTask.class); |
| | | |
| | | private final AtomicBoolean syncINStockData = new AtomicBoolean(false); |
| | |
| | | /** |
| | | * 同步系统所需要的股票 |
| | | */ |
| | | @Scheduled(cron = "0 0/5 * * * ?") |
| | | @Scheduled(cron = "0 0/30 * * * ?") |
| | | public void syncINStockData() { |
| | | |
| | | if (syncINStockData.get()) { // 判断任务是否在处理中 |
| | |
| | | } |
| | | } |
| | | } |
| | | |
| | | |
| | | |
| | | /** |
| | | * 加载所有股票数据 |
| | |
| | | .eq("type", Item.indices)); |
| | | |
| | | log.info("同步股票 已有数据 {} 获取数据 {}", stockList.size(), list.size()); |
| | | System.out.println(stockList); |
| | | List<Item> updateStockList = new ArrayList<>(); |
| | | for (StockRealTimeBean o : list) { |
| | | //System.out.println(o); |
| | | Item indices = indicesList.stream() |
| | | .filter(x -> x.getSymbol().equals(o.getSymbol())) |
| | | .findFirst() |
| | | .orElse(null); |
| | | if (indices != null) { //指数不添加 |
| | | continue; |
| | | } |
| | | |
| | | Item item = stockList.stream() |
| | | .filter(x -> x.getSymbol().equals(o.getSymbol()) && |
| | | x.getStockCode().equals(o.getId())) |
| | | .findFirst() |
| | | .orElse(null); |
| | | if (item != null) { //已有不添加 |
| | | continue; |
| | | } |
| | | item = indicesList.stream() |
| | | .filter(x -> x.getSymbol().equals(o.getSymbol())) |
| | | .findFirst() |
| | | .orElse(null); |
| | | if (item != null) { //已有不添加 |
| | | if (item != null) { //已有不添加 |
| | | continue; |
| | | } |
| | | |
| | |
| | | |
| | | updateStockList.add(item); |
| | | |
| | | |
| | | Realtime realtime = new Realtime(); |
| | | realtime.setUuid(o.getId()); |
| | | realtime.setSymbol(o.getSymbol()); |
| | |
| | | } |
| | | } |
| | | |
| | | /** |
| | | * 初始化更新价格 |
| | | */ |
| | | public void initAllStock(EStockType eStockType) { |
| | | log.info("init US股票 数据 {}", eStockType.getCode()); |
| | | List<StockRealTimeBean> list = new ArrayList<>(); |
| | | try { |
| | | Map<String, Object> paramMap = new HashMap<>(); |
| | | List<Item> stockList = itemService.list(new QueryWrapper<Item>() |
| | | .eq("type", Item.US_STOCKS)); |
| | | int batchSize = 5000; // 每批5000条 |
| | | int totalSize = stockList.size(); // 总条数 |
| | | |
| | | List<String> pids = new ArrayList<>(); |
| | | // 循环分批处理 |
| | | for (int i = 0; i < totalSize; i += batchSize) { |
| | | // 计算当前批次的结束索引(避免最后一批不足5000条时越界) |
| | | int endIndex = Math.min(i + batchSize, totalSize); |
| | | |
| | | // 截取当前批次的子列表([i, endIndex),左闭右开) |
| | | List<Item> batchList = stockList.subList(i, endIndex); |
| | | |
| | | // 拼接当前批次的pid(逗号分隔的stockCode) |
| | | String batchPid = batchList.stream() |
| | | .map(item -> String.valueOf(item.getStockCode())) // 提取stockCode并转字符串 |
| | | .collect(Collectors.joining(",")); // 拼接 |
| | | |
| | | pids.add(batchPid); |
| | | } |
| | | pids.forEach(pid -> { |
| | | paramMap.put("pid", pid); |
| | | String result = HttpClientRequest.doPost(eStockType.stockUrl + "stock?" + "key=" + eStockType.stockKey, paramMap); |
| | | // 定义List<StockRealTimeBean>的类型(通过TypeToken保留泛型信息) |
| | | Type type = new TypeToken<List<StockRealTimeBean>>() { |
| | | }.getType(); |
| | | // 解析为List<StockRealTimeBean> |
| | | List<StockRealTimeBean> stockRealTimeBeanList = new Gson().fromJson(result, type); |
| | | list.addAll(stockRealTimeBeanList); |
| | | }); |
| | | |
| | | stockList.forEach(item -> { |
| | | StockRealTimeBean o = list.stream().filter(x -> x.getId() != null && |
| | | x.getId().equals(item.getStockCode())).findFirst().orElse(null); |
| | | if (o != null) { |
| | | Realtime realtime = new Realtime(); |
| | | realtime.setUuid(item.getStockCode()); |
| | | realtime.setSymbol(item.getSymbol()); |
| | | realtime.setName(item.getName()); |
| | | realtime.setClose(new BigDecimal(o.getLast().trim()).doubleValue()); |
| | | realtime.setLow(new BigDecimal(o.getLow().trim()).doubleValue()); |
| | | realtime.setHigh(new BigDecimal(o.getHigh().trim()).doubleValue()); |
| | | realtime.setOpen(new BigDecimal(o.getOpen().trim()).doubleValue()); |
| | | realtime.setPrevClose(new BigDecimal(o.getPrevClose().trim()).doubleValue()); |
| | | realtime.setTs(Long.valueOf(o.getTime() + "000")); |
| | | realtime.setVolume(new BigDecimal(o.getVolume().trim()).doubleValue()); |
| | | realtime.setNetChange(new BigDecimal(o.getChg().trim()).doubleValue()); |
| | | realtime.setChangeRatio(new BigDecimal(o.getChgPct()).doubleValue()); |
| | | realtime.setType(o.getType()); |
| | | realtime.setBid(new BigDecimal(o.getBid()).doubleValue()); |
| | | realtime.setAsk(new BigDecimal(o.getAsk()).doubleValue()); |
| | | DataCache.putRealtime(realtime.getSymbol(), realtime); |
| | | } |
| | | }); |
| | | |
| | | log.info("init US股票 数据 成功"); |
| | | } catch (Exception e) { |
| | | log.error(e.getMessage(), e); |
| | | } |
| | | } |
| | | |
| | | public static void main(String[] args) { |
| | | StockTask task = new StockTask(); |
| | | task.loadAllStock(EStockType.US); |
| | | task.initAllStock(EStockType.US); |
| | | } |
| | | |
| | | @Override |
| | | public void run(String... args) throws Exception { |
| | | initAllStock(EStockType.US); |
| | | } |
| | | } |