快捷搜索:  汽车  科技

使用flinkhudi构建流式数据湖:Apache Hudi 新一代流式数据湖平台

使用flinkhudi构建流式数据湖:Apache Hudi 新一代流式数据湖平台查询引擎通常依靠分区来减少读取的文件数量。在数据库方面,Hive 分区只不过是一个粗略的范围索引,它将一组列映射到一个文件列表。像 Iceberg/Delta Lake 这些在云中诞生的表格式,在单个文件 (json/avro) 中内置了对每个文件的列范围跟踪,这有助于避免大型 / 小型表格的规划成本。到目前为止,Hudi 表的这种需求已大大减少,因为 Hudi 会自动强制执行文件大小,这有助于降低从 Parquet 页脚读取统计信息所需的时间,例如,随着聚簇(Clustering)等功能的出现,可以先生成较小的文件,然后以查询优化的方式重新聚簇。我们计划添加索引列范围,它可以扩展到大量小文件并支持更快的变更。请参阅 RFC-27[30] 以跟踪设计过程并参与其中。将来,我们打算添加其他形式的索引作为元数据表上的新分区。让我们简要讨论一下每个部分所扮演的角色。索引可帮助数据库规划更好的查询

时间线是所有 Hudi 表元数据的真实事件日志,存储在 .hoodie 文件夹下,提供对表执行的所有操作的有序日志。事件将保留在时间线上,直至配置的时间 / 活动间隔。每个文件组也专门设计了自包含的日志,这意味着即使影响文件组的操作从时间轴存档,每个文件组中的记录的正确状态也可以通过简单地在本地应用 delta 记录到基本文件来重新构建。这种设计限制了元数据大小,它只与表被写入 / 操作的频率成正比,与整个表的大小无关。这是支持频繁写入 / 提交表的关键设计元素。

最后,时间线上的新事件被消费并反映到内部元数据表上,元数据表实现为另一个提供低写入放大的 MOR 表。Hudi 能够支持对表元数据的快速更改,这与专为缓慢变动数据设计的表格式不同。此外,元数据表使用 HFile 基本文件格式 [28],它提供键的索引查找,避免了不必要的全表扫描。它当前存储作为表的所有物理文件路径相关的元数据,以避免昂贵的云文件 listing。

今天所有表格式面临的一个关键挑战是需要决策快照过期以及控制时间旅行查询(time travel queries)的保留时长,以便它不会干扰查询计划 / 性能。未来,我们计划在 Hudi 中构建一个索引时间线,它可以跨越表的整个历史,支持几个月 / 几年的时间旅行回顾窗口。

索引

索引可帮助数据库规划更好的查询,从而减少 I/O 总量并提供更快的响应时间。有关表文件 listing 和列统计信息的表元数据通常足以让湖上的查询引擎快速生成优化的、引擎特定的查询计划。然而,这还不足以让 Hudi 实现快速插入。

目前 Hudi 已经支持不同的基于主键的索引方案,以快速将摄入的记录键映射到它们所在的文件组中。为此,Hudi 为写入器引入了一个可插拔的索引层,内置支持范围剪枝(当键被排序并且记录在很大程度上按顺序到达时)使用间隔树(interval trees)和布隆过滤器(例如:对基于 uuid 的键,排序几乎没有帮助)。Hudi 还实现了一个基于 HBase 的外部索引,虽然运行成本更高,但性能更好,同时支持用户自定义索引实现。Hudi 也有意识地利用表的分区信息来实现全局和非全局的索引方案。用户可以选择仅在一个分区内强制执行键约束,以换取 O(num_affected_partitions) 复杂度的 upsert 性能,而不是全局索引场景中的 O(total_partitions) 复杂度。

我们向您推荐这篇博客 [29],它详细介绍了索引相关的内容。最终,Hudi 的写入器确保索引始终与时间轴和数据保持同步,而这如果在表格式之上手动实现既麻烦又容易出错。

使用flinkhudi构建流式数据湖:Apache Hudi 新一代流式数据湖平台(1)

将来,我们打算添加其他形式的索引作为元数据表上的新分区。让我们简要讨论一下每个部分所扮演的角色。

查询引擎通常依靠分区来减少读取的文件数量。在数据库方面,Hive 分区只不过是一个粗略的范围索引,它将一组列映射到一个文件列表。像 Iceberg/Delta Lake 这些在云中诞生的表格式,在单个文件 (json/avro) 中内置了对每个文件的列范围跟踪,这有助于避免大型 / 小型表格的规划成本。到目前为止,Hudi 表的这种需求已大大减少,因为 Hudi 会自动强制执行文件大小,这有助于降低从 Parquet 页脚读取统计信息所需的时间,例如,随着聚簇(Clustering)等功能的出现,可以先生成较小的文件,然后以查询优化的方式重新聚簇。我们计划添加索引列范围,它可以扩展到大量小文件并支持更快的变更。请参阅 RFC-27[30] 以跟踪设计过程并参与其中。

