可以把单线程程序当作在问题域求解的单一实体,每次只能做一件事情。因为只有一个实体,所以永远不用担心“两个实体试图同时使用同一个资源”这样的问题。

有了并发就可以同时做很多事情了,但是,两个或多个线程彼此互相干涉的问题也就出现了。

一、不正确地访问资源

考虑一个场景:一个任务产生偶数,其他任务消费这些数字。这里,消费者任务的唯一工作就是检查偶数的有效性。

创建一个抽象类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
abstract class IntGenerator {

private volatile boolean canceled = false;

public abstract int next();

public void cancel() {
canceled = true;
}

public boolean isCanceled() {
return canceled;
}

}

上面程序中的注意点:

  • 因为 canceled 标志是 boolean 类型的,所以它是原子性的。即赋值和返回值这样的简单操作发生时没有中断的可能性,因为你不会看到这个域处于在执行这些简单操作的过程中的中间状态。
  • 为了保证可视性,canceled 标志还是加上了个 volatile 关键字

任何 IntGenerator 都可以用下面的 EvenChecker 来测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class EvenChecker implements Runnable{

private final IntGenerator generator;

private final int id;

public EvenChecker(IntGenerator generator, int id) {
this.generator = generator;
this.id = id;
}

@Override
public void run() {
while (!generator.isCanceled()) {
int val = generator.next();
if (val % 2 != 0) {
System.out.println(val + " not even!");
generator.cancel();
}
}
}

public static void test(IntGenerator generator, int count) {
System.out.println("Press Control-C to exit");
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < count; i++) {
executorService.execute(new EvenChecker(generator, i));
}
executorService.shutdown();
}

public static void test(IntGenerator generator) {
test(generator, 10);
}

}

上例中,可以被撤销的类没有实现 Runnable 接口,而所有依赖于 IntGenerator 对象的 EvenChecker 任务将测试它。以查看它是否已经被撤销。

通过这种方式,共享公共资源(IntGenerator)的任务可以观察该字段的终止信号。这可以消除竞争条件,即两个或更多的任务竞争响应某个条件,因此产生冲突或不一致结果的情况。

并发系统失败的可能途径:一个任务不能依赖于另一个任务,因为任务关闭的顺序无法得到保证。而这里,通过使任务依赖于非任务对象,我们可以消除潜在的竞争条件

test() 方法通过启动大量使用相同 IntGeneratorEvenChecker,设置并执行对任何类型的 IntGenerator 的测试。如果 IntGenerator 引发失败,那么 test() 将报告它并返回。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class EvenGenerator extends IntGenerator {

private int currentEvenValue = 0;

@Override
public int next() {
currentEvenValue++;
currentEvenValue++;
// currentEvenValue += 2;
return currentEvenValue;
}

public static void main(String[] args) {
EvenChecker.test(new EvenGenerator());
}

}

上面程序执行的输出示例如下:

1
2
3
4
5
6
7
8
9
10
4275 not even!
4279 not even!
4283 not even!
4281 not even!
4287 not even!
4273 not even!
4289 not even!
4271 not even!
4277 not even!
4285 not even!

我们每执行一次 EvenGeneratornext() 方法,currentEvenValue 字段都会递增 2,也就是说从 0 开始递增,每一次都应该是偶数才对,但是输出结果表明程序并没有按照预先的情况运行。定位到可能出问题的位置就是,两次++递增操作时可能出了点问题,并没有加出来下一个偶数。

一个任务有可能在另一个任务执行第一个对 currentEvenValue 的递增操作之后,但是没有执行第二个操作之前,调用了 next() 方法。这使得这个值处于“不恰当”的状态。

调用该程序时,有时候在数值很小的时候程序便停止了,有时候在很大的时候才停止,如果希望更快的发现失败,可以尝试在两个递增操作之前增加 yield() 操作。

注意:递增程序自身也需要多个步骤,并且在递增过程中任务可能会被线程机制挂起,也就是说,在 Java 中,递增不是原子性的操作。因此,如果不保护任务,即使单一的递增也是不安全的。

二、解决共享资源竞争

对于并发工作,需要某种方式来防止两个任务访问相同的资源,至少在关键阶段不能出现这种情况。

防止这种冲突的方法就是当资源被一个任务使用时,在其上加锁。第一个访问某项资源的任务必须锁定这项资源,使其他任务在其被解锁之前,就无法访问它了,而在其被解锁之时,另一个任务就可以锁定并使用它。

基本上所有的并发模式在解决线程冲突问题的时候,都是采用序列化访问共享资源的方案。这意味着在给定时刻只允许一个任务访问共享资源。通常这是通过在代码前面加上一条锁语句来实现的,这就使得在一段时间内只有一个任务可以运行这段代码。

因为锁语句产生了一种互相排斥的效果,所以这种机制常常被称为互斥量

1、使用 synchronized 关键字

Java 以提供关键字 synchronized 的形式,为防止资源冲突提供了内置支持。当任务要执行被 synchronized 关键字保护的代码片段的时候,它将检查锁是否可用,然后获取锁,执行代码,释放锁。

**要控制对共享资源的访问,得先把它包装进一个对象,然后把所有要访问的这个资源的方法标记为 synchronized**。

在生成偶数的代码中,你应该将该累的数据成员都声明为 private 的,而且只能通过方法来访问这些数据,所以可以把方法标记为 synchronized 来防止资源冲突

1.1 对象的锁

所有对象都自动含有单一的锁(也称为监视器)。当在对象上调用其任意 synchronized 方法时,此对象都被加锁,这时该对象上的其他 synchronized 方法只能等到前一个方法调用完毕并释放锁之后才能被调用。也就是说,该对象的所有 synchronized 方法共享一个锁,这可以被用来防止多个任务同时访问编码对象内存。

注意:在使用并发时,将域设置为 private 是非常重要的,否则,synchronized 关键字就不能防止其他任务直接访问域,这样就会产生冲突。

一个任务可以多次获得对象的锁。如果一个方法在同一个对象上调用了第二个方法,后者又调用了同一对象上的另一个方法,就会发生这种情况。JVM 负责跟踪对象被加锁的次数。如果一个对象被解锁,其计数变为 0。在任务第一次给对象加锁的时候,计数变为 1。每当这个相同的任务在对象上获取锁的时候,计数都会递增。只有首先获得了锁的任务才能允许继续获取多个锁。每当任务离开一个 synchronized 方法,计数递减,当计数为 0 的时候,锁被完全释放,此时别的任务就可以使用此资源了。

1.2 类的锁

针对每个类,也有一个锁(作为类的 Class 对象的一部分),所以 synchronized static 方法可以在类的范围内防止对 static 数据的并发访问。

1.3 同步时机

应该什么时候同步呢?可以运用 Brian 的同步规则:

如果你正在写一个变量,它可能接下来将被另一个线程读取,或者正在读取一个上一次已经被另一个线程写过的变量,那么你必须使用同步,并且,读写线程都必须用相同的监视器锁同步。

如果在类中有超过一个方法在处理临界数据,那么必须同步所有相关的方法。如果只同步一个方法,那么其他方法将随意地忽略这个对象锁,并可以在无任何惩罚的情况下被调用。

每个访问临界共享资源的方法都必须被同步,否则它们就不会正确地工作。

1.4 同步控制 EvenGenerator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class SynchronizedEvenGenerator extends IntGenerator {

private int currentEvenValue = 0;

@Override
public synchronized int next() {
currentEvenValue++;
Thread.yield();
currentEvenValue++;
return currentEvenValue;
}

public static void main(String[] args) {
EvenChecker.test(new SynchronizedEvenGenerator());
}

}

Thread.yield() 的调用被插入到了两个递增操作之间,以提高在 currentEvenValue 是计数状态时切换上下文的可能性。

因为互斥可以防止多个任务同时进入临界区,所以这不会产生任何失败。

第一个进入 next 方法的任务将获得锁,任何其他试图获取锁的任务都将从其开始尝试之时被阻塞,直至第一个任务释放锁。通过这种方式,任何时刻只有一个任务可以通过由互斥量看护的代码。

2、使用显式的 Lock 对象

Java SE5 类库中还包含定义在 java.util.concurrent.locks 中的显式的互斥机制。

Lock 对象必须被显示地创建、锁定和释放。因此,它与内建的锁形式相比,代码缺乏优雅性。但是对于解决某些类型的问题来说,它更加灵活。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class MutexEvenGenerator extends IntGenerator {

private int currentEvenValue = 0;

private final Lock lock = new ReentrantLock();

@Override
public int next() {
lock.lock();
try {
currentEvenValue++;
Thread.yield();
currentEvenValue++;
return currentEvenValue;
} finally {
lock.unlock();
}
}

public static void main(String[] args) {
EvenChecker.test(new MutexEvenGenerator());
}

}

该类中添加了一个被互斥调用的锁,并使用 lockunlock 方法在 next 方法内部创建了临界资源。

注意

  • lock 方法的调用必须放在 try 子句中
  • return 语句必须在 try 子句中出现,以确保 unlock 不会过早地发生,从而将数据暴露给第二个任务

如果在使用 synchronized 关键字时,某些事物失败了,那么就会抛出一个异常。但是你没有机会去做任何清理工作,以维护系统使其处于良好状态。有了显式 Lock 对象,就可以使用 finally 子句将系统维护在正确的状态了。

2.1 使用显式 Lock 对象的场景

在使用 synchronized 关键字时,需要写的代码量更少,并且用户出钱错误的可能性也会降低,因此通常只有在解决特殊问题时,才使用显式 Lock 对象。例如,synchronized 关键字不能尝试着获取锁且最终获取锁会失败,或者尝试着获取锁一段时间,然后放弃它。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
class AttemptLocking {

private final Lock lock = new ReentrantLock();

public void untimed() {
boolean captured = lock.tryLock();
try {
System.out.println("tryLock(): " + captured);
} finally {
if (captured) {
lock.unlock();
}
}
}

public void timed() {
boolean captured = false;
try {
captured = lock.tryLock(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
try {
System.out.println("tryLock(2s): " + captured);
} finally {
if (captured) {
lock.unlock();
}
}
}

public static void main(String[] args) throws InterruptedException {
AttemptLocking attemptLocking = new AttemptLocking();
attemptLocking.untimed();
attemptLocking.timed();

Thread thread = new Thread(() -> {
attemptLocking.lock.lock();
System.out.println("acquired");
});
thread.setDaemon(true);
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(thread);
TimeUnit.SECONDS.sleep(1);
// Thread.yield();
attemptLocking.untimed();
attemptLocking.timed();
executorService.shutdown();
}

}

ReentrantLock 允许你尝试着获取但最终未获取锁,这样如果其他人已经获取了这个锁,那么你就可以决定离开去执行其他一些事情,而不是等待直到这个锁被释放,就像在 untimed 方法中所看到的。在 timed 方法中,做出了尝试去获取锁,该尝试可以在 2 秒之后失败。在 main 方法中新增一个后台线程获取锁,这使得 untimedtimed 方法对某些事物将产生竞争。

显式的 Lock 对象在加锁和释放锁方面,相对于内建的 synchronized 锁来说,还赋予了你更细粒度的控制力。这对于实现专有同步结构时很有用的,例如用于遍历链接列表中的节点的节节传递的加锁机制(也称为锁耦合),这种遍历代码必须在释放当前节点的锁之前捕获下一个节点的锁。

三、原子性与易变性

原子操作是不能被线程调度机制终端的操作,一旦操作开始,那么它一定可以在可能发生的“上下文切换”之前(切换到其他线程执行)执行完毕。

依赖于原子性是很棘手且危险的。

Goetz 测试:如果你可以编写用于现代微处理器的高性能 JVM,那么就有资格去考虑是否可以避免同步。

原子性可以应用于除 longdouble 之外的所有基本类型之上的”简单操作“。对于读写除 longdouble 之外的基本类型变量,可以保证它们会被当作不可分(原子)的操作来操作内存。但是 JVM 可以将 64 位(doublelong)的读取和写入当作两个分离的 32 位操作来执行,这就产生了在一个读取和写入操作中间发生上下文切换,从而导致不同的任务可以看到不正确结果的可能性。(这有时被称为字撕裂,可能会看到部分被修改过的值)。

但是在定义 longdouble 类型的变量时,如果使用了 volatile 关键字,就会获得原子性。(注意,在 Java SE5 之前,volatile 一直未能正确地工作)。不同的 JVM 可以任意地提供更强的保证,但是你不应该依赖于平台相关的特性。

原子操作可由线程机制来保证其不可中断,专家级的程序猿可以利用这一点来编写无锁代码,这些代码不需要被同步。尝试着移除同步通常是一种表示不成熟优化的信号,并且将会给你招致大量的麻烦,而你却可能没有收获多少好处,甚至压根没有好处。

在多处理器系统(多核处理器)上,相对于单处理器系统而言,可视性问题远比原子性问题多得多。一个任务作出的修改,即使在不同段的意义上是原子性的,对其他任务也可能是不可视的(例如,修改只是暂时性地存储在本地处理器的缓存中),因此不同的任务对应用的状态由不同的视图。另一方面,同步机制强制在处理器系统中,一个任务作出的修改必须在应用中是可视的。如果没有同步机制,那么修改时可视将无法确定。

volatile 关键字还确保了应用中的可视性。如果你将一个域声明为 volatile 的,那么只要对这个域产生了写操作,那么所有的读操作就都可以看到这个修改。volatile 域会立即被写入到主存中,而读取操作就发生在主存中。

理解原子性和易变性是不同的概念这一点很重要。在非 volatile 域上的原子操作不必刷新到主存中去,因此其他读取该域的任务也不必看到这个新值。如果多个任务在同时访问某个域,那么这个域就应该是 volatile 的,否则,这个域就应该只能经由同步来访问。同步也会导致向主存中刷新,因此如果一个域完全由 synchronized 方法或语句块来防护,那么就不必将其设置为 volatile 的。

一个任务所做的任何写入操作对这个任务来说都是可视的,因此如果它只需要在这个任务内部可视,那么你就不需要将其设置为 volatile 的。

当一个域的值依赖于它之前的值时(例如递增一个计数器),volatile 就无法工作了。如果某个域的值收到其他域的值的限制,那么 volatile 也无法工作。

使用 volatile 而不是 synchronized 的唯一安全的情况是类中只有一个可变的域。你的第一选择应该是使用 synchronized 关键字,这是最安全的方式,而尝试其他任何方式都是有风险的。

对域中的值做赋值和返回操作通常都是原子性的。在 Java 中,像下面的操作肯定不是原子性的,但是在 C++中可能是原子性的,这取决于编译器和处理器。

1
2
i++;
i+=2;

参考下面的方法示例的 JVM 指令。

1
2
3
4
5
6
7
8
9
10
11
12
13
class Atomicity {

int i;

void method() {
i++;
}

void method2() {
i+=2;
}

}

method 方法字节码如下:

1
2
3
4
5
6
7
 0 aload_0
1 dup
2 getfield #7 <cn/z2huo/knowledge/concurrency/section_21_3/Atomicity.i : I>
5 iconst_1
6 iadd
7 putfield #7 <cn/z2huo/knowledge/concurrency/section_21_3/Atomicity.i : I>
10 return

method2 方法字节码如下:

1
2
3
4
5
6
7
 0 aload_0
1 dup
2 getfield #7 <cn/z2huo/knowledge/concurrency/section_21_3/Atomicity.i : I>
5 iconst_2
6 iadd
7 putfield #7 <cn/z2huo/knowledge/concurrency/section_21_3/Atomicity.i : I>
10 return

从上面两个字节码中看到,每条指令都会产生一个 get 和 put,它们之间还有其他的指令。因此在获取和放置之间,另一个任务可能会修改这个域。所以,这些操作不是原子性的。

如果盲目地使用原子性的概念,可能会看到下面示例中的输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class AtomicityTest implements Runnable {

private int i = 0;

private int getValue() {
return i;
}

private synchronized void evenIncrement() {
i++;
i++;
}

@Override
public void run() {
while (true) {
evenIncrement();
}
}

public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
AtomicityTest atomicityTest = new AtomicityTest();
executorService.execute(atomicityTest);
while (true) {
int value = atomicityTest.getValue();
if (value % 2 != 0) {
System.out.println(value);
System.exit(0);
}
}
}

}
1
2
3
533

Process finished with exit code 148 (interrupted by signal 20:SIGCHLD)

上面的程序将在找到奇数的时候终止。两个 i++ 操作的方法为同步方法,但是程序还是终止了。尽管 return i; 确实是原子性操作,但是缺少同步使得其数值可以在处于不稳定的中间状态时被读取。初次之外,由于 i 也不是 volatile 的,因此还存在可视性问题。可以将上面的 getValue 方法也加上 synchronized

基本上,如果一个域可能会被多个任务同时访问,或者这些任务中至少有一个是写入任务,那么你就应该将这个域设置为 volatile 的。如果将一个域设置为 volatile 的,那么它就会告诉编译器不要执行任何移除读取和写入操作的优化,这些操作的目的是用线程中的局部变量维护对这个域的精确同步。实际上,读取和写入都是直接针对内存的,而却没有被缓存。

在 Java 中,即便是像下面如此简单的操作,也会产生线程问题。因为 Java 递增不是原子性的,并且涉及一个读操作和一个写操作。

1
2
3
4
5
6
7
8
9
class SerialNumberGenerator {

private static volatile int serialNumber = 0;

public static int nextSerialNumber() {
return serialNumber++;
}

}

下面是对上面的验证。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class CircularSet {

private int[] array;

private int length;

private int index = 0;

public CircularSet(int size) {
array = new int[size];
length = size;
for (int i = 0; i < size; i++) {
array[i] = -1;
}
}

public synchronized void add(int i) {
array[index] = i;
index = ++index % length;
}

public synchronized boolean contains(int value) {
for (int i = 0; i < length; i++) {
if (array[i] == value) {
return true;
}
}
return false;
}

}

CircularSet 重用了存储 int 数值的内存,并假设在你生成序列数时,产生数值覆盖冲突的可能性最小。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class SerialNumberChecker {

private static final int SIZE = 10;

private static CircularSet serials = new CircularSet(1000);

private static ExecutorService executorService = Executors.newCachedThreadPool();

static class SerialChecker implements Runnable {
@Override
public void run() {
while (true) {
int serial = SerialNumberGenerator.nextSerialNumber();
if (serials.contains(serial)) {
System.out.println("Duplicate: " + serial);
System.exit(0);
}
serials.add(serial);
}
}
}

public static void main(String[] args) throws Exception {
for (int i = 0; i < SIZE; i++) {
executorService.execute(new SerialChecker());
}
if (args.length > 0) {
TimeUnit.SECONDS.sleep(Integer.parseInt(args[0]));
System.out.println("no duplicates detected");
System.exit(0);
}
}

}

输出如下:

1
2
3
4
5
Duplicate: 4267
Duplicate: 930
Duplicate: 4313

Process finished with exit code 0

上面的程序,通过创建多个任务来竞争序列数,这些任务最重会得到重复的序列数。为了解决上面的问题,在 nextSerialNumber() 方法前面添加 synchronized 关键字。

对基本类型的读取和赋值操作被认为是安全的原子性操作。但是,当对象处于不稳定状态时,仍旧很有可能使用原子性操作来访问它们。

四、原子类

Java SE5 引入了诸如 AtomicIntegerAtomicLongAtomicReference 等特殊的原子性变量类,它们提供下面形式的原子性条件更新操作。

1
boolean compareAndSet(int expectedValue, int newValue);

这些类被调整为可以使用在某些现代处理器上的可获得的,并且是在机器级别上的原子性,因此在使用它们时,通常不需要担心。对于常规编程来说,它们很少会派上用场。但是在涉及性能调优时,就大有用武之地了。

下面使用 AtomicInteger 重写 AtomicityTest,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
class AtomicIntegerTest implements Runnable {

private final AtomicInteger atomicInteger = new AtomicInteger(0);

private int getValue() {
return atomicInteger.get();
}

private void evenIncrement() {
atomicInteger.addAndGet(2);
}

@Override
public void run() {
while (true) {
evenIncrement();
}
}

public static void main(String[] args) {
new Timer().schedule(new TimerTask() {
@Override
public void run() {
System.err.println("Aborting");
System.exit(0);
}
}, 5000);

ExecutorService executorService = Executors.newCachedThreadPool();
AtomicIntegerTest atomicIntegerTest = new AtomicIntegerTest();
executorService.execute(atomicIntegerTest);
while (true) {
int value = atomicIntegerTest.getValue();
if (value % 2 != 0) {
System.out.println(value);
System.exit(0);
}
}
}

}

上面的代码中,通过使用 AtomicInteger 消除了 synchronized 关键字,程序不会失败,所以添加了一个 Timer,以便在几秒钟之后自动地终止。

Atomic 类被设计用来构建 java.util.concurrent 中的类,因此只有在特殊情况下才在自己的代码中使用它们,即便使用了也需要确保不存在其他可能出现的问题。

通常依赖于锁要更安全一些(synchronized 关键字或显式的 Lock 对象)。

五、临界区

有时候你只是希望防止多个线程同时访问方法内部的部分代码而不是防止访问整个方法。

通过这种方式分离出来的代码通常被称为临界区synchronized 被用来指定某个对象,此对象的锁被用来对花括号内的代码进行同步控制,例如:

1
2
3
synchronized(syncObject) {
// 需要同步的代码块
}

这也被称为同步代码块。在进入此段代码前,必须得到 syncObject 对象的锁。如果其他线程已经得到这个锁,那么就得等到锁被释放以后,才能进入临界区。

通过使用同步代码块,而不是对整个方法进行同步控制,可以使多个任务访问对象的时间性能得到显著提高

下面的例子比较了这两种同步控制方法,此外,也演示了如何把一个非保护类型的类,在其他类的保护和控制之下,应用于多线程的环境。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Getter
@Setter
@ToString
class Pair {

private int x;

private int y;

public Pair() {
this(0, 0);
}

public Pair(int x, int y) {
this.x = x;
this.y = y;
}

public void incrementX() {
x++;
}

public void incrementY() {
y++;
}

public void checkState() {
if (x != y) {
throw new PairValuesNotEqualException(this);
}
}

}

Pair 不是线程安全的,因为约束条件需要两个变量要维护成相同的值。自增加不是线程安全的,并且没有任何方法被标记为 synchronized,所以不能保证一个 Pair 对象在多线程程序中不会被破坏。

1
2
3
4
5
6
7
class PairValuesNotEqualException extends RuntimeException {

public PairValuesNotEqualException(Pair pair) {
super("Pair values not equal: " + pair);
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
abstract class BasePairManager {

AtomicInteger checkCounter = new AtomicInteger(0);

protected Pair pair = new Pair();

private final List<Pair> storage = Collections.synchronizedList(new ArrayList<>());

public synchronized Pair getPair() {
// 复制对象,保证原对象是安全的
return new Pair(pair.getX(), pair.getY());
}

protected void store(Pair pair) {
storage.add(pair);
// 睡眠,假设该方法中的操作是一个费时的操作
try {
TimeUnit.MILLISECONDS.sleep(50);
} catch (InterruptedException e) {
}
}

public abstract void increment();

}

PairManager 类只有一个 Pair 对象并控制对它的一切访问。注意 getPair() 方法是 synchronized 的。

store() 方法将一个 Pair 对象添加到 synchronizedList 中,所以这个操作是线程安全的。

1
2
3
4
5
6
7
8
9
10
class PairManager extends BasePairManager {

@Override
public synchronized void increment() {
pair.incrementX();
pair.incrementY();
store(getPair());
}

}

注意synchronized 关键字不属于方法特征签名的组成部分,所以可以在覆盖方法的时候加上去。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
class PairManager2 extends BasePairManager {

@Override
public void increment() {
Pair temp;
synchronized (this) {
pair.incrementX();
pair.incrementY();
temp = getPair();
}
store(temp);
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class PairManipulator implements Runnable {

private final BasePairManager pairManager;

public PairManipulator(BasePairManager pairManager) {
this.pairManager = pairManager;
}

@Override
public void run() {
while (true) {
pairManager.increment();
}
}

public String toString() {
return "Pair: " + pairManager.getPair() + " checkCounter = " + pairManager.checkCounter.get();
}

}

PairManipulator 被创建用来测试两种不同类型的 PairManager,其方法是在某个任务中调用 increment(),而 PairChecker 则在另一个任务中执行。为了跟踪可以运行测试的频度,PairChecker 在每次成功时都递增 checkCounter

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class PairChecker implements Runnable {

private final BasePairManager pairManager;

public PairChecker(BasePairManager pairManager) {
this.pairManager = pairManager;
}

@Override
public void run() {
while (true) {
pairManager.checkCounter.incrementAndGet();
pairManager.getPair().checkState();
}
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class CriticalSection {

static void testApproaches(BasePairManager pairManager, BasePairManager pairManager2) {
ExecutorService executorService = Executors.newCachedThreadPool();

PairManipulator pairManipulator = new PairManipulator(pairManager);
PairManipulator pairManipulator2 = new PairManipulator(pairManager2);

PairChecker pairChecker = new PairChecker(pairManager);
PairChecker pairChecker2 = new PairChecker(pairManager2);

executorService.execute(pairManipulator);
executorService.execute(pairManipulator2);

executorService.execute(pairChecker);
executorService.execute(pairChecker2);

try {
TimeUnit.MILLISECONDS.sleep(500);
} catch (InterruptedException e) {
System.out.println("sleep interrupted");
}
System.out.println("pairManipulator: " + pairManipulator);
System.out.println("pairManipulator2: " + pairManipulator2);
System.exit(0);
}

public static void main(String[] args) {
BasePairManager pairManager = new PairManager();
BasePairManager pairManager2 = new PairManager2();
testApproaches(pairManager, pairManager2);
}

}

对于 PairChecker 的检查频率,采用同步控制块的方法,对象不加锁的时间更长。这也是宁愿使用同步控制块而不是对整个方法进行同步控制的典型原因:使得其他线程在安全的情况下能更多地访问

还可以使用显式的 Lock 对象来创建临界区。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class ExplicitPairManager extends BasePairManager {

private final Lock lock = new ReentrantLock();

@Override
public void increment() {
lock.lock();
try {
pair.incrementX();
pair.incrementY();
store(getPair());
} finally {
lock.unlock();
}
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
class ExplicitPairManager2 extends BasePairManager {

private final Lock lock = new ReentrantLock();

@Override
public void increment() {
Pair temp;
lock.lock();
try {
pair.incrementX();
pair.incrementY();
temp = getPair();
} finally {
lock.unlock();
}
store(temp);
}

}
1
2
3
4
5
6
7
8
9
10
class ExplicitCriticalSection {

public static void main(String[] args) {
BasePairManager pairManager = new ExplicitPairManager();
BasePairManager pairManager2 = new ExplicitPairManager2();

CriticalSection.testApproaches(pairManager, pairManager2);
}

}

六、在其他对象上同步

synchronized 块必须给定一个在其上进行同步的对象,并且最合理的方式是,使用其方法正在被调用的当前对象:synchronized(this)。这种方式中,如果获得了 synchronized 块上的锁,那么该对象其他的 synchronized 方法和临界区就不能被调用了。因此,如果在 this 上同步,临界区的效果就会直接缩小在同步的范围内。

有时必须在另一个对象上同步,但如果在另一个对象上同步,就必须保证所有相关的任务都是在同一个对象上同步的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class DualSync implements Runnable {

private final Object syncObject = new Object();

@Override
public void run() {
this.f();
}

public synchronized void f() {
for (int i = 0; i < 5; i++) {
System.out.println("f()");
Thread.yield();
}
}

public void g() {
synchronized (syncObject) {
for (int i = 0; i < 5; i++) {
System.out.println("g()");
Thread.yield();
}
}
}

}
1
2
3
4
5
6
7
8
9
10
class SyncObject {

public static void main(String[] args) {
DualSync dualSync = new DualSync();
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(dualSync);
dualSync.g();
}

}

DualSync.f()this 上同步,而 g() 有一个在 syncObject 上同步的 synchronized 块,因此这两个同步是相互独立的。

七、线程本地存储

防止任务在共享资源上产生冲突的第二种方式是根除对变量的共享。

线程本地存储是一种自动化机制,可以为使用相同变量的每个不同的线程都创建不同的存储。

如果你有 4 个线程都要使用变量 x 表示的对象,那线程本地存储就会生成 4 个用于 x 的不同的存储块。主要是,它们使得你可以将状态与线程关联起来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class Accessor implements Runnable {

private final int id;

public Accessor(int id) {
this.id = id;
}

@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
ThreadLocalVariableHolder.increment();
System.out.println(this);
Thread.yield();
}
}

@Override
public String toString() {
return "#" + id + ": " + ThreadLocalVariableHolder.get();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class ThreadLocalVariableHolder {

private static final ThreadLocal<Integer> value = new ThreadLocal<>() {
private final Random random = new Random(55);

@Override
protected synchronized Integer initialValue() {
return random.nextInt(10000);
}
};

public static void increment() {
value.set(value.get() + 1);
}

public static int get() {
return value.get();
}

public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 5; i++) {
executorService.execute(new Accessor(i));
}
TimeUnit.SECONDS.sleep(3);
executorService.shutdown();
}

}

输出结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#2: 754710
#2: 754711
#2: 754712
#3: 762887
#3: 762888
#3: 762889
#3: 762890
#3: 762891
#2: 754713
#0: 766773
#2: 754714
#4: 770001
#0: 766774
#0: 766775
#0: 766776

ThreadLocal 对象通常当作静态域存储。在创建 ThreadLocal 时,只能通过 get()set() 方法来访问该对象的内容,其中, get() 方法将返回与其线程相关联的对象的副本,而 set() 会将参数插入到为其线程存储的对象中,并返回存储中原有的对象。

相关链接

OB tags

#Java #并发 #多线程 #Thinking-in-Java