1
zj
2025-06-23 dc9bd22833255bc602dd42c7f603ecb50842ab35
src/main/java/project/contract/job/ContractOrderCalculationServiceImpl.java
@@ -1,10 +1,15 @@
package project.contract.job;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.text.DecimalFormat;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import kernel.util.StringUtils;
import kernel.web.ApplicationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -14,6 +19,11 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.jdbc.core.BatchPreparedStatementSetter;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.util.ObjectUtils;
import project.contract.ContractLock;
import project.contract.ContractOrder;
@@ -27,6 +37,7 @@
import project.wallet.Wallet;
import project.wallet.WalletRedisKeys;
import project.wallet.WalletService;
import project.wallet.consumer.WalletDao;
import project.wallet.consumer.WalletMessage;
public class ContractOrderCalculationServiceImpl implements ContractOrderCalculationService, ApplicationContextAware {
@@ -36,6 +47,7 @@
   // Used in this class to look up orders (findByOrderNo, findSubmitted) and to close them (saveClose).
   private ContractOrderService contractOrderService;
   // Queried via realtime(symbol) for the quote list of an order's symbol.
   private DataService dataService;
   // Used via saveWalletByPartyId(partyId), which returns the party's Wallet.
   private WalletService walletService;
   private WalletDao walletDao;
//   private RedisHandler redisHandler;
   private AssetService assetService;
   // NOTE(review): order-state checks visible in this class compare against ContractOrder.STATE_SUBMITTED,
   // not this constant — confirm whether this duplicate is still referenced.
   public final static String STATE_SUBMITTED = "submitted";
@@ -55,6 +67,7 @@
      try {
         ContractOrder order = contractOrderService.findByOrderNo(order_no);
         if (order == null || !ContractOrder.STATE_SUBMITTED.equals(order.getState())) {
            logger.info("-----------到这里来了吗");
            /**
             * 状态已改变,退出处理
             */
@@ -62,6 +75,7 @@
         }
         List<Realtime> list = this.dataService.realtime(order.getSymbol());
         if (list.size() == 0) {
            logger.info("-----------去你妈退出了");
            return;
         }
         Realtime realtime = list.get(0);
@@ -98,6 +112,9 @@
   }
   private static final Lock lock = new ReentrantLock(); // global lock, to avoid duplicate execution (original author's note)
   private static final long SLEEP_TIME = 500; // retry interval (original author's note; unit not stated — TODO confirm)
   /**
    * 盈亏计算
    *
@@ -117,18 +134,28 @@
          * 盈 正数
          */
         order.setProfit(Arith.add(0.0D, Math.abs(amount)));
         redisHandler.setSync("MONEY_CONTRACT_PROFIT_"+order.getPartyId().toString(), 0);
         Object profit = redisHandler.get("MONEY_CONTRACT_PROFIT_" + order.getPartyId().toString());
         if(!ObjectUtils.isEmpty(profit)){
            if(Double.parseDouble(profit.toString()) < 0 ){
               redisHandler.setSync("MONEY_CONTRACT_PROFIT_"+order.getPartyId().toString(), 0);
               Object money = redisHandler.get("PARTY_ID_MONEY_" + order.getPartyId().toString());
               if(!ObjectUtils.isEmpty(money)){
                  Wallet wallet = this.walletService.saveWalletByPartyId(order.getPartyId().toString());
                  wallet.setMoney(Double.parseDouble(money.toString()));
                  redisHandler.setSync(WalletRedisKeys.WALLET_PARTY_ID + wallet.getPartyId().toString(), wallet);
                  redisHandler.pushAsyn(WalletRedisKeys.WALLET_QUEUE_UPDATE, new WalletMessage(order.getPartyId().toString(), amount));
               }
            }
         }
      } else if ("loss".equals(profit_loss)) {
         order.setProfit(Arith.sub(0.0D,Math.abs(amount) ));
         //---------------------------------------------  这一段代码是新加的,服务器上面现在是注释的
         //定义一个 总浮动亏损的值
         logger.info("----------fffffffffffffff-------"+redisHandler);
         logger.info("----------ssssssssssss-------"+order.getPartyId().toString());
         Double contractAssetsProfit = (Double) redisHandler.get(ContractRedisKeys.CONTRACT_ASSETS_PROFIT_PARTY_ID + order.getPartyId().toString());
         logger.info("----------bbbbbbbbbbbbbbbbbb-------"+contractAssetsProfit);
         double contractProfit = null == contractAssetsProfit ? 0.000D : contractAssetsProfit;
         logger.info("----------contractProfit-------"+contractProfit);
         List<ContractOrder> list = contractOrderService.findSubmitted(order.getPartyId().toString(), null, null);
         double deposit = 0;
         for (int i = 0; i < list.size(); i++) {
@@ -136,29 +163,23 @@
            deposit = Arith.add(deposit, close_line.getDeposit());
         }
         double profitt = Arith.add(contractProfit,deposit);
         logger.info("----------盈亏除去保证金之后的值-------"+profitt);
         if(profitt <= 0){
            redisHandler.setSync("MONEY_CONTRACT_PROFIT_"+order.getPartyId().toString(), profitt);
         }else{
            redisHandler.setSync("MONEY_CONTRACT_PROFIT_"+order.getPartyId().toString(), 0);
         }
         logger.info("----------1111111111111-------");
         //修改余额,每次修改取平仓前余额减去浮动亏损
         Wallet wallet = this.walletService.saveWalletByPartyId(order.getPartyId().toString());
         logger.info("----------wallet-------"+wallet);
         Object money = redisHandler.get("PARTY_ID_MONEY_" + order.getPartyId().toString());
         Object profit = redisHandler.get("MONEY_CONTRACT_PROFIT_" + order.getPartyId().toString());
         logger.info("----------money-------"+money);
         logger.info("----------profit-------"+profit);
         if(!ObjectUtils.isEmpty(money) && !ObjectUtils.isEmpty(profit)){
            wallet.setMoney(Arith.add(Double.parseDouble(money.toString()),Double.parseDouble(profit.toString())));
            double residueMoney = Arith.add(Double.parseDouble(money.toString()), Double.parseDouble(profit.toString()));
            //如果当前余额减去浮动亏损小于0,将进入强平,这里不再进行余额更新
            if(residueMoney > 0){
               wallet.setMoney(residueMoney);
               redisHandler.setSync(WalletRedisKeys.WALLET_PARTY_ID + wallet.getPartyId().toString(), wallet);
            }
         }
         logger.info("----------2222222222222-------");
         redisHandler.setSync(WalletRedisKeys.WALLET_PARTY_ID + wallet.getPartyId().toString(), wallet);
         logger.info("----------3333333333333333------");
         redisHandler.pushAsyn(WalletRedisKeys.WALLET_QUEUE_UPDATE, new WalletMessage(order.getPartyId().toString(), amount));
         logger.info("----------4444444444444-------");
         //------------------------------------------
      }
      double changeRatio;
@@ -231,47 +252,33 @@
            profit = Arith.add(profit, Arith.add(close_line.getProfit(), close_line.getDeposit()));
         }
         Wallet wallet = this.walletService.saveWalletByPartyId(order.getPartyId().toString());
         double totleMoney = wallet.getMoney();
         Object money = redisHandler.get("PARTY_ID_MONEY_" + order.getPartyId().toString());
         double totleMoney = 0;
         if(ObjectUtils.isEmpty(money)){
            totleMoney = wallet.getMoney();
         }else{
            totleMoney = Double.parseDouble(money.toString());
         }
         logger.info("---------全仓强平任务----------------");
         logger.info("-------------------------:"+Arith.add(profit,totleMoney));
         if (Arith.add(profit,totleMoney) <= 0) {
            redisHandler.setSync("MONEY_CONTRACT_PROFIT_"+order.getPartyId().toString(), 0);
            /**
             * 触发全仓强平
             */
            this.contractOrderService.saveClose(order.getPartyId().toString(), order.getOrder_no());
            ThreadUtils.sleep(100);
            for (int i = 0; i < list.size(); i++) {
               ContractOrder close_line = list.get(i);
               if (!order.getOrder_no().equals(close_line.getOrder_no())) {
                  try {
                     while (true) {
                        if (ContractLock.add(close_line.getOrder_no())) {
                           this.contractOrderService.saveClose(close_line.getPartyId().toString(),
                                 close_line.getOrder_no());
                           /**
                            * 处理完退出
                            */
                           break;
                        }
                        ThreadUtils.sleep(500);
                     }
                  } catch (Exception e) {
                     logger.error("error:", e);
                  } finally {
                     ContractLock.remove(close_line.getOrder_no());
                     ThreadUtils.sleep(100);
                  }
               synchronized (close_line.getOrder_no()){
                  this.contractOrderService.saveClose(close_line.getPartyId().toString(),
                        close_line.getOrder_no());
               }
            }
            //钱包归零
            extracted(order, wallet, redisHandler);
         }
      } else {
         logger.info("---------进入单个持仓----------------");
         logger.info("---------order.getProfit()----------------"+order.getProfit());
         logger.info("---------order.getDeposit()----------------"+order.getDeposit());
         logger.info("---------order_close_line----------------"+order_close_line);
         if (order.getProfit() < 0 && (Arith.div(order.getDeposit(), Math.abs(order.getProfit())) <= Arith
               .div(order_close_line, 100))) {
            logger.info("---------进入强平了----------------");
@@ -282,8 +289,46 @@
            return;
         }
      }
   }
   private void extracted(ContractOrder order, Wallet wallet, RedisHandler redisHandler) {
      DataSourceTransactionManager transactionManager = ApplicationUtil.getBean(DataSourceTransactionManager.class);
      JdbcTemplate jdbcTemplate = ApplicationUtil.getBean(JdbcTemplate.class);
      TransactionStatus status = null;
      try {
         // 开启事务
         status = transactionManager.getTransaction(new DefaultTransactionDefinition());
         logger.info("----------我来更新钱包了");
         wallet.setMoney(0);
         redisHandler.setSync(WalletRedisKeys.WALLET_PARTY_ID + wallet.getPartyId().toString(), wallet);
         logger.info("-------wallet:" + wallet.getPartyId());
         // 更新数据库
         int update = jdbcTemplate.update("UPDATE T_WALLET SET MONEY=ROUND(?,8) WHERE PARTY_ID=?", 0, wallet.getPartyId().toString());
         logger.info("--------cnm跟新了几条:" + update);
         // 保存钱包信息
         Wallet ww = this.walletService.saveWalletByPartyId(order.getPartyId().toString());
         logger.info("-----------到底他妈是什么值:" + ww.getMoney());
         // 更新 redis
         redisHandler.setSync("PARTY_ID_MONEY_" + wallet.getPartyId().toString(), wallet.getMoney());
         // 提交事务
         transactionManager.commit(status);
         logger.info("----------更新完了");
      } catch (Exception e) {
         // 回滚事务
         if (status != null) {
            transactionManager.rollback(status);
         }
         logger.error("更新钱包时发生错误:", e);
         throw e;
      }
   }
   /**
    * Sets the {@link DataService} this service queries for realtime quotes.
    *
    * @param dataService the data service to use
    */
   public void setDataService(DataService dataService) {
      this.dataService = dataService;
   }