yeskery

Java 并发包基石 - AQS 详解

Java 并发包(JUC)中提供了很多并发工具,这其中,很多我们耳熟能详的并发工具,譬如 ReentrangLock、Semaphore,它们的实现都用到了一个共同的基类—AbstractQueuedSynchronizer,简称 AQS。AQS 是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的 ReentrantLock,Semaphore,其他的诸如 ReentrantReadWriteLock,SynchronousQueue,FutureTask 等等皆是基于 AQS 的。当然,我们自己也能利用 AQS 非常轻松容易地构造出符合我们自己需求的同步器。

本章我们就一起探究下这个神奇的东东,并对其实现原理进行剖析理解。

基本实现原理

AQS 使用一个 int 成员变量来表示同步状态,通过内置的 FIFO 队列来完成获取资源线程的排队工作。

  1. private volatile int state;//共享变量,使用volatile修饰保证线程可见性

状态信息通过 procted 类型的 getStatesetStatecompareAndSetState 进行操作

AQS 支持两种同步方式:

  1. 独占式
  2. 共享式

方便使用者实现不同类型的同步组件,独占式如 ReentrantLock,共享式如 Semaphore,CountDownLatch,组合式的如 ReentrantReadWriteLock。总之,AQS 为使用提供了底层支撑,如何组装实现,使用者可以自由发挥。

同步器的设计是基于模板方法模式的,一般的使用方式是这样:

  1. 使用者继承AbstractQueuedSynchronizer并重写指定的方法。(这些重写方法很简单,无非是对于共享资源state的获取和释放)

  2. 将AQS组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法

这其实是模板方法模式的一个很经典的应用。

我们来看看 AQS 定义的这些可重写的方法:

protected boolean tryAcquire(int arg) : 独占式获取同步状态,试着获取,成功返回true,反之为false

protected boolean tryRelease(int arg) :独占式释放同步状态,等待中的其他线程此时将有机会获取到同步状态;

protected int tryAcquireShared(int arg) :共享式获取同步状态,返回值大于等于0,代表获取成功;反之获取失败;

protected boolean tryReleaseShared(int arg) :共享式释放同步状态,成功为true,失败为false

protected boolean isHeldExclusively() : 是否在独占模式下被线程占用。

关于 AQS 的使用,我们来简单总结一下:

如何使用

首先,我们需要去继承 AbstractQueuedSynchronizer 这个类,然后我们根据我们的需求去重写相应的方法,比如要实现一个独占锁,那就去重写 tryAcquire,tryRelease 方法,要实现共享锁,就去重写 tryAcquireShared,tryReleaseShared;最后,在我们的组件中调用AQS中的模板方法就可以了,而这些模板方法是会调用到我们之前重写的那些方法的。也就是说,我们只需要很小的工作量就可以实现自己的同步组件,重写的那些方法,仅仅是一些简单的对于共享资源 state 的获取和释放操作,至于像是获取资源失败,线程需要阻塞之类的操作,自然是 AQS 帮我们完成了。

设计思想

对于使用者来讲,我们无需关心获取资源失败,线程排队,线程阻塞/唤醒等一系列复杂的实现,这些都在 AQS 中为我们处理好了。我们只需要负责好自己的那个环节就好,也就是获取/释放共享资源 state 的姿势T_T。很经典的模板方法设计模式的应用,AQS 为我们定义好顶级逻辑的骨架,并提取出公用的线程入队列/出队列,阻塞/唤醒等一系列复杂逻辑的实现,将部分简单的可由使用者决定的操作逻辑延迟到子类中去实现即可。

自定义同步器

同步器代码实现

上面大概讲了一些关于 AQS 如何使用的理论性的东西,接下来,我们就来看下实际如何使用,直接采用 JDK 官方文档中的小例子来说明问题

  1. package juc;
  2. import java.util.concurrent.locks.AbstractQueuedSynchronizer;
  3. /**
  4. * Created by chengxiao on 2017/3/28.
  5. */
  6. public class Mutex implements java.io.Serializable {
  7. //静态内部类,继承AQS
  8. private static class Sync extends AbstractQueuedSynchronizer {
  9. //是否处于占用状态
  10. protected boolean isHeldExclusively() {
  11. return getState() == 1;
  12. }
  13. //当状态为0的时候获取锁,CAS操作成功,则state状态为1,
  14. public boolean tryAcquire(int acquires) {
  15. if (compareAndSetState(0, 1)) {
  16. setExclusiveOwnerThread(Thread.currentThread());
  17. return true;
  18. }
  19. return false;
  20. }
  21. //释放锁,将同步状态置为0
  22. protected boolean tryRelease(int releases) {
  23. if (getState() == 0) throw new IllegalMonitorStateException();
  24. setExclusiveOwnerThread(null);
  25. setState(0);
  26. return true;
  27. }
  28. }
  29. //同步对象完成一系列复杂的操作,我们仅需指向它即可
  30. private final Sync sync = new Sync();
  31. //加锁操作,代理到acquire(模板方法)上就行,acquire会调用我们重写的tryAcquire方法
  32. public void lock() {
  33. sync.acquire(1);
  34. }
  35. public boolean tryLock() {
  36. return sync.tryAcquire(1);
  37. }
  38. //释放锁,代理到release(模板方法)上就行,release会调用我们重写的tryRelease方法。
  39. public void unlock() {
  40. sync.release(1);
  41. }
  42. public boolean isLocked() {
  43. return sync.isHeldExclusively();
  44. }
  45. }

