Fork me on GitHub

Java并发编程实战———任务执行

在线程中执行任务

串行地执行任务

1
2
3
4
5
6
7
8
9
10
//串行的Web服务器
class SingleThreadWebServer {
public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while(true) {
Socket connection = socket.accept();
handleRequest(connection);
}
}
}

显式地为任务创建线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
在Web服务器中为每个请求启动一个新的线程(不要这样做)
class ThreadPerTaskWebServer {
public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while(true) {
final Socket connection = socket.accept();
Runnable task = new Runnable() {
public void run() {
handleRequest(connection);
}
}
new Thread(task).start();
}
}
}
  • 任务处理过程从主线程中分离出来,使得主循环能够更快地重新等待下一个到来的连接。这使得程序在完成前面的请求之前可以接受新的请求,从而提高响应性。
  • 任务可以并行处理,从而能够同时服务多个请求。
  • 任务处理代码必须是线程安全的,因为当有多个任务时会并发地调用这段代码。

无限制创建线程的不足

  • 线程生命周期的开销非常高。 线程的创建与销毁都是有代价的,需要JVM和操作系统提供一些辅助操作。
  • 资源消耗。 活跃的线程会消耗系统资源,尤其是内存。如果可运行的线程数量多于可用处理器数量,那么有些线程将闲置。大量闲置的线程会占用许多内存,给垃圾回收器带来压力,而且大量线程在竞争CPU资源时还将产生其他的性能开销。如果你已经拥有足够多的线程使所有CPU保持忙碌状态,那么再创建更多的线程反而会降低性能。
  • 稳定性。 在可创建线程的数量上存在一个限制。这个限制值受多个因素影响,包括JVM参数、Thread构造函数中请求的栈大小,以及底层操作系统对线程的限制等。如果破坏了这些限制,那么很可能抛出OutOfMemoryError异常。

Executor框架

线程池简化了线程的管理工作,并且java.util.concurrent提供了一种灵活的线程池实现作为Executor框架的一部分。在Java类库中,任务执行的主要抽象不是Thread,而是Executor

1
2
3
public interface Executor {
void execute(Runnable command);
}

Executor基于生产者-消费者模式,提交任务的操作相当于生产者,执行任务的线程相当于消费者。如果要在程序中实现一个生产者-消费者的设计,那么最简单的方式通常就是使用Executor

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
//阻塞队列和Executor实现生产者-消费者模式
// 生产者
class Producer implements Runnable {
private LinkedBlockingQueue<Object> queue;

public Producer(LinkedBlockingQueue<Object> queue) {
this.queue = queue;
}

@Override
public void run() {
while (true) {
try {
Object o = new Object();
queue.put(o);
System.out.println("Producer: " + o);
} catch (InterruptedException e) {
System.out.println("Producer is interrupted!");
}
}
}
}

// 消费者
class Consumer implements Runnable {
private LinkedBlockingQueue<Object> queue;

public Consumer(LinkedBlockingQueue<Object> queue) {
this.queue = queue;
}

@Override
public void run() {
while (true) {
try {
Object o = queue.take();
System.out.println("Consumer: " + o);
} catch (InterruptedException e) {
System.out.println("Consumer is interrupted!");
}
}

}
}

public class ProducerConsumer {

public static void main(String[] args) {
//阻塞队列
LinkedBlockingQueue<Object> queue = new LinkedBlockingQueue<Object>(10);
ExecutorService es = Executors.newFixedThreadPool(6);
for (int i = 0; i < 3; i++) {
es.execute(new Producer(queue));
es.execute(new Consumer(queue));
}
}
}

示例:基于Executor的Web服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class TaskExecutionWebServer {
private static final int NTHREADS = 100;
private static flnal Executor exec = Executors.newFixedThreadPool(NTHREADS);

public static void main(String[] args) throw IOException {
ServerSocket socket = new ServerSocket(80);
while(true) {
final Socket connection = socket.accept();
Runnable task = new Runnable() {
public void run() {
handleRequest(connection);
}
}
exec.execute(task);
}
}
}

线程池

线程池是指管理一组同构工作线程的资源池。通过重用现有的线程而不是创建新线程,可以在处理多个请求时分摊在线程创建和销毁过程中产生的巨大开销。另一个额外的好处是,当请求到达时,工作线程通常已经存在,因此不会由于等待创建线程而延迟任务的执行,从而提高了响应性。通过适当调整线程池的大小,可以创建足够多的线程以便使处理器保持忙绿状态,同时还可以防止过多线程相互竞争资源而使应用程序耗尽内存或失败。

