一、前言

在高并发访问的场景下,为了保证项目不被大流量请求的压力影响性能导致项目运行崩溃,常用的解决方案就是限流服务降级

本篇介绍 Semaphore, 直译就是信号量,是基于 AQS 扩展的一种多线程并发控制的工具,也就是我们常说的限流工具之一。

二、工作原理

Semaphore 通过 permit 来判断线程是否可通行。

Semaphore 需要设置一定数量的 permit,当一个线程执行任务遇到 Semaphore 时会被拦截,该线程需要向 Semaphore 申请一个或多个 permit

  1. 如果 permit 数量充足,将 permit 发给该线程然后将其放行。当线程执行完任务后需要将 permit 如数奉还。
  2. 如果 permit 数量不足,该线程会安排到 CLH 队列中等待挂起,当其他线程归还 permit 后,等待的线程会被唤醒再申请 permit

前言说到,Semaphore 是基于 AQS 扩展的,线程竞争资源的状态,线程的等待,线程的唤醒都是靠 AQS 实现和维护的,因此我们通过观察其模型图来讲解其工作原理:

图中 permit 的数量值由 state 保存维护,等待的线程则被封装成 Node 节点放在 CLH 队列中挂起等待。

三、源码解析

我们先通过案例了解 Semaphore 基本使用。

  • 案例

我们把 Semaphore 当作停车位,Thread 当作找车位的车子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class SemaphoreTest {

public static void main(String[] args) {
// (1)
Semaphore semaphore = new Semaphore(2);
for (int i = 1; i <= 5; i++) {
Thread thread = new Thread(() -> {
try {
// (2)
semaphore.acquire();
System.out.println(LocalDateTime.now() + " -> " + Thread.currentThread().getName() + " 号车获取到停车位");
int randomSec = (int) (Math.random() * 3) + 1;
TimeUnit.SECONDS.sleep(randomSec);
System.out.println(LocalDateTime.now() + " -> " + Thread.currentThread().getName() + " 号车停车 " + randomSec + " 秒,现在离开");
// (3)
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t" + i);
thread.start();
}
}
}

执行结果:

1
2
3
4
5
6
7
8
9
10
2023-03-06T10:56:24.299 -> t2 号车获取到停车位
2023-03-06T10:56:24.299 -> t1 号车获取到停车位
2023-03-06T10:56:25.300 -> t2 号车停车 1 秒,现在离开
2023-03-06T10:56:25.300 -> t3 号车获取到停车位
2023-03-06T10:56:26.301 -> t3 号车停车 1 秒,现在离开
2023-03-06T10:56:26.301 -> t4 号车获取到停车位
2023-03-06T10:56:27.301 -> t1 号车停车 3 秒,现在离开
2023-03-06T10:56:27.301 -> t5 号车获取到停车位
2023-03-06T10:56:29.302 -> t4 号车停车 3 秒,现在离开
2023-03-06T10:56:30.301 -> t5 号车停车 3 秒,现在离开

从结果不难看出,在 5 辆车子(thread)中每次最多只有 2 辆车子能够申请到停车位(permit)停车。

  • 源码分析

我们按照例子中的代码执行顺序讲解。

我们先看 (1) 处的代码,即 Semaphore semaphore = new Semaphore(2),进入源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class Semaphore implements java.io.Serializable {

private final Sync sync;

abstract static class Sync extends AbstractQueuedSynchronizer {

Sync(int permits) {
// (4)
setState(permits);
}

// (5)
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

// ... 省略 ...
}

static final class NonfairSync extends Sync {

NonfairSync(int permits) {
super(permits);
}

// (6)
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}

public Semaphore(int permits) {
sync = new NonfairSync(permits);
}

// ... 省略 ...
}

调用 Semaphore 构造方法时,它底层是创建一个名为 Sync 的抽象的静态内部类的实例,Semaphore 所有的操作都是通过 Sync 实例来完成。

Sync 类继承了 AbstractQueuedSynchronizer 类,因此它拥有了 AQS 的能力。此处 Sync 的设计和 ReentrantLock 类相似,Sync 都被声明为抽象类,再通过子类实现具体的方法。Sync 拥有 FairSyncNonfairSync 两个子类。

我们案例中,创建的 Semaphore 实例,底层使用的是 NonfairSync 实例,调用构造方法时传入的 permits 值为 2。最终值会被传入到 (4) 处,即 setState(permits),该方法来自 AQS, 将 AQS 中的 state 值设置成 2。

此时,AQS 的模型图如下:

创建实例成功后,线程来到 (2) 处,即 semaphore.acquire() 申请 permit,查看源码:

1
2
3
4
5
6
7
8
public class Semaphore implements java.io.Serializable {

// ... 省略 ...

public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
}

此方法底层调用 sync.acquireSharedInterruptibly(1),该方法来自 AQS,我们接着点进去:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

// ... 省略 ...

public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// (7)
if (tryAcquireShared(arg) < 0)
// (8)
doAcquireSharedInterruptibly(arg);
}
}

该方法中:

  1. 先判断当前线程的中断状态,如果为 true 则抛出 InterruptedException 异常。
  2. 否则来到 (7) 处,即 tryAcquireShared(arg)。该方法是一个抽象方法,由子类 NonfairSync 实现,来到 (6) 处,而 (6) 处方法的底层调用父类的方法实现,即 (5) 处代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
abstract static class Sync extends AbstractQueuedSynchronizer {
// ... 省略 ...

// (5)
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}

该方法开启一个无限循环来尝试获取 permit

  1. 先获取 AQSstate 值(permits 可用数量)。
  2. 通过 permits 可用数量减去要申请的 permit 数量,计算出剩余的 permits 数量。
  3. 如果 permits 剩余数量 < 0 直接返回该值或者如果 permits 剩余数量 >= 0 则通过 CAS 方式重新设置 state 值,最后再返回该值。

此处开启无限循环是因为多个线程在执行 compareAndSetState 操作前可能会发生线程切换,但只会有一个线程能执行成功,切换回来的线程执行 CAS 操作必定会失败,因此需要循环重新计算 permits 可用值,确保 AQSstate 值的准确性。

案例中,t1 和 t2 线程率先拿到 permit 处理业务,此时 AQS 模型图如下:

而 t3 线程以及其他线程未能拿到 permittryAcquireShared 返回值小于 0),因此它们来到 (8) 处,即 doAcquireSharedInterruptibly(arg),该方法来自 AQS

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

