Iceberg
Spark Integration
Catalog Configuration
To operate on Iceberg with Spark, you first need to configure a catalog so that the Spark engine can obtain the metadata of Iceberg databases and tables through it.
Spark registers catalogs under the property prefix spark.sql.catalog.{catalog_name}.
There are two ways to configure a catalog:
(1) Configure when the SparkContext starts
For example, the following configures two catalogs: Spark's built-in catalog (spark_catalog) and a Hadoop-based catalog (local).
spark-sql --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.10.0 \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
--conf spark.sql.catalog.spark_catalog.type=hive \
--conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.local.type=hadoop \
--conf spark.sql.catalog.local.warehouse=$PWD/warehouse
(2) Static configuration in spark-defaults.conf
For example, the following configures three catalogs of different types:
spark.sql.catalog.hive_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hive_prod.type = hive
spark.sql.catalog.hive_prod.uri = thrift://metastore-host:port
# omit uri to use the same URI as Spark: hive.metastore.uris in hive-site.xml
spark.sql.catalog.rest_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.rest_prod.type = rest
spark.sql.catalog.rest_prod.uri = http://localhost:8080
spark.sql.catalog.hadoop_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hadoop_prod.type = hadoop
spark.sql.catalog.hadoop_prod.warehouse = hdfs://nn:8020/warehouse/path
Configuration for other Iceberg catalogs can be found in the official documentation: https://iceberg.apache.org/docs/latest/spark-configuration/#catalog-configuration
Iceberg catalogs cannot be configured dynamically in Spark; they can only be configured statically, either when the SparkContext starts or in the spark-defaults.conf file.
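Once a catalog is registered, its tables can be referenced by fully qualified names in SQL. A minimal sketch, assuming the Hadoop-based local catalog configured above (the database and table names are illustrative):
-- database and table names are illustrative
CREATE TABLE local.db.demo (id bigint, data string) USING iceberg;
INSERT INTO local.db.demo VALUES (1, 'a'), (2, 'b');
SELECT * FROM local.db.demo;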
Spark DDL
The main operations are:
(1) CREATE TABLE
-- Use 'USING iceberg' to create a regular Iceberg table
CREATE TABLE my_catalog.my_db.my_table001 (
id bigint NOT NULL COMMENT 'unique id',
data string)
USING iceberg;
-- Create a partitioned table (non-hidden partitioning)
CREATE TABLE my_catalog.my_db.my_table002 (
id bigint,
data string,
category string)
USING iceberg
PARTITIONED BY (category);
-- Create a table with hidden partitioning
CREATE TABLE my_catalog.my_db.my_table003 (
id bigint,
data string,
category string,
ts timestamp)
USING iceberg
PARTITIONED BY (bucket(16, id), days(ts), category);
(2) CREATE TABLE ... AS SELECT
When using SparkCatalog, this operation is atomic; when using SparkSessionCatalog, it is not.
-- Basic table
CREATE TABLE my_catalog.my_db.my_table004
USING iceberg
AS SELECT ...
-- The new table does not inherit the partitioning or table properties of the SELECT source table; specify them again for the new table
CREATE TABLE my_catalog.my_db.my_table005
USING iceberg
PARTITIONED BY (part)
TBLPROPERTIES ('key'='value')
AS SELECT ...
(3) REPLACE TABLE ... AS SELECT
Likewise, this operation is atomic when using SparkCatalog and non-atomic when using SparkSessionCatalog.
An atomic table replacement creates a new snapshot with the results of the query while keeping the table's snapshot history.
-- Replace a table, keeping the original table properties and partitioning
REPLACE TABLE my_catalog.my_db.my_table006
USING iceberg
AS SELECT ...
-- Replace a table, changing its table properties and partitioning
REPLACE TABLE my_catalog.my_db.my_table007
USING iceberg
PARTITIONED BY (part)
TBLPROPERTIES ('key'='value')
AS SELECT ...
-- Create the table if it does not already exist
CREATE OR REPLACE TABLE my_catalog.my_db.my_table008
USING iceberg
AS SELECT ...
The table schema and partition spec are replaced if they are changed. To avoid modifying the table's schema and partitioning, use INSERT OVERWRITE instead of REPLACE TABLE (see the sketch below).
Table properties specified in the REPLACE TABLE statement are merged with the table's existing properties.
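For completeness, a minimal INSERT OVERWRITE sketch is shown below; it assumes the partitioned table my_table005 created above, and the column and source-table names are purely illustrative:
-- rewrites the matching partitions with the query result, leaving the schema and partition spec unchanged
-- with spark.sql.sources.partitionOverwriteMode=dynamic, only partitions produced by the query are replaced
INSERT OVERWRITE my_catalog.my_db.my_table005
SELECT id, data, part FROM my_catalog.my_db.staging_table;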
(4) DROP TABLE
Version 0.14 is the dividing line for DROP TABLE: before 0.14, DROP TABLE removed the table's metadata and deleted the table data; from 0.14 onward, DROP TABLE only removes the metadata, and DROP TABLE ... PURGE must be used to also delete the data.
-- Remove only the table's metadata
DROP TABLE my_catalog.my_db.my_table001;
-- Remove the table's metadata and delete the table data
DROP TABLE my_catalog.my_db.my_table002 PURGE;
(5) ALTER TABLE
Iceberg has full support for ALTER TABLE in Spark SQL.
-- Rename a table
ALTER TABLE my_catalog.my_db.my_table009 RENAME TO my_catalog.my_db.my_table_rename;
-- Add or update table properties
ALTER TABLE my_catalog.my_db.my_table009 SET TBLPROPERTIES (
'read.split.target-size'='268435456'
);
-- Remove a table property
ALTER TABLE my_catalog.my_db.my_table009 UNSET TBLPROPERTIES ('read.split.target-size');
-- Set the table comment
ALTER TABLE my_catalog.my_db.my_table009 SET TBLPROPERTIES (
'comment' = 'A table comment.'
);
-- Add a column
ALTER TABLE my_catalog.my_db.my_table009
ADD COLUMNS (
new_column string comment 'new_column docs'
);
-- Rename a column
ALTER TABLE my_catalog.my_db.my_table010 RENAME COLUMN data TO payload;
-- Modify a column definition, such as its type or comment
ALTER TABLE my_catalog.my_db.my_table011 ALTER COLUMN measurement TYPE double COMMENT 'unit is bytes per second';
ALTER TABLE my_catalog.my_db.my_table011 ALTER COLUMN measurement COMMENT 'unit is kilobytes per second';
-- Drop a column
ALTER TABLE prod.db.sample DROP COLUMN id;
(6) ALTER TABLE SQL extensions
Iceberg 0.11 added an extension module that adds new SQL commands to Spark, such as:
CALL, for stored procedures
ALTER TABLE ... WRITE ORDERED BY, for changing the table definition
To use this extension module, add the following configuration:
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
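For illustration, once the extension is enabled a stored procedure can be invoked via CALL. A minimal sketch; the procedure arguments, table name, and timestamp are illustrative:
-- expire old snapshots of a table using the built-in expire_snapshots procedure
CALL my_catalog.system.expire_snapshots(table => 'my_db.my_table001', older_than => TIMESTAMP '2024-01-01 00:00:00');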
The ALTER TABLE SQL extensions are used as follows:
-- Add a partition field (non-hidden partitioning)
ALTER TABLE my_catalog.my_db.my_table012 ADD PARTITION FIELD abc; -- identity transform
-- Add partition fields (hidden partitioning)
ALTER TABLE my_catalog.my_db.my_table013 ADD PARTITION FIELD bucket(16, id);
ALTER TABLE my_catalog.my_db.my_table014 ADD PARTITION FIELD truncate(4, data);
ALTER TABLE my_catalog.my_db.my_table015 ADD PARTITION FIELD year(ts);
-- use optional AS keyword to specify a custom name for the partition field
ALTER TABLE my_catalog.my_db.my_table016 ADD PARTITION FIELD bucket(16, id) AS shard;
-- Drop partition fields
ALTER TABLE my_catalog.my_db.my_table017 DROP PARTITION FIELD catalog;
ALTER TABLE my_catalog.my_db.my_table018 DROP PARTITION FIELD bucket(16, id);
ALTER TABLE my_catalog.my_db.my_table019 DROP PARTITION FIELD truncate(4, data);
ALTER TABLE my_catalog.my_db.my_table020 DROP PARTITION FIELD year(ts);
ALTER TABLE my_catalog.my_db.my_table021 DROP PARTITION FIELD shard;
-- Replace a partition field
ALTER TABLE my_catalog.my_db.my_table022 REPLACE PARTITION FIELD ts_day WITH day(ts);
-- use optional AS keyword to specify a custom name for the new partition field
ALTER TABLE my_catalog.my_db.my_table023 REPLACE PARTITION FIELD ts_day WITH day(ts) AS day_of_ts;
-- Change the write order fields
ALTER TABLE my_catalog.my_db.my_table024 WRITE ORDERED BY category, id
-- use optional ASC/DESC keyword to specify sort order of each field (default ASC)
ALTER TABLE my_catalog.my_db.my_table025 WRITE ORDERED BY category ASC, id DESC
-- use optional NULLS FIRST/NULLS LAST keyword to specify null order of each field (default FIRST)
ALTER TABLE my_catalog.my_db.my_table026 WRITE ORDERED BY category ASC NULLS LAST, id DESC NULLS FIRST
-- WRITE ORDERED BY sorts data across tasks; WRITE LOCALLY ORDERED BY sorts data only within each task, with no ordering guarantee across tasks
ALTER TABLE my_catalog.my_db.my_table027 WRITE LOCALLY ORDERED BY category, id
-- Remove the table's write order
ALTER TABLE my_catalog.my_db.my_table028 WRITE UNORDERED
-- Have each partition handled by a single writer (by default, data is hash-distributed across writers)
ALTER TABLE my_catalog.my_db.my_table029 WRITE DISTRIBUTED BY PARTITION
Spark Reads
Basic Queries
Iceberg supports querying table data with standard Spark SQL syntax, for example:
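A minimal example, assuming the my_table002 table created earlier (the filter value is illustrative):
-- standard Spark SQL query against an Iceberg table
SELECT id, data
FROM my_catalog.my_db.my_table002
WHERE category = 'books'
ORDER BY id;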

-- time travel to October 26, 1986 at 01:21:00
SELECT * FROM prod.db.table TIMESTAMP AS OF '1986-10-26 01:21:00';
-- time travel to snapshot with id 10963874102873L
SELECT * FROM prod.db.table VERSION AS OF 10963874102873;
-- time travel to the head snapshot of audit-branch
SELECT * FROM prod.db.table VERSION AS OF 'audit-branch';
-- time travel to the snapshot referenced by the tag historical-snapshot
SELECT * FROM prod.db.table VERSION AS OF 'historical-snapshot';
-- FOR SYSTEM_TIME AS OF and FOR SYSTEM_VERSION AS OF clauses are also supported
SELECT * FROM prod.db.table FOR SYSTEM_TIME AS OF '1986-10-26 01:21:00';
SELECT * FROM prod.db.table FOR SYSTEM_VERSION AS OF 10963874102873;
SELECT * FROM prod.db.table FOR SYSTEM_VERSION AS OF 'audit-branch';
SELECT * FROM prod.db.table FOR SYSTEM_VERSION AS OF 'historical-snapshot';
-- timestamp in seconds
SELECT * FROM prod.db.table TIMESTAMP AS OF 499162860;
SELECT * FROM prod.db.table FOR SYSTEM_TIME AS OF 499162860;
-- The branch or tag may also be specified using a similar syntax to metadata tables, with branch_<branchname> or tag_<tagname>:
SELECT * FROM prod.db.table.`branch_audit-branch`;
SELECT * FROM prod.db.table.`tag_historical-snapshot`;
Table Metadata Queries
Table metadata can be queried with Spark SQL:
-- Show the table history
SELECT * FROM {catalog}.{dbname}.{table}.history;
-- Show the metadata log entries; each row corresponds to one metadata file (*.metadata.json)
SELECT * from {catalog}.{dbname}.{table}.metadata_log_entries;
-- Show the table's valid snapshots; each row is one snapshot
SELECT * FROM {catalog}.{dbname}.{table}.snapshots;
-- Show the snapshots written by a given Spark application
select h.made_current_at, s.operation, h.snapshot_id, h.is_current_ancestor, s.summary['spark.app.id']
from {catalog}.{dbname}.{table}.history h
join {catalog}.{dbname}.{table}.snapshots s
on h.snapshot_id = s.snapshot_id
order by made_current_at;
-- show all the table's current manifest entries for both data and delete files.
SELECT * FROM {catalog}.{dbname}.{table}.entries;
-- Show all files in the table's current snapshot, including data files, position delete files, and equality delete files
SELECT * FROM {catalog}.{dbname}.{table}.files;
-- Show all data files in the table's current snapshot
SELECT * FROM {catalog}.{dbname}.{table}.data_files;
-- Show all delete files in the table's current snapshot
SELECT * FROM {catalog}.{dbname}.{table}.delete_files;
-- Show the table's current manifests
SELECT * FROM {catalog}.{dbname}.{table}.manifests;
-- Show the table's current partitions
SELECT * FROM {catalog}.{dbname}.{table}.partitions;
-- show all positional delete files from the current snapshot of table
SELECT * FROM {catalog}.{dbname}.{table}.position_deletes;
-- The following metadata tables span all snapshots of the table, not just the current snapshot
SELECT * FROM prod.db.table.all_data_files;
SELECT * FROM prod.db.table.all_delete_files;
SELECT * FROM prod.db.table.all_entries;
SELECT * FROM prod.db.table.all_manifests;
-- show a table's known snapshot references
SELECT * FROM prod.db.table.refs;
Spark Writes
Spark-Streaming
Spark Procedures