ThreadPool
深入理解Java之线程池
什么是线程池?
诸如web服务器、数据库服务器、文件服务器和邮件服务器等许多服务器应用都面向处理来自某些远程来源的大量短小的任务。构建服务器应用程序的一个过于简单的模型是:每当一个请求到达就创建一个新的服务对象,然后在新的服务对象中为请求服务。但当有大量请求并发访问时,服务器不断的创建和销毁对象的开销很大。所以提高服务器效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁,这样就引入了“池”的概念,“池”的概念使得人们可以定制一定量的资源,然后对这些资源进行复用,而不是频繁的创建和销毁。
线程池是预先创建线程的一种技术。线程池在还没有任务到来之前,创建一定数量的线程,放入空闲队列中。这些线程都是处于睡眠状态,即均为启动,不消耗CPU,而只是占用较小的内存空间。当请求到来之后,缓冲池给这次请求分配一个空闲线程,把请求传入此线程中运行,进行处理。当预先创建的线程都处于运行状态,即预制线程不够,线程池可以自由创建一定数量的新线程,用于处理更多的请求。当系统比较闲的时候,也可以通过移除一部分一直处于停用状态的线程。
线程池的注意事项
虽然线程池是构建多线程应用程序的强大机制,但使用它并不是没有风险的。在使用线程池时需注意线程池大小与性能的关系,注意并发风险、死锁、资源不足和线程泄漏等问题。
(1)线程池大小。多线程应用并非线程越多越好,需要根据系统运行的软硬件环境以及应用本身的特点决定线程池的大小。一般来说,如果代码结构合理的话,线程数目与CPU 数量相适合即可。如果线程运行时可能出现阻塞现象,可相应增加池的大小;如有必要可采用自适应算法来动态调整线程池的大小,以提高CPU 的有效利用率和系统的整体性能。
(2)并发错误。多线程应用要特别注意并发错误,要从逻辑上保证程序的正确性,注意避免死锁现象的发生。
(3)线程泄漏。这是线程池应用中一个严重的问题,当任务执行完毕而线程没能返回池中就会发生线程泄漏现象。
简单线程池的设计
一个典型的线程池,应该包括如下几个部分:
1、线程池管理器(ThreadPool),用于启动、停用,管理线程池
2、工作线程(WorkThread),线程池中的线程
3、请求接口(WorkRequest),创建请求对象,以供工作线程调度任务的执行
4、请求队列(RequestQueue),用于存放和提取请求
5、结果队列(ResultQueue),用于存储请求执行后返回的结果
线程池管理器,通过添加请求的方法(putRequest)向请求队列(RequestQueue)添加请求,这些请求事先需要实现请求接口,即传递工作函数、参数、结果处理函数、以及异常处理函数。之后初始化一定数量的工作线程,这些线程通过轮询的方式不断查看请求队列(RequestQueue),只要有请求存在,则会提取出请求,进行执行。然后,线程池管理器调用方法(poll)查看结果队列(resultQueue)是否有值,如果有值,则取出,调用结果处理函数执行。通过以上讲述,不难发现,这个系统的核心资源在于请求队列和结果队列,工作线程通过轮询requestQueue获得人物,主线程通过查看结果队列,获得执行结果。因此,对这个队列的设计,要实现线程同步,以及一定阻塞和超时机制的设计,以防止因为不断轮询而导致的过多cpu开销。在本文中,将会用python语言实现,python的Queue,就是很好的实现了对线程同步机制。
java线程池与五种常用线程池策略使用与解析
一.线程池
关于为什么要使用线程池久不赘述了,首先看一下Java中作为线程池Executor底层实现类的ThredPoolExecutor的构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
...
}
其中各个参数含义如下:
corePoolSize- 池中所保存的线程数,包括空闲线程。需要注意的是在初创建线程池时线程不会立即启动,直到有任务提交才开始启动线程并逐渐时线程数目达到corePoolSize。若想一开始就创建所有核心线程需调用prestartAllCoreThreads方法。
maximumPoolSize-池中允许的最大线程数。需要注意的是当核心线程满且阻塞队列也满时才会判断当前线程数是否小于最大线程数,并决定是否创建新线程。
keepAliveTime - 当线程数大于核心时,多于的空闲线程最多存活时间
unit - keepAliveTime 参数的时间单位。
workQueue - 当线程数目超过核心线程数时用于保存任务的队列。主要有3种类型的BlockingQueue可供选择:无界队列,有界队列和同步移交。将在下文中详细阐述。从参数中可以看到,此队列仅保存实现Runnable接口的任务。
threadFactory - 执行程序创建新线程时使用的工厂。
handler - 阻塞队列已满且线程数达到最大值时所采取的饱和策略。java默认提供了4种饱和策略的实现方式:中止、抛弃、抛弃最旧的、调用者运行。将在下文中详细阐述。
二.可选择的阻塞队列BlockingQueue详解
首先看一下新任务进入时线程池的执行策略:
如果运行的线程少于corePoolSize,则 Executor始终首选添加新的线程,而不进行排队。(如果当前运行的线程小于corePoolSize,则任务根本不会存入queue中,而是直接运行)
如果运行的线程大于等于 corePoolSize,则 Executor始终首选将请求加入队列,而不添加新的线程。
如果无法将请求加入队列,则创建新的线程,除非创建此线程超出 maximumPoolSize,在这种情况下,任务将被拒绝。
主要有3种类型的BlockingQueue:
2.1 无界队列
队列大小无限制,常用的为无界的LinkedBlockingQueue,使用该队列做为阻塞队列时要尤其当心,当任务耗时较长时可能会导致大量新任务在队列中堆积最终导致OOM。最近工作中就遇到因为采用LinkedBlockingQueue作为阻塞队列,部分任务耗时80s+且不停有新任务进来,导致cpu和内存飙升服务器挂掉。
2.2 有界队列
常用的有两类,一类是遵循FIFO原则的队列如ArrayBlockingQueue与有界的LinkedBlockingQueue,另一类是优先级队列如PriorityBlockingQueue。PriorityBlockingQueue中的优先级由任务的Comparator决定。
使用有界队列时队列大小需和线程池大小互相配合,线程池较小有界队列较大时可减少内存消耗,降低cpu使用率和上下文切换,但是可能会限制系统吞吐量。
2.3 同步移交
如果不希望任务在队列中等待而是希望将任务直接移交给工作线程,可使用SynchronousQueue作为等待队列。SynchronousQueue不是一个真正的队列,而是一种线程之间移交的机制。要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接收这个元素。只有在使用无界线程池或者有饱和策略时才建议使用该队列。
2.4 几种BlockingQueue的具体实现原理
关于上述几种BlockingQueue的具体实现原理与分析将在下篇博文中详细阐述。
三.可选择的饱和策略RejectedExecutionHandler详解
JDK主要提供了4种饱和策略供选择。4种策略都做为静态内部类在ThreadPoolExcutor中进行实现。
3.1 AbortPolicy中止策略
该策略是默认饱和策略。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
使用该策略时在饱和时会抛出RejectedExecutionException(继承自RuntimeException),调用者可捕获该异常自行处理。
3.2 DiscardPolicy抛弃策略
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
如代码所示,不做任何处理直接抛弃任务
3.3 DiscardOldestPolicy抛弃旧任务策略
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
如代码,先将阻塞队列中的头元素出队抛弃,再尝试提交任务。如果此时阻塞队列使用PriorityBlockingQueue优先级队列,将会导致优先级最高的任务被抛弃,因此不建议将该种策略配合优先级队列使用。
3.4 CallerRunsPolicy调用者运行
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
既不抛弃任务也不抛出异常,直接运行任务的run方法,换言之将任务回退给调用者来直接运行。使用该策略时线程池饱和后将由调用线程池的主线程自己来执行任务,因此在执行任务的这段时间里主线程无法再提交新任务,从而使线程池中工作线程有时间将正在处理的任务处理完成。
四.java提供的四种常用线程池解析
在JDK帮助文档中,有如此一段话:
强烈建议程序员使用较为方便的Executors工厂方法Executors.newCachedThreadPool()(无界线程池,可以进行自动线程回收)、Executors.newFixedThreadPool(int)(固定大小线程池)
newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。Executors.newSingleThreadExecutor()(单个后台线程)它们均为大多数使用场景预定义了设置。
详细介绍一下上述四种线程池。
4.1 newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
在newCachedThreadPool中如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
初看该构造函数时我有这样的疑惑:核心线程池为0,那按照前面所讲的线程池策略新任务来临时无法进入核心线程池,只能进入 SynchronousQueue中进行等待,而SynchronousQueue的大小为1,那岂不是第一个任务到达时只能等待在队列中,直到第二个任务到达发现无法进入队列才能创建第一个线程?
这个问题的答案在上面讲SynchronousQueue时其实已经给出了,要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接收这个元素。因此即便SynchronousQueue一开始为空且大小为1,第一个任务也无法放入其中,因为没有线程在等待从SynchronousQueue中取走元素。因此第一个任务到达时便会创建一个新线程执行该任务。
这里引申出一个小技巧:有时我们可能希望线程池在没有任务的情况下销毁所有的线程,既设置线程池核心大小为0,但又不想使用SynchronousQueue而是想使用有界的等待队列。显然,不进行任何特殊设置的话这样的用法会发生奇怪的行为:直到等待队列被填满才会有新线程被创建,任务才开始执行。这并不是我们希望看到的,此时可通过allowCoreThreadTimeOut使等待队列中的元素出队被调用执行,详细原理和使用将会在后续博客中阐述。
4.2 newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
看代码一目了然了,使用固定大小的线程池并使用无限大的队列
4.3 newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
在来看看ScheduledThreadPoolExecutor()的构造函数
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
ScheduledThreadPoolExecutor的父类即ThreadPoolExecutor,因此这里各参数含义和上面一样。值得关心的是DelayedWorkQueue这个阻塞对列,在上面没有介绍,它作为静态内部类就在ScheduledThreadPoolExecutor中进行了实现。具体分析讲会在后续博客中给出,在这里只进行简单说明:DelayedWorkQueue是一个无界队列,它能按一定的顺序对工作队列中的元素进行排列。因此这里设置的最大线程数 Integer.MAX_VALUE没有任何意义。关于ScheduledThreadPoolExecutor的具体使用将会在后续quartz的周期性任务实现原理中进行进一步分析。
4.4 newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
首先new了一个线程数目为1的ScheduledThreadPoolExecutor,再把该对象传入DelegatedScheduledExecutorService中,看看DelegatedScheduledExecutorService的实现代码:
DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
super(executor);
e = executor;
}
在看看它的父类
DelegatedExecutorService(ExecutorService executor) { e = executor; }
其实就是使用装饰模式增强了ScheduledExecutorService(1)的功能,不仅确保只有一个线程顺序执行任务,也保证线程意外终止后会重新创建一个线程继续执行任务。具体实现原理会在后续博客中讲解。
4.5 newWorkStealingPool创建一个拥有多个任务队列(以便减少连接数)的线程池。
这是jdk1.8中新增加的一种线程池实现,先看一下它的无参实现
public static ExecutorService newWorkStealingPool() {
return new ForkJoinPool
(Runtime.getRuntime().availableProcessors(),
ForkJoinPool.defaultForkJoinWorkerThreadFactory,
null, true);
}
返回的ForkJoinPool从jdk1.7开始引进,个人感觉类似于mapreduce的思想。这个线程池较为特殊,将在后续博客中给出详细的使用说明和原理。
在前面的文章中,我们使用线程的时候就去创建一个线程,这样实现起来非常简便,但是就会有一个问题:
如果并发的线程数量很多,并且每个线程都是执行一个时间很短的任务就结束了,这样频繁创建线程就会大大降低系统的效率,因为频繁创建线程和销毁线程需要时间。
那么有没有一种办法使得线程可以复用,就是执行完一个任务,并不被销毁,而是可以继续执行其他的任务?
在Java中可以通过线程池来达到这样的效果。今天我们就来详细讲解一下Java的线程池,首先我们从最核心的ThreadPoolExecutor类中的方法讲起,然后再讲述它的实现原理,接着给出了它的使用示例,最后讨论了一下如何合理配置线程池的大小。
以下是本文的目录大纲:
一.Java中的ThreadPoolExecutor类
二.深入剖析线程池实现原理
三.使用示例
四.如何合理配置线程池的大小
若有不正之处请多多谅解,并欢迎批评指正。
请尊重作者劳动成果,转载请标明原文链接:
一.Java中的ThreadPoolExecutor类
java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。下面我们来看一下ThreadPoolExecutor类的具体实现源码。
在ThreadPoolExecutor类中提供了四个构造方法:
public class ThreadPoolExecutor extends AbstractExecutorService {
.....
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
...
}
从上面的代码可以得知,ThreadPoolExecutor继承了AbstractExecutorService类,并提供了四个构造器,事实上,通过观察每个构造器的源码具体实现,发现前面三个构造器都是调用的第四个构造器进行的初始化工作。
Java类库中提供的线程池简介:
java提供的线程池更加强大,相信理解线程池的工作原理,看类库中的线程池就不会感到陌生了。
下面解释下一下构造器中各个参数的含义:
· corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;
· maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;
· keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;
· unit:参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:
TimeUnit.DAYS; //天
TimeUnit.HOURS; //小时
TimeUnit.MINUTES; //分钟
TimeUnit.SECONDS; //秒
TimeUnit.MILLISECONDS; //毫秒
TimeUnit.MICROSECONDS; //微妙
TimeUnit.NANOSECONDS; //纳秒
· workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:
ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;
ArrayBlockingQueue和PriorityBlockingQueue使用较少,一般使用LinkedBlockingQueue和Synchronous。线程池的排队策略与BlockingQueue有关。
· threadFactory:线程工厂,主要用来创建线程;
· handler:表示当拒绝处理任务时的策略,有以下四种取值:
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务
具体参数的配置与线程池的关系将在下一节讲述。
从上面给出的ThreadPoolExecutor类的代码可以知道,ThreadPoolExecutor继承了AbstractExecutorService,我们来看一下AbstractExecutorService的实现:
public abstract class AbstractExecutorService implements ExecutorService {
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
public Future<?> submit(Runnable task) {};
public <T> Future<T> submit(Runnable task, T result) { };
public <T> Future<T> submit(Callable<T> task) { };
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
};
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
};
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
};
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
};
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
};
}
AbstractExecutorService是一个抽象类,它实现了ExecutorService接口。
我们接着看ExecutorService接口的实现:
public interface ExecutorService extends Executor {
void shutdown();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
而ExecutorService又是继承了Executor接口,我们看一下Executor接口的实现:
public interface Executor {
void execute(Runnable command);
}
到这里,大家应该明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor几个之间的关系了。
Executor是一个顶层接口,在它里面只声明了一个方法execute(Runnable),返回值为void,参数为Runnable类型,从字面意思可以理解,就是用来执行传进去的任务的;
然后ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;
抽象类AbstractExecutorService实现了ExecutorService接口,基本实现了ExecutorService中声明的所有方法;
然后ThreadPoolExecutor继承了类AbstractExecutorService。
在ThreadPoolExecutor类中有几个非常重要的方法:
execute()
submit()
shutdown()
shutdownNow()
execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。
submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果(Future相关内容将在下一篇讲述)。
shutdown()和shutdownNow()是用来关闭线程池的。
还有很多其他的方法:
比如:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedT