Java并发编程之9——如何理解Condtion

在理解Condition之前首先要明白两个概念:

  • 等待队列:等待获取锁的线程构成的队列,称为等待队列
  • 条件队列:已经获取到锁的线程由于某种原因释放了锁,等待signal信号,而构成的队列称为条件队列

概述

Condition和Lock总是分不开的。在前面的文章 AQS源码分析 中对这两者的实现原理都有一定的分析。并且对 Lock源码 也进行了分析。本文将展开讨论一下Condition的实现原理和具体用途。


说明:
- 如下的例子简化了异常的处理过程,因为处理异常的地方如果做的很完善会影响读者理解程序的运行逻辑。如果在正常的开发过程,还是建议完整地处理异常
- 程序的整体构建逻辑是:任务1获取到锁,然后调用await方法,释放锁,让任务2去获取锁,当任务2获取锁,并执行完代码调用signal方法的时候,任务1会被唤醒并尝试获取锁。这时候只有当任务2执行完所有的任务并释放锁之后,任务1才可以获取到锁,继续执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package com.qunar.dzs.datahub.common.conditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* Created by guochenglai on 7/1/16.
*/
public class ConditionTest {
private static final Logger LOGGER = LoggerFactory.getLogger(ConditionTest.class);
public static void main(String[] args) throws Exception{
final Lock lock = new ReentrantLock();
final Condition condition = lock.newCondition();
Runnable task1 = () -> {
lock.lock(); //首先线程1获取到了锁
try {
System.out.println(Thread.currentThread().getName() + " i already get a lock and do something");
TimeUnit.SECONDS.sleep(2); //获取到了锁之后,做一些独占的任务
System.out.println(Thread.currentThread().getName() + " for some reason i give up this lock ");
condition.await(); //由于某种原因线程放弃了锁
//是否锁之后当前线程会 在这里 陷入休眠
//当接收到signal信号之后,线程会被唤醒,然后重新去获取到锁,如果获取成功 会从这里继续执行
System.out.println(Thread.currentThread().getName() + " i catch a signal to get the lock again ");
TimeUnit.SECONDS.sleep(1);
//执行完成之后记得释放锁
lock.unlock();
} catch (Exception e) {
lock.unlock(); //发生异常了就释放锁
LOGGER.error("task one cause exception ", e);
}
};
Runnable task2 = () -> {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + " task one give up the lock i catch it "); //任务1释放了锁之后,这里线程2才可以获取到锁
TimeUnit.SECONDS.sleep(1); //任务2 做一些独占的任务
/**
* 任务2 做完任务之后告知等待当前条件的其他线程 可以去获取锁了,
* 这时候任务1 会被唤醒,然后尝试获取锁,
*
* 这里需要注意的是任务1 在这里只是有机会去尝试获取锁,现在的
* 锁还是被任务2 所占用,只有当任务2执行完成之后,释放了锁,任务1
* 才可以真正的去获取锁
*/
System.out.println(Thread.currentThread().getName() + " notify task one to get lock and continue to do his work ...");
condition.signal();
TimeUnit.SECONDS.sleep(1); //任务2 做一些其他的任务
System.out.println(Thread.currentThread().getName() + " do some other work and release lock ");
lock.unlock();
} catch (Exception e) {
lock.unlock();
LOGGER.error("task two cause exception ", e);
}
};
new Thread(task1).start(); //这里让任务1先执行,因为设计的就是它先获取到锁,然后释放锁,最后被唤醒继续获取锁的过程
TimeUnit.SECONDS.sleep(1);
new Thread(task2).start();
}
}

输出结果如下:

1
2
3
4
5
6
Thread-0 i already get a lock and do something
Thread-0 for some reason i give up this lock
Thread-1 task one give up the lock i catch it
Thread-1 notify task one to get lock and continue to do his work ...
Thread-1 do some other work and release lock
Thread-0 i catch a signal to get the lock again

通过输出可以看到整个的处理流程如下:

  • 任务1获取到锁,执行独占的任务,同时任务2在等待获取锁
  • 任务1由于某种原因,自己主动释放锁,并将自己陷入休眠
  • 任务2发现锁可以获取了,就会去获取锁,执行自己的独占任务
  • 任务2执行完自己的任务之后调用signal通知,等待队列的任务1去尝试获取锁。并且自己释放自己占有的锁
  • 任务1,发现锁可以获取之后,会去获取锁并继续执行自己的任务。

阅读上面的流程读者可能如下的疑问:
1 任务一调用await之后,任务2是怎么知道锁是可以获取了,并去获取锁的。
2 任务二调用signal之后,任务1又是怎么知道锁是可以获取的,并去获取锁的。
其实,这两个问题才是Condition的精髓。这其中涉及到条件队列和等待队列的相互转移。下面将结合源码详细分析。

实现分析

锁的获取和等待队列。

线程1获取到锁之后,线程2再次尝试获取锁的时候,就会将自己挂起构成等待队列,这个过程在前面的文章已经分析过了。这里将从lock.lock()方法开始,温习一下整个流程。

lock.lock()

lock.lock()方法,默认调用的非公平实现的lock方法。首先进行CAS如果成功,就设置锁的拥有者。否则调用AQS的acquire方法。

1
2
3
4
5
6
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

AQS.acquire

AQS的acquire方法会再次调用lock.tryAcquire()方法尝试获取到锁,如果获取失败,就调用自己的addWaiter()和acquireQueued()方法,构造等待队列

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

lock.tryAcquire()

当AQS的同步状态为0的时候,并且没有等待队列,并且CAS更新状态成功,就获取到锁,并设置锁的所有者。返回true
当AQS的同步状态不为0的时候,判断是否是重入锁,如果是就将同步状态加1。并返回true
其他情况都是获取锁失败,返回false

protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

AQS.acquireQueued

addWaiter方法就是将当前线程构造成node节点并添加到等待队列的末尾,前面已经具体分析过实现,acquireQueued方法接受添加到队列的节点,在进行一次挣扎,如果失败,就将当前线程挂起

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

至此,任务1在继续执行任务,任务2被挂起了,并构造了一个等待队列。

await挂起当前线程锁构成条件队列,激活等待队列等待线程

当任务1由于某种原因调用await方法释放自己的锁的时候。会将自己休眠加入到条件队列,并激活等待队列的线程。实现过程如下:

AQS.ConditionObject.await

await方法,首先调用 AQS.ConditionObject. addConditionWaiter()方法构造一个条件队列。然后调用AQS.ConditionObject.fullyRelease()方法去激活等待队列上的等待线程(其实就是调用AQS.release方法),然后将当前线程挂起。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this); //调用await方法正常情况,当前线程会在这个地方被挂起,激活之后继续从这里执行。
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

AQS.release激活等待队列的线程去获取锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}

signal激活条件队列的线程

1
2
3
4
5
6
7
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}

根据传入的节点,找到第一个未被取消的节点

1
2
3
4
5
6
7
8
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}

将等待这个condition的节点由条件队列转移到等待队列,然后当当前线程释放锁之后,在等待队列的节点就可以继续去尝试获取到锁了。

1
2
3
4
5
6
7
8
9
10
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

整个处理流程,写了好久!!!!