AQS简介
AbstractQueueSynchronizer,是java.util.concurrent中最重要的类,是许多常用的并发工具类的基础,ReentrantLock、CountDownLatch、Semaphore、FutureTask 等都是在AQS抽象类的基础上实现而来。 学习AQS源码有助于更好的理解并发工具类的实现原理,对写出更高效、健壮的代码很有帮助。
AQS结构
先看AQS中有那些属性
1 2 3 4 5 6 7 8
| private transient volatile Node head;
private transient volatile Node tail;
private volatile int state;
private transient Thread exclusiveOwnerThread;
|
等待队列是一个双向链表,队列中的每个线程被包装为一个Node实例,下面是Node的源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null;
static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2;
static final int PROPAGATE = -3; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; }
|
ReentrantLock
ReentrantLock
初始化
ReentrantLock初始化时可选择公平锁or非公平锁,分别由不同的Sync内部类实现
1 2 3
| public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
|
抽象类Sync继承了AQS
1 2
| abstract static class Sync extends AbstractQueuedSynchronizer { }
|
加锁
加锁操作由Sync类acquire方法实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
|
static final class FairSync extends Sync { public void lock() { sync.acquire(1); }
public final void acquire(int arg) {
if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) {
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
private Node addWaiter(Node mode) { Node node = new Node(mode); for (;;) { Node oldTail = tail; if (oldTail != null) { node.setPrevRelaxed(oldTail); if (compareAndSetTail(oldTail, node)) { oldTail.next = node; return node; } } else { initializeSyncQueue(); } } }
final boolean acquireQueued(final Node node, int arg) { boolean interrupted = false; try { for (;;) { final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; return interrupted; }
if (shouldParkAfterFailedAcquire(p, node)) interrupted |= parkAndCheckInterrupt(); } } catch (Throwable t) { cancelAcquire(node); if (interrupted) selfInterrupt(); throw t; } } }
|
对于非公平锁,在tryAcquire方法的实现有所区别,核心是少了hasQueuedPredecessors()判断是是否有线程在排队的逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
|
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
|
解锁
解锁操作由Sync类release方法实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| public void unlock() { sync.release(1); }
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
|
Condition
一个demo展示Condition的使用场景
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| public class BoundedBuffer { private ReentrantLock lock = new ReentrantLock(); private Condition notEmpty = lock.newCondition(); private Condition notFull = lock.newCondition(); private Object[] items = new Object[100]; private int putIdx,takeIdx,count;
public void put(Object item) throws InterruptedException{ lock.lock(); try{ while (count == items.length){ notFull.await(); } items[putIdx] = item; if(++putIdx == items.length){ putIdx = 0; } ++count; notEmpty.signal(); }finally { lock.unlock(); } } public Object take() throws InterruptedException{ lock.lock(); try{ while (count == 0){ notEmpty.await(); } Object item = items[takeIdx]; if(++takeIdx == items.length){ takeIdx = 0; } count--; notFull.signal(); return item; }finally { lock.unlock(); } } }
|
先说流程,方便理解代码
区分两个概念,阻塞队列:asyncQueue, 条件队列:conditionQueue
前置条件,当前线程持有锁
- condition.await(),向条件队列(firstWaiter,lastWaiter)中加入node,从阻塞队列(head,tail)中取node
- condition.signal(),将条件队列firstWaiter转入阻塞队列
- condition.await(),第一步后续从阻塞队列中排队取node成功,进入await后续代码执行
线程中断的情况
- condition.signal()之前线程中断,条件队列的node也会进入阻塞队列,但是await()排队到阻塞队列后会throw new InterruptedException()响应中断,不会执行后续逻辑
- condition.signal()之后线程中断,condition.await()从排队获取到后,执行selfInterrupt(),中断由await()后续代码来响应
await
先看一眼await,了解大概流程,分析里面每一步的方法逻辑后,再看一眼这个方法才能理解
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; }
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
|
addConditionWaiter
向条件队列中加入一个Node,很简单
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| private Node addConditionWaiter() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node t = lastWaiter; if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Node.CONDITION);
if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
|
fullyRelease
释放当前线程持有的锁,也就是前面demo中的lock锁,方在await的时候让其他线程可以操作资源
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
|
final int fullyRelease(Node node) { try { int savedState = getState(); if (release(savedState)) return savedState; throw new IllegalMonitorStateException(); } catch (Throwable t) { node.waitStatus = Node.CANCELLED; throw t; } }
|
等待条件被唤醒
这里由两部分代码组成:
- 等待进入阻塞队列
- 从阻塞队列中获取node
while (!isOnSyncQueue(node)), 直到进入阻塞队列(或者线程中断)后跳出while循环
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
|
final boolean isOnSyncQueue(Node node) { if (node.waitStatus == Node.CONDITION || node.prev == null) return false; if (node.next != null) return true; return findNodeFromTail(node); }
|
从阻塞队列中获取node,再看一眼await()里的代码, 注意线程中断时也会进入阻塞队列,根据interruptMode来处理抛异常还是指标记中断
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
|
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT;
if (node.nextWaiter != null) unlinkCancelledWaiters();
if (interruptMode != 0) reportInterruptAfterWait(interruptMode);
|
signal
将条件队列第一个未取消node,转入阻塞队列,这样前面的await()方法就能从阻塞队列中拿到node,继续执行任务了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
|
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); }
final boolean transferForSignal(Node node) {
if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) return false;
Node p = enq(node); int ws = p.waitStatus; if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }
|
中断的处理
简单来说,两种情况:
- signal()前中断,interruptMode=THROW_IE await()中抛异常
- signal()后中断,interruptMode=REINTERRUPT await()中selfInterrupt()重新标识中断,具体是否响应交给后面的业务代码来处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; }
private int checkInterruptWhileWaiting(Node node) { return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0; }
final boolean transferAfterCancelledWait(Node node) { if (node.compareAndSetWaitStatus(Node.CONDITION, 0)) { enq(node); return true; } while (!isOnSyncQueue(node)) Thread.yield(); return false; }
private void reportInterruptAfterWait(int interruptMode) throws InterruptedException { if (interruptMode == THROW_IE) throw new InterruptedException(); else if (interruptMode == REINTERRUPT) selfInterrupt(); }
|