reentrantlock学习

reentrantlock学习

这边主要是记录一些面试中比较深入的问题

首先我们需要对reentrantlock的底层结构有一定的了解

1
2
3
4
5
6
7
8
9
硬件/OS 阻塞原语

JVM/Unsafe → LockSupport.park/unpark // 最底层的阻塞/唤醒

AQS(AbstractQueuedSynchronizer) // 队列、状态、CAS、可重入计数

ReentrantLock / Semaphore / CountDownLatch / ...

Condition(基于对应的Lock/AQS)

1 lock/unlock 和 signal/await 和 park/unpark的区别

维度 lock / unlock Condition.await / signal[All] LockSupport.park / unpark
抽象层 同步器 API(基于 AQS) 同步器 API(依附某把 Lock) 低层原语(面向线程)
是否绑定锁 ✅(这把锁) ✅(lock.newCondition() 产物) ❌(与锁无关)
作用对象 进入/退出临界区 等待“条件成立”与通知 让某个线程睡/醒
会否释放锁 unlock 释放 await 原子释放锁后挂起;返回前重新获得 不涉及锁(不释放也不获取)
进入的队列 AQS 同步队列 条件队列(await)→ 被 signal 转移到同步队列 无;线程自己阻塞
唤醒触发 其他线程 unlock(释放后下一个获取) signal/signalAll(把等待者送回同步队列再竞争锁) unpark(thread)(发 1 个“许可”)
可先发后收 语义上可,但必须在锁内配合使用 ✅(unpark 可先于 park,最多积 1 个许可)
伪唤醒 可能(Mesa 语义),需 while(!predicate) await() 可能(规范允许),需循环校验条件
中断语义 不涉及中断 await()InterruptedException(清除中断标志);有 awaitUninterruptibly() 版本 被中断会返回,但不抛异常,中断标志仍置位
内存可见性(HB) unlock happens-before 随后对同一把锁的 lock 依赖上面的锁语义:在持锁内修改状态 → signal → 等待者重新拿到同一把锁后可见 自身不是屏障;需配合 volatile/Atomic 等建立可见性
典型用途 保护共享状态 生产者-消费者、条件等待(一个锁可有 多个条件队列) 自定义同步器/调度器的底层积木(AQS 就用它)

Lock + Condition(生产者-消费者)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final ReentrantLock lock = new ReentrantLock();
final Condition notEmpty = lock.newCondition();
final Queue<Integer> q = new ArrayDeque<>();

void put(int x){
lock.lock();
try {
q.add(x);
notEmpty.signal(); // 必须持锁;只叫醒一个等待者
} finally { lock.unlock(); }
}

int take() throws InterruptedException {
lock.lock();
try {
while (q.isEmpty()) { // 防伪唤醒
notEmpty.await(); // 释放锁 + 入队 + 挂起;醒来后再抢回锁
}
return q.remove();
} finally { lock.unlock(); }
}
  • await() 为什么“释放锁”:它把“释放锁+进入等待队列+挂起”做成一个原子步骤;否则用“先 unlockpark”会有丢信号竞态。
  • signal ≠ 立刻让对方运行:只是把它送回同步队列;对方还要重新竞争锁(Mesa 语义)。

低层 park/unpark(自定义同步原语的雏形)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class Gate {
private final AtomicBoolean open = new AtomicBoolean(false);
private final Thread waiter;
Gate(Thread waiter) { this.waiter = waiter; }

void awaitOpen() {
while (!open.get()) { // 可见性依赖 AtomicBoolean
LockSupport.park(this); // 可能被中断/伪唤醒,故循环校验
if (Thread.interrupted()) { /* 处理中断策略 */ }
}
}
void open() {
if (open.compareAndSet(false, true)) {
LockSupport.unpark(waiter); // 可先发后收;最多积1个许可
}
}
}

2

AQS不是有一个队列吗?那如果是非公平的,队列头释放锁了以后,是不是所有队列中的节点都能够去争抢这个锁?

即使是非公平的 ReentrantLock,AQS 在释放时也只唤醒队列头结点的“有效后继”那一个线程,不会把整条队列都唤醒去抢锁,更不会出现“排队的一起群抢”的情形。