同步器代码测试

测试下这个自定义的同步器,我们使用之前文章中做过的并发环境下 a++ 的例子来说明问题(a++ 的原子性其实最好使用原子类 AtomicInteger 来解决,此处用 Mutex 有点大炮打蚊子的意味,好在能说明问题就好)

  1. package juc;
  2. import java.util.concurrent.CyclicBarrier;
  3. /**
  4. * Created by chengxiao on 2017/7/16.
  5. */
  6. public class TestMutex {
  7. private static CyclicBarrier barrier = new CyclicBarrier(31);
  8. private static int a = 0;
  9. private static Mutex mutex = new Mutex();
  10. public static void main(String []args) throws Exception {
  11. //说明:我们启用30个线程,每个线程对i自加10000次,同步正常的话,最终结果应为300000;
  12. //未加锁前
  13. for(int i=0;i<30;i++){
  14. Thread t = new Thread(new Runnable() {
  15. @Override
  16. public void run() {
  17. for(int i=0;i<10000;i++){
  18. increment1();//没有同步措施的a++;
  19. }
  20. try {
  21. barrier.await();//等30个线程累加完毕
  22. } catch (Exception e) {
  23. e.printStackTrace();
  24. }
  25. }
  26. });
  27. t.start();
  28. }
  29. barrier.await();
  30. System.out.println("加锁前,a="+a);
  31. //加锁后
  32. barrier.reset();//重置CyclicBarrier
  33. a=0;
  34. for(int i=0;i<30;i++){
  35. new Thread(new Runnable() {
  36. @Override
  37. public void run() {
  38. for(int i=0;i<10000;i++){
  39. increment2();//a++采用Mutex进行同步处理
  40. }
  41. try {
  42. barrier.await();//等30个线程累加完毕
  43. } catch (Exception e) {
  44. e.printStackTrace();
  45. }
  46. }
  47. }).start();
  48. }
  49. barrier.await();
  50. System.out.println("加锁后,a="+a);
  51. }
  52. /**
  53. * 没有同步措施的a++
  54. * @return
  55. */
  56. public static void increment1(){
  57. a++;
  58. }
  59. /**
  60. * 使用自定义的Mutex进行同步处理的a++
  61. */
  62. public static void increment2(){
  63. mutex.lock();
  64. a++;
  65. mutex.unlock();
  66. }
  67. }

测试结果:

  1. 加锁前,a=279204
  2. 加锁后,a=300000

源码分析

我们先来简单描述下 AQS 的基本实现,前面我们提到过,AQS 维护一个共享资源 state,通过内置的 FIFO 来完成获取资源线程的排队工作。(这个内置的同步队列称为”CLH”队列)。该队列由一个一个的 Node 结点组成,每个 Node 结点维护一个 prev 引用和 next 引用,分别指向自己的前驱和后继结点。AQS 维护两个指针,分别指向队列头部 head 和尾部 tail。

aqs_clh

其实就是个双端双向链表

当线程获取资源失败(比如 tryAcquire 时试图设置 state 状态失败),会被构造成一个结点加入 CLH 队列中,同时当前线程会被阻塞在队列中(通过 LockSupport.park 实现,其实是等待态)。当持有同步状态的线程释放同步状态时,会唤醒后继结点,然后此结点线程继续加入到对同步状态的争夺中。

Node结点

