Flux 10: LSP 支持

一个自研语言只提供 CLI 是不够的。CLI 证明语言能跑,LSP 才决定它能不能舒服地写。对 Flux 这种表达式、pipe、lambda 和 package builtin 很多的语言来说,编辑器体验不是锦上添花,而是降低使用成本的一部分。

cpp/pl/flux/contrib/lsp 的目标就是让 Flux 从“能执行”走到“能日常编辑”:打开文件时能诊断语法错误,输入 array. 时能补 package 函数,光标放在 lambda 参数上能跳回定义,rename 不会误改 shadowing 变量,semantic tokens 能区分 package、函数、参数和引用。

这一篇不按功能列表平铺,而是从一次编辑器请求开始,看 Flux Language Server 如何把 JSON-RPC、文档同步、AST 缓存、符号表和各类 feature handler 串起来。

假设用户在编辑器里打开了这样一段 Flux:

import "array"

data = array.from(rows: [{host: "edge-1", usage: 91.2}])

data
    |> filter(fn: (r) => r.usage > 80.0)
    |> keep(columns: ["host", "usage"])

当用户输入 array.、保存文件、或者把光标放到 r.usage 上时,编辑器会通过 LSP 向 server 发请求或通知。server 不能每个功能都自己 parse 一遍,也不能把 completion、diagnostics、definition 写成彼此独立的临时逻辑。它需要一套共享的语言服务基础设施。

Flux 09: Logical 到 Physical Plan

前面两篇分别讲了内存表执行和 connector pushdown。它们看上去像两条路径:一条是 TableValue 上的 builtin 解释执行,一条是 SQLite/MySQL 上的 Page streaming。第 08 篇要回答的就是中间那层问题:同一条 Flux 查询,如何先被表达成可优化的 logical plan,再落成真正可调度的 physical pipeline。

如果项目只有 eager interpreter,执行路径很直接:AST 调 builtin,builtin 操作 TableValue,然后把结果传给下一个 builtin。但 connector、pushdown、多 split、exchange、partial/final aggregate 和 profile 加进来后,直接解释 AST 就不够了。执行器必须先有一个“尚未执行、可以改写、可以解释”的查询表示。

这就是 logical/physical plan 的位置。Logical plan 负责保存用户查询语义,optimizer 负责改写语义等价的计划,physical plan 负责描述执行拓扑,scheduler 再把它变成 driver 和 operator。

先用一条查询做主线:

import "sqlite"

sqlite.from(path: "metrics.db", table: "cpu")
    |> range(start: 2024-01-01T00:00:00Z, stop: 2024-01-01T01:00:00Z)
    |> filter(fn: (r) => r.host == "edge-1" and r._value > 80.0)
    |> keep(columns: ["_time", "host", "_value"])
    |> group(columns: ["host"])
    |> mean(column: "_value")
    |> sort(columns: ["_value"], desc: true)
    |> limit(n: 10)

这条查询里同时有 source scan、时间范围、谓词、projection、group aggregate、sort 和 limit。它足够小,但已经覆盖了查询引擎里最核心的几个问题:

Flux 08: Connector 与 Pushdown

前面几篇文章已经把 Flux 的词法语法、类型值模型、执行器和表流管线串起来了。到这里,查询引擎终于要碰到一个更现实的问题:数据不一定在内存里,也不一定来自小 CSV,它可能在 SQLite 文件中,也可能在远端 MySQL 实例里。

如果还沿用最朴素的解释器思路,sqlite.from() 先把整张表读成 TableValue,再让 rangefilterkeepsort 在内存里慢慢跑,那么这个实现虽然容易写,却很难称为查询引擎。真正有意义的边界是:Flux 仍然是统一查询语言,但能够把安全、明确、可证明等价的一段计划下推到数据源附近执行。

这一篇就专门讲 cpp/pl/flux 当前的 connector 与 pushdown 设计。它不是“把 Flux 翻译成任意 SQL”,而是一套更保守的执行模型:provider package 负责建立数据源入口,optimizer 识别可下推前缀,connector runtime 按 metadata、split、page source 三层执行,unsupported suffix 则可靠 fallback 回内存执行器。

先看一条典型查询:

import "sqlite"

sqlite.from(path: "metrics.db", table: "cpu")
    |> range(start: 2024-01-01T00:00:00Z, stop: 2024-01-01T01:00:00Z)
    |> filter(fn: (r) => r.host == "edge-1" and r._value > 80.0)
    |> keep(columns: ["_time", "host", "_value"])
    |> sort(columns: ["_time"])
    |> limit(n: 10)

从用户视角看,这就是一条普通 Flux pipeline。从引擎内部看,它可以被拆成两段:前缀部分能变成 SQL SELECT ... WHERE ... ORDER BY ... LIMIT ...,后缀部分如果还有复杂 Flux 算子,则继续由内存 runtime 接管。这个“能不能下推”的判断,就是本文的核心。

Flux 07: Table Pipeline

Flux 查询真正有意思的地方,不在单个表达式求值,而在 table stream。filtermapgroupwindowaggregateWindowjoin 这些算子处理的都不是一张普通二维表,而是一组带有 group key 的 logical tables。

这篇文章专门讲 cpp/pl/flux 里 table pipeline 的内存执行模型。它是早期 eager interpreter 的核心,也是 connector / physical executor fallback 到 materialized output 时的共同承载格式。理解这一层,后面的 connector pushdown、logical/physical plan 和性能优化才有上下文。

先看一个 dashboard 查询。它从 CSV 读入 CPU 数据,按时间过滤,再按主机分组,最后做窗口聚合:

import "csv"

csv.from(file: "cpp/pl/flux/examples/ops_dashboard/data/cpu_usage.annotated.csv")
    |> range(start: 2024-01-01T00:00:00Z, stop: 2024-01-01T00:05:00Z)
    |> filter(fn: (r) => r.region == "us-west")
    |> group(columns: ["host"])
    |> aggregateWindow(every: 1m, fn: mean, createEmpty: true)

从用户角度看,这是一条线性的 pipe。从执行器角度看,每一步都在转换一个 table stream:输入可能包含多张 logical table,输出也可能包含多张 logical table,并且每张表都有自己的 group key、列集合和行集合。

Flux 06: 标准库扩展机制

一个语言运行时真正变得可用,往往不是因为表达式求值器支持了多少 operator,而是因为标准库让用户能完成真实任务。cpp/pl/flux 当前已经有一批 package:arraycsvdatedictjoinjsonmathregexpruntimesqlitestringssystemtypes 等。

其中 array 最适合用来讲 builtin 扩展机制,因为它既有普通函数,也有高阶函数,还和“如何替代传统循环”这个问题直接相关。

标准库也是语言项目最容易失控的地方。新增一个 builtin 看起来只是注册一个函数,但真正要做的是:API 形态要稳定,参数错误要可诊断,runtime 行为要可测试,LSP 要能补全,README/SUPPORT_MATRIX 要同步,conformance 要把公开行为锁住。否则函数越多,项目越像一堆临时 helper。

用户写:

import "array"

运行时会从 BuiltinRegistry 中加载 package object,并在当前环境中绑定 array。之后:

Flux 05: UDF 与高阶函数

Flux 查询的一个重要特点是函数无处不在。filter(fn:)map(fn:)reduce(fn:) 都要求用户把函数作为参数传进去。对这个项目来说,UDF 和高阶函数不是锦上添花,而是让查询语言真正可组合的核心能力。

前一篇讲 runtime evaluator 时,我们已经看到函数调用不只是“执行一个 C++ callback”。用户函数需要参数绑定、默认值、闭包环境、pipe 参数和 block return;高阶函数还要把用户函数安全地嵌入数组和表操作里。这一篇专门把这些函数能力拎出来讲,因为它们决定了 Flux 查询能不能写得像查询,而不是像一串硬编码 builtin。

当前实现支持 expression-bodied function:

double = (x) => x * 2

也支持 block-bodied function:

Flux 04: 表达式解释器

Parser 负责把源码变成 AST,但 AST 本身不会执行。cpp/pl/flux 的执行入口主要在 runtime/runtime_eval.cppruntime/runtime_exec.cpp:前者处理表达式求值,后者处理文件级语句执行、结果收集和顶层环境。

这一层是语言项目从“能解析”到“能运行”的分水岭。Parser 可以只关心结构,runtime evaluator 必须处理值、作用域、函数调用、错误传播、短路逻辑、pipe 参数、闭包捕获和 builtin 约定。只要这里的边界不稳,后面的标准库、table pipeline、LSP 诊断和 CLI 输出都会跟着变脆。

本文会沿着一条执行路径讲:Value 如何承载运行时数据,Environment 如何处理词法作用域,表达式如何逐步求值,用户函数和 builtin 为什么不同,以及一个 numeric equality bug 为什么必须在 evaluator 层修。

解释器首先需要一个统一的运行时值类型。当前 Value 覆盖了 Flux 子集执行所需的主要类型:

  • null
  • bool
  • int
  • uint
  • float
  • string
  • time
  • duration
  • regexp
  • array
  • object
  • function
  • table

这些值不是孤立存在的。数组承载 array.map/filter/reduce 这类高阶函数;对象既表示普通 record,也表示 package object、函数命名参数和 table row;函数值既可以是用户函数,也可以是 C++ builtin;table 值既可以是已 materialize 的 TableValue,也可以携带 lazy plan。

Value 的核心设计是一个明确的 type enum 加 variant storage。调用方通过 type() 判断,再使用 as_int()as_array()as_object() 这类访问器取值。这个设计比到处使用 std::any 更可控,也比一开始引入完整静态类型系统更轻。

Flux 03: Parser 与 AST

语言实现的第一层,是把源码变成结构化数据。对 cpp/pl/flux 来说,这一层由 syntax/scanner.rl、生成的 scanner、syntax/parser.cppsyntax/ast.h 组成。它的目标不是只让正确程序通过,而是尽量为后面的 runtime、CLI、formatter、LSP 和测试提供稳定、可定位的 AST。

Parser 的质量会向后传导。表达式优先级错了,runtime 会执行错;source location 粗糙,LSP 会跳转错;错误恢复太脆,用户在编辑器里输入半截代码时 diagnostics 就会崩;AST 结构过早脱糖,formatter 和 analyzer 又会丢掉用户写法。

所以这一篇不只讲“递归下降怎么写”,而是讲 Flux 前端需要守住哪些边界:token 上下文、pipe 语法、AST 结构、节点所有权、source location 和 malformed 输入恢复。

Scanner 的职责是把字符流切成 token,并尽量保留位置信息。Flux 的词法不只是普通标识符和数字。当前 scanner 覆盖了:

  • 关键字:packageimportoptionbuiltintestcasereturnif/then/else
  • 字面量:int、uint、float、string、duration、RFC3339 time、regexp。
  • 复合语法:字符串插值、属性注解、record/object、array/dict、函数箭头。
  • 运算符:算术、比较、正则匹配、逻辑、pipe-forward。
  • 类型语法 token:函数类型、record 类型、vector/stream 类型、where 约束。
  • 注释和换行位置信息。

其中最容易低估的是 /。它既可能是除法操作符,也可能是正则字面量开头:

matched = r.host =~ /edge-.*/
ratio = used / total

Scanner 不能只看单个字符决定 token 类型。当前实现会维护“此处是否期待表达式”的上下文,在可能接受表达式的位置允许 regexp literal,在普通二元运算位置按除法处理。这个细节如果做错,parser 后面看到的 token 就已经错了。

Flux 02: 语法导览

在进入 parser、runtime 和查询执行之前,先需要一张“这门 Flux 子集到底怎么写”的地图。

这个项目不是官方 Flux 的完整实现,而是一个可运行、可测试、可继续扩展的 Flux-like 子集。它已经覆盖常见查询、函数、标准库和 table pipeline,但仍有一些语法和语义边界需要明确。本文站在使用者视角,不讲 parser 怎么实现,只讲当前支持哪些写法、它们是什么意思,以及哪些地方暂时不要期待完整官方行为。

如果你已经熟悉 Flux,可以把这篇当作项目方言说明;如果你第一次接触 Flux,可以先读这篇,再去看后面的 parser 和 runtime 实现。

一个 Flux 文件通常由 package、import、option、变量定义和表达式组成:

package demo

import "array"
import regexp "regexp"

option location = {zone: "UTC", offset: 0s}

threshold = 80.0

array.from(rows: [
    {_time: 2024-01-01T00:00:00Z, host: "edge-1", _value: 91.2},
])
    |> filter(fn: (r) => r._value > threshold)

当前实现支持:

Flux 01: 项目目标与整体架构

这几年我一直在写一个 C++20 实现的 Flux 查询语言实验项目:cpp/pl/flux。它不是为了完整复刻 InfluxData 官方 Flux,也不是为了立刻做成生产级数据库,而是为了回答一个更工程化的问题:如果从零实现一个可运行、可调试、可测试的 Flux 子集,需要哪些模块,它们之间应该如何分层?

很多语言项目会停在 parser demo:能把源码解析成 AST,能打印一棵树,已经很有成就感。但查询语言更麻烦。它不仅要理解表达式,还要处理表流、group key、窗口、聚合、数据源、输出格式、IDE 体验和性能退化。只要其中一个边界没想清楚,后面就很容易把优化逻辑、运行时逻辑和标准库逻辑搅在一起。

目前这个项目已经不只是一个 parser。它包含 scanner、parser、AST dump、表达式解释器、运行时值模型、标准库 package、表流执行、SQLite/MySQL connector、查询计划、Page-based physical executor、CLI、REPL、LSP 和 conformance examples。换句话说,它已经有了一个小型语言运行时和单机查询引擎的骨架。

这一篇是整个系列的入口。我会先讲项目目标和能力边界,再讲一条 Flux 查询从源码到输出会经过哪些层,最后给出代码阅读和运行方式。后面的文章会沿着这些边界逐层展开。

Flux 很适合拿来做查询引擎实验,因为它同时具备三种特征。

第一,它是一门表达式语言。它有字面量、对象、数组、函数、闭包、条件表达式、正则、字符串插值和命名参数。实现它时,必须认真处理 scanner、parser、AST、runtime value、environment 和 function call。

第二,它是一门 pipeline 查询语言。|> 不是普通装饰语法;它决定了用户如何把数据源、过滤、投影、聚合和输出串起来。实现 pipe 后,运行时必须决定:一个表算子是立即执行,还是追加到 lazy logical plan 等待 optimizer 处理?

第三,它围绕 table stream 建模。Flux 查询不是简单返回一个数组,而是返回一组 logical table,每张表有 group key、列、行和结果名。这让它天然适合探索查询引擎里的 table pipeline、window、aggregate、join 和 connector pushdown。

一段很小的 Flux 查询就能覆盖这些问题:

import "array"

array.from(rows: [
    {_time: 2024-01-01T00:00:00Z, host: "edge-1", region: "east", usage: 91},
    {_time: 2024-01-01T00:01:00Z, host: "edge-2", region: "west", usage: 42},
])
    |> filter(fn: (r) => r.usage > 80)
    |> keep(columns: ["host", "usage"])
    |> yield(name: "hot_hosts")

为了让它真正跑起来,项目至少需要完成这些事情: