kotlin-suspend
一、为什么我们需要异步编程?
传统编程里,调用函数大多是同步的:
- 比如你调用一个方法获取数据,假如它要花100毫秒,你的程序就“傻等”100毫秒。
- 如果同时来了1000个这样的请求,你的服务器可能会需要1000个线程去傻等,浪费大量资源。
因此诞生了异步编程:
- 程序在等待I/O时,不再占用线程资源;
- 当结果准备好后,系统再通知你继续执行后面的代码。
接下来,我们用通俗语言逐一介绍5种常见的异步编程模型。
先给出最核心的一句话,再分别拆开讲:
异步 ≠ 并行;异步指“发起操作后不用等待结果就能继续干别的事”,结果准备好时系统通知你。
二、常见的5种异步编程模型详解
1. 回调(Callback)模式
核心思想:
- 当你需要等待结果时,不再傻等,而是把等待后的操作写成一个函数(回调函数),交给系统。
- 当系统完成了任务(比如文件读取完成),会自动调用这个函数。
生活类比:
你去餐厅吃饭,服务员告诉你:“饭好了我会喊你”,于是你先去做别的,等饭好了(事件)服务员(系统)再喊你(回调函数执行)。
JavaScript 示例:
1 | // 用回调函数读取文件,完成后系统自动调用这个函数 |
问题: 当回调嵌套很多层(“回调地狱”)时,代码会非常难读。
2. Future / Promise 模式
核心思想:
- 系统立即返回给你一个空盒子(Future / Promise)。
- 实际结果准备好后,这个“空盒子”里自动装上结果。
- 你只需要关心“盒子”什么时候满,不需要关心系统如何填充它。
生活类比:
你在网上买东西,商家给你一个取件码(Future)。快递到了自动放到快递柜(自动填充结果),你不需要去等,也不需要盯着送货员,你只需等取件码通知你去取货。
Java 示例:
1 | // 启动一个异步任务(模拟耗时任务) |
Future/Promise 最大优点是把“异步操作”封装成对象。
- 它既能向下兼容回调模式(内部还是用回调实现),也能向上支撑 async-await 或协程这些更高级模型(被它们一键 await)。
- 这就是它“承上启下”的含义。
3. Reactive 响应式流模式(Reactive Streams)
在Reactive响应式流模式中,订阅关系就是回调事件,subscribe(...)
干了两件事:
- 向上游(Publisher)注册一组回调:
onNext
,onError
,onComplete
。 - 立刻返回,线程不会傻等;真正的数据什么时候到达,完全取决于上游。
生活类比:
自来水管,每次只放一点水出来,你可以随时暂停(背压)、过滤、或者将多个水管的水混合到一起。
Java Reactor 示例:
1 | import reactor.core.publisher.Flux; |
输出(简化):
1 | 订阅完成,主线程可以继续执行其他逻辑 说明 `subscribe(...)` 是异步的。 |
只要数据源(定时器、HTTP 流、消息队列)本身是非阻塞的,整个链条就是异步;
你用just()
例子里没有体现时间差,所以看起来“同步”。把它换成interval()
、fromIterable().delayElements()
或网络请求,就能直观看到异步效果。
1.底层原理:Scheduler + Worker
Flux.interval(...)
返回的流是基于时间驱动的“计时器流”,它每隔一段时间 发射一个元素。
Reactor Core 在内部把定时任务交给了一个 Scheduler:
- Scheduler
- 是 Reactor 对线程池的抽象,常见实现有:
Schedulers.parallel()
:CPU 密集型任务池Schedulers.boundedElastic()
:适合阻塞 I/O 的弹性池- 内置的定时器池:
Schedulers.parallel()
也被用来调度Flux.interval
的定时任务
- 是 Reactor 对线程池的抽象,常见实现有:
- Worker
- 每个 Scheduler 可以创建多个 Worker,每个 Worker 对应一个线程或执行上下文
Flux.interval
会取得一个 Worker,然后调用类似:1
2
3
4worker.schedulePeriodically(
() -> subscriber.onNext(nextValue), // 定时执行发射
initialDelay, period, TimeUnit.MILLISECONDS
);- 这样发射元素的逻辑就跑在 Scheduler 的线程 上,跟主线程完全解耦。
2. subscribe() 为什么不阻塞
Reactor 遵循 Reactive Streams 规范:
订阅(
subscribe
)阶段:- 建立连接:
Publisher
(Flux)和Subscriber
(你的回调)互相握手 - 返回值:一个可取消的句柄
- 建立连接:
请求(
request(n)
)阶段:- 默认
subscribe()
会发起request(Long.MAX_VALUE)
,告诉上游“我无限制地接收你发射的所有元素”
- 默认
发射(
onNext
)阶段:- 上游拿到
request
后,按自己的节奏(定时器、I/O 完成回调等)往下游发送onNext
- 这一切都发生在 Scheduler 的线程里,不会影响订阅时的调用栈
- 上游拿到
因为 所有定时调度 都交给了其他线程,subscribe()
这一行永远不会等到第一个元素发射完才返回。
4. Actor(消息驱动)模式
核心思想:
- 每个组件(Actor)只负责接收消息,并顺序处理。
- Actor 之间互不干扰,不用显式加锁,适合并发场景。
生活类比:
你去银行办业务,每个窗口独立处理自己的队列(消息队列)。队伍里的每个人挨个处理(顺序执行),窗口之间互不影响。
Java Actor示例
1 |
|
运行后,打印得到:1
2
3
4
5
6
7
8
9
10
11
12
13[1745902915772] Sent Inc #0
[1745902915775] Actor incremented to 1
[1745902915776] Sent Inc #1
[1745902915776] Sent Inc #2
[1745902915776] Actor incremented to 2
[1745902915776] Sent Inc #3
[1745902915776] Actor incremented to 3
[1745902915776] Sent Inc #4
[1745902915776] Actor incremented to 4
[1745902915777] Actor incremented to 5
[1745902915776] Messages sent, doing other work…
[1745902918788] Final count = 5
[INFO] [04/29/2025 13:01:58.815] [main] [CoordinatedShutdown(akka://MySystem)] Running CoordinatedShutdown with reason [ActorSystemTerminateReason]
- “Sent Inc” 与 “Actor incremented” 交替出现,说明发送方异步 fire-and-forget,Actor 内部在后台线程顺序消费。
main
线程并未等待 Actor 处理便继续执行,最终通过Get
和CompletableFuture
拿到结果。
Actor底层原理
在 Actor 模式下,每个 Actor 都像一个“独立小宇宙”──它拥有自己的邮箱(mailbox)、行为逻辑(behavior) 和执行上下文(dispatcher + 线程池 )
1. 邮箱(Mailbox)与消息队列
- 邮箱是一个线程安全的队列,专门用来存放别人发给这个 Actor 的消息。
- 当你调用
actorRef.tell(msg, sender)
时,底层只是把msg
放入这个队列,然后立马返回,不做任何等待或阻塞。
2. 调度器(Dispatcher)与线程池
- Dispatcher 相当于一个线程池+事件循环的组合:
- 线程池:包含若干工作线程,用来“跑”Actor 的消息处理逻辑。
- 事件循环(Event Loop):定期去查看自己负责的一组 Actor 有没有待处理的消息。
- 不同 Actor 可以共享同一个 Dispatcher,也可以各自配置不同类型的 Dispatcher(如:针对 CPU 密集型、I/O 密集型等场景)。
3. 串行处理:Actor 自身线程隔离
当 Dispatcher 的一个工作线程被分配去调度某个 Actor 时,底层会做:
- “单线程顺序”:每个 Actor 的所有消息都在同一个线程上下文里、一个接一个地被处理。
- 线程隔离:不同 Actor 的消息处理可以发生在同一个线程池的不同线程上,但它们相互独立,不会并发操作同一个 Actor 的内部状态。
回到上面这个例子,如果我们tell Actor 500次,主线程等待3秒后再用Get查询,会发现需要等Actor完成所有自增才会去查询。这并不是说主线程被阻塞。而是在休眠3庙后,立刻把Get消息入队,但是此时Actor还没有消化完所有的Inc消息,导致看起来像阻塞了一样。
1
2
3
4
5
6
7
8
主线程 Actor 线程
--------------------------------------------
tell Inc×500 ──▶ mailbox: [Inc,Inc,…,Inc]
(print "…") (sleep 100ms+count++)×500
sleep 3s …
tell Get ──▶ mailbox: [Inc,…,Inc,Get]
完成500次Inc后,处理Get
future.complete thenAccept 打印 Final count
1 | 主线程 Actor 线程 |
5. 协程 / async-await 模式
核心思想:
- 程序代码看起来和同步代码几乎一样,但在执行时遇到耗时任务(异步任务)会自动“挂起”,线程去做其他事情;
- 当任务完成后,“恢复”执行。
生活类比:
你看书看到一半去煮咖啡(挂起),你不会坐着傻等,而是去洗衣服。当咖啡煮好后,你回来继续看书(恢复)。
JavaScript async-await 示例:
1 | async function fetchData() { |
Kotlin 协程 suspend 示例:
1 | suspend fun fetchData(): String { |
“挂起 ≠ 线程阻塞”:
- 挂起(suspend)时线程立即释放,可以去处理其他请求或协程;
- 结果返回时,协程继续在某个空闲线程恢复。
kotlin-suspend 底层逻辑
Continuation对象
想象你在读一本小说,到某一页你要出去办事,还想回家后接着读。因此,你就在那页插入一个书签,书签里还写着一句「回家后从这里继续读」。
在协程世界里,每遇到一个“挂起点”——比如
delay(1000)
或者网络 I/O 调用——- 编译器都会给你创建一个 Continuation 对象,它就相当于那个「书签」。
- 它记录了你到底挂在哪行代码(标签 label)。
- 它保存了挂起前已计算出的局部变量,就像把你读到一半的内容做个快照。
- 它本身就是一个回调函数:“等下恢复时,就把这个书签放回代码里,从这里往下执行”。
所以,Continuation 就是 它带着 “下一步该从哪读” (label
) + “我读到哪儿了” (savedX
) + “挂起后要谁来叫我” (resume 回调)
- 编译器都会给你创建一个 Continuation 对象,它就相当于那个「书签」。
在 Kotlin 编译器眼里,所有 suspend fun
并不是“特权函数”,而是被改写成了接受一个额外的 Continuation
参数、返回一个 Object
(或者更准确地说,返回 Any?
)的普通函数。它的底层逻辑大致可以分成三步:
1. 编译期:状态机+Continuation
假设写了:1
2
3
4
5suspend fun fetchData(): String {
val a = callA() // 挂起点①
val b = callB(a) // 挂起点②
return process(b)
}
编译器会把它变成类似下面这种伪代码(高度简化):
1 | fun fetchData(continuation: Continuation<String>): Any? { |
这里比较困惑的,就是resultA
和continuation.savedA
分别是什么意思,我在代码中也给了详细注释:
resultA
:只有在callA
立即返回时才会持有结果,并在本次调用体内使用。continuation.savedA
:只有在callA
挂起的情况下,才会被存入,然后在恢复分支里被读取。
二者合起来,就保证了 “同步写法” 的suspend fun
既能覆盖 即时返回,也能覆盖 异步挂起+恢复 两种情况。2. 运行时:挂起 & 恢复
以 delay(1000)
为例,它本质上是这样做的:
- 在协程里调用
delay(1000, continuation)
。 delay
内部会:- 在
Dispatchers
的定时任务调度器里注册一个定时器,到时会调用continuation.resume(Unit)
; - 立即返回
COROUTINE_SUSPENDED
,让刚才那个suspend
函数也返回COROUTINE_SUSPENDED
。
- 在
- 编译器生成的状态机函数看到自己的调用
callDelay(...)
返回了COROUTINE_SUSPENDED
,就不往下执行,而是退出到调用者(通常是runBlocking
或者某个调度器线程)。 - 1 秒后,定时器触发,线程池里的某条线程调用
continuation.resume(Unit)
,这是一个 回调——它会把label
标记的状态机“激活”回去,继续执行delay
之后的那段代码。
3. 核心结论
- 挂起点(任何
suspend
调用)都被编译成:- 保存现场到 Continuation
- 返回
COROUTINE_SUSPENDED
- 恢复(
resume
)实际上就是一个回调,给 Continuation 发送 “继续跑” 的信号。 - 这样一来,在代码里看到的同步式调用(像普通函数)背后,其实隐藏了一个状态机+回调网络,帮我们把“异步回调”糖衣化成“同步风格”的编程体验。
三、为什么Kotlin更倾向于用suspend,而不是Actor、Flow、Reactor?
因为在大部分场景中,你只需要“一问一答”的异步通信,而不一定需要复杂的“流式组合”或“消息队列”。
Kotlin的suspend
能用最简单的语法实现异步编程,同时提供了丰富的库(如Flow
)兼容其他模型。
suspend
能用简单代码实现高效异步;- 需要更复杂场景时,可以自然过渡到
Flow
、Actor
,无需大规模重构。