精准结论

  • 谁会去争抢?
    1. unpark队首后继线程(只有一个);
    2. 以及新到达、尚未入队的线程(非公平允许它们插队 CAS 抢锁)。
  • 谁不会去争抢?
    队列里更靠后的节点仍然 park 着,没有被唤醒的许可,不会同时去抢。

释放路径(发生了什么)

  1. 持有者 unlock()tryRelease() 成功把 state 归零;
  2. AQS 调 unparkSuccessor(head)
    • 清掉 head.waitStatus 的负值(SIGNAL);
    • 选中 head.next(若为空或取消则自尾向前找第一个有效节点);
    • 对它 LockSupport.unpark(thread) —— 只唤醒这一个

被唤醒的这个线程返回到自旋逻辑里:若前驱是 headtryAcquire() 成功,就成为新 head;否则按 AQS 逻辑再次 park

非公平与“插队”

  • 非公平:新来的线程在看到 state==0 时可以直接 CAS 抢,不看队列里是否有人在等;因此它可能打败刚被唤醒的队首后继。被打败的后继不会消失,仍是队首,等下一次释放再被唤醒/重试。
  • 公平tryAcquire 会检查 hasQueuedPredecessors(),有前驱就不抢,保证 FIFO,不允许新线程插队。
1
2
3
4
5
T1 持锁;队列: head -> [T2] -> [T3] -> [T4]
T1 unlock ➜ unpark(T2) # 只唤醒 T2
(同时) 新线程 T5 来了:
+ 非公平:T5 直接 CAS 抢到锁 ➜ 成功
+ T2 被唤醒尝试 CAS 失败 ➜ 留在队首,等待 T5 释放

3

reenterant lock是cas+队列实现的,那队列头释放锁了,怎么通知别人?

获取失败 → 入队并“订阅唤醒”

  1. 线程尝试 tryAcquire()(内部是 CAS 改 state),失败后调用 acquireQueued(addWaiter(EXCLUSIVE), arg)
  2. addWaiter 把当前线程包装成 NodeCLH FIFO 队列尾部,并确保有一个哑元 head
  3. 在自旋里做两件事:
    • 前驱就是 head,再尝试 tryAcquire()(公平锁会先看 hasQueuedPredecessors())。
    • 否则把前驱的 waitStatus 设为 SIGNAL(-1),表示“前驱释放时请唤醒我”,然后 LockSupport.park(this) 挂起自己。

关键点:不是“我记得你”,而是“我让前驱记得我”(把前驱标成 SIGNAL)。这样前驱释放时知道要叫醒后继。

释放成功 → 唤醒后继

  1. 持有锁的线程在 unlock() 中调用 sync.release(arg)tryRelease(arg)
    • 递减重入计数;降到 0 时清空 exclusiveOwnerThread 并返回 true
  2. release() 见到 true 后调用 unparkSuccessor(head)
    • head.waitStatus < 0 先清 0;
    • 找到 head.next(如果为空或已取消,就从尾往前找第一个 waitStatus <= 0 的有效节点);
    • 对该节点的 thread 调用 LockSupport.unpark(thread)

unpark 的线程从 park() 返回,继续自旋:若其前驱已是新的 headtryAcquire() 成功,就把自己设为新的 head(旧 head.next=null 以便 GC),否则再次 park() 等下一次唤醒。

concurrenthashmap 底层原理,put行为,扩容/迁移时的逻辑

扩容触发条件

ConcurrentHashMap 中,扩容主要由以下条件触发:

  • 负载因子:当当前的 size(元素数目)超过总桶数的负载因子(默认 0.75)时触发扩容。
  • 当前桶数:当当前桶的数量超过一定阈值时,ConcurrentHashMap 会扩容。桶数是 2^n(即 16, 32, 64 等等)。

扩容条件的计算:

假设 threshold = capacity * loadFactor,当 threshold 达到或超过当前元素数量时,就触发扩容。

2. 扩容实现的具体步骤

