快捷搜索:  汽车  科技

大数据计算平台设计任务调度(构建高性能数据分析系统的任务编排)

大数据计算平台设计任务调度(构建高性能数据分析系统的任务编排)众所周知,Excel 是使用最广泛的数据分析工具。当我们使用了 Excel 中的公式之后,当我们修改了 A 单元格的值,对应的结果会自动发生变化。而如果在这时,还有其它依赖于此单元格的值时,对应的结果也会发生变化。如下图所示:所以,在开始之前,让我们先看一个简单的例子,Excel 如何实现增量计算。GUI 应用, 诸如于 React 的 Dom Diff不断变化的大型计算,诸如于金融计算、电子表格、大数据系统构建系统,诸如于 Gradle、Bazel、Rustc 等

在起始的那篇《金融 Python 即服务:业务自助的数据服务模式》,我们介绍了:使用 Python 如何使用作为数据系统的 wrapper 层?在这一篇文章里,我们将继续之前的话题,介绍如何使用 Python 作为计算引擎核心的胶水层,即:如何使用 Python 构建 DAG(有向无环图,Directed Acyclic Graph) 任务?

除此,还可以了解一下,如何设计增量 DAG 计算?先看一下增量计算的概念:

增量计算(Incremental computing),是一种软件功能,每当一条数据发生更改时,它都会尝试通过仅重新计算依赖于更改数据的输出来节省时间。

常见的领域有:

  • GUI 应用, 诸如于 React 的 Dom Diff

  • 不断变化的大型计算,诸如于金融计算、电子表格、大数据系统

  • 构建系统,诸如于 Gradle、Bazel、Rustc 等

所以,在开始之前,让我们先看一个简单的例子,Excel 如何实现增量计算。

引子 1:Excel 的增量计算

众所周知,Excel 是使用最广泛的数据分析工具。当我们使用了 Excel 中的公式之后,当我们修改了 A 单元格的值,对应的结果会自动发生变化。而如果在这时,还有其它依赖于此单元格的值时,对应的结果也会发生变化。如下图所示:

大数据计算平台设计任务调度(构建高性能数据分析系统的任务编排)(1)

出自 《How to Recalculate a Spreadsheet》

在 Microsoft 官方的文档里(Excel 重新计算),可以看到对应的触发重新计算场景:输入新数据、删除或插入行或列等等。在 Excel 中,工作表的计算可视为包含三个阶段的过程:

  1. 构造依赖关系树

  2. 构造计算链

  3. 重新计算单元格

一旦触发了重新计算,Excel 会重新构造依赖关系树和计算链,并依赖于此的所有单元格标记为 ”脏单元格“。随后,根据计算链指定的顺序重新计算。通常来说,在我们设计依赖分析时,假定的是函数是不可变的。但是呢,还存在一些特殊的函数类型,诸如于文档中提到的:

  • 异步函数 (UDF)

  • 可变函数。即哪怕参数没有变化时,值也可能修改。诸如于 Now、Today 等。

这意味着,我们在设计增量计算时,需要考虑到这个场景的问题。从原理和实现来说,它一点并不算太复杂,有诸如于

从注解 DAG 到增量 DAG 设计

DAG (有向无环图,Directed Acyclic Graph)是一种常用数据结构,仅就 DAG 而言,它已经在我们日常的各种工具中存在:

  • 依赖系统。诸如如 NPM、Yarn、Gradle、Cargo 等

  • 人工智能。如机器学习等

  • 数据流系统。如编译器、Apache Spark、Apache Airflow 等。

  • 数据可视化。常用的 Graphviz,又或者是各个语言里的 Network 相关的库,诸如于 Python 的 NetworkX。

当我们从任务编排和数据等的角度来看,DAG 的面向普通人术语是叫工作流(Workflow)。

常规 DAG 到函数式 DAG

通常情况下,实现一个 DAG 非常的简单 —— 只是数据结构。在使用时,也比较简单,如下是 Cytoscape 的 API 示例:

  1. cy.add([

  2. { group: 'nodes' data: { id:'n0'} position: { x:100 y:100} }

  3. { group: 'nodes' data: { id:'n1'} position: { x:200 y:200} }

  4. { group: 'edges' data: { id:'e0' source:'n0' target:'n1'} }

  5. ]);

