package com.yami.trading.admin.task;
|
|
import com.yami.trading.bean.data.domain.Depth;
|
import com.yami.trading.bean.data.domain.Trade;
|
import com.yami.trading.bean.item.domain.Item;
|
import com.yami.trading.common.util.ThreadUtils;
|
import com.yami.trading.huobi.data.DataCache;
|
import com.yami.trading.huobi.data.internal.DepthTimeObject;
|
import com.yami.trading.huobi.data.internal.TradeTimeObject;
|
import com.yami.trading.huobi.data.job.DataQueue;
|
import com.yami.trading.huobi.data.job.HandleObject;
|
import com.yami.trading.huobi.hobi.HobiDataService;
|
import lombok.extern.slf4j.Slf4j;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.core.task.TaskExecutor;
|
import org.springframework.stereotype.Component;
|
|
import java.util.Date;
|
|
@Slf4j
|
@Component
|
public class DataServer implements Runnable {
|
|
@Autowired
|
private HobiDataService hobiDataService;
|
|
@Autowired
|
private TaskExecutor taskExecutor;
|
|
public void start() {
|
new Thread(this, "DataServer").start();
|
if (log.isInfoEnabled())
|
log.info("启动DataServer!");
|
}
|
|
public void run() {
|
|
while (true) {
|
try {
|
|
HandleObject handleObject = DataQueue.poll();
|
|
if (handleObject != null) {
|
this.taskExecutor.execute(new HandleRunner(handleObject, handleObject.getType()));
|
} else {
|
ThreadUtils.sleep(50);
|
}
|
|
} catch (Throwable e) {
|
log.error("DataServer taskExecutor.execute() fail", e);
|
}
|
}
|
}
|
|
public class HandleRunner implements Runnable {
|
private String type;
|
private HandleObject handle;
|
|
public HandleRunner(HandleObject handle, String type) {
|
this.handle = handle;
|
this.type = type;
|
}
|
|
public void run() {
|
try {
|
if (HandleObject.type_depth.equals(type)) {
|
depth(handle);
|
} else if (HandleObject.type_trade.equals(type)) {
|
trade(handle);
|
}
|
|
} catch (Throwable t) {
|
log.error("HandleRunner run fail ", t);
|
}
|
}
|
|
private void depth(HandleObject handle) {
|
Item item = handle.getItem();
|
if(item == null){
|
return;
|
}
|
Depth depth = hobiDataService.depthDecorator(item.getSymbol(), 0);
|
if (depth != null) {
|
DepthTimeObject timeObject = new DepthTimeObject();
|
timeObject.setLastTime(new Date());
|
|
timeObject.setDepth(depth);
|
DataCache.getDepth().put(item.getSymbol(), timeObject);
|
}
|
}
|
|
private void trade(HandleObject handle) {
|
Item item = handle.getItem();
|
Trade trade = hobiDataService.tradeDecorator(item.getSymbol(), 0);
|
|
if (trade != null) {
|
TradeTimeObject timeObject = DataCache.getTrade().get(item.getSymbol());
|
if (timeObject == null) {
|
timeObject = new TradeTimeObject();
|
}
|
timeObject.setLastTime(new Date());
|
|
timeObject.put(item.getSymbol(), trade.getData());
|
|
DataCache.getTrade().put(item.getSymbol(), timeObject);
|
}
|
}
|
|
}
|
|
public void setTaskExecutor(TaskExecutor taskExecutor) {
|
this.taskExecutor = taskExecutor;
|
}
|
|
public void setHobiDataService(HobiDataService hobiDataService) {
|
this.hobiDataService = hobiDataService;
|
}
|
|
}
|