IBM Cloud Docs
Spark ユース・ケースの概要

Spark ユース・ケースの概要

このトピックでは、Python サンプルを使用して watsonx.data の Spark ユース・ケースを実行する手順について説明します。 すべてのサンプルは、Spark Python API を使用して作成されています。

前提条件

  • IBM® watsonx.data インスタンスをプロビジョンします。
  • IBM Analytics Engine インスタンスを設定します。
  • Cloud Object Storage バケット接続の詳細。
  • Sparkアプリケーションをwatsonx.dataカタログとストレージを管理するには、Metastore 管理者ロールが必要です。 Metastore 管理者権限がないと、ネイティブ Spark エンジンを使用してデータをストレージに取り込むことはできません。 Sparkの設定の詳細については、以下を参照してください。との連携watsonx.dataカタログとストレージ

サンプル・ユース・ケースについて

このサンプル・ファイルは、以下の機能を示しています。

  • watsonx.data からの表へのアクセス

    サンプル Python ファイルLakehouse カタログでのデータベースの作成 セクションは、レイクハウスという名前のカタログを使用して、構成済みの watsonx.data インスタンスに デモダブ データベースを作成します。デモダブ は、Cloud Object Storage(COS) バケット レイクハウス・バケツの下にすべてのデータとメタデータを保管するように構成されています。 また、iceberg 表 testTable も作成し、それにアクセスします。

  • watsonx.data へのデータの取り込み

    サンプル Python ファイル「Parquet データを Lakehouse テーブルに取り込み (Ingest parquet data into a lakehouse table)」 セクションを使用すると、ソース Cloud Object Storage バケット source-bucket から watsonx.data テーブルに Parquet および CSV 形式のデータを取り込むことができます。 ソース COS バケット source-bucket からサンプル・データが watsonx.data 表 yellow_taxi_2022 に挿入されます (サンプル・データをソース COS バケットに挿入するための ステップ を参照してください)。 COS バケット source-bucket からデータベース demodb の表 zipcode への CSV 形式のデータの取り込みも表示されます。

  • watsonx.data でのスキーマの変更

    サンプル Python ファイルSchema evolution セクションを使用すると、watsonx.dataのデータを変更できます。

  • watsonx.data での表の保守アクティビティーの実行

    表の保守は、watsonx.data 表のパフォーマンスの維持に役立ちます。 Iceberg は、すぐに使用可能な表保守手順を提供します。これにより、宣言的な方法で強力な表最適化を実行できます。 以下のサンプルは、Spark を使用していくつかの表保守操作を行う方法を示しています。 Ceberg の Spark 表の保守操作について詳しくは、表の操作 を参照してください。

COS バケットへのサンプル・データの挿入

COS にデータを挿入するには、以下の手順を実行します。

  1. COS バケット (例えば、source-bucket) を作成して、 watsonx.data インスタンスに取り込むサンプル・データを保管します。 COS バケットの作成については、Getting started with IBM Cloud Object Storage を参照してください。

    Object Storage のユーザーは、 Object Storage を設定するためにAPIキーまたはHMACキーを知っておく必要があるだけでなく、 IBM Analytics Engine サービスエンドポイントを知っておく必要もあります。 Object Storage に接続するために。 Object Storage バケットタイプに基づいて使用するエンドポイントの詳細については、リージョンとエンドポイントの選択 を参照してください。 また、 IBM Cloud ダッシュボードで該当するサービスを選択し、ナビゲーションウィンドウで 「エンドポイント」 をクリックすると、 Object Storage サービスの地域別エンドポイントを表示できます。 常に直接エンドポイントを選択してください。 ダイレクト・エンドポイントは、パフォーマンスを向上させ、課金を発生させません。 US-South Cross リージョンのエンドポイントの例は、 s3.direct.us.cloud-object-storage.appdomain.cloud です。

  2. 以下のリンクから、サンプル csv ファイル (例えば、 zipcodes.csv) と Parquet サンプル・データ (例えば、2022 年の 6 カ月タクシー・データ) をダウンロードします。

  3. IBM Cloud Object Storage プラグインをインストールします。 プラグインのインストール方法の詳細については、IBM Cloud Object Storage CLI を参照してください。

  4. COS CLI を使用して、サンプル・データを COS バケットにアップロードします。

    ibmcloud cos upload --bucket <cos_bucket_name> --key <source_file_name> --file <path_to_source_file>
    

    パラメーター値:

    • < cos_bucket_name>: step1で作成したバケットの名前。
    • < source_file_name>: ダウンロードしたサンプル・データ・ファイルの名前。 ここで、 key zipcodes.csv はファイル名です (以下の例を参照)。
    • < path_to_source_file>: ファイルが存在するマシン内の場所へのパス。 ここで、 path/zipcodes.csv はファイル・パスです (以下の例を参照)。

    以下に例を示します。

    ibmcloud cos upload --bucket source-bucket --key zipcodes.csv --file <path/zipcodes.csv>
    

