IBM Cloud Docs
通过以下方式提交星火计划申请 IBM cpdctl

通过以下方式提交星火计划申请 IBM cpdctl

适用于火花机

您可以借助 IBM Cloud Pak for Data 命令行界面 ( IBM cpdctl ) 提交 Spark 应用程序,执行冰山表维护操作。 IBM cpdctl 中提供的 sparkjob 工具允许您提交、列出和获取 Spark 应用程序的详细信息。

先决条件

  • 提供 IBM® watsonx.data 实例并添加本地 Spark 引擎。
  • 下载并安装 IBM cpdctl。 有关信息,请参阅 安装 IBM cpdctl
  • 在 IBM cpdctl 中配置 watsonx.data 环境。 有关信息,请参阅 配置 IBM cpdctl

提交星火计划申请

您可以使用 IBM cpdctl 提交涉及以下表格维护活动的 Spark 应用程序。

  • 快照管理

    • rollback_to_snapshot-将表回滚到特定快照标识。
    • rollback_to_timestamp-在特定日期和时间将表回滚到快照。
    • set_current_snapshot-设置表的当前快照标识。
    • cherrypick_snapshot-Cherry-选取从快照到当前表状态的更改。
  • 元数据管理

    • expire_snapshot-除去不再需要的旧快照及其文件。
    • remove_孤 _files-用于除去 Iceberg 表的任何元数据文件中未引用的文件。
    • rewrite_data_files-将小文件组合到更大的文件中,以降低元数据开销和运行时文件打开成本。
    • rewrite_manifests-重写表的清单以优化扫描规划。
  • 表迁移

    • register_table-为存在但没有相应目录标识的 metadata.json 文件创建目录条目。

过程

  1. 将示例 Python 文件 保存到 Cloud Object Storage 位置。 您必须在提交申请时保存以下存储详情。

如果文件保存在电脑中,您还可以提供文件的路径。 如果是,请在 local-path 字段中指定本地路径。

Python 文件包含不同表格维护操作的命令。 您可以根据自己的使用情况取消对所需部分的注释。 对于涉及目录和模式的用例,请在 Python 文件中自定义目录名、模式名和表名。

  • : 保存 Spark 应用程序的存储路径。
  • <Bucket_Name>:Spark 应用程序所在的 Cloud Object Storage 存储的名称。 该存储必须在实例中可用。
  • <Spark_File_Name> : Python 文件的名称。
  • <BUCKET_ENDPOINT> : 包含 Spark 文件的 Cloud Object Storage 存储的公共端点。
  • <BUCKET_ACCESS_KEY>:Cloud Object Storage 存储器的访问键。
  • <BUCKET_SECRET_KEY> : Cloud Object Storage 存储器的密钥。
  • <SPARK_APP_NAME> : Spark 应用程序的名称。
  • <API_KEY> : 生成 <SaaS_API_Key> (请参阅生成 API 密钥 )。
  1. 使用 IBM cpdctl 中的 sparkjob 资源中的 Create 命令提交 Spark 应用程序。 请参阅如何[使用 wx-data 命令 --help (-h) 部分,了解如何运行] ./cpdctl wx-data sparkjob create 命令。

  2. 您可以使用 sparkjob 资源中的 list 命令列出针对 Spark 引擎提交的 Spark 应用程序,也可以使用 sparkjob 资源中的 get 命令获取 Spark 应用程序的状态。

Spark python 示例文件



# SAAS INSTANCE PYTHON TEMPLATE FOR TABLE MAINTENANCE OPERATIONS


from pyspark.sql import SparkSession

def init_spark():

    spark = SparkSession.builder \
        .appName("Table Maintenance") \
        .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
        .enableHiveSupport() \
        .getOrCreate()

    return spark

