Java 并发包(JUC)中提供了很多并发工具,这其中,很多我们耳熟能详的并发工具,譬如 ReentrangLock、Semaphore,它们的实现都用到了一个共同的基类--AbstractQueuedSynchronizer,简称 AQS。AQS 是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的 ReentrantLock,Semaphore,其他的诸如 ReentrantReadWriteLock,SynchronousQueue,FutureTask 等等皆是基于 AQS 的。当然,我们自己也能利用 AQS 非常轻松容易地构造出符合我们自己需求的同步器。
本章我们就一起探究下这个神奇的东东,并对其实现原理进行剖析理解。
基本实现原理
AQS 使用一个 int 成员变量来表示同步状态,通过内置的 FIFO 队列来完成获取资源线程的排队工作。
private volatile int state;//共享变量,使用volatile修饰保证线程可见性
状态信息通过 procted 类型的 getState,setState,compareAndSetState 进行操作
AQS 支持两种同步方式:
- 独占式
- 共享式
方便使用者实现不同类型的同步组件,独占式如 ReentrantLock,共享式如 Semaphore,CountDownLatch,组合式的如 ReentrantReadWriteLock。总之,AQS 为使用提供了底层支撑,如何组装实现,使用者可以自由发挥。
同步器的设计是基于模板方法模式的,一般的使用方式是这样:
- 使用者继承AbstractQueuedSynchronizer并重写指定的方法。(这些重写方法很简单,无非是对于共享资源state的获取和释放)
- 将AQS组合在自定义同步组件的实现中,并调用其模板方法,而这些模板方法会调用使用者重写的方法。
这其实是模板方法模式的一个很经典的应用。
我们来看看 AQS 定义的这些可重写的方法:
protected boolean tryAcquire(int arg) : 独占式获取同步状态,试着获取,成功返回true,反之为false
protected boolean tryRelease(int arg) :独占式释放同步状态,等待中的其他线程此时将有机会获取到同步状态;
protected int tryAcquireShared(int arg) :共享式获取同步状态,返回值大于等于0,代表获取成功;反之获取失败;
protected boolean tryReleaseShared(int arg) :共享式释放同步状态,成功为true,失败为false
protected boolean isHeldExclusively() : 是否在独占模式下被线程占用。
关于 AQS 的使用,我们来简单总结一下:
如何使用
首先,我们需要去继承 AbstractQueuedSynchronizer 这个类,然后我们根据我们的需求去重写相应的方法,比如要实现一个独占锁,那就去重写 tryAcquire,tryRelease 方法,要实现共享锁,就去重写 tryAcquireShared,tryReleaseShared;最后,在我们的组件中调用AQS中的模板方法就可以了,而这些模板方法是会调用到我们之前重写的那些方法的。也就是说,我们只需要很小的工作量就可以实现自己的同步组件,重写的那些方法,仅仅是一些简单的对于共享资源 state 的获取和释放操作,至于像是获取资源失败,线程需要阻塞之类的操作,自然是 AQS 帮我们完成了。
设计思想
对于使用者来讲,我们无需关心获取资源失败,线程排队,线程阻塞/唤醒等一系列复杂的实现,这些都在 AQS 中为我们处理好了。我们只需要负责好自己的那个环节就好,也就是获取/释放共享资源 state 的姿势T_T。很经典的模板方法设计模式的应用,AQS 为我们定义好顶级逻辑的骨架,并提取出公用的线程入队列/出队列,阻塞/唤醒等一系列复杂逻辑的实现,将部分简单的可由使用者决定的操作逻辑延迟到子类中去实现即可。
自定义同步器
同步器代码实现
上面大概讲了一些关于 AQS 如何使用的理论性的东西,接下来,我们就来看下实际如何使用,直接采用 JDK 官方文档中的小例子来说明问题
package juc; import java.util.concurrent.locks.AbstractQueuedSynchronizer; /** * Created by chengxiao on 2017/3/28. */ public class Mutex implements java.io.Serializable { //静态内部类,继承AQS private static class Sync extends AbstractQueuedSynchronizer { //是否处于占用状态 protected boolean isHeldExclusively() { return getState() == 1; } //当状态为0的时候获取锁,CAS操作成功,则state状态为1, public boolean tryAcquire(int acquires) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } //释放锁,将同步状态置为0 protected boolean tryRelease(int releases) { if (getState() == 0) throw new IllegalMonitorStateException(); setExclusiveOwnerThread(null); setState(0); return true; } } //同步对象完成一系列复杂的操作,我们仅需指向它即可 private final Sync sync = new Sync(); //加锁操作,代理到acquire(模板方法)上就行,acquire会调用我们重写的tryAcquire方法 public void lock() { sync.acquire(1); } public boolean tryLock() { return sync.tryAcquire(1); } //释放锁,代理到release(模板方法)上就行,release会调用我们重写的tryRelease方法。 public void unlock() { sync.release(1); } public boolean isLocked() { return sync.isHeldExclusively(); } }
同步器代码测试
测试下这个自定义的同步器,我们使用之前文章中做过的并发环境下 a++ 的例子来说明问题(a++ 的原子性其实最好使用原子类 AtomicInteger 来解决,此处用 Mutex 有点大炮打蚊子的意味,好在能说明问题就好)
package juc; import java.util.concurrent.CyclicBarrier; /** * Created by chengxiao on 2017/7/16. */ public class TestMutex { private static CyclicBarrier barrier = new CyclicBarrier(31); private static int a = 0; private static Mutex mutex = new Mutex(); public static void main(String []args) throws Exception { //说明:我们启用30个线程,每个线程对i自加10000次,同步正常的话,最终结果应为300000; //未加锁前 for(int i=0;i<30;i++){ Thread t = new Thread(new Runnable() { @Override public void run() { for(int i=0;i<10000;i++){ increment1();//没有同步措施的a++; } try { barrier.await();//等30个线程累加完毕 } catch (Exception e) { e.printStackTrace(); } } }); t.start(); } barrier.await(); System.out.println("加锁前,a="+a); //加锁后 barrier.reset();//重置CyclicBarrier a=0; for(int i=0;i<30;i++){ new Thread(new Runnable() { @Override public void run() { for(int i=0;i<10000;i++){ increment2();//a++采用Mutex进行同步处理 } try { barrier.await();//等30个线程累加完毕 } catch (Exception e) { e.printStackTrace(); } } }).start(); } barrier.await(); System.out.println("加锁后,a="+a); } /** * 没有同步措施的a++ * @return */ public static void increment1(){ a++; } /** * 使用自定义的Mutex进行同步处理的a++ */ public static void increment2(){ mutex.lock(); a++; mutex.unlock(); } }
测试结果:
加锁前,a=279204 加锁后,a=300000
源码分析
我们先来简单描述下 AQS 的基本实现,前面我们提到过,AQS 维护一个共享资源 state,通过内置的 FIFO 来完成获取资源线程的排队工作。(这个内置的同步队列称为"CLH"队列)。该队列由一个一个的 Node 结点组成,每个 Node 结点维护一个 prev 引用和 next 引用,分别指向自己的前驱和后继结点。AQS 维护两个指针,分别指向队列头部 head 和尾部 tail。
其实就是个双端双向链表。
当线程获取资源失败(比如 tryAcquire 时试图设置 state 状态失败),会被构造成一个结点加入 CLH 队列中,同时当前线程会被阻塞在队列中(通过 LockSupport.park 实现,其实是等待态)。当持有同步状态的线程释放同步状态时,会唤醒后继结点,然后此结点线程继续加入到对同步状态的争夺中。
Node结点
Node 结点是 AbstractQueuedSynchronizer 中的一个静态内部类,我们捡 Node 的几个重要属性来说一下
static final class Node { /** waitStatus值,表示线程已被取消(等待超时或者被中断)*/ static final int CANCELLED = 1; /** waitStatus值,表示后继线程需要被唤醒(unpaking)*/ static final int SIGNAL = -1; /**waitStatus值,表示结点线程等待在condition上,当被signal后,会从等待队列转移到同步到队列中 */ /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** waitStatus值,表示下一次共享式同步状态会被无条件地传播下去 static final int PROPAGATE = -3; /** 等待状态,初始为0 */ volatile int waitStatus; /**当前结点的前驱结点 */ volatile Node prev; /** 当前结点的后继结点 */ volatile Node next; /** 与当前结点关联的排队中的线程 */ volatile Thread thread; /** ...... */ }
独占式
获取同步状态--acquire()
来看看 acquire 方法,lock 方法一般会直接代理到 acquire 上
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
我们来简单理一下代码逻辑:
- 首先,调用使用者重写的 tryAcquire 方法,若返回
true
,意味着获取同步状态成功,后面的逻辑不再执行;若返回false
,也就是获取同步状态失败,进入b步骤; - 此时,获取同步状态失败,构造独占式同步结点,通过 addWatiter 将此结点添加到同步队列的尾部(此时可能会有多个线程结点试图加入同步队列尾部,需要以线程安全的方式添加);
- 该结点以在队列中尝试获取同步状态,若获取不到,则阻塞结点线程,直到被前驱结点唤醒或者被中断。
addWaiter
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode);//构造结点 //指向尾结点tail Node pred = tail; //如果尾结点不为空,CAS快速尝试在尾部添加,若CAS设置成功,返回;否则,eng。 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
先cas快速设置,若失败,进入enq方法,将结点添加到同步队列尾部这个操作,同时可能会有多个线程尝试添加到尾部,是非线程安全的操作。
以上代码可以看出,使用了 compareAndSetTail 这个 cas 操作保证安全添加尾结点。
enq 方法
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { //如果队列为空,创建结点,同时被head和tail引用 if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) {//cas设置尾结点,不成功就一直重试 t.next = node; return t; } } } }
enq 内部是个死循环,通过 CAS 设置尾结点,不成功就一直重试。很经典的CAS自旋的用法,我们在之前关于原子类的源码分析中也提到过。这是一种乐观的并发策略。
最后,看下 acquireQueued 方法。
acquireQueued
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) {//死循环 final Node p = node.predecessor();//找到当前结点的前驱结点 if (p == head && tryAcquire(arg)) {//如果前驱结点是头结点,才tryAcquire,其他结点是没有机会tryAcquire的。 setHead(node);//获取同步状态成功,将当前结点设置为头结点。 p.next = null; // 方便GC failed = false; return interrupted; } // 如果没有获取到同步状态,通过shouldParkAfterFailedAcquire判断是否应该阻塞,parkAndCheckInterrupt用来阻塞线程 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
acquireQueued 内部也是一个死循环,只有前驱结点是头结点的结点,也就是老二结点,才有机会去 tryAcquire;若 tryAcquire 成功,表示获取同步状态成功,将此结点设置为头结点;若是非老二结点,或者 tryAcquire 失败,则进入 shouldParkAfterFailedAcquire 去判断判断当前线程是否应该阻塞,若可以,调用 parkAndCheckInterrupt 阻塞当前线程,直到被中断或者被前驱结点唤醒。若还不能休息,继续循环。
shouldParkAfterFailedAcquire
shouldParkAfterFailedAcquire 用来判断当前结点线程是否能休息
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { //获取前驱结点的wait值 int ws = pred.waitStatus; if (ws == Node.SIGNAL)//若前驱结点的状态是SIGNAL,意味着当前结点可以被安全地park return true; if (ws > 0) { // ws>0,只有CANCEL状态ws才大于0。若前驱结点处于CANCEL状态,也就是此结点线程已经无效,从后往前遍历,找到一个非CANCEL状态的结点,将自己设置为它的后继结点 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { // 若前驱结点为其他状态,将其设置为SIGNAL状态 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
若shouldParkAfterFailedAcquire 返回 true
,也就是当前结点的前驱结点为 SIGNAL 状态,则意味着当前结点可以放心休息,进入 parking 状态了。parkAncCheckInterrupt 阻塞线程并处理中断。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this);//使用LockSupport使线程进入阻塞状态 return Thread.interrupted();// 线程是否被中断过 }
至此,关于 acquire 的方法源码已经分析完毕,我们来简单总结下:
- 首先tryAcquire获取同步状态,成功则直接返回;否则,进入下一环节;
- 线程获取同步状态失败,就构造一个结点,加入同步队列中,这个过程要保证线程安全;
- 加入队列中的结点线程进入自旋状态,若是老二结点(即前驱结点为头结点),才有机会尝试去获取同步状态;否则,当其前驱结点的状态为SIGNAL,线程便可安心休息,进入阻塞状态,直到被中断或者被前驱结点唤醒。
释放同步状态--release()
当前线程执行完自己的逻辑之后,需要释放同步状态,来看看 release 方法的逻辑
public final boolean release(int arg) { if (tryRelease(arg)) {//调用使用者重写的tryRelease方法,若成功,唤醒其后继结点,失败则返回false Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h);//唤醒后继结点 return true; } return false; }
unparkSuccessor:唤醒后继结点
private void unparkSuccessor(Node node) { //获取wait状态 int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0);// 将等待状态waitStatus设置为初始值0 Node s = node.next;//后继结点 if (s == null || s.waitStatus > 0) {//若后继结点为空,或状态为CANCEL(已失效),则从后尾部往前遍历找到一个处于正常阻塞状态的结点 进行唤醒 s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread);//使用LockSupprot唤醒结点对应的线程 }
release 的同步状态相对简单,需要找到头结点的后继结点进行唤醒,若后继结点为空或处于 CANCEL 状态,从后向前遍历找寻一个正常的结点,唤醒其对应线程。
共享式
共享式:共享式地获取同步状态。对于独占式同步组件来讲,同一时刻只有一个线程能获取到同步状态,其他线程都得去排队等待,其待重写的尝试获取同步状态的方法 tryAcquire 返回值为 boolean,这很容易理解;对于共享式同步组件来讲,同一时刻可以有多个线程同时获取到同步状态,这也是“共享”的意义所在。其待重写的尝试获取同步状态的方法 tryAcquireShared 返回值为 int。
protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); }
- 当返回值大于 0 时,表示获取同步状态成功,同时还有剩余同步状态可供其他线程获取;
- 当返回值等于 0 时,表示获取同步状态成功,但没有可用同步状态了;
- 当返回值小于 0 时,表示获取同步状态失败。
获取同步状态--acquireShared
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0)//返回值小于0,获取同步状态失败,排队去;获取同步状态成功,直接返回去干自己的事儿。 doAcquireShared(arg); }
doAcquireShared
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED);//构造一个共享结点,添加到同步队列尾部。若队列初始为空,先添加一个无意义的傀儡结点,再将新节点添加到队列尾部。 boolean failed = true;//是否获取成功 try { boolean interrupted = false;//线程parking过程中是否被中断过 for (;;) {//死循环 final Node p = node.predecessor();//找到前驱结点 if (p == head) {//头结点持有同步状态,只有前驱是头结点,才有机会尝试获取同步状态 int r = tryAcquireShared(arg);//尝试获取同步装填 if (r >= 0) {//r>=0,获取成功 setHeadAndPropagate(node, r);//获取成功就将当前结点设置为头结点,若还有可用资源,传播下去,也就是继续唤醒后继结点 p.next = null; // 方便GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) &&//是否能安心进入parking状态 parkAndCheckInterrupt())//阻塞线程 interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
大体逻辑与独占式的 acquireQueued 差距不大,只不过由于是共享式,会有多个线程同时获取到线程,也可能同时释放线程,空出很多同步状态,所以当排队中的老二获取到同步状态,如果还有可用资源,会继续传播下去。
setHeadAndPropagate
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
释放同步状态--releaseShared
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared();//释放同步状态 return true; } return false; }
doReleaseShared
private void doReleaseShared() { for (;;) {//死循环,共享模式,持有同步状态的线程可能有多个,采用循环CAS保证线程安全 Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; unparkSuccessor(h);//唤醒后继结点 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) break; } }
代码逻辑比较容易理解,需要注意的是,共享模式,释放同步状态也是多线程的,此处采用了 CAS 自旋来保证。
总结
关于 AQS 的介绍及源码分析到此为止了。
AQS 是 JUC 中很多同步组件的构建基础,简单来讲,它内部实现主要是状态变量 state 和一个 FIFO 队列来完成,同步队列的头结点是当前获取到同步状态的结点,获取同步状态 state 失败的线程,会被构造成一个结点(或共享式或独占式)加入到同步队列尾部(采用自旋 CAS 来保证此操作的线程安全),随后线程会阻塞;释放时唤醒头结点的后继结点,使其加入对同步状态的争夺中。
AQS 为我们定义好了顶层的处理实现逻辑,我们在使用AQS构建符合我们需求的同步组件时,只需重写 tryAcquire,tryAcquireShared,tryRelease,tryReleaseShared 几个方法,来决定同步状态的释放和获取即可,至于背后复杂的线程排队,线程阻塞/唤醒,如何保证线程安全,都由 AQS 为我们完成了,这也是非常典型的模板方法的应用。AQS定义好顶级逻辑的骨架,并提取出公用的线程入队列/出队列,阻塞/唤醒等一系列复杂逻辑的实现,将部分简单的可由使用者决定的操作逻辑延迟到子类中去实现。
本文转载自:http://www.cnblogs.com/chengxiao/archive/2017/07/24/7141160.html