虽然 Hudi 已经针对“随机写”类型的工作负载提供了外部索引,但我们还希望在湖存储之上支持“点查”[31],这有助于避免为许多类数据应用程序添加额外的数据库存储开销。我们还预计利用记录级索引的方案后,基于 uuid/key 的 join 以及 upsert 的性能将大大加快。我们还计划将对布隆过滤器的跟踪从文件页脚移到元数据表上的分区中 [32]。最终,我们希望在即将发布的版本中将所有这些都暴露给查询。

并发控制

并发控制定义了不同的写入器 / 读取器如何协调对表的访问。Hudi 确保原子写入,通过将提交原子地发布到时间线,并标记一个即时时间(instant),以标记该操作具体发生的时间。与通用的基于文件的版本控制不同,Hudi 明确区分了写入进程(执行用户的更新、插入 / 删除)、表服务(写入数据、元数据以优化执行所需的信息)和读取器(执行查询和读取数据)。Hudi 在所有三种类型的进程之间提供快照隔离,这意味着它们都对表的一致快照进行操作。Hudi 在写入器之间提供乐观并发控制 (OCC)[33],同时在写入器和表服务之间以及不同表服务之间提供基于无锁、非阻塞的 MVCC 的并发控制。

完全依赖 OCC 的项目通常通过依赖锁或原子重命名来处理“竞争”操作。这种方法乐观地认为,真正的资源争用永远不会发生,而如果发生冲突,就会使其中一个写入操作失败,这可能会导致严重的资源浪费或操作开销。

想象一下有两个写入器进程的场景:一个每 30 分钟生成一次新数据的摄取写入器作业和一个需要 2 小时才能发出的执行 GDPR 的删除写入器作业。如果在相同的文件上有重叠(在随机删除的真实情况下很可能发生),删除作业几乎肯定会引发“饥饿”并且每次都无法提交,浪费大量的集群资源。Hudi 采用了一种非常不同的方法,我们认为这种方法更适用于湖的事务,这些事务通常是长期运行的。例如,异步压缩可以在不阻塞摄取作业的情况下在后台持有删除记录。这是通过文件级、基于日志的并发控制协议实现的,该协议根据时间线上的开始 instant 对操作进行排序。

使用flinkhudi构建流式数据湖:Apache Hudi 新一代流式数据湖平台(2)

我们正在改进当前基于 OCC 的实现,以尽早检测并发写入器的冲突并在不消耗 CPU 资源的情况下提前终止冲突。我们还尽力在写入器之间添加完全基于日志 [34] 的非阻塞并发控制,其中写入器持续写入增量,稍后以某些确定性时间线顺序解决冲突——这也与流处理程序的写入方式非常相似。这仅是因为 Hudi 的独特设计将操作排序到有序的事件日志中,并且事务处理代码知道操作之间的关系 / 相互依赖关系。

写入器

Hudi 表可以用作 Spark/Flink 管道的接收器,并且 Hudi 写入实现提供了比原生的 parquet/avro sinks 更多的功能。Hudi 将写入操作细分为增量(插入、更新插入、删除)和分批 / 批量操作(插入覆盖、插入覆盖表、删除分区、批量插入),每个操作都是高性能和高内聚的。upsert 和 delete 操作都会自动处理输入流中具有相同键的记录的预合并(例如,从上游表获得的 CDC 流),然后查找索引,最后调用二进制打包算法将数据打包到文件中,同时遵循预配置的目标文件大小 [35]。另一方面,插入操作足够智能,可以避免预合并和索引查找,同时保留管道其余部分的优势。同样,“批量插入” 操作提供了多种排序模式,用于在将数据从外部表导入 Hudi 时控制初始文件大小和文件计数。其他批量写入操作提供了批量数据管道中使用的典型覆盖语义的基于 MVCC 实现,同时保留了所有事务性和增量处理功能,使得在用于常规运行的增量管道和用于回填 / 删除旧分区的批量管道之间无缝切换。写管道还包含通过溢出到 RocksDB[36] 或外部可溢出映射、多线程 / 并发 I/O 来处理大型合并的优化,以提高写入性能。

