package com.nq.utils.task.stock; import com.alibaba.fastjson2.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import com.nq.Repository.StockRepository; import com.nq.dao.StockConfigMapper; import com.nq.dao.StockMapper; import com.nq.dao.UserPositionMapper; import com.nq.enums.EConfigKey; import com.nq.enums.EStockType; import com.nq.pojo.*; import com.nq.service.ExchangeRateService; import com.nq.service.IMandatoryLiquidationService; import com.nq.service.IStockService; import com.nq.service.IUserPositionService; import com.nq.service.impl.StockServiceImpl; import com.nq.utils.PropertiesUtil; import com.nq.utils.StringUtils; import com.nq.utils.http.HttpClientRequest; import com.nq.utils.http.HttpRequest; import com.nq.utils.redis.RedisKeyUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.stereotype.Component; import java.lang.reflect.Type; import java.math.BigDecimal; import java.util.*; import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.stream.Collectors; @Component public class StockTask { @Autowired StockMapper stockMapper; @Autowired IStockService istockService; @Autowired StockRepository stockRepository; private final Lock stockConstraintLock = new ReentrantLock(); @Autowired IMandatoryLiquidationService mandatoryLiquidationService; private static final Logger log = LoggerFactory.getLogger(StockTask.class); private final AtomicBoolean syncINStockData = new AtomicBoolean(false); private final Lock syncINStockDataLock = new ReentrantLock(); @Autowired private ThreadPoolTaskExecutor taskExecutor; @Autowired private StockServiceImpl iStockService; @Autowired private ExchangeRateService exchangeRateService; @Autowired StockConfigMapper stockConfigMapper; /** * test */ //@Scheduled(cron = "*/5 * * * * *") public void test() {} /** * 同步系统所需要的股票 */ @Scheduled(cron = "0 0/3 * * * ?") public void syncINStockData() { if (syncINStockData.get()) { // 判断任务是否在处理中 return; } if (syncINStockDataLock.tryLock()) { try { syncINStockData.set(true); // 1. 定义需要处理的所有股票类型(集中管理,新增类型只需添加到列表) List stockTypes = Arrays.asList( EStockType.US, EStockType.JP ); // 2. 批量创建所有异步任务 List> futures = new ArrayList<>(); for (EStockType type : stockTypes) { // 添加loadAllStock任务 futures.add(CompletableFuture.runAsync(() -> loadAllStock(type), taskExecutor)); // 添加syncIndices任务 futures.add(CompletableFuture.runAsync(() -> syncIndices(type), taskExecutor)); } // 3. 等待所有任务完成(将List转换为数组传入allOf) CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); } catch (Exception e) { log.error("同步股票数据出错", e); } finally { syncINStockDataLock.unlock(); syncINStockData.set(false); } } } /** * 加载所有指数数据 */ private void syncIndices(EStockType eStockType) { log.info("同步指数 数据 {}", eStockType.getCode()); List list = new ArrayList<>(); int totleStock = 1; try { while (totleStock > list.size()) { try { String result = HttpRequest.doGrabGet(eStockType.stockUrl + "indices?key=" + eStockType.getStockKey() + "&country_id=" + eStockType.getContryId()); // 把JSON数据解析为List Type listType = new TypeToken>(){}.getType(); list = new Gson().fromJson(result, listType); totleStock = list.size(); } catch (Exception e) { e.printStackTrace(); break; } } if (list.isEmpty()) { return; } List stockCodeList = list.stream().map(DataStockBean::getId).collect(Collectors.toList()); List stockList = stockMapper.selectList(new QueryWrapper().in("stock_code", stockCodeList)); List updateStockList = new ArrayList<>(); for (DataStockBean o : list) { Stock stock = stockList.stream() .filter(x -> x.getStockCode().equals(o.getId())) .findFirst() .orElse(null); if (stock == null) { stock = new Stock(); } stock.setStockCode(o.getId()); stock.setStockName(StringUtils.trim(o.getName())); stock.setStockType(eStockType.getCode()); //指数数据类型 stock.setStockGid(EConfigKey.INDICES.getCode()); stock.setStockSpell(o.getSymbol()); stock.setIsLock(0); if (null == stock.getIsShow()){ stock.setIsShow(0); } stock.setDataBase(0); stock.setAddTime(new Date()); updateStockList.add(stock); RedisKeyUtil.setCaCheKeyBaseStock(eStockType, o); } stockRepository.saveAll(updateStockList); cacheKData(eStockType.getCode(), list); log.info("同步指数 数据 成功 {} 总共同步数据 {}", eStockType.getCode(), list.size()); } catch (Exception e) { e.printStackTrace(); log.error("同步指数列表出现异常: {}", eStockType.getCode()); } } /** * 同步指数股票后缓存k线图 */ public void cacheKData(String stockType, List list) { /*StringBuilder codeList = new StringBuilder(); codeList.append(PropertiesUtil.getProperty("us_home_indices_code")); codeList.append(PropertiesUtil.getProperty("hk_home_indices_code")); codeList.append(PropertiesUtil.getProperty("in_home_indices_code")); codeList.append(PropertiesUtil.getProperty("tw_home_indices_code"));*/ List stockConfigList = stockConfigMapper.selectList (new LambdaQueryWrapper() .like(StockConfig::getCKey, EConfigKey.INDICES.getCode())); String codeList = null; if (stockConfigList != null && !stockConfigList.isEmpty()) { codeList = stockConfigList.stream().filter(Objects::nonNull) .map(x -> { return x.getCValue() == null ? "" : x.getCValue(); }).collect(Collectors.joining(",")); } if (codeList == null || codeList.isEmpty()) { return; } for (DataStockBean dataStockBean : list) { //缓存首页指数k线图 if (codeList.contains(dataStockBean.getId())) { // 获取K线数据 Object kData = istockService.getKData(dataStockBean.getId(), "D", stockType); if (kData != null) { //缓存redis RedisKeyUtil.setCaCheStockKData(stockType, dataStockBean.getId(), kData); } } } log.info("同步指数k线图 数据 成功 {} 总共同步数据 {}", stockType, list.size()); } /** * 同步美国股票 */ // @Scheduled(cron = "0 0/30 * * * ?") public void loadStockCompanies() { loadAllCompanies(); } /** * 加载公司信息 */ public void loadAllCompanies() { List list = stockMapper.findStockList(); for (int i = 0; i < list.size(); i++) { Stock stock = list.get(i); EStockType eStockType = EStockType.getEStockTypeByCode(stock.getStockType()); String result = HttpClientRequest.doGet(eStockType.stockUrl + "companies?pid=+" + stock.getStockCode() + "+country_id=" + eStockType.getContryId() + "&size=1&page=1&key=" + eStockType.stockKey); try { JSONObject jsonObject = JSONObject.parseObject(result); JSONObject companiesInfo = jsonObject.getJSONArray("data").getJSONObject(0); RedisKeyUtil.setCacheCompanies(stock, new Gson().toJson(companiesInfo)); } catch (Exception e) { log.info(""); } } } /** * 加载所有股票数据 */ public void loadAllStock(EStockType eStockType) { log.info("同步股票 数据 {}", eStockType.getCode()); List list = new ArrayList<>(); int totleStock = 1; int page = 0; try { while (totleStock > list.size()) { try { String result = HttpClientRequest.doGet(eStockType.stockUrl + "list?country_id=" + eStockType.getContryId() + "&size=100000&page=" + page + "&key=" + eStockType.stockKey); ReponseBase reponseBase = new Gson().fromJson(result, ReponseBase.class); list.addAll(reponseBase.getData()); page++; totleStock = reponseBase.getTotal(); } catch (Exception e) { e.printStackTrace(); break; } } if (list.isEmpty()) { return; } List stockCodeList = list.stream().map(DataStockBean::getId).collect(Collectors.toList()); List stockList = stockMapper.selectList(new QueryWrapper().in("stock_code", stockCodeList)); List updateStockList = new ArrayList<>(); for (DataStockBean o : list) { //Stock stock = stockMapper.findStockByCode(o.getId()); Stock stock = stockList.stream() .filter(x -> x.getStockCode().equals(o.getId())) .findFirst() .orElse(null); if (stock == null) { stock = new Stock(); } stock.setStockCode(o.getId()); stock.setStockName(StringUtils.trim(o.getName())); stock.setStockType(eStockType.getCode()); if (o.getType() == null) { stock.setStockGid(eStockType.getCode()); } else { stock.setStockGid(o.getType()); } stock.setStockSpell(o.getSymbol()); stock.setIsLock(0); if (null == stock.getIsShow()){ stock.setIsShow(0); } stock.setDataBase(0); stock.setAddTime(new Date()); updateStockList.add(stock); RedisKeyUtil.setCaCheKeyBaseStock(eStockType, o); } stockRepository.saveAll(updateStockList); log.info("同步股票 数据 成功 {} 总共同步数据 {}", eStockType.getCode(), list.size()); } catch ( Exception e) { log.error("同步出错", e); } } private final AtomicBoolean stockConstraint = new AtomicBoolean(false); /** * 强制平仓 */ // @Scheduled(cron = "0/1 * * * * ?") /*public void stockConstraint() { if (stockConstraint.get()) { // 判断任务是否在处理中 return; } if (stockConstraintLock.tryLock()) { try { stockConstraint.set(true); // 设置处理中标识为true List userPositions = userPositionMapper.selectList(new LambdaQueryWrapper().isNull(UserPosition::getSellOrderId)); if (CollectionUtils.isNotEmpty(userPositions)) { userPositionService.stockConstraint(userPositions); } } catch (Exception e) { e.printStackTrace(); log.error("强制平仓任务错误:" + e.getMessage()); } finally { stockConstraintLock.unlock(); stockConstraint.set(false); // 设置处理中标识为false } } else { log.info("强制平仓任务--------->上次任务还未执行完成,本次任务忽略"); } }*/ }