さまざまな表形式での作業
適用範囲 : スパークエンジン グルテン加速スパークエンジン
このトピックでは、 Apache Hudi、 Apache Iceberg または Delta Lake catalog のような異なるテーブル形式にデータを取り込む Spark アプリケーションを実行する手順について説明します。
-
Sparkアプリケーションで使用するデータを格納するために、必要なカタログ(カタログは Apache Hudi、 Apache Iceberg または Delta Lake )を持つストレージを作成する。 ストレージを作成するには、 ストレージとカタログのペアの追加 を参照してください。
-
ストレージをネイティブ Spark エンジンに関連付けます。 詳しくは、 エンジンへのカタログの関連付け を参照してください。
-
Spark アプリケーションを保管するための Cloud Object Storage (COS) を作成します。 Cloud Object Storage およびバケットを作成するには、 ストレージ・バケットの作成 を参照してください。
-
Cloud Object Storage を watsonx.dataに登録します。 詳しくは、 ストレージとカタログのペアの追加 を参照してください。
-
選択したカタログに基づいて、以下の Spark アプリケーション (Python ファイル) をローカル・マシンに保存します。 ここ (
iceberg_demo.py
、hudi_demo.py
、またはdelta_demo.py
) で、Spark アプリケーションを COS にアップロードします。 データのアップロード を参照してください。 -
Cloud Object Storageにあるデータを使用して Spark アプリケーションを実行依頼するには、パラメーター値を指定し、以下の表から curl コマンドを実行します。
- Apache Iceberg
サンプル・ファイルは、以下の機能を示しています:
-
watsonx.dataからテーブルにアクセスする
-
データをwatsonx.dataに取り込む
-
watsonx.dataのスキーマを変更する
watsonx.dataでテーブルのメンテナンス作業を行う。
データを COS バケットに挿入する必要があります。 詳しくは、COSバケットにサンプルデータを挿入する をご覧ください。
Pythonアプリケーション:IcebergPythonファイル
Pythonアプリケーションを送信するCurlコマンド :
curl --request POST \ --url https://<wxd_host_name>/lakehouse/api/v2/spark_engines/<spark_engine_id>/applications \ --header 'Authorization: Bearer <token>' \ --header 'Content-Type: application/json' \ --header 'LhInstanceId: <instance_id>' \ --data '{ "application_details": { "conf": { "spark.hadoop.wxd.apiKey":"Basic <user-authentication-string>" }, "application": "s3a://<application-bucket-name>/iceberg.py" } }'
パラメータ値:
-
<wxd_host_name>
:watsonx.dataクラウドインスタンスのホスト名。 -
<instance_id>
: watsonx.data インスタンス URL のインスタンス ID。 例えば、1609968977179454。 -
<spark_engine_id>
: ネイティブSparkエンジンのエンジンID。 -
<token>
:ベアラートークン。 トークンの生成の詳細については、IAM トークン を参照してください。 -
<user-authentication-string>
: ユーザーIDとAPIキーをBase64でエンコードした文字列。 フォーマットの詳細については、注釈を参照のこと。 -
Apache
PythonSparkアプリケーションは、以下の機能を実証している:
-
Apache Hudiカタログ(データを保存するために作成したもの)内にデータベースを作成します。 ここでは、'
<database_name>
. -
<database_name>
データベース内に'<table_name>
というテーブルを作成する。 -
<table_name>
にデータを挿入し、SELECTクエリー操作を行う。 -
使用後はテーブルとスキーマを削除する。
Pythonアプリケーション:
from pyspark.sql import SparkSession def init_spark(): spark = SparkSession.builder .appName("CreateHudiTableInCOS") .enableHiveSupport() .getOrCreate() return spark def main(): try: spark = init_spark() spark.sql("show databases").show() spark.sql("create database if not exists spark_catalog.<database_name> LOCATION 's3a://<data_storage_name>/'").show() spark.sql("create table if not exists spark_catalog.<database_name>.<table_name> (id bigint, name string, location string) USING HUDI OPTIONS ('primaryKey' 'id', hoodie.write.markers.type= 'direct', hoodie.embed.timeline.server= 'false')").show() spark.sql("insert into <database_name>.<table_name> VALUES (1, 'Sam','Kochi'), (2, 'Tom','Bangalore'), (3, 'Bob','Chennai'), (4, 'Alex','Bangalore')").show() spark.sql("select * from spark_catalog.<database_name>.<table_name>").show() spark.sql("drop table spark_catalog.<database_name>.<table_name>").show() spark.sql("drop schema spark_catalog.<database_name> CASCADE").show() inally: spark.stop() if __name__ == '__main__': main()
パラメーター値:
<database_name>
:作成するデータベース名を指定します。<table_name>
:作成するテーブルの名前を指定します。<data_storage_name>
:作成した Apache Hudi ストレージの名前を指定します。
Python アプリケーションをサブミットするための Curl コマンド
curl --request POST --url https://<wxd_host_name>/lakehouse/api/v2/spark_engines/<spark_engine_id>/applications --header 'Authorization: Bearer <token>' --header 'Content-Type: application/json' --header 'LhInstanceId: <instance_id>' --data '{ \n \n "application_details": { "conf": { "spark.sql.catalog.spark_catalog.type": "hive", "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog", "spark.hadoop.wxd.apiKey":"Basic <user-authentication-string>" }, "application": "s3a://<data_storage_name>/hudi_demo.py" }}
パラメーター値:
-
<wxd_host_name>
:watsonx.dataクラウドインスタンスのホスト名。 -
<instance_id>
: watsonx.data インスタンス URL のインスタンス ID。 例えば、1609968977179454。 -
<spark_engine_id>
: ネイティブSparkエンジンのエンジンID。 -
<token>
:ベアラートークン。 トークンの生成の詳細については、IAM トークン を参照してください。 -
<user-authentication-string>
: ユーザーIDとAPIキーをBase64でエンコードした文字列。 フォーマットの詳細については、注釈を参照のこと。 -
Delta Lake
PythonSparkアプリケーションは、以下の機能を実証している:
-
Delta Lake (データを保存するために作成した)カタログの中にデータベースを作成します。 ここでは、'
<database_name>
. -
<database_name>
データベース内に'<table_name>
というテーブルを作成する。 -
<table_name>
にデータを挿入し、SELECTクエリー操作を行う。 -
使用後はテーブルとスキーマを削除する。
Pythonアプリケーション:
from pyspark.sql import SparkSession import os def init_spark(): spark = SparkSession.builder.appName("lh-hms-cloud") .enableHiveSupport().getOrCreate() return spark def main(): spark = init_spark() spark.sql("show databases").show() spark.sql("create database if not exists spark_catalog.<database_name> LOCATION 's3a://<data_storage_name>/'").show() spark.sql("create table if not exists spark_catalog.<database_name>.<table_name> (id bigint, name string, location string) USING DELTA").show() spark.sql("insert into spark_catalog.<database_name>.<table_name> VALUES (1, 'Sam','Kochi'), (2, 'Tom','Bangalore'), (3, 'Bob','Chennai'), (4, 'Alex','Bangalore')").show() spark.sql("select * from spark_catalog.<database_name>.<table_name>").show() spark.sql("drop table spark_catalog.<database_name>.<table_name>").show() spark.sql("drop schema spark_catalog.<database_name> CASCADE").show() spark.stop() if __name__ == '__main__': main()
パラメーター値:
<database_name>
:作成するデータベース名を指定します。<table_name>
:作成するテーブルの名前を指定します。<data_storage_name>
:作成した Apache Hudi ストレージの名前を指定します。
Python アプリケーションをサブミットするための Curl コマンド
curl --request POST --url https://<wxd_host_name>/lakehouse/api/v2/spark_engines/<spark_engine_id>/applications --header 'Authorization: Bearer <token>' --header 'Content-Type: application/json' --header 'LhInstanceId: <instance_id>' --data '{ "application_details": { "conf": { "spark.sql.catalog.spark_catalog" : "org.apache.spark.sql.delta.catalog.DeltaCatalog", "spark.sql.catalog.spark_catalog.type" : "hive", "spark.hadoop.wxd.apiKey":"<user-authentication-string>" }, "application": "s3a://<database_name>/delta_demo.py" } }
パラメーター値
<wxd_host_name>
:watsonx.dataクラウドインスタンスのホスト名。<instance_id>
watsonx.data クラスタ・インスタンス URL のインスタンス ID。 例えば、1609968977179454。<spark_engine_id>
: ネイティブSparkエンジンのエンジンID。<token>
ベアラートークン トークンの生成の詳細については、IAM トークン を参照してください。<user-authentication-string>
: ユーザーIDとAPIキーをBase64でエンコードした文字列。 フォーマットについて詳しくは、以下の注を参照してください。
<user-authentication-string>
の値は、echo -n 'ibmlhapikey_<username>:<user_apikey>' | base64
の形式でなければなりません。 ここで、<user_id>
は、データ・バケットへのアクセスに使用される API キーを持つユーザーの IBM Cloud ID です。 ここで、<IAM_APIKEY>
は、オブジェクト・ストア・バケットにアクセスするユーザーの API キーです。 API キーを生成するには、 watsonx.data コンソールにログインし、「プロファイル」>「プロファイルと設定」>「API キー」にナビゲートして、新しい API キーを生成します。 新しい API キーを生成すると、古い API キーが無効になります。 -
Spark アプリケーションをサブミットすると、アプリケーション ID と Spark バージョンを示す確認メッセージが表示されます。 参照用に保存します。
-
watsonx.data クラスターにログインし、「エンジンの詳細」ページにアクセスします。 「アプリケーション」タブで、アプリケーション ID を使用してアプリケーションをリストし、ステージを追跡します。 詳しくは、 アプリケーションの表示および管理 を参照してください。