package project.monitor.mining.job;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;

import kernel.util.Arith;
import kernel.util.ThreadUtils;
import project.monitor.AutoMonitorPoolDataService;
import project.monitor.mining.MiningConfig;
import project.monitor.mining.MiningConfigService;
import project.monitor.mining.MiningService;
import project.party.model.Party;
import project.party.model.UserRecom;
import project.party.recom.UserRecomService;

/**
 * Computes mining income (no staking required) from the amount mapped in a
 * party's authorization record, once the party has authorized.
 *
 * <p>Lifecycle as implemented here: {@link #afterPropertiesSet()} starts one
 * worker thread that polls {@link #isRunning()} once per second. A caller hands
 * over a batch with {@link #start(List)}; the worker processes every party,
 * then {@link #stop()} persists the accumulated incomes and clears the
 * running/lock flags. {@link #lock()} / {@link #unlock()} only toggle a flag;
 * any mutual exclusion built on it is the caller's responsibility.
 */
public class MiningServer implements InitializingBean, Runnable {

    private static final Logger logger = LoggerFactory.getLogger(MiningServer.class);

    /** Number of income records handed to the persistence layer per batch. */
    private static final int SAVE_BATCH_INCOME = 300;

    private MiningConfigService miningConfigService;
    private UserRecomService userRecomService;
    private MiningService miningService;
    private AutoMonitorPoolDataService autoMonitorPoolDataService;

    /**
     * Parties of the current batch. Written in {@link #start(List)} before the
     * volatile {@code isRunning} write and read by the worker after the
     * matching volatile read.
     */
    private List<Party> items;

    private volatile boolean isRunning = false;
    private volatile boolean islock = false;

    /**
     * Accumulated income per party id (string form) for the current batch.
     * NOTE: a separate map for referral rewards (recomIncomes) was sketched in
     * earlier revisions but is disabled.
     */
    private volatile Map<String, MiningIncome> incomes = new ConcurrentHashMap<>();

    /** Guards the read-modify-write merge into {@link #incomes}. */
    private final Object incomeLock = new Object();

    /** Number of tasks still to be processed; counted down by {@link HandleRunner}. */
    private final AtomicInteger tasksNum = new AtomicInteger();

    /** Mining configurations loaded once per batch in {@link #start(List)}. */
    private List<MiningConfig> configs = new ArrayList<>();

    /**
     * Starts the polling worker thread named {@code "MiningServer"}.
     *
     * @throws Exception declared by {@link InitializingBean}; not thrown by this implementation
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        new Thread(this, "MiningServer").start();
        if (logger.isInfoEnabled()) {
            // The previous text was copied from the balance-reading service and named the wrong server.
            logger.info("启动挖矿收益计算(MiningServer)服务!");
        }
    }

    /**
     * Begins processing a batch: stores the parties, resets the task counter and
     * the income map, reloads all mining configs, then flags the server as running.
     *
     * @param items parties to process; must not be {@code null}
     */
    public void start(List<Party> items) {
        this.items = items;
        this.tasksNum.set(items.size());
        this.incomes = new ConcurrentHashMap<>();
        this.configs = miningConfigService.getAll();
        // Must be the last write: the worker only looks at the other fields once this is true.
        this.isRunning = true;
    }

    /** Sets the lock flag so a caller can claim the server before starting a batch. */
    public void lock() {
        this.islock = true;
    }

    /** Clears the lock flag. */
    public void unlock() {
        this.islock = false;
    }

    /**
     * Ends the current batch: persists the accumulated incomes in chunks of
     * {@link #SAVE_BATCH_INCOME}, reports the total to the pool data service when
     * it is positive, and finally clears the running and lock flags. Failures
     * are logged and do not prevent the flags from being cleared.
     */
    public void stop() {
        try {
            List<MiningIncome> batch = new ArrayList<>();
            // Total income of this batch, pushed to the pool data afterwards.
            double sumIncome = 0d;
            for (MiningIncome income : incomes.values()) {
                batch.add(income);
                sumIncome = Arith.add(sumIncome, income.getValue());
                if (batch.size() >= SAVE_BATCH_INCOME) {
                    miningService.saveBatchIncome(batch);
                    batch = new ArrayList<>();
                }
            }
            if (!batch.isEmpty()) {
                // Remainder that did not fill a whole batch.
                miningService.saveBatchIncome(batch);
            }
            if (sumIncome > 0) {
                autoMonitorPoolDataService.updateDefaultOutPut(sumIncome);
            }
        } catch (Throwable t) {
            logger.error("MiningServer taskExecutor.execute() fail", t);
        }
        this.isRunning = false;
        this.islock = false;
    }

