异步编程支持#

MoonBit 采用了一种类似 Kotlin 的、基于协程的方式来支持异步编程。MoonBit 的异步编程支持由两部分组成:MoonBit 编译器中对 async 函数的支持,以及 MoonBit 的官方异步运行时 moonbitlang/async。目前,moonbitlang/async 对 native 后端有最好的支持,对 JavaScript 有有限的支持,暂时不支持 WebAssembly 后端。moonbitlang/async 的 API 仍在开发中,可能在未来有变动。

在 MoonBit 中使用异步编程#

要在 MoonBit 中使用 moonbitlang/async 异步编程,首先需要运行 moon add moonbitlang/async 来把 moonbitlang/async添加到当前项目的依赖中。可以在 moon.mod.json 中添加 "preferred-target": "native"来默认使用 moonbitlang/async 支持最好的 native 后端。这之后,只需要在 moon.pkgimport 中添加 moonbitlang/async 的各个包,就能使用 moonbitlang/async 中的各种异步编程 API 了。如果你想看一个以工作流为主线的示例,可以参考原生 CLI 快速开始

可以在 mooncakes.io 查询moonbitlang/async 中的包列表以及它们各自的详细文档,还可以在 moonbitlang/async 的 GitHub 仓库中找到一些简单的 例子。这篇文章会介绍 moonbitlang/async 的基本概念和其中最重要的一些 API。

异步函数#

异步函数用 async 关键字定义。他们隐式地 抛出错误 ,而如果不抛出错误则需要显式地声明 noraise

async fn my_async_function() -> String {
  let (response, body) = @http.get("https://www.moonbitlang.cn")
  guard response.code is (200..<300) else {
    fail("server responded with \{response.code} \{response.reason}")
  }
  body.text()
}

由于 MoonBit 是一门静态类型语言,编译器会跟踪异步函数的调用,因此你可以像调用普通函数一样调用异步函数。MoonBit IDE 会用不同的样式来高亮显示异步函数的调用。例如,如果在 MoonBit IDE 中打开上面的代码片段,就会看到 @http.get 函数被渲染成了斜体 + 下划线的效果。

异步函数只能在异步函数中被调用。调用一个 async 函数会使调用者阻塞并等待被调用的函数返回,类似其他语言中的 await

MoonBit 对异步编程有原生的支持。可以用 async fn main 来声明异步的程序入口,或者用 async test 来编写异步测试。默认情况下,多个异步测试可以并行地同时运行。注意使用 async fn mainasync test 需要在 moon.pkg 中添加 moonbitlang/async 作为依赖。

结构化并发与任务组#

如果一段异步程序除了直接调用其他异步函数没有做别的事情,那么它的控制流会是线性的,和普通的同步程序没有区别。异步程序和同步程序最本质的不同,在于异步程序能同时创建多个任务,并让它们同时运行。由于多个异步任务可以同时运行,异步程序的控制流会复杂许多因此,创建多个任务的能力也带来了新的挑战:如何健壮地管理任务。

moonbitlang/async 采用了 结构化并发 这一编程范式来解决任务管理的问题,并提升异步程序的健壮性。在 moonbitlang/async 中,新任务只能在一个 任务组 中被创建。而任务组必须通过 @async.with_task_group 函数创建:

async fn[Result] with_task_group(
  f : async (@async.TaskGroup[Result]) -> Result,
) -> Result

with_task_group 函数会创建一个新的任务组,在其中创建一个新的任务,并在这个任务中以任务组自己为参数运行 ff 可以通过 spawn_bg 等方法来使用任务组创建更多任务:

/// 在任务组中创建一个新的任务并在后台运行它
fn[Result] @async.TaskGroup::spawn_bg(
  group : TaskGroup[Result],
  f : async () -> Unit,
  ...
) -> Unit

结构化并发范式的“魔法”就在 with_task_group 需要遵守的一条规则之中:

只有当任务组中的所有任务都结束了,with_task_group 才会返回

with_task_group 会保证上述规则在所有情况下都成立。正常情况下,with_task_group 会等待所有任务正常结束。如果 with_task_group 因为某些原因需要提前结束,例如某个任务失败了(默认情况下,如果任务组中的某个任务失败了,with_task_group 也会立刻失败,从而确保错误不会被不小心忽略),它会取消所有还在运行的任务,并等待它们的清理工作完成。结果上看,with_task_group 的规则保证了 孤儿任务(由于忘记被取消,明明已经没用了但仍在运行的任务)在 moonbitlang/async 中不可能出现。

下面是一个使用 with_task_group 创建多个任务,并让它们同时运行的例子:

