package project.data.job;
|
|
import java.util.Date;
|
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.core.task.TaskExecutor;
|
|
import kernel.util.ThreadUtils;
|
import project.data.DataCache;
|
import project.data.internal.DepthTimeObject;
|
import project.data.internal.TradeTimeObject;
|
import project.data.model.Depth;
|
import project.data.model.Trade;
|
import project.hobi.HobiDataService;
|
import project.item.model.Item;
|
|
public class DataServer implements Runnable {
|
|
private Logger logger = LoggerFactory.getLogger(DataServer.class);
|
|
private HobiDataService hobiDataService;
|
|
private TaskExecutor taskExecutor;
|
|
public void start() {
|
new Thread(this, "DataServer").start();
|
if (logger.isInfoEnabled())
|
logger.info("启动DataServer!");
|
}
|
|
public void run() {
|
|
while (true) {
|
try {
|
|
HandleObject handleObject = DataQueue.poll();
|
|
if (handleObject != null) {
|
this.taskExecutor.execute(new HandleRunner(handleObject, handleObject.getType()));
|
} else {
|
ThreadUtils.sleep(50);
|
}
|
|
} catch (Throwable e) {
|
logger.error("DataServer taskExecutor.execute() fail", e);
|
}
|
}
|
}
|
|
public class HandleRunner implements Runnable {
|
private String type;
|
private HandleObject handle;
|
|
public HandleRunner(HandleObject handle, String type) {
|
this.handle = handle;
|
this.type = type;
|
}
|
|
public void run() {
|
try {
|
if (HandleObject.type_depth.equals(type)) {
|
depth(handle);
|
} else if (HandleObject.type_trade.equals(type)) {
|
trade(handle);
|
}
|
|
} catch (Throwable t) {
|
logger.error("HandleRunner run fail ", t);
|
}
|
}
|
|
private void depth(HandleObject handle) {
|
Item item = handle.getItem();
|
Depth depth = hobiDataService.depthDecorator(item.getSymbol_data(), 0);
|
if (depth != null) {
|
DepthTimeObject timeObject = new DepthTimeObject();
|
timeObject.setLastTime(new Date());
|
|
timeObject.setDepth(depth);
|
DataCache.getDepth().put(item.getSymbol(), timeObject);
|
}
|
}
|
|
private void trade(HandleObject handle) {
|
Item item = handle.getItem();
|
Trade trade = hobiDataService.tradeDecorator(item.getSymbol_data(), 0);
|
|
if (trade != null) {
|
TradeTimeObject timeObject = DataCache.getTrade().get(item.getSymbol());
|
if (timeObject == null) {
|
timeObject = new TradeTimeObject();
|
}
|
timeObject.setLastTime(new Date());
|
|
timeObject.put(item.getSymbol(), trade.getData());
|
|
DataCache.getTrade().put(item.getSymbol(), timeObject);
|
}
|
}
|
|
}
|
|
public void setTaskExecutor(TaskExecutor taskExecutor) {
|
this.taskExecutor = taskExecutor;
|
}
|
|
public void setHobiDataService(HobiDataService hobiDataService) {
|
this.hobiDataService = hobiDataService;
|
}
|
|
}
|