def main():

    try:
        spark = init_spark()

        # For all commands related to Iceberg Table Maintenance and their details, visit the link given below:
        # https://iceberg.apache.org/docs/1.8.0/spark-procedures/


        # SNAPSHOT MANAGEMENT --------------------------------------------------------------------------------------------------------------


        # Command to get details of all Snapshots in a table
        # You can run the below command to get the details regarding all the snapshots available in a selected table
        # This command can be run using a Presto Engine to get the list of Snapshots of a Table
        # Command Format
        # SELECT committed_at, snapshot_id, parent_id, operation FROM {catalog_name}.{schema_name}."{table_name}$snapshots" ORDER BY committed_at;
        # Command Example
        # SELECT committed_at, snapshot_id, parent_id, operation FROM iceberg_data.iceberg_schema."iceberg_table$snapshots" ORDER BY committed_at;


        # Rollback to Snapshot
        # Command Format
        # spark.sql("CALL {catalog_name}.system.rollback_to_snapshot('{schema_name}.{table_name}', Snapshot_ID)").show()
        # Command Example
        # spark.sql("CALL iceberg_data.system.rollback_to_snapshot('iceberg_schema.iceberg_table', 6825707396795621602)").show()


        # Rollback to TimeStamp
        # Command Format
        # spark.sql("CALL {catalog_name}.system.rollback_to_timestamp('{schema_name}.{table_name}', TIMESTAMP '{Timestamp_of_Snapshot}')").show()
        # Command Example
        # spark.sql("CALL iceberg_data.system.rollback_to_timestamp('iceberg_schema.iceberg_table', TIMESTAMP '2025-02-28T11:49:51.892Z')").show()


        # Set Current Snapshot
        # Command Format
        # spark.sql("CALL {catalog_name}.system.set_current_snapshot('{schema_name}.{table_name}', {Snapshot_ID})").show()
        # Command Example
        # spark.sql("CALL iceberg_data.system.set_current_snapshot('iceberg_schema.iceberg_table', 8505515598581933984)").show()


        # Cherry Pick Snapshot
        # Command Format
        # spark.sql("CALL {catalog_name}.system.cherrypick_snapshot('{schema_name}.{table_name}', {Snapshot_ID})").show()
        # Command Example
        # spark.sql("CALL iceberg_data.system.cherrypick_snapshot('iceberg_schema.iceberg_table', 7141967805447891098)").show()


        # METADATA MANAGEMENT --------------------------------------------------------------------------------------------------------------


        # Expire Snapshot
        # Command Format
        # spark.sql("CALL {catalog_name}.system.expire_snapshots(table => '{schema_name}.{table_name}', snapshot_ids => ARRAY( {ID1}, {ID2}, ... ))").show()
        # Command Example
        # spark.sql("CALL iceberg_data.system.expire_snapshots(table => 'iceberg_schema.iceberg_table', snapshot_ids => ARRAY(2463746222678678017))").show()


        # Remove Orphan Files ( Only lists the Orphan Files as it is a dry run )
        # Command Format
        # spark.sql("CALL {catalog_name}.system.remove_orphan_files(table => '{schema_name}.{table_name}', dry_run => true)").show()
        # Command Example
        # spark.sql("CALL iceberg_data.system.remove_orphan_files(table => 'iceberg_schema.iceberg_table', dry_run => true)").show()


        # Remove Orphan Files ( in the mentioned folder )
        # Command Format
        # spark.sql("CALL {catalog_name}.system.remove_orphan_files(table => '{schema_name}.{table_name}', location => '{tablelocation}/data')").show()
        # Command Example
        # spark.sql("CALL iceberg_data.system.remove_orphan_files(table => 'iceberg_schema.iceberg_table', location => 's3a://iceberg_bucket/iceberg_schema/iceberg_table/data')").show()


        # Rewrite Data Files ( Default Config )
        # Command Format
        # spark.sql("CALL {catalog_name}.system.rewrite_data_files('{schema_name}.{table_name}')").show()
        # Command Example
        # spark.sql("CALL iceberg_data.system.rewrite_data_files('iceberg_schema.iceberg_table')").show()


        # Rewrite Data Files ( Sorting by id and name )
        # Command Format
        # spark.sql("CALL {catalog_name}.system.rewrite_data_files(table => '{schema_name}.{table_name}', strategy => '{strategy_type}', sort_order => '{sort order for id and name}')").show()
        # Command Example
        # spark.sql("CALL iceberg_data.system.rewrite_data_files(table => 'iceberg_schema.iceberg_table', strategy => 'sort', sort_order => 'id DESC NULLS LAST,name ASC NULLS FIRST')").show()


        # Rewrite Manifests
        # Command Format
        # spark.sql("CALL {catalog_name}.system.rewrite_manifests('{schema_name}.{table_name}')").show()
        # Command Example
        # spark.sql("CALL iceberg_data.system.rewrite_manifests('iceberg_schema.iceberg_table')").show()


        # MIGRATION --------------------------------------------------------------------------------------------------------------


        # Register Table
        # Command Format
        # spark.sql("CALL {catalog_name}.system.register_table( table => '{schema_name}.{new_table_name}', metadata_file => '{path/to/metadata/file.json}')").show()
        # Command Example
        # spark.sql("CALL iceberg_data.system.register_table( table => 'iceberg_schema.iceberg_table_new', metadata_file => 's3a://iceberg_bucket/iceberg_schema/iceberg_table/metadata/00000-ebea9-bb80-4a36-497ed503.metadata.json')").show()

    finally:
        spark.stop()

if __name__ == '__main__':
    main()

您必须手动将上述 Python 文件保存到 Cloud Object Storage 位置。