A Practical Guide to Apache Iceberg's Core Features with PySpark

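 Updated: 2026-05-21 10:03:11   Author: zhaojiew10  
This article tests the core features of the Apache Iceberg table format in a local lab environment built on PySpark 3.5.6 and a Hadoop Catalog. As an open table format, Iceberg provides database-grade capabilities such as ACID transactions, hidden partitioning, and schema evolution.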

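This article systematically tests the core features of the Apache Iceberg table format. The test environment is a local PySpark session with a Hadoop Catalog.

Apache Iceberg is an open table format that brings database-grade ACID transactions and advanced features to data lakes. Unlike a traditional Hive table, which locates data by directory, Iceberg tracks individual data files in a separate metadata layer, which is what enables hidden partitioning, time travel, and schema evolution.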

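Feature | Description
ACID transactions | Writes are atomic; reads use snapshot isolation
Hidden partitioning | Partitioning is transparent to users; the query planner prunes partitions automatically
Schema evolution | Add, drop, rename, and reorder columns without rewriting data
Partition evolution | Change the partitioning of an existing table without migrating data
Time travel | Query historical versions by snapshot, using VERSION AS OF and TIMESTAMP AS OF
Row-level operations | UPDATE, DELETE, and MERGE INTO
Open file formats | Data files can be Parquet or ORC (columnar) or Avro (row-oriented)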

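Core terminology

Term | Description
Schema | The table's field definitions (column names and types)
Partition Spec | Defines how partition values are derived from data fields
Snapshot | The complete state of the table at a point in time
Manifest List | Lists all manifest files that belong to a snapshot
Manifest | Tracks a subset of a snapshot's data files and delete files, with per-file partition values and statistics
Data File | A file that stores the table's rows (Parquet/Avro/ORC)
Delete File | Records deleted rows; used by Merge-on-Read
Metadata File | A JSON file that records the schema, partition specs, and snapshot list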
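
To make the relationship between these terms concrete, here is a minimal in-memory sketch of the hierarchy. The class and field names are hypothetical simplifications and not Iceberg's actual file layout; the manifest list is modelled as a plain Python list.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class DataFile:
    path: str
    record_count: int


@dataclass
class Manifest:
    data_files: List[DataFile]  # a manifest tracks a subset of the table's files


@dataclass
class Snapshot:
    snapshot_id: int
    manifest_list: List[Manifest]  # stands in for the manifest list file


@dataclass
class TableMetadata:
    snapshots: Dict[int, Snapshot] = field(default_factory=dict)
    current_snapshot_id: Optional[int] = None

    def scan(self, snapshot_id: Optional[int] = None) -> List[DataFile]:
        """Walk metadata file -> manifest list -> manifests -> data files."""
        if snapshot_id is None:
            snapshot_id = self.current_snapshot_id
        snapshot = self.snapshots[snapshot_id]
        return [f for m in snapshot.manifest_list for f in m.data_files]


# Two snapshots that share one manifest: a new commit reuses unchanged manifests.
shared = Manifest([DataFile("a.parquet", 4), DataFile("b.parquet", 4)])
extra = Manifest([DataFile("c.parquet", 7)])
table = TableMetadata(
    snapshots={1: Snapshot(1, [shared]), 2: Snapshot(2, [shared, extra])},
    current_snapshot_id=2,
)
print([f.path for f in table.scan()])   # files of the current snapshot
print([f.path for f in table.scan(1)])  # files of the older snapshot
```

Because a snapshot is only a list of pointers, keeping old snapshots around costs metadata, not copies of the data.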

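Version choices

  • PySpark 3.5.6: Spark has one of the most mature Iceberg integrations, covering the Iceberg SQL extensions and the DataFrame API.
  • Hadoop Catalog: keeps table metadata on the local file system; in production it can be replaced with Hive Metastore or AWS Glue.
  • Iceberg 1.8.1: works well with Spark 3.5.x.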

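SparkSession configuration

  • spark.jars.packages: pulls in the Iceberg Spark runtime JAR; Spark downloads it and its dependencies automatically
  • spark.sql.extensions: registers the Iceberg SQL extensions, which enable Iceberg-specific syntax
  • spark.sql.catalog.{CATALOG}: sets the Iceberg catalog implementation class
  • spark.sql.catalog.{CATALOG}.type: sets the catalog type to hadoop (file-system backed)
  • spark.sql.catalog.{CATALOG}.warehouse: sets the local warehouse path, /tmp/iceberg-warehouse
  • spark.sql.session.timeZone: pins the session time zone to UTC to avoid ambiguity in timestamp handling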
import os
import shutil

from pyspark.sql import SparkSession

# Values inferred from the SQL and paths used throughout this article.
CATALOG = "local"
DB = f"{CATALOG}.iceberg_db"
WAREHOUSE = "/tmp/iceberg-warehouse"

def build_spark():
    # Start from an empty warehouse so that every run is reproducible.
    if os.path.exists(WAREHOUSE):
        shutil.rmtree(WAREHOUSE)
    return SparkSession.builder \
        .appName("IcebergWorkshop") \
        .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1") \
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
        .config(f"spark.sql.catalog.{CATALOG}", "org.apache.iceberg.spark.SparkCatalog") \
        .config(f"spark.sql.catalog.{CATALOG}.type", "hadoop") \
        .config(f"spark.sql.catalog.{CATALOG}.warehouse", WAREHOUSE) \
        .config("spark.sql.session.timeZone", "UTC") \
        .getOrCreate()
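
The console output later in this article is prefixed with `sql>` and with lines such as `count(local.iceberg_db.orders WHERE 1=1) = 50`, but the helpers that print them are not shown. The following is a hypothetical reconstruction: the names `show_sql` and `count_rows` are assumptions, and only `spark.sql`, `DataFrame.show`, and `DataFrame.collect` are real PySpark calls.

```python
def show_sql(spark, query: str) -> None:
    """Echo a statement with a `sql>` prefix, then print its untruncated result."""
    print(f"sql> {query}")
    spark.sql(query).show(truncate=False)


def count_rows(spark, table: str, where: str = "1=1") -> int:
    """Count the rows that match `where` and print a log line in the article's style."""
    row = spark.sql(f"SELECT count(*) FROM {table} WHERE {where}").collect()[0]
    print(f"    count({table} WHERE {where}) = {row[0]}")
    return row[0]
```

With a session from `build_spark()`, a call such as `count_rows(spark, "local.iceberg_db.orders", "status = 'cancelled'")` would produce the kind of verification line seen after the DELETE.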

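Creating tables and writing data

Creating an Iceberg table requires a table name, a schema, and optionally a partition spec. Unlike traditional Hive tables, Iceberg uses hidden partitioning: users write the raw timestamp column, Iceberg derives the partition value from the partition spec and places each row in the matching partition, and queries that filter on the timestamp are pruned automatically.

Creating the unpartitioned customers table

  • An unpartitioned table has no partition layout, so all data files sit under a single data directory. This suits small tables or queries that do not filter.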
CREATE TABLE local.iceberg_db.customers (
    customer_id INT,
    name STRING,
    email STRING,
    country STRING,
    registration_date DATE,
    tier STRING
)
USING iceberg
[OK] customers table created (unpartitioned)

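Creating the hidden-partitioned orders table

  • PARTITIONED BY (months(order_time)) partitions the table by month. The month is derived from order_time automatically; no extra "month" column is added to the table schema.
  • Hidden partitioning transforms order_time into a partition field named order_time_month, and data is written to directories such as order_time_month=2024-01/.
  • Queries do not need to know the directory layout; Iceberg selects partitions from the predicate on order_time.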
CREATE TABLE local.iceberg_db.orders (
    order_id INT,
    customer_id INT,
    product_name STRING,
    category STRING,
    quantity INT,
    price DOUBLE,
    order_time TIMESTAMP,
    status STRING
)
USING iceberg
PARTITIONED BY (months(order_time))
[OK] orders table created (hidden partition: months(order_time))
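
As a rough illustration of what the months transform computes, the sketch below derives the partition value for a timestamp. Iceberg stores the value as an integer count of months since 1970-01 and renders it as `YYYY-MM` in directory names; the function names here are hypothetical.

```python
from datetime import datetime


def months_transform(ts: datetime) -> int:
    """Months elapsed since 1970-01, the integer the month transform produces."""
    return (ts.year - 1970) * 12 + (ts.month - 1)


def partition_dir(column: str, ts: datetime) -> str:
    """Render the ordinal the way the data directories in this article are named."""
    ordinal = months_transform(ts)
    year, month = 1970 + ordinal // 12, ordinal % 12 + 1
    return f"{column}_month={year:04d}-{month:02d}"


order_time = datetime(2024, 1, 5, 9, 23)
print(months_transform(order_time))             # 648
print(partition_dir("order_time", order_time))  # order_time_month=2024-01
```

The writer never supplies this value; it is computed from order_time on every write, which is why the table needs no separate month column.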

Inserting 20 rows into customers

sql> SELECT * FROM local.iceberg_db.customers ORDER BY customer_id
+-----------+-------------+--------------------+-------+-----------------+-------+
|customer_id|name         |email               |country|registration_date|tier   |
+-----------+-------------+--------------------+-------+-----------------+-------+
|1          |张伟         |zhangwei@example.com|CN     |2023-01-15       |vip    |
|2          |李娜         |lina@example.com    |CN     |2023-02-20       |premium|
|3          |John Smith   |jsmith@example.com  |US     |2023-03-10       |free   |
|4          |Emily Davis  |edavis@example.com  |US     |2023-04-05       |premium|
|5          |James Wilson |jwilson@example.com |UK     |2023-05-18       |free   |
|6          |田中太郎     |tanaka@example.com  |JP     |2023-06-01       |vip    |
|7          |王芳         |wangfang@example.com|CN     |2023-06-22       |free   |
|8          |Sarah Brown  |sbrown@example.com  |UK     |2023-07-14       |premium|
|9          |Mike Johnson |mjohnson@example.com|US     |2023-08-30       |free   |
|10         |佐藤花子     |sato@example.com    |JP     |2023-09-12       |premium|
|11         |赵磊         |zhaolei@example.com |CN     |2023-10-01       |vip    |
|12         |Alice Lee    |alee@example.com    |US     |2023-11-20       |free   |
|13         |David Clark  |dclark@example.com  |UK     |2024-01-05       |premium|
|14         |陈静         |chenjing@example.com|CN     |2024-01-15       |free   |
|15         |Robert Taylor|rtaylor@example.com |US     |2024-02-28       |vip    |
|16         |鈴木一郎     |suzuki@example.com  |JP     |2024-03-10       |free   |
|17         |黄丽         |huangli@example.com |CN     |2024-03-22       |premium|
|18         |Emma Thomas  |ethomas@example.com |UK     |2024-04-01       |free   |
|19         |林强         |linqiang@example.com|CN     |2024-04-15       |vip    |
|20         |Tom Harris   |tharris@example.com |US     |2024-05-01       |premium|
+-----------+-------------+--------------------+-------+-----------------+-------+

Inserting 50 rows into orders

orders_df = generate_orders(spark)
orders_df.writeTo(f"{DB}.orders").append()
    count(local.iceberg_db.orders WHERE 1=1) = 50
>>> orders sample (first 10)
sql> SELECT * FROM local.iceberg_db.orders ORDER BY order_id LIMIT 10
+--------+-----------+-------------------+-----------+--------+------+-------------------+---------+
|order_id|customer_id|product_name       |category   |quantity|price |order_time         |status   |
+--------+-----------+-------------------+-----------+--------+------+-------------------+---------+
|1001    |1          |MacBook Pro 16     |Electronics|1       |2499.0|2024-01-05 09:23:00|completed|
|1002    |1          |AirPods Pro        |Electronics|2       |249.0 |2024-01-05 09:25:00|completed|
|1003    |2          |Python编程入门     |Books      |1       |59.9  |2024-01-12 14:30:00|completed|
|1004    |3          |数据密集型应用设计 |Books      |1       |79.0  |2024-01-18 11:00:00|completed|
|1005    |4          |Sony WH-1000XM5    |Electronics|1       |349.0 |2024-01-25 16:45:00|cancelled|
|1006    |5          |冬季羽绒服         |Clothing   |1       |199.0 |2024-02-01 10:15:00|completed|
|1007    |6          |Nintendo Switch    |Electronics|1       |299.0 |2024-02-08 08:30:00|completed|
|1008    |7          |Spark快速大数据分析|Books      |1       |69.0  |2024-02-14 13:20:00|pending  |
|1009    |8          |Yoga Mat Premium   |Home       |2       |45.5  |2024-02-20 15:00:00|completed|
|1010    |9          |机械键盘 Cherry轴  |Electronics|1       |129.0 |2024-02-28 09:45:00|cancelled|
+--------+-----------+-------------------+-----------+--------+------+-------------------+---------+

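The orders table is partitioned by months(order_time), and the data is spread across six partition directories for January through June 2024. The Parquet files live under directories such as /tmp/iceberg-warehouse/iceberg_db/orders/data/order_time_month=2024-01/.

Queries and partition pruning

Iceberg prunes partitions during query planning. It reads the partition summaries and per-file partition values recorded in the manifest list and manifests, and skips data files that cannot match the filter. Hive discovers partitions through the directory layout; Iceberg never lists partition directories, which makes its metadata-driven planning more efficient.

Verifying partition pruning

  • The predicate order_time >= '2024-06-01' triggers partition pruning: Iceberg reads only the data files in the order_time_month=2024-06 partition and skips the January through May partitions. A Hive table depends on the physical directory structure (order_time_month=2024-06/) and on the query filtering the partition column itself; Iceberg derives the partition filter from the predicate on order_time.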
SELECT order_id, product_name, order_time
FROM local.iceberg_db.orders
WHERE order_time >= TIMESTAMP '2024-06-01 00:00:00'
ORDER BY order_time
>>> June 2024 orders (partition pruning)
sql>
        SELECT order_id, product_name, order_time
        FROM local.iceberg_db.orders
        WHERE ORDER_TIME >= TIMESTAMP '2024-06-01 00:00:00'
        ORDER BY order_time
+--------+----------------------+-------------------+
|order_id|product_name          |order_time         |
+--------+----------------------+-------------------+
|1038    |USB-C Hub 7合1        |2024-06-01 08:15:00|
|1039    |T恤 夏季纯棉          |2024-06-03 10:00:00|
|1040    |Effective Java        |2024-06-05 15:30:00|
|1041    |运动短裤              |2024-06-08 09:00:00|
|1042    |Air Conditioner       |2024-06-10 11:00:00|
|1043    |Water Bottle Insulated|2024-06-12 13:00:00|
|1044    |算法导论              |2024-06-15 14:30:00|
|1045    |Wireless Mouse        |2024-06-17 10:00:00|
|1046    |Bluetooth Headset     |2024-06-19 16:00:00|
|1047    |Phone Case Premium    |2024-06-21 08:30:00|
|1048    |Scala编程             |2024-06-23 11:15:00|
|1049    |Laptop Stand          |2024-06-25 13:45:00|
|1050    |Pillow Memory Foam    |2024-06-28 15:00:00|
+--------+----------------------+-------------------+
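
The pruning step can be sketched as a filter over manifest entries. This is a simplified model, not Iceberg's planner: the manifest contents and the `plan_files` name are hypothetical, with one data file per month of 2024 and the partition value stored as months since 1970-01.

```python
from datetime import datetime

# Hypothetical manifest entries: one data file per month partition, Jan-Jun 2024.
MANIFEST = [
    {"path": f"order_time_month=2024-{m:02d}/data.parquet", "month": 648 + (m - 1)}
    for m in range(1, 7)
]


def month_ordinal(ts: datetime) -> int:
    """Months since 1970-01, matching the stored partition value."""
    return (ts.year - 1970) * 12 + (ts.month - 1)


def plan_files(manifest, lower_bound: datetime):
    """Keep only files whose partition can hold rows with order_time >= lower_bound."""
    min_month = month_ordinal(lower_bound)
    return [entry["path"] for entry in manifest if entry["month"] >= min_month]


print(plan_files(MANIFEST, datetime(2024, 6, 1)))
# ['order_time_month=2024-06/data.parquet']
```

Only metadata is consulted here; no directory is listed and no data file is opened until the surviving files are scanned.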

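UPDATE and DELETE

Iceberg supports row-level UPDATE and DELETE, which a plain directory of Parquet or ORC files cannot offer. It implements them with either a Copy-on-Write strategy (rewrite the affected data files) or a Merge-on-Read strategy (write delete files and apply them at read time); Copy-on-Write is the default. Each UPDATE or DELETE creates a new snapshot, and the old data files are kept for time travel.

Updating a customer name with UPDATE

  • The UPDATE creates a new snapshot while the old data stays in the earlier snapshot, so the pre-update data can still be read with time travel.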
UPDATE local.iceberg_db.customers SET name = '张伟(已更名)' WHERE customer_id = 1
>>> BEFORE update
sql> SELECT customer_id, name FROM local.iceberg_db.customers WHERE customer_id = 1
+-----------+----+
|customer_id|name|
+-----------+----+
|1          |张伟|
+-----------+----+
>>> AFTER update
sql> SELECT customer_id, name FROM local.iceberg_db.customers WHERE customer_id = 1
+-----------+------------+
|customer_id|name        |
+-----------+------------+
|1          |张伟(已更名)|
+-----------+------------+

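Deleting cancelled orders with DELETE

  • This removes the 5 orders whose status is cancelled. DELETE also creates a new snapshot, so the old data remains traceable.
  • Row-level operations make Iceberg a fit for GDPR-style erasure and frequently updated data. Note that deleted rows stay physically on disk until the snapshots that reference them are expired.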
DELETE FROM local.iceberg_db.orders WHERE status = 'cancelled'
>>> cancelled orders (BEFORE delete)
sql> SELECT order_id, status FROM local.iceberg_db.orders WHERE status = 'cancelled'
+--------+---------+
|order_id|status   |
+--------+---------+
|1005    |cancelled|
|1010    |cancelled|
|1032    |cancelled|
|1045    |cancelled|
|1018    |cancelled|
+--------+---------+
    count(local.iceberg_db.orders WHERE status = 'cancelled') = 0
[OK] All cancelled orders deleted, verified count = 0

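Schema evolution

Schema evolution in Iceberg is a metadata-only operation; no data files are rewritten. Iceberg assigns every column a unique column ID. A schema change only updates the ID-to-column mapping in the metadata, while the column IDs inside existing data files stay the same. This is why Iceberg can evolve a schema safely without breaking historical data.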
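
The column-ID mechanism can be sketched with two dictionaries: one standing in for a row stored in an old data file, keyed by field ID, and one for the current schema. The IDs and the `read_row` helper are illustrative assumptions, not values read from this table's metadata.

```python
# A row as stored in an old data file: values are keyed by field ID, not by name.
data_file_row = {1: 1, 2: "张伟", 3: "zhangwei@example.com", 5: "2023-01-15"}

# Current table schema: field ID -> column name (IDs are illustrative).
schema = {1: "customer_id", 2: "name", 3: "email", 5: "registration_date"}


def read_row(row_by_id, schema):
    """Project a stored row through the schema; IDs missing from the file read as None."""
    return {name: row_by_id.get(field_id) for field_id, name in schema.items()}


schema[2] = "full_name"  # RENAME COLUMN: same ID, new name
schema[7] = "phone"      # ADD COLUMN: new ID that old files do not contain
del schema[5]            # DROP COLUMN: the ID leaves the schema, the bytes stay in the file

print(read_row(data_file_row, schema))
# {'customer_id': 1, 'full_name': '张伟', 'email': 'zhangwei@example.com', 'phone': None}
```

None of the three changes touches `data_file_row`, which mirrors why these operations need no data rewrite.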

Adding columns with ADD COLUMN

ALTER TABLE local.iceberg_db.customers ADD COLUMN phone STRING
ALTER TABLE local.iceberg_db.customers ADD COLUMN loyalty_points INT DEFAULT 0
>>> Schema after ADD COLUMN
sql> DESCRIBE local.iceberg_db.customers
+-----------------+---------+-------+
|col_name         |data_type|comment|
+-----------------+---------+-------+
|customer_id      |int      |NULL   |
|name             |string   |NULL   |
|email            |string   |NULL   |
|country          |string   |NULL   |
|registration_date|date     |NULL   |
|tier             |string   |NULL   |
|phone            |string   |NULL   |
|loyalty_points   |int      |NULL   |
+-----------------+---------+-------+

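Two columns are added: phone and loyalty_points. DEFAULT 0 declares a default value for the new column, intended to fill in the field when a write does not specify it. Existing rows are not backfilled in this run: the validation query at the end of this section returns NULL for loyalty_points.

Renaming a column with RENAME COLUMN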

ALTER TABLE local.iceberg_db.customers RENAME COLUMN name TO full_name
>>> Schema after RENAME name → full_name
sql> DESCRIBE local.iceberg_db.customers
+-----------------+---------+-------+
|col_name         |data_type|comment|
+-----------------+---------+-------+
|customer_id      |int      |NULL   |
|full_name        |string   |NULL   |
|email            |string   |NULL   |
|country          |string   |NULL   |
|registration_date|date     |NULL   |
|tier             |string   |NULL   |
|phone            |string   |NULL   |
|loyalty_points   |int      |NULL   |
+-----------------+---------+-------+

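The column name changes from name to full_name, and the contents of the data files are untouched. Iceberg tracks columns internally by column ID, so a rename only updates the metadata mapping.

Type promotion with ALTER COLUMN TYPE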

ALTER TABLE local.iceberg_db.customers ALTER COLUMN loyalty_points TYPE BIGINT
>>> Schema after INT → BIGINT promotion
sql> DESCRIBE local.iceberg_db.customers
+-----------------+---------+-------+
|col_name         |data_type|comment|
+-----------------+---------+-------+
|customer_id      |int      |NULL   |
|full_name        |string   |NULL   |
|email            |string   |NULL   |
|country          |string   |NULL   |
|registration_date|date     |NULL   |
|tier             |string   |NULL   |
|phone            |string   |NULL   |
|loyalty_points   |bigint   |NULL   |
+-----------------+---------+-------+

loyalty_points is promoted from INT to BIGINT. Iceberg supports safe widening promotions (int→bigint, float→double) and rejects incompatible type changes.

Dropping a column with DROP COLUMN

ALTER TABLE local.iceberg_db.customers DROP COLUMN registration_date
>>> Schema after DROP registration_date
sql> DESCRIBE local.iceberg_db.customers
+--------------+---------+-------+
|col_name      |data_type|comment|
+--------------+---------+-------+
|customer_id   |int      |NULL   |
|full_name     |string   |NULL   |
|email         |string   |NULL   |
|country       |string   |NULL   |
|tier          |string   |NULL   |
|phone         |string   |NULL   |
|loyalty_points|bigint   |NULL   |
+--------------+---------+-------+

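The registration_date column is dropped. DROP COLUMN is a metadata-only operation: the column's values still exist in the historical data files, but queries no longer return them.

Reordering columns with ALTER COLUMN FIRST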

ALTER TABLE local.iceberg_db.customers ALTER COLUMN email FIRST
>>> Schema after MOVE email FIRST
sql> DESCRIBE local.iceberg_db.customers
+--------------+---------+-------+
|col_name      |data_type|comment|
+--------------+---------+-------+
|email         |string   |NULL   |
|customer_id   |int      |NULL   |
|full_name     |string   |NULL   |
|country       |string   |NULL   |
|tier          |string   |NULL   |
|phone         |string   |NULL   |
|loyalty_points|bigint   |NULL   |
+--------------+---------+-------+

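The email column moves to the first position. Reordering does not affect how data is stored; it only changes the column order in the schema and therefore in query results.

Validating data after schema evolution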

SELECT customer_id, full_name, email, phone, loyalty_points
FROM local.iceberg_db.customers
ORDER BY customer_id LIMIT 5
>>> Data still intact after schema evolution
sql> SELECT customer_id, full_name, email, phone, loyalty_points FROM local.iceberg_db.customers ORDER BY customer_id LIMIT 5
+-----------+------------+--------------------+-----+--------------+
|customer_id|full_name   |email               |phone|loyalty_points|
+-----------+------------+--------------------+-----+--------------+
|1          |张伟(已更名)|zhangwei@example.com|NULL |NULL          |
|2          |李娜        |lina@example.com    |NULL |NULL          |
|3          |John Smith  |jsmith@example.com  |NULL |NULL          |
|4          |Emily Davis |edavis@example.com  |NULL |NULL          |
|5          |James Wilson|jwilson@example.com |NULL |NULL          |
+-----------+------------+--------------------+-----+--------------+

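After all of the schema changes above, the original data is fully preserved. The new phone and loyalty_points columns are NULL (never assigned), and the full_name value changed by the earlier UPDATE is retained. This confirms that Iceberg schema evolution stays compatible with existing data.

Exploring the metadata tables

An Iceberg table's metadata layer has four levels: metadata file → manifest list → manifest → data file. Iceberg exposes metadata tables so that users can query this metadata directly without opening the underlying files. In Spark SQL they are addressed by appending the name to the table, as in local.iceberg_db.orders.snapshots; the $snapshots spelling below is shorthand for the same tables.

  • $snapshots: lists every snapshot with its operation summary
  • $history: the snapshot timeline and parent-child relationships
  • $files: lists the data files
  • $manifests: lists the manifest files

$snapshots shows two snapshots: the first is the initial append (50 records), and the second is the overwrite produced by the DELETE (5 rows removed, 45 remaining).

The summary fields of the first snapshot read as follows:

  • spark.app.id: local-1779244843156, the Spark application ID (a local run)
  • added-data-files: 6, data files added by this commit
  • added-records: 50, records written by this commit
  • added-files-size: 17455, total size of the added files in bytes (about 17 KB)
  • changed-partition-count: 6, partitions touched by this commit
  • total-records: 50, records in the table after the first write
  • total-files-size: 17455, total table size in bytes (about 17 KB)
  • total-data-files: 6, data files in the table
  • total-delete-files: 0, delete files in the table
  • total-position-deletes: 0, rows removed by position deletes
  • total-equality-deletes: 0, rows removed by equality deletes
  • engine-name: spark, the compute engine
  • engine-version: 3.5.6, the Spark version
  • iceberg-version: Apache Iceberg 1.8.1, the Iceberg version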
SELECT snapshot_id, committed_at, operation, summary
FROM local.iceberg_db.orders.snapshots
ORDER BY committed_at
>>> snapshots
|snapshot_id        |committed_at           |operation|summary                                                                   
|2262767112950789139|2026-05-20 02:40:54.961|append|{spark.app.id -> local-1779244843156, added-data-files -> 6, added-records -> 50, added-files-size -> 17455, changed-partition-count -> 6, total-records -> 50, total-files-size -> 17455, total-data-files -> 6, total-delete-files -> 0, total-position-deletes -> 0, total-equality-deletes -> 0, engine-version -> 3.5.6, app-id -> local-1779244843156, engine-name -> spark, iceberg-version -> Apache Iceberg 1.8.1 (commit 9ce0fcf0af7becf25ad9fc996c3bad2afdcfd33d)}|
|5067017495298132779|2026-05-20 02:40:58.261|overwrite|{spark.app.id -> local-1779244843156, added-data-files -> 5, deleted-data-files -> 5, added-records -> 38, deleted-records -> 43, added-files-size -> 14369, removed-files-size -> 14642, changed-partition-count -> 5, total-records -> 45, total-files-size -> 17182, total-data-files -> 6, total-delete-files -> 0, total-position-deletes -> 0, total-equality-deletes -> 0, engine-version -> 3.5.6, app-id -> local-1779244843156, engine-name -> spark, iceberg-version -> Apache Iceberg 1.8.1 (commit 9ce0fcf0af7becf25ad9fc996c3bad2afdcfd33d)}|
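
The overwrite summary above (5 files deleted, 5 added, 43 records deleted, 38 added) is what a copy-on-write DELETE looks like: every data file that contains a deleted row is rewritten without it, and untouched files are reused. The sketch below reproduces those numbers. The per-month row counts are not printed anywhere in the article; they are inferred here from the $files output and the cancelled order IDs, so treat them as an assumption.

```python
# Rows per monthly data file before the DELETE (inferred, see the note above),
# and how many cancelled rows each file contained.
rows_before = {"2024-01": 5, "2024-02": 5, "2024-03": 7,
               "2024-04": 9, "2024-05": 11, "2024-06": 13}
cancelled = {"2024-01": 1, "2024-02": 1, "2024-03": 0,
             "2024-04": 1, "2024-05": 1, "2024-06": 1}


def copy_on_write_delete(rows_before, deleted):
    """Rewrite every file that holds a deleted row; leave the other files alone."""
    touched = [p for p, n in deleted.items() if n > 0]
    return {
        "deleted-data-files": len(touched),
        "added-data-files": len(touched),
        "deleted-records": sum(rows_before[p] for p in touched),
        "added-records": sum(rows_before[p] - deleted[p] for p in touched),
        "total-records": sum(rows_before.values()) - sum(deleted.values()),
    }


print(copy_on_write_delete(rows_before, cancelled))
```

Removing 5 rows therefore costs rewriting 43; a merge-on-read table would instead write small delete files, which would show up as non-zero total-delete-files.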

$history shows the parent-child chain of snapshots; the first snapshot has no parent (it is the root snapshot).

SELECT made_current_at, snapshot_id, parent_id, is_current_ancestor
FROM local.iceberg_db.orders.history
ORDER BY made_current_at
>>> history
sql> SELECT made_current_at, snapshot_id, parent_id, is_current_ancestor FROM local.iceberg_db.orders.history ORDER BY made_current_at
+-----------------------+-------------------+-------------------+-------------------+
|made_current_at        |snapshot_id        |parent_id          |is_current_ancestor|
+-----------------------+-------------------+-------------------+-------------------+
|2026-05-20 02:40:54.961|2262767112950789139|NULL               |true               |
|2026-05-20 02:40:58.261|5067017495298132779|2262767112950789139|true               |
+-----------------------+-------------------+-------------------+-------------------+

$files lists 6 Parquet files, one per month partition.

SELECT content, file_path, file_format, record_count
FROM local.iceberg_db.orders.files
>>> data files
sql> SELECT content, file_path, file_format, record_count FROM local.iceberg_db.orders.files
|content|file_path |file_format|record_count|
|0      |/tmp/iceberg-warehouse/iceberg_db/orders/data/order_time_month=2024-01/00000-30-1f1b8605-71f6-4e42-bff3-59720a357572-0-00001.parquet|PARQUET    |4           |
|0      |/tmp/iceberg-warehouse/iceberg_db/orders/data/order_time_month=2024-02/00000-30-1f1b8605-71f6-4e42-bff3-59720a357572-0-00002.parquet|PARQUET    |4           |
|0      |/tmp/iceberg-warehouse/iceberg_db/orders/data/order_time_month=2024-05/00000-30-1f1b8605-71f6-4e42-bff3-59720a357572-0-00004.parquet|PARQUET    |10          |
|0      |/tmp/iceberg-warehouse/iceberg_db/orders/data/order_time_month=2024-06/00000-30-1f1b8605-71f6-4e42-bff3-59720a357572-0-00003.parquet|PARQUET    |12          |
|0      |/tmp/iceberg-warehouse/iceberg_db/orders/data/order_time_month=2024-04/00000-30-1f1b8605-71f6-4e42-bff3-59720a357572-0-00003.parquet|PARQUET    |8           |
|0      |/tmp/iceberg-warehouse/iceberg_db/orders/data/order_time_month=2024-03/00000-30-1f1b8605-71f6-4e42-bff3-59720a357572-0-00001.parquet|PARQUET    |7           |

$manifests lists 2 manifest files, which hold the metadata about the data files.

SELECT path, length, partition_spec_id
FROM local.iceberg_db.orders.manifests
>>> manifests
sql> SELECT path, length, partition_spec_id FROM local.iceberg_db.orders.manifests
|path |length|partition_spec_id|
|/tmp/iceberg-warehouse/iceberg_db/orders/metadata/46ac57f8-4ee9-4070-9a0e-7b7c56e33ee6-m1.avro|8278|0|
|/tmp/iceberg-warehouse/iceberg_db/orders/metadata/46ac57f8-4ee9-4070-9a0e-7b7c56e33ee6-m0.avro|8420|0|

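Time travel and snapshot rollback

Iceberg's MVCC snapshot model supports time-travel queries. Every snapshot has a unique snapshot_id and a commit timestamp, so any retained historical state can be read back. VERSION AS OF selects a snapshot by ID, and TIMESTAMP AS OF selects one by timestamp. A rollback is an O(1) metadata change that repoints the table's current snapshot; no data is copied.

Querying by snapshot ID with VERSION AS OF

  • The first snapshot holds 50 orders (the data before the DELETE). A snapshot ID gives exact access to a historical state.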
SELECT count(*) as cnt
FROM local.iceberg_db.orders VERSION AS OF 2262767112950789139
>>> Count at first snapshot (before DELETE)
sql> SELECT count(*) as cnt FROM local.iceberg_db.orders VERSION AS OF 2262767112950789139
+---+
|cnt|
+---+
|50 |
+---+

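Querying by timestamp with TIMESTAMP AS OF

  • A timestamp query returns the table as of that moment. Iceberg resolves it to the snapshot that was current at the given time, that is, the latest snapshot committed at or before the timestamp, not the nearest one.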
SELECT count(*) as cnt
FROM local.iceberg_db.orders
FOR TIMESTAMP AS OF '2026-05-20 02:40:54.961000'
>>> Count at timestamp 2026-05-20 02:40:54.961000
sql> SELECT count(*) as cnt FROM local.iceberg_db.orders FOR TIMESTAMP AS OF '2026-05-20 02:40:54.961000'
+---+
|cnt|
+---+
|50 |
+---+
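
How a timestamp maps to a snapshot can be sketched with the two entries from the history output in this article. The `snapshot_as_of` function is a hypothetical stand-in for the lookup: it picks the latest snapshot committed at or before the requested time.

```python
from datetime import datetime

# (committed_at, snapshot_id) pairs taken from the history output in this article.
HISTORY = [
    (datetime(2026, 5, 20, 2, 40, 54, 961000), 2262767112950789139),
    (datetime(2026, 5, 20, 2, 40, 58, 261000), 5067017495298132779),
]


def snapshot_as_of(history, ts: datetime) -> int:
    """Return the snapshot that was current at `ts`."""
    candidates = [sid for committed_at, sid in sorted(history) if committed_at <= ts]
    if not candidates:
        raise ValueError(f"no snapshot exists at or before {ts}")
    return candidates[-1]


# A time between the two commits still resolves to the first snapshot.
print(snapshot_as_of(HISTORY, datetime(2026, 5, 20, 2, 40, 56)))  # 2262767112950789139
```

A timestamp earlier than the first commit has no snapshot to resolve to, so the sketch raises an error rather than guessing.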

Rolling back with rollback_to_snapshot

CALL local.system.rollback_to_snapshot('iceberg_db.orders', 2262767112950789139)
    Rolling back to snapshot: 2262767112950789139
>>> Count after rollback
sql> SELECT count(*) as cnt FROM local.iceberg_db.orders
+---+
|cnt|
+---+
|50 |
+---+

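After rolling back to the first snapshot, the table is back in its pre-DELETE state. Rollback is a metadata operation, so it is very fast and copies no data files.

Restoring deleted rows from an old snapshot

  • This reads the deleted cancelled orders from the historical snapshot and re-inserts them into the current table. The count comes out as 10 rather than 5: the rollback above had already brought the 5 cancelled orders back, so the INSERT adds a second copy of each. In practice, use either the rollback or the re-insert, not both.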
INSERT INTO local.iceberg_db.orders
SELECT * FROM local.iceberg_db.orders VERSION AS OF 2262767112950789139
WHERE status = 'cancelled'
>>> Cancelled orders restored
sql> SELECT count(*) as cnt FROM local.iceberg_db.orders WHERE status = 'cancelled'
+---+
|cnt|
+---+
|10 |
+---+

Verifying the table state after rollback and restore

SELECT status, count(*) as cnt
FROM local.iceberg_db.orders
GROUP BY status
ORDER BY status
    count(local.iceberg_db.orders WHERE 1=1) = 55
>>> Order status distribution
sql> SELECT status, count(*) as cnt FROM local.iceberg_db.orders GROUP BY status ORDER BY status
+---------+---+
|status   |cnt|
+---------+---+
|cancelled|10 |
|completed|41 |
|pending  |4  |
+---------+---+

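The table now has 55 records, including 10 cancelled rows (the 5 restored by the rollback plus the 5 re-inserted copies). Time travel and rollback make data recovery simple and reliable, provided the two are not applied on top of each other.

Partition evolution

Partition evolution changes the partitioning of an existing table without rewriting historical data. As data volume grows or query patterns change, a table may need a different granularity, in this example moving from monthly to yearly partitions. Iceberg records every partition spec in table metadata and tags each data file with the spec it was written under, so old data keeps its original spec, new data uses the new one, and queries plan against each spec automatically.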

ALTER TABLE REPLACE PARTITION FIELD

ALTER TABLE local.iceberg_db.orders
REPLACE PARTITION FIELD months(order_time) WITH years(order_time)
>>> Partition spec after evolution (look for Partition Spec)
sql> DESCRIBE EXTENDED local.iceberg_db.orders
+----------------------------+------------------------------------------------+-------+
|col_name                    |data_type                                       |comment|
+----------------------------+------------------------------------------------+-------+
|order_id                    |int                                             |NULL   |
|customer_id                 |int                                             |NULL   |
|product_name                |string                                          |NULL   |
|category                    |string                                          |NULL   |
|quantity                    |int                                             |NULL   |
|price                       |double                                          |NULL   |
|order_time                  |timestamp                                       |NULL   |
|status                      |string                                          |NULL   |
|                            |                                                |       |
|# Partitioning              |                                                |       |
|Part 0                      |years(order_time)                               |       |
|                            |                                                |       |
|# Metadata Columns          |                                                |       |
|_spec_id                    |int                                             |NULL   |
|_partition                  |struct<order_time_month:int,order_time_year:int>|NULL   |
|_file                       |string                                          |NULL   |
|_pos                        |bigint                                          |NULL   |
|_deleted                    |boolean                                         |NULL   |
|                            |                                                |       |
|# Detailed Table Information|                                                |       |
+----------------------------+------------------------------------------------+-------+
only showing top 20 rows

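The partition spec changes from months(order_time) to years(order_time). Note that the _partition metadata column contains both order_time_month and order_time_year, which shows that Iceberg keeps the historical partition field so that old data can still be queried transparently. Changing the partitioning of a Hive table usually requires migrating the data; Iceberg's partition evolution avoids that.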
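
Planning across two partition specs can be sketched as follows: each file is tagged with the ID of the spec it was written under, and the same predicate on order_time is converted once per spec. The file list, paths, and the `plan_files` name are hypothetical; only the two transforms come from the article.

```python
from datetime import datetime

EPOCH_YEAR = 1970

# Each spec maps a timestamp to its partition value.
SPECS = {
    0: lambda ts: (ts.year - EPOCH_YEAR) * 12 + ts.month - 1,  # months(order_time)
    1: lambda ts: ts.year - EPOCH_YEAR,                        # years(order_time)
}

# Hypothetical files: two written under the old spec, one under the new spec.
FILES = [
    {"path": "order_time_month=2024-05/a.parquet", "spec_id": 0, "value": 652},
    {"path": "order_time_month=2024-06/b.parquet", "spec_id": 0, "value": 653},
    {"path": "order_time_year=2024/c.parquet", "spec_id": 1, "value": 54},
]


def plan_files(files, lower_bound: datetime):
    """Evaluate `order_time >= lower_bound` against each file using that file's own spec."""
    return [
        f["path"]
        for f in files
        if f["value"] >= SPECS[f["spec_id"]](lower_bound)
    ]


print(plan_files(FILES, datetime(2024, 6, 1)))
# ['order_time_month=2024-06/b.parquet', 'order_time_year=2024/c.parquet']
```

The May file is pruned under the monthly spec, while the yearly file must be kept because 2024 may still contain June rows: coarser partitions prune less.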

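MERGE INTO and CDC

MERGE INTO is Iceberg's atomic upsert, similar to PostgreSQL's "INSERT … ON CONFLICT DO UPDATE". It is well suited to CDC (Change Data Capture): a stream of changes arrives periodically from an upstream system, and MERGE INTO applies it to the Iceberg table as incremental updates and inserts.

Syncing CDC data with MERGE INTO

First, create a staging table to act as the CDC source: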

CREATE TABLE local.iceberg_db.orders_staging (
    order_id INT, customer_id INT, product_name STRING,
    category STRING, quantity INT, price DOUBLE,
    order_time TIMESTAMP,
    status STRING
) USING iceberg

Insert the CDC rows:

cdc_rows = [
    (1001, 1, "MacBook Pro 16 (M4)", "Electronics", 1, 2799.00, dt(2024,1,5,9,23), "completed"),
    (1051, 2, "Pixel Watch 3", "Electronics", 1, 349.00, dt(2024,6,30,10,0), "pending"),
    (1052, 5, "Standing Desk", "Home", 1, 499.00, dt(2024,6,30,11,0), "completed"),
]
cdc_df = spark.createDataFrame([Row(*r) for r in cdc_rows], schema=...)
cdc_df.writeTo(f"{DB}.orders_staging").append()

Inspect the data before the merge:

>>> BEFORE MERGE: target rows
sql> SELECT order_id, product_name, price, status FROM local.iceberg_db.orders WHERE order_id IN (1001, 1051, 1052) ORDER BY order_id
+--------+--------------+------+---------+
|order_id|product_name  |price |status   |
+--------+--------------+------+---------+
|1001    |MacBook Pro 16|2499.0|completed|
+--------+--------------+------+---------+
>>> BEFORE MERGE: source (CDC) rows
sql> SELECT order_id, product_name, price, status FROM local.iceberg_db.orders_staging ORDER BY order_id
+--------+-------------------+------+---------+
|order_id|product_name       |price |status   |
+--------+-------------------+------+---------+
|1001    |MacBook Pro 16 (M4)|2799.0|completed|
|1051    |Pixel Watch 3      |349.0 |pending  |
|1052    |Standing Desk      |499.0 |completed|
+--------+-------------------+------+---------+

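Run MERGE INTO:

MERGE INTO local.iceberg_db.orders t
USING local.iceberg_db.orders_staging s
ON t.order_id = s.order_id
WHEN MATCHED THEN UPDATE SET *      -- overwrite the target row with every source column
WHEN NOT MATCHED THEN INSERT *      -- insert source rows that have no match

Inspect the result after the merge:

  • order_id=1001 already exists, so it is updated (price goes from 2499.0 to 2799.0)
  • order_id=1051 and 1052 do not exist, so they are inserted
  • MERGE INTO is atomic, which keeps the data consistent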
>>> AFTER MERGE: updated + inserted rows
sql> SELECT order_id, product_name, price, status FROM local.iceberg_db.orders WHERE order_id IN (1001, 1051, 1052) ORDER BY order_id
+--------+-------------------+------+---------+
|order_id|product_name       |price |status   |
+--------+-------------------+------+---------+
|1001    |MacBook Pro 16 (M4)|2799.0|completed|
|1051    |Pixel Watch 3      |349.0 |pending  |
|1052    |Standing Desk      |499.0 |completed|
+--------+-------------------+------+---------+

This is the typical CDC sync pattern: the staging table holds the changes received from upstream, and MERGE INTO applies them to the target table.
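
The upsert semantics of the two WHEN clauses can be modelled in a few lines of plain Python, using the rows from this example. `merge_into` is a hypothetical helper that only imitates the result of the statement, not how Spark executes it.

```python
def merge_into(target, source, key="order_id"):
    """Imitate WHEN MATCHED THEN UPDATE SET * / WHEN NOT MATCHED THEN INSERT *."""
    merged = {row[key]: row for row in target}
    for row in source:
        merged[row[key]] = row  # replaces a matched row, adds an unmatched one
    return sorted(merged.values(), key=lambda row: row[key])


target = [{"order_id": 1001, "product_name": "MacBook Pro 16", "price": 2499.0}]
source = [
    {"order_id": 1001, "product_name": "MacBook Pro 16 (M4)", "price": 2799.0},
    {"order_id": 1051, "product_name": "Pixel Watch 3", "price": 349.0},
    {"order_id": 1052, "product_name": "Standing Desk", "price": 499.0},
]

for row in merge_into(target, source):
    print(row["order_id"], row["product_name"], row["price"])
```

The real statement is stricter in one respect: when several source rows match the same target row, Spark typically fails the MERGE rather than letting the last row win, so CDC batches usually need to be deduplicated by key first.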

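Table maintenance

A long-running Iceberg table accumulates three kinds of maintenance problems:

  1. Small files: frequent small writes produce many small files, which hurts query performance
  2. Snapshot bloat: every write creates a new snapshot, and historical snapshots take up space
  3. Orphan files: unreferenced files left behind when compaction or another operation fails

Iceberg provides system stored procedures for maintenance:

  • rewrite_data_files (compaction)
  • expire_snapshots (snapshot expiration)
  • remove_orphan_files (orphan file cleanup)

Creating fragmented files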

for i in range(10):
    spark.sql(f"""
        INSERT INTO {DB}.orders
        SELECT 1060 + {i}, 1, 'Test Product {i}', 'Home', 1, {9.90 + i},
                TIMESTAMP '2024-07-0{(i%9)+1} 10:00:00', 'completed'
    """)
    Data files BEFORE compaction: 21

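The loop inserts 10 records one at a time, which creates 10 new small files. Together with the 11 existing files, that makes 21 data files.

Compacting data files: rewrite_data_files merges the 21 small files into 2 files. The target-file-size-bytes option is set to 134217728 bytes (128 MB); the output files here are far smaller than that because the table holds so little data. On tables with many small files, compaction improves query performance and reduces per-file metadata overhead.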

CALL local.system.rewrite_data_files(
    table => 'iceberg_db.orders',
    options => map('target-file-size-bytes', '134217728')
)
    Data files AFTER compaction: 2
    Files reduced: 21 → 2

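Expiring snapshots with expire_snapshots: retain_last => 3 keeps the 3 most recent snapshots, and the other 12 are expired. older_than is set far in the future so that retain_last alone decides what is kept. When a snapshot expires, data files that are no longer referenced by any remaining snapshot are deleted as part of the procedure.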

CALL local.system.expire_snapshots(
    table => 'iceberg_db.orders',
    older_than => TIMESTAMP '2099-01-01 00:00:00',
    retain_last => 3
)
    Snapshots BEFORE expiration: 15
    Snapshots AFTER expiration (retain_last=3): 3
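
The interaction between older_than and retain_last can be sketched for a simple linear history. This is an approximation under stated assumptions: `snapshots_to_expire` is a hypothetical helper, the 15 snapshots are synthetic, and branches or tags are ignored.

```python
from datetime import datetime, timedelta


def snapshots_to_expire(snapshots, older_than: datetime, retain_last: int):
    """Pick snapshots older than `older_than`, always keeping the newest `retain_last`.

    `snapshots` is a list of (committed_at, snapshot_id) pairs for a linear history.
    """
    ordered = sorted(snapshots)
    protected = {sid for _, sid in ordered[-retain_last:]} if retain_last > 0 else set()
    return [sid for ts, sid in ordered if ts < older_than and sid not in protected]


start = datetime(2026, 5, 20, 2, 40)
snapshots = [(start + timedelta(seconds=i), i) for i in range(15)]  # 15 synthetic snapshots

expired = snapshots_to_expire(snapshots, datetime(2099, 1, 1), retain_last=3)
print(len(expired), "expired,", len(snapshots) - len(expired), "kept")  # 12 expired, 3 kept
```

With a realistic older_than such as seven days ago, the same rule keeps everything newer than the cutoff plus at least the last three snapshots.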

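Cleaning up orphan files with remove_orphan_files: this removes files under the table location that no table metadata references, typically left behind by failed or interrupted jobs. By default the procedure only removes files older than 3 days, so freshly written files are left alone. Run these maintenance tasks on a regular schedule (for example daily or weekly) to keep the table healthy.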

CALL local.system.remove_orphan_files(
    table => 'iceberg_db.orders'
)
[OK] Orphan file cleanup done

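Views and wrap-up

Creating a temporary view

  • The Hadoop Catalog does not support persistent views, so a temporary view is used instead. A view is a query abstraction over the Iceberg table and can simplify complex analysis.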
CREATE TEMP VIEW v_order_summary AS
SELECT
    year(order_time) AS order_year,
    month(order_time) AS order_month,
    category,
    count(*) AS order_count,
    sum(quantity) AS total_quantity,
    round(sum(price * quantity), 2) AS total_revenue
FROM local.iceberg_db.orders
GROUP BY year(order_time), month(order_time), category
ORDER BY order_year, order_month, category
    NOTE: Hadoop Catalog does not support persistent views, using temp view instead
>>> Temp view query result
sql> SELECT * FROM v_order_summary
+----------+-----------+-----------+-----------+--------------+-------------+
|order_year|order_month|category   |order_count|total_quantity|total_revenue|
+----------+-----------+-----------+-----------+--------------+-------------+
|2024      |1          |Books      |2          |2             |138.9        |
|2024      |1          |Electronics|4          |5             |3995.0       |
|2024      |2          |Books      |1          |1             |69.0         |
|2024      |2          |Clothing   |1          |1             |199.0        |
|2024      |2          |Electronics|3          |3             |557.0        |
|2024      |2          |Home       |1          |2             |91.0         |
|2024      |3          |Books      |2          |2             |164.0        |
|2024      |3          |Clothing   |1          |1             |129.0        |
|2024      |3          |Electronics|3          |3             |1547.0       |
|2024      |3          |Home       |1          |1             |89.99        |
|2024      |4          |Books      |4          |4             |326.0        |
|2024      |4          |Clothing   |1          |2             |316.0        |
|2024      |4          |Electronics|4          |4             |1056.0       |
|2024      |4          |Home       |1          |1             |35.0         |
|2024      |5          |Books      |4          |4             |258.0        |
|2024      |5          |Clothing   |1          |3             |179.7        |
|2024      |5          |Electronics|4          |4             |1546.0       |
|2024      |5          |Home       |3          |4             |538.0        |
|2024      |6          |Books      |3          |3             |269.0        |
|2024      |6          |Clothing   |2          |3             |128.8        |
+----------+-----------+-----------+-----------+--------------+-------------+
only showing top 20 rows
