package com.yami.trading.huobi.data.internal;
|
|
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
|
import com.yami.trading.bean.data.domain.Kline;
|
import com.yami.trading.bean.data.domain.Realtime;
|
import com.yami.trading.bean.data.dto.BeforeClose;
|
import com.yami.trading.bean.item.domain.Item;
|
import com.yami.trading.common.config.RequestDataHelper;
|
import com.yami.trading.common.constants.Constants;
|
import com.yami.trading.common.constants.RedisKeys;
|
import com.yami.trading.common.util.DateUtils;
|
import com.yami.trading.huobi.data.DataCache;
|
import com.yami.trading.huobi.data.job.RealtimeQueue;
|
import com.yami.trading.service.data.RealtimeService;
|
import com.yami.trading.service.item.ItemService;
|
import com.yami.trading.service.syspara.SysparaService;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.data.redis.core.RedisTemplate;
|
import org.springframework.jdbc.core.JdbcTemplate;
|
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcOperations;
|
import org.springframework.stereotype.Service;
|
|
import java.math.BigDecimal;
|
import java.math.RoundingMode;
|
import java.time.*;
|
import java.time.temporal.ChronoUnit;
|
import java.util.*;
|
import java.util.concurrent.TimeUnit;
|
import java.util.stream.Collectors;
|
|
@Service
|
@Slf4j
|
public class DataDBServiceImpl implements DataDBService {
|
@Autowired
|
private NamedParameterJdbcOperations namedParameterJdbcTemplate;
|
@Autowired
|
private SysparaService sysparaService;
|
@Autowired
|
private ItemService itemService;
|
@Autowired
|
private RealtimeService realtimeService;
|
@Autowired
|
private JdbcTemplate jdbcTemplate;
|
@Autowired
|
RedisTemplate redisTemplate;
|
|
@Override
|
public void saveAsyn(Realtime realtime) {
|
|
String symbol = realtime.getSymbol();
|
Realtime latestRealtime = DataCache.getLatestRealTime(symbol);
|
if (null != latestRealtime && latestRealtime.getTs() >= realtime.getTs()) {
|
log.debug("时间没有变化");
|
return;
|
}
|
|
//停牌时不更新
|
if (itemService.isSuspended(symbol)) {
|
return;
|
}
|
/*if (realtime.getSymbol().equals("galausdt")) {
|
System.out.println("galausdt111" + realtime);
|
}*/
|
DataCache.putLatestRealTime(symbol, realtime);
|
DataCache.putLatestOpen(symbol, realtime.getOpen());
|
|
// 最近60s内实时价格集合
|
List<Realtime> list = DataCache.latestRealTimeMap_60s.get(symbol);
|
if (list == null) {
|
return;
|
}
|
if (list.size() >= KlineConstant.LATEST_REALTIME_LIST_MAX) {
|
list.remove(0);
|
}
|
list.add(realtime);
|
DataCache.latestRealTimeMap_60s.put(symbol, list);
|
RealtimeQueue.add(realtime);
|
}
|
|
@Override
|
public void saveBatch(List<Realtime> entities) {
|
Map<String, List<Realtime>> collect = entities.stream().collect(Collectors.groupingBy(Realtime::getSymbol));
|
for (String symbol : collect.keySet()) {
|
RequestDataHelper.set("symbol", symbol);
|
realtimeService.saveOrUpdateBatch(collect.get(symbol));
|
RequestDataHelper.clear();
|
}
|
|
}
|
|
@Override
|
public Realtime get(String symbol) {
|
RequestDataHelper.set("symbol", symbol);
|
LambdaQueryWrapper<Realtime> queryWrapper = new LambdaQueryWrapper<Realtime>()
|
.eq(Realtime::getSymbol, symbol)
|
.orderByDesc(Realtime::getTs)
|
.last("LIMIT 1");
|
Realtime realtime = realtimeService.getBaseMapper().selectOne(queryWrapper);
|
RequestDataHelper.clear();
|
return realtime;
|
|
}
|
|
@Override
|
public Realtime getBefore(String symbol) {
|
Realtime realtime = (Realtime) redisTemplate.opsForValue().get(RedisKeys.REAL_TIME_BEFORE + symbol);
|
if (realtime != null) {
|
long ts = realtime.getTs();
|
// 1. 确定时间戳单位(假设ts是毫秒级,若为秒级需用ofEpochSecond())
|
Instant instant = Instant.ofEpochMilli(ts);
|
|
// 2. 将时间戳转换为当地时区的日期(指定时区更准确,如Asia/Tokyo)
|
LocalDate tsDate = instant.atZone(ZoneId.of("Asia/Tokyo")).toLocalDate();
|
|
// 3. 获取“昨天的日期”(当前日期减1天)
|
LocalDate yesterday = LocalDate.now(ZoneId.of("Asia/Tokyo")).minusDays(1);
|
|
// 4. 判断是否为昨天
|
boolean isYesterday = tsDate.equals(yesterday);
|
if (isYesterday) {
|
return realtime;
|
}
|
}
|
// 没缓存重新保存
|
// 计算当天0点的时间戳(毫秒级)
|
LocalDateTime todayStart = LocalDateTime.of(LocalDate.now(), LocalTime.MIN);
|
long todayStartTime = todayStart.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
|
RequestDataHelper.set("symbol", symbol);
|
LambdaQueryWrapper<Realtime> queryWrapper = new LambdaQueryWrapper<Realtime>()
|
.eq(Realtime::getSymbol, symbol)
|
.lt(Realtime::getTs, todayStartTime)
|
.orderByDesc(Realtime::getTs)
|
.last("LIMIT 1");
|
realtime = realtimeService.getBaseMapper().selectOne(queryWrapper);
|
RequestDataHelper.clear();
|
if (realtime != null) {
|
redisTemplate.opsForValue().set(RedisKeys.REAL_TIME_BEFORE + symbol, realtime);
|
}
|
return realtime;
|
}
|
|
public BeforeClose getBeforeClose(String symbol, String line, Long ts) {
|
BeforeClose beforeClose = (BeforeClose) redisTemplate.opsForValue().get(RedisKeys.REAL_TIME_BEFORE_CLOSE + symbol + line);
|
if (beforeClose == null) {
|
// 直接获取当前时间的毫秒级时间戳(系统默认时区,但值是全球统一的)
|
long currentTimeStamp = System.currentTimeMillis();
|
|
// 如果需要严格基于东京时区的当前时间戳(结果和上面一致,因为时间戳是UTC绝对时间)
|
//long currentTokyoTimeStamp = Instant.now().atZone(ZoneId.of("Asia/Tokyo")).toInstant().toEpochMilli();
|
RequestDataHelper.set("symbol", symbol);
|
QueryWrapper<Realtime> queryWrapper = new QueryWrapper<Realtime>()
|
.eq("symbol", symbol) // 直接写数据库字段名(需和表字段一致)
|
.ge("ts", ts)
|
.le("ts", currentTimeStamp)
|
.select("MAX(close) as maxClose", "MIN(close) as minClose");
|
// 4. 执行聚合查询,用selectMap接收结果(键值对:maxClose/minClose -> 对应值)
|
Map<String, Object> resultMap = realtimeService.getMap(queryWrapper);
|
RequestDataHelper.clear();
|
beforeClose = new BeforeClose();
|
if (resultMap == null || resultMap.isEmpty()) {
|
return beforeClose;
|
}
|
beforeClose.setMaxClose(convertToBigDecimal(resultMap.get("maxClose")));
|
beforeClose.setMinClose(convertToBigDecimal(resultMap.get("minClose")));
|
redisTemplate.opsForValue().set(RedisKeys.REAL_TIME_BEFORE_CLOSE + symbol + line, beforeClose , 10 , TimeUnit.SECONDS);
|
}
|
return beforeClose;
|
}
|
|
// 辅助方法:统一转换为BigDecimal,避免类型错误
|
private BigDecimal convertToBigDecimal(Object value) {
|
if (value == null) {
|
return BigDecimal.ZERO;
|
}
|
if (value instanceof BigDecimal) {
|
return (BigDecimal) value;
|
}
|
try {
|
return new BigDecimal(value.toString());
|
} catch (NumberFormatException e) {
|
log.error("转换数值为BigDecimal失败:value={}", value, e);
|
return BigDecimal.ZERO;
|
}
|
}
|
|
@Override
|
public void cacheBefore24Hour(String symbol) {
|
// 计算“24小时前”的时间戳(毫秒级)
|
long twentyFourHoursAgo = Instant.now().minus(24, ChronoUnit.HOURS).toEpochMilli();
|
RequestDataHelper.set("symbol", symbol);
|
LambdaQueryWrapper<Realtime> queryWrapper = new LambdaQueryWrapper<Realtime>()
|
.eq(Realtime::getSymbol, symbol)
|
.ge(Realtime::getTs, twentyFourHoursAgo) // 时间戳 >= 24小时前(前24小时内)
|
.orderByDesc(Realtime::getClose) // 24小时最高
|
.last("LIMIT 1");
|
Realtime realtimeHigh = realtimeService.getBaseMapper().selectOne(queryWrapper);
|
LambdaQueryWrapper<Realtime> queryWrapper2 = new LambdaQueryWrapper<Realtime>()
|
.eq(Realtime::getSymbol, symbol)
|
.ge(Realtime::getTs, twentyFourHoursAgo) // 时间戳 >= 24小时前(前24小时内)
|
.orderByAsc(Realtime::getClose) // 24小时最低
|
.last("LIMIT 1");
|
Realtime realtimeLow = realtimeService.getBaseMapper().selectOne(queryWrapper2);
|
RequestDataHelper.clear();
|
System.out.println("realtimeHigh:" + realtimeHigh);
|
System.out.println("realtimeLow:" + realtimeLow);
|
if (realtimeHigh != null) {
|
DataCache.getRealtimeHigh().put(symbol, realtimeHigh.getClose().doubleValue());
|
System.out.println("putRealtimeHigh:" + realtimeHigh.getClose().doubleValue());
|
}
|
if (realtimeLow != null) {
|
DataCache.getRealtimeLow().put(symbol, realtimeLow.getClose().doubleValue());
|
System.out.println("putRealtimeLow:" + realtimeLow.getClose().doubleValue());
|
}
|
}
|
|
public void deleteRealtime(int days) {
|
for (int i = 0; i <= Constants.TABLE_PARTITIONS - 1; i++) {
|
Map<String, Object> parameters = new HashMap();
|
Long ts = DateUtils.addDate(new Date(), days).getTime();
|
parameters.put("ts", ts);
|
this.namedParameterJdbcTemplate.update("DELETE FROM t_realtime_" + i + " WHERE ts < :ts", parameters);
|
}
|
|
|
}
|
|
@Override
|
public void updateOptimize(String table) {
|
for (int i = 0; i <= Constants.TABLE_PARTITIONS - 1; i++) {
|
this.jdbcTemplate.execute("optimize table " + table + "_" + i);
|
|
}
|
|
}
|
|
@Override
|
public List<Realtime> findRealtimeOneDay(String symbol) {
|
int interval = this.sysparaService.find("data_interval").getInteger() / 1000;
|
int num = (24 * 60 * 60) / interval;
|
RequestDataHelper.set("symbol", symbol);
|
|
LambdaQueryWrapper<Realtime> queryWrapper = new LambdaQueryWrapper<Realtime>()
|
.eq(Realtime::getSymbol, symbol)
|
.orderByDesc(Realtime::getTs)
|
.last("LIMIT " + num);
|
List<Realtime> realtimes = realtimeService.getBaseMapper().selectList(queryWrapper);
|
RequestDataHelper.clear();
|
return realtimes;
|
|
}
|
|
/**
|
* 获取最新60s实时价格数据
|
*/
|
@Override
|
public List<Realtime> listRealTime60s(String symbol) {
|
RequestDataHelper.set("symbol", symbol);
|
int data_interval = sysparaService.find("data_interval").getInteger().intValue();
|
// 取数据条数为
|
int limit = 60*1000/data_interval;
|
LambdaQueryWrapper<Realtime> queryWrapper = new LambdaQueryWrapper<Realtime>()
|
.eq(Realtime::getSymbol, symbol)
|
.orderByDesc(Realtime::getTs)
|
.last("LIMIT " + limit);
|
List<Realtime> realtimes = realtimeService.getBaseMapper().selectList(queryWrapper);
|
RequestDataHelper.clear();
|
return realtimes;
|
}
|
|
@Override
|
public BigDecimal getChangeRatio(Realtime realtime, String symbol) {
|
Item item = itemService.findBySymbol(symbol);
|
if (item.getType().equals(Item.cryptos) && (item.getCurrencyType() != null && item.getCurrencyType() == 1)) {
|
Realtime realtimeBefore = getBefore(symbol);
|
if (realtimeBefore == null) {
|
return BigDecimal.ZERO;
|
}
|
BigDecimal open = realtimeBefore.getClose();
|
BigDecimal changeRatio = realtime.getClose().subtract(open).divide(open, 10, RoundingMode.HALF_UP);
|
changeRatio = changeRatio.multiply(new BigDecimal(100)).setScale(2, RoundingMode.DOWN);
|
return changeRatio;
|
} else {
|
return realtime.getChangeRatio2();
|
}
|
}
|
}
|