package com.yami.trading.admin.task; import com.alibaba.fastjson.JSONObject; import com.yami.trading.api.websocket.WebSocketServer; import com.yami.trading.api.websocket.WebSocketSession; import com.yami.trading.bean.data.domain.Depth; import com.yami.trading.bean.data.domain.DepthEntry; import com.yami.trading.bean.data.domain.Realtime; import com.yami.trading.bean.data.domain.TradeEntry; import com.yami.trading.bean.item.domain.Item; import com.yami.trading.common.util.StringUtils; import com.yami.trading.common.web.ResultObject; import com.yami.trading.huobi.data.DataCache; import com.yami.trading.huobi.data.internal.DepthTimeObject; import com.yami.trading.huobi.data.internal.TradeTimeObject; import com.yami.trading.huobi.data.job.DataQueue; import com.yami.trading.huobi.data.job.HandleObject; import com.yami.trading.huobi.hobi.internal.XueQiuDataServiceImpl; import com.yami.trading.service.item.ItemService; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Lazy; import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.util.*; import java.util.stream.Collectors; import java.util.stream.Stream; @Component @Lazy(value = false) @Slf4j public class AStockPanKouTask { @Autowired private DepthPushJob depthPushJob; @Autowired private ItemService itemService; @Autowired private XueQiuDataServiceImpl xueQiuDataService; // @Scheduled(cron = "*/5 * * * * ?") 雪球 public void sendTask() throws InterruptedException { List collect = itemService.list().stream().filter(t -> Item.A_STOCKS.equalsIgnoreCase(t.getOpenCloseType())).map(Item::getSymbol).collect(Collectors.toList()); for (String symbols: collect) { List pankous = xueQiuDataService.pankous(symbols); pankous.stream().forEach(d -> { DepthTimeObject timeObject = new DepthTimeObject(); timeObject.setLastTime(new Date()); timeObject.setDepth(d); DataCache.getDepth().put(d.getSymbol(), timeObject); }); } // Set symbolSet = pankous.stream().map(Depth::getSymbol).collect(Collectors.toSet()); // Map depthResultMap = new HashMap<>(); // // 数据处理 // ResultObject depthResult = new ResultObject(); // for (String symbol : symbolSet) { // DepthTimeObject depth = DataCache.getDepth().get(symbol); // if (null != depth && null != depth.getDepth()) { // Depth depthData = depth.getDepth(); // Realtime realtime = DataCache.getRealtime(symbol); // if (realtime != null) { // depthResult.setData(depthPushJob.depthRevise(depthData, symbol, realtime.getClose().doubleValue())); // } // } // depthResultMap.put(symbol, JSONObject.toJSONString(depthResult)); // } // // if (depthResultMap.isEmpty()) { // return; // } // for (String socketKey : WebSocketServer.depthMap.keySet()) { // WebSocketSession webSocketSession = WebSocketServer.depthMap.get(socketKey); // String type = webSocketSession.getType(); // String symbolKey = webSocketSession.getParam(); // String message = depthResultMap.get(symbolKey); // if (StringUtils.isEmptyString(message)) { // continue; // } // WebSocketServer.sendToMessageById(socketKey, message, type); // } } }