Fork me on GitHub

Java并发编程实战———线程池的使用

在任务与执行策略之间的隐形耦合

在一些任务中,需要拥有或排除某种特定的执行策略。如果某些任务依赖于其他的任务,那么会要求线程池足够大,从而确保它们依赖任务不会被放入等待队列中或被拒绝,而采用线程封闭机制的任务需要串行执行。

线程饥饿死锁

在线程池中,如果任务依赖于其他任务,那么可能产生死锁。在单线程的Executor中,如果一个任务将另一个任务提交到同一个Executor,并且等待这个被提交任务的结果,那么通常会引发死锁。只要线程池中的任务需要无限期地等待一些必须由池中其他任务才能提供的资源或条件,就会发生线程饥饿死锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
//在单线程Executor中任务发生死锁
public class ThreadDeadlock {
ExecutorService exec = Executors.newSingleThreadExecutor();

public class RenderPageTask implements Callable<String> {
public String call() throws Exception {
Future<String> header = exec.submit(new LoadFileTask("header.html"));
Future<String> footer = exec.submit(new LoadFileTask("footer.html"));
String page = renderBody();
//将发生死锁--由于任务在等待子任务的结果
return header.get() + page + footer.get();
}
}
}

每当提交了一个有依赖性的Executor任务时,要清楚地知道可能会出现线程饥饿死锁,因此需要在代码或配置Executor的配置文件中记录线程池的大小限制或配置限制。

运行时间较长的任务

如果任务阻塞的时间过长,那么即使不出现死锁,线程池的响应性也会变得糟糕。为缓解这种影响,可以限定任务等待资源的时间,而不要无限制地等待。

设置线程池的大小

要想正确地设置线程池的大小,必须分析计算环境、资源预算和任务的特性。当任务需要某种通过资源池来管理的资源时,例如数据库连接,那么线程池和资源池的大小将会互相影响。

配置ThreadPoolExecutor

如果默认的执行策略不能满足需求,那么可以通过ThreadPoolExecutor的构造函数来实例化一个对象,并根据自己的需求来定制,并且可以参考Executors的源代码来了解默认配置下的执行策略,然后再以这些执行策略为基础进行修改。

1
2
3
4
5
6
7
8
//ThreadPoolExecutor的通用构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) { ... }

线程的创建与销毁

线程池的基本大小(Core Pool Size)、最大大小(Maximum Pool Size)以及存活时间等因素共同负责线程的创建与销毁。基本大小也就是线程池的目标大小,即在没有任务执行时线程池的大小,并且只有在工作队列满了的情况下才会创建超出这个数量的线程。线程池的最大大小表示可同时活动的线程数量的上限。如果某个线程的空闲时间超过了存活时间,那么将被标记为可回收的,并且当线程池的当前大小超过了基本大小时,这个线程将被终止。

通过调节线程池的基本大小和存活时间,可以帮助线程池回收空闲线程占有的资源,从而使得这些资源可以用于执行其他工作。

newFixedThreadPool工厂方法将线程池的基本大小和最大大小设置为参数中指定的值,而且创建的线程池不会超时。newCachedThreadPool工厂方法将线程池的最大大小设置为Inter.MAX_VALUE,从而将基本大小设置为零,并将超时设置为1分钟,这种方法创建出来的线程池可以被无限扩展,并且当需求降低时会自动收缩。

管理队列任务

在有限的线程池中会限制可并发执行的任务数量。如果新请求的到达速率超过了线程池的处理速率,那么新到来的请求将累积起来。在线程池中,这些请求会在一个由Executor管理的Runnable队列中等待,而不会像线程那样去竞争CPU资源。通过一个Runnable和一个链表节点来表现一个等待中的任务,当然比使用线程来表示的开销低很多,但如果客户提交给服务器请求的速率超过了处理器的处理速率,那么仍可能会耗尽资源。

ThreadPoolExecutor允许提供一个BlockingQueue来保存等待执行的任务。基本的任务排队方法有3种:无界队列、有界队列和同步移交(Synchronous Handoff)。队列的选择与其他的配置参数有关,例如线程池的大小等。

newFixedThreadPoolnewSingleThreadPool在默认的情况下将使用一个无界的LinkedBlockingQueue。如果所有工作线程都处于忙碌状态,那么任务将在队列中等候。如果任务持续快速地到达,并且超过了线程池处理它们的速度,那么队列将无限制地增加。

一种更稳妥的资源管理策略是使用有界队列,例如ArrayBlockingQueue、有界的LinkedBlockingQueue。有界队列有助于避免资源耗尽的情况发生。在使用有界的工作队列时,队列的大小与线程池的大小必须一起调节。如果线程池较小而队列较大,那么有助于减少内存使用量,降低CPU的使用率,同时还可以减少上下文切换,但付出的代价是可能会限制吞吐量。

对于非常大的或者无界的线程池,可以通过使用SynchronousQueue来避免任务排队,以及直接将任务从生产者移交给工作者线程。SynchronousQueue不是一个真正的队列,而是一种在线程之间进行移交的机制。要将一个元素放入SynchronousQueue中,必须有另一个线程正在等待接受这个元素。如果没有线程正在等待,并且线程池的当前大小小于最大值,那么ThreadPoolExecutor将创建一个新的线程,否则根据饱和策略,这个任务将被拒绝。使用直接移交将更高效,因为任务会直接移交给执行它的线程,而不是被首先放在队列中,然后由工作线程从队列中提取该任务。只有当线程池是无界的或者可以拒绝任务时,SynchronousQueue才有实际价值。在newCahcedThreadPool工厂方法中就使用了SynchronousQueue