Node 结点是 AbstractQueuedSynchronizer 中的一个静态内部类,我们捡 Node 的几个重要属性来说一下

  1. static final class Node {
  2. /** waitStatus值,表示线程已被取消(等待超时或者被中断)*/
  3. static final int CANCELLED = 1;
  4. /** waitStatus值,表示后继线程需要被唤醒(unpaking)*/
  5. static final int SIGNAL = -1;
  6. /**waitStatus值,表示结点线程等待在condition上,当被signal后,会从等待队列转移到同步到队列中 */
  7. /** waitStatus value to indicate thread is waiting on condition */
  8. static final int CONDITION = -2;
  9. /** waitStatus值,表示下一次共享式同步状态会被无条件地传播下去
  10. static final int PROPAGATE = -3;
  11. /** 等待状态,初始为0 */
  12. volatile int waitStatus;
  13. /**当前结点的前驱结点 */
  14. volatile Node prev;
  15. /** 当前结点的后继结点 */
  16. volatile Node next;
  17. /** 与当前结点关联的排队中的线程 */
  18. volatile Thread thread;
  19. /** ...... */
  20. }

独占式

获取同步状态—acquire()

来看看 acquire 方法,lock 方法一般会直接代理到 acquire 上

  1. public final void acquire(int arg) {
  2. if (!tryAcquire(arg) &&
  3. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4. selfInterrupt();
  5. }

我们来简单理一下代码逻辑:

  • 首先,调用使用者重写的 tryAcquire 方法,若返回 true,意味着获取同步状态成功,后面的逻辑不再执行;若返回 false,也就是获取同步状态失败,进入b步骤

  • 此时,获取同步状态失败,构造独占式同步结点,通过 addWatiter 将此结点添加到同步队列的尾部(此时可能会有多个线程结点试图加入同步队列尾部,需要以线程安全的方式添加)

  • 该结点以在队列中尝试获取同步状态,若获取不到,则阻塞结点线程,直到被前驱结点唤醒或者被中断

addWaiter

  1. private Node addWaiter(Node mode) {
  2. Node node = new Node(Thread.currentThread(), mode);//构造结点
  3. //指向尾结点tail
  4. Node pred = tail;
  5. //如果尾结点不为空,CAS快速尝试在尾部添加,若CAS设置成功,返回;否则,eng。
  6. if (pred != null) {
  7. node.prev = pred;
  8. if (compareAndSetTail(pred, node)) {
  9. pred.next = node;
  10. return node;
  11. }
  12. }
  13. enq(node);
  14. return node;
  15. }

先cas快速设置,若失败,进入enq方法,将结点添加到同步队列尾部这个操作,同时可能会有多个线程尝试添加到尾部,是非线程安全的操作。

以上代码可以看出,使用了 compareAndSetTail 这个 cas 操作保证安全添加尾结点。

enq 方法

  1. private Node enq(final Node node) {
  2. for (;;) {
  3. Node t = tail;
  4. if (t == null) { //如果队列为空,创建结点,同时被head和tail引用
  5. if (compareAndSetHead(new Node()))
  6. tail = head;
  7. } else {
  8. node.prev = t;
  9. if (compareAndSetTail(t, node)) {//cas设置尾结点,不成功就一直重试
  10. t.next = node;
  11. return t;
  12. }
  13. }
  14. }
  15. }

enq 内部是个死循环,通过 CAS 设置尾结点,不成功就一直重试。很经典的CAS自旋的用法,我们在之前关于原子类的源码分析中也提到过。这是一种乐观的并发策略

最后,看下 acquireQueued 方法。

acquireQueued

  1. final boolean acquireQueued(final Node node, int arg) {
  2. boolean failed = true;
  3. try {
  4. boolean interrupted = false;
  5. for (;;) {//死循环
  6. final Node p = node.predecessor();//找到当前结点的前驱结点
  7. if (p == head && tryAcquire(arg)) {//如果前驱结点是头结点,才tryAcquire,其他结点是没有机会tryAcquire的。
  8. setHead(node);//获取同步状态成功,将当前结点设置为头结点。
  9. p.next = null; // 方便GC
  10. failed = false;
  11. return interrupted;
  12. }
  13. // 如果没有获取到同步状态,通过shouldParkAfterFailedAcquire判断是否应该阻塞,parkAndCheckInterrupt用来阻塞线程
  14. if (shouldParkAfterFailedAcquire(p, node) &&
  15. parkAndCheckInterrupt())
  16. interrupted = true;
  17. }
  18. } finally {
  19. if (failed)
  20. cancelAcquire(node);
  21. }
  22. }

acquireQueued 内部也是一个死循环,只有前驱结点是头结点的结点,也就是老二结点,才有机会去 tryAcquire;若 tryAcquire 成功,表示获取同步状态成功,将此结点设置为头结点;若是非老二结点,或者 tryAcquire 失败,则进入 shouldParkAfterFailedAcquire 去判断判断当前线程是否应该阻塞,若可以,调用 parkAndCheckInterrupt 阻塞当前线程,直到被中断或者被前驱结点唤醒。若还不能休息,继续循环。

shouldParkAfterFailedAcquire

shouldParkAfterFailedAcquire 用来判断当前结点线程是否能休息

  1. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  2. //获取前驱结点的wait值
  3. int ws = pred.waitStatus;
  4. if (ws == Node.SIGNAL)//若前驱结点的状态是SIGNAL,意味着当前结点可以被安全地park
  5. return true;
  6. if (ws > 0) {
  7. // ws>0,只有CANCEL状态ws才大于0。若前驱结点处于CANCEL状态,也就是此结点线程已经无效,从后往前遍历,找到一个非CANCEL状态的结点,将自己设置为它的后继结点
  8. do {
  9. node.prev = pred = pred.prev;
  10. } while (pred.waitStatus > 0);
  11. pred.next = node;
  12. } else {
  13. // 若前驱结点为其他状态,将其设置为SIGNAL状态
  14. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  15. }
  16. return false;
  17. }

