package com.yami.trading.huobi.task; import com.alibaba.fastjson.JSONObject; import com.yami.trading.bean.data.domain.Depth; import com.yami.trading.bean.data.domain.DepthEntry; import com.yami.trading.bean.data.domain.Realtime; import com.yami.trading.bean.item.domain.Item; import com.yami.trading.common.util.Arith; import com.yami.trading.huobi.data.TimeZoneConverterService; import com.yami.trading.service.MarketOpenChecker; import com.yami.trading.common.util.RandomUtil; import com.yami.trading.common.util.ThreadUtils; import com.yami.trading.common.web.ResultObject; import com.yami.trading.huobi.data.DataCache; import com.yami.trading.huobi.data.internal.DepthTimeObject; import com.yami.trading.huobi.websocket.WebSocketServer; import com.yami.trading.huobi.websocket.WebSocketSession; import com.yami.trading.service.data.DataService; import com.yami.trading.service.item.ItemService; import lombok.extern.slf4j.Slf4j; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.math.BigDecimal; import java.math.RoundingMode; import java.text.DecimalFormat; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @Component @Slf4j public class DepthPushJob implements Runnable { private ConcurrentHashMap lastData = new ConcurrentHashMap<>(); private Logger logger = LoggerFactory.getLogger(DepthPushJob.class); @Autowired private ItemService itemService; @Autowired private TimeZoneConverterService timeZoneConverterService; @Autowired private DataService dataService; public void start() { new Thread(this, "depthPushJob").start(); if (logger.isInfoEnabled()) logger.info("启动depthPushJob!"); } public void run() { while (true) { try { this.depthHandle(); } catch (Exception e) { logger.error("run fail", e); } finally { ThreadUtils.sleep(500); } } } private void depthHandle() { try { // 数据处理 ResultObject depthResult = new ResultObject(); Map depthResultMap = new HashMap<>(); if (!WebSocketServer.depthMap.isEmpty()) { // 客户端请求的所有币种,去重集合 Set symbolSet = new HashSet(); for (String socketKey : WebSocketServer.depthMap.keySet()) { WebSocketSession webSocketSession = WebSocketServer.depthMap.get(socketKey); String symbolKey = webSocketSession.getParam(); symbolSet.add(symbolKey); } for (String symbol : symbolSet) { DepthTimeObject depth = DataCache.getDepth(symbol); Item bySymbol = itemService.findBySymbol(symbol); if (bySymbol == null) { logger.warn("---> DepthPushJob.depthHandle 当前 symbol:{} 没有对应的数据库记录", symbol); } if (depth == null) { dataService.depth(symbol); } if (null != depth && null != depth.getDepth()) { Depth depthData = depth.getDepth(); Realtime realtime = DataCache.getRealtime(symbol); depthResult.setData(this.depthRevise(depthData, symbol, realtime.getClose(), true)); } // 如果开市,直接放入 if (MarketOpenChecker.isMarketOpenByItemCloseType(bySymbol.getOpenCloseType())) { String jsonString = JSONObject.toJSONString(depthResult); depthResultMap.put(symbol, jsonString); lastData.put(symbol, jsonString); // 如果休市且lastData没数据,bySymbol } else { // 如果缓存有,用缓存的 if (lastData.containsKey(symbol)) { depthResultMap.put(symbol, lastData.get(symbol)); // 否则初始化一次,之后再用缓存的 } else { String jsonString = JSONObject.toJSONString(depthResult); depthResultMap.put(symbol, jsonString); lastData.put(symbol, jsonString); } } } if (depthResultMap.isEmpty()) { return; } for (String socketKey : WebSocketServer.depthMap.keySet()) { WebSocketSession webSocketSession = WebSocketServer.depthMap.get(socketKey); String type = webSocketSession.getType(); String symbolKey = webSocketSession.getParam(); WebSocketServer.sendToMessageById(socketKey, depthResultMap.get(symbolKey), type); } } } catch (Throwable e) { log.error("depthHandle error", e); } } /** * 市场深度数据 解析 */ public Map depthRevise(Depth data, String symbol, Double close, boolean random) { Map map = new HashMap(); map.put("symbol", symbol); Item item = this.itemService.findBySymbol(data.getSymbol()); List> asks_list = new ArrayList>(); int asksSize = data.getAsks().size(); for (int i = 0; i < 20 - asksSize; i++) { DepthEntry e = new DepthEntry(); e.setAmount(RandomUtil.randomFloat(10, 100, 0)); e.setPrice(close); data.getAsks().add(e); } int bidsSize = data.getBids().size(); for (int i = 0; i < 20 - bidsSize; i++) { DepthEntry e = new DepthEntry(); e.setAmount(RandomUtil.randomFloat(10, 100, 0)); e.setPrice(close); data.getBids().add(e); } Set asksPrices = new HashSet<>(); asksSize = data.getAsks().size(); bidsSize = data.getBids().size(); for (int i = 0; i < asksSize; i++) { DepthEntry depthEntry = data.getAsks().get(i); Map asks_map = new HashMap(); double price; double amount; if (random) { double addPriceValue = getRandomValue(String.valueOf(depthEntry.getPrice()), item.getDecimals()); double addAmountValue = getRandomValue((int) depthEntry.getAmount().doubleValue()); price = Arith.add(depthEntry.getPrice(), addPriceValue); if (price < close) { price = Arith.add(close, addPriceValue); } else { price = Arith.add(close, addPriceValue / 10); } amount = Arith.add(depthEntry.getAmount(), addAmountValue); } else { price = depthEntry.getPrice(); amount = depthEntry.getAmount(); } if(price <=0 ){ continue; } if (item.getDecimals() < 0) { asks_map.put("price", price); asks_map.put("amount", amount); } else { String format = ""; if (item.getDecimals() == 0) { format = "#"; } else { format = "#."; for (int j = 0; j < item.getDecimals(); j++) { format = format + "#"; } } DecimalFormat df = new DecimalFormat(format); df.setRoundingMode(RoundingMode.FLOOR);// 向下取整 asks_map.put("price", df.format(price)); if (asksPrices.contains(df.format(price))) { continue; } else { asksPrices.add(df.format(price)); } asks_map.put("amount", df.format(amount)); try { String timeZone = timeZoneConverterService.getTimeZoneByItemCloseType(item.getOpenCloseType()); String dateStr = timeZoneConverterService.convertTimeZone(timeZone); asks_map.put("current_time", dateStr.split(" ")[1]); }catch (Exception e){ log.error("深度图设置 current_time失败"); } } asks_list.add(asks_map); } // buy map.put("asks", asks_list.stream().sorted((a,b)->{ String priceA = (String) a.get("price"); String priceB = (String) b.get("price"); return Double.compare(Double.parseDouble(priceA), Double.parseDouble(priceB)); }).collect(Collectors.toList())); Set bidPriceSet = new HashSet<>(); List> bids_list = new ArrayList>(); for (int i = 0; i < bidsSize; i++) { DepthEntry depthEntry = data.getBids().get(i); String priceTemp = new BigDecimal(String.valueOf(depthEntry.getPrice())).toPlainString(); double addPriceValue = getRandomValue(priceTemp, item.getDecimals()); double addAmountValue = getRandomValue((int) depthEntry.getAmount().doubleValue()); double price; double amount; if (random) { price = Arith.add(depthEntry.getPrice(), -addPriceValue); if (price >= close) { price = Arith.add(close, -addPriceValue); } else { price = Arith.add(close, -addPriceValue / 10); } amount = Arith.add(depthEntry.getAmount(), addAmountValue); } else { price = depthEntry.getPrice(); amount = depthEntry.getAmount(); } if(price <=0 ){ continue; } Map bids_map = new HashMap<>(); if (item.getDecimals() < 0) { bids_map.put("price", price); bids_map.put("amount", amount); } else { String format = ""; if (item.getDecimals() == 0) { format = "#"; } else { format = "#."; for (int j = 0; j < item.getDecimals(); j++) { format = format + "#"; } } DecimalFormat df = new DecimalFormat(format); bids_map.put("price", df.format(price)); if (bidPriceSet.contains(df.format(price))) { continue; } else { bidPriceSet.add(df.format(price)); } bids_map.put("amount", df.format(amount)); try { String timeZone = timeZoneConverterService.getTimeZoneByItemCloseType(item.getOpenCloseType()); String dateStr = timeZoneConverterService.convertTimeZone(timeZone); bids_map.put("current_time", dateStr.split(" ")[1]); }catch (Exception e){ log.error("深度图设置 current_time失败"); } } bids_list.add(bids_map); } // sell map.put("bids", bids_list.stream().sorted((a,b)->{ String priceA = (String) a.get("price"); String priceB = (String) b.get("price"); return Double.compare(Double.parseDouble(priceA), Double.parseDouble(priceB)); }).collect(Collectors.toList())); return map; } private double getRandomValue(int value) { double addValue; if (value > 0) { int count = 0; while (value > 0) { value = value / 10; count++; } // 个 if (count == 1) { addValue = RandomUtil.randomFloat(0.01, 0.1999, 4); return addValue; } // 十 if (count == 2) { addValue = RandomUtil.randomFloat(0.1, 0.5999, 4); return addValue; } // 百 if (count == 3) { addValue = RandomUtil.randomFloat(0.1, 2.9999, 4); return addValue; } // 千 if (count == 4) { addValue = RandomUtil.randomFloat(1, 3.9999, 4); return addValue; } // 万 if (count == 5) { addValue = RandomUtil.randomFloat(1, 5.9999, 4); return addValue; } // 十万 else { addValue = RandomUtil.randomFloat(1, 5.9999, 4); return addValue; } } else { addValue = RandomUtil.randomFloat(0.01, 0.2999, 4); return addValue; } } private double getRandomValue(String value, int decimal) { double addValue; double d = Double.valueOf(value); int val = (int) d; // 个位数>0 if (val > 0) { int count = 0; while (val > 0) { val = val / 10; count++; } // 个 if (count == 1) { addValue = RandomUtil.randomFloat(0.01, 0.1999, 4); return addValue; } // 十 if (count == 2) { addValue = RandomUtil.randomFloat(0.1, 0.5999, 4); return addValue; } // 百 if (count == 3) { addValue = RandomUtil.randomFloat(0.1, 2.9999, 4); return addValue; } // 千 if (count == 4) { addValue = RandomUtil.randomFloat(1, 3.9999, 4); return addValue; } // 万 if (count == 5) { addValue = RandomUtil.randomFloat(1, 5.9999, 4); return addValue; } // 十万 else { addValue = RandomUtil.randomFloat(1, 5.9999, 4); return addValue; } } // 个位=0 else { // String[] valueSplit = value.split("\\."); int valueLength = decimal; if (valueLength <= 4) { addValue = RandomUtil.randomFloat(0.0001, 0.0299, 4); return addValue; } if (4 < valueLength && valueLength <= 6) { addValue = RandomUtil.randomFloat(0.000001, 0.00299, 6); return addValue; } if (6 < valueLength && valueLength <= 8) { addValue = RandomUtil.randomFloat(0.00000001, 0.0000299, 8); return addValue; } if (8 < valueLength && valueLength <= 10) { addValue = RandomUtil.randomFloat(0.00000001, 0.0000299, 10); return addValue; } else { addValue = RandomUtil.randomFloat(0.00000000001, 0.0000000299, 11); return addValue; } } } public void setItemService(ItemService itemService) { this.itemService = itemService; } }