package com.yami.trading.huobi.data.internal;
|
|
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
import com.yami.trading.bean.data.domain.Kline;
|
import com.yami.trading.bean.data.domain.Realtime;
|
import com.yami.trading.bean.item.domain.Item;
|
import com.yami.trading.common.config.RequestDataHelper;
|
import com.yami.trading.common.constants.Constants;
|
import com.yami.trading.common.constants.RedisKeys;
|
import com.yami.trading.common.util.DateUtils;
|
import com.yami.trading.huobi.data.DataCache;
|
import com.yami.trading.huobi.data.job.RealtimeQueue;
|
import com.yami.trading.service.data.RealtimeService;
|
import com.yami.trading.service.item.ItemService;
|
import com.yami.trading.service.syspara.SysparaService;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.data.redis.core.RedisTemplate;
|
import org.springframework.jdbc.core.JdbcTemplate;
|
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcOperations;
|
import org.springframework.stereotype.Service;
|
|
import java.math.BigDecimal;
|
import java.math.RoundingMode;
|
import java.time.*;
|
import java.util.*;
|
import java.util.stream.Collectors;
|
|
@Service
|
@Slf4j
|
public class DataDBServiceImpl implements DataDBService {
|
// Named-parameter JDBC access; deleteRealtime uses it for the per-partition DELETE.
@Autowired
|
private NamedParameterJdbcOperations namedParameterJdbcTemplate;
|
// Source of system parameters; this class reads "data_interval" from it.
@Autowired
|
private SysparaService sysparaService;
|
// Item lookups: the suspension check in saveAsyn and findBySymbol in getChangeRatio.
@Autowired
|
private ItemService itemService;
|
// Realtime persistence: batch saves and base-mapper queries.
@Autowired
|
private RealtimeService realtimeService;
|
// Plain JDBC access; updateOptimize uses it to run "optimize table".
@Autowired
|
private JdbcTemplate jdbcTemplate;
|
// Raw-typed, package-private Redis template; getBefore reads and writes Realtime values through it.
@Autowired
|
RedisTemplate redisTemplate;
|
|
@Override
|
public void saveAsyn(Realtime realtime) {
|
|
String symbol = realtime.getSymbol();
|
Realtime latestRealtime = DataCache.getLatestRealTime(symbol);
|
if (null != latestRealtime && latestRealtime.getTs() >= realtime.getTs()) {
|
log.debug("时间没有变化");
|
return;
|
}
|
|
//停牌时不更新
|
if (itemService.isSuspended(symbol)) {
|
return;
|
}
|
/*if (realtime.getSymbol().equals("axsusdt")) {
|
System.out.println("axsusdt1" + realtime);
|
}*/
|
DataCache.putLatestRealTime(symbol, realtime);
|
DataCache.putLatestOpen(symbol, realtime.getOpen());
|
|
// 最近60s内实时价格集合
|
List<Realtime> list = DataCache.latestRealTimeMap_60s.get(symbol);
|
if (list == null) {
|
return;
|
}
|
if (list.size() >= KlineConstant.LATEST_REALTIME_LIST_MAX) {
|
list.remove(0);
|
}
|
list.add(realtime);
|
DataCache.latestRealTimeMap_60s.put(symbol, list);
|
RealtimeQueue.add(realtime);
|
}
|
|
@Override
|
public void saveBatch(List<Realtime> entities) {
|
Map<String, List<Realtime>> collect = entities.stream().collect(Collectors.groupingBy(Realtime::getSymbol));
|
for (String symbol : collect.keySet()) {
|
RequestDataHelper.set("symbol", symbol);
|
realtimeService.saveOrUpdateBatch(collect.get(symbol));
|
RequestDataHelper.clear();
|
}
|
|
}
|
|
@Override
|
public Realtime get(String symbol) {
|
RequestDataHelper.set("symbol", symbol);
|
LambdaQueryWrapper<Realtime> queryWrapper = new LambdaQueryWrapper<Realtime>()
|
.eq(Realtime::getSymbol, symbol)
|
.orderByDesc(Realtime::getTs)
|
.last("LIMIT 1");
|
Realtime realtime = realtimeService.getBaseMapper().selectOne(queryWrapper);
|
RequestDataHelper.clear();
|
return realtime;
|
|
}
|
|
@Override
|
public Realtime getBefore(String symbol) {
|
Realtime realtime = (Realtime) redisTemplate.opsForValue().get(RedisKeys.REAL_TIME_BEFORE + symbol);
|
if (realtime != null) {
|
long ts = realtime.getTs();
|
// 1. 确定时间戳单位(假设ts是毫秒级,若为秒级需用ofEpochSecond())
|
Instant instant = Instant.ofEpochMilli(ts);
|
|
// 2. 将时间戳转换为当地时区的日期(指定时区更准确,如Asia/Shanghai)
|
LocalDate tsDate = instant.atZone(ZoneId.of("Asia/Shanghai")).toLocalDate();
|
|
// 3. 获取“昨天的日期”(当前日期减1天)
|
LocalDate yesterday = LocalDate.now(ZoneId.of("Asia/Shanghai")).minusDays(1);
|
|
// 4. 判断是否为昨天
|
boolean isYesterday = tsDate.equals(yesterday);
|
if (isYesterday) {
|
return realtime;
|
}
|
}
|
// 没缓存重新保存
|
// 计算当天0点的时间戳(毫秒级)
|
LocalDateTime todayStart = LocalDateTime.of(LocalDate.now(), LocalTime.MIN);
|
long todayStartTime = todayStart.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
|
RequestDataHelper.set("symbol", symbol);
|
LambdaQueryWrapper<Realtime> queryWrapper = new LambdaQueryWrapper<Realtime>()
|
.eq(Realtime::getSymbol, symbol)
|
.lt(Realtime::getTs, todayStartTime)
|
.orderByDesc(Realtime::getTs)
|
.last("LIMIT 1");
|
realtime = realtimeService.getBaseMapper().selectOne(queryWrapper);
|
RequestDataHelper.clear();
|
if (realtime != null) {
|
redisTemplate.opsForValue().set(RedisKeys.REAL_TIME_BEFORE + symbol, realtime);
|
}
|
return realtime;
|
}
|
|
public void deleteRealtime(int days) {
|
for (int i = 0; i <= Constants.TABLE_PARTITIONS - 1; i++) {
|
Map<String, Object> parameters = new HashMap();
|
Long ts = DateUtils.addDate(new Date(), days).getTime();
|
parameters.put("ts", ts);
|
this.namedParameterJdbcTemplate.update("DELETE FROM t_realtime_" + i + " WHERE ts < :ts", parameters);
|
}
|
|
|
}
|
|
@Override
|
public void updateOptimize(String table) {
|
for (int i = 0; i <= Constants.TABLE_PARTITIONS - 1; i++) {
|
this.jdbcTemplate.execute("optimize table " + table + "_" + i);
|
|
}
|
|
}
|
|
@Override
|
public List<Realtime> findRealtimeOneDay(String symbol) {
|
int interval = this.sysparaService.find("data_interval").getInteger() / 1000;
|
int num = (24 * 60 * 60) / interval;
|
RequestDataHelper.set("symbol", symbol);
|
|
LambdaQueryWrapper<Realtime> queryWrapper = new LambdaQueryWrapper<Realtime>()
|
.eq(Realtime::getSymbol, symbol)
|
.orderByDesc(Realtime::getTs)
|
.last("LIMIT " + num);
|
List<Realtime> realtimes = realtimeService.getBaseMapper().selectList(queryWrapper);
|
RequestDataHelper.clear();
|
return realtimes;
|
|
}
|
|
/**
|
* 获取最新60s实时价格数据
|
*/
|
@Override
|
public List<Realtime> listRealTime60s(String symbol) {
|
RequestDataHelper.set("symbol", symbol);
|
int data_interval = sysparaService.find("data_interval").getInteger().intValue();
|
// 取数据条数为
|
int limit = 60*1000/data_interval;
|
LambdaQueryWrapper<Realtime> queryWrapper = new LambdaQueryWrapper<Realtime>()
|
.eq(Realtime::getSymbol, symbol)
|
.orderByDesc(Realtime::getTs)
|
.last("LIMIT " + limit);
|
List<Realtime> realtimes = realtimeService.getBaseMapper().selectList(queryWrapper);
|
RequestDataHelper.clear();
|
return realtimes;
|
}
|
|
@Override
|
public BigDecimal getChangeRatio(Realtime realtime, String symbol) {
|
Item item = itemService.findBySymbol(symbol);
|
if (item.getType().equals(Item.cryptos) && (item.getCurrencyType() != null && item.getCurrencyType() == 1)) {
|
Realtime realtimeBefore = getBefore(symbol);
|
if (realtimeBefore == null) {
|
return BigDecimal.ZERO;
|
}
|
BigDecimal open = realtimeBefore.getClose();
|
BigDecimal changeRatio = realtime.getClose().subtract(open).divide(open, 10, RoundingMode.HALF_UP);
|
changeRatio = changeRatio.multiply(new BigDecimal(100)).setScale(2, RoundingMode.DOWN);
|
return changeRatio;
|
} else {
|
return realtime.getChangeRatio2();
|
}
|
}
|
}
|