Hudi Timeline和Table Service


发布于 2025-06-08 / 20 阅读 / 0 评论 /
Hudi Timeline设计和应用,以及Table Service的使用

Timeline的作用

Hudi维护着一条对Hudi数据集所有操作的不同 Instant组成的 Timeline(时间轴),通过时间轴,用户可以轻易的进行增量查询或基于某个历史时间点的查询,这也是Hudi对外提供基于时间点查询的核心能力之一。

Timeline源码解析

HoodieTimeline类结构

Timeline在Hudi中被定义为 HoodieTimeline接口

Action类型

HoodieTimeline接口定义了针对 Instant的不同操作

String[] VALID_ACTIONS_IN_TIMELINE = {
  COMMIT_ACTION, 
  DELTA_COMMIT_ACTION,
  CLEAN_ACTION, 
  SAVEPOINT_ACTION, 
  RESTORE_ACTION, 
  ROLLBACK_ACTION,
  COMPACTION_ACTION, 
  LOG_COMPACTION_ACTION, 
  REPLACE_COMMIT_ACTION, 
  CLUSTERING_ACTION, 
  INDEXING_ACTION
};

操作说明如下

(1)commit:将记录原子写入数据集。

(2)deltacommit :将一批记录原子写入到 MergeOnRead存储类型的数据集(写入增量日志log文件中)。

(3)clean :删除数据集中不再需要的旧版本文件。

(4)rollback :表示当 commit/deltacommit不成功时进行回滚,其会删除在写入过程中产生的部分文件。

(5)savepoint:将某些文件组标记为已保存,以便其不会被删除。在发生灾难需要恢复数据的情况下,它有助于将数据集还原到时间轴上的某个点。

(6)compaction :将基于行的log日志文件转变成列式parquet数据文件。 compaction在时间轴上表现为特殊提交。

(7)restore:将从某个 savepoint恢复。

HoodieInstant

Timeline与 Instant密切相关,每条 Timeline必须包含零或多个 Instant。所有 Instant构成了 Timeline, Instant在Hudi中被定义为 HoodieInstant,其主要包含三个字段

(1)state:状态,如 requested、 inflight、 completed等状态,状态会转变,如当提交完成时会从 inflight状态转变为 completed状态。

(2)action:操作,对数据集执行的操作类型,如 commit、 deltacommit等。

(3)tmiestamp:时间戳,发生的时间戳,Hudi会保证单调递增。

  /**
   * Instant State.
   */
  public enum State {
    // Requested State (valid state for Compaction)
    REQUESTED,
    // Inflight instant
    INFLIGHT,
    // Committed instant
    COMPLETED,
    // Invalid instant
    NIL
  }

Clean

Clustering

Clustering 是 Hudi 在 0.7.0 版本引入的一项特性,用于优化文件布局,提升读写性能,现在它已经成为 Hudi 的一项重要性能优化手段。

Clustering要解决的问题

通常,数据采集倾向于将数据并行写入多个小文件,这样可以提升写入吞吐量,让下游及早获得采集数据。

但对于查询来说,大量的小文件会严重影响读取性能。

另一方面,在数据采集时,数据是按到达的先后顺序存储的,这种数据分布无法被查询引擎有效利用,如果数据能按查询频率最高的条件列排序后再存储则可以显著提升部分查询的性能,这里有两方面的原因:

(1)一是排序后可以利用谓词下推和 Data Skip 技术跳过大量不相关的数据

(2)二是有一个为人所熟知的理论:统计显示,当一条记录被访问后,与之“临近”的数据也将很快被访问到,现代文件系统(例如 HDFS)一般都有 Block Cache,已读取的数据块会被缓存在内存中,访问临近数据时效率会非常高。这里的“临近”就取决于我们如何对数据进行排序。