// ... 省略 ...

private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// (9)
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// (10)
int r = tryAcquireShared(arg);
if (r >= 0) {
// (11)
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// (12): shouldParkAfterFailedAcquire
// (13): parkAndCheckInterrupt
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
}

竞争资源失败的线程进入到这个方法中,案例中 t3 线程执行代码流程如下:

  1. 先执行 addWaiter(Node.SHARED) 方法,将当前线程封装到 Node 节点(node1)。因为 CLH 的数据为空,当 node1 进到 CLH 前,会为其创建一个虚拟头节点(dummy)。
  2. 进入到一个无限循环中,判断 node1 的前驱节点是否为头结点,判断成功,进入到 (10) 处 tryAcquireShared(arg) 尝试获取 permit,该方法上文已讲解,不再赘述。
  3. 显然在 t1 和 t2 线程未释放 permit 之前,其他线程无法获取到 permit,返回值必然小于 0,来到第 (12) 处 shouldParkAfterFailedAcquire(p, node)
  4. shouldParkAfterFailedAcquire(p, node) 方法将 node1 节点的前驱节点(dummy)的 waitStatus 状态改为 -1。最后来到第 (13) 处,parkAndCheckInterrupt()
  5. parkAndCheckInterrupt() 方法底层调用 LockSupport.park(this) ,这样 t3 线程就被挂起等待。

此时,AQS 的模型图为:

同样的,其他未获取到 permit 都会被封装成 Node 节点进到 CLH 队列被挂起等待。

我们回到获取到 permit 的线程视角,如 t2 线程业务执行结束后,执行 semaphore.release(),进到源码中:

