package project.web.job;

import java.math.RoundingMode;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSONObject;

import kernel.util.ThreadUtils;
import kernel.web.ResultObject;
import project.data.DataCache;
import project.data.model.Realtime;
import project.item.ItemService;
import project.item.model.Item;
import project.web.websocket.WebSocketServer;

/**
 * Job that pushes realtime market quotes to the WebSocket clients registered in
 * {@link WebSocketServer#realtimeMap}.
 *
 * <p>{@link #start()} launches a thread named {@code realtimePushJob} that loops
 * indefinitely: on every pass it builds one JSON payload per requested symbol and
 * sends it to every registered socket, then sleeps before the next pass.
 *
 * <p>Socket keys are split on {@code "_"}; the segment at index 1 is forwarded as
 * the message type and the segment at index 2 is used as the symbol.
 */
public class RealtimePushJob implements Runnable {

    private static final Logger logger = LoggerFactory.getLogger(RealtimePushJob.class);

    /** Separator between the segments of a socket key. */
    private static final String KEY_SEPARATOR = "_";

    /** Index of the socket-key segment forwarded as the message type. */
    private static final int TYPE_INDEX = 1;

    /** Index of the socket-key segment holding the symbol. */
    private static final int SYMBOL_INDEX = 2;

    /** Argument passed to {@code ThreadUtils.sleep} between passes (presumably milliseconds). */
    private static final int PUSH_INTERVAL = 1000;

    private ItemService itemService;

    /**
     * Starts the push loop on a new thread named {@code realtimePushJob}.
     */
    public void start() {
        new Thread(this, "realtimePushJob").start();
        logger.info("启动realtimePushJob!");
    }

    /**
     * Runs the push loop: one {@link #realtimeHandle()} pass, then a pause, repeated
     * indefinitely. Exceptions from a pass are logged and do not stop the loop.
     */
    @Override
    public void run() {
        while (true) {
            try {
                this.realtimeHandle();
            } catch (Exception e) {
                logger.error("run fail", e);
            } finally {
                ThreadUtils.sleep(PUSH_INTERVAL);
            }
        }
    }

    /**
     * Performs one push pass.
     *
     * <p>Collects the distinct symbols requested by the registered sockets, builds a
     * JSON payload for every symbol that has data in {@link DataCache}, and sends each
     * socket the payload for its symbol. Sockets with a malformed key, without a
     * payload for their symbol, or no longer present in the registry are skipped.
     * A failure on one symbol or one socket is logged and does not affect the others.
     */
    private void realtimeHandle() {
        try {
            if (WebSocketServer.realtimeMap.isEmpty()) {
                return;
            }

            // Distinct set of symbols requested by all clients.
            Set<String> symbolSet = new HashSet<String>();
            for (String socketKey : WebSocketServer.realtimeMap.keySet()) {
                String[] parts = splitSocketKey(socketKey);
                if (parts != null) {
                    symbolSet.add(parts[SYMBOL_INDEX]);
                }
            }

            // Serialize each symbol once, no matter how many clients subscribe to it.
            Map<String, String> realtimeResultMap = new HashMap<String, String>();
            for (String symbol : symbolSet) {
                Realtime realtimeData = DataCache.getRealtime(symbol);
                if (null == realtimeData) {
                    logger.error("Not Found Realtime For Symbol: {}", symbol);
                    continue;
                }
                try {
                    this.realtimeRevise(realtimeResultMap, realtimeData, symbol);
                } catch (Exception e) {
                    logger.error("realtimeRevise fail, symbol: {}", symbol, e);
                }
            }
            if (realtimeResultMap.isEmpty()) {
                return;
            }

            for (String socketKey : WebSocketServer.realtimeMap.keySet()) {
                String[] parts = splitSocketKey(socketKey);
                if (parts == null) {
                    logger.warn("Skip malformed realtime socket key: {}", socketKey);
                    continue;
                }
                String payload = realtimeResultMap.get(parts[SYMBOL_INDEX]);
                if (payload == null) {
                    // No quote was built for this symbol on this pass; nothing to send.
                    continue;
                }
                // The entry may have been removed since keySet() was read.
                WebSocketServer server = WebSocketServer.realtimeMap.get(socketKey);
                if (server == null) {
                    continue;
                }
                try {
                    server.sendToMessageById(socketKey, payload, parts[TYPE_INDEX]);
                } catch (Exception e) {
                    // Keep serving the remaining clients when one send fails.
                    logger.error("push realtime fail, socketKey: {}", socketKey, e);
                }
            }
        } catch (Throwable e) {
            logger.error("realtimeHandle fail", e);
        }
    }