主键是 Hudi 中的一等公民,并且在更新插入 / 删除之前完成的预合并 / 索引查找可确保键在分区之间或分区内是唯一的。与其他由数据工程师使用 MERGE INTO 语句进行协调的方法相比,这种方法对于关键的用例场景可确保数据质量。Hudi 还附带了几个内置的主键生成器 [37],可以解析所有常见的日期 / 时间戳,使用自定义的主键生成器的可扩展框架处理格式错误的数据。主键将随记录一起物化到 _hoodie_record_key 元数据列,这允许更改主键字段并对旧数据里不正确的主键数据进行修复。

最后,Hudi 提供了一个与 Flink 或 Kafka Streams 中的处理器 API 非常相似的 HoodieRecordPayload 接口,并允许在基本和增量日志记录之间表达任意合并条件。这允许用户表达部分合并(例如,仅将更新的列记录到增量日志以提高效率)并避免在每次合并之前读取所有基本记录。通常,我们发现用户在将旧数据重放 / 回填到表中时会利用这种自定义合并逻辑,同时确保不会覆盖较新的更新,从而使得表的快照能及时返回。这是通过简单地使用 HoodieDefaultPayload 来实现的,它根据数据中配置的预合并字段来选择给定键的最新值。

Hudi 写入器向每条记录添加元数据,该元数据为每个提交中的每条记录编码提交时间和序列号(类似于 Kafka 偏移量),这使得派生记录级别的更改流成为可能。Hudi 还为用户提供了在摄入数据流中指定事件时间字段并在时间轴中跟踪它们的能力。将这些映射到流处理领域的概念,Hudi 包含每个提交记录的到达时间和事件时间 [38],这可以帮助我们为复杂的增量处理管道构建良好的水印 [39]。

在我们着手实现完整的端到端增量 ETL 管道这一宏伟目标之前,我们希望在不久的将来添加新的元数据列,用于对每条记录的源操作(插入、更新、删除)进行编码。总而言之,我们意识到许多用户可能只是想将 Hudi 用作支持事务、快速更新 / 删除的高效写入层。我们正在研究添加对虚拟主键 [40] 的支持并使元数据列可选 [41],以降低存储开销,同时仍然使 Hudi 的其余功能(元数据表、表服务等)可用。

读取器

Hudi 在写入器和读取器之间提供了快照隔离,并允许所有主流的湖查询引擎(Spark、Hive、Flink、Presto、Trino、Impala)甚至云数仓(如 Redshift)在任何表快照上进行一致的查询。事实上,我们也很乐意将 Hudi 表作为外部表与 BigQuery/Snowflake 一起使用,前提是它们也更原生地支持湖的表格式。

我们围绕查询性能的设计理念是在仅读取基本列式文件(COW 快照表、MOR 读优化查询表)时使 Hudi 尽可能轻量,例如在 Presto、Trino、Spark 中使用引擎特定的向量化读取器。与维护我们自研的读取器相比,此模型的可扩展性要高得多。例如 Presto[42]、Trino[43] 都有自己的数据 / 元数据缓存。每当 Hudi 必须为查询合并基础文件和日志文件时,Hudi 都会进行控制并采用多种机制(可溢出的映射、延迟读取)来提高合并性能,同时还提供对数据的读优化查询,以权衡数据新鲜度与查询性能。

在不久的将来,我们将深度投入以通过多种方式提高 MOR 表的快照查询性能,例如内联 Parquet 数据、覆盖有效负载 / 合并的特殊处理。

使用flinkhudi构建流式数据湖:Apache Hudi 新一代流式数据湖平台(3)

忠于其设计目标,Hudi 提供了一些非常强大的增量查询功能,将写入过程中添加的元字段和基于文件组的存储布局联系在一起。虽然仅跟踪文件的表格式,只能提供在每个快照或提交期间更改的文件的信息,但由于跟踪了记录级事件和到达时间,Hudi 会生成给定时间线上某个时间点更改的确切记录集。此外,这种设计允许增量查询在较大数据量的提交中获取较小的数据集,完全解耦数据的写入和增量查询。时间旅行基于增量查询实现,该查询作用在时间线上过去的部分范围。由于 Hudi 确保在任何时间点将主键原子地映射到单个文件组,因此可以在 Hudi 表上支持完整的 CDC 功能,例如提供自时间 t 以来给定记录的所有可能值在 CDC 流上的“before”和“after”视图。所有这些功能都可以在每个文件组的本地构建,因为每个文件组都是一个独立的日志。我们未来在该领域的大部分工作是期望把类似 debezium[44] 所具备的能力带入到 Hudi 中来。

表服务

多年来定义和维持一个项目价值的通常是其基本设计原则和微妙的权衡。数据库通常由多个内部组件组成,它们协同工作以向用户提供效率、性能和出色的可操作性。为了让 Hudi 能作为增量数据管道的状态存储,我们为其设计了内置的表服务和自我管理运行时,可以编排 / 触发这些服务并在内部优化一切。事实上,如果我们比较 RocksDB(一种非常流行的流处理状态存储)和 Hudi 的组件,相似之处就很明显了。

