Java并发编程之8——AQS如何实现Semaphore

Semaphore的实现和ReentrantLock的实现很相似,都有公平和非公平两种实现。本文的分析可以对比 LOCK源码分析 和 CountDownLatch源码分析 对比看。这样可以看出AQS的几条关键执行路径。

非公平信号量的实现

构造函数

信号量的默认实现也是采用非公平锁,构造函数如下:

1
2
3
public Semaphore(int permits) {
sync = new NonfairSync(permits); //构造函数指定许可的数量
}

1
2
3
NonfairSync(int permits) {
super(permits); //调用AQS直接设置许可的数量
}

许可的获取

获取许可直接调用的是AQS的acquire方法

1
2
3
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

这里的实现过程和Lock完全一样,先由子类判断是否可以获取许可,如果可以获取成功,就将同步状态减一,然后继续执行,如果不能获取到锁,就将当前线程挂起

1
2
3
4
5
6
7
8
9
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
tryAcquireShared方法都是由具体的子类去实现
if (tryAcquireShared(arg) < 0)
如果剩余的信号量小于等于0(表示没有信号量)就以当前线程为为节点构造等待队列
doAcquireSharedInterruptibly(arg);
}

分析:Sync类如何判断是否可以获取到许可。

1
2
3
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}

1
2
3
4
5
6
7
8
9
10
11
12
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
计算剩余的信号量的个数
int remaining = available - acquires;
返回的是剩余信号量的个数
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

温习: 获取信号量失败的时候,AQS如何构造等待线程列表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED); //以当前线程构造节点添加到等待队列
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) { //判断当前节点的前驱节点是否是头结点,如果是头结点就,再次尝试获取信号量,如果获取成功就返回
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
//如果前驱节点不是头结点,就判断当前线程是否应该挂起,如果应该挂起就将当前线程挂起。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

许可的释放

1
2
3
public void release() {
sync.releaseShared(1);
}

sync类的releaseShared调用的是AQS的方法,他的实现过程是:先由子类去释放信号量,如果是否成功,就激活等待信号量队列的第一个线程。

1
2
3
4
5
6
7
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

子类释放许可的过程:就是简单的CAS将许可的数量加1

1
2
3
4
5
6
7
8
9
10
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}

在子类释放许可成功之后,AQS释放等待队列的第一个等待获取信号量的线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
unparkSuccessor(h);
}
else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)){
continue;
}
}
if (h == head)
break;
}
}

公平信号量的实现

公平信号量的实现和非公平的信号量的实现基本一样。可以参考我的上一篇文章: LOCK源码分析