Understanding JUC AQS
javahongxi edited this page Aug 14, 2019
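The annotated class on this page is a study copy of `java.util.concurrent.locks.AbstractQueuedSynchronizer`. For orientation, here is a minimal sketch of how a subclass typically plugs into that framework. It assumes the real JDK class rather than the study copy, and the names `SimpleMutex` and `Sync` are hypothetical, chosen only for this illustration. It shows a non-reentrant mutex that overrides `tryAcquire`, `tryRelease` and `isHeldExclusively`, and leaves all queuing and parking to the framework.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Hypothetical example class; not part of the annotated source on this page.
class SimpleMutex {
    // Non-public helper subclass, as the class Javadoc recommends.
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int arg) {
            // A successful CAS of state from 0 to 1 means this thread owns the lock.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false; // caller (acquire) will enqueue and park the thread
        }

        @Override
        protected boolean tryRelease(int arg) {
            if (getExclusiveOwnerThread() != Thread.currentThread())
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0); // volatile write; the lock is now free
            return true; // tells release() to wake the successor of head
        }

        @Override
        protected boolean isHeldExclusively() {
            return getExclusiveOwnerThread() == Thread.currentThread();
        }

        boolean locked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    void lock()        { sync.acquire(1); }
    boolean tryLock()  { return sync.tryAcquire(1); }
    void unlock()      { sync.release(1); }
    boolean isLocked() { return sync.locked(); }

    public static void main(String[] args) throws InterruptedException {
        SimpleMutex mutex = new SimpleMutex();
        int[] counter = {0};
        Runnable work = () -> {
            for (int i = 0; i < 10_000; i++) {
                mutex.lock();
                try {
                    counter[0]++; // guarded by the mutex
                } finally {
                    mutex.unlock();
                }
            }
        };
        Thread a = new Thread(work);
        Thread b = new Thread(work);
        a.start();
        b.start();
        a.join();
        b.join();
        System.out.println(counter[0]);
    }
}
```

The sketch is deliberately non-reentrant: a second `tryLock()` by the owning thread fails because `state` is already 1. A reentrant lock would instead check the owner and increment `state`.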
import sun.misc.Unsafe;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.*;
/**
* Provides a framework for implementing blocking locks and related
* synchronizers (semaphores, events, etc) that rely on
* first-in-first-out (FIFO) wait queues. This class is designed to
* be a useful basis for most kinds of synchronizers that rely on a
* single atomic {@code int} value to represent state. Subclasses
* must define the protected methods that change this state, and which
* define what that state means in terms of this object being acquired
* or released. Given these, the other methods in this class carry
* out all queuing and blocking mechanics. Subclasses can maintain
* other state fields, but only the atomically updated {@code int}
* value manipulated using methods {@link #getState}, {@link
* #setState} and {@link #compareAndSetState} is tracked with respect
* to synchronization.
*
* <p>Subclasses should be defined as non-public internal helper
* classes that are used to implement the synchronization properties
* of their enclosing class. Class
* {@code AQS} does not implement any
* synchronization interface. Instead it defines methods such as
* {@link #acquireInterruptibly} that can be invoked as
* appropriate by concrete locks and related synchronizers to
* implement their public methods.
*
* <p>This class supports either or both a default <em>exclusive</em>
* mode and a <em>shared</em> mode. When acquired in exclusive mode,
* attempted acquires by other threads cannot succeed. Shared mode
* acquires by multiple threads may (but need not) succeed. This class
* does not "understand" these differences except in the
* mechanical sense that when a shared mode acquire succeeds, the next
* waiting thread (if one exists) must also determine whether it can
* acquire as well. Threads waiting in the different modes share the
* same FIFO queue. Usually, implementation subclasses support only
* one of these modes, but both can come into play for example in a
* {@link ReadWriteLock}. Subclasses that support only exclusive or
* only shared modes need not define the methods supporting the unused mode.
*
* <p>This class defines a nested {@link ConditionObject} class that
* can be used as a {@link Condition} implementation by subclasses
* supporting exclusive mode for which method {@link
* #isHeldExclusively} reports whether synchronization is exclusively
* held with respect to the current thread, method {@link #release}
* invoked with the current {@link #getState} value fully releases
* this object, and {@link #acquire}, given this saved state value,
* eventually restores this object to its previous acquired state. No
* {@code AQS} method otherwise creates such a
* condition, so if this constraint cannot be met, do not use it. The
* behavior of {@link ConditionObject} depends of course on the
* semantics of its synchronizer implementation.
*
* <h3>Usage</h3>
*
* <p>To use this class as the basis of a synchronizer, redefine the
* following methods, as applicable, by inspecting and/or modifying
* the synchronization state using {@link #getState}, {@link
* #setState} and/or {@link #compareAndSetState}:
*
* <ul>
* <li> {@link #tryAcquire}
* <li> {@link #tryRelease}
* <li> {@link #tryAcquireShared}
* <li> {@link #tryReleaseShared}
* <li> {@link #isHeldExclusively}
* </ul>
*
* Each of these methods by default throws {@link
* UnsupportedOperationException}. Implementations of these methods
* must be internally thread-safe, and should in general be short and
* not block. Defining these methods is the <em>only</em> supported
* means of using this class. All other methods are declared
* {@code final} because they cannot be independently varied.
*
* <p>This class provides an efficient and scalable basis for
* synchronization in part by specializing its range of use to
* synchronizers that can rely on {@code int} state, acquire, and
* release parameters, and an internal FIFO wait queue. When this does
* not suffice, you can build synchronizers from a lower level using
* {@link java.util.concurrent.atomic atomic} classes, your own custom
* {@link java.util.Queue} classes, and {@link LockSupport} blocking
* support.
*
* <p>This class is just for studying AbstractQueuedSynchronizer
*
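* 1. AQS represents the lock's synchronization state with the {@code state} field. In an
* exclusive lock such as ReentrantLock, a thread that changes state from 0 to 1 with a CAS
* has acquired the lock. {@code state > 0} means the lock is held; {@code state == 0} means it is free.
*
* 2. Only the thread that has acquired the lock may set exclusiveOwnerThread to itself.
*
* 3. addWaiter wraps the thread that is waiting for the lock in a Node and appends it to the
* tail of the queue. The append is guaranteed to succeed by the enq method that addWaiter
* calls; enq also initializes the queue when it is empty.
*
* 4. acquireQueued runs after the Node has been enqueued. It either retries the acquire
* (only when the Node's predecessor is head) or parks the thread.
*
* 5. shouldParkAfterFailedAcquire makes sure the waitStatus of the current thread's predecessor
* is SIGNAL, so that once the current thread parks, the predecessor wakes it at the right time.
*
* 6. parkAndCheckInterrupt parks the current thread and, after it wakes, reports (and clears)
* its interrupt status.
*
* 7. If the lock is eventually acquired, the thread returns from lock() and carries on;
* otherwise it stays blocked, waiting.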
*
* @author shenhongxi 2019/8/13
* @see java.util.concurrent.locks.AbstractQueuedSynchronizer
*/
public abstract class AQS implements java.io.Serializable {
private static final long serialVersionUID = 3464927934203269978L;
/**
* Creates a new {@code AQS} instance
* with initial synchronization state of zero.
*/
protected AQS() { }
/**
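* Queuing reflects a pessimistic-locking approach; CAS reflects an optimistic one.
* Loosely speaking, exclusive mode behaves like a pessimistic lock and shared mode like an
* optimistic one.
* synchronized is Java's built-in pessimistic lock. Locks built on AQS, such as ReentrantLock,
* first try to acquire optimistically with a CAS and fall back to pessimistic queuing only
* when that fails.
* AQS relies heavily on CAS and retries in a spin loop on conflict, which keeps lock
* acquisition lightweight and efficient.
*
* The queue in AQS is a doubly linked list called the sync queue. It holds all threads that
* are waiting for the lock.
*
* The queue is a CLH variant whose head is always a dummy node. The head does not represent
* any waiting thread (in some cases it can be seen as representing the thread that currently
* holds the lock), so the thread field of the Node that head points to is always null. Only
* the nodes from the second one onward represent waiting threads. In other words, when a
* thread fails to get the lock and is wrapped in a Node and enqueued, it lands in second
* position even if the queue was empty, because a dummy node is created in front of it.
*
* In concurrent programming, using a queue usually means wrapping the current thread in some
* data structure and placing it in a wait queue.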
*
* Wait queue node class.
*
* <p>The wait queue is a variant of a "CLH" (Craig, Landin, and
* Hagersten) lock queue. CLH locks are normally used for
* spinlocks. We instead use them for blocking synchronizers, but
* use the same basic tactic of holding some of the control
* information about a thread in the predecessor of its node. A
* "status" field in each node keeps track of whether a thread
* should block. A node is signalled when its predecessor
* releases. Each node of the queue otherwise serves as a
* specific-notification-style monitor holding a single waiting
* thread. The status field does NOT control whether threads are
* granted locks etc though. A thread may try to acquire if it is
* first in the queue. But being first does not guarantee success;
* it only gives the right to contend. So the currently released
* contender thread may need to rewait.
*
* <p>To enqueue into a CLH lock, you atomically splice it in as new
* tail. To dequeue, you just set the head field.
* <pre>
* +------+ prev +-----+ +-----+
* head | | <---- | | <---- | | tail
* +------+ +-----+ +-----+
* </pre>
*
* <p>Insertion into a CLH queue requires only a single atomic
* operation on "tail", so there is a simple atomic point of
* demarcation from unqueued to queued. Similarly, dequeuing
* involves only updating the "head". However, it takes a bit
* more work for nodes to determine who their successors are,
* in part to deal with possible cancellation due to timeouts
* and interrupts.
*
* <p>The "prev" links (not used in original CLH locks), are mainly
* needed to handle cancellation. If a node is cancelled, its
* successor is (normally) relinked to a non-cancelled
* predecessor. For explanation of similar mechanics in the case
* of spin locks, see the papers by Scott and Scherer at
* http://www.cs.rochester.edu/u/scott/synchronization/
*
* <p>We also use "next" links to implement blocking mechanics.
* The thread id for each node is kept in its own node, so a
* predecessor signals the next node to wake up by traversing
* next link to determine which thread it is. Determination of
* successor must avoid races with newly queued nodes to set
* the "next" fields of their predecessors. This is solved
* when necessary by checking backwards from the atomically
* updated "tail" when a node's successor appears to be null.
* (Or, said differently, the next-links are an optimization
* so that we don't usually need a backward scan.)
*
* <p>Cancellation introduces some conservatism to the basic
* algorithms. Since we must poll for cancellation of other
* nodes, we can miss noticing whether a cancelled node is
* ahead or behind us. This is dealt with by always unparking
* successors upon cancellation, allowing them to stabilize on
* a new predecessor, unless we can identify an uncancelled
* predecessor who will carry this responsibility.
*
* <p>CLH queues need a dummy header node to get started. But
* we don't create them on construction, because it would be wasted
* effort if there is never contention. Instead, the node
* is constructed and head and tail pointers are set upon first
* contention.
*
* <p>Threads waiting on Conditions use the same nodes, but
* use an additional link. Conditions only need to link nodes
* in simple (non-concurrent) linked queues because they are
* only accessed when exclusively held. Upon await, a node is
* inserted into a condition queue. Upon signal, the node is
* transferred to the main queue. A special value of status
* field is used to mark which queue a node is on.
*
* <p>Thanks go to Dave Dice, Mark Moir, Victor Luchangco, Bill
* Scherer and Michael Scott, along with members of JSR-166
* expert group, for helpful ideas, discussions, and critiques
* on the design of this class.
*/
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final AQS.Node SHARED = new AQS.Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final AQS.Node EXCLUSIVE = null;
/**
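* A node is marked cancelled because of a timeout or an interrupt. A cancelled node no longer
* competes for the lock, and it stays cancelled: it never changes to any other state.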
*/
/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/**
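* The successor's thread is waiting (parked or about to park). When the current node's thread
* releases the synchronization state or is cancelled, it notifies the successor so that the
* successor's thread can run.
* (Put simply: as soon as a predecessor releases the lock, it wakes the thread of the
* successor that asked for a SIGNAL.)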
*/
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/**
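* The node is in a condition queue and its thread is waiting on a Condition. When another
* thread calls signal() on that Condition, the node is transferred from the condition queue
* to the sync queue, where it joins the competition for the synchronization state.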
*/
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* Indicates that the next shared-mode acquire should propagate unconditionally.
*/
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;
/**
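* For SIGNAL, waitStatus does not describe this node itself but the state of its successor.
*
* SIGNAL: when a node's waitStatus is SIGNAL, its successor is parked (or is about to park).
* So when this node releases the lock or gives up acquiring it, and its waitStatus is SIGNAL,
* it has one extra duty: wake its successor.
*
* CANCELLED: the thread represented by this Node has stopped waiting in the queue, i.e. it
* has given up acquiring the lock.
*
* Plain exclusive-mode acquisition (without conditions) only needs CANCELLED and SIGNAL.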
*/
/**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;
/**
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing. Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
*/
/** Predecessor node; set when this node is appended to the tail of the sync queue. */
volatile AQS.Node prev;
/**
* Link to the successor node that the current node/thread
* unparks upon release. Assigned during enqueuing, adjusted
* when bypassing cancelled predecessors, and nulled out (for
* sake of GC) when dequeued. The enq operation does not
* assign next field of a predecessor until after attachment,
* so seeing a null next field does not necessarily mean that
* node is at end of queue. However, if a next field appears
* to be null, we can scan prev's from the tail to
* double-check. The next field of cancelled nodes is set to
* point to the node itself instead of null, to make life
* easier for isOnSyncQueue.
*/
/** Successor node. */
volatile AQS.Node next;
/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
/** The thread that is trying to acquire the synchronization state. */
volatile Thread thread;
/**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
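/** Next node in the condition queue. For a shared-mode node this field holds the SHARED
* constant instead, so the node's mode (exclusive or shared) and the condition-queue link
* share a single field. For an exclusive-mode node in the sync queue it is always null
* (Node.EXCLUSIVE) and serves only as a mode marker.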
*/
AQS.Node nextWaiter;
/**
* Returns true if node is waiting in shared mode.
*/
final boolean isShared() {
return nextWaiter == SHARED;
}
/**
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null. The null check could
* be elided, but is present to help the VM.
*
* @return the predecessor of this node
*/
final AQS.Node predecessor() throws NullPointerException {
AQS.Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
Node() { // Used to establish initial head or SHARED marker
}
Node(Thread thread, AQS.Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}
/**
* Head of the wait queue, lazily initialized. Except for
* initialization, it is modified only via method setHead. Note:
* If head exists, its waitStatus is guaranteed not to be
* CANCELLED.
*/
private transient volatile AQS.Node head; // head of the queue; always a dummy node
/**
* Tail of the wait queue, lazily initialized. Modified only via
* method enq to add new wait node.
*/
private transient volatile AQS.Node tail; // tail of the queue; the most recently enqueued node
/**
* The synchronization state.
*/
private volatile int state;
/**
* The current owner of exclusive mode synchronization.
* @see java.util.concurrent.locks.AbstractOwnableSynchronizer
*/
private transient Thread exclusiveOwnerThread;
/**
* Returns the current value of synchronization state.
* This operation has memory semantics of a {@code volatile} read.
* @return current state value
*/
protected final int getState() {
return state;
}
/**
* Sets the value of synchronization state.
* This operation has memory semantics of a {@code volatile} write.
* @param newState the new state value
*/
protected final void setState(int newState) {
state = newState;
}
/**
* Sets the thread that currently owns exclusive access.
* A {@code null} argument indicates that no thread owns access.
* This method does not otherwise impose any synchronization or
* {@code volatile} field accesses.
* @param thread the owner thread
*/
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}
/**
* Returns the thread last set by {@code setExclusiveOwnerThread},
* or {@code null} if never set. This method does not otherwise
* impose any synchronization or {@code volatile} field accesses.
* @return the owner thread
*/
protected final Thread getExclusiveOwnerThread() {
return exclusiveOwnerThread;
}
/**
* Atomically sets synchronization state to the given updated
* value if the current state value equals the expected value.
* This operation has memory semantics of a {@code volatile} read
* and write.
*
* @param expect the expected value
* @param update the new value
* @return {@code true} if successful. False return indicates that the actual
* value was not equal to the expected value.
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
/**
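* tryAcquire() tries to acquire the resource directly and, on success, acquire returns at once.
*
* addWaiter() appends the thread to the tail of the wait queue, marked as exclusive mode.
*
* acquireQueued() keeps the thread in the queue until it acquires the resource, and only then
* returns. It returns true if the thread was interrupted at any point while waiting,
* false otherwise.
*
* An interrupt that arrives while waiting is not acted on immediately. Only after the resource
* has been acquired does the thread re-assert the interrupt through selfInterrupt().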
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
/**
* Acquires in exclusive mode, aborting if interrupted.
* Implemented by first checking interrupt status, then invoking
* at least once {@link #tryAcquire}, returning on
* success. Otherwise the thread is queued, possibly repeatedly
* blocking and unblocking, invoking {@link #tryAcquire}
* until success or the thread is interrupted. This method can be
* used to implement method {@link Lock#lockInterruptibly}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) { // CAS
pred.next = node;
return node;
}
}
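// Execution reaches this point in only two cases:
// 1. the queue is empty (tail == null)
// 2. the CAS on tail failed
// Under concurrency anything can happen, and a failed CAS in particular lands here.
// For example, another thread may already have become the new tail, so the tail is no longer the pred we read above.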
enq(node);
return node;
}
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
// Spin in an endless loop until the node has been appended to the tail
for (;;) {
Node t = tail;
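// If the queue is empty, initialize it first.
// This also shows that the queue is not created at construction time; initialization is deferred until first use, to avoid wasted work.
if (t == null) { // Must initialize
// Initialization creates a dummy node with new Node().
// So the head of this wait queue is a dummy node that does not represent any waiting thread.
// The head carries no thread at all; it is just an empty node.
if (compareAndSetHead(new Node()))
// This only points tail at the dummy node. It does not return; the loop runs again to enqueue node.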
tail = head;
} else {
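// The queue is no longer empty here, so try again to append the node to the tail.
// Step 1: set node's predecessor to the current tail.
node.prev = t;
// Step 2: CAS tail so that it points to node. The CAS guarantees that only one node
// can become the tail at a time; the others fail and go back to the for loop to retry.
if (compareAndSetTail(t, node)) {
// Step 3: make the old tail's next point to node.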
t.next = node;
return t;
}
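// Note that the three steps together are not atomic. Step 1 always succeeds;
// step 2 is a CAS and may fail under concurrency;
// step 3 runs only if step 2 succeeded.
// So when many threads enqueue at once, only one of them completes all three steps
// at any given moment. The others complete only step 1, which leaves several nodes
// with prev pointing at the same old tail: a "fork" at the tail.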
}
}
}
/**
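* In enq, step 3 runs only after step 2 has succeeded. So even after step 2 has made the new
* node the tail, the old tail's next may still be null, because step 3 has not run yet. A
* thread that walks the list forward from head at that moment cannot reach the newly added
* tail, even though tail already points to it.
*
* On the other hand, once step 2 is done, step 1 is certainly done too, so walking backward
* from tail already reaches every node.
*
* This is why AQS code usually traverses the list backward from the tail, for example in
* unparkSuccessor(Node node):
* for (Node t = tail; t != null && t != node; t = t.prev)
* A node that has been enqueued always has its prev set, but its predecessor's next may
* not be set yet.
*
* As for the "forked" nodes that failed to enqueue, on the next loop iteration their prev is
* pointed at the new tail and they attempt the CAS again. Every node keeps spinning this way
* until it is enqueued.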
*/
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
// Retry the acquire only when this node's predecessor is head
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
// The acquire failed: decide whether the current thread should be parked
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* Acquires in exclusive interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
final AQS.Node node = addWaiter(AQS.Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final AQS.Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
/**
* Cancels an ongoing attempt to acquire.
*
* @param node the node
*/
private void cancelAcquire(AQS.Node node) {
// Ignore if node doesn't exist
if (node == null)
return;
node.thread = null;
// Skip cancelled predecessors
AQS.Node pred = node.prev;
while (pred.waitStatus > 0)
node.prev = pred = pred.prev;
// predNext is the apparent node to unsplice. CASes below will
// fail if not, in which case, we lost race vs another cancel
// or signal, so no further action is necessary.
AQS.Node predNext = pred.next;
// Can use unconditional write instead of CAS here.
// After this atomic step, other Nodes can skip past us.
// Before, we are free of interference from other threads.
node.waitStatus = AQS.Node.CANCELLED;
// If we are the tail, remove ourselves.
if (node == tail && compareAndSetTail(node, pred)) {
compareAndSetNext(pred, predNext, null);
} else {
// If successor needs signal, try to set pred's next-link
// so it will get one. Otherwise wake it up to propagate.
int ws;
if (pred != head &&
((ws = pred.waitStatus) == AQS.Node.SIGNAL ||
(ws <= 0 && compareAndSetWaitStatus(pred, ws, AQS.Node.SIGNAL))) &&
pred.thread != null) {
AQS.Node next = node.next;
if (next != null && next.waitStatus <= 0)
compareAndSetNext(pred, predNext, next);
} else {
unparkSuccessor(node);
}
node.next = node; // help GC
}
}
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
// The predecessor's status is already SIGNAL, so the alarm clock is set
// and this thread can safely go to sleep (park).
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
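// The predecessor's ws > 0, i.e. Node.CANCELLED: it has given up waiting for the lock (because of a timeout or an interrupt).
// Since the predecessor is no longer waiting, keep walking backward until a node that is still waiting is found.
// Then skip the cancelled nodes and queue directly behind that waiting node.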
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
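// The predecessor's status is neither SIGNAL nor CANCELLED (it is 0 or PROPAGATE).
// CAS the predecessor's ws to Node.SIGNAL, i.e. set an alarm clock for this thread; the caller retries the acquire once more before parking.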
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); // the thread parks here and goes no further until it is unparked or interrupted (park may also return spuriously)
return Thread.interrupted();
}
/**
* Convenience method to interrupt current thread.
*/
static void selfInterrupt() {
Thread.currentThread().interrupt();
}
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
/**
* Queries whether any threads have been waiting to acquire longer
* than the current thread.
*
* <p>An invocation of this method is equivalent to (but may be
* more efficient than):
* <pre> {@code
* getFirstQueuedThread() != Thread.currentThread() &&
* hasQueuedThreads()}</pre>
*
* <p>Note that because cancellations due to interrupts and
* timeouts may occur at any time, a {@code true} return does not
* guarantee that some other thread will acquire before the current
* thread. Likewise, it is possible for another thread to win a
* race to enqueue after this method has returned {@code false},
* due to the queue being empty.
*
* <p>This method is designed to be used by a fair synchronizer to
* avoid <a href="AbstractQueuedSynchronizer#barging">barging</a>.
* Such a synchronizer's {@link #tryAcquire} method should return
* {@code false}, and its {@link #tryAcquireShared} method should
* return a negative value, if this method returns {@code true}
* (unless this is a reentrant acquire). For example, the {@code
* tryAcquire} method for a fair, reentrant, exclusive mode
* synchronizer might look like this:
*
* <pre> {@code
* protected boolean tryAcquire(int arg) {
* if (isHeldExclusively()) {
* // A reentrant acquire; increment hold count
* return true;
* } else if (hasQueuedPredecessors()) {
* return false;
* } else {
* // try to acquire normally
* }
* }}</pre>
*
* @return {@code true} if there is a queued thread preceding the
* current thread, and {@code false} if the current thread
* is at the head of the queue or the queue is empty
* @since 1.7
*/
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
AQS.Node t = tail; // Read fields in reverse initialization order
AQS.Node h = head;
AQS.Node s;
/**
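* h.next == null can be observed after {@link #acquireQueued} has nulled the old head's next
* (p.next = null), or briefly while enq has already moved tail but not yet linked next.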
*/
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
/**
* Sets head of queue to be node, thus dequeuing. Called only by
* acquire methods. Also nulls out unused fields for sake of GC
* and to suppress unnecessary signals and traversals.
*
* @param node the node
*/
private void setHead(AQS.Node node) {
head = node;
node.thread = null;
node.prev = null;
}
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
// Main exported methods
/**
* Attempts to acquire in exclusive mode. This method should query
* if the state of the object permits it to be acquired in the
* exclusive mode, and if so to acquire it.
*
* <p>This method is always invoked by the thread performing
* acquire. If this method reports failure, the acquire method
* may queue the thread, if it is not already queued, until it is
* signalled by a release from some other thread. This can be used
* to implement method {@link Lock#tryLock()}.
*
* <p>The default
* implementation throws {@link UnsupportedOperationException}.
*
* @param arg the acquire argument. This value is always the one
* passed to an acquire method, or is the value saved on entry
* to a condition wait. The value is otherwise uninterpreted
* and can represent anything you like.
* @return {@code true} if successful. Upon success, this object has
* been acquired.
* @throws IllegalMonitorStateException if acquiring would place this
* synchronizer in an illegal state. This exception must be
* thrown in a consistent fashion for synchronization to work
* correctly.
* @throws UnsupportedOperationException if exclusive mode is not supported
*/
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}
protected int tryAcquireShared(int arg) {
throw new UnsupportedOperationException();
}
protected boolean tryReleaseShared(int arg) {
throw new UnsupportedOperationException();
}
/**
* Attempts to set the state to reflect a release in exclusive
* mode.
*
* <p>This method is always invoked by the thread performing release.
*
* <p>The default implementation throws
* {@link UnsupportedOperationException}.
*
* @param arg the release argument. This value is always the one
* passed to a release method, or the current state value upon
* entry to a condition wait. The value is otherwise
* uninterpreted and can represent anything you like.
* @return {@code true} if this object is now in a fully released
* state, so that any waiting threads may attempt to acquire;
* and {@code false} otherwise.
* @throws IllegalMonitorStateException if releasing would place this
* synchronizer in an illegal state. This exception must be
* thrown in a consistent fashion for synchronization to work
* correctly.
* @throws UnsupportedOperationException if exclusive mode is not supported
*/
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}
/**
* Returns {@code true} if synchronization is held exclusively with
* respect to the current (calling) thread. This method is invoked
* upon each call to a non-waiting {@link AQS.ConditionObject} method.
* (Waiting methods instead invoke {@link #release}.)
*
* <p>The default implementation throws {@link
* UnsupportedOperationException}. This method is invoked
* internally only within {@link AQS.ConditionObject} methods, so need
* not be defined if conditions are not used.
*
* @return {@code true} if synchronization is held exclusively;
* {@code false} otherwise
* @throws UnsupportedOperationException if conditions are not supported
*/
protected boolean isHeldExclusively() {
throw new UnsupportedOperationException();
}
/**
* Condition implementation for a {@link
* AQS} serving as the basis of a {@link
* Lock} implementation.
*
* <p>Method documentation for this class describes mechanics,
* not behavioral specifications from the point of view of Lock
* and Condition users. Exported versions of this class will in
* general need to be accompanied by documentation describing
* condition semantics that rely on those of the associated
* {@code AQS}.
*
* <p>This class is Serializable, but all fields are transient,
* so deserialized conditions have no waiters.
*/
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient AQS.Node firstWaiter;
/** Last node of condition queue. */
private transient AQS.Node lastWaiter;
/**
* Creates a new {@code ConditionObject} instance.
*/
public ConditionObject() { }
// Internal methods
/**
* Adds a new waiter to wait queue.
* @return its new wait node
*/
private AQS.Node addConditionWaiter() {
AQS.Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != AQS.Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
AQS.Node node = new AQS.Node(Thread.currentThread(), AQS.Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
/**
* Removes and transfers nodes until it hits a non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(AQS.Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
/**
* Removes and transfers all nodes.
* @param first (non-null) the first node on condition queue
*/
private void doSignalAll(AQS.Node first) {
lastWaiter = firstWaiter = null;
do {
AQS.Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
/**
* Unlinks cancelled waiter nodes from condition queue.
* Called only while holding lock. This is called when
* cancellation occurred during condition wait, and upon
* insertion of a new waiter when lastWaiter is seen to have
* been cancelled. This method is needed to avoid garbage
* retention in the absence of signals. So even though it may
* require a full traversal, it comes into play only when
* timeouts or cancellations occur in the absence of
* signals. It traverses all nodes rather than stopping at a
* particular target to unlink all pointers to garbage nodes
* without requiring many re-traversals during cancellation
* storms.
*/
private void unlinkCancelledWaiters() {
AQS.Node t = firstWaiter;
AQS.Node trail = null;
while (t != null) {
AQS.Node next = t.nextWaiter;
if (t.waitStatus != AQS.Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
// public methods
/**
* Moves the longest-waiting thread, if one exists, from the
* wait queue for this condition to the wait queue for the
* owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
@Override
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
AQS.Node first = firstWaiter;
if (first != null)
doSignal(first);
}
/**
* Moves all threads from the wait queue for this condition to
* the wait queue for the owning lock.
*
* @throws IllegalMonitorStateException if {@link #isHeldExclusively}
* returns {@code false}
*/
@Override
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
AQS.Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
/**
* Implements interruptible condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled or interrupted.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
@Override
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
AQS.Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
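// The remaining wait variants are not implemented in this trimmed-down
// copy. They fail fast rather than return without waiting, so a caller
// cannot mistake an immediate return for a completed wait or a timeout.
@Override
public void awaitUninterruptibly() {
throw new UnsupportedOperationException();
}
@Override
public long awaitNanos(long nanosTimeout) throws InterruptedException {
throw new UnsupportedOperationException();
}
@Override
public boolean await(long time, TimeUnit unit) throws InterruptedException {
throw new UnsupportedOperationException();
}
@Override
public boolean awaitUntil(Date deadline) throws InterruptedException {
throw new UnsupportedOperationException();
}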
/*
* For interruptible waits, we need to track whether to throw
* InterruptedException, if interrupted while blocked on
* condition, versus reinterrupt current thread, if
* interrupted while blocked waiting to re-acquire.
*/
/** Mode meaning to reinterrupt on exit from wait */
private static final int REINTERRUPT = 1;
/** Mode meaning to throw InterruptedException on exit from wait */
private static final int THROW_IE = -1;
/**
* Checks for interrupt, returning THROW_IE if interrupted
* before signalled, REINTERRUPT if after signalled, or
* 0 if not interrupted.
*/
private int checkInterruptWhileWaiting(AQS.Node node) {
return Thread.interrupted() ?
(transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
0;
}
/**
* Throws InterruptedException, reinterrupts current thread, or
* does nothing, depending on mode.
*/
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
if (interruptMode == THROW_IE)
throw new InterruptedException();
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}
}
/**
* Transfers node, if necessary, to sync queue after a cancelled wait.
* Returns true if thread was cancelled before being signalled.
*
* @param node the node
* @return true if cancelled before the node was signalled
*/
final boolean transferAfterCancelledWait(AQS.Node node) {
if (compareAndSetWaitStatus(node, AQS.Node.CONDITION, 0)) {
enq(node);
return true;
}
/*
* If we lost out to a signal(), then we can't proceed
* until it finishes its enq(). Cancelling during an
* incomplete transfer is both rare and transient, so just
* spin.
*/
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}
/**
* Invokes release with current state value; returns saved state.
* Cancels node and throws exception on failure.
* @param node the condition node for this wait
* @return previous sync state
*/
final int fullyRelease(AQS.Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = AQS.Node.CANCELLED;
}
}
/**
* Returns true if a node, always one that was initially placed on
* a condition queue, is now waiting to reacquire on sync queue.
* @param node the node
* @return true if is reacquiring
*/
final boolean isOnSyncQueue(AQS.Node node) {
if (node.waitStatus == AQS.Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
return findNodeFromTail(node);
}
/**
* Returns true if node is on sync queue by searching backwards from tail.
* Called only when needed by isOnSyncQueue.
* @return true if present
*/
private boolean findNodeFromTail(AQS.Node node) {
AQS.Node t = tail;
for (;;) {
if (t == node)
return true;
if (t == null)
return false;
t = t.prev;
}
}
/**
* Transfers a node from a condition queue onto sync queue.
* Returns true if successful.
* @param node the node
* @return true if successfully transferred (else the node was
* cancelled before signal)
*/
final boolean transferForSignal(AQS.Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, AQS.Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
AQS.Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, AQS.Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
/**
* Setup to support compareAndSet. We need to natively implement
* this here: For the sake of permitting future enhancements, we
* cannot explicitly subclass AtomicInteger, which would be
* efficient and useful otherwise. So, as the lesser of evils, we
* natively implement using hotspot intrinsics API. And while we
* are at it, we do the same for other CASable fields (which could
* otherwise be done with atomic field updaters).
*/
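private static final Unsafe unsafe;
private static final long stateOffset;
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;
static {
try {
// Unsafe.getUnsafe() throws SecurityException unless the caller was
// loaded by the bootstrap class loader. A copy of this class on the
// application class path is not, so read the singleton from its
// private static field instead.
java.lang.reflect.Field theUnsafe = Unsafe.class.getDeclaredField("theUnsafe");
theUnsafe.setAccessible(true);
unsafe = (Unsafe) theUnsafe.get(null);
stateOffset = unsafe.objectFieldOffset
(AQS.class.getDeclaredField("state"));
headOffset = unsafe.objectFieldOffset
(AQS.class.getDeclaredField("head"));
tailOffset = unsafe.objectFieldOffset
(AQS.class.getDeclaredField("tail"));
waitStatusOffset = unsafe.objectFieldOffset
(AQS.Node.class.getDeclaredField("waitStatus"));
nextOffset = unsafe.objectFieldOffset
(AQS.Node.class.getDeclaredField("next"));
} catch (Exception ex) { throw new Error(ex); }
}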
/**
* CAS head field. Used only by enq.
*/
private final boolean compareAndSetHead(AQS.Node update) {
return unsafe.compareAndSwapObject(this, headOffset, null, update);
}
/**
* CAS tail field. Used only by enq.
*/
private final boolean compareAndSetTail(AQS.Node expect, AQS.Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
/**
* CAS waitStatus field of a node.
*/
private static final boolean compareAndSetWaitStatus(AQS.Node node,
int expect,
int update) {
return unsafe.compareAndSwapInt(node, waitStatusOffset,
expect, update);
}
/**
* CAS next field of a node.
*/
private static final boolean compareAndSetNext(AQS.Node node,
AQS.Node expect,
AQS.Node update) {
return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
}
}
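
The hooks documented above (`tryAcquire`, `tryRelease`, `isHeldExclusively`) and the `ConditionObject` wait/signal path can be exercised with a small subclass. The following is a minimal sketch, not part of the listing: it assumes the JDK's own `java.util.concurrent.locks.AbstractQueuedSynchronizer` rather than the `AQS` copy so that it compiles on its own, and the names `MutexDemo`, `Mutex`, `Sync` and `runDemo` are hypothetical.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;

final class MutexDemo {
    /** Hypothetical non-reentrant mutex: state 0 = unlocked, 1 = locked. */
    static final class Mutex {
        private static final class Sync extends AbstractQueuedSynchronizer {
            @Override
            protected boolean tryAcquire(int arg) {
                // Succeeds only for the thread that flips state from 0 to 1.
                if (compareAndSetState(0, 1)) {
                    setExclusiveOwnerThread(Thread.currentThread());
                    return true;
                }
                return false;
            }

            @Override
            protected boolean tryRelease(int arg) {
                if (!isHeldExclusively())
                    throw new IllegalMonitorStateException();
                setExclusiveOwnerThread(null);
                setState(0);   // volatile write; publishes the release
                return true;   // fully released, so waiters may try to acquire
            }

            @Override
            protected boolean isHeldExclusively() {
                return getExclusiveOwnerThread() == Thread.currentThread();
            }

            Condition newCondition() {
                return new ConditionObject();
            }
        }

        private final Sync sync = new Sync();

        void lock() { sync.acquire(1); }
        void unlock() { sync.release(1); }
        Condition newCondition() { return sync.newCondition(); }
    }

    /** Hands one value from the calling thread to a waiting thread. */
    static int runDemo() {
        final Mutex mutex = new Mutex();
        final Condition changed = mutex.newCondition();
        final int[] box = {0};    // guarded by mutex
        final int[] seen = {-1};  // read by the caller after join()

        Thread waiter = new Thread(() -> {
            mutex.lock();
            try {
                // await() releases the mutex while parked and reacquires
                // it before returning, so the loop re-checks under the lock.
                while (box[0] == 0)
                    changed.await();
                seen[0] = box[0];
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                mutex.unlock();
            }
        });
        waiter.start();

        mutex.lock();
        try {
            box[0] = 42;
            changed.signal();  // moves the waiter, if any, to the sync queue
        } finally {
            mutex.unlock();
        }

        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints 42
    }
}
```

The `while` loop around `await()` covers both orderings: if the signalling thread gets the mutex first, the waiter finds `box[0]` already set and never parks, so no wakeup is lost.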