| | |
| | | import java.math.RoundingMode; |
| | | import java.text.DecimalFormat; |
| | | import java.text.SimpleDateFormat; |
| | | import java.time.Instant; |
| | | import java.time.ZoneOffset; |
| | | import java.time.ZonedDateTime; |
| | | import java.util.*; |
| | | import java.util.stream.Collectors; |
| | | |
| | |
| | | |
| | | |
| | | public Kline buildKline(String symbol, String line, String smallLevelLine, int nums) { |
| | | if (Kline.PERIOD_5DAY.equals(line)) { |
| | | return buildKlineLegacy(symbol, line, smallLevelLine, nums); |
| | | } |
| | | return buildInProgressKline(symbol, line, smallLevelLine); |
| | | } |
| | | |
| | | /** |
| | | * 按当前周期起点拼接进行中的K线(ts 对齐到周期开盘时间,如 5 分钟线的 9:35) |
| | | */ |
| | | private Kline buildInProgressKline(String symbol, String line, String smallLevelLine) { |
| | | try { |
| | | KlineTimeObject timeObject = DataCache.getKline(symbol, line); |
| | | if (timeObject == null) { |
| | | return null; |
| | | } |
| | | List<Kline> klineList = timeObject.getKline(); |
| | | Item item = itemService.findBySymbol(symbol); |
| | | Kline latestSameLineKline = null; |
| | | if (klineList != null && !klineList.isEmpty()) { |
| | | latestSameLineKline = klineList.get(klineList.size() - 1); |
| | | } else if (item.getFake().equalsIgnoreCase("0")) { |
| | | return null; |
| | | } |
| | | |
| | | Realtime latestRealtime = DataCache.getLatestRealTime(symbol); |
| | | if (latestRealtime == null) { |
| | | latestRealtime = DataCache.getRealtime(symbol); |
| | | } |
| | | if (latestRealtime == null || latestRealtime.getClose() == null) { |
| | | return null; |
| | | } |
| | | |
| | | long currentPeriodTs = alignPeriodStartTs(line, latestRealtime.getTs()); |
| | | if (latestSameLineKline != null && latestSameLineKline.getTs() != null |
| | | && latestSameLineKline.getTs() > currentPeriodTs) { |
| | | return null; |
| | | } |
| | | |
| | | KlineTimeObject smallObject = DataCache.getKline(symbol, smallLevelLine); |
| | | List<Kline> periodBars = new ArrayList<>(); |
| | | if (smallObject != null && smallObject.getKline() != null) { |
| | | periodBars = smallObject.getKline().stream() |
| | | .filter(k -> k.getTs() != null && k.getTs() >= currentPeriodTs) |
| | | .collect(Collectors.toList()); |
| | | } |
| | | |
| | | Kline kline = new Kline(); |
| | | kline.setSymbol(symbol); |
| | | kline.setPeriod(line); |
| | | kline.setTs(currentPeriodTs); |
| | | |
| | | if (latestSameLineKline != null && latestSameLineKline.getTs() != null |
| | | && latestSameLineKline.getTs() < currentPeriodTs) { |
| | | kline.setOpen(latestSameLineKline.getClose()); |
| | | } else if (!periodBars.isEmpty() && periodBars.get(0).getOpen() != null) { |
| | | kline.setOpen(periodBars.get(0).getOpen()); |
| | | } else if (latestRealtime.getOpen() != null) { |
| | | kline.setOpen(latestRealtime.getOpen()); |
| | | } else { |
| | | kline.setOpen(latestRealtime.getClose()); |
| | | } |
| | | |
| | | if (!periodBars.isEmpty()) { |
| | | Double high = null; |
| | | Double low = null; |
| | | for (Kline bar : periodBars) { |
| | | if (bar.getHigh() != null) { |
| | | if (high == null || high <= bar.getHigh().doubleValue()) { |
| | | high = bar.getHigh().doubleValue(); |
| | | } |
| | | } |
| | | if (bar.getLow() != null) { |
| | | if (low == null || low >= bar.getLow().doubleValue()) { |
| | | low = bar.getLow().doubleValue(); |
| | | } |
| | | } |
| | | } |
| | | kline.setHigh(high == null ? latestRealtime.getClose() : new BigDecimal(high)); |
| | | kline.setLow(low == null ? latestRealtime.getClose() : new BigDecimal(low)); |
| | | kline.setClose(periodBars.get(periodBars.size() - 1).getClose()); |
| | | kline.setVolume(periodBars.stream() |
| | | .map(Kline::getVolume) |
| | | .filter(Objects::nonNull) |
| | | .reduce(BigDecimal.ZERO, BigDecimal::add)); |
| | | kline.setAmount(periodBars.stream() |
| | | .map(Kline::getAmount) |
| | | .filter(Objects::nonNull) |
| | | .reduce(BigDecimal.ZERO, BigDecimal::add)); |
| | | } else { |
| | | kline.setHigh(latestRealtime.getClose()); |
| | | kline.setLow(latestRealtime.getClose()); |
| | | kline.setClose(latestRealtime.getClose()); |
| | | kline.setVolume(latestRealtime.getVolume()); |
| | | kline.setAmount(latestRealtime.getAmount()); |
| | | } |
| | | |
| | | kline.setClose(latestRealtime.getClose()); |
| | | if (latestRealtime.getClose().compareTo(kline.getHigh()) > 0) { |
| | | kline.setHigh(latestRealtime.getClose()); |
| | | } |
| | | if (latestRealtime.getClose().compareTo(kline.getLow()) < 0) { |
| | | kline.setLow(latestRealtime.getClose()); |
| | | } |
| | | |
| | | repairKline(kline); |
| | | if (kline.getOpen().compareTo(BigDecimal.ZERO) == 0 || kline.getClose().compareTo(BigDecimal.ZERO) == 0) { |
| | | return null; |
| | | } |
| | | return kline; |
| | | } catch (Exception e) { |
| | | logger.error("buildInProgressKline error: {}, {}", symbol, line, e); |
| | | } |
| | | return null; |
| | | } |
| | | |
| | | /** |
| | | * 将时间戳对齐到K线周期起点(UTC,与火币 id 字段一致) |
| | | * 1week:周一 00:00 UTC;1mon:每月 1 日 00:00 UTC |
| | | */ |
| | | private long alignPeriodStartTs(String line, long tsMillis) { |
| | | ZonedDateTime zdt = Instant.ofEpochMilli(tsMillis).atZone(ZoneOffset.UTC); |
| | | switch (line) { |
| | | case Kline.PERIOD_1MIN: |
| | | return zdt.withSecond(0).withNano(0).toInstant().toEpochMilli(); |
| | | case Kline.PERIOD_5MIN: { |
| | | int minute = zdt.getMinute(); |
| | | return zdt.withMinute(minute - minute % 5).withSecond(0).withNano(0).toInstant().toEpochMilli(); |
| | | } |
| | | case Kline.PERIOD_15MIN: { |
| | | int minute = zdt.getMinute(); |
| | | return zdt.withMinute(minute - minute % 15).withSecond(0).withNano(0).toInstant().toEpochMilli(); |
| | | } |
| | | case Kline.PERIOD_30MIN: { |
| | | int minute = zdt.getMinute(); |
| | | return zdt.withMinute(minute - minute % 30).withSecond(0).withNano(0).toInstant().toEpochMilli(); |
| | | } |
| | | case Kline.PERIOD_60MIN: |
| | | return zdt.withMinute(0).withSecond(0).withNano(0).toInstant().toEpochMilli(); |
| | | case Kline.PERIOD_1DAY: |
| | | return zdt.withHour(0).withMinute(0).withSecond(0).withNano(0).toInstant().toEpochMilli(); |
| | | case Kline.PERIOD_1WEEK: { |
| | | zdt = zdt.withHour(0).withMinute(0).withSecond(0).withNano(0); |
| | | int dayOfWeek = zdt.getDayOfWeek().getValue(); |
| | | return zdt.minusDays(dayOfWeek - 1L).toInstant().toEpochMilli(); |
| | | } |
| | | case Kline.PERIOD_1MON: |
| | | return zdt.withDayOfMonth(1).withHour(0).withMinute(0).withSecond(0).withNano(0).toInstant().toEpochMilli(); |
| | | case Kline.PERIOD_2HOUR: |
| | | return zdt.withMinute(0).withSecond(0).withNano(0).toInstant().toEpochMilli(); |
| | | case Kline.PERIOD_4HOUR: { |
| | | int hour = zdt.getHour(); |
| | | return zdt.withHour(hour - hour % 4).withMinute(0).withSecond(0).withNano(0).toInstant().toEpochMilli(); |
| | | } |
| | | default: |
| | | return zdt.withSecond(0).withNano(0).toInstant().toEpochMilli(); |
| | | } |
| | | } |
| | | |
| | | private Kline buildKlineLegacy(String symbol, String line, String smallLevelLine, int nums) { |
| | | try { |
| | | // 取5分钟K线全部数据集合 |
| | | KlineTimeObject timeObject = DataCache.getKline(symbol, line); |
| | |
| | | if (realtimeKline == null) { |
| | | return null; |
| | | } |
| | | if (latestSameLineKline != null && latestSameLineKline.getTs() >= realtimeKline.getTs()) { |
| | | if (latestSameLineKline != null && latestSameLineKline.getTs() > realtimeKline.getTs()) { |
| | | return null; |
| | | } |
| | | if (latestSameLineKline != null) { |
| | | long latestSameLineKlineTs = latestSameLineKline.getTs(); |
| | | klineOneTop5 = klineOneTop5.stream().filter(r -> r.getTs() > latestSameLineKlineTs).collect(Collectors.toList()); |
| | | if (latestSameLineKlineTs == realtimeKline.getTs()) { |
| | | klineOneTop5 = klineOne.stream() |
| | | .filter(r -> r.getTs() >= latestSameLineKlineTs) |
| | | .collect(Collectors.toList()); |
| | | if (klineOneTop5.size() > nums) { |
| | | klineOneTop5 = new ArrayList<>(klineOneTop5.subList(klineOneTop5.size() - nums, klineOneTop5.size())); |
| | | } |
| | | } else { |
| | | klineOneTop5 = klineOneTop5.stream() |
| | | .filter(r -> r.getTs() > latestSameLineKlineTs) |
| | | .collect(Collectors.toList()); |
| | | } |
| | | } |
| | | |
| | | if (klineOneTop5.isEmpty()) { |
| | | return null; |
| | | } |
| | | |
| | | Double high = null; |
| | | Double low = null; |
| | |
| | | latestKilne = klineList.get(klineList.size() - 1); |
| | | } |
| | | Realtime realtime = realTimeList.get(realTimeList.size() - 1); |
| | | if (latestKilne != null && latestKilne.getTs() >= realtime.getTs()) { |
| | | long currentMinuteTs = alignPeriodStartTs(Kline.PERIOD_1MIN, realtime.getTs()); |
| | | if (latestKilne != null && latestKilne.getTs() != null && latestKilne.getTs() > currentMinuteTs) { |
| | | return null; |
| | | } |
| | | long lastKlineTs = latestKilne.getTs(); |
| | | realTimeList = realTimeList.stream().filter(r -> r.getTs() > lastKlineTs).collect(Collectors.toList()); |
| | | realTimeList = realTimeList.stream() |
| | | .filter(r -> r.getTs() != null && r.getTs() >= currentMinuteTs) |
| | | .collect(Collectors.toList()); |
| | | if (realTimeList.isEmpty()) { |
| | | realTimeList = Collections.singletonList(realtime); |
| | | } |
| | | Double high = null; |
| | | Double low = null; |
| | | for (Realtime realTime : realTimeList) { |
| | |
| | | // 保存K线到数据库 |
| | | Kline kline = new Kline(); |
| | | kline.setSymbol(symbol); |
| | | kline.setTs(realtime.getTs()); |
| | | if (latestKilne != null) { |
| | | kline.setTs(currentMinuteTs); |
| | | if (latestKilne != null && latestKilne.getTs() != null && latestKilne.getTs() < currentMinuteTs) { |
| | | kline.setOpen(latestKilne.getClose()); |
| | | } else { |
| | | kline.setOpen(realTimeList.get(0).getOpen()); |
| | | kline.setOpen(realTimeList.get(0).getOpen() != null ? realTimeList.get(0).getOpen() : realtime.getClose()); |
| | | } |
| | | kline.setHigh(new BigDecimal(high)); |
| | | kline.setLow(new BigDecimal(low)); |