IBM Cloud Docs
Getting started with Spark use case

Getting started with Spark use case

This topic provides the procedure to run Spark use cases for watsonx.data by using Python samples. All the samples are written by using Spark Python APIs.

Prerequisites

  • Provision an IBM® watsonx.data instance.
  • Configure an IBM Analytics Engine instance.
  • Cloud Object Storage bucket connection details.
  • To enable your Spark application to work with the watsonx.data catalog and storage, you must have Metastore admin role. Without Metastore admin privilege, you cannot ingest data to storage using Native Spark engine. For more information about the Spark configuration, see Working with the watsonx.data catalog and storage.

About the sample use case

The sample file demonstrates the following functionalities:

  • Accessing tables from watsonx.data

    The Create a database in Lakehouse catalog section from the sample python file creates a database demodb in the configured watsonx.data instance with a catalog named lakehouse. demodb is configured to store all the data and metadata under the Cloud Object Storage(COS) bucket lakehouse-bucket. It also creates an iceberg table testTable and accesses it.

  • Ingesting data to watsonx.data

    The Ingest parquet data into a lakehouse table section from the sample python file allows you to ingest data in parquet and CSV format from a source Cloud Object Storage bucket source-bucket into a watsonx.data table. Sample data in parquet format is inserted from source COS bucket source-bucket into the watsonx.data table yellow_taxi_2022 (see the steps for inserting sample data into the source COS bucket). It also shows ingesting data in CSV format from COS bucket source-bucket into the table zipcode in the database demodb.

  • Modifying schema in watsonx.data

    The Schema evolution section from the sample python file allows you to modify data in watsonx.data.

  • Performing table maintenance activities in watsonx.data

    Table maintenance helps in keeping the watsonx.data table performant. Iceberg provides table maintenance procedures out of the box that allows performing powerful table optimizations in a declarative fashion. The following sample demonstrates how to do some table maintenance operations by using Spark. For more information about the Iceberg Spark table maintenance operations, see Table Operations.

Inserting sample data into the COS bucket

To insert data to COS, do the following steps.

  1. Create a COS bucket(for example, source-bucket) to store sample data to be ingested into watsonx.data instance. For information about creating COS bucket see, Getting started with IBM Cloud Object Storage.

    As a user of Object Storage, you not only need to know the API key or the HMAC keys to configure Object Storage, but also the IBM Analytics Engine service endpoints to connect to Object Storage. See Selecting regions and endpoints for more information on the endpoints to use based on your Object Storage bucket type, such as regional versus cross-regional. You can also view the endpoints across regions for your Object Storage service by selecting the service on your IBM Cloud dashboard and clicking Endpoint in the navigation pane. Always choose the direct endpoint. Direct endpoint provide better performance and do not incur charges. An example of an endpoint for US-South Cross region is s3.direct.us.cloud-object-storage.appdomain.cloud.

  2. Download sample csv file (for example, zipcodes.csv) and parquet sample data (for example, six months taxi data for year 2022) from the following links.

  3. Install IBM Cloud Object Storage plug-in. For more information about how to install plug-in, see IBM Cloud Object Storage CLI.

  4. Use the COS cli to upload the sample data into COS bucket.

    ibmcloud cos upload --bucket <cos_bucket_name> --key <source_file_name> --file <path_to_source_file>
    

    Parameter values:

    • <cos_bucket_name>: name of the bucket created in step1.
    • <source_file_name>: the name of the sample data file that you downloaded. Here, key zipcodes.csv is the file name (see the following example).
    • <path_to_source_file>: the path to the location in your machine where the file resides. Here, path/zipcodes.csv is the file path (see the following example).

    For example:

    ibmcloud cos upload --bucket source-bucket --key zipcodes.csv --file <path/zipcodes.csv>
    

Running the sample use case

Follow the steps to run the Spark sample Python file.

Spark sample Python file


    from pyspark.sql import SparkSession
    import os

    def init_spark():
    spark = SparkSession.builder \
        .appName("lh-hms-cloud") \
        .config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.endpoint" ,"s3.direct.us-south.cloud-object-storage.appdomain.cloud") \
        .config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.access.key" ,"<lakehouse-bucket-access-key>") \
        .config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.secret.key" ,"<lakehouse-bucket-secret-key>") \
        .config("spark.hadoop.fs.s3a.bucket.source-bucket.endpoint" ,"s3.direct.us-south.cloud-object-storage.appdomain.cloud") \
        .config("spark.hadoop.fs.s3a.bucket.source-bucket.access.key" ,"<source-bucket-access-key>") \
        .config("spark.hadoop.fs.s3a.bucket.source-bucket.secret.key" ,"<source-bucket-secret-key>") \
        .enableHiveSupport() \
        .getOrCreate()
    return spark

    def create_database(spark):
        # Create a database in the lakehouse catalog
        spark.sql("create database if not exists lakehouse.demodb LOCATION 's3a://lakehouse-bucket/'")

    def list_databases(spark):
        # list the database under lakehouse catalog
        spark.sql("show databases from lakehouse").show()

    def basic_iceberg_table_operations(spark):
        # demonstration: Create a basic Iceberg table, insert some data and then query table
        spark.sql("create table if not exists lakehouse.demodb.testTable(id INTEGER, name VARCHAR(10), age INTEGER, salary DECIMAL(10, 2)) using iceberg").show()
        spark.sql("insert into lakehouse.demodb.testTable values(1,'Alan',23,3400.00),(2,'Ben',30,5500.00),(3,'Chen',35,6500.00)")
        spark.sql("select * from lakehouse.demodb.testTable").show()

    def create_table_from_parquet_data(spark):
        # load parquet data into dataframe
        df = spark.read.option("header",True).parquet("s3a://source-bucket/nyc-taxi/yellow_tripdata_2022-01.parquet")
        # write the dataframe into an Iceberg table
        df.writeTo("lakehouse.demodb.yellow_taxi_2022").create()
        # describe the table created
        spark.sql('describe table lakehouse.demodb.yellow_taxi_2022').show(25)
        # query the table
        spark.sql('select * from lakehouse.demodb.yellow_taxi_2022').count()

    def ingest_from_csv_temp_table(spark):
        # load csv data into a dataframe
        csvDF = spark.read.option("header",True).csv("s3a://source-bucket/zipcodes.csv")
        csvDF.createOrReplaceTempView("tempCSVTable")
        # load temporary table into an Iceberg table
        spark.sql('create or replace table lakehouse.demodb.zipcodes using iceberg as select * from tempCSVTable')
        # describe the table created
        spark.sql('describe table lakehouse.demodb.zipcodes').show(25)
        # query the table
        spark.sql('select * from lakehouse.demodb.zipcodes').show()

    def ingest_monthly_data(spark):
        df_feb = spark.read.option("header",True).parquet("s3a://source-bucket//nyc-taxi/yellow_tripdata_2022-02.parquet")
        df_march = spark.read.option("header",True).parquet("s3a://source-bucket//nyc-taxi/yellow_tripdata_2022-03.parquet")
        df_april = spark.read.option("header",True).parquet("s3a://source-bucket//nyc-taxi/yellow_tripdata_2022-04.parquet")
        df_may = spark.read.option("header",True).parquet("s3a://source-bucket//nyc-taxi/yellow_tripdata_2022-05.parquet")
        df_june = spark.read.option("header",True).parquet("s3a://source-bucket//nyc-taxi/yellow_tripdata_2022-06.parquet")

        df_q1_q2 = df_feb.union(df_march).union(df_april).union(df_may).union(df_june)
        df_q1_q2.write.insertInto("lakehouse.demodb.yellow_taxi_2022")

    def perform_table_maintenance_operations(spark):
        # Query the metadata files table to list underlying data files
        spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.demodb.yellow_taxi_2022.files").show()

        # There are many smaller files compact them into files of 200MB each using the
        # `rewrite_data_files` Iceberg Spark procedure
        spark.sql(f"CALL lakehouse.system.rewrite_data_files(table => 'demodb.yellow_taxi_2022', options => map('target-file-size-bytes','209715200'))").show()

        # Again, query the metadata files table to list underlying data files; 6 files are compacted
        # to 3 files
        spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.demodb.yellow_taxi_2022.files").show()

        # List all the snapshots
        # Expire earlier snapshots. Only latest one with compacted data is required
        # Again, List all the snapshots to see only 1 left
        spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").show()
        #retain only the latest one
        latest_snapshot_committed_at = spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").tail(1)[0].committed_at
        print (latest_snapshot_committed_at)
        spark.sql(f"CALL lakehouse.system.expire_snapshots(table => 'demodb.yellow_taxi_2022',older_than => TIMESTAMP '{latest_snapshot_committed_at}',retain_last => 1)").show()
        spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").show()

        # Removing Orphan data files
        spark.sql(f"CALL lakehouse.system.remove_orphan_files(table => 'demodb.yellow_taxi_2022')").show(truncate=False)

        # Rewriting Manifest Files
        spark.sql(f"CALL lakehouse.system.rewrite_manifests('demodb.yellow_taxi_2022')").show()


    def evolve_schema(spark):
        # demonstration: Schema evolution
        # Add column fare_per_mile to the table
        spark.sql('ALTER TABLE lakehouse.demodb.yellow_taxi_2022 ADD COLUMN(fare_per_mile double)')
        # describe the table
        spark.sql('describe table lakehouse.demodb.yellow_taxi_2022').show(25)


    def clean_database(spark):
        # clean-up the demo database
        spark.sql('drop table if exists lakehouse.demodb.testTable purge')
        spark.sql('drop table if exists lakehouse.demodb.zipcodes purge')
        spark.sql('drop table if exists lakehouse.demodb.yellow_taxi_2022 purge')
        spark.sql('drop database if exists lakehouse.demodb cascade')

    def main():
        try:
            spark = init_spark()

            create_database(spark)
            list_databases(spark)

            basic_iceberg_table_operations(spark)

            # demonstration: Ingest parquet and csv data into a watsonx.data Iceberg table
            create_table_from_parquet_data(spark)
            ingest_from_csv_temp_table(spark)

            # load data for the month of February to June into the table yellow_taxi_2022 created above
            ingest_monthly_data(spark)

            # demonstration: Table maintenance
            perform_table_maintenance_operations(spark)

            # demonstration: Schema evolution
            evolve_schema(spark)
        finally:
            # clean-up the demo database
            clean_database(spark)
            spark.stop()

    if __name__ == '__main__':
    main()
  1. Save the following sample python file.
  2. Upload the Python file to the Cloud Object Storage bucket. You must maintain the Spark applications and their dependencies in a Cloud Object Storage bucket and not mix them with data buckets.
  3. Generate the IAM token for the IBM Analytics Engine token. For more information about how to generate an IAM token, see IAM token.
  4. Run the following curl command to submit the Spark application:
    curl https://api.<region>.ae.cloud.ibm.com/v3/analytics_engines/<iae-instance-guid>/spark_applications -H "Authorization: Bearer <iam-bearer-token>" -X POST -d '{
    "application_details": {
        "application": "s3a://<application_bucket>/lakehouse-hms-test-cloud-doc-sample.py",
        "conf": {
            "spark.hadoop.fs.s3a.bucket.<application-bucket>.endpoint": "https://s3.direct.us-south.cloud-object-storage.appdomain.cloud",
            "spark.hadoop.fs.s3a.bucket.<application-bucket>.access.key": "<hmac_access_key_for_application-bucket>",
            "spark.hadoop.fs.s3a.bucket.<application-bucket>.secret.key": "<hmac_secret_key_for_application-bucket>"
        }
    }
    }'

This sample is tested on the Cloud Object Storage buckets in the us-south region. Change the region in the Cloud Object Storage endpoint configuration as per the region where your Cloud Object Storage buckets reside. It is recommended to provision the COS buckets in the region where Analytics Engine instance is provisioned.

Working with the watsonx.data catalog and storage

To enable your Spark application to work with the watsonx.data catalog and storage, add the following configuration to your application payload:

spark.hive.metastore.client.plain.username=ibmlhapikey
spark.hive.metastore.client.plain.password=<api-key-of-the-user-which-has-metastore-admin-role>
spark.hadoop.wxd.apiKey=Basic base64(ibmlhapikey_ibmcloudid:apikey)