1
zj
2024-06-13 a4662cc65a02f258062bf6cc392ceb1017db9292
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
package com.yami.trading.common.config;
 
import java.lang.reflect.Constructor;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
 
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.aop.interceptor.SimpleAsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
import com.yami.trading.common.util.ApplicationUtil;
 
/**
 * @author JORGE
 * @description 线程池
 */
@SuppressWarnings({"unchecked","rawtypes"})
public class ThreadPool {
    /**
     * 默认应用线程池
     */
    private static ThreadPoolTaskExecutor defaultThreadPool;
    
    /**
     * 允许核心线程超时后也关闭
     */
    private static final Boolean ALLOW_CORETHREAD_TIMEOUT=false;
    
    /**
     * 新创建线程的名称前缀
     */
    private static final String DEFAULT_THREAD_NAME_PREFIX="Application-ThreadPool-";
    
    /**
     * 丢弃新任务且抛出RejectedExecutionException异常(默认值)
     */
    public static final Class<? extends RejectedExecutionHandler> DROP_THROWS=ThreadPoolExecutor.AbortPolicy.class;
    
    /**
     * 丢弃新任务且不抛出异常
     */
    public static final Class<? extends RejectedExecutionHandler> DROP_NOTHROWS=ThreadPoolExecutor.DiscardPolicy.class;
    
    /**
     * 使用调用者线程处理队列溢出任务
     */
    public static final Class<? extends RejectedExecutionHandler> CALLER_RUNS=ThreadPoolExecutor.CallerRunsPolicy.class;
    
    /**
     * 丢弃队列中老任务(最早,最前面)且不抛出异常
     */
    public static final Class<? extends RejectedExecutionHandler> DROP_OLDS_NOTHROWS=ThreadPoolExecutor.DiscardOldestPolicy.class;
    
    /**
     * 获取Spring上下文中第一个兼容的线程池
     * @param threadPoolType 兼容的线程池类型
     * @return 线程池
     */
    public static <R extends Executor> R getSpringThreadPool(Class<R>... threadPoolType){
        Class<R> poolType=(null==threadPoolType||0==threadPoolType.length)?
                (Class<R>)ThreadPoolTaskExecutor.class:threadPoolType[0];
        Map<String, R> executors=ApplicationUtil.getBeansOfType(poolType);
        if(null==executors||0==executors.size()) return null;
        return executors.entrySet().iterator().next().getValue();
    }
    
    /**
     * 获取应用线程池
     * @return 应用线程池
     */
    public static final <R extends Executor> R getApplicationThreadPool(){
        if(null!=defaultThreadPool) return (R)defaultThreadPool;
        
        int maxRevise=ApplicationUtil.getProperty("thread.pool.max.revise",Integer.class,0);
        double maxFactor=ApplicationUtil.getProperty("thread.pool.max.factor",Double.class,2.0);
        int queueCapacity=ApplicationUtil.getProperty("thread.pool.queue.size",Integer.class,200);
        int keepAliveSeconds=ApplicationUtil.getProperty("thread.pool.max.idle",Integer.class,180);
        
        int corePoolSize=Runtime.getRuntime().availableProcessors();
        int maxPoolSize=(int)(maxFactor * corePoolSize + maxRevise);
        return (R)(defaultThreadPool=getTaskExecutor(corePoolSize,maxPoolSize,queueCapacity,keepAliveSeconds,DEFAULT_THREAD_NAME_PREFIX,ALLOW_CORETHREAD_TIMEOUT,CALLER_RUNS));
    }
    
