实验性的异步编程支持#

MoonBit 目前提供了实验性的异步编程支持。但异步相关功能的设计和 API 非常不稳定,未来可能会有大的破坏性改动。本文档页面将介绍 MoonBit 异步编程目前的设计,我们欢迎及感谢任何对目前设计的反馈与使用体验分享。

异步函数#

异步函数可以用 async 关键字声明:

async fn my_async_function() -> Unit {
  ...
}

// 匿名/本地函数
test {
  let async_lambda = async fn () {
    ...
  }
  async fn local_async_function() {
    ...
  }
}

调用异步函数时,必须用 !! 操作符来标记这是一次异步函数调用:

async fn some_async_function() -> Unit! {
  ...
}

async fn another_async_function() -> Unit! {
  // 异步函数中的错误也会被 `!!` 转发
  some_async_function!!()
}

如果异步函数会抛出错误,!! 也会把错误一并转发。

异步函数只能在其他异步函数中被调用。目前,在 for .. in 循环中不能使用异步函数。

用于中断异步函数的原语#

MoonBit 提供了两个用于异步编程的原语:%async.suspend%async.run

// `run_async` 会创建一个新的协程,并在其中运行一个异步函数
fn run_async(f : async () -> Unit) -> Unit = "%async.run"

// `suspend` 会中断当前协程的运行。
// `suspend` 会接受一个回调函数,并让这个回调函数来操作中断的协程
async fn suspend[T, E : Error](
  // `f` 是负责操作中断的协程的回调函数
  f : (
    // `f` 的第一个参数用于继续运行被中断的协程
    (T) -> Unit,
    // `f` 的第二个参数用于取消被中断的协程。
    // 取消会被表示为在中断处抛出错误
    (E) -> Unit
  ) -> Unit
) -> T!E = "%async.suspend"

这两个原语不应该让终端用户直接调用。但由于 MoonBit 的异步标准库仍在开发中,目前,用户需要手动绑定这两个原语,才能编写异步程序。

可以用两种不同的方式来理解这两个原语:

  • 理解为协程:%async.run 创建一个新的协程,%async.suspend 中断当前协程。和其他语言的协程的主要区别是:中断协程时,不是由创建协程的地方来负责恢复执行,而是在中断的地方通过一个回调函数就地处理中断后的协程

  • 理解为 delimited continuation:%async.run 是 delimited continuation 中的 reset 操作符,%async.suspend 是 delimited continuation shift 操作符

以下是使用这两个原语的示例:

type! MyError derive(Show)

async fn async_worker(throw_error~ : Bool) -> Unit!MyError {
  suspend!!(fn (resume_ok, resume_err) {
    if throw_error {
      resume_err(MyError)
    } else {
      resume_ok(())
      println("the end of the coroutine")
    }
  })
}

// the program below should print:
//
//   the worker finishes
//   the end of the coroutine
//   after the first coroutine finishes
//   caught MyError
test {
  // when supplying an anonymous function
  // to a higher order function that expects async parameter,
  // the `async` keyword can be omitted
  run_async(fn () {
    try {
      async_worker!!(throw_error=false)
      println("the worker finishes")
    } catch {
      err => println("caught: \{err}")
    }
  })
  println("after the first coroutine finishes")
  run_async(fn () {
    try {
      async_worker!!(throw_error=true)
      println("the worker finishes")
    } catch {
      err => println("caught: \{err}")
    }
  })
}

async_worker 里,suspend 会捕获当前协程剩下的部分,并将它们表示成两个函数,传递给 suspend 的参数。在 suspend 的参数里,调用 resume_ok 会让 suspend!!(...) 正常返回,恢复协程的运行,一直运行到创建这个协程的 run_async(...) 为止。调用 resume_err 也会恢复协程的运行,但它会在 suspend!!(...) 的位置抛出一个错误。

suspend 的类型表明它可能抛出错误。但 suspend 自身不会直接产生任何错误。这一设计保证了协程在每一个的中断点都是可以取消的:调用对应的 resume_err 函数即可。

和 JS 的 Promise/回调 API 整合#

MoonBit 的异步标准库仍在开发中,因此,目前没有直接可用的事件循环和输入输出原语实现。目前,要使用 MoonBit 编写异步程序最简单的办法是使用 JS 后端,并复用 JavaScript 的事件循环和输入输出 API。下面是一个整合 MoonBit 的异步编程支持和 JS 的回调 API 的例子:

type JSTimer
extern "js" fn js_set_timeout(f : () -> Unit, duration : Int) -> JSTimer =
  #| (f, duration) => setTimeout(f, duration)

async fn sleep(duration : Int) -> Unit! {
  suspend!!(fn (resume_ok, _resume_err) {
    let _ = js_set_timeout(fn () { resume_ok(()) }, duration)
  })
}

test {
  run_async(fn () {
    try {
      sleep!!(500)
      println("timer 1 tick")
      sleep!!(1000)
      println("timer 1 tick")
      sleep!!(1500)
      println("timer 1 tick")
    } catch { _ => panic() }
  })
  run_async(fn () {
    try {
      sleep!!(600)
      println("timer 2 tick")
      sleep!!(600)
      println("timer 2 tick")
      sleep!!(600)
      println("timer 2 tick")
    } catch { _ => panic() }
  })
}

和 JS Promise 也非常简单:只需要把 resume_ok 函数用作 Promiseresolveresume_err 用作 Promise 的 reject 回调即可。