kotlin-suspend

kotlin-suspend

一、为什么我们需要异步编程?

传统编程里,调用函数大多是同步的:

  • 比如你调用一个方法获取数据,假如它要花100毫秒,你的程序就“傻等”100毫秒。
  • 如果同时来了1000个这样的请求,你的服务器可能会需要1000个线程去傻等,浪费大量资源。

因此诞生了异步编程

  • 程序在等待I/O时,不再占用线程资源;
  • 当结果准备好后,系统再通知你继续执行后面的代码。

接下来,我们用通俗语言逐一介绍5种常见的异步编程模型。

先给出最核心的一句话,再分别拆开讲:

异步 ≠ 并行;异步指“发起操作后不用等待结果就能继续干别的事”,结果准备好时系统通知你。


二、常见的5种异步编程模型详解

1. 回调(Callback)模式

核心思想:

  • 当你需要等待结果时,不再傻等,而是把等待后的操作写成一个函数(回调函数),交给系统。
  • 当系统完成了任务(比如文件读取完成),会自动调用这个函数。

生活类比:

你去餐厅吃饭,服务员告诉你:“饭好了我会喊你”,于是你先去做别的,等饭好了(事件)服务员(系统)再喊你(回调函数执行)。

JavaScript 示例:

1
2
3
4
5
// 用回调函数读取文件,完成后系统自动调用这个函数
fs.readFile('myFile.txt', (err, data) => {
if (err) throw err;
console.log(data.toString());
});

问题: 当回调嵌套很多层(“回调地狱”)时,代码会非常难读。


2. Future / Promise 模式

核心思想:

  • 系统立即返回给你一个空盒子(Future / Promise)
  • 实际结果准备好后,这个“空盒子”里自动装上结果。
  • 你只需要关心“盒子”什么时候满,不需要关心系统如何填充它。

生活类比:

你在网上买东西,商家给你一个取件码(Future)。快递到了自动放到快递柜(自动填充结果),你不需要去等,也不需要盯着送货员,你只需等取件码通知你去取货。

Java 示例:

1
2
3
4
5
6
7
8
// 启动一个异步任务(模拟耗时任务)
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
Thread.sleep(1000); // 模拟慢操作
return "Hello Future!";
});

// 当任务完成,自动执行这个回调
future.thenAccept(result -> System.out.println(result));

Future/Promise 最大优点是把“异步操作”封装成对象。

  • 它既能向下兼容回调模式(内部还是用回调实现),也能向上支撑 async-await 或协程这些更高级模型(被它们一键 await)。
  • 这就是它“承上启下”的含义。

3. Reactive 响应式流模式(Reactive Streams)

在Reactive响应式流模式中,订阅关系就是回调事件,subscribe(...) 干了两件事:

  1. 向上游(Publisher)注册一组回调onNext, onError, onComplete
  2. 立刻返回,线程不会傻等;真正的数据什么时候到达,完全取决于上游。

生活类比:

自来水管,每次只放一点水出来,你可以随时暂停(背压)、过滤、或者将多个水管的水混合到一起。

