通过以下方式提交星火计划申请 IBM cpdctl
适用于:火花机
您可以借助 IBM Cloud Pak for Data 命令行界面 ( IBM cpdctl
) 提交 Spark 应用程序,执行冰山表维护操作。 IBM cpdctl
中提供的 sparkjob
工具允许您提交、列出和获取 Spark 应用程序的详细信息。
先决条件
- 提供 IBM® watsonx.data 实例并添加本地 Spark 引擎。
- 下载并安装 IBM cpdctl。 有关信息,请参阅 安装 IBM cpdctl。
- 在 IBM cpdctl 中配置 watsonx.data 环境。 有关信息,请参阅 配置 IBM cpdctl。
提交星火计划申请
您可以使用 IBM cpdctl
提交涉及以下表格维护活动的 Spark 应用程序。
-
快照管理
- rollback_to_snapshot-将表回滚到特定快照标识。
- rollback_to_timestamp-在特定日期和时间将表回滚到快照。
- set_current_snapshot-设置表的当前快照标识。
- cherrypick_snapshot-Cherry-选取从快照到当前表状态的更改。
-
元数据管理
- expire_snapshot-除去不再需要的旧快照及其文件。
- remove_孤 _files-用于除去 Iceberg 表的任何元数据文件中未引用的文件。
- rewrite_data_files-将小文件组合到更大的文件中,以降低元数据开销和运行时文件打开成本。
- rewrite_manifests-重写表的清单以优化扫描规划。
-
表迁移
- register_table-为存在但没有相应目录标识的 metadata.json 文件创建目录条目。
过程
- 将示例 Python 文件 保存到 Cloud Object Storage 位置。 您必须在提交申请时保存以下存储详情。
如果文件保存在电脑中,您还可以提供文件的路径。 如果是,请在 local-path
字段中指定本地路径。
Python 文件包含不同表格维护操作的命令。 您可以根据自己的使用情况取消对所需部分的注释。 对于涉及目录和模式的用例,请在 Python 文件中自定义目录名、模式名和表名。
-
: 保存 Spark 应用程序的存储路径。 - <Bucket_Name>:Spark 应用程序所在的 Cloud Object Storage 存储的名称。 该存储必须在实例中可用。
- <Spark_File_Name> : Python 文件的名称。
- <BUCKET_ENDPOINT> : 包含 Spark 文件的 Cloud Object Storage 存储的公共端点。
- <BUCKET_ACCESS_KEY>:Cloud Object Storage 存储器的访问键。
- <BUCKET_SECRET_KEY> : Cloud Object Storage 存储器的密钥。
- <SPARK_APP_NAME> : Spark 应用程序的名称。
- <API_KEY> : 生成 <SaaS_API_Key> (请参阅生成 API 密钥 )。
-
使用 IBM
cpdctl
中的 sparkjob 资源中的Create
命令提交 Spark 应用程序。 请参阅如何[使用 wx-data 命令 --help (-h) 部分,了解如何运行]./cpdctl wx-data sparkjob create
命令。 -
您可以使用 sparkjob 资源中的 list 命令列出针对 Spark 引擎提交的 Spark 应用程序,也可以使用 sparkjob 资源中的 get 命令获取 Spark 应用程序的状态。
Spark python 示例文件
# SAAS INSTANCE PYTHON TEMPLATE FOR TABLE MAINTENANCE OPERATIONS
from pyspark.sql import SparkSession
def init_spark():
spark = SparkSession.builder \
.appName("Table Maintenance") \
.config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
.enableHiveSupport() \
.getOrCreate()
return spark
def main():
try:
spark = init_spark()
# For all commands related to Iceberg Table Maintenance and their details, visit the link given below:
# https://iceberg.apache.org/docs/1.8.0/spark-procedures/
# SNAPSHOT MANAGEMENT --------------------------------------------------------------------------------------------------------------
# Command to get details of all Snapshots in a table
# You can run the below command to get the details regarding all the snapshots available in a selected table
# This command can be run using a Presto Engine to get the list of Snapshots of a Table
# Command Format
# SELECT committed_at, snapshot_id, parent_id, operation FROM {catalog_name}.{schema_name}."{table_name}$snapshots" ORDER BY committed_at;
# Command Example
# SELECT committed_at, snapshot_id, parent_id, operation FROM iceberg_data.iceberg_schema."iceberg_table$snapshots" ORDER BY committed_at;
# Rollback to Snapshot
# Command Format
# spark.sql("CALL {catalog_name}.system.rollback_to_snapshot('{schema_name}.{table_name}', Snapshot_ID)").show()
# Command Example
# spark.sql("CALL iceberg_data.system.rollback_to_snapshot('iceberg_schema.iceberg_table', 6825707396795621602)").show()
# Rollback to TimeStamp
# Command Format
# spark.sql("CALL {catalog_name}.system.rollback_to_timestamp('{schema_name}.{table_name}', TIMESTAMP '{Timestamp_of_Snapshot}')").show()
# Command Example
# spark.sql("CALL iceberg_data.system.rollback_to_timestamp('iceberg_schema.iceberg_table', TIMESTAMP '2025-02-28T11:49:51.892Z')").show()
# Set Current Snapshot
# Command Format
# spark.sql("CALL {catalog_name}.system.set_current_snapshot('{schema_name}.{table_name}', {Snapshot_ID})").show()
# Command Example
# spark.sql("CALL iceberg_data.system.set_current_snapshot('iceberg_schema.iceberg_table', 8505515598581933984)").show()
# Cherry Pick Snapshot
# Command Format
# spark.sql("CALL {catalog_name}.system.cherrypick_snapshot('{schema_name}.{table_name}', {Snapshot_ID})").show()
# Command Example
# spark.sql("CALL iceberg_data.system.cherrypick_snapshot('iceberg_schema.iceberg_table', 7141967805447891098)").show()
# METADATA MANAGEMENT --------------------------------------------------------------------------------------------------------------
# Expire Snapshot
# Command Format
# spark.sql("CALL {catalog_name}.system.expire_snapshots(table => '{schema_name}.{table_name}', snapshot_ids => ARRAY( {ID1}, {ID2}, ... ))").show()
# Command Example
# spark.sql("CALL iceberg_data.system.expire_snapshots(table => 'iceberg_schema.iceberg_table', snapshot_ids => ARRAY(2463746222678678017))").show()
# Remove Orphan Files ( Only lists the Orphan Files as it is a dry run )
# Command Format
# spark.sql("CALL {catalog_name}.system.remove_orphan_files(table => '{schema_name}.{table_name}', dry_run => true)").show()
# Command Example
# spark.sql("CALL iceberg_data.system.remove_orphan_files(table => 'iceberg_schema.iceberg_table', dry_run => true)").show()
# Remove Orphan Files ( in the mentioned folder )
# Command Format
# spark.sql("CALL {catalog_name}.system.remove_orphan_files(table => '{schema_name}.{table_name}', location => '{tablelocation}/data')").show()
# Command Example
# spark.sql("CALL iceberg_data.system.remove_orphan_files(table => 'iceberg_schema.iceberg_table', location => 's3a://iceberg_bucket/iceberg_schema/iceberg_table/data')").show()
# Rewrite Data Files ( Default Config )
# Command Format
# spark.sql("CALL {catalog_name}.system.rewrite_data_files('{schema_name}.{table_name}')").show()
# Command Example
# spark.sql("CALL iceberg_data.system.rewrite_data_files('iceberg_schema.iceberg_table')").show()
# Rewrite Data Files ( Sorting by id and name )
# Command Format
# spark.sql("CALL {catalog_name}.system.rewrite_data_files(table => '{schema_name}.{table_name}', strategy => '{strategy_type}', sort_order => '{sort order for id and name}')").show()
# Command Example
# spark.sql("CALL iceberg_data.system.rewrite_data_files(table => 'iceberg_schema.iceberg_table', strategy => 'sort', sort_order => 'id DESC NULLS LAST,name ASC NULLS FIRST')").show()
# Rewrite Manifests
# Command Format
# spark.sql("CALL {catalog_name}.system.rewrite_manifests('{schema_name}.{table_name}')").show()
# Command Example
# spark.sql("CALL iceberg_data.system.rewrite_manifests('iceberg_schema.iceberg_table')").show()
# MIGRATION --------------------------------------------------------------------------------------------------------------
# Register Table
# Command Format
# spark.sql("CALL {catalog_name}.system.register_table( table => '{schema_name}.{new_table_name}', metadata_file => '{path/to/metadata/file.json}')").show()
# Command Example
# spark.sql("CALL iceberg_data.system.register_table( table => 'iceberg_schema.iceberg_table_new', metadata_file => 's3a://iceberg_bucket/iceberg_schema/iceberg_table/metadata/00000-ebea9-bb80-4a36-497ed503.metadata.json')").show()
finally:
spark.stop()
if __name__ == '__main__':
main()
您必须手动将上述 Python 文件保存到 Cloud Object Storage 位置。