package com.yami.trading.admin.task;
|
|
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
import com.google.gson.Gson;
|
import com.google.gson.reflect.TypeToken;
|
import com.yami.trading.admin.util.us.HttpClientRequest;
|
import com.yami.trading.admin.util.us.ReponseBase;
|
import com.yami.trading.bean.data.domain.Realtime;
|
import com.yami.trading.bean.item.domain.Item;
|
import com.yami.trading.bean.model.StockRealTimeBean;
|
import com.yami.trading.huobi.data.DataCache;
|
import com.yami.trading.huobi.websocket.constant.enums.EStockType;
|
import com.yami.trading.service.item.ItemService;
|
import org.apache.commons.lang3.StringUtils;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.boot.CommandLineRunner;
|
import org.springframework.scheduling.annotation.Scheduled;
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
import org.springframework.stereotype.Component;
|
|
import java.lang.reflect.Type;
|
import java.math.BigDecimal;
|
import java.util.*;
|
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.stream.Collectors;
|
|
|
@Component
|
public class StockTask implements CommandLineRunner {
|
private static final Logger log = LoggerFactory.getLogger(StockTask.class);
|
|
private final AtomicBoolean syncINStockData = new AtomicBoolean(false);
|
|
private final Lock syncINStockDataLock = new ReentrantLock();
|
|
@Autowired
|
private ThreadPoolTaskExecutor taskExecutor;
|
|
@Autowired
|
ItemService itemService;
|
|
/**
|
* 同步系统所需要的股票
|
*/
|
@Scheduled(cron = "0 0/30 * * * ?")
|
public void syncINStockData() {
|
|
if (syncINStockData.get()) { // 判断任务是否在处理中
|
return;
|
}
|
if (syncINStockDataLock.tryLock()) {
|
try {
|
syncINStockData.set(true);
|
|
// 1. 定义需要处理的所有股票类型(集中管理,新增类型只需添加到列表)
|
List<EStockType> stockTypes = Arrays.asList(
|
EStockType.US
|
);
|
|
// 2. 批量创建所有异步任务
|
List<CompletableFuture<Void>> futures = new ArrayList<>();
|
for (EStockType type : stockTypes) {
|
// 添加loadAllStock任务
|
futures.add(CompletableFuture.runAsync(() -> loadAllStock(type), taskExecutor));
|
}
|
|
// 3. 等待所有任务完成(将List转换为数组传入allOf)
|
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
|
} catch (Exception e) {
|
log.error("同步股票数据出错", e);
|
} finally {
|
syncINStockDataLock.unlock();
|
syncINStockData.set(false);
|
}
|
}
|
}
|
|
|
|
/**
|
* 加载所有股票数据
|
*/
|
public void loadAllStock(EStockType eStockType) {
|
log.info("同步股票 数据 {}", eStockType.getCode());
|
List<StockRealTimeBean> list = new ArrayList<>();
|
int totleStock = 1;
|
int page = 0;
|
try {
|
while (totleStock > list.size()) {
|
try {
|
String result = HttpClientRequest.doGet(eStockType.stockUrl + "list?country_id=" + eStockType.getContryId() + "&size=100000&page=" + page + "&key=" + eStockType.stockKey);
|
ReponseBase reponseBase = new Gson().fromJson(result, ReponseBase.class);
|
list.addAll(reponseBase.getData());
|
page++;
|
totleStock = reponseBase.getTotal();
|
} catch (Exception e) {
|
e.printStackTrace();
|
break;
|
}
|
}
|
if (list.isEmpty()) {
|
return;
|
}
|
List<String> stockCodeList = list.stream().map(StockRealTimeBean::getId).collect(Collectors.toList());
|
List<Item> stockList = itemService.list(new QueryWrapper<Item>()
|
.eq("type", Item.US_STOCKS).in("stock_code", stockCodeList));
|
|
List<Item> indicesList = itemService.list(new QueryWrapper<Item>()
|
.eq("type", Item.indices));
|
|
log.info("同步股票 已有数据 {} 获取数据 {}", stockList.size(), list.size());
|
List<Item> updateStockList = new ArrayList<>();
|
for (StockRealTimeBean o : list) {
|
//System.out.println(o);
|
Item indices = indicesList.stream()
|
.filter(x -> x.getSymbol().equals(o.getSymbol()))
|
.findFirst()
|
.orElse(null);
|
if (indices != null) { //指数不添加
|
continue;
|
}
|
|
Item item = stockList.stream()
|
.filter(x -> x.getSymbol().equals(o.getSymbol()) &&
|
x.getStockCode().equals(o.getId()))
|
.findFirst()
|
.orElse(null);
|
if (item != null) { //已有不添加
|
continue;
|
}
|
|
item = new Item();
|
String name = StringUtils.trim(o.getName());
|
item.setEnName(name);
|
item.setName(name);
|
item.setSymbolFullName(name);
|
item.setSymbol(o.getSymbol());
|
item.setSymbolData(o.getSymbol());
|
item.setPips(BigDecimal.valueOf(0.01).doubleValue());
|
item.setPipsAmount(BigDecimal.valueOf(0.02).doubleValue());
|
item.setAdjustmentValue(BigDecimal.ZERO.doubleValue());
|
item.setUnitAmount(BigDecimal.valueOf(1000).doubleValue());
|
item.setUnitFee(BigDecimal.valueOf(30).doubleValue());
|
item.setMarket("FOREVER");
|
item.setDecimals(2);
|
item.setMultiple(BigDecimal.ZERO.doubleValue());
|
item.setBorrowingRate(BigDecimal.ZERO.doubleValue());
|
item.setDelFlag(0);
|
item.setType(Item.US_STOCKS);
|
item.setCategory(Item.US_STOCKS);
|
item.setSorted("100");
|
item.setOpenCloseType(Item.US_STOCKS);
|
item.setFake("0");
|
item.setShowStatus("1");
|
item.setTradeStatus("1");
|
item.setQuoteCurrency("USDT");
|
item.setCrawlStatus("default_active");
|
item.setStockCode(o.getId());
|
|
updateStockList.add(item);
|
|
Realtime realtime = new Realtime();
|
realtime.setUuid(o.getId());
|
realtime.setSymbol(o.getSymbol());
|
realtime.setName(o.getName());
|
realtime.setClose(new BigDecimal(o.getLast().trim()).doubleValue());
|
realtime.setLow(new BigDecimal(o.getLow().trim()).doubleValue());
|
realtime.setHigh(new BigDecimal(o.getHigh().trim()).doubleValue());
|
realtime.setOpen(new BigDecimal(o.getOpen().trim()).doubleValue());
|
realtime.setPrevClose(new BigDecimal(o.getPrevClose().trim()).doubleValue());
|
realtime.setTs(Long.valueOf(o.getTime() + "000"));
|
realtime.setVolume(new BigDecimal(o.getVolume().trim()).doubleValue());
|
realtime.setNetChange(new BigDecimal(o.getChg().trim()).doubleValue());
|
realtime.setChangeRatio(new BigDecimal(o.getChgPct()).doubleValue());
|
realtime.setType(o.getType());
|
realtime.setBid(new BigDecimal(o.getBid()).doubleValue());
|
realtime.setAsk(new BigDecimal(o.getAsk()).doubleValue());
|
|
DataCache.putRealtime(realtime.getSymbol(), realtime);
|
}
|
if (!updateStockList.isEmpty()) {
|
itemService.saveOrUpdateBatch(updateStockList);
|
}
|
log.info("同步股票 数据 成功 {} 总共同步数据 {}", eStockType.getCode(), updateStockList.size());
|
} catch (
|
Exception e) {
|
log.error("同步出错", e);
|
}
|
}
|
|
/**
|
* 初始化更新价格
|
*/
|
public void initAllStock(EStockType eStockType) {
|
log.info("init US股票 数据 {}", eStockType.getCode());
|
List<StockRealTimeBean> list = new ArrayList<>();
|
try {
|
Map<String, Object> paramMap = new HashMap<>();
|
List<Item> stockList = itemService.list(new QueryWrapper<Item>()
|
.eq("type", Item.US_STOCKS));
|
int batchSize = 5000; // 每批5000条
|
int totalSize = stockList.size(); // 总条数
|
|
List<String> pids = new ArrayList<>();
|
// 循环分批处理
|
for (int i = 0; i < totalSize; i += batchSize) {
|
// 计算当前批次的结束索引(避免最后一批不足5000条时越界)
|
int endIndex = Math.min(i + batchSize, totalSize);
|
|
// 截取当前批次的子列表([i, endIndex),左闭右开)
|
List<Item> batchList = stockList.subList(i, endIndex);
|
|
// 拼接当前批次的pid(逗号分隔的stockCode)
|
String batchPid = batchList.stream()
|
.map(item -> String.valueOf(item.getStockCode())) // 提取stockCode并转字符串
|
.collect(Collectors.joining(",")); // 拼接
|
|
pids.add(batchPid);
|
}
|
pids.forEach(pid -> {
|
paramMap.put("pid", pid);
|
String result = HttpClientRequest.doPost(eStockType.stockUrl + "stock?" + "key=" + eStockType.stockKey, paramMap);
|
// 定义List<StockRealTimeBean>的类型(通过TypeToken保留泛型信息)
|
Type type = new TypeToken<List<StockRealTimeBean>>() {
|
}.getType();
|
// 解析为List<StockRealTimeBean>
|
List<StockRealTimeBean> stockRealTimeBeanList = new Gson().fromJson(result, type);
|
list.addAll(stockRealTimeBeanList);
|
});
|
|
stockList.forEach(item -> {
|
StockRealTimeBean o = list.stream().filter(x -> x.getId() != null &&
|
x.getId().equals(item.getStockCode())).findFirst().orElse(null);
|
if (o != null) {
|
Realtime realtime = new Realtime();
|
realtime.setUuid(item.getStockCode());
|
realtime.setSymbol(item.getSymbol());
|
realtime.setName(item.getName());
|
realtime.setClose(new BigDecimal(o.getLast().trim()).doubleValue());
|
realtime.setLow(new BigDecimal(o.getLow().trim()).doubleValue());
|
realtime.setHigh(new BigDecimal(o.getHigh().trim()).doubleValue());
|
realtime.setOpen(new BigDecimal(o.getOpen().trim()).doubleValue());
|
realtime.setPrevClose(new BigDecimal(o.getPrevClose().trim()).doubleValue());
|
realtime.setTs(Long.valueOf(o.getTime() + "000"));
|
realtime.setVolume(new BigDecimal(o.getVolume().trim()).doubleValue());
|
realtime.setNetChange(new BigDecimal(o.getChg().trim()).doubleValue());
|
realtime.setChangeRatio(new BigDecimal(o.getChgPct()).doubleValue());
|
realtime.setType(o.getType());
|
realtime.setBid(new BigDecimal(o.getBid()).doubleValue());
|
realtime.setAsk(new BigDecimal(o.getAsk()).doubleValue());
|
DataCache.putRealtime(realtime.getSymbol(), realtime);
|
}
|
});
|
|
log.info("init US股票 数据 成功");
|
} catch (Exception e) {
|
log.error(e.getMessage(), e);
|
}
|
}
|
|
public static void main(String[] args) {
|
StockTask task = new StockTask();
|
task.initAllStock(EStockType.US);
|
}
|
|
@Override
|
public void run(String... args) throws Exception {
|
initAllStock(EStockType.US);
|
}
|
}
|