G-Machine 第一部分#
本文是关于在 MoonBit 中实现惰性求值的系列文章的第一篇。在本文中,我们将探讨惰性求值的目的以及用于惰性求值的典型抽象机器 G-Machine。
高阶函数和性能挑战#
高阶函数,如 map
和 filter
,通常是许多人对函数式编程的第一印象(尽管函数式编程远不止于这些函数)。它们简化了许多列表处理任务,但另一个问题出现了:这些高阶函数嵌套太深可能会导致性能不佳(因为它需要多次遍历列表)。
为了提高代码效率,一些人提出利用基于高阶函数内部重复模式的编译器优化。例如,将 map(f, map(g, list))
重写为:
map(fn (x) { f(g(x)) }, list)
尝试不错,但重要的是要认识到这种优化技术具有固有的局限性,特别是在处理更复杂的场景时。将所有处理合并到一个函数中可以避免重复的列表遍历,但它会对降低代码的可读性,并使修改变得复杂。是否有一种更平衡的解决方案,兼顾效率和可维护性?
惰性求值是一种技术,可以在这种情况下在一定程度上减少不必要的成本。这种策略可以集成到特定的数据结构中(例如,Java 8 中添加的 Stream 类型,以及早期 Scheme 语言中的 stream),或者整个语言可以被设计为惰性(成功的例子包括 20 世纪 80 年代的 Miranda 语言,以及后来的 Haskell 和 Clean 语言)。
首先,让我们探讨一下惰性列表(Stream
)如何避免在这种情况下多次遍历。
备注
这里的 List[T]
是 @immut/list.T[T]
的 typealias
惰性列表实现#
首先,让我们定义它的类型:
enum Stream[T] {
Empty
Cons(T, () -> Stream[T])
}
Stream[T]
和 List[T]
之间唯一的真正区别在于 Cons
:保存列表其余部分的位置被一个无参数函数(俗称为 thunk)替换。这是惰性求值的一个简单实现:将不想立即计算的东西包装在一个 thunk 中。
我们还需要一个函数将常规列表转换为惰性列表:
fn Stream::from_list[T](l : List[T]) -> Stream[T] {
match l {
Nil => Empty
Cons(x, xs) => Cons(x, fn () { Stream::from_list(xs) })
}
}
这个函数不需要遍历整个列表来将其转换为 Stream
。对于不急着要结果的运算(这里是 Stream::from_list(xs)
),我们直接将它们包装在一个 thunk 中并返回。下面的 map
函数将采用这种方法(不过这里的 xs
已经是一个 thunk 了)。
fn map[X, Y](self : Stream[X], f : (X) -> Y) -> Stream[Y] {
match self {
Empty => Empty
Cons(x, xs) => Cons(f(x), fn () { xs().map(f) })
}
}
take
函数负责执行计算,它可以根据需要提取 n 个元素。
fn take[T](self : Stream[T], n : Int) -> List[T] {
if n == 0 {
Nil
} else {
match self {
Empty => Nil
Cons(x, xs) => Cons(x, xs().take(n - 1))
}
}
}
使用 thunk 实现惰性数据结构的方法很简单,有效地解决了上述问题。这种方法要求用户明确指示代码中应该延迟计算的位置,而惰性语言的策略则更加激进:它默认使用惰性求值来处理所有用户定义的函数!在接下来的章节中,我们将介绍一个惰性函数式语言的最小实现,并简要介绍其潜在的理论模型。
一种惰性求值语言及其抽象语法树#
本文中使用的示例是一种惰性求值语言,故意设计成类似 Clojure(一种 Lisp 方言)的语言,命名为 coreF。这是为了在 Markdown 中使用 Clojure 的语法高亮。请别担心,语法可能有点麻烦,但是绝对够简单。
使用 defn
关键字定义函数:
(defn factorial[n] ;; n 是参数,这个函数计算 n 的阶乘
(if (eq n 0) ;; 定义从这里开始,持续到下面的三行
1
(mul n (factorial (sub n 1)))))
在一般对话中,我们将它成为函数就好。但当我们讨论惰性函数式语言时,我们必须引入一个专门的术语:超组合子( Super Combinator )。在超组合子的定义中,所有自由变量都应该包含在一个初始的 []
中。
coreF 程序的执行从 main
开始,调用一个特定的超组合子,等价于用它的定义替换它。
(defn main[] (factorial 42))
没有参数的超组合子,如 main
,有一个术语:常量应用形式( Constant Applicative Forms ,简称 CAF)。
coreF 还具有几个语言特性,包括自定义数据结构、用于解构的 case
表达式,以及用于声明局部变量的 let
和 letrec
。但是,本文的范围仅限于上述特性(实际上,甚至更少,因为像 eq
、mul
、sub
等内置函数之后才会实现)。
coreF 不包括匿名函数,因为匿名函数会引入额外的自由变量。删除它们需要额外的转换步骤:lambda 提升。这种技术可以将 lambda 表达式转换为外部超组合子,但这不是惰性求值的主要内容,因此在这里省略了。
超组合子最终将被解析为 ScDef[String]
,但编写解析器是一项繁琐的任务。我将在最终代码中提供它。
enum RawExpr[T] {
Var(T)
Num(Int)
Constructor(tag~:Int, arity~:Int) // tag, arity
App(RawExpr[T], RawExpr[T])
Let(Bool, List[(T, RawExpr[T])], RawExpr[T]) // isRec, Defs, Body
Case(RawExpr[T], List[(Int, List[T], RawExpr[T])])
} derive(Show)
struct ScDef[T] {
name : String
args : List[T]
body : RawExpr[T]
} derive(Show)
此外,还有一些预定义的 coreF 程序需要给出。
let prelude_defs : List[ScDef[String]] = {
let args : (FixedArray[String]) -> List[String] = List::of
let id = ScDef::new("I", args(["x"]), Var("x")) // id x = x
let k =
ScDef::new(
"K",
args(["x", "y"]),
Var("x")
) // K x y = x
let k1 =
ScDef::new(
"K1",
args(["x", "y"]),
Var("y")
) // K1 x y = y
let s =
ScDef::new(
"S",
args(["f", "g", "x"]),
App(App(Var("f"), Var("x")), App(Var("g"), Var("x")))
) // S f g x = f x (g x)
let compose =
ScDef::new(
"compose",
args(["f", "g", "x"]),
App(Var("f"), App(Var("g"), Var("x")))
) // compose f g x = f (g x)
let twice =
ScDef::new(
"twice",
args(["f"]),
App(App(Var("compose"), Var("f")), Var("f"))
) // twice f = compose f f
List::of([id, k, k1, s, compose, twice])
}
为什么是图#
在 coreF 语言中,表达式(不是前面提到的 RawExpr[T]
,而是运行时表达式)在被求值时,以图的形式存储在内存中,而不是树。
为什么要这样?看看这个程序:
(defn square[x] (mul x x))
(defn main[] (square (square 3)))
如果我们按照一般的树形表达式来求值,表达式会被规约成:
(mul (square 3) (square 3))
在这种情况下,(square 3)
将被计算两次,这对于惰性求值来说显然是不可取的。
为了更清楚地说明这一点,让我们用 MoonBit 代码做一个不太恰当的类比:
fn square(thunk : () -> Int) -> Int {
thunk() * thunk()
}
使用图来表示程序是为了方便共享计算结果,避免冗余计算。为了实现这个目的,在减少图时实现一个原地更新算法是至关重要的。关于原地更新,让我们用 MoonBit 代码来模拟它:
enum LazyData[T] {
Waiting(() -> T)
Done(T)
}
struct LazyRef[T] {
mut data : LazyData[T]
}
fn extract[T](self : LazyRef[T]) -> T {
match self.data {
Waiting(thunk) => {
let value = thunk()
self.data = Done(value) // 原地更新
value
}
Done(value) => value
}
}
fn square(x : LazyRef[Int]) -> Int {
x.extract() * x.extract()
}
无论哪一方先执行 extract
方法,它都会更新引用的可变字段,并用计算结果替换其内容。
约定#
在讨论图规约如何进行之前,让我们先定义一些术语和基本事实。我们将继续使用相同的程序作为示例:
(defn square[x] (mul x x)) ;; 乘法
(defn main[] (square (square 3)))
内置的原语,如
mul
,是预定义的操作。
对表达式进行求值(当然是惰性的),并原地更新图中对应的节点,这个过程称为规约。
(square 3)
是一个可规约的表达式(reducible expression,通常缩写为 redex),由square
和它的参数组成。它可以规约为(mul 3 3)
。(mul 3 3)
也是一个 redex,但与(square 3)
相比,它是一种不同类型的 redex,因为square
是一个用户定义的组合子,而mul
是一个实现的内置原语。(mul 3 3)
的规约结果是表达式9
,它不能进一步规约。这种不能进一步规约的表达式称为范式(normal form)。一个表达式可能包含多个子表达式(例如
(mul (add 3 5) (mul 7 9))
)。在这种情况下,表达式的规约顺序至关重要——有些程序只在特定的规约顺序下停机。
有一种特殊的规约顺序总是选择最外层的 redex 进行规约,称为 正则序规约(normal order reduction)。下文也将统一采用这种规约顺序。
因此,图规约可以用以下伪代码描述:
当图中存在可规约的表达式时 {
选择最外层的可规约表达式。
规约表达式。
用规约的结果更新图。
}
头晕了吗?让我们找一些例子来演示如何在纸上进行规约。
第一步:找到下一个 redex#
在整个程序中,执行从 main
函数开始。
(defn square[x] (mul x x))
(defn main[] (add 33 (square 3)))
main
本身是一个 CAF - 最简单的 redex。如果我们进行替换,当前要处理的表达式是:
(add 33 (square 3))
根据找到最外层 redex 的原则,我们似乎立即找到了由 add
和它的两个参数组成的 redex(暂时这么假设)。
但是稍等!由于存在默认的柯里化,这个表达式对应的抽象语法树实际上由多个 App
节点嵌套而成。它大致看起来是这样的(为了可读性而简化):
App(App(add, 33), App(square, 3))
这个从 add
到最外层 App
节点的链式结构称为“脊柱”(Spine)
回过头来看一下,add
是一个内部定义的原语。但是,由于它的第二个参数 (square 3)
不是范式,我们不能对它进行规约(将一个未求值的表达式加到一个整数上似乎有点荒谬)。因此,我们不能明确地说 (add 33 (square 3))
是一个 redex;它只是最外层的函数应用。要对它进行规约,我们必须先规约 (square 3)
。
第二步:规约#
因为 square
是一个用户定义的超组合子,所以对 (square 3)
进行规约只涉及参数替换。
如果一个 redex 的参数比超组合子所需的参数——这在高阶函数中很常见——它就成为一个 弱头部范式
(weak head normal form,通常缩写为 WHNF)。在这种情况下,即使它的子表达式包含 redex,也不需要采取任何行动。考虑一个对列表中的所有整数进行三倍化的例子。
(map (mul 3) list-of-int)
在这里,(mul 3)
不能被视为 redex,因为它缺少足够的参数。
第三步:更新#
这一步只影响执行效率,在纸上推导时可以跳过。
这些操作在纸上执行很容易(当代码量不超过半张纸时),但当我们切换到计算机时,我们如何将这些步骤转换为可执行代码呢?
为了回答这个问题,惰性求值编程语言的先驱们提出了各种用于建模惰性求值的 抽象机器。这些包括:
G-Machine
Three Instruction Machine
ABC Machine(由 Clean 语言使用)
Spineless Tagless G-Machine(简称 STG,由 Haskell 语言使用)
它们是用于指导编译器实现的执行模型。重要的是要注意,与今天各种流行的虚拟机(如 JVM)不同,抽象机器更像是编译器的中间表示(IR)。以 Haskell 的编译器 GHC 为例,生成 STG 代码后,它不会直接将其传递给解释器执行。相反,它会根据所选的后端进一步将其转换为 LLVM、C 代码或机器代码。
为了简化实现,本文将直接使用 MoonBit 为 G-Machine 指令编写解释器,从一个最小的示例开始,逐渐添加更多功能。
G-Machine 概述#
虽然 G-Machine 是用于惰性函数式语言的抽象机器,但它的结构和概念与编写一般命令式语言时遇到的情况并没有显著不同。它也有堆和栈等结构,其指令是顺序执行的。一些关键的区别包括:
堆中的内存的基本单位不是字节,而是图节点。
栈只包含指向堆的指针,而不是实际数据。
这种设计可能不太实用,但相对简单。
coreF 中的超组合子被编译成一系列 G-Machine 指令。这些指令大致可以分为以下几类:
访问数据指令,例如
PushArg
(访问函数参数)和PushGlobal
(访问其他超组合子)。在堆中构造/更新图节点,如
MkApp
、PushInt
、Update
用于清理栈中未使用的地址的
Pop
指令。用于表示控制流的
Unwind
指令。
G-Machine 状态解析#
在这个简单版本的 G-Machine 中,状态包括:
堆:这是存储表达式图和与超组合子对应的指令序列的地方。
// 使用 'type' 关键字封装地址类型。 type Addr Int derive(Eq, Show) // 用枚举类型描述图节点。 enum Node { NNum(Int) // 应用节点 NApp(Addr, Addr) // 为超组合子存储参数数量和 // 对应的指令序列 NGlobal(String, Int, List[Instruction]) // 间接节点。实现惰性求值的关键组件 NInd(Addr) } derive(Eq, Show) struct GHeap { // 堆使用数组, // 数组中内容为 None 的空间可作为空闲内存。 mut object_count : Int memory : Array[Node?] } // 为节点分配堆空间。 fn alloc(self : GHeap, node : Node) -> Addr { let heap = self fn next(n : Int) -> Int { (n + 1) % heap.memory.length() } fn free(i : Int) -> Bool { match heap.memory[i] { None => true _ => false } } let mut i = heap.object_count while not(free(i)) { i = next(i) } heap.memory[i] = Some(node) heap.object_count = heap.object_count + 1 return Addr(i) }
栈:栈只保存指向堆的地址。一个简单的实现可以使用
List[Addr]
。全局表:它是一个映射表,记录超组合子(包括预定义的和用户定义的)的名称及其对应的
NGlobal
节点的地址。这里我使用 Robin Hood 哈希表实现。当前要执行的代码序列。
执行状态统计:一个简单的实现涉及计算已执行了多少条指令。
整个状态使用类型 GState
表示。
struct GState {
mut stack : List[Addr]
heap : GHeap
globals : @hashmap.T[String, Addr]
mut code : List[Instruction]
mut stats : GStats
}
type GStats Int
fn stat_incr(self : GState) -> Unit {
self.stats = self.stats._ + 1
}
fn put_stack(self : GState, addr : Addr) -> Unit {
self.stack = Cons(addr, self.stack)
}
fn put_code(self : GState, instrs : List[Instruction]) -> Unit {
self.code = instrs + self.code
}
fn pop1(self : GState) -> Addr {
match self.stack {
Cons(addr, reststack) => {
self.stack = reststack
addr
}
Nil => abort("pop1(): stack size smaller than 1")
}
}
// e1 e2 ..... -> (e1, e2) ......
fn pop2(self : GState) -> (Addr, Addr) {
match self.stack {
Cons(addr1, Cons(addr2, reststack)) => {
self.stack = reststack
(addr1, addr2)
}
_ => abort("pop2(): stack size smaller than 2")
}
}
现在,我们可以将我们在纸上推导的图规约算法一一对应到这个抽象机器:
在机器的初始状态下,所有编译的超组合子都已经放在堆上的
NGlobal
节点中。此时,G-Machine 中的当前代码序列只包含两条指令。第一条指令将main
节点的地址推送到栈上,第二条指令将main
的对应代码序列加载到当前代码序列中。main
的对应代码序列在堆上实例化,分配节点并加载数据,最终在内存中构建一个图。这个过程被称为“实例化”main
。一旦实例化完成,这个图的入口地址被推送到栈顶。实例化完成后,进行清理工作,包括更新图节点(因为
main
没有参数,所以不需要清理栈中的未使用地址)并找到下一个 redex。
所有这些任务都有对应的指令实现。
每条指令的作用#
目前,这个高度简化的 G-Machine 包含 7 条指令。
enum Instruction {
Unwind
PushGlobal(String)
PushInt(Int)
PushArg(Int)
MkApp
Update(Int)
Pop(Int)
} derive (Eq, Show)
PushInt
指令是最简单的。它在堆上分配一个 NNum
节点,并将其地址推送到栈上。
fn push_int(self : GState, num : Int) -> Unit {
let addr = self.heap.alloc(NNum(num))
self.put_stack(addr)
}
PushGlobal
指令从全局表中检索指定超组合子的地址,然后将地址推送到栈上。
fn push_global(self : GState, name : String) -> Unit {
let sc = self.globals[name]
match sc {
None => abort("push_global(): cant find supercombinator \{name}")
Some(addr) => self.put_stack(addr)
}
}
PushArg
指令稍微复杂一些。它对栈上地址的布局有特定要求:第一个地址应指向超组合子节点,接着是 n 个指向 N NApp
节点的地址。PushArg
从 offset + 1
开始检索第 N 个参数。
fn push_arg(self : GState, offset : Int) -> Unit {
let appaddr = self.stack.unsafe_nth(offset + 1)
let arg = match self.heap[appaddr] {
NApp(_, arg) => arg
otherwise =>
abort(
"pusharg: stack offset \{offset} address \{appaddr} node \{otherwise}",
)
}
self.put_stack(arg)
}
MkApp
指令从栈顶取出两个地址,构造一个 NApp
节点,并将其地址推送到栈上。
fn mk_apply(self : GState) -> Unit {
let (a1, a2) = self.pop2()
let appaddr = self.heap.alloc(NApp(a1, a2))
self.put_stack(appaddr)
}
Update
指令假设栈上的第一个地址指向当前 redex 的求值结果。它跳过紧随其后的超组合子节点的地址,并用指向求值结果的间接节点替换第 N 个 NApp
节点。如果当前 redex 是一个 CAF,它直接替换堆上对应的 NGlobal
节点。从这里我们可以看到,为什么在惰性函数式语言中,没有参数的函数和普通变量之间没有太大区别。
fn update(self : GState, n : Int) -> Unit {
let addr = self.pop1()
let dst = self.stack.unsafe_nth(n)
self.heap[dst] = NInd(addr)
}
Unwind
指令在 G-Machine 中类似于一个求值循环。它有几个分支条件,根据栈顶地址对应的节点类型:
Nnum
节点:什么也不做。NApp
节点:将左节点的地址推送到栈上,然后再次Unwind
。NGlobal
节点:如果栈上有足够的参数,将这个超组合子加载到当前代码中。NInd
节点:将这个间接节点中包含的地址推送到栈上,然后再次Unwind
。
fn unwind(self : GState) -> Unit {
let addr = self.pop1()
match self.heap[addr] {
NNum(_) => self.put_stack(addr)
NApp(a1, _) => {
self.put_stack(addr)
self.put_stack(a1)
self.put_code(Cons(Unwind, Nil))
}
NGlobal(_, n, c) =>
if self.stack.length() < n {
abort("Unwinding with too few arguments")
} else {
self.put_stack(addr)
self.put_code(c)
}
NInd(a) => {
self.put_stack(a)
self.put_code(Cons(Unwind, Nil))
}
}
}
Pop
指令弹出 N 个地址,无需单独的函数实现。
将超组合子编译成指令序列#
在 G-Machine 概述部分,我们大致描述了编译超组合子的行为。现在我们可以精确地描述超组合子的编译过程。
首先,在执行编译的超组合子的指令序列之前,栈中必须已经存在某些地址:
栈顶的地址指向一个
NGlobal
节点(超组合子本身)。接下来是 N 个地址(N 是这个超组合子的参数数量),指向一系列 App 节点 - 正好对应一个 redex 的脊柱。栈底的地址指向表达式的最外层 App 节点,其余的依次类推。
在编译超组合子时,我们需要维护一个环境,便于在编译过程中,通过名称找到参数在栈中的相对位置。此外,由于在完成超组合子的实例化后需要清除前面的 N+1 个地址,所以需要传递参数数量 N。
这里,“参数”指的是指向堆上 App 节点的地址,实际参数地址可以通过
PushArg
指令访问。
fn compileSC(self : ScDef[String]) -> (String, Int, List[Instruction]) {
let name = self.name
let body = self.body
let mut arity = 0
fn gen_env(i : Int, args : List[String]) -> List[(String, Int)] {
match args {
Nil => {
arity = i
return Nil
}
Cons(s, ss) => Cons((s, i), gen_env(i + 1, ss))
}
}
let env = gen_env(0, self.args)
(name, arity, compileR(body, env, arity))
}
compileR
函数通过调用 compileC
函数生成实例化超组合子的代码,然后附加三条指令:
Update(N)
: 将堆中原始 redex 更新为一个NInd
节点,然后指向新实例化的超组合子。Pop(N)
: 清除栈中多余的地址。Unwind
: 寻找下一个 redex,开始下一轮规约。
fn compileR(
self : RawExpr[String],
env : List[(String, Int)],
arity : Int
) -> List[Instruction] {
if arity == 0 {
compileC(self, env) + List::of([Update(arity), Unwind])
} else {
compileC(self, env) + List::of([Update(arity), Pop(arity), Unwind])
}
}
在编译超组合子的定义时,使用了一种相当粗糙的方法:如果一个变量不是参数,那么它被视为另一个超组合子(写错了会导致运行时错误)。对于函数应用,首先编译右侧表达式,然后增加环境中对应参数的所有偏移量(因为栈顶增加了一个指向实例化的右侧表达式的额外地址),然后编译左侧表达式,最后添加 MkApp
指令。
fn compileC(
self : RawExpr[String],
env : List[(String, Int)]
) -> List[Instruction] {
match self {
Var(s) =>
match env.lookup(s) {
None => List::of([PushGlobal(s)])
Some(n) => List::of([PushArg(n)])
}
Num(n) => List::of([PushInt(n)])
App(e1, e2) =>
compileC(e2, env) + compileC(e1, argOffset(1, env)) + List::of([MkApp])
_ => abort("not support yet")
}
}
运行 G-Machine#
一旦超组合子被编译,它们需要放在堆上(并将它们的地址添加到全局表中)。这可以递归完成。
fn build_initial_heap(
scdefs : List[(String, Int, List[Instruction])]
) -> (GHeap, @hashmap.T[String, Addr]) {
let heap = { object_count: 0, memory: Array::make(10000, None) }
let globals = @hashmap.new(capacity=50)
loop scdefs {
Nil => ()
Cons((name, arity, instrs), rest) => {
let addr = heap.alloc(NGlobal(name, arity, instrs))
globals[name] = addr
continue rest
}
}
return (heap, globals)
}
定义一个函数 “step”,它通过一步更新 G-Machine 的状态,如果已经到达最终状态,则返回 false。
fn step(self : GState) -> Bool {
match self.code {
Nil => return false
Cons(i, is) => {
self.code = is
self.stat_incr()
match i {
PushGlobal(f) => self.push_global(f)
PushInt(n) => self.push_int(n)
PushArg(n) => self.push_arg(n)
MkApp => self.mk_apply()
Unwind => self.unwind()
Update(n) => self.update(n)
Pop(n) => self.stack = self.stack.drop(n)
}
return true
}
}
}
此外,定义一个函数 “reify”,它不断执行 “step” 函数,直到到达最终状态。
fn reify(self : GState) -> Node {
if self.step() {
self.reify()
} else {
let stack = self.stack
match stack {
Cons(addr, Nil) => {
let res = self.heap[addr]
return res
}
_ => abort("wrong stack \{stack}")
}
}
}
将上述各部分组合起来。
fn run(codes : List[String]) -> Node {
fn parse_then_compile(code : String) -> (String, Int, List[Instruction]) {
let tokens = tokenize(code)
let code =
try {
tokens.parse_sc!()
} catch {
ParseError(s) => abort(s)
} else {
expr => expr
}
let code = compileSC(code)
return code
}
let codes = codes.map(parse_then_compile) + prelude_defs.map(compileSC)
let (heap, globals) = build_initial_heap(codes)
let initialState : GState = {
heap : heap,
stack : Nil,
code : List::of([PushGlobal("main"), Unwind]),
globals : globals,
stats : 0
}
initialState.reify()
}
总结#
到目前为止,我们构建的 G-Machine 的功能太有限,无法运行一个像样的程序。在下一篇文章中,我们将逐步添加诸如原语和自定义数据结构等功能。最后,我们将在介绍 G-Machine 后引入惰性求值技术。
参考#
Peyton Jones, Simon & Lester, David. (2000). Implementing functional languages: a tutorial.