package com.yami.trading.admin.task;
|
|
import com.yami.trading.bean.data.domain.Depth;
|
import com.yami.trading.bean.data.domain.Realtime;
|
import com.yami.trading.bean.data.domain.Trade;
|
import com.yami.trading.bean.item.domain.Item;
|
import com.yami.trading.common.exception.YamiShopBindException;
|
import com.yami.trading.common.util.Arith;
|
import com.yami.trading.huobi.data.AdjustmentValueCache;
|
import com.yami.trading.huobi.data.DataCache;
|
import com.yami.trading.huobi.data.internal.DepthTimeObject;
|
import com.yami.trading.huobi.data.internal.TradeTimeObject;
|
import com.yami.trading.huobi.data.websocket.client.MarketClient;
|
import com.yami.trading.huobi.data.websocket.client.req.market.SubMarketTickerRequest;
|
import com.yami.trading.huobi.data.websocket.constant.HuobiOptions;
|
import com.yami.trading.huobi.data.websocket.model.market.MarketDepthEvent;
|
import com.yami.trading.huobi.data.websocket.model.market.MarketTickerEvent;
|
import com.yami.trading.huobi.data.websocket.model.market.MarketTradeEvent;
|
import com.yami.trading.huobi.hobi.HobiDataService;
|
import com.yami.trading.service.item.ItemService;
|
import org.checkerframework.checker.units.qual.C;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.stereotype.Component;
|
|
import java.math.BigDecimal;
|
import java.util.Date;
|
import java.util.HashMap;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.stream.Collectors;
|
|
/**
|
*
|
* 深度、近期交易记录 websocket启动服务
|
*/
|
@Component
|
public class RealtimeWebsocketServer {
|
|
@Autowired
|
ItemService itemService;
|
@Autowired
|
HobiDataService hobiDataService;
|
|
public static Map<String, Long> map = new HashMap();
|
|
public void start() {
|
List<Item> itemList = itemService.cacheGetAll().stream().filter(i -> i.getType().equalsIgnoreCase(Item.cryptos)).collect(Collectors.toList());
|
|
MarketClient tickerClient = MarketClient.create(new HuobiOptions());
|
for (Item item : itemList) {
|
String symbol = item.getSymbol();
|
SubMarketTickerRequest tickerReq = new SubMarketTickerRequest();
|
tickerReq.setSymbol(symbol);
|
tickerClient.subMarketTicker(tickerReq, (marketTradeEvent) -> {
|
// System.out.println(JSONObject.toJSONString(marketTradeEvent));
|
ticker(marketTradeEvent, symbol);
|
});
|
}
|
|
// MarketClient marketClient = MarketClient.create(new HuobiOptions());
|
// for (Item item : itemList) {
|
// String symbol = item.getSymbol_data();
|
// SubMarketDepthRequest depthReq = new SubMarketDepthRequest();
|
// depthReq.setSymbol(symbol);
|
// depthReq.setStep(DepthStepEnum.STEP2);
|
// marketClient.subMarketDepth(depthReq, (marketDetailEvent) -> {
|
// // System.out.println(JSONObject.toJSONString(marketDetailEvent));
|
// depth(marketDetailEvent, symbol);
|
// });
|
// }
|
//
|
// MarketClient tradeClient = MarketClient.create(new HuobiOptions());
|
// for (Item item : itemList) {
|
// String symbol = item.getSymbol_data();
|
// SubMarketTradeRequest tradeReq = new SubMarketTradeRequest();
|
// tradeReq.setSymbol(symbol);
|
// tradeClient.subMarketTrade(tradeReq, (marketTradeEvent) -> {
|
// // System.out.println(JSONObject.toJSONString(marketTradeEvent));
|
// trade(marketTradeEvent, symbol);
|
// });
|
// }
|
}
|
|
private void ticker(MarketTickerEvent event, String symbol) {
|
try {
|
|
if (map.containsKey(symbol) && map.get(symbol) > event.getTs()) {
|
return;
|
}
|
map.put(symbol, event.getTs() + 500);
|
Realtime realtime = new Realtime();
|
Item item = itemService.findBySymbol(symbol);
|
symbol = item.getSymbol();
|
item = this.itemService.findBySymbol(symbol);
|
//停牌时不更新
|
if (itemService.isSuspended(symbol)) {
|
return;
|
}
|
Double currentValue = AdjustmentValueCache.getCurrentValue().get(symbol).doubleValue();
|
double close = event.getTicker().getClose().doubleValue();
|
double vol = event.getTicker().getVol().doubleValue();
|
double amount = event.getTicker().getAmount().doubleValue();
|
|
realtime.setSymbol(symbol);
|
realtime.setTs(event.getTs());
|
realtime.setName(item.getName());
|
realtime.setOpen(event.getTicker().getOpen());
|
realtime.setClose(new BigDecimal(close));
|
realtime.setHigh(event.getTicker().getHigh());
|
realtime.setLow(event.getTicker().getLow());
|
realtime.setVolume(new BigDecimal(vol));
|
realtime.setAmount(new BigDecimal(amount));
|
|
if (currentValue != null && currentValue != 0) {
|
realtime.setClose(new BigDecimal(Arith.add(close, currentValue.doubleValue())));
|
realtime.setVolume(new BigDecimal(Arith.add(vol, Arith.mul(Arith.div(currentValue, close), vol))));
|
realtime.setAmount(new BigDecimal(Arith.add(amount, Arith.mul(Arith.div(currentValue, close), amount))));
|
}
|
|
Double high = DataCache.getRealtimeHigh().get(symbol);
|
Double low = DataCache.getRealtimeLow().get(symbol);
|
|
if (high == null || realtime.getClose().doubleValue() > high) {
|
DataCache.getRealtimeHigh().put(symbol, close);
|
}
|
if ((low == null || realtime.getClose().doubleValue() < low) && realtime.getClose().doubleValue() > 0) {
|
DataCache.getRealtimeLow().put(symbol, close);
|
}
|
|
Realtime current = DataCache.getRealtime(symbol);
|
if (current == null || current.getTs() != event.getTs()) {
|
// 交易量倍数不为空或0时修改倍数
|
if (item.getMultiple().doubleValue() > 0) {
|
realtime.setVolume(realtime.getVolume().multiply(item.getMultiple()));
|
realtime.setAmount(realtime.getAmount().multiply(item.getMultiple()));
|
}
|
|
if (high != null && high >= realtime.getClose().doubleValue()) {
|
realtime.setHigh(new BigDecimal(high));
|
}
|
|
if (low != null && low <= realtime.getClose().doubleValue()) {
|
realtime.setLow(new BigDecimal(low));
|
}
|
|
Double h24Before = DataCache.getRealtime24HBeforeOpen().get(symbol);
|
if (h24Before != null) {
|
realtime.setOpen(new BigDecimal(h24Before));
|
}
|
|
DataCache.putRealtime(symbol, realtime);
|
// System.out.println(JSONObject.toJSONString(realtime));
|
}
|
} catch (Exception e) {
|
e.printStackTrace();
|
}
|
}
|
|
private void depth(MarketDepthEvent event, String symbol) {
|
Item item = itemService.findBySymbol(symbol);
|
Depth depth = hobiDataService.depthDecorator(event, item);
|
if (depth != null) {
|
DepthTimeObject timeObject = new DepthTimeObject();
|
timeObject.setLastTime(new Date());
|
timeObject.setDepth(depth);
|
DataCache.getDepth().put(item.getSymbol(), timeObject);
|
// System.out.println("深度数据 入缓存" + JSONObject.toJSONString(timeObject));
|
}
|
}
|
|
private void trade(MarketTradeEvent event, String symbol) {
|
Item item = itemService.findBySymbol(symbol);
|
Trade trade = hobiDataService.tradeDecorator(event, item);
|
if (trade != null) {
|
TradeTimeObject timeObject = new TradeTimeObject();
|
timeObject.setLastTime(new Date());
|
timeObject.put(item.getSymbol(), trade.getData());
|
DataCache.getTrade().put(item.getSymbol(), timeObject);
|
}
|
}
|
|
public void setItemService(ItemService itemService) {
|
this.itemService = itemService;
|
}
|
|
public void setHobiDataService(HobiDataService hobiDataService) {
|
this.hobiDataService = hobiDataService;
|
}
|
}
|