若shouldParkAfterFailedAcquire 返回 true,也就是当前结点的前驱结点为 SIGNAL 状态,则意味着当前结点可以放心休息,进入 parking 状态了。parkAncCheckInterrupt 阻塞线程并处理中断。

  1. private final boolean parkAndCheckInterrupt() {
  2. LockSupport.park(this);//使用LockSupport使线程进入阻塞状态
  3. return Thread.interrupted();// 线程是否被中断过
  4. }

至此,关于 acquire 的方法源码已经分析完毕,我们来简单总结下:

  • 首先tryAcquire获取同步状态,成功则直接返回;否则,进入下一环节
  • 线程获取同步状态失败,就构造一个结点,加入同步队列中,这个过程要保证线程安全
  • 加入队列中的结点线程进入自旋状态,若是老二结点(即前驱结点为头结点),才有机会尝试去获取同步状态;否则,当其前驱结点的状态为SIGNAL,线程便可安心休息,进入阻塞状态,直到被中断或者被前驱结点唤醒

释放同步状态—release()

当前线程执行完自己的逻辑之后,需要释放同步状态,来看看 release 方法的逻辑

  1. public final boolean release(int arg) {
  2. if (tryRelease(arg)) {//调用使用者重写的tryRelease方法,若成功,唤醒其后继结点,失败则返回false
  3. Node h = head;
  4. if (h != null && h.waitStatus != 0)
  5. unparkSuccessor(h);//唤醒后继结点
  6. return true;
  7. }
  8. return false;
  9. }

unparkSuccessor:唤醒后继结点

  1. private void unparkSuccessor(Node node) {
  2. //获取wait状态
  3. int ws = node.waitStatus;
  4. if (ws < 0)
  5. compareAndSetWaitStatus(node, ws, 0);// 将等待状态waitStatus设置为初始值0
  6. Node s = node.next;//后继结点
  7. if (s == null || s.waitStatus > 0) {//若后继结点为空,或状态为CANCEL(已失效),则从后尾部往前遍历找到一个处于正常阻塞状态的结点     进行唤醒
  8. s = null;
  9. for (Node t = tail; t != null && t != node; t = t.prev)
  10. if (t.waitStatus <= 0)
  11. s = t;
  12. }
  13. if (s != null)
  14. LockSupport.unpark(s.thread);//使用LockSupprot唤醒结点对应的线程
  15. }

release 的同步状态相对简单,需要找到头结点的后继结点进行唤醒,若后继结点为空或处于 CANCEL 状态,从后向前遍历找寻一个正常的结点,唤醒其对应线程。

共享式

共享式:共享式地获取同步状态。对于独占式同步组件来讲,同一时刻只有一个线程能获取到同步状态,其他线程都得去排队等待,其待重写的尝试获取同步状态的方法 tryAcquire 返回值为 boolean,这很容易理解;对于共享式同步组件来讲,同一时刻可以有多个线程同时获取到同步状态,这也是“共享”的意义所在。其待重写的尝试获取同步状态的方法 tryAcquireShared 返回值为 int。

  1. protected int tryAcquireShared(int arg) {
  2. throw new UnsupportedOperationException();
  3. }
  1. 当返回值大于 0 时,表示获取同步状态成功,同时还有剩余同步状态可供其他线程获取
  2. 当返回值等于 0 时,表示获取同步状态成功,但没有可用同步状态了
  3. 当返回值小于 0 时,表示获取同步状态失败