类库提供了一个灵活的线程池以及一些有用的默认配置。可以通过调用Executors中的静态工厂方法来创建线程池:

  • newFixedThreadPool。 newFixedThreadPool将创建一个固定长度的线程池,每当提交一个任务时就创建一个线程,直到达到线程池的最大数量,这时线程池的规模将不再变化。
  • newCachedThreadPool。 newCachedThreadPool将创建一个可缓存的线程池,如果线程池的当前规模超过了处理需求时,那么将回收空闲的线程,而当需求增加时,则可以添加新的线程,线程池的规模不存在任何限制。
  • newSingleThreadExecutor。 newSingleThreadExecutor是一个单线程的Executor,它创建单个工作线程来执行任务,如果这个线程异常结束,会创建另一个线程来代替。newSingleThreadExecutor能确保依照任务在队列中的顺序来串行执行。
  • newScheduledThreadPool。 newScheduledThreadPool创建了一个固定长度的线程池,而且以延迟或定时的方式来执行任务。

newFixedThreadPoolnewCachedThreadPool这两个工厂方法返回通用的ThreadPoolExecutor实例,这些实例可以直接用来构造专门用途的executor

Executor的生命周期

由于Executor以异步方式来执行任务,因此在任何时刻,之前提交的任务的状态不是立即可见的。有些任务可能已经完成,有些可能正在运行,而其他的任务可能在队列中等待执行。Executor是可关闭的(无论采用平缓的方式还是粗暴的方式),并将在关闭操作中受影响的任务的状态反馈给应用程序。

为了解决执行服务的生命周期问题,ExecutorService接口扩展了Executor,添加了一些用于生命周期管理的方法。

1
2
3
4
5
6
7
8
9
//ExecutorService中的生命周期管理方法
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
......
}

ExecutorService的生命周期有3种状态:运行、关闭和已终止。ExecutorService在初始创建时处于运行状态。shutdown方法将执行平缓的关闭过程:不再接受新的任务,同时等待已经提交的任务执行完成–包括那些还未开始执行的任务。shutdownNow方法将执行粗暴的关闭过程:它将尝试取消所有运行中的任务,并且不再启动队列中尚未开始执行的任务。

ExecutorService关闭后提交的任务将由“拒绝执行处理器(Rejected Execution Handler)”来处理,它会抛弃任务,或者使得execute方法抛出一个未检查的RejectedExecutionException。等所有任务都完成后,ExecutorService将转入终止状态。可以调用awaitTermination来等待ExecutorService到达终止状态,或者通过调用isTerminated来轮询ExecutorService是否已经终止。通常在调用awaitTermination之后会立即调用shutdown,从而产生同步地关闭ExecutorService的效果。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//支持关闭操作的Web服务器
class LifecycleWebServer {
private final ExecutorService exec = Executors.newCachedThreadPool();

public void start() throws IOException {
ServerSocket socket = new ServerSocket();
while (!exec.isShutdown()) {
try {
final Socket connection = socket.accept();
exec.execute(new Runnable() {
public void run() {
handleRequest(connection);
}
});
} catch (RejectedExecutionException e) {
if (!exec.isShutdown()) {
log("task submission rejected", e);
}
}
}
}

public void stop() {
exec.shutdown();
}

void handleRequest(Socket connection) {
Request req = readRequest(connection);
if (isShutdownRequest(req)) {
stop();
} else {
dispatchRequest(req);
}
}
}

延迟任务与周期任务

Timer类负责管理延迟任务以及周期任务。然而,Timer存在一些缺陷。Timer在执行所有定时任务时只会创建一个线程。如果某个任务的执行时间过长,那么将破坏其他TimerTask的定时准确性。Timer的另一个问题是,如果TimerTask抛出了一个未检查的异常,那么Timer将表现出糟糕的行为。Timer线程并不捕获异常,因此当TimerTask抛出未检查的异常时将终止定时线程,这种情况下,Timer也不会恢复线程的执行,而是会错误地认为整个Timer都被取消了。因此,已经被调度但尚未执行的TimerTask将不会再执行,新的任务也不能被调度(这个问题称之为“线程泄露”)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//错误的Timer行为
public class OutOfTime {
public static void main(String[] args) throws Exception {
Timer timer = new Timer();
timer.schedule(new ThrowTask(), 1);
SECONDS.sleep(1);
timer.schedule(new ThrowTask(), 1);
SECONDS.sleep(5);
}

static class ThrowTask extends TimerTask {
public void run() {
throw new RuntimeException();
}
}
}

//运行一秒结束,并抛出一个异常消息“Timer already cancelled”

ScheduledThreadPoolExecutor能正确处理这些表现出错误行为的任务。在Java 5.0之后,很少使用Timer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public class ScheduledThreadPoolTest {
public static void main(String[] args) throws InterruptedException {
// 创建大小为5的线程池
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
for (int i = 0; i < 3; i++) {
Task worker = new Task("task-"\ + i);
// 只执行一次
// scheduledThreadPool.schedule(worker, 5, TimeUnit.SECONDS);
// 周期性执行,每5秒执行一次
scheduledThreadPool.scheduleAtFixedRate(worker, 0, 5, TimeUnit.SECONDS);
}
Thread.sleep(10000);
System.out.println("Shutting down executor...");
// 关闭线程池
scheduledThreadPool.shutdown();
boolean isDone;
// 等待线程池终止
do {
isDone = scheduledThreadPool.awaitTermination(1, TimeUnit.DAYS);
System.out.println("awaitTermination...");
} while (!isDone);

System.out.println("Finished all threads");
}
}