所以,对于一个湖仓系统来说,在数据接入和数据查询两种场景下,对文件大小和数据排布是有不同要求或偏好的,在数据只有一份,配置也只有一份的情况下,系统可优化的空间非常有限,用户只能在接入性能和查询性能之间进行权衡。

Clustering如何解决问题

针对以上的问题,Hudi 的 Clustering 给出了一套相对完善的解决方案。

Clustering的核心思想是:在数据接入时,允许并行写入多个小文件,以提升写入性能,同时通过一个异步(也可以同步执行,但不推荐)进程或线程周期性地将小文件合并成大文件并在这一过程中对数据按特定的列重新排序,这样在解决小文件问题的同时又改善了查询性能。

实际上,Clustering 是一种通用的数据布局优化手段,Spark SQL/Hive 中的 cluster by 和 Cassandra 中的 clustering key 都是 Clustering 思想的具体实现,只是 Hudi 的 Clustering 除了这一标准功能外还多了一项合并小文件的工作。

Clustering的设计和实现

下面我们来看下Clustering具体是如何实现的。

Clustering的执行机制

Clustering的执行分为:排期(Schedule)和执行(Execute)两个阶段。

Schedule阶段的主要工作是划定哪些文件将参与 Clustering,然后生成一个计划(Clustering Plan)保存到 Timeline 里,此时在 Timeline 里会出现一个名为 replacecommit 的 Instant,状态是 REQUESTED。

Execute阶段的主要工作是读取这个计划(Clustering Plan)并执行它,执行完毕后,Timeline 中的 replacecommit 就会变成 COMPLETED 状态。

Clustering Schedule策略

Clustering 在排期有可插拔的策略,以及在执行期间如何应对数据更新也有相应的更新策略,执行策略和更新策略较为简单

Hudi 有三种 Clustering 排期策略可供选择:

SparkSizeBasedClusteringPlanStrategy:该策略为默认的排期策略,它会筛选出符合条件的小文件(就是看文件大小,小于 clustering.plan.strategy.small.file.limit 规定值的文件就是小文件),然后将选出的小文件分成多个 Group,Group 的数量和大小都是可配置的,划分 Group 的目的是提升 Clustering 的并行度。注意:该策略将会扫描全部分区。

SparkRecentDaysClusteringPlanStrategy:该策略会在此前 N 天的分区内查找小文件,对于使用日期作分区,且数据增量是可预期的数据表来说,这种策略是非常适合的。如果在这种情况下使用默认排期策略,就会扫描全部分区,给系统带来没有必要的负载。

SparkSelectedPartitionsClusteringPlanStrategy:该策略允许我们针对特定的分区进行 Clustering,这可能会应用在运维或某些具有独特业务特征的数据表上。

Clustering Execute过程

Clustering 的运行模式也分为:同步、异步以及半异步三种模式(“半异步”模式是本文使用的一种叫法,为的是和同步、异步两种模式的称谓对齐,Hudi 官方文档对这一模式有介绍,但没有给出命名),它们之间的差异主要体现在从(达到规定阈值的某次)提交(Commit)到排期(Schedule)再到执行(Execute)三个阶段的推进方式上。

在 Hudi 的官方文档中,交替使用了 Sync/Async和Inline/Offline 两组词汇描述推进方式,这两组词汇是有微妙差异的

同步模式(Inline Schedule,Inline Execute)

在该模式下,当累积的提交(Commit)次数到达一个阈值时,会立即触发 Clustering 的排期与执行(排期和执行是连在一起的),而这个阈值是由配置项 hoodie.clustering.inline.max.commits 控制的,默认值是 4,即:默认情况下,每提交 4 次就(有可能)会触发并执行一次 Clustering。

触发同步模式的配置如下

配置

hoodie.clustering.inline

true

hoodie.clustering.schedule.inline

false

hoodie.clustering.async.enabled

false

异步模式(Offline Schedule,Offline Execute)

半异步模式(Inline Schedule,Offline Execute)

Compaction