    /**
     * Splits a socket key into its segments.
     *
     * @param socketKey key taken from {@link WebSocketServer#realtimeMap}
     * @return the segments, or {@code null} when the key is {@code null} or has no
     *         segment at {@link #SYMBOL_INDEX}
     */
    private static String[] splitSocketKey(String socketKey) {
        if (socketKey == null) {
            return null;
        }
        String[] parts = socketKey.split(KEY_SEPARATOR);
        return parts.length > SYMBOL_INDEX ? parts : null;
    }

    /**
     * Builds the JSON payload for one symbol and stores it in {@code realtimeResultMap}
     * under {@code realtime.getSymbol()}.
     *
     * <p>When the item resolved for the symbol defines a non-negative number of
     * decimals, the price and volume fields are written as strings limited to that
     * many fraction digits using {@link RoundingMode#FLOOR}. Otherwise (no item, no
     * decimals, or negative decimals) the raw values are written unchanged.
     *
     * @param realtimeResultMap target map of serialized payloads
     * @param realtime quote to serialize
     * @param symbol symbol requested by the client; written to the {@code symbol} field
     */
    private void realtimeRevise(Map<String, String> realtimeResultMap, Realtime realtime, String symbol) {
        Map<String, Object> map = new HashMap<String, Object>();
        map.put("symbol", symbol);
        map.put("ts", realtime.getTs());
        map.put("current_time", realtime.getCurrent_time());
        map.put("name", realtime.getName());
        map.put("change_ratio", realtime.getChange_ratio());

        double close = realtime.getClose();

        Item item = this.itemService.cacheBySymbol(realtime.getSymbol(), true);
        Integer decimals = (item == null) ? null : item.getDecimals();

        if (decimals == null || decimals < 0) {
            map.put("open", realtime.getOpen());
            map.put("close", close);
            map.put("high", realtime.getHigh());
            map.put("low", realtime.getLow());
            map.put("volume", realtime.getVolume());
            map.put("amount", realtime.getAmount());
        } else {
            DecimalFormat df = new DecimalFormat(buildPattern(decimals));
            // FLOOR: round toward negative infinity.
            df.setRoundingMode(RoundingMode.FLOOR);
            map.put("open", df.format(realtime.getOpen()));
            map.put("close", df.format(close));
            map.put("high", df.format(realtime.getHigh()));
            map.put("low", df.format(realtime.getLow()));
            map.put("volume", df.format(realtime.getVolume()));
            map.put("amount", df.format(realtime.getAmount()));
        }

        List<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
        list.add(map);

        ResultObject realtimeResult = new ResultObject();
        realtimeResult.setData(list);
        realtimeResultMap.put(realtime.getSymbol(), JSONObject.toJSONString(realtimeResult));
    }

    /**
     * Builds the {@link DecimalFormat} pattern for the given number of fraction digits:
     * {@code "#"} for zero, otherwise {@code "#."} followed by one {@code '#'} per digit.
     *
     * @param decimals number of optional fraction digits; must not be negative
     * @return the format pattern
     */
    private static String buildPattern(int decimals) {
        if (decimals == 0) {
            return "#";
        }
        StringBuilder pattern = new StringBuilder("#.");
        for (int j = 0; j < decimals; j++) {
            pattern.append('#');
        }
        return pattern.toString();
    }

    /**
     * Sets the service used to look up the item (and its decimals) for a symbol.
     *
     * @param itemService item lookup service
     */
    public void setItemService(ItemService itemService) {
        this.itemService = itemService;
    }
}