From 7289903c3b78d4e6e141e3a5e976ddef52e9fc97 Mon Sep 17 00:00:00 2001
From: zyy <zyy@email.com>
Date: Fri, 29 May 2026 14:31:40 +0800
Subject: [PATCH] 1
---
trading-order-huobi/src/main/java/com.yami.trading.huobi/data/internal/KlineServiceImpl.java | 198 +++++++++++++++++++++++++++++++++++++++++++++++--
1 files changed, 189 insertions(+), 9 deletions(-)
diff --git a/trading-order-huobi/src/main/java/com.yami.trading.huobi/data/internal/KlineServiceImpl.java b/trading-order-huobi/src/main/java/com.yami.trading.huobi/data/internal/KlineServiceImpl.java
index 52a25de..83f5b4b 100644
--- a/trading-order-huobi/src/main/java/com.yami.trading.huobi/data/internal/KlineServiceImpl.java
+++ b/trading-order-huobi/src/main/java/com.yami.trading.huobi/data/internal/KlineServiceImpl.java
@@ -29,6 +29,9 @@
import java.math.RoundingMode;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
import java.util.*;
import java.util.stream.Collectors;
@@ -180,6 +183,165 @@
public Kline buildKline(String symbol, String line, String smallLevelLine, int nums) {
+ if (Kline.PERIOD_5DAY.equals(line)) {
+ return buildKlineLegacy(symbol, line, smallLevelLine, nums);
+ }
+ return buildInProgressKline(symbol, line, smallLevelLine);
+ }
+
+ /**
+ * 按当前周期起点拼接进行中的K线(ts 对齐到周期开盘时间,如 5 分钟线的 9:35)
+ */
+ private Kline buildInProgressKline(String symbol, String line, String smallLevelLine) {
+ try {
+ KlineTimeObject timeObject = DataCache.getKline(symbol, line);
+ if (timeObject == null) {
+ return null;
+ }
+ List<Kline> klineList = timeObject.getKline();
+ Item item = itemService.findBySymbol(symbol);
+ Kline latestSameLineKline = null;
+ if (klineList != null && !klineList.isEmpty()) {
+ latestSameLineKline = klineList.get(klineList.size() - 1);
+ } else if (item.getFake().equalsIgnoreCase("0")) {
+ return null;
+ }
+
+ Realtime latestRealtime = DataCache.getLatestRealTime(symbol);
+ if (latestRealtime == null) {
+ latestRealtime = DataCache.getRealtime(symbol);
+ }
+ if (latestRealtime == null || latestRealtime.getClose() == null) {
+ return null;
+ }
+
+ long currentPeriodTs = alignPeriodStartTs(line, latestRealtime.getTs());
+ if (latestSameLineKline != null && latestSameLineKline.getTs() != null
+ && latestSameLineKline.getTs() > currentPeriodTs) {
+ return null;
+ }
+
+ KlineTimeObject smallObject = DataCache.getKline(symbol, smallLevelLine);
+ List<Kline> periodBars = new ArrayList<>();
+ if (smallObject != null && smallObject.getKline() != null) {
+ periodBars = smallObject.getKline().stream()
+ .filter(k -> k.getTs() != null && k.getTs() >= currentPeriodTs)
+ .collect(Collectors.toList());
+ }
+
+ Kline kline = new Kline();
+ kline.setSymbol(symbol);
+ kline.setPeriod(line);
+ kline.setTs(currentPeriodTs);
+
+ if (latestSameLineKline != null && latestSameLineKline.getTs() != null
+ && latestSameLineKline.getTs() < currentPeriodTs) {
+ kline.setOpen(latestSameLineKline.getClose());
+ } else if (!periodBars.isEmpty() && periodBars.get(0).getOpen() != null) {
+ kline.setOpen(periodBars.get(0).getOpen());
+ } else if (latestRealtime.getOpen() != null) {
+ kline.setOpen(latestRealtime.getOpen());
+ } else {
+ kline.setOpen(latestRealtime.getClose());
+ }
+
+ if (!periodBars.isEmpty()) {
+ Double high = null;
+ Double low = null;
+ for (Kline bar : periodBars) {
+ if (bar.getHigh() != null) {
+ if (high == null || high <= bar.getHigh().doubleValue()) {
+ high = bar.getHigh().doubleValue();
+ }
+ }
+ if (bar.getLow() != null) {
+ if (low == null || low >= bar.getLow().doubleValue()) {
+ low = bar.getLow().doubleValue();
+ }
+ }
+ }
+ kline.setHigh(high == null ? latestRealtime.getClose() : new BigDecimal(high));
+ kline.setLow(low == null ? latestRealtime.getClose() : new BigDecimal(low));
+ kline.setClose(periodBars.get(periodBars.size() - 1).getClose());
+ kline.setVolume(periodBars.stream()
+ .map(Kline::getVolume)
+ .filter(Objects::nonNull)
+ .reduce(BigDecimal.ZERO, BigDecimal::add));
+ kline.setAmount(periodBars.stream()
+ .map(Kline::getAmount)
+ .filter(Objects::nonNull)
+ .reduce(BigDecimal.ZERO, BigDecimal::add));
+ } else {
+ kline.setHigh(latestRealtime.getClose());
+ kline.setLow(latestRealtime.getClose());
+ kline.setClose(latestRealtime.getClose());
+ kline.setVolume(latestRealtime.getVolume());
+ kline.setAmount(latestRealtime.getAmount());
+ }
+
+ kline.setClose(latestRealtime.getClose());
+ if (latestRealtime.getClose().compareTo(kline.getHigh()) > 0) {
+ kline.setHigh(latestRealtime.getClose());
+ }
+ if (latestRealtime.getClose().compareTo(kline.getLow()) < 0) {
+ kline.setLow(latestRealtime.getClose());
+ }
+
+ repairKline(kline);
+ if (kline.getOpen().compareTo(BigDecimal.ZERO) == 0 || kline.getClose().compareTo(BigDecimal.ZERO) == 0) {
+ return null;
+ }
+ return kline;
+ } catch (Exception e) {
+ logger.error("buildInProgressKline error: {}, {}", symbol, line, e);
+ }
+ return null;
+ }
+
+ /**
+ * 将时间戳对齐到K线周期起点(UTC,与火币 id 字段一致)
+ * 1week:周一 00:00 UTC;1mon:每月 1 日 00:00 UTC
+ */
+ private long alignPeriodStartTs(String line, long tsMillis) {
+ ZonedDateTime zdt = Instant.ofEpochMilli(tsMillis).atZone(ZoneOffset.UTC);
+ switch (line) {
+ case Kline.PERIOD_1MIN:
+ return zdt.withSecond(0).withNano(0).toInstant().toEpochMilli();
+ case Kline.PERIOD_5MIN: {
+ int minute = zdt.getMinute();
+ return zdt.withMinute(minute - minute % 5).withSecond(0).withNano(0).toInstant().toEpochMilli();
+ }
+ case Kline.PERIOD_15MIN: {
+ int minute = zdt.getMinute();
+ return zdt.withMinute(minute - minute % 15).withSecond(0).withNano(0).toInstant().toEpochMilli();
+ }
+ case Kline.PERIOD_30MIN: {
+ int minute = zdt.getMinute();
+ return zdt.withMinute(minute - minute % 30).withSecond(0).withNano(0).toInstant().toEpochMilli();
+ }
+ case Kline.PERIOD_60MIN:
+ return zdt.withMinute(0).withSecond(0).withNano(0).toInstant().toEpochMilli();
+ case Kline.PERIOD_1DAY:
+ return zdt.withHour(0).withMinute(0).withSecond(0).withNano(0).toInstant().toEpochMilli();
+ case Kline.PERIOD_1WEEK: {
+ zdt = zdt.withHour(0).withMinute(0).withSecond(0).withNano(0);
+ int dayOfWeek = zdt.getDayOfWeek().getValue();
+ return zdt.minusDays(dayOfWeek - 1L).toInstant().toEpochMilli();
+ }
+ case Kline.PERIOD_1MON:
+ return zdt.withDayOfMonth(1).withHour(0).withMinute(0).withSecond(0).withNano(0).toInstant().toEpochMilli();
+ case Kline.PERIOD_2HOUR:
+ return zdt.withMinute(0).withSecond(0).withNano(0).toInstant().toEpochMilli();
+ case Kline.PERIOD_4HOUR: {
+ int hour = zdt.getHour();
+ return zdt.withHour(hour - hour % 4).withMinute(0).withSecond(0).withNano(0).toInstant().toEpochMilli();
+ }
+ default:
+ return zdt.withSecond(0).withNano(0).toInstant().toEpochMilli();
+ }
+ }
+
+ private Kline buildKlineLegacy(String symbol, String line, String smallLevelLine, int nums) {
try {
// 取5分钟K线全部数据集合
KlineTimeObject timeObject = DataCache.getKline(symbol, line);
@@ -211,14 +373,27 @@
if (realtimeKline == null) {
return null;
}
- if (latestSameLineKline != null && latestSameLineKline.getTs() >= realtimeKline.getTs()) {
+ if (latestSameLineKline != null && latestSameLineKline.getTs() > realtimeKline.getTs()) {
return null;
}
if (latestSameLineKline != null) {
long latestSameLineKlineTs = latestSameLineKline.getTs();
- klineOneTop5 = klineOneTop5.stream().filter(r -> r.getTs() > latestSameLineKlineTs).collect(Collectors.toList());
+ if (latestSameLineKlineTs == realtimeKline.getTs()) {
+ klineOneTop5 = klineOne.stream()
+ .filter(r -> r.getTs() >= latestSameLineKlineTs)
+ .collect(Collectors.toList());
+ if (klineOneTop5.size() > nums) {
+ klineOneTop5 = new ArrayList<>(klineOneTop5.subList(klineOneTop5.size() - nums, klineOneTop5.size()));
+ }
+ } else {
+ klineOneTop5 = klineOneTop5.stream()
+ .filter(r -> r.getTs() > latestSameLineKlineTs)
+ .collect(Collectors.toList());
+ }
}
-
+ if (klineOneTop5.isEmpty()) {
+ return null;
+ }
Double high = null;
Double low = null;
@@ -673,11 +848,16 @@
latestKilne = klineList.get(klineList.size() - 1);
}
Realtime realtime = realTimeList.get(realTimeList.size() - 1);
- if (latestKilne != null && latestKilne.getTs() >= realtime.getTs()) {
+ long currentMinuteTs = alignPeriodStartTs(Kline.PERIOD_1MIN, realtime.getTs());
+ if (latestKilne != null && latestKilne.getTs() != null && latestKilne.getTs() > currentMinuteTs) {
return null;
}
- long lastKlineTs = latestKilne.getTs();
- realTimeList = realTimeList.stream().filter(r -> r.getTs() > lastKlineTs).collect(Collectors.toList());
+ realTimeList = realTimeList.stream()
+ .filter(r -> r.getTs() != null && r.getTs() >= currentMinuteTs)
+ .collect(Collectors.toList());
+ if (realTimeList.isEmpty()) {
+ realTimeList = Collections.singletonList(realtime);
+ }
Double high = null;
Double low = null;
for (Realtime realTime : realTimeList) {
@@ -692,11 +872,11 @@
// 保存K线到数据库
Kline kline = new Kline();
kline.setSymbol(symbol);
- kline.setTs(realtime.getTs());
- if (latestKilne != null) {
+ kline.setTs(currentMinuteTs);
+ if (latestKilne != null && latestKilne.getTs() != null && latestKilne.getTs() < currentMinuteTs) {
kline.setOpen(latestKilne.getClose());
} else {
- kline.setOpen(realTimeList.get(0).getOpen());
+ kline.setOpen(realTimeList.get(0).getOpen() != null ? realTimeList.get(0).getOpen() : realtime.getClose());
}
kline.setHigh(new BigDecimal(high));
kline.setLow(new BigDecimal(low));
--
Gitblit v1.9.3