Java Reactor 示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import reactor.core.publisher.Flux;
public static void main(String[] args) {
// 每 500ms 产生一个 Long(0,1,2,…)
Flux.interval(Duration.ofMillis(500))
.map(i -> "tick-" + i) // 转换成字符串
.subscribe(System.out::println); // 明确调用 System.out.println
// 立刻打印,说明 subscribe() 并不阻塞当前线程
System.out.println("订阅完成,主线程可以继续执行其他逻辑");

// 阻塞一下主线程,以便观察 2-3 次输出
try {
Thread.sleep(1600);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

System.out.println("主线程结束");
}

输出(简化):

1
2
3
4
5
订阅完成,主线程可以继续执行其他逻辑 说明 `subscribe(...)` 是异步的。
tick-0
tick-1
tick-2
主线程结束

只要数据源(定时器、HTTP 流、消息队列)本身是非阻塞的,整个链条就是异步
你用 just() 例子里没有体现时间差,所以看起来“同步”。把它换成 interval()fromIterable().delayElements() 或网络请求,就能直观看到异步效果。

1.底层原理:Scheduler + Worker

Flux.interval(...) 返回的流是基于时间驱动的“计时器流”,它每隔一段时间 发射一个元素
Reactor Core 在内部把定时任务交给了一个 Scheduler

  1. Scheduler
    • 是 Reactor 对线程池的抽象,常见实现有:
      • Schedulers.parallel():CPU 密集型任务池
      • Schedulers.boundedElastic():适合阻塞 I/O 的弹性池
      • 内置的定时器池Schedulers.parallel() 也被用来调度 Flux.interval 的定时任务
  2. Worker
    • 每个 Scheduler 可以创建多个 Worker,每个 Worker 对应一个线程或执行上下文
    • Flux.interval 会取得一个 Worker,然后调用类似:
      1
      2
      3
      4
      worker.schedulePeriodically(
      () -> subscriber.onNext(nextValue), // 定时执行发射
      initialDelay, period, TimeUnit.MILLISECONDS
      );
    • 这样发射元素的逻辑就跑在 Scheduler 的线程 上,跟主线程完全解耦。

2. subscribe() 为什么不阻塞

Reactor 遵循 Reactive Streams 规范

  1. 订阅subscribe)阶段:

    • 建立连接:Publisher(Flux)和 Subscriber(你的回调)互相握手
    • 返回值:一个可取消的句柄
  2. 请求request(n))阶段:

    • 默认 subscribe() 会发起 request(Long.MAX_VALUE),告诉上游“我无限制地接收你发射的所有元素”
  3. 发射onNext)阶段:

    • 上游拿到 request 后,按自己的节奏(定时器、I/O 完成回调等)往下游发送 onNext
    • 这一切都发生在 Scheduler 的线程里,不会影响订阅时的调用栈

因为 所有定时调度 都交给了其他线程,subscribe() 这一行永远不会等到第一个元素发射完才返回。


4. Actor(消息驱动)模式

核心思想:

  • 每个组件(Actor)只负责接收消息,并顺序处理。
  • Actor 之间互不干扰,不用显式加锁,适合并发场景。

生活类比:

你去银行办业务,每个窗口独立处理自己的队列(消息队列)。队伍里的每个人挨个处理(顺序执行),窗口之间互不影响。

Java Actor示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
  
public interface CounterMsg {}

// 递增消息
public static class Inc implements CounterMsg {}

// 查询消息,携带 CompletableFuture 用于回执
public static class Get implements CounterMsg {
public final CompletableFuture<Integer> replyTo;
public Get(CompletableFuture<Integer> replyTo) {
this.replyTo = replyTo;
}
}

public static class CounterActor extends AbstractActor {
private int count = 0;

@Override
public Receive createReceive() {
return receiveBuilder()
.match(Inc.class, msg -> {
count++;
System.out.println("[" + System.currentTimeMillis() + "] Actor incremented to " + count);
})
.match(Get.class, msg -> {
msg.replyTo.complete(count);
})
.build();
}
}

public static void main(String[] args) {
// 1) 创建 ActorSystem
ActorSystem system = ActorSystem.create("MySystem");
// 2) 在 system 中创建一个名为 "counter" 的 CounterActor
ActorRef counter = system.actorOf(Props.create(CounterActor.class), "counter");

// 3) 异步地连续发送 5 条 Inc 消息给Actor,立即返回,不等待。
for (int i = 0; i < 500; i++) {
counter.tell(new Inc(), ActorRef.noSender());
System.out.println("[" + System.currentTimeMillis() + "] Sent Inc #" + i);
}

// 4) 主线程继续执行,不会等待 Actor 完成处理
System.out.println("[" + System.currentTimeMillis() + "] Messages sent, doing other work…");

// 5) 等 3s 再查询当前计数
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

// 6) 通过 Get 消息和 CompletableFuture 拿回最终结果
CompletableFuture<Integer> futureCount = new CompletableFuture<>();
counter.tell(new Get(futureCount), ActorRef.noSender());
futureCount.thenAccept(count ->
System.out.println("[" + System.currentTimeMillis() + "] Final count = " + count)
).thenRun(system::terminate);
}

运行后,打印得到:

