Reentrant 与 Condition 解析
| 使用场景 | 使用方式 |
|---|---|
| 可中断的场景 | Synchronized 是不可中断的,所以如果某些线程抢到时间片之后但是还是可以被中断,那么请选择 ReentrantLock |
| 唤醒特定线程 | 可以从一个 ReentrantLock 里面获取对应的 Condition 去做到具体的线程唤醒和阻塞,而 Synchronized 只能通过 object.notify / notifyAll 唤醒随机和所有线程,一个是导致争抢,一个是导致惊群 |
| 是否可重入 | ReentrantLock 顾名思义可重入 |
| 是否公平 | ReentrantLock 可以在构造的时候指定是否是公平锁 |
现在就来解释下为什么用 ReentrantLock 配合 Condition 可以达到交替打印的目的,关于,交替打印我好像也高了一个协程的,等和 Java 那版本放一起搞,协程这个是基于 Channel 的,因为协程之间使用 Channel 通信是公平的,比这个还好控制些呢,Java 版本的需要 volitail 去控制共同改变的变量。
解析 ReentrantLock 是怎样配合 Condition 完成细粒度控制的,解析 ReentrantLock 公平锁与非公平锁的区别。
// 几个常用方法
ReentrantLock lock = new ReentrantLock(false);
Condition condi = lock.newCondition();
void func1() {
condi.await();
condi.signal();
}
// 是否公平锁在于构造参数传入的数值,这里的区别就是 new 的 FairSync 还是 NonfairSync, NofairSync 就是默认的继承 Sync 就可以,然后 Sync 继承了 AbstractQueuedSynchronizer ,然后实现了 nonfairTryAcquire
sync = fair ? new FairSync() : new NonfairSync();
// Fair 和 Nonfair 的区别
protected final boolean tryAcquire(int acquires) { // Nonfair 直接调用父类的 nonfairTryAcquires
return this.nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = this.getState();
if (c == 0) {
if (this.compareAndSetState(0, acquires)) {
this.setExclusiveOwnerThread(current);
return true;
}
} else if (current == this.getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) {
throw new Error("Maximum lock count exceeded");
}
this.setState(nextc);
return true;
}
return false;
}
// 这里去实现公平锁自己的
protected final boolean tryAcquire(int acquires) {
// ...
Thread current = Thread.currentThread();
int c = this.getState();
if (c == 0) {
// 多了一个 hasQueuedPredecessors 判断条件,这里主要就是遍历从后往前遍历节点,看看有没有比自己更新进入队列的,如果有的话就返回 true ,那么这里就不争抢了
// 后面都是使用 cas 去尝试设置进去,然后设置 这个锁目前的持有者是自己
if (!this.hasQueuedPredecessors() && this.compareAndSetState(0, acquires)) {
this.setExclusiveOwnerThread(current);
return true;
}
} else if (current == this.getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) {
throw new Error("Maximum lock count exceeded");
}
this.setState(nextc);
return true;
}
return false;
}
// 看一下 ReentrantLock 的 lock 和 unlock 方法,看看是怎么实现可重入的,应该就是计数
public void lock() {
this.sync.acquire(1);
}
public void unlock() {
this.sync.release(1);
} // 核心就是堆这个 加一 和 减一
// 看看 tryLock 和 其他
public void lockInterruptibly() throws InterruptedException {
this.sync.acquireInterruptibly(1);
}
public boolean tryLock() {
// 难怪把这个 nonfair 方法放到父类,因为 tryLock 的时候都是调用的这个
return this.sync.nonfairTryAcquire(1);
}
// 超时获取失败就算了的锁
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return this.sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
// ReentrantLock 的 newCondition,底层是调用的 new ConditionObject
final ConditionObject newCondition() {
return new ConditionObject(this);
}
// Condition 的 await 和 signal
public final void await() throws InterruptedException {
if (Thread.interrupted()) {
throw new InterruptedException();
} else {
// 主要逻辑在 Node 中,这里 addConditionWaiter 是核心,把当前线程构造一个装进去
/*
构建一个 Node
*/
AbstractQueuedSynchronizer.Node node = this.addConditionWaiter();
int savedState = AbstractQueuedSynchronizer.this.fullyRelease(node);
int interruptMode = 0;
while(!AbstractQueuedSynchronizer.this.isOnSyncQueue(node)) {
LockSupport.park(this); // park 住了
if ((interruptMode = this.checkInterruptWhileWaiting(node)) != 0) {
break;
}
}
if (AbstractQueuedSynchronizer.this.acquireQueued(node, savedState) && interruptMode != -1) {
interruptMode = 1;
}
if (node.nextWaiter != null) {
this.unlinkCancelledWaiters();
}
if (interruptMode != 0) {
this.reportInterruptAfterWait(interruptMode);
}
}
}
private void doSignal(AbstractQueuedSynchronizer.Node first) {
// 唤醒第一个等待的节点
do {
if ((this.firstWaiter = first.nextWaiter) == null) {
this.lastWaiter = null;
}
first.nextWaiter = null;
} while(!AbstractQueuedSynchronizer.this.transferForSignal(first) && (first = this.firstWaiter) != null);
}
// addConditionWaiter 方法
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
Node node = new Node(Node.CONDITION);
}
// node 的构造方法,把当前 Thread 放到 Node 里面
Node(int waitStatus) {
U.putInt(this, WAITSTATUS, waitStatus);
U.putObject(this, THREAD, Thread.currentThread());
}