
Submitting Spark jobs for MoR to CoW conversion

Applies to: Spark engine, Gluten accelerated Spark engine

This topic describes how to run a Spark job that synchronizes Iceberg table data from Merge-on-Read (MoR) format to Copy-on-Write (CoW) format. Read operations are more efficient with Iceberg Copy-on-Write tables.

You can use one of the following methods for the conversion:

  • Register CoW Table: This approach creates a named reference for the MoR table by using the Iceberg register_table API. The reference points to a compacted, consistent version of the MoR table and serves as the CoW table. This method is recommended because it is more cost-effective. A conceptual sketch follows this list.
  • Change Data Capture (CDC): This approach maintains two copies of the table: the MoR table, where the updates are made, and a CoW table, which serves as a mirror of the MoR table. The Spark job retrieves the changes in the MoR table between the last synced snapshot and the latest snapshot by using the Iceberg CDC procedure and merges them into the CoW table. This approach is costlier than the Register CoW Table approach because it maintains a replica of the MoR table.
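For context, the register_table mechanism that the Register CoW Table approach relies on is a standard Iceberg Spark procedure. The following PySpark sketch illustrates that mechanism only; it is not the internals of the built-in job, and the catalog, database, table names, and metadata file path are hypothetical placeholders.

from pyspark.sql import SparkSession

# Conceptual sketch only. Assumes a Spark session that is already configured
# with an Iceberg catalog; "my_catalog", "my_db", and all paths are hypothetical.
spark = SparkSession.builder.appName("register-cow-sketch").getOrCreate()

# Optionally compact the MoR table first, so the registered version is compact.
spark.sql("CALL my_catalog.system.rewrite_data_files(table => 'my_db.mor_table')")

# Register the table's current metadata file under a new name, which then
# serves as the CoW table. The latest metadata file path can be looked up in
# the my_db.mor_table.metadata_log_entries metadata table.
spark.sql("""
    CALL my_catalog.system.register_table(
        table => 'my_db.cow_table',
        metadata_file => 's3a://my-bucket/my_db/mor_table/metadata/v42.metadata.json'
    )
""")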

About this task

Register CoW Table approach

If you have a Merge-on-Read (MoR) table, you can specify the necessary parameter values and use the following sample payload to run the conversion job. The job synchronizes the Iceberg operations in the MoR table and generates a compacted CoW table.

{
    "application_details": {
        "application": "/opt/ibm/spark/builtin-apps/iceberg/iceberg-apps.jar",
        "class": "com.ibm.iceberg.apps.RegisterCowTable",
        "arguments": [
            "--catalog","<catalog-name>",
            "--database","<database-name>",
            "--mor-table","<mor-table-name>",
            "--cow-table","<cow-table-name>"
        ],
        "conf": {
            "spark.hadoop.wxd.apikey" : "Basic <encoded-api-key>"
        },
    }
}

Parameter values:

  • <catalog-name>: The Iceberg catalog where the MoR table is available.
  • <database-name>: The database where the MoR table is available.
  • <mor-table-name>: The name of the MoR table.
  • <cow-table-name>: The name of the CoW table that is synchronized with the MoR table.
  • <encoded-api-key>: The value must be in the format echo -n "ibmlhapikey_<user_id>:<user's API key>" | base64. Here, <user_id> is the IBM Cloud ID of the user whose API key is used to access the data bucket, and the API key is the IBM Cloud API key of that user, who must have access to the object storage bucket. To generate an API key, log in to the watsonx.data console, go to Profile > Profile and Settings > API Keys, and generate a new API key.
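If you prefer to generate the encoded value programmatically instead of with the echo and base64 commands, a minimal Python equivalent looks like this (the user ID and API key values are placeholders):

import base64

user_id = "<user_id>"            # IBM Cloud ID of the user
user_api_key = "<user_api_key>"  # that user's IBM Cloud API key

credentials = f"ibmlhapikey_{user_id}:{user_api_key}"
encoded_api_key = base64.b64encode(credentials.encode("utf-8")).decode("utf-8")

# Use this string as the value of spark.hadoop.wxd.apikey in the payload.
print(f"Basic {encoded_api_key}")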

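After you fill in the placeholders, you can submit the payload to the Spark engine's applications REST endpoint. The following Python sketch shows one way to do that. The endpoint URL shape and the AuthInstanceId header are assumptions based on watsonx.data REST API conventions, so verify the exact path and headers against the API reference for your instance. The same pattern applies to the CDC payload in the next section.

import requests

API_KEY = "<your-ibm-cloud-api-key>"          # placeholder
INSTANCE_CRN = "<watsonx-data-instance-crn>"  # placeholder
# Assumed endpoint shape; verify against your instance's API reference.
APPLICATIONS_URL = (
    "https://<region>.lakehouse.cloud.ibm.com"
    "/lakehouse/api/v2/spark_engines/<engine-id>/applications"
)

# Exchange the IBM Cloud API key for an IAM bearer token.
token_resp = requests.post(
    "https://iam.cloud.ibm.com/identity/token",
    data={
        "grant_type": "urn:ibm:params:oauth:grant-type:apikey",
        "apikey": API_KEY,
    },
)
token_resp.raise_for_status()
bearer_token = token_resp.json()["access_token"]

# The sample payload from this topic, with placeholders still to be filled in.
payload = {
    "application_details": {
        "application": "/opt/ibm/spark/builtin-apps/iceberg/iceberg-apps.jar",
        "class": "com.ibm.iceberg.apps.RegisterCowTable",
        "arguments": [
            "--catalog", "<catalog-name>",
            "--database", "<database-name>",
            "--mor-table", "<mor-table-name>",
            "--cow-table", "<cow-table-name>",
        ],
        "conf": {"spark.hadoop.wxd.apikey": "Basic <encoded-api-key>"},
    }
}

resp = requests.post(
    APPLICATIONS_URL,
    headers={
        "Authorization": f"Bearer {bearer_token}",
        "AuthInstanceId": INSTANCE_CRN,  # assumed header name; verify
    },
    json=payload,
)
resp.raise_for_status()
print(resp.json())  # typically includes the application ID and state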
Change Data Capture (CDC) approach

If you have a Merge-on-Read (MoR) table, you can specify the necessary parameter values and use the following sample payload to run the conversion job. The job retrieves the Iceberg operations in the MoR table and merges them into the mirrored CoW table. A conceptual sketch of the underlying mechanism follows the parameter list below.

{
    "application_details": {
        "application": "/opt/ibm/spark/builtin-apps/iceberg/iceberg-apps.jar",
        "class": "com.ibm.iceberg.apps.CDCSync",
        "arguments": [
            "--catalog","<catalog-name>",
            "--database","<database-name>",
            "--mor-table","<mor-table-name>",
            "--cow-table","<cow-table-name>",
            "--primary-key","<primary-key>"
        ],
        "conf": {
            "spark.hadoop.wxd.apikey" : "Basic <encoded-api-key>",
        }
    }
}

Parameter values:

  • <catalog-name>: The Iceberg catalog where the MoR table is available.
  • <database-name>: The database where the MoR table is available.
  • <mor-table-name>: The name of the MoR table.
  • <cow-table-name>: The name of the CoW table that is synchronized with the MoR table.
  • <primary-key>: The primary key that is used for creating the CoW table.
  • <encoded-api-key>: The value must be in the format echo -n "ibmlhapikey_<user_id>:<user's API key>" | base64. Here, <user_id> is the IBM Cloud ID of the user whose API key is used to access the data bucket, and the API key is the IBM Cloud API key of that user, who must have access to the object storage bucket. To generate an API key, log in to the watsonx.data console, go to Profile > Profile and Settings > API Keys, and generate a new API key.
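Conceptually, the CDC approach builds on Iceberg's changelog procedure and a SQL merge. The following PySpark sketch illustrates the underlying mechanism only; it is not the internals of the built-in CDCSync application, and the catalog, table names, snapshot IDs, and columns (a primary key id plus one data column) are hypothetical.

from pyspark.sql import SparkSession

# Conceptual sketch only. Assumes a Spark session configured with an Iceberg
# catalog; "my_catalog", "my_db", snapshot IDs, and columns are hypothetical.
spark = SparkSession.builder.appName("cdc-sync-sketch").getOrCreate()

# 1. Expose the changes between the last synced and the latest snapshot
#    as a temporary changelog view.
spark.sql("""
    CALL my_catalog.system.create_changelog_view(
        table => 'my_db.mor_table',
        options => map('start-snapshot-id', '<last-synced-id>',
                       'end-snapshot-id', '<latest-id>'),
        changelog_view => 'mor_changes'
    )
""")

# 2. Merge the changes into the CoW table, matching on the primary key.
#    This assumes at most one net change per key in the snapshot range; a
#    production sync must also order multiple changes by _change_ordinal.
spark.sql("""
    MERGE INTO my_catalog.my_db.cow_table t
    USING (
        SELECT id, data, _change_type
        FROM mor_changes
        WHERE _change_type IN ('INSERT', 'UPDATE_AFTER', 'DELETE')
    ) s
    ON t.id = s.id
    WHEN MATCHED AND s._change_type = 'DELETE' THEN DELETE
    WHEN MATCHED THEN UPDATE SET t.data = s.data
    WHEN NOT MATCHED AND s._change_type != 'DELETE' THEN INSERT (id, data) VALUES (s.id, s.data)
""")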

Default hardware configuration

To manually specify the number of CPU cores (driver and executor) and the amount of memory required for the workload, modify the following configurations and pass them in the payload:

"num-executors" : "1",
"spark.executor.cores": "1",
"spark.executor.memory": "4G",
"spark.driver.cores": "1",
"spark.driver.memory": "4G",

For details on enabling autoscaling, see Enabling application autoscaling.