1
2
3
4
5
6
7
8
9
10
11
12
13
[1745902915772] Sent Inc #0
[1745902915775] Actor incremented to 1
[1745902915776] Sent Inc #1
[1745902915776] Sent Inc #2
[1745902915776] Actor incremented to 2
[1745902915776] Sent Inc #3
[1745902915776] Actor incremented to 3
[1745902915776] Sent Inc #4
[1745902915776] Actor incremented to 4
[1745902915777] Actor incremented to 5
[1745902915776] Messages sent, doing other work…
[1745902918788] Final count = 5
[INFO] [04/29/2025 13:01:58.815] [main] [CoordinatedShutdown(akka://MySystem)] Running CoordinatedShutdown with reason [ActorSystemTerminateReason]

  • “Sent Inc”“Actor incremented” 交替出现,说明发送方异步 fire-and-forget,Actor 内部在后台线程顺序消费
  • main 线程并未等待 Actor 处理便继续执行,最终通过 GetCompletableFuture 拿到结果。

Actor底层原理

在 Actor 模式下,每个 Actor 都像一个“独立小宇宙”──它拥有自己的邮箱(mailbox)行为逻辑(behavior)执行上下文(dispatcher + 线程池 )

1. 邮箱(Mailbox)与消息队列
  • 邮箱是一个线程安全的队列,专门用来存放别人发给这个 Actor 的消息。
  • 当你调用 actorRef.tell(msg, sender) 时,底层只是把 msg 放入这个队列,然后立马返回,不做任何等待或阻塞。
2. 调度器(Dispatcher)与线程池
  • Dispatcher 相当于一个线程池+事件循环的组合:
    • 线程池:包含若干工作线程,用来“跑”Actor 的消息处理逻辑。
    • 事件循环(Event Loop):定期去查看自己负责的一组 Actor 有没有待处理的消息。
  • 不同 Actor 可以共享同一个 Dispatcher,也可以各自配置不同类型的 Dispatcher(如:针对 CPU 密集型、I/O 密集型等场景)。
    3. 串行处理:Actor 自身线程隔离

当 Dispatcher 的一个工作线程被分配去调度某个 Actor 时,底层会做:

  • “单线程顺序”:每个 Actor 的所有消息都在同一个线程上下文里、一个接一个地被处理。
  • 线程隔离:不同 Actor 的消息处理可以发生在同一个线程池的不同线程上,但它们相互独立,不会并发操作同一个 Actor 的内部状态。

回到上面这个例子,如果我们tell Actor 500次,主线程等待3秒后再用Get查询,会发现需要等Actor完成所有自增才会去查询。这并不是说主线程被阻塞。而是在休眠3庙后,立刻把Get消息入队,但是此时Actor还没有消化完所有的Inc消息,导致看起来像阻塞了一样。

1
2
3
4
5
6
7
8
主线程                            Actor 线程
--------------------------------------------
tell Inc×500 ──▶ mailbox: [Inc,Inc,…,Inc]
(print "…") (sleep 100ms+count++)×500
sleep 3s …
tell Get ──▶ mailbox: [Inc,…,Inc,Get]
完成500次Inc后,处理Get
future.complete thenAccept 打印 Final count

5. 协程 / async-await 模式

核心思想:

  • 程序代码看起来和同步代码几乎一样,但在执行时遇到耗时任务(异步任务)会自动“挂起”,线程去做其他事情;
  • 当任务完成后,“恢复”执行。

生活类比:

你看书看到一半去煮咖啡(挂起),你不会坐着傻等,而是去洗衣服。当咖啡煮好后,你回来继续看书(恢复)。

JavaScript async-await 示例:

1
2
3
4
5
async function fetchData() {
const data = await fetch('https://example.com/api/data');
const json = await data.json();
console.log(json);
}

Kotlin 协程 suspend 示例:

1
2
3
4
5
6
7
8
suspend fun fetchData(): String {
delay(1000) // 模拟耗时异步操作
return "Hello Coroutine!"
}

suspend fun main() {
println(fetchData())
}

“挂起 ≠ 线程阻塞”:

  • 挂起(suspend)时线程立即释放,可以去处理其他请求或协程;
  • 结果返回时,协程继续在某个空闲线程恢复。

    kotlin-suspend 底层逻辑

    Continuation对象

  • 想象你在读一本小说,到某一页你要出去办事,还想回家后接着读。因此,你就在那页插入一个书签,书签里还写着一句「回家后从这里继续读」。

  • 在协程世界里,每遇到一个“挂起点”——比如 delay(1000) 或者网络 I/O 调用——

    • 编译器都会给你创建一个 Continuation 对象,它就相当于那个「书签」。
      1. 它记录了你到底挂在哪行代码(标签 label)。
      2. 它保存了挂起前已计算出的局部变量,就像把你读到一半的内容做个快照。
      3. 它本身就是一个回调函数:“等下恢复时,就把这个书签放回代码里,从这里往下执行”
        所以,Continuation 就是 它带着 “下一步该从哪读” (label) + “我读到哪儿了” (savedX) + “挂起后要谁来叫我” (resume 回调)

在 Kotlin 编译器眼里,所有 suspend fun 并不是“特权函数”,而是被改写成了接受一个额外的 Continuation 参数、返回一个 Object(或者更准确地说,返回 Any?)的普通函数。它的底层逻辑大致可以分成三步:

1. 编译期:状态机+Continuation

假设写了:

1
2
3
4
5
suspend fun fetchData(): String {
val a = callA() // 挂起点①
val b = callB(a) // 挂起点②
return process(b)
}

编译器会把它变成类似下面这种伪代码(高度简化):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
fun fetchData(continuation: Continuation<String>): Any? {
// 下面这个 label 用来记住“我上次挂在哪儿”
when ((continuation as FetchDataContinuation).label) {
// 第一次调用,label=0
0 -> {
// 如果要挂起,就在这里插书签,下次回来从①继续
continuation.label = 1
// 如果挂,就直接 return COROUTINE_SUSPENDED
val resultA = callA(continuation)
if (resultA === COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED
/*
case1: 如果callA 立即返回了一个“正常值”
就把它保存在局部变量 resultA 里
case2: 如果callA是一个异步挂起点
把那个在异步完成时要填充的值先保存在 `continuation.savedA`中
并返回 `COROUTINE_SUSPENDED`,退出当前函数,不执行后面的任何代码。
若干时刻后,调度器调用 `continuation.resume(valueA)`,
把 “valueA” 塞回去给那个 Continuation
*/
}
1 -> {
// 当callA回调,此时label=1,因此从挂起点①恢复
// 之前保存在 Continuation 对象里的局部变量
val a = continuation.savedA
//下次恢复要从挂起点②开始
continuation.label = 2
val resultB = callB(a, continuation) // 同理,这里可能继续挂起
if (resultB === COROUTINE_SUSPENDED) return COROUTINE_SUSPENDED
}
2 -> {
// 从挂起点②恢复……
val b = continuation.savedB
}
}
return finalResult // 正常走到最后才返回真正的 String
}

这里比较困惑的,就是resultAcontinuation.savedA 分别是什么意思,我在代码中也给了详细注释:

  • resultA:只有在 callA 立即返回时才会持有结果,并在本次调用体内使用。
  • continuation.savedA:只有在 callA 挂起的情况下,才会被存入,然后在恢复分支里被读取。
    二者合起来,就保证了 “同步写法” 的 suspend fun 既能覆盖 即时返回,也能覆盖 异步挂起+恢复 两种情况。

    2. 运行时:挂起 & 恢复

delay(1000) 为例,它本质上是这样做的:

  1. 在协程里调用 delay(1000, continuation)
  2. delay 内部会:
    • Dispatchers 的定时任务调度器里注册一个定时器,到时会调用 continuation.resume(Unit)
    • 立即返回 COROUTINE_SUSPENDED,让刚才那个 suspend 函数也返回 COROUTINE_SUSPENDED
  3. 编译器生成的状态机函数看到自己的调用 callDelay(...) 返回了 COROUTINE_SUSPENDED,就不往下执行,而是退出到调用者(通常是 runBlocking 或者某个调度器线程)。
  4. 1 秒后,定时器触发,线程池里的某条线程调用 continuation.resume(Unit),这是一个 回调——它会把 label 标记的状态机“激活”回去,继续执行 delay 之后的那段代码。

3. 核心结论

  • 挂起点(任何 suspend 调用)都被编译成:
    1. 保存现场到 Continuation
    2. 返回 COROUTINE_SUSPENDED
  • 恢复resume)实际上就是一个回调,给 Continuation 发送 “继续跑” 的信号。
  • 这样一来,在代码里看到的同步式调用(像普通函数)背后,其实隐藏了一个状态机+回调网络,帮我们把“异步回调”糖衣化成“同步风格”的编程体验。

三、为什么Kotlin更倾向于用suspend,而不是Actor、Flow、Reactor?

因为在大部分场景中,你只需要“一问一答”的异步通信,而不一定需要复杂的“流式组合”或“消息队列”。
Kotlin的suspend能用最简单的语法实现异步编程,同时提供了丰富的库(如Flow)兼容其他模型。

  • suspend能用简单代码实现高效异步;
  • 需要更复杂场景时,可以自然过渡到FlowActor,无需大规模重构。
-------------本文结束,感谢您的阅读-------------