扩容时,ConcurrentHashMap 会创建一个新的表(newTable),其大小是当前表的两倍,并将现有的元素从旧表迁移到新表。扩容过程是通过以下步骤实现的:

  1. 初始化扩容线程
    sizeCtl 变量的值表示需要扩容时,ConcurrentHashMap 会设置 sizeCtl 来控制扩容的进程。sizeCtl 是一个 int 变量,它既表示扩容时的新表大小,也表示扩容的状态。
    • 负值表示正在扩容:如果 sizeCtl 为负数,表示扩容操作正在进行,扩容线程需要进行工作窃取。
    • 正值表示表的容量:如果 sizeCtl 是正数,则它表示当前 table 的容量,扩容是按此容量的 2 倍进行的。
  2. 迁移过程(工作窃取)
    • ConcurrentHashMap 的扩容通过 工作窃取(work-stealing)的方式来并行化迁移工作。扩容操作的核心是将旧表的数据迁移到新表。
    • 在扩容期间,多个线程会共同参与扩容迁移。每个线程会尝试将一个桶中的元素移动到新的表中(两个桶会分配到新表中的不同位置)。
    • 每个桶会单独迁移,所以扩容过程中不会加锁整个 ConcurrentHashMap,可以避免“全表阻塞”的问题。
  3. 扩容的迁移过程
    • 迁移操作会通过 ForwardingNode 来标记当前的桶是否已经被迁移。在扩容时,每个桶会被标记为 ForwardingNode,指示该桶已经被迁移到新表中。
    • 当一个线程遇到一个桶的首节点是 ForwardingNode 时,它会尝试去迁移数据。
  4. 迁移步骤
    迁移时,ConcurrentHashMap 会将 链表节点 按照它们的 hash 重新计算并分配到新表的相应位置。具体步骤:
    • 每个桶中的元素都会根据哈希值计算新表中的位置。
    • 元素重新分配到新表中后,ForwardingNode 会表示桶已迁移,其他线程看到该桶时会参与后续迁移
  5. 结束扩容
    一旦所有元素都成功迁移到新的表中,扩容操作完成。此时,sizeCtl 会被更新为新的表容量,并且所有的迁移标记(如 ForwardingNode)都会被清除。

3. 扩容的并发性和延迟

扩容时,ConcurrentHashMap 允许多个线程并行迁移数据,以最大程度地减少扩容时的性能损失。并且扩容期间不会完全锁住整个 ConcurrentHashMap,这样可以保证读操作的并发性。

  • 扩容过程中对读取操作的影响
    • 读操作:扩容时不会中断对 ConcurrentHashMap 的读取操作。读取操作并不需要等待迁移完成,只要能够读取到当前桶的数据即可。如果正在迁移的桶存在 ForwardingNode,读操作会自动跳过迁移中的桶,继续在新表中查找。
  • 扩容期间工作窃取
    • 每个线程会尝试窃取一个桶的迁移任务(分配和迁移桶),直到所有桶迁移完成。这样能显著减少扩容的延迟。
  • 新旧表数据一致性
    • 在迁移期间,ConcurrentHashMap 保证不会丢失数据,所有的操作(插入、更新、删除)都可以正确执行。只要数据还未迁移到新表,其他线程访问旧表也可以正常工作。

4. 扩容时写入操作如何处理

  • 写操作的执行路径:扩容时,写操作会直接写入到新表中。如果一个线程正在写入,且该线程的 put 操作未影响正在迁移的桶,它会直接向新表的相应位置插入数据。
  • 扩容中的桶锁:在扩容过程中,某些桶会因为迁移而变成 ForwardingNode(一个标记,表示该桶正在迁移),但并不会阻塞写入。只有在该桶尚未迁移到新表时,才会有锁操作。迁移时,迁移的桶会被加锁(使用桶级别的 synchronized),防止多线程同时修改同一个桶的数据。
    • 在一个线程尝试写入桶时,如果它发现该桶正在迁移,它会进入帮助迁移的模式,参与迁移。此时,扩容过程仍然可以继续进行,不会因为某个线程的写入而阻塞整个操作。
    • 也就是说,当桶正在迁移时,只有该桶的写操作会被阻塞。其他桶的读和写操作都不会受影响

5. 扩容时的线程协作与“工作窃取”原理

为了避免扩容期间所有线程都在等待迁移的单一线程,ConcurrentHashMap 采用了工作窃取策略,多个线程可以并行地“窃取”任务来迁移桶,直到所有数据迁移完成。这个机制极大地提高了扩容的并行度,减少了扩容的延迟。

-------------本文结束,感谢您的阅读-------------