概述
- AQS是一个实现线程同步器的基础框架,线程同步器的作用是协调多个线程对共享资源的访问,如ReentrantLock,在多个线程共享同一个资源时,实现多线程对该共享资源的同步访问,避免并发访问导致数据不一致等问题。或者协调多个线程之间的操作,如CountDownLatch。
- AQS提供了基于一个FIFO队列实现线程同步器的基础设施,具体的线程同步器实现类,只需要关注对共享资源的访问控制,即是互斥访问还是共享访问,如果是互斥访问则任何时候,只能存在一个线程访问该共享资源,如果是共享访问,通常需要设置同时访问共享资源的最大线程数。而其他不能访问该共享资源的线程,则会存放到AQS的内部队列中,同时AQS也定义了当共享资源可用时,从队列唤醒等待线程,使得被唤醒的线程可以继续执行。
- 对于AQS接口的实现类,只需要定义控制和判断资源可用和不可用即可,即关注共享资源可用状态state的定义,自定义tryAcquire判断是否可以访问共享资源,自定义tryRelease来允许其他线程访问共享资源。
- AQS的实现类通常作为具体的线程同步器,如ReentrantLock,CountDownLatch等的一个内部helper类来实现。
- AQS默认实现是采取一种公平策略,即内部使用的是一种FIFO队列来存放等待线程,等待时间最长的线程优先被唤醒。即对于FIFO等待队列中的线程,按照进入队列的先后顺序,依次唤醒,即从线程等待队列的队列头开始,依次取出线程。公平策略的优点是线程不会饿死,缺点是执行慢的线程影响执行快的线程,整体吞吐量较低。
核心设计
在AQS的整体设计和实现当中,主要包括四个核心设计,分别是:
- int类型state表示同步状态,(1)在互斥访问中,state通常为1,表示在任何时刻,只能存在一个线程对共享资源的访问,如ReentrantLock中,state大于0,表示当前存在某个线程占有了该共享资源了,其他线程需要等待;(2)在共享访问中,state通常为可以同时或者需要同时多少个线程访问共享资源,如CountDownLatch表示当有state个线程同时访问资源时,则达到条件。
- FIFO线程等待队列:在互斥访问当中,在任何时候,只能存在一个线程对共享资源进行访问,其他线程需要放在一个FIFO队列中等待,同时当该线程释放该共享资源时,则从该队列中获取并唤醒一个等待线程。
- 线程状态的控制:由线程生命周期可知,当线程需要等待访问该共享资源的时候,该线程的状态从running变成waiting或者time_waiting,退出CPU资源的竞争,被唤醒时,则从waiting或者time_waiting变成runnable,等待CPU的调度执行。在AQS的实现当中,主要是在LockSupport工具类定义阻塞和唤醒线程的方法,来对线程状态进行控制。
- 自旋和CAS实现无锁写:在FIFO队列增删线程节点,或者更新线程节点的状态waitStatus时,主要是通过UNSAFE类提供的硬件级别的原子操作CAS,以及结合自旋操作来实现无锁更新、写入操作;同时每个线程在被唤醒时,使用自旋来检测条件是否满足,来决定是继续执行还是进行等待。
共享资源同步状态state
-
共享资源同步状态state的定义如下:state使用volatile修饰,从而实现了线程的可见性,即多个线程共享该线程同步器,state的更新对所有线程可见。
-
compareAndSetState:使用UNSAFE类提供的compareAndSwapInt方法来对state更新,即CAS实现无锁更新。
/** * The synchronization state. */ private volatile int state; ... /** * Atomically sets synchronization state to the given updated * value if the current state value equals the expected value. * This operation has memory semantics of a {@code volatile} read * and write. * * @param expect the expected value * @param update the new value * @return {@code true} if successful. False return indicates that the actual * value was not equal to the expected value. */ protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); }
FIFO线程等待队列和线程节点Node
-
FIFO线程等待队列定义如下:队列是基于双向链表实现的,定义了头结点head,尾节点tail,都是使用volatile修饰,是lazy初始化的,即在添加节点的时候再创建。
-
AQS默认是fair实现的,即公平的,所以体现在等待队列中为在tail尾节点添加新的等待线程节点,从head头部节点获取线程节点并唤醒该节点对应的线程。
/** * Head of the wait queue, lazily initialized. Except for * initialization, it is modified only via method setHead. Note: * If head exists, its waitStatus is guaranteed not to be * CANCELLED. */ private transient volatile Node head; /** * Tail of the wait queue, lazily initialized. Modified only via * method enq to add new wait node. */ private transient volatile Node tail; -
线程节点定义:每个节点包含对应的线程引用thread,前置节点pre,即先于thread加入该队列的线程节点,后置节点next,节点的等待状态waitStatus。
static final class Node { /** Marker to indicate a node is waiting in shared mode */ static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ static final Node EXCLUSIVE = null; /** waitStatus value to indicate thread has cancelled */ static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ static final int PROPAGATE = -3; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; /** * Link to next node waiting on condition, or the special * value SHARED. Because condition queues are accessed only * when holding in exclusive mode, we just need a simple * linked queue to hold nodes while they are waiting on * conditions. They are then transferred to the queue to * re-acquire. And because conditions can only be exclusive, * we save a field by using special value to indicate shared * mode. */ Node nextWaiter; /** * Returns true if node is waiting in shared mode. */ final boolean isShared() { return nextWaiter == SHARED; } /** * Returns previous node, or throws NullPointerException if null. * Use when predecessor cannot be null. The null check could * be elided, but is present to help the VM. * * @return the predecessor of this node */ final Node predecessor() throws NullPointerException { Node p = prev; if (p == null) throw new NullPointerException(); else return p; } Node() { // Used to establish initial head or SHARED marker } Node(Thread thread, Node mode) { // Used by addWaiter this.nextWaiter = mode; this.thread = thread; } Node(Thread thread, int waitStatus) { // Used by Condition this.waitStatus = waitStatus; this.thread = thread; } } -
在AQS的设计当中,在内部使用一个同步队列来存放等待线程节点的,对于条件化等待Condition,在内部使用了一个条件化等待队列。而线程有几种等待状态,使用int类型的waitStatus来表示:对应同步队列的节点,初始化为0,对应条件等待队列的节点,初始化为CONDITION,即-2:
- 大于0的为无效状态,即该状态不会再改变了,对应的线程不会再竞争访问共享资源了,如CANCELLED等于1,CANCELLED表示该线程放弃了,如等待超时或被中断;小于等于0的为有效状态,其中这里需要重点区分的是SIGNAL和CONDITION。
- SIGNAL状态的节点在同步队列SyncQueue中,主要作用是通知有SIGNAL状态的节点:其后续节点在等待访问共享资源,即该SIGNAL状态的节点的下一个节点是需要访问共享资源的线程节点。所以节点状态waitStatus为SIGNAL的线程在访问共享资源完毕,释放资源时,需要调用unparkSuccessor方法来通知和唤醒后续的等待线程节点,通常从head节点开始往后查找到第一个处于等待状态的节点,唤醒该线程节点;
- CONDITION状态的节点在条件等待队列中,为结合ConditionObject来实现条件化等待的节点。在ConditionObject内部维护了一个条件化等待队列,调用await会创建waitStatus为CONDITION的线程节点追加到该条件化等待队列的尾部,之后调用signal的时候,会从条件化等待队列的头部取出一个线程节点,更新该节点的waitStatus为0或者SIGNAL,然后转移到同步队列SyncQueue中,具体为添加到同步队列尾部,从而对应的线程可以进入正常的对共享资源,如lock锁的竞争。
线程状态的控制
-
在AQS体系设计中,主要是在LockSupport类的park定义将一个线程从running状态变成waiting状态,从而阻塞或者说休眠该线程,只有通过unpark方法来唤醒,即在unpark方法中定义了将一个waiting状态的线程变为runnable。方法定义如下:
/** * Makes available the permit for the given thread, if it * was not already available. If the thread was blocked on * {@code park} then it will unblock. Otherwise, its next call * to {@code park} is guaranteed not to block. This operation * is not guaranteed to have any effect at all if the given * thread has not been started. * * @param thread the thread to unpark, or {@code null}, in which case * this operation has no effect */ public static void unpark(Thread thread) { if (thread != null) UNSAFE.unpark(thread); } /** * Disables the current thread for thread scheduling purposes unless the * permit is available. * * <p>If the permit is available then it is consumed and the call returns * immediately; otherwise * the current thread becomes disabled for thread scheduling * purposes and lies dormant until one of three things happens: * * <ul> * <li>Some other thread invokes {@link #unpark unpark} with the * current thread as the target; or * * <li>Some other thread {@linkplain Thread#interrupt interrupts} * the current thread; or * * <li>The call spuriously (that is, for no reason) returns. * </ul> * * <p>This method does <em>not</em> report which of these caused the * method to return. Callers should re-check the conditions which caused * the thread to park in the first place. Callers may also determine, * for example, the interrupt status of the thread upon return. * * @param blocker the synchronization object responsible for this * thread parking * @since 1.6 */ public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); UNSAFE.park(false, 0L); setBlocker(t, null); }
自旋与CAS实现无锁更新
- CAS原子更新在存在线程并发操作时,可能会失败,即expect不是预期的,故返回false,所以需要在线程内部通过自旋来重复执行直到成功,自旋通常是使用for死循环来实现,在for循环内部通过break或者return来跳出,或者将线程重新park阻塞掉。具体在下面acquire和release方法中分析。
核心方法
在AQS的设计中,核心功能为线程对共享资源的访问请求,线程是否对共享资源的访问。对于互斥方法主要是通过acquire和release方法来定义,对应共享方法则使用acquire和release的shared版本来实现。
acquire:请求访问共享资源
-
acquire的定义如下:主要包含三步:(1)tryAcquire:判断当前线程是否可以访问资源,这个也是AQS接口的实现类需要关注和实现的方法,对于互斥访问arg的值通常为1。tryAcquire定义资源访问的条件,返回true则说明当前线程可以对资源进行访问,false则进行执行下一步;(2)acquireQueued:为当前线程创建一个线程等待节点,然后放到FIFO线程等待队列中;(3)selfInterrupt:自己处理中断,acquire默认为忽略中断,selfInterrupt表示当前线程自己稍后补充该中断。
/** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } -
acquireQueued和addWaiter的定义:
-
acquireQueued:将addWaiter返回的线程节点,即已经入队的线程节点,在for循环中自旋检查(通常在刚加入等待队列或者之后被其他线程唤醒时执行该检查)当前线程节点是否为队列头结点head的下一个节点,如果是则说明下一个可以访问共享资源的就是该线程了,故调用tryAcquire尝试获取资源访问许可,即检查state的值;否则继续阻塞休眠该线程。
/** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 自旋 for (;;) { final Node p = node.predecessor(); // 当前正在加入队列或者之后被唤醒时, // 检查自身线程是否是下一个可以访问共享资源的节点, // 是则调用tryAcquire尝试获取资源访问许可 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; // 可以访问资源了则返回 return interrupted; } // park阻塞休眠线程,进入waiting, // 等待之后被唤醒再执行for循环自旋 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { // 失败,则将该线程节点设为CANCELLED if (failed) cancelAcquire(node); } } -
shouldParkAfterFailedAcquire:当前线程节点在队列中选择一个合适的位置之后,然后阻塞休眠。即找到一个有效的前置节点,并设置该前置节点的waitStatus为SIGNAL,这样这个前置节点执行完之后,发现自身waitStatus为SIGNAL,则会知道自己的下一个线程节点是需要访问共享资源的线程,故会调用unparkSuccessor来唤醒该下一个节点的线程。
// 当前线程节点查找到一个合适的位置,即前置节点的waitStatus为SIGNAL的, // 然后阻塞休眠,这样该前置节点执行完会release通知唤醒该线程节点 /** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev. * * @param pred node's predecessor holding status * @param node the node * @return {@code true} if thread should block */ private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; // 前置节点状态为SIGNAL,则该前置节点执行完之后, // 会调用release查找后继的有效节点,来通知该有效节点,即当前节点 // 则当前节点可以休眠了,到时前置节点会通知唤醒。 if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ // 从tail往前移动,直到找到一个有效的前置节点来作为该线程节点的前置节点 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ // 将前置节点的状态更新为SIGNAL,则下次自旋进入时,当前节点就可以休眠了 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } // 调用LockSupport.park阻塞和休眠该线程 /** * Convenience method to park and then check if interrupted * * @return {@code true} if interrupted */ private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); } -
addWaiter:创建线程节点并入队,刚开始为直接在队列尾部添加该节点,然后再在上面分析的acquireQueued中,直接执行该线程节点(前置节点就是head,说明该线程节点就是下一个可以执行访问共享资源的线程)或者将从tail往前查找一个有效的前置节点作为该线程节点的前置pre节点,具体在shouldParkAfterFailedAcquire中实现。
// 基于指定mode创建线程节点,互斥为EXCLUSIVE /** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */ private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure // 尝试直接通过CAS在队列尾部添加该节点, // 此时不加锁,如果失败,则往下执行 // 在enq中使用自旋来添加到队列尾部 Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; // 直接返回 return node; } } // 使用自旋来添加该线程节点到队列尾部 enq(node); return node; } /** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */ private Node enq(final Node node) { // 自旋 for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; // 成功添加到队列尾部则返回 return t; } } } }
release:释放共享资源
-
release的方法定义如下:通过tryRelease来是否共享资源,通常为将state变量递减,由AQS接口的实现类来实现该方法。成功则获取线程等待队列的头结点head,调用unparkSuccessor来唤醒head的下一个节点,具体为使用LockSupport类的unpark来唤醒该节点对应线程。
/** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) // 唤醒head的下一个节点对应的线程 // 由于当前只有该线程在运行,其他在等待队列休眠等待, // 所以unparkSuccessor中不需要自旋 unparkSuccessor(h); return true; } return false; } /** * Wakes up node's successor, if one exists. * * @param node the node */ private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ // 下一个节点 Node s = node.next; // head的下一个节点为无效, // 则从tail往前一直查找, // 找到最先添加的一个有效节点 if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 唤醒对应的线程, // 该线程可以进行执行acquireQueued方法中的自旋, // 检测当前线程是否为head的下一个节点并调用tryAcquired LockSupport.unpark(s.thread); }






还没有评论,来说两句吧...