Submitting a Spark application by using IBM cpdctl
Applies to: Spark engine
You can use the IBM Cloud Pak for Data Command Line Interface (IBM cpdctl) to submit a Spark application in watsonx.data. The sparkjob utility that is available in IBM cpdctl allows you to submit, list, and get the details of a Spark application.
Prerequisites
- Provision an IBM® watsonx.data instance and add a native Spark engine.
- Download and install IBM cpdctl. For information, see Installing IBM cpdctl.
- Configure the watsonx.data environment in IBM cpdctl. For information, see Configure IBM cpdctl.
Procedure
This section details the procedure to submit a Spark application. You can create your own Spark application or use the sample file available in this topic as a reference. The sample file performs Iceberg table maintenance operations through sparkjob.
To automatically perform the table maintenance operations, see Table maintenance operations.
-
Save the sample Python file to a Cloud Object Storage location (see the upload sketch after this procedure). Note the following details of the storage; they are required when you submit the application. You can also provide the path to the file if it is saved on your computer; specify the local path in the local-path field.
The Python file includes commands for the different table maintenance operations. Uncomment the required section based on your use case scenario. For use cases that involve a catalog and schema, customize catalog_name, schema_name, and table_name in the Python file. To customize and use a different query format for table maintenance operations, see Iceberg procedures.
You can use Amazon S3A, ADLS, and GCS storage for Spark job submission.
- <Path>: The path in the storage where the Spark application is saved. You must manually save the Python file to the Cloud Object Storage location before using the <Path> variable.
- <Bucket_Name>: The name of the Cloud Object Storage bucket where the Spark application resides. This storage must be available in the instance and must be associated with the Spark engine.
- <Spark_File_Name>: The name of the Python file.
- <BUCKET_ENDPOINT>: Public endpoint of the Cloud Object Storage bucket that contains the Spark file.
- <BUCKET_ACCESS_KEY>: Access key of the Cloud Object Storage bucket.
- <BUCKET_SECRET_KEY>: Secret key of the Cloud Object Storage bucket.
- <SPARK_APP_NAME>: Name of the Spark application.
- <API_KEY>: Generate the <SaaS_API_Key> (see Generating the API key).
-
Use the create command in the sparkjob resource available in IBM cpdctl to submit the Spark application. See the [How to use wx-data command --help (-h)] section to understand how to run the ./cpdctl wx-data sparkjob create command. An illustrative invocation follows this procedure.
-
You can list the Spark applications that are submitted against a Spark engine by using the list command in the sparkjob resource, and get the status of a Spark application by using the get command in the sparkjob resource. Both commands appear in the sketch after this procedure.
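For step 1, the following is a minimal sketch of uploading the sample file from your computer to Cloud Object Storage. It assumes you use the AWS CLI against the S3-compatible endpoint of the storage; any S3-compatible client works, and table_maintenance.py is an illustrative <Spark_File_Name>.
# Credentials for the storage that is associated with the Spark engine.
export AWS_ACCESS_KEY_ID=<BUCKET_ACCESS_KEY>
export AWS_SECRET_ACCESS_KEY=<BUCKET_SECRET_KEY>
# Copy the sample file to the location that you later reference as <Path>.
aws --endpoint-url https://<BUCKET_ENDPOINT> s3 cp table_maintenance.py s3://<Bucket_Name>/<Path>/table_maintenance.py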
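For steps 2 and 3, the following sketch shows the sparkjob commands. The --help output is the authoritative reference for each command; the option names in the create, list, and get invocations below are assumptions for illustration, not confirmed flags.
# Print the supported options for each sparkjob command.
./cpdctl wx-data sparkjob create --help
./cpdctl wx-data sparkjob list --help
./cpdctl wx-data sparkjob get --help

# Illustrative invocations; the flag names below are assumptions,
# so confirm them against the --help output above.
./cpdctl wx-data sparkjob create --engine-id <spark_engine_id> \
  --name <SPARK_APP_NAME> \
  --application s3a://<Bucket_Name>/<Path>/<Spark_File_Name>
./cpdctl wx-data sparkjob list --engine-id <spark_engine_id>
./cpdctl wx-data sparkjob get --engine-id <spark_engine_id> --application-id <application_id>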
Sample Spark Python file
# SAAS INSTANCE PYTHON TEMPLATE FOR TABLE MAINTENANCE OPERATIONS
from pyspark.sql import SparkSession


def init_spark():
    # Create a Spark session with the Iceberg extensions enabled.
    spark = SparkSession.builder \
        .appName("Table Maintenance") \
        .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions') \
        .enableHiveSupport() \
        .getOrCreate()
    return spark


def main():
    spark = None
    try:
        spark = init_spark()

        # For all commands related to Iceberg Table Maintenance and their details, visit the link given below:
        # https://iceberg.apache.org/docs/1.8.0/spark-procedures/

        # SNAPSHOT MANAGEMENT --------------------------------------------------------------------------------------------------------------

        # Command to get details of all Snapshots in a table
        # You can run the below command to get the details regarding all the snapshots available in a selected table
        # This command can be run using a Presto Engine to get the list of Snapshots of a Table
        # Command Format
        # SELECT committed_at, snapshot_id, parent_id, operation FROM {catalog_name}.{schema_name}."{table_name}$snapshots" ORDER BY committed_at;
        # Command Example
        # SELECT committed_at, snapshot_id, parent_id, operation FROM iceberg_data.iceberg_schema."iceberg_table$snapshots" ORDER BY committed_at;

        # Rollback to Snapshot
        # Command Format
        # spark.sql("CALL {catalog_name}.system.rollback_to_snapshot('{schema_name}.{table_name}', Snapshot_ID)").show()
        # Command Example
        # spark.sql("CALL iceberg_data.system.rollback_to_snapshot('iceberg_schema.iceberg_table', 6825707396795621602)").show()

        # Rollback to TimeStamp
        # Command Format
        # spark.sql("CALL {catalog_name}.system.rollback_to_timestamp('{schema_name}.{table_name}', TIMESTAMP '{Timestamp_of_Snapshot}')").show()
        # Command Example
        # spark.sql("CALL iceberg_data.system.rollback_to_timestamp('iceberg_schema.iceberg_table', TIMESTAMP '2025-02-28T11:49:51.892Z')").show()

        # Set Current Snapshot
        # Command Format
        # spark.sql("CALL {catalog_name}.system.set_current_snapshot('{schema_name}.{table_name}', {Snapshot_ID})").show()
        # Command Example
        # spark.sql("CALL iceberg_data.system.set_current_snapshot('iceberg_schema.iceberg_table', 8505515598581933984)").show()

        # Cherry Pick Snapshot
        # Command Format
        # spark.sql("CALL {catalog_name}.system.cherrypick_snapshot('{schema_name}.{table_name}', {Snapshot_ID})").show()
        # Command Example
        # spark.sql("CALL iceberg_data.system.cherrypick_snapshot('iceberg_schema.iceberg_table', 7141967805447891098)").show()

        # METADATA MANAGEMENT --------------------------------------------------------------------------------------------------------------

        # Expire Snapshot
        # Command Format
        # spark.sql("CALL {catalog_name}.system.expire_snapshots(table => '{schema_name}.{table_name}', snapshot_ids => ARRAY( {ID1}, {ID2}, ... ))").show()
        # Command Example
        # spark.sql("CALL iceberg_data.system.expire_snapshots(table => 'iceberg_schema.iceberg_table', snapshot_ids => ARRAY(2463746222678678017))").show()

        # Remove Orphan Files ( Only lists the Orphan Files as it is a dry run )
        # Command Format
        # spark.sql("CALL {catalog_name}.system.remove_orphan_files(table => '{schema_name}.{table_name}', dry_run => true)").show()
        # Command Example
        # spark.sql("CALL iceberg_data.system.remove_orphan_files(table => 'iceberg_schema.iceberg_table', dry_run => true)").show()

        # Remove Orphan Files ( in the mentioned folder )
        # Command Format
        # spark.sql("CALL {catalog_name}.system.remove_orphan_files(table => '{schema_name}.{table_name}', location => '{tablelocation}/data')").show()
        # Command Example
        # spark.sql("CALL iceberg_data.system.remove_orphan_files(table => 'iceberg_schema.iceberg_table', location => 's3a://iceberg_bucket/iceberg_schema/iceberg_table/data')").show()

        # Rewrite Data Files ( Default Config )
        # Command Format
        # spark.sql("CALL {catalog_name}.system.rewrite_data_files('{schema_name}.{table_name}')").show()
        # Command Example
        # spark.sql("CALL iceberg_data.system.rewrite_data_files('iceberg_schema.iceberg_table')").show()

        # Rewrite Data Files ( Sorting by id and name )
        # Command Format
        # spark.sql("CALL {catalog_name}.system.rewrite_data_files(table => '{schema_name}.{table_name}', strategy => '{strategy_type}', sort_order => '{sort order for id and name}')").show()
        # Command Example
        # spark.sql("CALL iceberg_data.system.rewrite_data_files(table => 'iceberg_schema.iceberg_table', strategy => 'sort', sort_order => 'id DESC NULLS LAST,name ASC NULLS FIRST')").show()

        # Rewrite Manifests
        # Command Format
        # spark.sql("CALL {catalog_name}.system.rewrite_manifests('{schema_name}.{table_name}')").show()
        # Command Example
        # spark.sql("CALL iceberg_data.system.rewrite_manifests('iceberg_schema.iceberg_table')").show()

        # MIGRATION --------------------------------------------------------------------------------------------------------------

        # Register Table
        # Command Format
        # spark.sql("CALL {catalog_name}.system.register_table( table => '{schema_name}.{new_table_name}', metadata_file => '{path/to/metadata/file.json}')").show()
        # Command Example
        # spark.sql("CALL iceberg_data.system.register_table( table => 'iceberg_schema.iceberg_table_new', metadata_file => 's3a://iceberg_bucket/iceberg_schema/iceberg_table/metadata/00000-ebea9-bb80-4a36-497ed503.metadata.json')").show()
    finally:
        # Stop the session only if it was created successfully.
        if spark is not None:
            spark.stop()


if __name__ == '__main__':
    main()