IBM Cloud Docs
さまざまな表形式での作業

さまざまな表形式での作業

適用範囲スパークエンジン グルテン加速スパークエンジン

このトピックでは、 Apache Hudi、 Apache Iceberg または Delta Lake catalog のような異なるテーブル形式にデータを取り込む Spark アプリケーションを実行する手順について説明します。

  1. Sparkアプリケーションで使用するデータを格納するために、必要なカタログ(カタログは Apache Hudi、 Apache Iceberg または Delta Lake )を持つストレージを作成する。 ストレージを作成するには、 ストレージとカタログのペアの追加 を参照してください。

  2. ストレージをネイティブ Spark エンジンに関連付けます。 詳しくは、 エンジンへのカタログの関連付け を参照してください。

  3. Spark アプリケーションを保管するための Cloud Object Storage (COS) を作成します。 Cloud Object Storage およびバケットを作成するには、 ストレージ・バケットの作成 を参照してください。

  4. Cloud Object Storage を watsonx.dataに登録します。 詳しくは、 ストレージとカタログのペアの追加 を参照してください。

  5. 選択したカタログに基づいて、以下の Spark アプリケーション (Python ファイル) をローカル・マシンに保存します。 ここ ( iceberg_demo.pyhudi_demo.py、または delta_demo.py ) で、Spark アプリケーションを COS にアップロードします。 データのアップロード を参照してください。

  6. Cloud Object Storageにあるデータを使用して Spark アプリケーションを実行依頼するには、パラメーター値を指定し、以下の表から curl コマンドを実行します。

    • Apache Iceberg

    サンプル・ファイルは、以下の機能を示しています:

    • watsonx.dataからテーブルにアクセスする

    • データをwatsonx.dataに取り込む

    • watsonx.dataのスキーマを変更する

    watsonx.dataでテーブルのメンテナンス作業を行う。

    データを COS バケットに挿入する必要があります。 詳しくは、COSバケットにサンプルデータを挿入する をご覧ください。

    Pythonアプリケーション:IcebergPythonファイル

    Pythonアプリケーションを送信するCurlコマンド :

    curl --request POST \
    --url https://<wxd_host_name>/lakehouse/api/v2/spark_engines/<spark_engine_id>/applications \
    --header 'Authorization: Bearer <token>' \
    --header 'Content-Type: application/json' \
    --header 'LhInstanceId: <instance_id>' \
    --data '{  "application_details": {
               "conf": {
                      "spark.hadoop.wxd.apiKey":"Basic <user-authentication-string>"    },
                       "application": "s3a://<application-bucket-name>/iceberg.py"  }
            }'
    

    パラメータ値:

    • <wxd_host_name>:watsonx.dataクラウドインスタンスのホスト名。

    • <instance_id>: watsonx.data インスタンス URL のインスタンス ID。 例えば、1609968977179454。

    • <spark_engine_id>: ネイティブSparkエンジンのエンジンID。

    • <token>:ベアラートークン。 トークンの生成の詳細については、IAM トークン を参照してください。

    • <user-authentication-string>: ユーザーIDとAPIキーをBase64でエンコードした文字列。 フォーマットの詳細については、注釈を参照のこと。

    • Apache

    PythonSparkアプリケーションは、以下の機能を実証している:

    • Apache Hudiカタログ(データを保存するために作成したもの)内にデータベースを作成します。 ここでは、'<database_name>.

    • <database_name> データベース内に'<table_name> というテーブルを作成する。

    • <table_name> にデータを挿入し、SELECTクエリー操作を行う。

    • 使用後はテーブルとスキーマを削除する。

    Pythonアプリケーション:

    from pyspark.sql import SparkSession
            def init_spark():            spark = SparkSession.builder
            .appName("CreateHudiTableInCOS")
            .enableHiveSupport()
            .getOrCreate()
            return spark
            def main():
            try:
            spark = init_spark()
            spark.sql("show databases").show()
            spark.sql("create database if not exists spark_catalog.<database_name> LOCATION 's3a://<data_storage_name>/'").show()
            spark.sql("create table if not exists spark_catalog.<database_name>.<table_name> (id bigint, name string, location string) USING HUDI OPTIONS ('primaryKey' 'id', hoodie.write.markers.type= 'direct', hoodie.embed.timeline.server= 'false')").show()
            spark.sql("insert into <database_name>.<table_name> VALUES (1, 'Sam','Kochi'), (2, 'Tom','Bangalore'), (3, 'Bob','Chennai'), (4, 'Alex','Bangalore')").show()
            spark.sql("select * from spark_catalog.<database_name>.<table_name>").show()
            spark.sql("drop table spark_catalog.<database_name>.<table_name>").show()
            spark.sql("drop schema spark_catalog.<database_name> CASCADE").show()
            inally:
            spark.stop()
            if __name__ == '__main__':
            main()
    
    

    パラメーター値:

    • <database_name>:作成するデータベース名を指定します。
    • <table_name>:作成するテーブルの名前を指定します。
    • <data_storage_name>:作成した Apache Hudi ストレージの名前を指定します。

    Python アプリケーションをサブミットするための Curl コマンド

    
    curl --request POST
        --url https://<wxd_host_name>/lakehouse/api/v2/spark_engines/<spark_engine_id>/applications
        --header 'Authorization: Bearer <token>'
        --header 'Content-Type: application/json'
        --header 'LhInstanceId: <instance_id>'
        --data '{ \n \n    "application_details": {
                "conf": {
                        "spark.sql.catalog.spark_catalog.type": "hive",
                        "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog",
                        "spark.hadoop.wxd.apiKey":"Basic <user-authentication-string>"        },
                        "application": "s3a://<data_storage_name>/hudi_demo.py"    }}
    

    パラメーター値:

    • <wxd_host_name>:watsonx.dataクラウドインスタンスのホスト名。

    • <instance_id>: watsonx.data インスタンス URL のインスタンス ID。 例えば、1609968977179454。

    • <spark_engine_id>: ネイティブSparkエンジンのエンジンID。

    • <token>:ベアラートークン。 トークンの生成の詳細については、IAM トークン を参照してください。

    • <user-authentication-string>: ユーザーIDとAPIキーをBase64でエンコードした文字列。 フォーマットの詳細については、注釈を参照のこと。

    • Delta Lake

    PythonSparkアプリケーションは、以下の機能を実証している:

    • Delta Lake (データを保存するために作成した)カタログの中にデータベースを作成します。 ここでは、'<database_name>.

    • <database_name> データベース内に'<table_name> というテーブルを作成する。

    • <table_name> にデータを挿入し、SELECTクエリー操作を行う。

    • 使用後はテーブルとスキーマを削除する。

    Pythonアプリケーション:

    from pyspark.sql import SparkSession
    import os
        def init_spark():
             spark = SparkSession.builder.appName("lh-hms-cloud")
             .enableHiveSupport().getOrCreate()
             return spark
             def main():
                 spark = init_spark()
                 spark.sql("show databases").show()
                         spark.sql("create database if not exists spark_catalog.<database_name> LOCATION 's3a://<data_storage_name>/'").show()
                         spark.sql("create table if not exists spark_catalog.<database_name>.<table_name> (id bigint, name string, location string) USING DELTA").show()
                         spark.sql("insert into spark_catalog.<database_name>.<table_name> VALUES (1, 'Sam','Kochi'), (2, 'Tom','Bangalore'), (3, 'Bob','Chennai'), (4, 'Alex','Bangalore')").show()
                         spark.sql("select * from spark_catalog.<database_name>.<table_name>").show()
                         spark.sql("drop table spark_catalog.<database_name>.<table_name>").show()
                         spark.sql("drop schema spark_catalog.<database_name> CASCADE").show()
                         spark.stop()
                         if __name__ == '__main__':
                         main()
    
    

    パラメーター値:

    • <database_name>:作成するデータベース名を指定します。
    • <table_name>:作成するテーブルの名前を指定します。
    • <data_storage_name>:作成した Apache Hudi ストレージの名前を指定します。

    Python アプリケーションをサブミットするための Curl コマンド

    curl --request POST
    --url https://<wxd_host_name>/lakehouse/api/v2/spark_engines/<spark_engine_id>/applications
    --header 'Authorization: Bearer <token>'
    --header 'Content-Type: application/json'
    --header 'LhInstanceId: <instance_id>'
    --data '{        "application_details": {
           "conf": {
           "spark.sql.catalog.spark_catalog" : "org.apache.spark.sql.delta.catalog.DeltaCatalog",
           "spark.sql.catalog.spark_catalog.type" : "hive",
           "spark.hadoop.wxd.apiKey":"<user-authentication-string>"        },
           "application": "s3a://<database_name>/delta_demo.py"        }    }
    

    パラメーター値

    • <wxd_host_name>:watsonx.dataクラウドインスタンスのホスト名。
    • <instance_id> watsonx.data クラスタ・インスタンス URL のインスタンス ID。 例えば、1609968977179454。
    • <spark_engine_id>: ネイティブSparkエンジンのエンジンID。
    • <token> ベアラートークン トークンの生成の詳細については、IAM トークン を参照してください。
    • <user-authentication-string>: ユーザーIDとAPIキーをBase64でエンコードした文字列。 フォーマットについて詳しくは、以下の注を参照してください。

    <user-authentication-string> の値は、 echo -n 'ibmlhapikey_<username>:<user_apikey>' | base64 の形式でなければなりません。 ここで、 <user_id> は、データ・バケットへのアクセスに使用される API キーを持つユーザーの IBM Cloud ID です。 ここで、 <IAM_APIKEY> は、オブジェクト・ストア・バケットにアクセスするユーザーの API キーです。 API キーを生成するには、 watsonx.data コンソールにログインし、「プロファイル」>「プロファイルと設定」>「API キー」にナビゲートして、新しい API キーを生成します。 新しい API キーを生成すると、古い API キーが無効になります。

  7. Spark アプリケーションをサブミットすると、アプリケーション ID と Spark バージョンを示す確認メッセージが表示されます。 参照用に保存します。

  8. watsonx.data クラスターにログインし、「エンジンの詳細」ページにアクセスします。 「アプリケーション」タブで、アプリケーション ID を使用してアプリケーションをリストし、ステージを追跡します。 詳しくは、 アプリケーションの表示および管理 を参照してください。