而这一类 DAG 是静态的,当我们需要结合些任务时,就会需要添加函数。由此便会稍微复杂一些,再现看个示例:

  1. comp = Computation

  2. comp.add_node('a')

  3. comp.add_node('b' lambdaa: a 1)

  4. comp.add_node('c' lambdaa b:2*a)

  5. comp.add_node('d' lambdab c: b c)

  6. comp.add_node('e' lambdac: c 1)

  7. comp.compute('d')

  8. comp.get_value_dict

上述的代码中,是 Loman 框架的示例,其中的 lambda a: a 1是 Python 的 Lambda 表达式。Loman 会在运行时,分析这个 Lambda,获得 Lambda 中的参数,随后添加对应的计算依赖。

大数据计算平台设计任务调度(构建高性能数据分析系统的任务编排)(2)

Loman 示例

而在多数场景之下,往往是采用注解的形式,诸如于 Airflow、Gradle 等。

基于注解与条件的 DAG 函数

回到研究的开始,如美银证券的 Quartz 的 DSL 扩展(Little languages),便是在 Loman 的形式上进行了一步扩展。使用注解代替了 Lambda:

  1. classC:


  2. @dag

  3. deff1(self x y):

  4. returnself.f2(x) y


  5. @dag

  6. deff2(self x):

  7. returnx * x

围绕于这个注解,Quartz 在这一层的实现上,包含了四个特性:DAG、记忆化(memoization)、持久化、时间旅行调试(time travel)。考虑到 Quartz 并不是一个开源的实现,社区上的材料不一定靠谱,所以我们还是再看看 Apache Ariflow 的实现。引用官网的示例:

  1. fromdatetimeimportdatetime


  2. fromairflowimportDAG

  3. fromairflow.decoratorsimporttask

  4. fromairflow.Operators.BashimportBashOperator


  5. # A DAG represents a workflow a collection of tasks

  6. withDAG(dag_id="demo" start_date=datetime(2022 1 1) schedule="0 0 * * *")asdag:


  7. # Tasks are represented as operators

  8. hello = BashOperator(task_id="hello" bash_command="echo hello")


  9. @task

  10. defairflow:

  11. print("airflow")


  12. # Set dependencies between tasks

  13. hello >> airflow

从实现上来说,Apache Airflow 的 DAG 实现本着 “工作流即代码” 的思想设计的。上面代码中,比较有意思的是 >>语法,其是在任务之间定义了一个依赖关系并控制任务的执行顺序。

增量 DAG 注解:Gradle —— 监听输入与输出

在编译上,Gradle 也是支持增量编译(也是一种增量计算)的,我们可以先看个简单的示例:

  1. abstractclassIncrementalReverseTaskextendsDefaultTask {

  2. @Incremental

  3. @InputDirectory

  4. abstractDirectoryProperty getinputDir


  5. @OutputDirectory

  6. abstractDirectoryProperty getOutputDir


  7. @TaskAction

  8. voidexecute(InputChanges inputChanges) {

  9. inputChanges.getFileChanges(inputDir).each { change ->

  10. if(change.fileType == FileType.DIRECTORY)return


  11. def targetFile = outputDir.file(change.normalizedPath).get.asFile

  12. if(change.changeType == ChangeType.REMOVED) {

  13. targetFile.delete

  14. } else{

  15. targetFile.text = change.file.text.reverse

  16. }

  17. }

  18. }

  19. }

对于 Gradle 的增量任务来说,通常只需要关注输入和输出,只要 InputDirectoryOutputDirectory不变,那么就认为 Task 不需要再执行。因为在实现处理逻辑时,只关注于这两个值是否发生变化。

Rust 语言:Salsa 框架的增量 DAG 设计

Rust 编译器的文档上也包含了相关的介绍:Incremental compilation,而这里我们是一个相关的实现 —— 增量编译的设计者之一(Niko Matsakis)编写的库 Salsa。Salsa 是一个用于编写增量 (incremental) 、按需 (on-demand) 程序的 Rust 框架,其采用的是 “红-绿”算法。与 Gradle 相似的,Salsa 结构体(Structs)是使用一种 Salsa 属性宏进行了标注的结构体:

  • #[salsa::input]:用于指定计算的“基本输入”

  • #[salsa::tracked]:用于指定在计算过程中创建的中间值

  • #[salsa::interned]:用于指定易于进行相等比较的小型值

由于 Salsa 相比于 Gradle 是位于更底层的基础设施,所以需要手动构建存储层,即 Jar 和数据库)。数据库是一个结构体,它最终存储 Salsa 的所有中间状态,例如来自跟踪函数的被记忆的 (memoized) 返回值。数据库本身是以一些中间结构 (intermediate structure) 的形式定义的,这些中间结构被称为 jars,并包含每个函数的数据。

缓存计算与存储计算

既然,我们已经通过注解将输入、输出、函数等内容标注出来,下一步就是缓存结果。如此一来,我们就可以通过缓存来提升计算性能。对于计算的缓存来说,至少需要包含这三个部分:

  • 函数表达式(Fn 类型)。

  • 零个或多个参数。

  • 一个可选名称。

由此,我们才能获得缓存后的结果。在一些框架的设计里,诸如于 Python 语言

内存:Memoization —— 函数式编程的记忆

Memoization(记忆化)是函数式语言的一种特性,使用一组参数初次调用函数时,缓存参数和计算结果,当再次使用相同的参数调用该函数时,直接返回相应的缓存结果。在一些不支持 memoization 的语言里,需要手动引入这种设计,如 Java:

  1. Map<Integer Integer> cache = newConcurrentHashMap<>;


  2. Integer addOne(Integer x) {

  3. returncache.computeIfAbsent(x -> x 1);

  4. }

上述只是一个加法的示例,万能的 StackOverflow 上有更多的示例:Java memoization method。

当然了,缓存是有负作用的 —— 第一次计算时存储结果会花费一定的时间,不过大部分情况下可以忽略不计。

数据库存储

对于耗时更长的 AI 或者是金融计算场景时,需要采用分布式的任务调度器,才能更快的得到计算结果。于是乎,采用分布式键值存储来对结果进行缓存就是更好的选择。在 Salsa 框架里,由于考虑到不同的类型(input、output、tracked 等),对于数据结构函数等来说,其对应的 Index 由三部分组成:

  1. #[derive(Copy Clone PartialEq Eq PartialOrd Ord Hash Debug)]

  2. pub structDatabaseKeyIndex {

  3. group_index: u16

  4. query_index: u16

  5. key_index: u32

  6. }

大抵是

增量计算框架与算法

由于时间与精力限制(主要是我看不懂一些用英语写的公式,还有暂时没打算学 OCaml),这里就没有展开对于各类计算框架论文的讨论。诸如于 Incremental 和 Adapton 就是相关的论文与实现,就包含了非常不错的资料。

除此:https://lord.io/spreadsheets/ 一文也给了非常好的介绍。

大数据计算平台设计任务调度(构建高性能数据分析系统的任务编排)(3)

这里,我就不展开了。

有了增量计算,然后呢?

后续的计算部分,可以参考 Apache Airflow 来实现。它是一个支持开源分布式任务调度框架,其架构

  • 调度程序,它处理触发计划的工作流,并将任务提交给执行程序以运行。

  • 执行器,它处理正在运行的任务。在默认的 Airflow 安装中,这会在调度程序中运行所有内容,但大多数适合生产的执行程序实际上会将任务执行推送给工作人员。

  • Web 服务器,它提供了一个方便的用户界面来检查、触发和调试 DAG 和任务的行为。

  • DAG 文件的文件夹,由调度程序和执行程序(以及执行程序拥有的任何工作人员)读取

  • 元数据数据库,由调度程序、执行程序和网络服务器用来存储状态。

其架构图如下:

大数据计算平台设计任务调度(构建高性能数据分析系统的任务编排)(4)

不过、过了、还是不过,考虑到 Airflow 的 DAG 实现是 Python,在分布式任务调度并不是那么流行。但是,作为一个参考还是非常不错的。

其他

相关参考资料:

  • 《How to Recalculate a Spreadsheet》一篇非常不错的文章,介绍了不同的算法是如何重新计算电子表格的。当然了,也包含作者自己写的新方案 Anchors。对于写库来说,是一个非常不错的参考。

  • 《Excel 重新计算》介绍了 Excel 重新计算的逻辑。

  • Salsa 文档:https://salsa-rs.netlify.app/ (中文版翻译:https://rust-chinese-translation.github.io/salsa-book/ )

  • Adapton 提供了一个增量计算的编程语言抽象,官网:http://adapton.org/ 提供了非常不错的参考资料

除此,在构建工具方面,在这一方面微软研究院的《Build Systems à la Carte》提供了一个非常不错的介绍,如果你可以参考这一篇《【工业聚看论文】第一期:《Build Systems à la Carte: Theory and Practice》

猜您喜欢: