Fork me on GitHub

Java并发编程实战———基础构建模块

Java平台类库包含了丰富的并发基础构建模块,例如线程安全的容器类以及各种用于协调多个相互协作的线程控制流的同步工具类(Synchronizer)。

同步容器类

同步容器类包括VectorHashTable,还包括由Collections.synchronizedXxx等工厂方法创建的封装器类。同步容器类可以简单地理解为通过synchronized来实现同步的容器。这些类实现线程安全的方式是:将它们的状态封装起来,并对每个公有方法都进行同步,使得每次只有一个线程能访问容器的状态。

同步容器类的问题

同步容器类都是线程安全的,但在某些情况下可能需要额外的客户端加锁来保护复合操作。容器上常见的复合操作包括:迭代、跳转以及条件运算。在同步容器类中,这些复合操作在没有客户端加锁的情况下仍然是线程安全的,但当其他线程并发地修改容器时,它们可能会表现出意料之外的行为。

1
2
3
4
5
6
7
8
9
10
//复合操作非线程安全
public static Object getLast(Vector list) {
int lastIndex = list.size() - 1;
return list.get(lastIndex);
}

public static void deleteLast(Vector list) {
int lastIndex = list.size() - 1;
list.remove(lastIndex);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
//加锁实现线程安全
public static Object getLast(Vector list) {
synchronized (list) {
int lastIndex = list.size() - 1;
return list.get(lastIndex);
}
}

public static void deleteLast(Vector list) {
synchronized (list) {
int lastIndex = list.size() - 1;
list.remove(lastIndex);
}
}

迭代器与ConcurrentModificationException

对容器类进行迭代的过程中,如果有其他线程并发地修改容器,那么即使是使用迭代器也无法避免在迭代期间对容器加锁。当容器在迭代过程中被修改时,就会抛出一个ConcurrentModificationException异常。

同步容器将所有对容器状态的访问都串行化,以实现它们的线程安全性。这种方法的代价是严重降低并发性,当多个线程竞争容器的锁时,吞吐量将严重减低。

并发容器

并发容器是针对多个线程并发访问设计的。并发容器提供的迭代器不会抛出ConcurrentModificationException,因此不需要在迭代过程中对容器加锁。ConcurrentHashMap用来替代同步且基于散列的MapCopyOnWriteArrayList用于在遍历操作为主要操作的情况下代替同步的List

ConcurrentHashMap

ConcurrentHashMap在Java 7中使用分段锁,在Java 8中使用CAS来实现并发操作。

ConcurrentHashMap返回的迭代器具有弱一致性,弱一致性的迭代器可以容忍并发的修改,当创建迭代器时会遍历已有的元素,并可以在迭代器被构造后将修改反映给容器。

对于一些需要在整个Map上进行计算的方法,例如sizeisEmpty,这些方法的语义被略微减弱了以反映容器的并发特性。由于size返回的结果在计算时可能已经过期了,它实际上只是一个估计值,因此允许size返回一个近似值而不是一个精确值。

ConcurrentHashMap中没有实现对Map加锁以提供独占访问,在大多数情况下,用ConcurrentHashMap来代替同步Map能进一步提高代码的可伸缩性,只有当应用程序需要加锁Map以进行独占访问时,才应该放弃使用ConcurrentHashMap

CopyOnWriteArrayList

CopyOnWriteArrayList用于替代同步List,在某些情况下它提供了更好的并发性能,并且在迭代期间不需要对容器进行加锁或复制。

“写入时复制(Copy-On-Write)”容器的线程安全性在于,只要正确地发布一个事实不可变的对象,那么在访问该对象时就不再需要进一步的同步。在每次修改时,都会创建并重新发布一个新的容器副本,从而实现可变性。

“写入时复制”容器的迭代器保留一个指向底层基础数组的引用,这个数组当前位于迭代器的起始位置,由于它不会被修改,因此在对其进行同步时只需确保数组内容的可见性。因此,多个线程可以同时对这个容器进行迭代,而不会彼此干扰或者与修改容器的线程相互干扰。“写入时复制”容器返回的迭代器不会抛出ConcurrentModificationException,并且返回的元素与迭代器创建时的元素完全一致,而不必考虑之后修改操作所带来的影响。

每当修改容器时都会复制底层数组,这需要一定的开销,特别是当容器的规模较大时。仅当迭代操作远远多于修改操作时,才应该使用“写入时复制”容器

同步工具类

同步工具类可以是任何一个对象,只要它根据其自身的状态来协调线程的控制流。阻塞队列可以作为同步工具类,其他类型的同步工具类还包括信号量(Semaphore)、栅栏(Barrier)以及闭锁(Latch)。

所有的同步工具类都包含一些特定的结构化属性:它们封装了一些状态,这些状态将决定执行同步工具类的线程是继续执行还是等待,此外还提供了一些方法对状态进行操作,以及另一些方法用于高效地等待同步工具类进入到预期状态。

闭锁

闭锁是一种同步工具类,可以延迟线程的进度直到其到达终止状态。闭锁可以用来确保某些活动直到其他活动都完成后才继续执行。

CountDownLatch是一种灵活的闭锁实现,它可以使一个或多个线程等待一组事件发生。闭锁状态包含一个计数器,该计数器初始化为一个正数,表示需要等待的事件数量。countDown方法递减计数器,表示有一个事件已经发生了,而await方法等待计数器达到零,这表示所有需要等待的事件都已经发生。如果计数器的值非零,那么await会一直阻塞直到计数器为零,或者等待中的线程中断,或者等待超时。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
//在计时测试中使用CountDownLatch来启动和停止线程
public class TestHarness {
public long timeTasks(int nThreads, final Runnable task) {
final CountDownLatch startGate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(nThreads);

for(int i = 0; i < nThreads; i++) {
Thread t = new Thread() {
public void run() {
try {
startGate.await();
try {
task.run();
} finally {
endGate.countDown();
}
} catch (InterruptedException ignored) { }
}
};
t.start();
}

long start = System.nanoTime();
startGate.countDown();
endGate.await();
long end = System.nanoTime();
return end - start;
}
}

FutureTask

FutureTask也可以用作闭锁。FutureTask表示的计算是通过Callable来实现的,相当于一种可生成结果的Runnable,并且可以处于以下3种状态:等待运行,正在运行和运行完成。“执行完成”表示计算的所有可能结束方式,包括正常结束、由于取消而结束和由于异常而结束等。

Future.get的行为取决于任务的状态。如果任务已经完成,那么get会立即返回结果,否则get将阻塞直到任务进入完成状态,然后返回结果或者抛出异常。

FutureTaskExecutor框架中表示异步任务,此外还可以用来表示一些时间较长的计算,这些计算可以在使用计算结果之前启动。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//使用FutureTask来提前加载稍后需要的数据
public class Preloader {
private final FutureTask<ProductInfo> future = new FutureTask<ProductInfo>(
new Callable<ProductInfo>() {
public ProductInfo call() throws DataLoadException {
return loadProductInfo();
}
});
private final Thread thread = new Thread(future);

public start() {
thread.start();
}

public ProductInfo get() throws DataLoadException, InterruptException {
try {
return future.get();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
if (cause instanceof DataLoadException){
throw (DataLoadException) cause;
}else {
throw launderThrowable(cause);
}
}
}
//强制将未检查的Throwable转换为RuntimeException
public static RuntimeException launderThrowable(Throwable t) {
if (t instanceof RuntimeException) {
return (RuntimeException) t;
} else if (t instanceof Error) {
throw (Error) t;
} else {
throw new IllegalStateException("Not unchecked", t);
}
}
}

信号量

Semaphore中管理着一组虚拟的许可(permit),许可的初始数量可通过构造函数来指定。在执行操作时先要获取许可,并在使用以后释放许可。如果没有许可,那么acquire将阻塞直到有许可(或者直到被中断或者操作超时)。release方法将返回一个许可给信号量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//使用Semaphore为容器设置边界
public class BoundedHashSet<T> {
private final Set<T> set;
private final Semaphore sem;

public BoundedHashSet(int bound) {
this.set = Collections.synchronizedSet(new HashSet<T>());
asm = new Semaphore(bound);
}

public boolean add(T o) throws InterruptedException {
asm.acquire();
boolean wasAdded = false;
try {
wasAdded = set.add(o);
return wasAdded;
} finally {
if (!wasAdded) {
asm.release();
}
}
}

public boolean remove(Object o) {
boolean wasRemoved = set.remove(o);
if (wasRemoved) {
asm.release();
}
return wasRemoved;
}
}

栅栏

栅栏类似于闭锁,它能阻塞一组线程直到某个事件发生。栅栏与闭锁的关键区别在于,所有线程必须同时到达栅栏位置,才能继续执行。闭锁用于等待事件,而栅栏用于等待其他线程。

CyclicBarrier可以使一定数量的参与方反复地在栅栏位置汇集,当线程到达栅栏位置时将调用await方法,这个方法将阻塞直到所有线程都到达栅栏位置。如果所有线程都到达栅栏位置,那么栅栏将打开,此时所有线程都被释放,而栅栏被重置以便下次使用。如果对await的调用超时,或者await阻塞的线程被中断,那么栅栏就被认为是打破了,所有阻塞的await调用都将终止并抛出BrokenBarrierException。如果成功地通过栅栏,那么await将为每个线程返回一个唯一的到达索引号。CyclicBarrier还可以使你将一个栅栏操作传递给构造函数,这是一个Runnable,当成功通过栅栏时执行它。

CyclicBarrier有两个构造参数,分别是:

  • CyclicBarrier(int parties)
    创建一个新的CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,但它不会在启动barrier时执行预定义的操作。
  • CyclicBarrier(int parties, Runnable barrierAction)
    创建一个新的CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动,并在启动barrier时执行给定的屏障操作,该操作由最后一个进入barrier的线程执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
//运动会所有选手都就位后才开始
class Athlete implements Runnable {
private CyclicBarrier cyclicBarrier;
private String name;

public Athlete(CyclicBarrier cyclicBarrier, String name) {
this.cyclicBarrier = cyclicBarrier;
this.name = name;
}

@override
public void run() {
System.out.println(name + "就位");
try {
cyclicBarrier.await();
Random random = new Random();
double time = random.nextDouble() + 9;
System.out.println(name + ": " + time);
} catch (Exception e) {

}
}
}

class Race {
private CyclicBarrier cyclicBarrier = new CyclicBarrier(8);
public void start() {
List<Athlete> athletes = new ArrayList<>();
athleteList.add(new Athlete(cyclicBarrier,"选手一"));
athleteList.add(new Athlete(cyclicBarrier,"选手二"));
athleteList.add(new Athlete(cyclicBarrier,"选手三"));
athleteList.add(new Athlete(cyclicBarrier,"选手四"));
athleteList.add(new Athlete(cyclicBarrier,"选手五"));
athleteList.add(new Athlete(cyclicBarrier,"选手六"));
athleteList.add(new Athlete(cyclicBarrier,"选手七"));
athleteList.add(new Athlete(cyclicBarrier,"选手八"));
Executor executor = Executors.newFixedThreadPool(8);
for (Athlete athlete : athleteList) {
executor.execute(athlete);
}
}
}

CountDownLatch是一次性使用的,如果需要可重用的CountDownLatch,考虑使用CyclicBarrier

详细分析可参考Java并发之CyclicBarrier

求鼓励,求支持!