使用flinkhudi构建流式数据湖:Apache Hudi 新一代流式数据湖平台(4)

Hudi 有几个内置的表服务,目标都是确保高性能的表存储布局和元数据管理,它们在每次写入操作后同步自动调用,或者作为单独的后台作业异步调用。此外,Spark(和 Flink)流写入器可以在“连续”模式下运行,并调用表服务与写入器智能地异步共享底层执行器。

归档(Archival)服务确保时间线为服务间协调(例如压缩服务等待其他压缩服务在同一文件组上完成)以及增量查询保留足够的历史记录。一旦事件从时间线上过期,归档服务就会清除湖存储的任何副作用(例如回滚失败的并发事务)。Hudi 的事务管理允许所有这些服务都是幂等的,因此只需简单的重试即可对故障容错。

清理(Cleaner[45])服务以增量的方式工作(吃我们自己的增量设计狗粮),删除超过保留期限的用于增量查询的文件切片,同时还为长时间运行的批处理作业(例如 Hive ETL)留出足够的时间来完成运行。

压缩(Compaction)服务带有内置策略(基于日期分区,I/O 有界),将基本文件与一组增量日志文件合并以生成新的基本文件,同时允许对文件组进行并发写入。这得益于 Hudi 将文件分组并支持灵活的日志合并,同时在并发更新对同一组记录出现问题时执行非阻塞删除。

聚簇(Clustering[46])服务的功能类似于用户在 BigQuery 或 Snowflake 中找到的功能,用户可以通过排序键将经常查询的记录组合在一起,或者通过将较小的基本文件合并为较大的文件来控制文件大小。聚簇能完全感知时间线上的其他操作(例如清理、压缩),并帮助 Hudi 实施智能优化,例如避免对已聚簇的文件组进行压缩,以节省 I/O。Hudi 还通过使用标记文件跟踪作为写入操作的一部分创建的其他文件,以执行对写入中断的回滚并清除湖存储中的任何未提交的数据。

最后,引导(Bootstrap)服务将原始 Parquet 表一次性零拷贝迁移到 Hudi,同时允许两个管道并行运行,以进行数据验证。清理服务一旦意识到这些已被引导的基本文件,可以选择性地清理它们,以确保满足 GDPR 合规性等需求场景。

我们一直在寻找以有意义的方式改进和增强表服务。在即将发布的版本中,我们正在努力建立一个更具可扩展性 [47] 的清理“部分写入”的模型,方法是使用我们的时间轴元服务器注册标记文件的创建,从而避免昂贵的全表扫描来寻找和删除未提交的文件。我们还有各种提议 [48] 来添加更多的聚簇方案,使用完全基于日志的并发控制以通过并发更新来解锁聚簇

数据服务

正如本文最初所提及的,我们想让 Hudi 能对常见的端到端用例做到开箱即用,因此对一组数据服务进行了深度投入,这些服务提供了特定于数据 / 工作负载的功能,位于表服务之上,直面写入器和读取器。

该列表中最重要的是 Hudi DeltaStreamer 实用程序,它一直是一个非常受欢迎的选择,可以轻松地基于 Kafka 流以及在湖存储之上的不同格式的文件来构建数据湖。随着时间的推移,我们还构建了涵盖所有主流系统的源,例如 RDBMS/ 其他数仓的 JDBC 源、Hive 源,甚至从其他 Hudi 表中增量提取数据。该实用程序支持检查点的自动管理、跟踪源检查点作为目标 Hudi 表元数据的一部分,并支持回填 / 一次性运行。DeltaStreamer 还与 Confluent 等主要 Schema 注册表进行了集成,并提供了对其他流行机制(如 Kafka connect)的检查点转换。它还支持重复数据删除、多级配置管理系统、内置转换器,可以将任意 SQL 或强制 CDC 变更日志 [49] 转换为可写的形式,结合上述其他功能足以使其用于部署生产级的增量管道。

最后,就像 Spark/Flink 流写入器一样,DeltaStreamer 能够以连续模式运行,自动管理表服务。Hudi 还提供了其他几种工具,用于以一次性快照和增量的形式导出 Hudi 表,还可以将新表导入 / 导出 [50]/ 引导到 Hudi。Hudi 还向 Http 端点或 Kafka topic 提供关于表提交操作的通知,可用于分析或在工作流管理器(如 Airflow)中构建数据传感器以触发管道。

使用flinkhudi构建流式数据湖:Apache Hudi 新一代流式数据湖平台(5)

猜您喜欢: