package com.yami.trading.service.etf; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.collection.CollectionUtil; import cn.hutool.core.util.RandomUtil; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.yami.trading.bean.data.domain.Depth; import com.yami.trading.bean.data.domain.DepthEntry; import com.yami.trading.bean.data.domain.Kline; import com.yami.trading.bean.data.domain.Realtime; import com.yami.trading.bean.etf.domain.EtfMinuteKLine; import com.yami.trading.bean.item.domain.Item; import com.yami.trading.common.constants.RedisKeys; import com.yami.trading.common.util.Arith; import com.yami.trading.service.data.DataService; import com.yami.trading.service.item.ItemService; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.stereotype.Service; import java.math.BigDecimal; import java.math.RoundingMode; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; /** * ETF行情服务 */ @Service @Slf4j @Getter public class MarketService { public static final String IXIC = ".IXIC"; @Autowired ItemService itemService; @Autowired KlineConfigService klineConfigService; @Autowired RedisTemplate redisTemplate; @Autowired private DataService dataService; @Autowired private EtfMinuteKLineService etfMinuteKLineService; // 加速行情 public Map accelerate = new HashMap<>(); public Map getAccelerate() { return accelerate; } private Map> cacheKline = new ConcurrentHashMap>(); public Realtime queryRealtime(String symbol) { Kline kline = querySecKline(symbol); if (kline == null) { Realtime realtime = new Realtime(); realtime.setSymbol(symbol); realtime.setName(symbol); realtime.setTs(0L); realtime.setOpen(0); realtime.setClose(0); realtime.setHigh(0); realtime.setLow(0); return realtime; } Random random = new Random(); Realtime realtime = new Realtime(); int decimal = itemService.getDecimal(symbol); realtime.setSymbol(symbol); realtime.setName(symbol); realtime.setTs(kline.getTs() / 1000); realtime.setOpen(BigDecimal.valueOf(kline.getOpen()).setScale(decimal, RoundingMode.HALF_UP).doubleValue()); realtime.setClose(BigDecimal.valueOf(kline.getClose()).setScale(decimal, RoundingMode.HALF_UP).doubleValue()); realtime.setHigh(BigDecimal.valueOf(kline.getHigh()).setScale(decimal, RoundingMode.HALF_UP).doubleValue()); realtime.setLow(BigDecimal.valueOf(kline.getLow()).setScale(decimal, RoundingMode.HALF_UP).doubleValue()); double lastAmount = (double) Optional.ofNullable(redisTemplate.opsForHash().get(RedisKeys.SYMBOL_AMOUNT_VOLUME + symbol, "amount")).orElse(0D); double amount = Optional.of(kline.getAmount()).orElse(0D); realtime.setAmount(Arith.add(lastAmount, amount, decimal)); double lastVolume = (double) Optional.ofNullable(redisTemplate.opsForHash().get(RedisKeys.SYMBOL_VOLUME + symbol, "volume")).orElse(0D); double volume = Optional.of(kline.getVolume()).orElse(0D); // 修复金额和成交量为0的问题 if (volume == 0 && realtime.getClose() > 0 ) { volume = Arith.sub(amount, realtime.getClose(), decimal); // volume = amount.divide(realtime.getClose(), decimal, RoundingMode.HALF_UP ); } if (amount == 0) { amount = Arith.mul(volume, realtime.getClose(), decimal); // amount = volume.multiply(realtime.getClose()).setScale(decimal, RoundingMode.HALF_UP ); realtime.setAmount(amount); } realtime.setVolume(Arith.add(lastVolume, volume, decimal)); realtime.setAsk(KlineConfigService.randomDouble(realtime.getLow(), realtime.getClose(), random)); realtime.setBid(KlineConfigService.randomDouble(realtime.getHigh(), realtime.getClose(), random)); redisTemplate.opsForHash().put(RedisKeys.SYMBOL_AMOUNT_VOLUME + symbol, "amount", realtime.getAmount()); redisTemplate.opsForHash().put(RedisKeys.SYMBOL_VOLUME + symbol, "volume", realtime.getVolume()); return realtime; } private Kline querySecKline(String symbol) { if (cacheKline.size() == 0 || CollectionUtil.isEmpty(cacheKline.get(symbol))) { List secKlines = klineConfigService.querySecKlineBySymbol(symbol); secKlines.sort(Comparator.comparing(Kline::getTs)); // secKlines 变成key为时间,value为kline的map LinkedHashMap collect = secKlines.stream().collect(Collectors.toMap(k -> k.getTs() / 1000, kline -> kline, (k1, k2) -> k1, LinkedHashMap::new)); if (collect.size() != 0) { cacheKline.put(symbol, collect); // 一天的秒级k线是23400,取当天最后一秒的数据保存到redis中 redisTemplate.opsForValue().set(RedisKeys.SYMBOL_DEPTH + symbol, secKlines.size() == 23400 ? secKlines.get(23399) : secKlines.get(secKlines.size() - 1)); } else { // 如果没有数据,跟随纳斯达克大盘 Kline kline = (Kline) redisTemplate.opsForValue().get(RedisKeys.SYMBOL_DEPTH + symbol); if (kline == null) { QueryWrapper queryWrapper = new QueryWrapper<>(); queryWrapper.eq("symbol", symbol); queryWrapper.lt("ts", System.currentTimeMillis()); queryWrapper.orderByDesc("ts"); queryWrapper.last("limit 1"); List list = etfMinuteKLineService.list(queryWrapper); if (CollectionUtil.isNotEmpty(list)) { kline = BeanUtil.copyProperties(list.get(0), Kline.class); kline.setPeriod(Kline.PERIOD_1MIN); kline.setTs(System.currentTimeMillis()); } else { return null; } } Kline retKline = null; try { retKline = (Kline) kline.clone(); } catch (CloneNotSupportedException e) { e.printStackTrace(); return retKline; } retKline.setTs(System.currentTimeMillis()); List realtimes = dataService.realtime(IXIC); if (CollectionUtils.isNotEmpty(realtimes)) { Realtime realtime = realtimes.get(0); // BigDecimal ratio = realtime.getClose().divide(realtime.getOpen(), 10, RoundingMode.HALF_UP); // BigDecimal close = ratio.multiply(kline.getClose()).setScale(2, RoundingMode.HALF_UP); // BigDecimal low = ratio.multiply(kline.getLow()).setScale(2, RoundingMode.HALF_UP); // BigDecimal high = ratio.multiply(kline.getHigh()).setScale(2, RoundingMode.HALF_UP); double ratio = Arith.div(realtime.getClose(), realtime.getOpen()); retKline.setClose(Arith.mul(ratio, kline.getClose(), 2)); retKline.setLow(Arith.mul(ratio, kline.getLow(), 2)); retKline.setHigh(Arith.mul(ratio, kline.getHigh(), 2)); return retKline; } } } // 获取到 long currentSec = System.currentTimeMillis() / 1000L; if (cacheKline.get(symbol) == null) { return null; } Kline kline = cacheKline.get(symbol).get(currentSec); return kline; } public void clear() { cacheKline.clear(); } public List build5min(String symbol) { // 获取所有的1s的kline List klines = klineConfigService.querySecKlineBySymbolAllData(symbol); return convertTo5MinKLine(klines); } public static List convertTo5MinKLine(List sKLineList) { List fiveMinKLineList = new ArrayList<>(); int currentIndex = 0; int dataSize = sKLineList.size(); while (currentIndex < dataSize) { Kline currentKLine = sKLineList.get(currentIndex); long currentTimestamp = currentKLine.getTs(); double open = currentKLine.getOpen(); double high = currentKLine.getHigh(); double low = currentKLine.getLow(); double close = currentKLine.getClose(); double volume = currentKLine.getVolume(); double amount = currentKLine.getAmount(); // 5 minutes in milliseconds long nextTimestamp = currentTimestamp + (5 * 60 * 1000); // Find the index of the next KLine that satisfies the 5-minute condition int nextIndex = currentIndex + 1; while (nextIndex < dataSize && sKLineList.get(nextIndex).getTs() < nextTimestamp) { Kline nextKLine = sKLineList.get(nextIndex); high = Double.max(high, nextKLine.getHigh()); low = Double.max(low, nextKLine.getLow()); volume = volume + nextKLine.getVolume(); amount = amount + nextKLine.getAmount(); close = nextKLine.getClose(); nextIndex++; } // Create the 5-minute KLine and add it to the list Kline fiveMinKLine = new Kline(); fiveMinKLine.setSymbol(currentKLine.getSymbol()); fiveMinKLine.setTs(currentTimestamp); fiveMinKLine.setOpen(open); fiveMinKLine.setHigh(high); fiveMinKLine.setLow(low); fiveMinKLine.setClose(close); fiveMinKLine.setVolume(volume); fiveMinKLine.setAmount(amount); fiveMinKLineList.add(fiveMinKLine); currentIndex = nextIndex; } return fiveMinKLineList; } public Depth queryDepth(String symbol) { Depth depth = new Depth(); depth.setSymbol(symbol); depth.setTs(System.currentTimeMillis()); Kline kline = querySecKline(symbol); Item item = itemService.findBySymbol(symbol); if (kline == null || kline.getVolume() == 0) { kline = (Kline) redisTemplate.opsForValue().get(RedisKeys.SYMBOL_DEPTH + symbol); } if (kline == null || kline.getVolume() == 0) { log.info("queryDepth {} is null", symbol); List realtimes = dataService.realtime(symbol); if(CollectionUtil.isEmpty(realtimes)){ return null; } Realtime realtime = realtimes.get(0); if(CollectionUtil.isNotEmpty(realtimes)){ kline = new Kline(); kline.setLow(realtime.getLow()); kline.setHigh(realtime.getHigh()); kline.setClose(realtime.getClose()); kline.setOpen(realtime.getOpen()); if(realtime.getVolume() == 0){ kline.setVolume(RandomUtil.randomDouble(100, 1000)); } if(realtime.getAmount() == 0){ kline.setAmount(RandomUtil.randomDouble(1000, 100000)); } } } double volume = kline.getVolume(); if(volume == 0){ volume = RandomUtil.randomDouble(100, 1000); } List volumeSplit = KlineConfigService.splitData(volume, 10, 10); DepthEntry buy; DepthEntry sell; // 拿到这一秒的挂单数量,怎么分到5档位上 Random random = new Random(); for (int i = 0; i < 5; i++) { double enlarge = 1; if (accelerate.get(symbol) != null) { enlarge = accelerate.get(symbol); } buy = new DepthEntry(); // 买 BigDecimal.valueOf(KlineConfigService.randomDouble(kline.getLow(), kline.getClose(), random)).setScale(item.getDecimals(), RoundingMode.HALF_UP).doubleValue(); buy.setPrice(BigDecimal.valueOf(KlineConfigService.randomDouble(kline.getLow(), kline.getClose(), random)).setScale(item.getDecimals(), RoundingMode.HALF_UP).doubleValue()); buy.setAmount(Arith.mul(enlarge, volumeSplit.get(i), item.getDecimals())); // 卖 sell = new DepthEntry(); sell.setPrice(BigDecimal.valueOf(KlineConfigService.randomDouble(kline.getHigh(), kline.getClose(), random)).setScale(item.getDecimals(), RoundingMode.HALF_UP).doubleValue()); sell.setAmount(Arith.mul(enlarge, volumeSplit.get(i + 5), item.getDecimals())); depth.getAsks().add(sell); depth.getBids().add(buy); } depth.getBids().sort(Comparator.comparing(DepthEntry::getPrice).reversed()); depth.getAsks().sort(Comparator.comparing(DepthEntry::getPrice)); return depth; } }