共享模式acquire实现流程

上文我们讲解了AbstractQueuedSynchronizer独占模式的acquire实现流程,本文趁热打铁继续看一下AbstractQueuedSynchronizer共享模式acquire的实现流程。连续两篇文章的学习,也可以对比独占模式acquire和共享模式acquire的区别,加深对于AbstractQueuedSynchronizer的理解。

先看一下共享模式acquire的实现,方法为acquireShared和acquireSharedInterruptibly,两者差别不大,区别就在于后者有中断处理,以acquireShared为例:

 1 public final void acquireShared(int arg) { 2     if (tryAcquireShared(arg) < 0) 3         doAcquireShared(arg); 4 }

这里就能看出第一个差别来了:独占模式acquire的时候子类重写的方法tryAcquire返回的是boolean,即是否tryAcquire成功;共享模式acquire的时候,返回的是一个int型变量,判断是否<0。doAcquireShared方法的实现为:

大数据培训,云培训,数据挖掘培训,云计算培训,高端软件开发培训,项目经理培训

 1 private void doAcquireShared(int arg) { 2     final Node node = addWaiter(Node.SHARED); 3     boolean failed = true; 4     try { 5         boolean interrupted = false; 6         for (;;) { 7             final Node p = node.predecessor(); 8             if (p == head) { 9                 int r = tryAcquireShared(arg);10                 if (r >= 0) {11                     setHeadAndPropagate(node, r);12                     p.next = null; // help GC13                     if (interrupted)14                         selfInterrupt();15                     failed = false;16                     return;17                 }18             }19             if (shouldParkAfterFailedAcquire(p, node) &&20                 parkAndCheckInterrupt())21                 interrupted = true;22         }23     } finally {24         if (failed)25             cancelAcquire(node);26     }27 }

大数据培训,云培训,数据挖掘培训,云计算培训,高端软件开发培训,项目经理培训

我们来分析一下这段代码做了什么:

  1. addWaiter,把所有tryAcquireShared<0的线程实例化出一个Node,构建为一个FIFO队列,这和独占锁是一样的

  2. 拿当前节点的前驱节点,只有前驱节点是head的节点才能tryAcquireShared,这和独占锁也是一样的

  3. 前驱节点不是head的,执行"shouldParkAfterFailedAcquire() && parkAndCheckInterrupt()",for(;;)循环,"shouldParkAfterFailedAcquire()"方法执行2次,当前线程阻塞,这和独占锁也是一样的

确实,共享模式下的acquire和独占模式下的acquire大部分逻辑差不多,最大的差别在于tryAcquireShared成功之后,独占模式的acquire是直接将当前节点设置为head节点即可,共享模式会执行setHeadAndPropagate方法,顾名思义,即在设置head之后多执行了一步propagate操作。setHeadAndPropagate方法源码为:

大数据培训,云培训,数据挖掘培训,云计算培训,高端软件开发培训,项目经理培训

 1 private void setHeadAndPropagate(Node node, int propagate) { 2     Node h = head; // Record old head for check below 3     setHead(node); 4     /* 5      * Try to signal next queued node if: 6      *   Propagation was indicated by caller, 7      *     or was recorded (as h.waitStatus) by a previous operation 8      *     (note: this uses sign-check of waitStatus because 9      *      PROPAGATE status may transition to SIGNAL.)10      * and11      *   The next node is waiting in shared mode,12      *     or we don't know, because it appears null13      *14      * The conservatism in both of these checks may cause15      * unnecessary wake-ups, but only when there are multiple16      * racing acquires/releases, so most need signals now or soon17      * anyway.18      */19     if (propagate > 0 || h == null || h.waitStatus < 0) {20         Node s = node.next;21         if (s == null || s.isShared())22             doReleaseShared();23     }24 }

大数据培训,云培训,数据挖掘培训,云计算培训,高端软件开发培训,项目经理培训

第3行的代码设置重设head,第2行的代码由于第3行的代码要重设head,因此先定义一个Node型变量h获得原head的地址,这两行代码很简单。

第19行~第23行的代码是独占锁和共享锁最不一样的一个地方,我们再看独占锁acquireQueued的代码:

大数据培训,云培训,数据挖掘培训,云计算培训,高端软件开发培训,项目经理培训

 1 final boolean acquireQueued(final Node node, int arg) { 2     boolean failed = true; 3     try { 4         boolean interrupted = false; 5         for (;;) { 6             final Node p = node.predecessor(); 7             if (p == head && tryAcquire(arg)) { 8                 setHead(node); 9                 p.next = null; // help GC10                 failed = false;11                 return interrupted;12             }13             if (shouldParkAfterFailedAcquire(p, node) &&14                 parkAndCheckInterrupt())15                 interrupted = true;16         }17     } finally {18         if (failed)19             cancelAcquire(node);20     }21 }

大数据培训,云培训,数据挖掘培训,云计算培训,高端软件开发培训,项目经理培训

这意味着独占锁某个节点被唤醒之后,它只需要将这个节点设置成head就完事了,而共享锁不一样,某个节点被设置为head之后,如果它的后继节点是SHARED状态的,那么将继续通过doReleaseShared方法尝试往后唤醒节点,实现了共享状态的向后传播

 

共享模式release实现流程

上面讲了共享模式下acquire是如何实现的,下面再看一下release的实现流程,方法为releaseShared:

大数据培训,云培训,数据挖掘培训,云计算培训,高端软件开发培训,项目经理培训

1 public final boolean releaseShared(int arg) {2     if (tryReleaseShared(arg)) {3         doReleaseShared();4         return true;5     }6     return false;7 }

大数据培训,云培训,数据挖掘培训,云计算培训,高端软件开发培训,项目经理培训

tryReleaseShared方法是子类实现的,如果tryReleaseShared成功,那么执行doReleaseShared()方法:

大数据培训,云培训,数据挖掘培训,云计算培训,高端软件开发培训,项目经理培训

 1 private void doReleaseShared() { 2     /* 3      * Ensure that a release propagates, even if there are other 4      * in-progress acquires/releases.  This proceeds in the usual 5      * way of trying to unparkSuccessor of head if it needs 6      * signal. But if it does not, status is set to PROPAGATE to 7      * ensure that upon release, propagation continues. 8      * Additionally, we must loop in case a new node is added 9      * while we are doing this. Also, unlike other uses of10      * unparkSuccessor, we need to know if CAS to reset status11      * fails, if so rechecking.12      */13     for (;;) {14         Node h = head;15         if (h != null && h != tail) {16             int ws = h.waitStatus;17             if (ws == Node.SIGNAL) {18                 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))19                     continue;            // loop to recheck cases20                 unparkSuccessor(h);21             }22             else if (ws == 0 &&23                      !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))24                 continue;                // loop on failed CAS25         }26         if (h == head)                   // loop if head changed27             break;28     }29 }

大数据培训,云培训,数据挖掘培训,云计算培训,高端软件开发培训,项目经理培训

主要是两层逻辑:

  1. 头结点本身的waitStatus是SIGNAL且能通过CAS算法将头结点的waitStatus从SIGNAL设置为0,唤醒头结点的后继节点

  2. 头结点本身的waitStatus是0的话,尝试将其设置为PROPAGATE状态的,意味着共享状态可以向后传播

 

Condition的await()方法实现原理----构建等待队列

我们知道,Condition是用于实现通知/等待机制的,和Object的wait()/notify()一样,由于本文之前描述AbstractQueuedSynchronizer的共享模式的篇幅不是很长,加之Condition也是AbstractQueuedSynchronizer的一部分,因此将Condition也放在这里写了。

Condition分为await()和signal()两部分,前者用于等待、后者用于唤醒,首先看一下await()是如何实现的。Condition本身是一个接口,其在AbstractQueuedSynchronizer中的实现为ConditionObject:

大数据培训,云培训,数据挖掘培训,云计算培训,高端软件开发培训,项目经理培训

1 public class ConditionObject implements Condition, java.io.Serializable {2         private static final long serialVersionUID = 1173984872572414699L;3         /** First node of condition queue. */4         private transient Node firstWaiter;5         /** Last node of condition queue. */6         private transient Node lastWaiter;7         8         ...9 }

大数据培训,云培训,数据挖掘培训,云计算培训,高端软件开发培训,项目经理培训

这里贴了一些字段定义,后面都是方法就不贴了,会对重点方法进行分析的。从字段定义我们可以看到,ConditionObject全局性地记录了第一个等待的节点与最后一个等待的节点

像ReentrantLock每次要使用ConditionObject,直接new一个ConditionObject出来即可。我们关注一下await()方法的实现:

大数据培训,云培训,数据挖掘培训,云计算培训,高端软件开发培训,项目经理培训

 1 public final void await() throws InterruptedException { 2     if (Thread.interrupted()) 3         throw new InterruptedException(); 4     Node node = addConditionWaiter(); 5     int savedState = fullyRelease(node); 6     int interruptMode = 0; 7     while (!isOnSyncQueue(node)) { 8         LockSupport.park(this); 9         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)10             break;11     }12     if (acquireQueued(node, savedState) && interruptMode != THROW_IE)13         interruptMode = REINTERRUPT;14     if (node.nextWaiter != null) // clean up if cancelled15         unlinkCancelledWaiters();16     if (interruptMode != 0)17         reportInterruptAfterWait(interruptMode);18 }

大数据培训,云培训,数据挖掘培训,云计算培训,高端软件开发培训,项目经理培训

第2行~第3行的代码用于处理中断,第4行代码比较关键,添加Condition的等待者,看一下实现:

大数据培训,云培训,数据挖掘培训,云计算培训,高端软件开发培训,项目经理培训

 1 private Node addConditionWaiter() { 2     Node t = lastWaiter; 3     // If lastWaiter is cancelled, clean out. 4     if (t != null && t.waitStatus != Node.CONDITION) { 5         unlinkCancelledWaiters(); 6         t = lastWaiter; 7     } 8     Node node = new Node(Thread.currentThread(), Node.CONDITION); 9     if (t == null)10         firstWaiter = node;11     else12         t.nextWaiter = node;13     lastWaiter = node;14     return node;15 }

大数据培训,云培训,数据挖掘培训,云计算培训,高端软件开发培训,项目经理培训

首先拿到队列(注意数据结构,Condition构建出来的也是一个队列)中最后一个等待者,紧接着第4行的的判断,判断最后一个等待者的waitStatus不是CONDITION的话,执行第5行的代码,解绑取消的等待者,因为通过第8行的代码,我们看到,new出来的Node的状态都是CONDITION的

那么unlinkCancelledWaiters做了什么?里面的流程就不看了,就是一些指针遍历并判断状态的操作,总结一下就是:从头到尾遍历每一个Node,遇到Node的waitStatus不是CONDITION的就从队列中踢掉,该节点的前后节点相连。

接着第8行的代码前面说过了,new出来了一个Node,存储了当前线程,waitStatus是CONDITION,接着第9行~第13行的操作很好理解:

  1. 如果lastWaiter是null,说明FIFO队列中没有任何Node,firstWaiter=Node

  2. 如果lastWaiter不是null,说明FIFO队列中有Node,原lastWaiter的next指向Node

  3. 无论如何,新加入的Node编程lastWaiter,即新加入的Node一定是在最后面

用一张图表示一下构建的数据结构就是:

大数据培训,云培训,数据挖掘培训,云计算培训,高端软件开发培训,项目经理培训

对比学习,我们总结一下Condition构建出来的队列和AbstractQueuedSynchronizer构建出来的队列的差别,主要体现在2点上:

  1. AbstractQueuedSynchronizer构建出来的队列,头节点是一个没有Thread的空节点,其标识作用,而Condition构建出来的队列,头节点就是真正等待的节点

  2. AbstractQueuedSynchronizer构建出来的队列,节点之间有next与pred相互标识该节点的前一个节点与后一个节点的地址,而Condition构建出来的队列,只使用了nextWaiter标识下一个等待节点的地址

整个过程中,我们看到没有使用任何CAS操作,firstWaiter和lastWaiter也没有用volatile修饰,其实原因很简单:要await()必然要先lock(),既然lock()了就表示没有竞争,没有竞争自然也没必要使用volatile+CAS的机制去保证什么

 

Condition的await()方法实现原理----线程等待

前面我们看了Condition构建等待队列的过程,接下来我们看一下等待的过程,await()方法的代码比较短,再贴一下:

大数据培训,云培训,数据挖掘培训,云计算培训,高端软件开发培训,项目经理培训

 1 public final void await() throws InterruptedException { 2     if (Thread.interrupted()) 3         throw new InterruptedException(); 4     Node node = addConditionWaiter(); 5     int savedState = fullyRelease(node); 6     int interruptMode = 0; 7     while (!isOnSyncQueue(node)) { 8         LockSupport.park(this); 9         if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)10             break;11     }12     if (acquireQueued(node, savedState) && interruptMode != THROW_IE)13         interruptMode = REINTERRUPT;14     if (node.nextWaiter != null) // clean up if cancelled15         unlinkCancelledWaiters();16     if (interruptMode != 0)17         reportInterruptAfterWait(interruptMode);18 }

大数据培训,云培训,数据挖掘培训,云计算培训,高端软件开发培训,项目经理培训

构建完毕队列之后,执行第5行的fullyRelease方法,顾名思义:fullyRelease方法的作用是完全释放Node的状态。方法实现为:

大数据培训,云培训,数据挖掘培训,云计算培训,高端软件开发培训,项目经理培训

 1 final int fullyRelease(Node node) { 2     boolean failed = true; 3     try { 4         int savedState = getState(); 5         if (release(savedState)) { 6             failed = false; 7             return savedState; 8         } else { 9             throw new IllegalMonitorStateException();10         }11     } finally {12         if (failed)13             node.waitStatus = Node.CANCELLED;14     }15 }

大数据培训,云培训,数据挖掘培训,云计算培训,高端软件开发培训,项目经理培训

http://www.cnblogs.com/xrq730/p/7067904.html