原文标题:Java线程池的实现原理及其在业务中的最佳实践
原文作者:阿里云开发者
冷月清谈:
引言
线程池是一种旨在提高并发性能、节省资源和简化多线程开发的机制,广泛应用于服务器端编程。
线程池简介
线程池的核心思想是预先创建一定数量的线程并将其保存在池中,当任务需要执行时,从空闲线程中取出一条线程进行处理完成任务后,线程被返还给线程池,可以立即或稍后被重新用来执行其他任务。这样避免了频繁创建和销毁线程带来的性能开销,同时控制了同时运行的线程数量,提高了系统的性能和资源利用率。
Java线程池的实现原理
Java线程池的核心实现类是ThreadPoolExecutor,它继承自AbstractExecutorService并实现了ExecutorService接口,ThreadPoolExecutor包含了线程池的主要组成部分:工作线程、任务队列、线程管理器等。ThreadPoolExecutor的类继承关系和核心部分实现如下:
线程池在业务中的最佳实践
除了理解线程池的实现原理外,在实际业务场景中合理使用线程池至关重要。本教程提供了详细的最佳实践指导,包括:
- 如何选择合适的线程池参数:根据任务类型(CPU密集型或I/O密集型)、线程池用途(快速响应或快速处理批量任务)来选择合适的参数设置。
- 如何正确地创建线程池对象:推荐使用饿汉式的单例模式创建线程池对象,并在类加载阶段即完成线程池对象的创建,确保线程池的生命周期与应用生命周期一致。
- 相互依赖的子任务避免使用同一线程池:相互依赖的任务提交到同一线程池可能导致线程饥饿,降低并发性能。
- 合理选择submit()和execute()方法:submit()方法返回Future对象,可用于获取任务的结果,适用于需要处理返回结果的任务;execute()方法没有返回值,适用于不需要返回结果的任务。
- 捕获线程池中子任务的代码异常:线程池中任务执行异常后,如果不捕获会造成线程池中工作线程频繁销毁和创建,影响性能。在业务代码中,请捕获子任务中的异常,并进行适当的处理。
互动讨论
- **讨论1:**线程池的核心线程可以回收吗?如果是,请说明如何设置。
- **讨论2:**在提交任务前,线程池可以预先创建线程吗?如果是,请说明如何实现。
- **讨论3:**如果线程池中执行任务的线程发生了异常,其他任务还能正常执行吗?又该如何处理线程池中任务的异常情况?
怜星夜思:
2、**讨论2:**在提交任务前,线程池可以预先创建线程吗?如果是,请说明如何实现。
3、**讨论3:**如果线程池中执行任务的线程发生了异常,其他任务还能正常执行吗?又该如何处理线程池中任务的异常情况?
原文内容
阿里妹导读
一、线程池简介
1.什么是线程池?
2.线程池有什么好处?
-
减少线程创建和销毁的开销,线程的创建和销毁需要消耗系统资源,线程池通过复用线程,避免了对资源的频繁操作,从而提高系统性能;
-
控制和优化系统资源利用,线程池通过控制线程的数量,可以尽可能地压榨机器性能,提高系统资源利用率;
-
提高响应速度,线程池可以预先创建线程且通过多线程并发处理任务,提升任务的响应速度及系统的并发性能;
二、Java线程池的实现原理
1.类继承关系
execute(Runnable r):没有返回值,仅仅是把一个任务提交给线程池处理
submit(Runnable r):返回值为Future类型,当任务处理完毕后,通过Future的get()方法获取返回值时候,得到的是null
submit(Runnable r,Object result):返回值为Future类型,当任务处理完毕后,通过Future的get()方法获取返回值时候,得到的是传入的第二个参数result
shutdown():关闭线程池,不接受新任务,但是等待队列中的任务处理完毕才能真正关闭
shutdownNow():立即关闭线程池,不接受新任务,也不再处理等待队列中的任务,同时中断正在执行的线程
setCorePoolSize(int corePoolSize):设置核心线程数
setKeepAliveTime(long time, TimeUnit unit):设置线程的空闲时间
setMaximumPoolSize(int maximumPoolSize):设置最大线程数
setRejectedExecutionHandler(RejectedExecutionHandler rh):设置拒绝策略
setThreadFactory(ThreadFactory tf):设置线程工厂
beforeExecute(Thread t, Runnable r):任务执行之前的钩子函数,这是一个空函数,使用者可以继承ThreadPoolExecutor后重写这个方法,实现其中的逻辑
afterExecute(Runnable r, Throwable t):任务执行之后的钩子函数,这是一个空函数,使用者可以继承ThreadPoolExecutor后重写这个方法,实现其中的逻辑
2.线程池的状态
-
RUNNING:线程池一旦被创建,就处于RUNNING状态,任务数为0,能够接收新任务,对已排队的任务进行处理。
-
SHUTDOWN:不接收新任务,但能处理已排队的任务。当调用线程池的shutdown()方法时,线程池会由RUNNING转变为SHUTDOWN状态。
-
STOP:不接收新任务,不处理已排队的任务,并且会中断正在处理的任务。当调用线程池的shutdownNow()方法时,线程池会由RUNNING或SHUTDOWN转变为STOP状态。
-
TIDYING:当线程池在SHUTDOWN状态下,任务队列为空且执行中任务为空,或者线程池在STOP状态下,线程池中执行中任务为空时,线程池会变为TIDYING状态,会执行terminated()方法。这个方法在线程池中是空实现,可以重写该方法进行相应的处理。
-
TERMINATED:线程池彻底终止。线程池在TIDYING状态执行完terminated()方法后,就会由TIDYING转变为TERMINATED状态。
3.线程池的执行流程
4.问题思考
-
线程池的核心线程可以回收吗?
-
线程池在提交任务前,可以提前创建线程吗?
三、源码分析
1.execute(Runnable command)
2.addWorker(Runnable firstTask, boolean core)
线程创建成功并添加到线程池后,会调用start()方法,启动线程,执行任务。
3.runWorker(Worker w)
4.getTask()
根据是否需要超时控制,提供两个阻塞方法获取阻塞队列中的任务。
5.processWorkerExit(w, completedAbruptly)
四、线程池在业务中的最佳实践
1.如何选择合适的线程池参数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
1.corePoolSize: 核心线程数
2.maximumPoolSize: 最大线程数
3.keepAliveTime: 线程的空闲时间
4.unit: 空闲时间的单位(秒、分、小时等等)
5.workQueue: 等待队列
6.threadFactory: 线程工厂
7.handler: 拒绝策略
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程直接处理该任务(可能为主线程Main),保证每个任务执行完毕
-
如何选择合适的线程池参数?
2.如何正确地创建线程池对象
-
FixedThreadPool:具有固定线程数量的线程池,无界阻塞队列;
-
CachedThreadPool:线程数量可以动态伸缩的线程池,最大线程数为Integer.MAX_VALUE
-
SingleThreadPool:单个线程的线程,核心线程数和最大线程数都是1,无界阻塞队列
public class TestThreadPool {
/**
- 线程池
*/
private static ExecutorService executor = initDefaultExecutor();/**
- 统一的获取线程池对象方法
*/
public static ExecutorService getExecutor() {
return executor;
}private static final int DEFAULT_THREAD_SIZE = 16;
private static final int DEFAULT_QUEUE_SIZE = 10240;
private static ExecutorService initDefaultExecutor() {
return new ThreadPoolExecutor(DEFAULT_THREAD_SIZE, DEFAULT_THREAD_SIZE,
300, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(DEFAULT_QUEUE_SIZE),
new DefaultThreadFactory(),
new ThreadPoolExecutor.CallerRunsPolicy());
}
}
-
局部变量定义的线程池对象在方法结束后可以被垃圾回收吗?
public static void main(String[] args) { test1(); test2(); }
public static void test1(){
Object obj = new Object();
System.out.println(“方法一执行完成”);
}public static void test2(){
ExecutorService executorService = Executors.newFixedThreadPool(10);
executorService.execute(new Runnable() {
@Override
public void run() {
System.out.println(“方法二执行完成”);
}
});
}
问题分析与解答
-
虚拟机栈(栈帧中的本地变量表)中引用的对象;
-
方法区中的类静态属性引用的对象;
-
方法区中常量引用的对象;
-
本地方法栈中JNI(即一般说的Native方法)引用的对象;
-
正在运行的线程;
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
public class Outer {
private String name;
private Inner inner;
public int outerMethod() {
return 1;
}/**
- 非静态内部类
*/
class Inner {private void innerMethod() {
//在非静态内部类中可以直接调用外部类的方法
outerMethod();
}
private String address;
}
}
class Outer$Inner { private String address;
Outer$Inner(Outer var1) {
this.this$0 = var1;
}
private void innerMethod() {
this.this$0.outerMethod();
}
}
public class Outer {
private String name;
private Inner inner;
public int outerMethod() {
return 1;
}/**
- 静态内部类
*/
static class Inner {
private String address;
}
}
class Outer$Inner { private String address;
Outer$Inner() {
}
}
这个问题带来两个启发:
3.相互依赖的子任务避免使用同一线程池
public class FartherAndSonTask {
public static ExecutorService executor= TestThreadPool.getExecutor();
public static void main(String args) throws Exception {
FatherTask fatherTask = new FatherTask();
Future<String> future = executor.submit(fatherTask);
future.get();
}/**
- 父任务,里面异步执行子任务
/
static class FatherTask implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println(“开始执行父任务”);
SonTask sonTask = new SonTask();
Future<String> future = executor.submit(sonTask);
String s = future.get();
System.out.println(“父任务已拿到子任务执行结果”);
return s;
}
}
/*- 子任务
*/
static class SonTask implements Callable<String> {
@Override
public String call() throws Exception {
//处理一些业务逻辑
System.out.println(“子任务执行完成”);
return null;
}
}
}
-
使用不同的线程池隔离有相互依赖的任务;
-
调用future.get()方法设置超时时间,这样做可以避免线程阻塞,但是依然会出现大量的超时异常;
4.合理选择submit()和execute()方法
-
execute(Runnable r):没有返回值,仅仅是把一个任务提交给线程池处理,轻量级方法,适用于处理不需要返回结果的任务;
-
submit(Runnable r):返回值为Future类型,future可以用来检查任务是否已经完成,获取任务的结果等,适用于需要处理返回结果的任务;
private void asyncSupplyPriceSync(List<Long> shidList, SupplyPriceSyncMsg msg) {
if (CollectionUtils.isEmpty(shidList)) {
return;
}
PlatformLogUtil.logInfo(“异步推送酒店报价信息供给总数:”, shidList.size());
final Map<String, Future<?>> futures = Maps.newLinkedHashMap();
//分批提交线程池处理
Lists.partition(shidList, SwitchConfig.HOTEL_PRICE_ASYNC_LIST_SIZE)
.forEach(subList -> {
try {
futures.put(UUID.randomUUID().toString(), executorService
.submit(() -> batchSupplyPriceSync(subList, msg)));
} catch (Exception e) {
PlatformLogUtil.logFail(“异步推送报价信息线程池子任务执行异常”, LogListUtil.newArrayList(subList), e);
}
});
//阻塞,等所有子任务都处理完,才返回结果
futures.forEach((uuid, future) -> {
try {
future.get();
} catch (InterruptedException | ExecutionException e) {
PlatformLogUtil.logFail(“异步推送报价信息获取子任务执行结果异常”, LogListUtil.newArrayList(e));
}
});
}
5.请捕获线程池中子任务的代码异常
public class ExceptionTest {
public static ExecutorService executor = TestThreadPool.getExecutor();
public static void main(String args) {
executor.execute(() -> test(“正常”));
executor.execute(() -> test(“正常”));
executor.execute(() -> test(“任务执行异常”));
executor.execute(() -> test(“正常”));
executor.shutdown();
}public static void test(String str) {
String result = “当前ThreadName为” + Thread.currentThread().getName() + “:结果” + str;
if (str.equals(“任务执行异常”)) {
throw new RuntimeException(result + “****执行异常”);
} else {
System.out.println(result);
}
}
}
-
如果线程池中执行任务的线程异常,发生异常的线程会销毁吗?其他任务还能正常执行吗?
可以发现
在processWorkerExit(w, completedAbruptly)方法内,可以看到如果运行中的线程池有线程执行异常,会调用workers.remove()移除当前线程,并调用addWorker()重新创建新的线程。
所以在任务3销毁线程再重新创建线程,和任务4创建线程这两个动作会有时序问题,具体看下图:
那么控制打印的异常信息是怎么来的呢?
所以,在业务代码中,请捕获子任务中的异常,否则会导致线程池中的工作线程频繁销毁、创建,造成资源浪费,违背了线程复用的设计原则。