当使用像LinkedBlockingQueueArrayBlockingQueue这样的FIFO队列时,任务的执行顺序与它们的到达顺序相同。如果想进一步控制任务执行顺序,还可以使用PriorityBlockingQueue,这个队列将根据优先级来安排任务。任务的优先级是通过自然顺序或Comparator(如果任务实现了Comparable)来定义。

对于ExecutornewCachedThreadPool工厂方法是一种很好的默认选择,它能提供比固定大小的线程池更好的排队性能。当需要限制当前任务的数量以满足资源管理需求时,那么可以选择固定大小的线程池,如果不进行限制,那么很容易发生过载问题。

只有当任务相互独立时,为线程池或工作队列设置界限才是合理的。如果任务之间存在依赖性,那么有界的线程池或队列就可能导致线程“饥饿”死锁问题。此时应该使用无界的线程池,例如newCachedThreadPool

饱和策略

当有界队列被填满后,饱和策略开始发挥作用。ThreadPoolExecutor的饱和策略可以通过调用setRejectedExecutionHandler来修改。(如果某个任务被提交到一个已被关闭的Executor时,也会用到饱和策略)。JDK提供了几种不同的RejectedExecutionHandler实现,每种实现都包含不同的饱和策略:AbortPolicyCallerRunsPolicyDiscardPolicyDiscardOldestPolicy

“中止(Abort)”策略是默认的饱和策略,该策略将抛出未检查的RejectedExecutionException。调用者可以捕获这个异常,然后根据需求编写自己的处理代码。当新提交的任务无法保存到队列中等待执行时,“抛弃(Discard)”策略会悄悄抛弃该任务。“抛弃最旧的(Discard-Oldest)”策略则会抛弃下一个将被执行的任务,然后尝试重新提交新的任务。(如果工作队列是一个优先队列,那么“抛弃最旧的”策略将导致抛弃优先级最高的任务,因此最好不要将“抛弃最旧的”饱和策略和优先级队列放在一起使用。)

“调用者运行(Caller-Runs)”策略实现了一种调节机制,该策略既不会抛弃任务,也不会抛出异常,而是将某些任务回退到调用者,从而降低新任务的流量。它不会在线程池的某个线程中执行新提交的任务,而是在一个调用了execute的线程中执行该任务。

当创建Eexcutor时,可以选择饱和策略或者对执行策略进行修改。

1
2
3
4
5
6
ThreadPoolExecutor executor
= new ThreadPoolExecutor(N_THREADS, N_THREADS,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(CAPACITY));
executor.setRejectedExceptionHandler(
new ThreadPoolExecutor.CallerRunsPolicy());

线程工厂

每当线程池需要创建一个线程时,都是通过线程工厂方法来完成的。默认的线程工厂方法将创建一个新的、非守护的线程,并且不包含特殊的配置信息。通过指定一个线程工厂方法,可以定制线程池的配置信息。在ThreadFactory中之定义了一个方法newThread,每当线程池需要创建一个新线程时都会调用这个方法。

1
2
3
public interface ThreadFactory {
Thread newThread(Runnable r);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//自定义的线程工厂
public class MyThreadFactory implements ThreadFactory {
private final String poolName;

public MyThreadFactory(String poolName) {
this.poolName = poolName;
}

public Thread newThread(Runnable runnable) {
return new MyThread(runnable, poolName);
}
}

//定制Thread基类
public class MyThread extends Thread {
public staic final String DEFAULT_NAME = "MyThread";
private static final AtomicInteger created = new AtomicInteger();

public MyThread(Runnable r) {
this(r, DEFAULT_NAME);
}

public MyThread(Runnable runnable, String name) {
super(runnable, name + "-" + created.incrementAndGet());
}
}

在调用构造函数后再定制ThreadPoolExecutor

在调用完ThreadPoolExecutor的构造函数后,仍然可以通过设置函数(Setter)来修改大多数传递给它的构造函数的参数。如果Executor是通过Executors中的某个(newSingleThreadExecutor除外)工厂方法创建的,那么可以将结果的类型转换为ThreadPoolExecutor以访问设置器。

1
2
3
4
5
6
7
//对通过标准工厂方法创建的Executor进行修改
ExecutorService exec = Executors.newCachedThreadPool();
if(exec instanceof ThreadPoolExecutor) {
((ThreadPoolExecutor) exec).setCorePoolSize(10);
} else {
throw new AssertionError("Oops, bad assumption");
}

Executors中包含一个unconfigurableExecutorService工厂方法,该方法对一个现有的ExecutorService进行包装,使其只暴露出EexcutorService的方法,因此不能对它进行配置。newSingleThreadExecutor返回按这种方式封装的ExecutorService,而不是最初的ThreadPoolExecutor。虽然单线程的Executor实际上被实现为一个只包含唯一线程的线程池,但它同样确保了不会并发地执行任务。如果在代码中增加单线程Executor的线程池大小,那么将破坏它的执行语义。

求鼓励,求支持!