    /**
     * Reports whether a batch is in progress, so that callers can wait for it
     * to finish before submitting another one.
     *
     * @return {@code true} between {@link #start(List)} and the end of {@link #stop()}
     */
    public boolean isRunning() {
        return isRunning;
    }

    /**
     * @return the current value of the lock flag
     */
    public boolean islock() {
        return islock;
    }

    /**
     * Worker loop: sleeps one second while idle; once running, processes every
     * party of the batch with a 50 ms pause between parties, then calls
     * {@link #stop()}. Never returns.
     */
    @Override
    public void run() {
        while (true) {
            if (!isRunning) {
                ThreadUtils.sleep(1000);
                continue;
            }
            try {
                for (int i = 0; i < items.size(); i++) {
                    this.execute(items.get(i));
                    // About 20 parties per second; sized so ~10,000 users finish within 10 minutes.
                    ThreadUtils.sleep(50);
                }
                // Batch done: drop the references.
                items = new ArrayList<>();
            } catch (Throwable e) {
                logger.error("MiningServer taskExecutor.execute() fail", e);
            }
            this.stop();
        }
    }

    /** Processes one party on the worker thread; failures are logged, not propagated. */
    private void execute(Party item) {
        try {
            accumulateIncome(item);
        } catch (Throwable t) {
            logger.error("MiningServer taskExecutor.execute() fail", t);
        }
    }

    /**
     * Computes the incomes produced by one party and adds each of them to the
     * running total of the party it belongs to.
     */
    private void accumulateIncome(Party item) {
        List<UserRecom> parents = userRecomService.getParents(item.getId().toString());
        MiningConfig config = miningConfigService.getConfig(item.getId().toString(), parents, configs);
        List<MiningIncome> produced = miningService.incomeProcess(item, config, parents);
        Map<String, MiningIncome> totals = incomes;
        for (MiningIncome entry : produced) {
            String key = entry.getPartyId().toString();
            // get-then-update must be atomic: HandleRunner instances may run on several threads.
            synchronized (incomeLock) {
                MiningIncome existing = totals.get(key);
                if (existing == null) {
                    totals.put(key, entry);
                } else {
                    existing.setValue(Arith.add(existing.getValue(), entry.getValue()));
                }
            }
        }
    }

    /**
     * Task form of the per-party computation. Each instance processes one party
     * and counts {@code tasksNum} down; the instance that reaches zero waits one
     * second and then calls {@link MiningServer#stop()} on the given server.
     * Presumably meant to be submitted to an executor by a caller outside this file.
     */
    public class HandleRunner implements Runnable {

        private Party item;
        private MiningServer miningServer;

        /**
         * @param item         party to process
         * @param miningServer server whose {@code stop()} is called when the last task finishes
         */
        public HandleRunner(Party item, MiningServer miningServer) {
            this.item = item;
            this.miningServer = miningServer;
        }

        @Override
        public void run() {
            try {
                accumulateIncome(item);
            } catch (Throwable t) {
                logger.error("MiningServer taskExecutor.execute() fail", t);
            } finally {
                if (tasksNum.decrementAndGet() == 0) {
                    // Last task: persist the data and release the execution right.
                    ThreadUtils.sleep(1000);
                    miningServer.stop();
                }
            }
        }
    }

    public void setMiningService(MiningService miningService) {
        this.miningService = miningService;
    }

    public void setMiningConfigService(MiningConfigService miningConfigService) {
        this.miningConfigService = miningConfigService;
    }

    public void setUserRecomService(UserRecomService userRecomService) {
        this.userRecomService = userRecomService;
    }

    public void setAutoMonitorPoolDataService(AutoMonitorPoolDataService autoMonitorPoolDataService) {
        this.autoMonitorPoolDataService = autoMonitorPoolDataService;
    }
}