Hudi vs. Iceberg: A Comparison of Features and Implementation


Published on 2025-09-12
This article compares two major data lake table formats, Hudi and Iceberg, covering how they differ in features and application scenarios. It is based on Hudi 1.0.2 and Iceberg 1.10.0.

Iceberg

Spark Integration

Configuring a catalog

To use Iceberg from Spark, a catalog must be configured first, so that the Spark engine can obtain the metadata of Iceberg databases and tables through it.

Catalogs are registered in Spark with properties prefixed by spark.sql.catalog.{catalog_name}.

There are two ways to configure them:

(1) Configure when the SparkContext starts

For example, the following configures two catalogs: one extends Spark's built-in catalog (spark_catalog), and the other is a Hadoop-based catalog (local).

spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.10.0 \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
    --conf spark.sql.catalog.spark_catalog.type=hive \
    --conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.local.type=hadoop \
    --conf spark.sql.catalog.local.warehouse=$PWD/warehouse

(2) Static configuration in spark-defaults.conf

For example, the following configures three different types of catalogs:

spark.sql.catalog.hive_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hive_prod.type = hive
spark.sql.catalog.hive_prod.uri = thrift://metastore-host:port
# omit uri to use the same URI as Spark: hive.metastore.uris in hive-site.xml

spark.sql.catalog.rest_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.rest_prod.type = rest
spark.sql.catalog.rest_prod.uri = http://localhost:8080

spark.sql.catalog.hadoop_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hadoop_prod.type = hadoop
spark.sql.catalog.hadoop_prod.warehouse = hdfs://nn:8020/warehouse/path

For other Iceberg catalog configuration options, see the official documentation: https://iceberg.apache.org/docs/latest/spark-configuration/#catalog-configuration

Spark cannot configure Iceberg catalogs dynamically at runtime; they can only be configured statically, either when the SparkContext starts or in the spark-defaults.conf file.

Spark DDL

The main operations are the following.

(1) CREATE TABLE

-- create an ordinary Iceberg table with 'USING iceberg'
CREATE TABLE my_catalog.my_db.my_table001 (
    id bigint NOT NULL COMMENT 'unique id',
    data string)
USING iceberg;

-- create a partitioned table (identity partitioning, not hidden)
CREATE TABLE my_catalog.my_db.my_table002 (
    id bigint,
    data string,
    category string)
USING iceberg
PARTITIONED BY (category);

-- create a table with hidden partitioning
CREATE TABLE my_catalog.my_db.my_table003 (
    id bigint,
    data string,
    category string,
    ts timestamp)
USING iceberg
PARTITIONED BY (bucket(16, id), days(ts), category);

(2) CREATE TABLE ... AS SELECT

This operation is atomic when using SparkCatalog, and non-atomic when using SparkSessionCatalog.

-- plain CTAS
CREATE TABLE my_catalog.my_db.my_table004
USING iceberg
AS SELECT ...

-- the new table does not inherit the source table's partition spec or table properties; specify them again on the new table
CREATE TABLE my_catalog.my_db.my_table005
USING iceberg
PARTITIONED BY (part)
TBLPROPERTIES ('key'='value')
AS SELECT ...

(3) REPLACE TABLE ... AS SELECT

Again, this operation is atomic when using SparkCatalog, and non-atomic when using SparkSessionCatalog.

An atomic table replacement creates a new snapshot containing the results of the SELECT query, while the table's earlier snapshots are kept in its history.

-- replace the table, keeping the original table properties and partition spec
REPLACE TABLE my_catalog.my_db.my_table006
USING iceberg
AS SELECT ...

-- replace the table, changing its table properties and partition spec
REPLACE TABLE my_catalog.my_db.my_table007
USING iceberg
PARTITIONED BY (part)
TBLPROPERTIES ('key'='value')
AS SELECT ...

-- create the table if it does not exist, otherwise replace it
CREATE OR REPLACE TABLE my_catalog.my_db.my_table008
USING iceberg
AS SELECT ...

The table schema and partition spec are replaced if they are changed. To avoid modifying the table's schema and partitioning, use INSERT OVERWRITE instead of REPLACE TABLE, as sketched below.
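
A minimal sketch of such an overwrite (the target table and source query are placeholders; note that which data is replaced also depends on Spark's spark.sql.sources.partitionOverwriteMode setting):

-- rewrite data with the query results; the existing schema and partition spec stay unchanged
INSERT OVERWRITE my_catalog.my_db.my_table005
SELECT ...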

Table properties specified in a REPLACE TABLE statement are merged with the existing table's properties.

(4) DROP TABLE

Version 0.14 is the dividing line for DROP TABLE behavior: before 0.14, DROP TABLE removed the table from the catalog and also deleted the table's data; from 0.14 on, DROP TABLE only removes the table from the catalog, and DROP TABLE ... PURGE must be used to also delete the data.

-- remove the table from the catalog only
DROP TABLE my_catalog.my_db.my_table001;

-- remove the table from the catalog and delete its data
DROP TABLE my_catalog.my_db.my_table002 PURGE;

(5) ALTER TABLE

Iceberg has full support for ALTER TABLE in Spark SQL.

-- rename a table
ALTER TABLE my_catalog.my_db.my_table009 RENAME TO my_catalog.my_db.my_table_rename;

-- add or update table properties
ALTER TABLE my_catalog.my_db.my_table009 SET TBLPROPERTIES (
    'read.split.target-size'='268435456'
);

-- remove a table property
ALTER TABLE my_catalog.my_db.my_table009 UNSET TBLPROPERTIES ('read.split.target-size');

-- set a table comment
ALTER TABLE my_catalog.my_db.my_table009 SET TBLPROPERTIES (
    'comment' = 'A table comment.'
);

-- add a column
ALTER TABLE my_catalog.my_db.my_table009
ADD COLUMNS (
    new_column string comment 'new_column docs'
);

-- rename a column
ALTER TABLE my_catalog.my_db.my_table010 RENAME COLUMN data TO payload;

-- change a column definition, such as its type or comment
ALTER TABLE my_catalog.my_db.my_table011 ALTER COLUMN measurement TYPE double COMMENT 'unit is bytes per second';
ALTER TABLE my_catalog.my_db.my_table011 ALTER COLUMN measurement COMMENT 'unit is kilobytes per second';

-- drop a column
ALTER TABLE prod.db.sample DROP COLUMN id;

(6) ALTER TABLE SQL extensions

Iceberg 0.11 introduced an extension module that adds SQL commands to Spark, such as:

  • CALL for stored procedures (see the sketch after the configuration below)

  • ALTER TABLE ... WRITE ORDERED BY for changing the table definition

To use this extension module, add the following configuration:

spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
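
With the extensions enabled, stored procedures are invoked with CALL. A minimal sketch (the catalog, table, and snapshot id below are placeholders; rollback_to_snapshot is one of Iceberg's built-in procedures):

-- roll the table back to an earlier snapshot
CALL my_catalog.system.rollback_to_snapshot('my_db.my_table012', 10963874102873);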

The ALTER TABLE SQL extensions are used as follows:

-- add a partition field (identity partition, not hidden)
ALTER TABLE my_catalog.my_db.my_table012 ADD PARTITION FIELD abc; -- identity transform

-- add partition fields (hidden partitioning)
ALTER TABLE my_catalog.my_db.my_table013 ADD PARTITION FIELD bucket(16, id);
ALTER TABLE my_catalog.my_db.my_table014 ADD PARTITION FIELD truncate(4, data);
ALTER TABLE my_catalog.my_db.my_table015 ADD PARTITION FIELD year(ts);
-- use optional AS keyword to specify a custom name for the partition field 
ALTER TABLE my_catalog.my_db.my_table016 ADD PARTITION FIELD bucket(16, id) AS shard;

-- drop partition fields
ALTER TABLE my_catalog.my_db.my_table017 DROP PARTITION FIELD catalog;
ALTER TABLE my_catalog.my_db.my_table018 DROP PARTITION FIELD bucket(16, id);
ALTER TABLE my_catalog.my_db.my_table019 DROP PARTITION FIELD truncate(4, data);
ALTER TABLE my_catalog.my_db.my_table020 DROP PARTITION FIELD year(ts);
ALTER TABLE my_catalog.my_db.my_table021 DROP PARTITION FIELD shard;

-- replace a partition field
ALTER TABLE my_catalog.my_db.my_table022 REPLACE PARTITION FIELD ts_day WITH day(ts);
-- use optional AS keyword to specify a custom name for the new partition field 
ALTER TABLE my_catalog.my_db.my_table023 REPLACE PARTITION FIELD ts_day WITH day(ts) AS day_of_ts;

-- change the write order fields
ALTER TABLE my_catalog.my_db.my_table024 WRITE ORDERED BY category, id
-- use optional ASC/DESC keyword to specify sort order of each field (default ASC)
ALTER TABLE my_catalog.my_db.my_table025 WRITE ORDERED BY category ASC, id DESC
-- use optional NULLS FIRST/NULLS LAST keyword to specify null order of each field (default FIRST)
ALTER TABLE my_catalog.my_db.my_table026 WRITE ORDERED BY category ASC NULLS LAST, id DESC NULLS FIRST

-- WRITE ORDERED BY sorts data globally across tasks; WRITE LOCALLY ORDERED BY only sorts within each task, so data is not necessarily ordered across tasks
ALTER TABLE my_catalog.my_db.my_table027 WRITE LOCALLY ORDERED BY category, id

-- remove the table's write order
ALTER TABLE my_catalog.my_db.my_table028 WRITE UNORDERED

-- request that each partition be handled by a single writer (by default, data is hash-distributed across writers)
ALTER TABLE my_catalog.my_db.my_table029 WRITE DISTRIBUTED BY PARTITION

Spark Reads

Basic queries

Iceberg tables can be queried with standard Spark SQL syntax.
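
For example, a simple aggregation over my_table002 created above (partitioned by category):

SELECT category, count(1) AS cnt
FROM my_catalog.my_db.my_table002
GROUP BY category;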

Time travel queries

Since Spark 3.3, time travel queries are supported with TIMESTAMP AS OF and VERSION AS OF.

-- time travel to October 26, 1986 at 01:21:00
SELECT * FROM prod.db.table TIMESTAMP AS OF '1986-10-26 01:21:00';

-- time travel to snapshot with id 10963874102873L
SELECT * FROM prod.db.table VERSION AS OF 10963874102873;

-- time travel to the head snapshot of audit-branch
SELECT * FROM prod.db.table VERSION AS OF 'audit-branch';

-- time travel to the snapshot referenced by the tag historical-snapshot
SELECT * FROM prod.db.table VERSION AS OF 'historical-snapshot';

-- FOR SYSTEM_TIME AS OF and FOR SYSTEM_VERSION AS OF clauses are also supported
SELECT * FROM prod.db.table FOR SYSTEM_TIME AS OF '1986-10-26 01:21:00';
SELECT * FROM prod.db.table FOR SYSTEM_VERSION AS OF 10963874102873;
SELECT * FROM prod.db.table FOR SYSTEM_VERSION AS OF 'audit-branch';
SELECT * FROM prod.db.table FOR SYSTEM_VERSION AS OF 'historical-snapshot';

-- timestamp in seconds
SELECT * FROM prod.db.table TIMESTAMP AS OF 499162860;
SELECT * FROM prod.db.table FOR SYSTEM_TIME AS OF 499162860;

-- The branch or tag may also be specified using a similar syntax to metadata tables, with branch_<branchname> or tag_<tagname>:
SELECT * FROM prod.db.table.`branch_audit-branch`;
SELECT * FROM prod.db.table.`tag_historical-snapshot`;

Querying table metadata

Table metadata can be queried through Spark SQL:

-- show the table's history
SELECT * FROM {catalog}.{dbname}.{table}.history;

-- show the metadata log entries; each row corresponds to one metadata file (*.metadata.json)
SELECT * from {catalog}.{dbname}.{table}.metadata_log_entries;

-- show the table's valid snapshots; each row is one snapshot
SELECT * FROM {catalog}.{dbname}.{table}.snapshots;

-- show which Spark application wrote each snapshot
select h.made_current_at, s.operation, h.snapshot_id, h.is_current_ancestor, s.summary['spark.app.id']
from {catalog}.{dbname}.{table}.history h
join {catalog}.{dbname}.{table}.snapshots s 
on h.snapshot_id = s.snapshot_id
order by made_current_at;

-- show all the table's current manifest entries for both data and delete files.
SELECT * FROM {catalog}.{dbname}.{table}.entries;

-- show all of the table's current files, including data files, position delete files, and equality delete files
SELECT * FROM {catalog}.{dbname}.{table}.files;

-- show the table's current data files
SELECT * FROM {catalog}.{dbname}.{table}.data_files;

-- show the table's current delete files
SELECT * FROM {catalog}.{dbname}.{table}.delete_files;

-- show the table's current manifest files (the contents of the manifest list)
SELECT * FROM {catalog}.{dbname}.{table}.manifests;

-- show the table's current partitions
SELECT * FROM {catalog}.{dbname}.{table}.partitions;

-- show all positional delete files from the current snapshot of table
SELECT * FROM {catalog}.{dbname}.{table}.position_deletes;

-- the following metadata tables cover all valid snapshots, not just the table's current snapshot
SELECT * FROM prod.db.table.all_data_files;
SELECT * FROM prod.db.table.all_delete_files;
SELECT * FROM prod.db.table.all_entries;
SELECT * FROM prod.db.table.all_manifests;

-- show a table's known snapshot references
SELECT * FROM prod.db.table.refs;

Spark Writes

Spark-Streaming

Spark Procedures

Flink Integration

Hudi

Spark Integration

Flink Integration