JDK_JUC
目录




1. Condition 案例
class BoundedBuffer {
final Lock lock = new ReentrantLock();
final Condition notFull = lock.newCondition();
final Condition notEmpty = lock.newCondition();
final Object[] items = new Object[100];
int putptr, takeptr, count;
public void put(Object x) throws InterruptedException {
lock.lock();
try {
while (count == items.length) //防止虚假唤醒,Condition的await调用一般会放在一个循环判断中
notFull.await();
items[putptr] = x;
if (++putptr == items.length)
putptr = 0;
++count;
notEmpty.signal();
} finally {
lock.unlock();
}
}
public Object take() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();
Object x = items[takeptr];
if (++takeptr == items.length)
takeptr = 0;
--count;
notFull.signal();
return x;
} finally {
lock.unlock();
}
}
}
2. AQS 方法
2.1 CAS操作
CAS,即CompareAndSet,在Java中CAS操作的实现都委托给一个名为UnSafe类,关于Unsafe类,以后会专门详细介绍该类,目前只要知道,通过该类可以实现对字段的原子操作。
| 方法名 | 修饰符 | 描述 |
|---|---|---|
| compareAndSetState | protected final | CAS修改同步状态值 |
| compareAndSetHead | private final | CAS修改等待队列的头指针 |
| compareAndSetTail | private final | CAS修改等待队列的尾指针 |
| compareAndSetWaitStatus | private static final | CAS修改结点的等待状态 |
| compareAndSetNext | private static final | CAS修改结点的next指针 |
2.2 等待队列的核心操作
| 方法名 | 修饰符 | 描述 |
|---|---|---|
| enq | private | 入队操作 |
| addWaiter | private | 入队操作 |
| setHead | private | 设置头结点 |
| unparkSuccessor | private | 唤醒后继结点 |
| doReleaseShared | private | 释放共享结点 |
| setHeadAndPropagate | private | 设置头结点并传播唤醒 |
2.3 资源的获取操作
| 方法名 | 修饰符 | 描述 |
|---|---|---|
| cancelAcquire | private | 取消获取资源 |
| shouldParkAfterFailedAcquire | private static | 判断是否阻塞当前调用线程 |
| acquireQueued | final | 尝试获取资源,获取失败尝试阻塞线程 |
| doAcquireInterruptibly | private | 独占地获取资源(响应中断) |
| doAcquireNanos | private | 独占地获取资源(限时等待) |
| doAcquireShared | private | 共享地获取资源 |
| doAcquireSharedInterruptibly | private | 共享地获取资源(响应中断) |
| doAcquireSharedNanos | private | 共享地获取资源(限时等待) |
| 方法名 | 修饰符 | 描述 |
|---|---|---|
| acquire | public final | 独占地获取资源 |
| acquireInterruptibly | public final | 独占地获取资源(响应中断) |
| acquireInterruptibly | public final | 独占地获取资源(限时等待) |
| acquireShared | public final | 共享地获取资源 |
| acquireSharedInterruptibly | public final | 共享地获取资源(响应中断) |
| tryAcquireSharedNanos | public final | 共享地获取资源(限时等待) |
2.4 资源的释放操作
| 方法名 | 修饰符 | 描述 |
|---|---|---|
| release | public final | 释放独占资源 |
| releaseShared | public final | 释放共享资源 |
3. AQS 原理
同步状态
/**
* 同步状态.
*/
private volatile int state;
protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
/**
* 以原子的方式更新同步状态.
* 利用Unsafe类实现
*/
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
阻塞&唤醒
- java.util.concurrent.locks包提供了LockSupport类来作为线程阻塞和唤醒的工具。
等待队列
static final class Node {
// 共享模式结点
static final Node SHARED = new Node();
// 独占模式结点
static final Node EXCLUSIVE = null;
static final int CANCELLED = 1;
static final int SIGNAL = -1;
static final int CONDITION = -2;
static final int PROPAGATE = -3;
/**
* INITAL: 0 - 默认,新结点会处于这种状态。
* CANCELLED: 1 - 取消,表示后续结点被中断或超时,需要移出队列;
* SIGNAL: -1- 发信号,表示后续结点被阻塞了;(当前结点在入队后、阻塞前,应确保将其prev结点类型改为SIGNAL,以便prev结点取消或释放时将当前结点唤醒。)
* CONDITION: -2- Condition专用,表示当前结点在Condition队列中,因为等待某个条件而被阻塞了;
* PROPAGATE: -3- 传播,适用于共享模式。(比如连续的读操作结点可以依次进入临界区,设为PROPAGATE有助于实现这种迭代操作。)
*
* waitStatus表示的是后续结点状态,这是因为AQS中使用CLH队列实现线程的结构管理,而CLH结构正是用前一结点某一属性表示当前结点的状态,这样更容易实现取消和超时功能。
*/
volatile int waitStatus;
// 前驱指针
volatile Node prev;
// 后驱指针
volatile Node next;
// 结点所包装的线程
volatile Thread thread;
// Condition队列使用,存储condition队列中的后继节点
Node nextWaiter;
Node() {
}
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
}
/**
* 以自旋的方式不断尝试插入结点至队列尾部
*
* @return 当前结点的前驱结点
*/
private Node enq(final Node node) {
for (; ; ) {
Node t = tail;
if (t == null) { // 如果队列为空,则创建一个空的head结点
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}