package com.yami.trading.huobi.task.contract; import cn.hutool.core.util.StrUtil; import com.yami.trading.bean.contract.domain.ContractApplyOrder; import com.yami.trading.bean.contract.domain.ContractOrder; import com.yami.trading.bean.data.domain.Realtime; import com.yami.trading.bean.item.domain.Item; import com.yami.trading.common.constants.RedisKeys; import com.yami.trading.common.constants.RedisLockKeyConstants; import com.yami.trading.service.MarketOpenChecker; import com.yami.trading.common.util.RedisUtil; import com.yami.trading.common.util.ThreadUtils; import com.yami.trading.service.contract.ContractApplyOrderService; import com.yami.trading.service.contract.ContractLock; import com.yami.trading.service.contract.ContractOrderService; import com.yami.trading.service.data.DataService; import com.yami.trading.service.item.ItemService; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RLock; import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Component; import java.math.BigDecimal; import java.util.List; /** * * 委托单进入市场 */ @Slf4j @Component public class ContractApplyOrderHandleJob implements Runnable { @Autowired private ContractOrderService contractOrderService; @Autowired private ContractApplyOrderService contractApplyOrderService; @Autowired @Qualifier("dataService") private DataService dataService; @Autowired private ItemService itemService; @Autowired private RedissonClient redissonClient; public void start(){ new Thread(this, "ContractApplyOrderHandleJob").start(); log.info("委托单处理线程启动!"); } @Override public void run() { /* * 系统启动先暂停40秒 */ ThreadUtils.sleep(1000 * 40); while (true) { boolean processSuccess = false; String oneContractApplyOrderId = null; try { oneContractApplyOrderId = RedisUtil.spop(RedisKeys.NEW_CONTRACT_APPLY_ORDERS); if (StrUtil.isBlank(oneContractApplyOrderId)) { // finally 里面仍然会执行了 sleep processSuccess = true; //log.info("---> ContractApplyOrderHandleJob.run 没有可以处理的单..."); continue; } //log.info("---> ContractApplyOrderHandleJob.run 获取一个待处理的合约订单:{}", oneContractApplyOrderId); ContractApplyOrder order = this.contractApplyOrderService.getById(oneContractApplyOrderId); if (order == null || !order.getState().equals(ContractApplyOrder.STATE_SUBMITTED)) { processSuccess = true; continue; } List realtime_list = this.dataService.realtime(order.getSymbol()); Realtime realtime = null; if (realtime_list.size() > 0) { realtime = realtime_list.get(0); //log.info("---> ContractApplyOrderHandleJob.run symbol:{} 的 realTime 值为:{}", order.getSymbol(), realtime); } else { log.warn("---> ContractApplyOrderHandleJob.run symbol:{} 的 realTime 值为空", order.getSymbol()); continue; } // 休市不做撮合 Item bySymbol = itemService.findBySymbol(order.getSymbol()); if (bySymbol == null) { continue; } boolean isOpen = MarketOpenChecker.isMarketOpenByItemCloseType(bySymbol.getOpenCloseType()); if (!isOpen) { //log.warn("---> ContractApplyOrderHandleJob.run symbol:{} 未open", order.getSymbol()); continue; } if ("limit".equals(order.getOrderPriceType())) { /** * 限价单 */ if ("buy".equals(order.getDirection())) { /** * 买涨 */ if (BigDecimal.valueOf(realtime.getClose()).compareTo(order.getPrice()) <= 0) { processSuccess = this.handle(order, realtime); } } else { /** * 买跌 */ if (BigDecimal.valueOf(realtime.getClose()).compareTo(order.getPrice()) >= 0) { processSuccess = this.handle(order, realtime); } } } else { /** * 非限制,直接进入市场 */ processSuccess = this.handle(order, realtime); } } catch (Exception e) { log.error("合约订单:{} 处理失败", oneContractApplyOrderId, e); } finally { if (!processSuccess) { // 未处理成功,缓存加回去 //log.warn("合约订单:{} 处理失败,重新录入缓存", oneContractApplyOrderId); RedisUtil.sadd(RedisKeys.NEW_CONTRACT_APPLY_ORDERS, oneContractApplyOrderId); } ThreadUtils.sleep(1000 * 1); } } } public boolean handle(ContractApplyOrder applyOrder, Realtime realtime) { boolean lock = false; try { if (!ContractLock.add(applyOrder.getOrderNo())) { return false; } lock = true; if ("open".equals(applyOrder.getOffset())) { // 全局锁 String lockKey = RedisLockKeyConstants.LOCK_CONTRACT_APPLY_TO_ORDER_PREFIX + applyOrder.getUuid(); RLock rLock = redissonClient.getLock(lockKey); if (!rLock.tryLock()) { return false; } try { ContractApplyOrder order = this.contractApplyOrderService.getById(applyOrder.getUuid()); if (order == null || !order.getState().equals(ContractApplyOrder.STATE_SUBMITTED)) { // 已处理,跳过 return true; } this.contractOrderService.saveOpen(applyOrder, realtime); } finally { rLock.unlock(); } return true; } else if ("close".equals(applyOrder.getOffset())) { /** * 平仓 */ List list = this.contractOrderService.findSubmitted(applyOrder.getPartyId(), applyOrder.getSymbol(), applyOrder.getDirection()); if (list.size() == 0) { applyOrder.setVolume(BigDecimal.ZERO); applyOrder.setState(ContractApplyOrder.STATE_CREATED); this.contractApplyOrderService.updateById(applyOrder); return true; } boolean allProcessOk = true; for (int i = 0; i < list.size(); i++) { ContractOrder order = list.get(i); boolean lock_order = false; try { if (!ContractLock.add(order.getOrderNo())) { allProcessOk = false; continue; } lock_order = true; applyOrder = this.contractOrderService.saveClose(applyOrder, realtime, order.getOrderNo()); if (ContractApplyOrder.STATE_CREATED.equals(applyOrder.getState())) { allProcessOk = true; break; } } catch (Exception e) { allProcessOk = false; log.error("---> ContractApplyOrderHandleJob.handle 处理合约:{} 报错", applyOrder.getUuid(), e); } finally { if (lock_order) { ThreadUtils.sleep(100); ContractLock.remove(order.getOrderNo()); } } } return allProcessOk; } log.error("---> ContractApplyOrderHandleJob.handle 当前合约:{} 记录无法处理", applyOrder.getUuid()); return false; } catch (Exception e) { log.error("---> ContractApplyOrderHandleJob.handle 处理合约:{} 报错:", applyOrder.getUuid(), e); return false; } finally { if (lock) { ThreadUtils.sleep(100); ContractLock.remove(applyOrder.getOrderNo()); } } } }