package com.yami.trading.common.executor; import java.util.concurrent.*; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; /* * 一个自定义的阻塞式线程池. * @author caster * **/ public class BlockingThreadPoolExecutor extends ThreadPoolExecutor { protected static org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BlockingThreadPoolExecutor.class); private final ReentrantLock taskLock = new ReentrantLock(); private final Condition unpaused = taskLock.newCondition(); private final Condition submitCnd = taskLock.newCondition(); private final int maxTaskCount; private volatile int currentTaskCount; // 正在执行的任务数量 private volatile int activeTaskCount; // 已提交待执行的任务数量+正在执行的任务数量 public BlockingThreadPoolExecutor(int fixedThreadPoolSize) { this(fixedThreadPoolSize, fixedThreadPoolSize, 60, TimeUnit.SECONDS); } public BlockingThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit) { // 经测试,好像是:正在执行的线程数量+等待队列的数量(容量还不能设置为0)=maximumPoolSize super(corePoolSize, maximumPoolSize, keepAliveTime, unit, new ArrayBlockingQueue(1), new AbortPolicy()); this.maxTaskCount = maximumPoolSize + 1; //SynchronousQueue } @Override public Future submit(Runnable task) { taskLock.lock(); try { // 提交新任务时,如果发现线程池中所有任务数量(正在处理的+队列中待处理的)超过了允许的最大数量,则堵塞提交 while (maxTaskCount <= activeTaskCount) { try { logger.warn("[BlockingThreadPoolExecutor submit] 线程池满了,无法执行当前任务, maxTaskCount:" + maxTaskCount + ", activeTaskCount:" + activeTaskCount); submitCnd.await(); } catch (InterruptedException e) { throw new RuntimeException("线程池被中断..."); } } } finally { taskLock.unlock(); } // 内部提交请求,并且增加有效任务数量 Future submitResult = super.submit(task); activeTaskCount++; logger.info("[BlockingThreadPoolExecutor submit] 线程池添加了一个任务, maxTaskCount:" + maxTaskCount + ", activeTaskCount:" + activeTaskCount); return submitResult; } @Override public Future submit(Runnable task, T result) { taskLock.lock(); try { // 提交新任务时,如果发现线程池中所有任务数量(正在处理的+队列中待处理的)超过了允许的最大数量,则堵塞提交 while (maxTaskCount <= activeTaskCount) { try { logger.warn("[BlockingThreadPoolExecutor submit] 线程池满了,无法执行当前任务, maxTaskCount:" + maxTaskCount + ", activeTaskCount:" + activeTaskCount); submitCnd.await(); } catch (InterruptedException e) { throw new RuntimeException("线程池被中断..."); } } } finally { taskLock.unlock(); } Future submitResult = super.submit(task, result); activeTaskCount++; logger.info("[BlockingThreadPoolExecutor submit] 线程池添加了一个任务, maxTaskCount:" + maxTaskCount + ", activeTaskCount:" + activeTaskCount); return submitResult; } @Override public Future submit(Callable task) { taskLock.lock(); try { // 提交新任务时,如果发现线程池中所有任务数量(正在处理的+队列中待处理的)超过了允许的最大数量,则堵塞提交 while (maxTaskCount <= activeTaskCount) { try { logger.warn("[BlockingThreadPoolExecutor submit] 线程池满了,无法执行当前任务, maxTaskCount:" + maxTaskCount + ", activeTaskCount:" + activeTaskCount); submitCnd.await(); } catch (InterruptedException e) { throw new RuntimeException("线程池被中断..."); } } } finally { taskLock.unlock(); } Future submitResult = super.submit(task); activeTaskCount++; logger.info("[BlockingThreadPoolExecutor submit] 线程池添加了一个任务, maxTaskCount:" + maxTaskCount + ", activeTaskCount:" + activeTaskCount); return submitResult; } /** * 选中一个线程开始处理当前业务的前置调用. * * @param t * @param r */ @Override protected void beforeExecute(Thread t, Runnable r) { super.beforeExecute(t, r); taskLock.lock(); try { // 正式处理的任务数量如果达到了允许的活跃线程池上限,则阻塞等待 // 正常情况下不应该有此种情况出现,监控下日志 while (maxTaskCount <= currentTaskCount + 1) { try { logger.warn("[BlockingThreadPoolExecutor beforeExecute] =======> maxTaskCount:{}, currentTaskCount:{}", maxTaskCount, currentTaskCount); unpaused.await(); } catch (InterruptedException e) { t.interrupt(); } } currentTaskCount++; logger.info("[BlockingThreadPoolExecutor beforeExecute] 有一个线程池任务开始正式执行, maxTaskCount:" + maxTaskCount + ", activeTaskCount:" + activeTaskCount + ", currentTaskCount:" + currentTaskCount); } finally { taskLock.unlock(); } } /** * 一个线程完成业务处理后执行. * * @param r * @param t */ @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); taskLock.lock(); try { // 正在执行的业务处理数减一 currentTaskCount--; // 总的业务处理数量减一 activeTaskCount--; logger.info("[BlockingThreadPoolExecutor afterExecute] 完成了一个线程池任务, currentTaskCount:" + currentTaskCount + ", activeTaskCount:" + activeTaskCount); // unpaused 可以考虑是否有必要 unpaused.signalAll(); // 有一个线程完成了业务处理,代表有空闲线程可以接受新的提交请求 submitCnd.signalAll(); } finally { taskLock.unlock(); } } /** * 提前当前时刻正在执行中的任务数量(不包括处于等待队列的任务). * * @return */ public int getRunningTaskCount() { return this.currentTaskCount; } /** * 获取正在执行的任务 + 队列中待执行的任务总数. * * @return */ public int getUnFinishedTaskCount() { taskLock.lock(); try { int count = this.currentTaskCount; count = count + this.getQueue().size(); return count; } finally { taskLock.unlock(); } } }