package com.yami.trading.huobi.data.internal; import cn.hutool.core.collection.CollectionUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.yami.trading.bean.data.domain.Realtime; import com.yami.trading.bean.item.domain.Item; import com.yami.trading.common.config.RequestDataHelper; import com.yami.trading.common.constants.Constants; import com.yami.trading.common.util.DateUtils; import com.yami.trading.huobi.data.DataCache; import com.yami.trading.huobi.data.job.RealtimeQueue; import com.yami.trading.service.data.RealtimeService; import com.yami.trading.service.item.ItemService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jdbc.core.namedparam.NamedParameterJdbcOperations; import org.springframework.stereotype.Service; import java.math.BigDecimal; import java.math.RoundingMode; import java.util.*; @Service @Slf4j public class DataDBServiceImpl implements DataDBService { @Autowired private NamedParameterJdbcOperations namedParameterJdbcTemplate; @Autowired private ItemService itemService; @Autowired private RealtimeService realtimeService; @Override public void saveAsyn(Realtime realtime) { Realtime current = DataCache.getRealtime(realtime.getSymbol()); if (current == null || !current.getTs().equals(realtime.getTs())) { String symbol = realtime.getSymbol(); Item item = itemService.findBySymbol(realtime.getSymbol()); if (item.getMultiple() > 0) { realtime.setVolume(BigDecimal.valueOf(realtime.getVolume() * item.getMultiple()).setScale(item.getDecimals(), RoundingMode.HALF_UP).doubleValue()); realtime.setAmount(BigDecimal.valueOf(realtime.getAmount() * item.getMultiple()).setScale(item.getDecimals(), RoundingMode.HALF_UP).doubleValue()); } Realtime latestRealtime = DataCache.getLatestRealTime(symbol); // 超过1分钟丢弃 if (null != latestRealtime && latestRealtime.getTs() >= (realtime.getTs() + 60 * 1000)) { return; } /** * 时间有变化,才保存 */ DataCache.putRealtime(realtime.getSymbol(), realtime); // 虚拟货币才需要的逻辑 if (Item.cryptos.equalsIgnoreCase(item.getType())) { // 虚拟货币才需要维护最搞,最低。24小时信息 Double high = DataCache.getRealtimeHigh(realtime.getSymbol()); if (high != null && high >= realtime.getClose()) { realtime.setHigh(new BigDecimal(high).setScale(item.getDecimals(), RoundingMode.HALF_UP).doubleValue()); } Double low = DataCache.getRealtimeLow(realtime.getSymbol()); if (low != null && low <= realtime.getClose()) { realtime.setLow(new BigDecimal(low).setScale(item.getDecimals(), RoundingMode.HALF_UP).doubleValue()); } Double h24Before = DataCache.getRealtime24HBeforeOpen(realtime.getSymbol()); if (h24Before != null) { realtime.setOpen(new BigDecimal(h24Before).setScale(item.getDecimals(), RoundingMode.HALF_UP).doubleValue()); } List listHistory = DataCache.getCryptosRealtimeHistory(realtime.getSymbol()); if (listHistory == null) { listHistory = new LinkedList<>(); } if (realtime.getLow() > 0) { /** * 修正最低为0的BUG,直接丢弃 */ listHistory.add(realtime); DataCache.putCryptosRealtimeHistory(realtime.getSymbol(), listHistory); } } DataCache.putLatestRealTime(symbol, realtime); // 最近60s内实时价格集合 // 此处要做优化,如果最近一次ts和采集的不一样,需要更新的 List list = DataCache.getLatestRealTime60s(symbol); if (list == null) { log.info("---> DataDBServiceImpl.saveAsyn 当前 symbol:{} 下的 60s 实时数据为空", symbol); } // 当前时间和ts最后一个时间不一样,才更新.一样的话不更新,避免k线图计算 if (CollectionUtil.isEmpty(list)) { list = new ArrayList<>(); list.add(realtime); DataCache.putLatestRealTime60s(symbol, list); RealtimeQueue.add(realtime); } else if (!Objects.equals(CollectionUtil.getLast(list).getTs(), realtime.getTs())) { if (list.size() >= KlineConstant.LATEST_REALTIME_LIST_MAX) { list.remove(0); } list.add(realtime); DataCache.putLatestRealTime60s(symbol, list); RealtimeQueue.add(realtime); } } else { DataCache.putLatestRealTime(current.getSymbol(), current); } } @Override public void saveBatch(List entities) { RequestDataHelper.set("symbol", entities.get(0).getSymbol()); realtimeService.saveBatch(entities); RequestDataHelper.clear(); } @Override public Realtime get(String symbol) { RequestDataHelper.set("symbol", symbol); LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() .eq(Realtime::getSymbol, symbol) .orderByDesc(Realtime::getTs) .last("LIMIT 1"); Realtime realtime = realtimeService.getBaseMapper().selectOne(queryWrapper); RequestDataHelper.clear(); return realtime; } @Override public void deleteRealtime(int days) { for (int i = 0; i <= Constants.TABLE_PARTITIONS - 1; i++) { Map parameters = new HashMap(); Long ts = DateUtils.addDate(new Date(), days).getTime(); parameters.put("ts", ts); this.namedParameterJdbcTemplate.update("DELETE FROM t_realtime_" + i + " WHERE ts < :ts", parameters); } } @Override public List findRealtimeOneDay(String symbol) { int interval = 3; int num = (24 * 60 * 60) / interval; RequestDataHelper.set("symbol", symbol); LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() .eq(Realtime::getSymbol, symbol) .orderByDesc(Realtime::getTs) .last("LIMIT " + num); List realtimes = realtimeService.getBaseMapper().selectList(queryWrapper); Collections.sort(realtimes); RequestDataHelper.clear(); return realtimes; } @Override public void delete(String symbol) { RequestDataHelper.set("symbol", symbol); QueryWrapper queryWrapper = new QueryWrapper<>(); queryWrapper.eq("symbol", symbol); realtimeService.remove(queryWrapper); RequestDataHelper.clear(); } /** * 获取最新60s实时价格数据 */ @Override public List listRealTime60s(String symbol) { RequestDataHelper.set("symbol", symbol); int data_interval = 3000; // 取数据条数为 int limit = 60 * 1000 / data_interval; LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() .eq(Realtime::getSymbol, symbol) .orderByDesc(Realtime::getTs) .last("LIMIT " + limit); List realtimes = realtimeService.getBaseMapper().selectList(queryWrapper); RequestDataHelper.clear(); return realtimes; } }