public abstract class AbstractQueuedSynchronizerextends AbstractOwnableSynchronizerimplements java.io.Serializable { /** * Link to predecessor node that current node/thread relies on * for checking waitStatus. Assigned during enqueuing, and nulled * out (for sake of GC) only upon dequeuing. Also, upon * cancellation of a predecessor, we short-circuit while * finding a non-cancelled one, which will always exist * because the head node is never cancelled: A node becomes * head only as a result of successful acquire. A * cancelled thread never succeeds in acquiring, and a thread only * cancels itself, not any other node. */ volatile Node prev; /** * Link to the successor node that the current node/thread * unparks upon release. Assigned during enqueuing, adjusted * when bypassing cancelled predecessors, and nulled out (for * sake of GC) when dequeued. The enq operation does not * assign next field of a predecessor until after attachment, * so seeing a null next field does not necessarily mean that * node is at end of queue. However, if a next field appears * to be null, we can scan prev's from the tail to * double-check. The next field of cancelled nodes is set to * point to the node itself instead of null, to make life * easier for isOnSyncQueue. */ volatile Node next;}
/**
 * Acquires in exclusive mode, ignoring interrupts.
 *
 * <p>First makes one attempt through {@link #tryAcquire}; if that succeeds
 * the method returns at once. Otherwise the current thread is enqueued as an
 * exclusive waiter ({@code addWaiter(Node.EXCLUSIVE)}) and handed to
 * {@code acquireQueued}; when that call returns {@code true},
 * {@code selfInterrupt()} is invoked before returning. This method can be
 * used to implement method {@link Lock#lock}.
 *
 * @param arg the acquire argument. This value is conveyed to
 *        {@link #tryAcquire} but is otherwise uninterpreted and
 *        can represent anything you like.
 */
public final void acquire(int arg) {
    // Fast path: nothing to queue when the first attempt succeeds.
    if (tryAcquire(arg)) {
        return;
    }
    // Slow path: enqueue as an exclusive waiter and wait for the acquire.
    boolean mustSelfInterrupt = acquireQueued(addWaiter(Node.EXCLUSIVE), arg);
    if (mustSelfInterrupt) {
        selfInterrupt();
    }
}

/**
 * Acquires in exclusive mode, aborting if interrupted.
 *
 * <p>The interrupt status is checked (and cleared) first; if it was set an
 * {@link InterruptedException} is thrown without attempting to acquire.
 * Otherwise {@link #tryAcquire} is invoked once and, on failure, the work is
 * delegated to {@code doAcquireInterruptibly}. This method can be used to
 * implement method {@link Lock#lockInterruptibly}.
 *
 * @param arg the acquire argument. This value is conveyed to
 *        {@link #tryAcquire} but is otherwise uninterpreted and
 *        can represent anything you like.
 * @throws InterruptedException if the current thread is interrupted
 */
