package com.yami.trading.admin.task; import com.yami.trading.bean.data.domain.Depth; import com.yami.trading.bean.data.domain.Realtime; import com.yami.trading.bean.data.domain.Trade; import com.yami.trading.bean.item.domain.Item; import com.yami.trading.common.util.Arith; import com.yami.trading.huobi.data.AdjustmentValueCache; import com.yami.trading.huobi.data.DataCache; import com.yami.trading.huobi.data.internal.DepthTimeObject; import com.yami.trading.huobi.data.internal.TradeTimeObject; import com.yami.trading.huobi.data.websocket.client.MarketClient; import com.yami.trading.huobi.data.websocket.client.req.market.SubMarketTickerRequest; import com.yami.trading.huobi.data.websocket.constant.HuobiOptions; import com.yami.trading.huobi.data.websocket.model.market.MarketDepthEvent; import com.yami.trading.huobi.data.websocket.model.market.MarketTickerEvent; import com.yami.trading.huobi.data.websocket.model.market.MarketTradeEvent; import com.yami.trading.huobi.hobi.HobiDataService; import com.yami.trading.service.item.ItemService; import org.checkerframework.checker.units.qual.C; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.math.BigDecimal; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.stream.Collectors; /** * * 深度、近期交易记录 websocket启动服务 */ @Component public class RealtimeWebsocketServer { @Autowired ItemService itemService; @Autowired HobiDataService hobiDataService; public static Map map = new HashMap(); public void start() { List itemList = itemService.cacheGetAll().stream().filter(i -> i.getType().equalsIgnoreCase(Item.cryptos)).collect(Collectors.toList()); MarketClient tickerClient = MarketClient.create(new HuobiOptions()); for (Item item : itemList) { String symbol = item.getSymbol(); SubMarketTickerRequest tickerReq = new SubMarketTickerRequest(); tickerReq.setSymbol(symbol); tickerClient.subMarketTicker(tickerReq, (marketTradeEvent) -> { // System.out.println(JSONObject.toJSONString(marketTradeEvent)); ticker(marketTradeEvent, symbol); }); } // MarketClient marketClient = MarketClient.create(new HuobiOptions()); // for (Item item : itemList) { // String symbol = item.getSymbol_data(); // SubMarketDepthRequest depthReq = new SubMarketDepthRequest(); // depthReq.setSymbol(symbol); // depthReq.setStep(DepthStepEnum.STEP2); // marketClient.subMarketDepth(depthReq, (marketDetailEvent) -> { // // System.out.println(JSONObject.toJSONString(marketDetailEvent)); // depth(marketDetailEvent, symbol); // }); // } // // MarketClient tradeClient = MarketClient.create(new HuobiOptions()); // for (Item item : itemList) { // String symbol = item.getSymbol_data(); // SubMarketTradeRequest tradeReq = new SubMarketTradeRequest(); // tradeReq.setSymbol(symbol); // tradeClient.subMarketTrade(tradeReq, (marketTradeEvent) -> { // // System.out.println(JSONObject.toJSONString(marketTradeEvent)); // trade(marketTradeEvent, symbol); // }); // } } private void ticker(MarketTickerEvent event, String symbol) { try { if (map.containsKey(symbol) && map.get(symbol) > event.getTs()) { return; } map.put(symbol, event.getTs() + 500); Realtime realtime = new Realtime(); Item item = itemService.findBySymbol(symbol); symbol = item.getSymbol(); item = this.itemService.findBySymbol(symbol); Double currentValue = AdjustmentValueCache.getCurrentValue().get(symbol).doubleValue(); double close = event.getTicker().getClose().doubleValue(); double vol = event.getTicker().getVol().doubleValue(); double amount = event.getTicker().getAmount().doubleValue(); realtime.setSymbol(symbol); realtime.setTs(event.getTs()); realtime.setName(item.getName()); realtime.setOpen(event.getTicker().getOpen()); realtime.setClose(new BigDecimal(close)); realtime.setHigh(event.getTicker().getHigh()); realtime.setLow(event.getTicker().getLow()); realtime.setVolume(new BigDecimal(vol)); realtime.setAmount(new BigDecimal(amount)); if (currentValue != null && currentValue != 0) { realtime.setClose(new BigDecimal(Arith.add(close, currentValue.doubleValue()))); realtime.setVolume(new BigDecimal(Arith.add(vol, Arith.mul(Arith.div(currentValue, close), vol)))); realtime.setAmount(new BigDecimal(Arith.add(amount, Arith.mul(Arith.div(currentValue, close), amount)))); } Double high = DataCache.getRealtimeHigh().get(symbol); Double low = DataCache.getRealtimeLow().get(symbol); if (high == null || realtime.getClose().doubleValue() > high) { DataCache.getRealtimeHigh().put(symbol, close); } if ((low == null || realtime.getClose().doubleValue() < low) && realtime.getClose().doubleValue() > 0) { DataCache.getRealtimeLow().put(symbol, close); } Realtime current = DataCache.getRealtime(symbol); if (current == null || current.getTs() != event.getTs()) { // 交易量倍数不为空或0时修改倍数 if (item.getMultiple().doubleValue() > 0) { realtime.setVolume(realtime.getVolume().multiply(item.getMultiple())); realtime.setAmount(realtime.getAmount().multiply(item.getMultiple())); } if (high != null && high >= realtime.getClose().doubleValue()) { realtime.setHigh(new BigDecimal(high)); } if (low != null && low <= realtime.getClose().doubleValue()) { realtime.setLow(new BigDecimal(low)); } Double h24Before = DataCache.getRealtime24HBeforeOpen().get(symbol); if (h24Before != null) { realtime.setOpen(new BigDecimal(h24Before)); } DataCache.putRealtime(symbol, realtime); // System.out.println(JSONObject.toJSONString(realtime)); } } catch (Exception e) { e.printStackTrace(); } } private void depth(MarketDepthEvent event, String symbol) { Item item = itemService.findBySymbol(symbol); Depth depth = hobiDataService.depthDecorator(event, item); if (depth != null) { DepthTimeObject timeObject = new DepthTimeObject(); timeObject.setLastTime(new Date()); timeObject.setDepth(depth); DataCache.getDepth().put(item.getSymbol(), timeObject); // System.out.println("深度数据 入缓存" + JSONObject.toJSONString(timeObject)); } } private void trade(MarketTradeEvent event, String symbol) { Item item = itemService.findBySymbol(symbol); Trade trade = hobiDataService.tradeDecorator(event, item); if (trade != null) { TradeTimeObject timeObject = new TradeTimeObject(); timeObject.setLastTime(new Date()); timeObject.put(item.getSymbol(), trade.getData()); DataCache.getTrade().put(item.getSymbol(), timeObject); } } public void setItemService(ItemService itemService) { this.itemService = itemService; } public void setHobiDataService(HobiDataService hobiDataService) { this.hobiDataService = hobiDataService; } }