class Task implements Runnable {
private String name;
public Task(String name) {
this.name = name;
}

@Override public void run() {
System.out.println("name = " + name + ", startTime = " + new Date());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("name = " + name + ", endTime = " + new Date());
}
}

如果要构建自己的调度服务,那么可以利用DelayQueue,它实现了BlockingQueue,并为ScheduledThreadPoolExecutor提供调度功能。DelayQueue管理着一组Delayed对象。每个Delayed对象都有一个相应的延迟时间:在DelayQueue中,只有某个元素逾期后,才能从DelayQueue中执行take操作。从DelayQueue中返回的对象将根据它们的延迟时间进行排序。

找出可利用的并行性

携带结果的任务Callable与Future

Executor框架使用Runnable作为其基本的任务表示形式。Runnable是一种有很大局限的抽象,它不能返回一个值或抛出一个受检查的异常。Callable是一种更好的抽象,它认为调用处将返回一个值,并可能抛出一个异常。

Executor执行的任务有4个生命周期阶段:创建、提交、开始和完成。在Executor框架中,已提交但尚未开始的任务可以取消,但对于那些已经开始执行的任务,只有当它们能响应中断时,才能取消。取消一个已经完成的任务不会有任何影响。

Future表示一个任务的生命周期,并提供了相应的方法来判断是否已经完成或取消,以及获取任务的结果和取消任务等。get方法的行为取决于任务的状态(尚未开始、正在运行、已完成)。如果任务已经完成,那么get会立即返回或者抛出一个Exception,如果任务没有完成,那么get将阻塞并直到任务完成。如果任务抛出了异常,那么get将该异常封装为ExecutionException并重新抛出。如果任务被取消,那么get将抛出CancellationException。如果get抛出了ExecutionException,那么可以通过getCause来获得被封装的初始异常。

1
2
3
4
5
6
7
8
9
10
11
12
//Callable与Future接口
public interface Callable<V> {
V call() throws Exception;
}

public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException, CancellationException;
V get(long timeout, TimeUnit unit)
}

CompletionService:Executor与BlockingQueue

如果向Executor提交了一组计算任务,并且希望在计算完成后获得结果,那么可以保留与每个任务关联的Future,然后反复使用get方法,同时将参数timeout指定为0,从而通过轮训来判断任务是否完成。这种方法有些繁琐,还有种更好的方法:完成服务(CompletionService)。

CompletionServiceExecutorBlockingQueue的功能融合在一起。你可以将Callable任务提交给它来执行,然后使用类似于队列操作的takepoll等方法来获得已完成的结果,而这些结果会在完成时将被封装为FutureExecutorCompletionService实现了CompletionService,并将计算部分委托给一个Executor

CompletionService

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class CompletionServiceDemo {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(3);
CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(executor);

for(int i = 0; i < 10; i++) {
final Integer seq = i + 1;
completionService.submit(new Callable<Integer>() {
public Integer call() throws Exception {
Thread.sleep((long)(Math.random() * 1000));
}
});
}

for(int i = 0; i < 10; i++) {
// take 方法等待下一个结果并返回 Future 对象。
// poll 不等待,有结果就返回一个 Future 对象,否则返回 null。
Future<Integer> future = completionService.take();
System.out.print(future.get() + ";");
}

executor.shutdown();
}
}

//output
2;3;4;1;5;7;6;10;8;9;

为任务设置时限

在支持时间限制的Future.get中支持这种需求:当结果可用时,它将立即返回,如果在指定时限内没有计算出结果,那么将抛出TimeoutException。当这些任务超时后应该立即停止,从而避免为继续计算一个不再使用的结果而浪费计算资源。要实现这个功能,可以由任务本身来管理它的限定时间,并且在超时后中止执行或取消任务。此时可再次使用Future,如果一个限时的get方法抛出了TimeoutException,那么可以通过Future来取消任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//在指定时间内获取广告信息
Page renderPageWithAd() throws InterruptedException {
long endNanos = System.nanoTime() + TIME_BUDGET;
Future<Ad> f = exec.submit(new FetchAdTask());
//在等待广告的同时显示页面
Page page = renderPageBody();
Ad ad;
try {
//只等待指定的时间长度
long timeLeft = endNanos - System.nanoTime();
ad = f.get(timeLeft, NANOSECONDS);
} cache (ExecutionException e) {
ad = DEFAULT_AD;
} catch (TimeoutException e) {
ad = DEFAULT_AD;
f.cancel(true);
}
page.setAd(ad);
return page;
}
求鼓励,求支持!