public final void acquireInterruptibly(int arg) throws InterruptedException {
    if (Thread.interrupted()) {
        throw new InterruptedException();
    }
    if (tryAcquire(arg)) {
        return;
    }
    doAcquireInterruptibly(arg);
}
而一旦涉及移除操作,双向链表就更合适了:双向链表的每个节点都持有 prev 和 next 两个引用,可以更高效地从队列中移除某个特定的节点。例如队列为 a ⇄ b ⇄ c,要删除 b 时,通过 b.prev 直接找到 a,通过 b.next 直接找到 c,然后把 a 的 next 设置为 c、c 的 prev 设置为 a,就完成了对 b 的删除。如果是单向链表,就需要从头遍历链表来寻找待删除节点的前驱节点。
/** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev. * * @param pred node's predecessor holding status * @param node the node * @return {@code true} if thread should block */private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) //如果ws为Node.SIGNAL(值为-1), //这意味着前驱节点已经处于等待状态,期望在释放同步状态时唤醒后继节点。 //在这种情况下,方法可以直接返回true,指示当前线程可以安全地挂起。 return true; if (ws > 0) { //如果ws大于0,说明前驱节点已被取消。 //此时,循环向前遍历等待队列,跳过所有已取消的节点,直到找到一个未被取消的节点作为新的前驱节点,并更新相应的链接。 //这个过程是为了维护等待队列的健康状态,移除其中的无效节点。 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { //如果前驱节点的等待状态不是SIGNAL(也即,它是0或PROPAGATE), //则将前驱节点的等待状态更新为SIGNAL。 //这是通过compareAndSetWaitStatus方法完成的,它安全地修改节点的状态,以指示当前节点(node)在释放同步状态时需要被唤醒。 //这里并不立即挂起当前线程,而是返回false,让调用者知道它需要再次尝试获取同步状态,在无法获取时再决定挂起。 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false;}
/**
 * Reports whether the given thread is currently queued.
 *
 * <p>This implementation walks the queue backwards, starting at
 * {@code tail} and following {@code prev} links, and compares each node's
 * thread with the argument by identity.
 *
 * @param thread the thread to look for
 * @return {@code true} if the given thread is on the queue
 * @throws NullPointerException if the thread is null
 */
public final boolean isQueued(Thread thread) {
    if (thread == null) {
        throw new NullPointerException();
    }
    Node cursor = tail;
    while (cursor != null) {
        if (cursor.thread == thread) {
            return true;
        }
        cursor = cursor.prev;
    }
    return false;
}
/**
 * Counts the queued nodes that still reference a thread.
 *
 * <p>The queue is walked backwards from {@code tail} through {@code prev}
 * links; nodes whose {@code thread} field is {@code null} are not counted.
 *
 * @return the number of traversed nodes with a non-null thread
 */
public final int getQueueLength() {
    int count = 0;
    Node cursor = tail;
    while (cursor != null) {
        if (cursor.thread != null) {
            count++;
        }
        cursor = cursor.prev;
    }
    return count;
}

/**
 * Returns a collection containing threads that may be waiting to
 * acquire. Because the actual set of threads may change dynamically
 * while this result is being built, the returned collection is only a
 * best-effort estimate, and its elements are in no particular order.
 * This method is designed to facilitate construction of subclasses that
 * provide more extensive monitoring facilities.
 *
 * @return the collection of threads
 */
public final Collection<Thread> getQueuedThreads() {
    ArrayList<Thread> waiters = new ArrayList<Thread>();
    Node cursor = tail;
    while (cursor != null) {
        // Read the field once so the null check and the add see the same value.
        Thread waiter = cursor.thread;
        if (waiter != null) {
            waiters.add(waiter);
        }
        cursor = cursor.prev;
    }
    return waiters;
}

/**
 * Returns a collection containing threads that may be waiting to
 * acquire in exclusive mode. This has the same properties as
 * {@link #getQueuedThreads} except that nodes reporting
 * {@code isShared()} are skipped.
 *
 * @return the collection of threads
 */
public final Collection<Thread> getExclusiveQueuedThreads() {
    ArrayList<Thread> waiters = new ArrayList<Thread>();
    for (Node cursor = tail; cursor != null; cursor = cursor.prev) {
        if (cursor.isShared()) {
            continue;
        }
        Thread waiter = cursor.thread;
        if (waiter != null) {
            waiters.add(waiter);
        }
    }
    return waiters;
}

/**
 * Returns a collection containing threads that may be waiting to
 * acquire in shared mode. This has the same properties as
 * {@link #getQueuedThreads} except that only nodes reporting
 * {@code isShared()} are considered.
 *
 * @return the collection of threads
 */
public final Collection<Thread> getSharedQueuedThreads() {
    ArrayList<Thread> waiters = new ArrayList<Thread>();
    for (Node cursor = tail; cursor != null; cursor = cursor.prev) {
        if (!cursor.isShared()) {
            continue;
        }
        Thread waiter = cursor.thread;
        if (waiter != null) {
            waiters.add(waiter);
        }
    }
    return waiters;
}