サンプル・ユース・ケースの実行

ステップに従って、Spark サンプル Python ファイルを実行します。

Spark サンプル Python ファイル


    from pyspark.sql import SparkSession
    import os

    def init_spark():
    spark = SparkSession.builder \
        .appName("lh-hms-cloud") \
        .config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.endpoint" ,"s3.direct.us-south.cloud-object-storage.appdomain.cloud") \
        .config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.access.key" ,"<lakehouse-bucket-access-key>") \
        .config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.secret.key" ,"<lakehouse-bucket-secret-key>") \
        .config("spark.hadoop.fs.s3a.bucket.source-bucket.endpoint" ,"s3.direct.us-south.cloud-object-storage.appdomain.cloud") \
        .config("spark.hadoop.fs.s3a.bucket.source-bucket.access.key" ,"<source-bucket-access-key>") \
        .config("spark.hadoop.fs.s3a.bucket.source-bucket.secret.key" ,"<source-bucket-secret-key>") \
        .enableHiveSupport() \
        .getOrCreate()
    return spark

    def create_database(spark):
        # Create a database in the lakehouse catalog
        spark.sql("create database if not exists lakehouse.demodb LOCATION 's3a://lakehouse-bucket/'")

    def list_databases(spark):
        # list the database under lakehouse catalog
        spark.sql("show databases from lakehouse").show()

    def basic_iceberg_table_operations(spark):
        # demonstration: Create a basic Iceberg table, insert some data and then query table
        spark.sql("create table if not exists lakehouse.demodb.testTable(id INTEGER, name VARCHAR(10), age INTEGER, salary DECIMAL(10, 2)) using iceberg").show()
        spark.sql("insert into lakehouse.demodb.testTable values(1,'Alan',23,3400.00),(2,'Ben',30,5500.00),(3,'Chen',35,6500.00)")
        spark.sql("select * from lakehouse.demodb.testTable").show()

    def create_table_from_parquet_data(spark):
        # load parquet data into dataframe
        df = spark.read.option("header",True).parquet("s3a://source-bucket/nyc-taxi/yellow_tripdata_2022-01.parquet")
        # write the dataframe into an Iceberg table
        df.writeTo("lakehouse.demodb.yellow_taxi_2022").create()
        # describe the table created
        spark.sql('describe table lakehouse.demodb.yellow_taxi_2022').show(25)
        # query the table
        spark.sql('select * from lakehouse.demodb.yellow_taxi_2022').count()

    def ingest_from_csv_temp_table(spark):
        # load csv data into a dataframe
        csvDF = spark.read.option("header",True).csv("s3a://source-bucket/zipcodes.csv")
        csvDF.createOrReplaceTempView("tempCSVTable")
        # load temporary table into an Iceberg table
        spark.sql('create or replace table lakehouse.demodb.zipcodes using iceberg as select * from tempCSVTable')
        # describe the table created
        spark.sql('describe table lakehouse.demodb.zipcodes').show(25)
        # query the table
        spark.sql('select * from lakehouse.demodb.zipcodes').show()

    def ingest_monthly_data(spark):
        df_feb = spark.read.option("header",True).parquet("s3a://source-bucket//nyc-taxi/yellow_tripdata_2022-02.parquet")
        df_march = spark.read.option("header",True).parquet("s3a://source-bucket//nyc-taxi/yellow_tripdata_2022-03.parquet")
        df_april = spark.read.option("header",True).parquet("s3a://source-bucket//nyc-taxi/yellow_tripdata_2022-04.parquet")
        df_may = spark.read.option("header",True).parquet("s3a://source-bucket//nyc-taxi/yellow_tripdata_2022-05.parquet")
        df_june = spark.read.option("header",True).parquet("s3a://source-bucket//nyc-taxi/yellow_tripdata_2022-06.parquet")

        df_q1_q2 = df_feb.union(df_march).union(df_april).union(df_may).union(df_june)
        df_q1_q2.write.insertInto("lakehouse.demodb.yellow_taxi_2022")

    def perform_table_maintenance_operations(spark):
        # Query the metadata files table to list underlying data files
        spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.demodb.yellow_taxi_2022.files").show()

        # There are many smaller files compact them into files of 200MB each using the
        # `rewrite_data_files` Iceberg Spark procedure
        spark.sql(f"CALL lakehouse.system.rewrite_data_files(table => 'demodb.yellow_taxi_2022', options => map('target-file-size-bytes','209715200'))").show()

        # Again, query the metadata files table to list underlying data files; 6 files are compacted
        # to 3 files
        spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.demodb.yellow_taxi_2022.files").show()

        # List all the snapshots
        # Expire earlier snapshots. Only latest one with compacted data is required
        # Again, List all the snapshots to see only 1 left
        spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").show()
        #retain only the latest one
        latest_snapshot_committed_at = spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").tail(1)[0].committed_at
        print (latest_snapshot_committed_at)
        spark.sql(f"CALL lakehouse.system.expire_snapshots(table => 'demodb.yellow_taxi_2022',older_than => TIMESTAMP '{latest_snapshot_committed_at}',retain_last => 1)").show()
        spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").show()

        # Removing Orphan data files
        spark.sql(f"CALL lakehouse.system.remove_orphan_files(table => 'demodb.yellow_taxi_2022')").show(truncate=False)

        # Rewriting Manifest Files
        spark.sql(f"CALL lakehouse.system.rewrite_manifests('demodb.yellow_taxi_2022')").show()


    def evolve_schema(spark):
        # demonstration: Schema evolution
        # Add column fare_per_mile to the table
        spark.sql('ALTER TABLE lakehouse.demodb.yellow_taxi_2022 ADD COLUMN(fare_per_mile double)')
        # describe the table
        spark.sql('describe table lakehouse.demodb.yellow_taxi_2022').show(25)


    def clean_database(spark):
        # clean-up the demo database
        spark.sql('drop table if exists lakehouse.demodb.testTable purge')
        spark.sql('drop table if exists lakehouse.demodb.zipcodes purge')
        spark.sql('drop table if exists lakehouse.demodb.yellow_taxi_2022 purge')
        spark.sql('drop database if exists lakehouse.demodb cascade')

    def main():
        try:
            spark = init_spark()

            create_database(spark)
            list_databases(spark)

            basic_iceberg_table_operations(spark)

            # demonstration: Ingest parquet and csv data into a watsonx.data Iceberg table
            create_table_from_parquet_data(spark)
            ingest_from_csv_temp_table(spark)

            # load data for the month of February to June into the table yellow_taxi_2022 created above
            ingest_monthly_data(spark)

            # demonstration: Table maintenance
            perform_table_maintenance_operations(spark)

            # demonstration: Schema evolution
            evolve_schema(spark)
        finally:
            # clean-up the demo database
            clean_database(spark)
            spark.stop()

    if __name__ == '__main__':
    main()
  1. 以下のサンプル Python ファイルを保存します。
  2. Python ファイルを Cloud Object Storage バケットにアップロードします。 Spark アプリケーションとその依存関係は、Cloud Object Storage バケット内で維持する必要があり、データ・バケットと混在させないようにする必要があります。
  3. IBM Analytics Engine トークンの IAM トークンを生成します。 IAMトークンの生成方法の詳細については、IAMトークン を参照してください。
  4. 以下の curl コマンドを実行して、Spark アプリケーションをサブミットします。
    curl https://api.<region>.ae.cloud.ibm.com/v3/analytics_engines/<iae-instance-guid>/spark_applications -H "Authorization: Bearer <iam-bearer-token>" -X POST -d '{
    "application_details": {
        "application": "s3a://<application_bucket>/lakehouse-hms-test-cloud-doc-sample.py",
        "conf": {
            "spark.hadoop.fs.s3a.bucket.<application-bucket>.endpoint": "https://s3.direct.us-south.cloud-object-storage.appdomain.cloud",
            "spark.hadoop.fs.s3a.bucket.<application-bucket>.access.key": "<hmac_access_key_for_application-bucket>",
            "spark.hadoop.fs.s3a.bucket.<application-bucket>.secret.key": "<hmac_secret_key_for_application-bucket>"
        }
    }
    }'

このサンプルは、us-south 地域の Cloud Object Storage バケットでテストされています。 Cloud Object Storage バケットが存在するリージョンに応じて、Cloud Object Storage エンドポイント構成内のリージョンを変更します。 Analytics Engine インスタンスがプロビジョンされているリージョンに COS バケットをプロビジョンすることをお勧めします。

提出されたSparkアプリケーションが3日以上実行状態が続いた場合、自動的に終了されます。

との連携watsonx.dataカタログとストレージ

Sparkアプリケーションをwatsonx.dataカタログとストレージを管理するには、アプリケーション ペイロードに次の構成を追加します。

spark.hive.metastore.client.plain.username=ibmlhapikey
spark.hive.metastore.client.plain.password=<api-key-of-the-user-which-has-metastore-admin-role>
spark.hadoop.wxd.apiKey=Basic base64(ibmlhapikey_ibmcloudid:apikey)