async test "with_task_group" {
  let log = []
  @async.with_task_group(group => {
    group.spawn_bg(() => for _ in 0..<3 {
      log.push("task #1 tick")
      @async.sleep(200) // 等待 200ms
    })
    group.spawn_bg(() => {
      @async.sleep(100)
      for _ in 0..<3 {
        log.push("task #2 tick")
        @async.sleep(200)
      }
    })
  })
  json_inspect(log, content=[
    "task #1 tick", "task #2 tick", "task #1 tick", "task #2 tick", "task #1 tick",
    "task #2 tick",
  ])
}

with_task_group 是一个非常强大的构造。它能模拟许多其他异步控制流原语。例如,下面是一个使用 with_task_group 实现的、给一个异步函数添加超时限制的辅助函数:

async fn with_timeout(timeout : Int, f : async () -> Unit) -> Unit {
  @async.with_task_group(group => {
    group.spawn_bg(no_wait=true, () => {
      @async.sleep(timeout)
      raise Failure::Failure("timeout!")
    })
    f()
  })
}

这段代码非常简短。但是 with_task_group 的语义保证了这个简单的小函数在所有边界情况上都能有正确的行为:

  • 如果 f 在超时之前顺利返回,由于创建负责计时的任务时设置了 no_wait=truewith_task_group 不会等待计时任务完成,会立即准备结束。为了维护 with_task_group 的性质,计时任务会立即被取消并结束。因此,with_timeout(.., f) 会在 f 返回时立刻返回,不会有不必要的延迟。

  • 如果 f 失败了,它抛出的错误会传播到整个 with_task_group 上。此时,计时任务同样会被自动取消。

  • 如果 f 超时了,计时任务会抛出一个错误,终止整个任务组。此时,f 会被自动取消,因此不会有资源泄漏。

通过取消机制实现模块化的异步程序#

在前面的章节中,我们多次提到了“取消”这一概念。在异步编程中,能够取消一个正在运行的任务是非常重要的。在 moonbitlang/async 中,包括 with_task_group 在内的所有异步操作默认都是可取消的。所以,当用户使用这些基础操作组合出大规模的异步程序时,无论最终得到的程序有多复杂,它依然自动是可取消的。

当一段异步代码被取消时,取消信号会变成从这段代码中断的地方抛出的错误。所以,用户的程序无需对取消进行特殊处理:取消信号会通过错误处理机制自动传播,并触发 defer 表达式和 catch 中的各种清理代码。

取消任意异步代码的能力使 MoonBit 中的异步程序变得高度模块化。moonbitlang/async 包提供了许多使用的异步辅助函数,例如给一段异步代码加上超时、自动重试等。这些辅助函数都依赖于取消机制。例如,下面是一段尝试在时限内进行一次 HTTP 请求,并允许至多三次重试的程序:

async fn make_request() -> String {
  @async.retry(Immediate, max_retry=3, () => @async.with_timeout(1000, () => {
    let (response, body) = @http.get("https://www.moonbitlang.com")
    guard response.code is (200..<300) else {
      fail("the HTTP request is not successful")
    }
    body.text()
  }))
}

和外部世界交互#

除了各种异步编程原语,moonbitlang/async 还提供了一个 性能优秀 的 IO 事件循环,以及一系列丰富的异步 IO 操作,包括 http/https、文件 IO、网络 IO 和创建外部进程。可以在 mooncakes.io 找到完整的 API 列表和它们的详细文档,也可以在 moonbitlang/asyncGitHub 仓库 找到一些简单示例。下面是一个小例子:

async fn download_file(url : String, file_name : String) -> Unit {
  // 流式地处理内容以节省内存
  let (_response, body) = @http.get_stream(url)
  defer body.close()
  let out_file = @fs.create(file_name, permission=0o644)
  defer out_file.close()
  out_file.write_reader(body)
}

JavaScript 支持#

虽然 moonbitlang/async 对 native 后端支持最好,它也对 JavaScript 后端有基本的支持:

  • 所有 IO 无关的 API,例如任务组、超时等,都可以在 JavaScript 后端使用

  • JavaScript 后端无法使用 IO 相关的 API,因为不是所有 JavaScript 环境(例如浏览器)都支持这些 IO 操作

  • moonbitlang/async/js_async 包可以用来和 JavaScript promise 互动。它提供了等待一个 JavaScript promise 和把一个 MoonBit async 函数包装成一个 JavaScript promise 的功能。利用这个功能,用户可以自行绑定目标 JavaScript 环境中支持的异步操作。

关于和 JavaScript 互动的更多信息,可以参考 moonbitlang/async/jsmooncakes.io 页面