一、前言
在高并发访问的场景下,为了保证项目不被大流量请求的压力影响性能导致项目运行崩溃,常用的解决方案就是限流和服务降级。
本篇介绍 Semaphore, 直译就是信号量,是基于 AQS 扩展的一种多线程并发控制的工具,也就是我们常说的限流工具之一。
二、工作原理
Semaphore 通过 permit 来判断线程是否可通行。
Semaphore 需要设置一定数量的 permit,当一个线程执行任务遇到 Semaphore 时会被拦截,该线程需要向 Semaphore 申请一个或多个 permit:
- 如果 permit 数量充足,将 permit 发给该线程然后将其放行。当线程执行完任务后需要将 permit 如数奉还。
- 如果 permit 数量不足,该线程会安排到 CLH 队列中等待挂起,当其他线程归还 permit 后,等待的线程会被唤醒再申请 permit。
前言说到,Semaphore 是基于 AQS 扩展的,线程竞争资源的状态,线程的等待,线程的唤醒都是靠 AQS 实现和维护的,因此我们通过观察其模型图来讲解其工作原理:
图中 permit 的数量值由 state 保存维护,等待的线程则被封装成 Node 节点放在 CLH 队列中挂起等待。
三、源码解析
我们先通过案例了解 Semaphore 基本使用。
- 案例
我们把 Semaphore 当作停车位,Thread 当作找车位的车子:
1 | public class SemaphoreTest { |
执行结果:
1 | 2023-03-06T10:56:24.299 -> t2 号车获取到停车位 |
从结果不难看出,在 5 辆车子(thread)中每次最多只有 2 辆车子能够申请到停车位(permit)停车。
- 源码分析
我们按照例子中的代码执行顺序讲解。
我们先看 (1) 处的代码,即 Semaphore semaphore = new Semaphore(2)
,进入源码:
1 | public class Semaphore implements java.io.Serializable { |
调用 Semaphore 构造方法时,它底层是创建一个名为 Sync 的抽象的静态内部类的实例,Semaphore 所有的操作都是通过 Sync 实例来完成。
Sync 类继承了 AbstractQueuedSynchronizer 类,因此它拥有了 AQS 的能力。此处 Sync 的设计和 ReentrantLock 类相似,Sync 都被声明为抽象类,再通过子类实现具体的方法。Sync 拥有 FairSync 和 NonfairSync 两个子类。
我们案例中,创建的 Semaphore 实例,底层使用的是 NonfairSync 实例,调用构造方法时传入的 permits 值为 2。最终值会被传入到 (4) 处,即 setState(permits)
,该方法来自 AQS, 将 AQS 中的 state 值设置成 2。
此时,AQS 的模型图如下:
创建实例成功后,线程来到 (2) 处,即 semaphore.acquire()
申请 permit,查看源码:
1 | public class Semaphore implements java.io.Serializable { |
此方法底层调用 sync.acquireSharedInterruptibly(1)
,该方法来自 AQS,我们接着点进去:
1 | public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { |
该方法中:
- 先判断当前线程的中断状态,如果为 true 则抛出 InterruptedException 异常。
- 否则来到 (7) 处,即
tryAcquireShared(arg)
。该方法是一个抽象方法,由子类 NonfairSync 实现,来到 (6) 处,而 (6) 处方法的底层调用父类的方法实现,即 (5) 处代码:
1 | abstract static class Sync extends AbstractQueuedSynchronizer { |
该方法开启一个无限循环来尝试获取 permit :
- 先获取 AQS 的 state 值(permits 可用数量)。
- 通过 permits 可用数量减去要申请的 permit 数量,计算出剩余的 permits 数量。
- 如果 permits 剩余数量
< 0
直接返回该值或者如果 permits 剩余数量>= 0
则通过 CAS 方式重新设置 state 值,最后再返回该值。
此处开启无限循环是因为多个线程在执行 compareAndSetState
操作前可能会发生线程切换,但只会有一个线程能执行成功,切换回来的线程执行 CAS 操作必定会失败,因此需要循环重新计算 permits 可用值,确保 AQS 的 state 值的准确性。
案例中,t1 和 t2 线程率先拿到 permit 处理业务,此时 AQS 模型图如下:
而 t3 线程以及其他线程未能拿到 permit (tryAcquireShared
返回值小于 0),因此它们来到 (8) 处,即 doAcquireSharedInterruptibly(arg)
,该方法来自 AQS:
1 | public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { |
竞争资源失败的线程进入到这个方法中,案例中 t3 线程执行代码流程如下:
- 先执行
addWaiter(Node.SHARED)
方法,将当前线程封装到 Node 节点(node1)。因为 CLH 的数据为空,当 node1 进到 CLH 前,会为其创建一个虚拟头节点(dummy)。 - 进入到一个无限循环中,判断 node1 的前驱节点是否为头结点,判断成功,进入到 (10) 处
tryAcquireShared(arg)
尝试获取 permit,该方法上文已讲解,不再赘述。 - 显然在 t1 和 t2 线程未释放 permit 之前,其他线程无法获取到 permit,返回值必然小于 0,来到第 (12) 处
shouldParkAfterFailedAcquire(p, node)
。 shouldParkAfterFailedAcquire(p, node)
方法将 node1 节点的前驱节点(dummy)的 waitStatus 状态改为 -1。最后来到第 (13) 处,parkAndCheckInterrupt()
。parkAndCheckInterrupt()
方法底层调用LockSupport.park(this)
,这样 t3 线程就被挂起等待。
此时,AQS 的模型图为:
同样的,其他未获取到 permit 都会被封装成 Node 节点进到 CLH 队列被挂起等待。
我们回到获取到 permit 的线程视角,如 t2 线程业务执行结束后,执行 semaphore.release()
,进到源码中:
1 | public class Semaphore implements java.io.Serializable { |
方法底层调用 Sync 实例的 releaseShared(1)
方法,该方法来自 AQS:
1 | public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { |
t2 线程来到 (14) 处,执行 tryReleaseShared(arg)
,该方法为抽象方法,由 AQS 的子类 Sync 类实现:
1 | public class Semaphore implements java.io.Serializable { |
该方法用于归还 permit,同样开启一个无限循环:
- 先获取 AQS 的 state 值(permits 剩余数量),当前值为 0。
- 通过 permits 剩余数量加上要归还的 permit 数量,计算出 permits 的新值,当前值为 1。
- 检验数值的合法性
- 通过 CAS 方式将 state 值修改为 permits 新值,返回 true。
此时,AQS 的模型图为:
tryReleaseShared(arg)
方法返回 true,t2 线程进到 (15) 处,即 doReleaseShared()
方法,该方法来自 AQS:
1 | public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { |
该方法用于修改头结点的 waitStatus 值以及唤醒头结点的后继节点中的线程。 开启一个无限循环:
- 获取 CLH 的头结点
- 判断头结点(dummy)是否为空,同时头结点是否与尾节点相同。由 AQS 模型图可知,(16) 处的判断是成立的,随后 t2 线程进到 if 方法体中。
- 判断头结点(dummy)的 waitStatus 状态,当前状态值为 -1,(17) 处判断成立,将头结点的 waitStatus 通过 CAS 方式还原为 0。
- 修改成功后执行 (18) 处代码,即
unparkSuccessor(h)
,该方法用于查询头结点的后继节点,并通过LockSupport.unpark(thread)
唤醒节点中的线程(t3 线程)。由于该方法在 《AQS 源码详解》 已讲解,此处不多赘述。 - t2 线程最后来到 (19) 处,判断成立退出无限循环。
这样 t2 线程释放锁完毕,结束线程,我们转到被唤醒的 t3 线程视角:
1 | public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { |
t3 线程在执行 (13) 处代码被挂起等待的,当被唤醒后,又进入下一个循环:
- 获取当前节点(node1)的前驱节点(dummy),判断是否为头结点,从 AQS 模型图可知,判断成立。
- 执行
tryAcquireShared(arg)
尝试获取 permit,由于 AQS 的 state 值此时为 1,t3 线程能够成功获取到 permit,方法返回 0。 - 进行
if (r >= 0)
判断也成立,执行 (11) 处方法。
1 | public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { |
该方法将 t3 线程的 node1 节点设置为头结点,同时将节点中的 prev 和 thread 设置为 null。之后做了一系列判断执行 doReleaseShared()
方法,此方法就是 (15) 处的方法,具体流程此处不多赘述(在执行 doReleaseShared()
期间,node1 节点的下一个节点 node2 的线程被唤醒过,于是申请 permit 失败又被挂起等待)。
t3 线程执行完 (11) 处代码,再执行 p.next = null
后,获取 permit 的工作基本完成。
此时,AQS 的流程图如下:
剩余的被挂起等待的 t4 和 t5 线程后续工作同上。