1
2
3
4
5
6
7
8
9
public class Semaphore implements java.io.Serializable {

// ... 省略 ...

public void release() {
sync.releaseShared(1);
}

}

方法底层调用 Sync 实例的 releaseShared(1) 方法,该方法来自 AQS

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

// ... 省略 ...

public final boolean releaseShared(int arg) {
// (14)
if (tryReleaseShared(arg)) {
// (15)
doReleaseShared();
return true;
}
return false;
}
}

t2 线程来到 (14) 处,执行 tryReleaseShared(arg),该方法为抽象方法,由 AQS 的子类 Sync 类实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class Semaphore implements java.io.Serializable {

// ... 省略 ...

abstract static class Sync extends AbstractQueuedSynchronizer {

// ... 省略 ...

protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}

}

}

该方法用于归还 permit,同样开启一个无限循环:

  1. 先获取 AQSstate 值(permits 剩余数量),当前值为 0。
  2. 通过 permits 剩余数量加上要归还的 permit 数量,计算出 permits 的新值,当前值为 1。
  3. 检验数值的合法性
  4. 通过 CAS 方式将 state 值修改为 permits 新值,返回 true。

此时,AQS 的模型图为:

tryReleaseShared(arg) 方法返回 true,t2 线程进到 (15) 处,即 doReleaseShared() 方法,该方法来自 AQS

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

// ... 省略 ...

private void doReleaseShared() {
for (;;) {
Node h = head;
// (16)
if (h != null && h != tail) {
int ws = h.waitStatus;
// (17)
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue;
// (18)
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue;
}
// (19)
if (h == head)
break;
}
}
}

该方法用于修改头结点的 waitStatus 值以及唤醒头结点的后继节点中的线程。 开启一个无限循环:

  1. 获取 CLH 的头结点
  2. 判断头结点(dummy)是否为空,同时头结点是否与尾节点相同。由 AQS 模型图可知,(16) 处的判断是成立的,随后 t2 线程进到 if 方法体中。
  3. 判断头结点(dummy)的 waitStatus 状态,当前状态值为 -1,(17) 处判断成立,将头结点的 waitStatus 通过 CAS 方式还原为 0。
  4. 修改成功后执行 (18) 处代码,即 unparkSuccessor(h),该方法用于查询头结点的后继节点,并通过 LockSupport.unpark(thread) 唤醒节点中的线程(t3 线程)。由于该方法在 《AQS 源码详解》 已讲解,此处不多赘述。
  5. t2 线程最后来到 (19) 处,判断成立退出无限循环。

这样 t2 线程释放锁完毕,结束线程,我们转到被唤醒的 t3 线程视角:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

// ... 省略 ...

private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// (9)
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
// (10)
int r = tryAcquireShared(arg);
if (r >= 0) {
// (11)
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// (12): shouldParkAfterFailedAcquire
// (13): parkAndCheckInterrupt
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
}

t3 线程在执行 (13) 处代码被挂起等待的,当被唤醒后,又进入下一个循环:

  1. 获取当前节点(node1)的前驱节点(dummy),判断是否为头结点,从 AQS 模型图可知,判断成立。
  2. 执行 tryAcquireShared(arg) 尝试获取 permit,由于 AQSstate 值此时为 1,t3 线程能够成功获取到 permit,方法返回 0。
  3. 进行 if (r >= 0) 判断也成立,执行 (11) 处方法。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {

// ... 省略 ...

private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;
setHead(node);

if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
}

该方法将 t3 线程的 node1 节点设置为头结点,同时将节点中的 prev 和 thread 设置为 null。之后做了一系列判断执行 doReleaseShared() 方法,此方法就是 (15) 处的方法,具体流程此处不多赘述(在执行 doReleaseShared() 期间,node1 节点的下一个节点 node2 的线程被唤醒过,于是申请 permit 失败又被挂起等待)。

t3 线程执行完 (11) 处代码,再执行 p.next = null后,获取 permit 的工作基本完成。

此时,AQS 的流程图如下:

剩余的被挂起等待的 t4 和 t5 线程后续工作同上。

四、参考资料

CAS 原理新讲

LockSupport 工具介绍