获取同步状态—acquireShared

  1. public final void acquireShared(int arg) {
  2. if (tryAcquireShared(arg) < 0)//返回值小于0,获取同步状态失败,排队去;获取同步状态成功,直接返回去干自己的事儿。
  3. doAcquireShared(arg);
  4. }

doAcquireShared

  1. private void doAcquireShared(int arg) {
  2. final Node node = addWaiter(Node.SHARED);//构造一个共享结点,添加到同步队列尾部。若队列初始为空,先添加一个无意义的傀儡结点,再将新节点添加到队列尾部。
  3. boolean failed = true;//是否获取成功
  4. try {
  5. boolean interrupted = false;//线程parking过程中是否被中断过
  6. for (;;) {//死循环
  7. final Node p = node.predecessor();//找到前驱结点
  8. if (p == head) {//头结点持有同步状态,只有前驱是头结点,才有机会尝试获取同步状态
  9. int r = tryAcquireShared(arg);//尝试获取同步装填
  10. if (r >= 0) {//r>=0,获取成功
  11. setHeadAndPropagate(node, r);//获取成功就将当前结点设置为头结点,若还有可用资源,传播下去,也就是继续唤醒后继结点
  12. p.next = null; // 方便GC
  13. if (interrupted)
  14. selfInterrupt();
  15. failed = false;
  16. return;
  17. }
  18. }
  19. if (shouldParkAfterFailedAcquire(p, node) &&//是否能安心进入parking状态
  20. parkAndCheckInterrupt())//阻塞线程
  21. interrupted = true;
  22. }
  23. } finally {
  24. if (failed)
  25. cancelAcquire(node);
  26. }
  27. }

大体逻辑与独占式的 acquireQueued 差距不大,只不过由于是共享式,会有多个线程同时获取到线程,也可能同时释放线程,空出很多同步状态,所以当排队中的老二获取到同步状态,如果还有可用资源,会继续传播下去。

setHeadAndPropagate

  1. private void setHeadAndPropagate(Node node, int propagate) {
  2. Node h = head; // Record old head for check below
  3. setHead(node);
  4. if (propagate > 0 || h == null || h.waitStatus < 0) {
  5. Node s = node.next;
  6. if (s == null || s.isShared())
  7. doReleaseShared();
  8. }
  9. }

释放同步状态—releaseShared

  1. public final boolean releaseShared(int arg) {
  2. if (tryReleaseShared(arg)) {
  3. doReleaseShared();//释放同步状态
  4. return true;
  5. }
  6. return false;
  7. }

doReleaseShared

  1. private void doReleaseShared() {
  2. for (;;) {//死循环,共享模式,持有同步状态的线程可能有多个,采用循环CAS保证线程安全
  3. Node h = head;
  4. if (h != null && h != tail) {
  5. int ws = h.waitStatus;
  6. if (ws == Node.SIGNAL) {
  7. if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
  8. continue;
  9. unparkSuccessor(h);//唤醒后继结点
  10. }
  11. else if (ws == 0 &&
  12. !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
  13. continue;
  14. }
  15. if (h == head)
  16. break;
  17. }
  18. }

代码逻辑比较容易理解,需要注意的是,共享模式,释放同步状态也是多线程的,此处采用了 CAS 自旋来保证。

总结

关于 AQS 的介绍及源码分析到此为止了。

AQS 是 JUC 中很多同步组件的构建基础,简单来讲,它内部实现主要是状态变量 state 和一个 FIFO 队列来完成,同步队列的头结点是当前获取到同步状态的结点,获取同步状态 state 失败的线程,会被构造成一个结点(或共享式或独占式)加入到同步队列尾部(采用自旋 CAS 来保证此操作的线程安全),随后线程会阻塞;释放时唤醒头结点的后继结点,使其加入对同步状态的争夺中。

AQS 为我们定义好了顶层的处理实现逻辑,我们在使用AQS构建符合我们需求的同步组件时,只需重写 tryAcquire,tryAcquireShared,tryRelease,tryReleaseShared 几个方法,来决定同步状态的释放和获取即可,至于背后复杂的线程排队,线程阻塞/唤醒,如何保证线程安全,都由 AQS 为我们完成了,这也是非常典型的模板方法的应用。AQS定义好顶级逻辑的骨架,并提取出公用的线程入队列/出队列,阻塞/唤醒等一系列复杂逻辑的实现,将部分简单的可由使用者决定的操作逻辑延迟到子类中去实现。

本文转载自:http://www.cnblogs.com/chengxiao/archive/2017/07/24/7141160.html

评论

发表评论 点击刷新验证码

提示

该功能暂未开放