    /**
     * 获取异步调用异常处理器
     * @return AsyncUncaughtExceptionHandler
     */
    public static AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new SimpleAsyncUncaughtExceptionHandler();
    }
    
    /**
     * 获取Sleuth中的LazyTraceExecutor包装器以支持自定义线程池中创建异步Span
     * Sleuth默认的线程池包装器是org.springframework.cloud.sleuth.instrument.async.LazyTraceThreadPoolTaskExecutor
     * @param executor 线程池执行器
     * @return 包装后的线程池执行器
     */
    public static Executor getLazyTraceExecutor(Executor executor){
        try {
            Class executorType=Class.forName("org.springframework.cloud.sleuth.instrument.async.LazyTraceExecutor");
            Constructor<Executor> cons=executorType.getDeclaredConstructor(BeanFactory.class,Executor.class);
            return cons.newInstance(ApplicationUtil.getApplicationContext(),executor);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
    
    /**
     * 获取Sleuth中的LazyTraceThreadPoolTaskExecutor包装器以支持自定义线程池中创建异步Span
     * Sleuth默认的线程池包装器是org.springframework.cloud.sleuth.instrument.async.LazyTraceThreadPoolTaskExecutor
     * @param executor 线程池执行器
     * @return 包装后的线程池执行器
     */
    public static ThreadPoolTaskExecutor getLazyTraceExecutor(ThreadPoolTaskExecutor executor){
        try {
            Class executorType=Class.forName("org.springframework.cloud.sleuth.instrument.async.LazyTraceThreadPoolTaskExecutor");
            Constructor<ThreadPoolTaskExecutor> cons=executorType.getDeclaredConstructor(BeanFactory.class,ThreadPoolTaskExecutor.class);
            return cons.newInstance(ApplicationUtil.getApplicationContext(),executor);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
 
    /**
     * 获取固定容量大小且无缓冲队列的线程池,池中无空闲线程后使用调用方线程处理任务
     * @param poolSize 池中固定线程数量
     * @param threadNamePrefixs 线程名称前缀
     * @return
     */
    public static ThreadPoolTaskExecutor getFixedTaskExecutor(Integer poolSize,String... threadNamePrefixs){
        String threadNamePrefix=(null==threadNamePrefixs||0==threadNamePrefixs.length)?DEFAULT_THREAD_NAME_PREFIX:threadNamePrefixs[0];
        if(null==threadNamePrefixs||0==threadNamePrefixs.length) {
            threadNamePrefix=DEFAULT_THREAD_NAME_PREFIX;
        }else {
            threadNamePrefix=threadNamePrefixs[0];
            if(null==threadNamePrefix||(threadNamePrefix=threadNamePrefix.trim()).isEmpty()) {
                threadNamePrefix=DEFAULT_THREAD_NAME_PREFIX;
            }
        }
        return getTaskExecutor(poolSize,poolSize,0,0,threadNamePrefix,ALLOW_CORETHREAD_TIMEOUT,CALLER_RUNS);
    }
    
    /**
     * 获取固定容量大小且具有缓冲队列的线程池,池中无空闲线程后转入缓冲队列,队列溢出后使用调用方线程处理任务
     * @param poolSize 池中固定线程数量
     * @param queueSize 任务队列尺寸
     * @param threadNamePrefixs 线程名称前缀
     * @return 线程池
     */
    public static ThreadPoolTaskExecutor getFixedTaskExecutor(Integer poolSize,Integer queueSize,String... threadNamePrefixs){
        String threadNamePrefix=(null==threadNamePrefixs||0==threadNamePrefixs.length)?DEFAULT_THREAD_NAME_PREFIX:threadNamePrefixs[0];
        if(null==threadNamePrefixs||0==threadNamePrefixs.length) {
            threadNamePrefix=DEFAULT_THREAD_NAME_PREFIX;
        }else {
            threadNamePrefix=threadNamePrefixs[0];
            if(null==threadNamePrefix||(threadNamePrefix=threadNamePrefix.trim()).isEmpty()) {
                threadNamePrefix=DEFAULT_THREAD_NAME_PREFIX;
            }
        }
        return getTaskExecutor(poolSize,poolSize,queueSize,0,threadNamePrefix,ALLOW_CORETHREAD_TIMEOUT,CALLER_RUNS);
    }
    
    /**
     * 获取自定义线程池
     * @param coreSize 池中核心线程数量
     * @param maxSize 池中最大线程数量
     * @param queueSize 任务队列尺寸
     * @param idleTime 线程空闲时间
     * @param threadNamePrefix 创建新线程的名称前缀
     * @param allowCoreTimeout 允许核心数量的线程超时关闭
     * @param rejectedPolicy 任务队列溢出处理策略
     * @return
     */
    public static ThreadPoolTaskExecutor getTaskExecutor(Integer coreSize,Integer maxSize,Integer queueSize,Integer idleTime,String threadNamePrefix,Boolean allowCoreTimeout,Class<? extends RejectedExecutionHandler> rejectedPolicy){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(coreSize);
        executor.setMaxPoolSize(maxSize);
        executor.setQueueCapacity(queueSize);
        executor.setKeepAliveSeconds(idleTime);
        executor.setThreadNamePrefix(threadNamePrefix); 
        executor.setAllowCoreThreadTimeOut(allowCoreTimeout);
        
        try {
            executor.setRejectedExecutionHandler(rejectedPolicy.newInstance());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        
        executor.afterPropertiesSet();
        Runtime.getRuntime().addShutdownHook(new Thread(()->executor.shutdown()));
        
        return executor;
    }
}