IBM Cloud Docs
Sparkアクセス制御拡張を使用したSparkアプリケーション投稿の強化

Sparkアクセス制御拡張を使用したSparkアプリケーション投稿の強化

watsonx.dataに登録された外部ストレージバケットを使用するSparkアプリケーションをサブミットすると、Sparkのアクセス制御拡張機能によって追加の認証が可能になり、セキュリティが強化されます。 sparkコンフィギュレーションでエクステンションを有効にすると、watsonx.dataカタログにアクセスし、操作できるのは許可されたユーザーのみになります。

Iceberg、 Hive、HudiカタログのSparkアクセス制御エクステンションを有効にできます。

RangerまたはAccess Management System (AMS) のデータポリシーを使用して、ユーザー、ユーザーグループ、カタログ(Iceberg、 Hive、Hudi)、スキーマ、テーブル、カラムへのアクセスを許可または拒否することができます。 データレベルの権限に加えて、ストレージ権限も考慮される。 AMSをカタログ(Iceberg、 Hive、Hudi)、バケット、スキーマ、テーブルで使用する関連情報については 、「役割と権限の管理」 を参照してください。 Ranger ポリシー ( Hadoop SQL サービスで定義) を作成し、カタログ (Iceberg、 Hive、Hudi)、バケット、スキーマ、テーブルで有効にする方法の詳細については、 「Ranger ポリシーの管理」を参照してください。

前提条件

  • Spark アプリケーションで使用するデータを保存するために、Cloud Object Storage を作成します。 Cloud Object Storageとバケットを作成するには、ストレージバケットの作成 を参照してください。 watsonx.dataテーブルを格納するdata-bucketと、Sparkアプリケーションコードを保持するapplication bucketです。
  • watsonx.dataにCloud Object Storageバケットを登録します。 詳しくは、バケットカタログペアの追加 を参照してください。
  • Sparkアプリケーションをストレージにアップロードします。データのアップロード を参照してください。
  • watsonx.data内にスキーマまたはテーブルを作成するには、IAM管理者ロールまたはMetastoreAdminロールが必要です。

手順

Sparkアクセス制御拡張は、外部Sparkエンジンをサポートします。

  1. Sparkのアクセスコントロールエクステンションを有効にするには、Sparkの設定を add authz.IBMSparkACExtension to spark.sql.extensions で更新する必要があります。

  2. 次のPythonアプリケーションをiceberg.pyとして保存します。

アイスバーグがその例だ。 Hive Hudiのカタログを使うこともできる。


from pyspark.sql import SparkSession
import os

def init_spark():
    spark = SparkSession.builder \
        .appName("lh-spark-app") \
        .enableHiveSupport() \
        .getOrCreate()
    return spark

def create_database(spark):
    # Create a database in the lakehouse catalog
    spark.sql("create database if not exists lakehouse.demodb LOCATION 's3a://lakehouse-bucket/'")

def list_databases(spark):
    # list the database under lakehouse catalog
    spark.sql("show databases from lakehouse").show()

def basic_iceberg_table_operations(spark):
    # demonstration: Create a basic Iceberg table, insert some data and then query table
    spark.sql("create table if not exists lakehouse.demodb.testTable(id INTEGER, name VARCHAR(10), age INTEGER, salary DECIMAL(10, 2)) using iceberg").show()
    spark.sql("insert into lakehouse.demodb.testTable values(1,'Alan',23,3400.00),(2,'Ben',30,5500.00),(3,'Chen',35,6500.00)")
    spark.sql("select * from lakehouse.demodb.testTable").show()

def create_table_from_parquet_data(spark):
    # load parquet data into dataframe
    df = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-01.parquet")
    # write the dataframe into an Iceberg table
    df.writeTo("lakehouse.demodb.yellow_taxi_2022").create()
    # describe the table created
    spark.sql('describe table lakehouse.demodb.yellow_taxi_2022').show(25)
    # query the table
    spark.sql('select * from lakehouse.demodb.yellow_taxi_2022').count()

def ingest_from_csv_temp_table(spark):
    # load csv data into a dataframe
    csvDF = spark.read.option("header",True).csv("file:///spark-vol/zipcodes.csv")
    csvDF.createOrReplaceTempView("tempCSVTable")
    # load temporary table into an Iceberg table
    spark.sql('create or replace table lakehouse.demodb.zipcodes using iceberg as select * from tempCSVTable')
    # describe the table created
    spark.sql('describe table lakehouse.demodb.zipcodes').show(25)
    # query the table
    spark.sql('select * from lakehouse.demodb.zipcodes').show()

def ingest_monthly_data(spark):
    df_feb = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-02.parquet")
    df_march = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-03.parquet")
    df_april = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-04.parquet")
    df_may = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-05.parquet")
    df_june = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-06.parquet")
    df_q1_q2 = df_feb.union(df_march).union(df_april).union(df_may).union(df_june)
    df_q1_q2.write.insertInto("lakehouse.demodb.yellow_taxi_2022")

def perform_table_maintenance_operations(spark):
    # Query the metadata files table to list underlying data files
    spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.demodb.yellow_taxi_2022.files").show()
    # There are many smaller files compact them into files of 200MB each using the
    # `rewrite_data_files` Iceberg Spark procedure
    spark.sql(f"CALL lakehouse.system.rewrite_data_files(table => 'demodb.yellow_taxi_2022', options => map('target-file-size-bytes','209715200'))").show()
    # Again, query the metadata files table to list underlying data files; 6 files are compacted
    # to 3 files
    spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.demodb.yellow_taxi_2022.files").show()
    # List all the snapshots
    # Expire earlier snapshots. Only latest one with compacted data is required
    # Again, List all the snapshots to see only 1 left
    spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").show()
    #retain only the latest one
    latest_snapshot_committed_at = spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").tail(1)[0].committed_at
    print (latest_snapshot_committed_at)
    spark.sql(f"CALL lakehouse.system.expire_snapshots(table => 'demodb.yellow_taxi_2022',older_than => TIMESTAMP '{latest_snapshot_committed_at}',retain_last => 1)").show()
    spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").show()
    # Removing Orphan data files
    spark.sql(f"CALL lakehouse.system.remove_orphan_files(table => 'demodb.yellow_taxi_2022')").show(truncate=False)
    # Rewriting Manifest Files
    spark.sql(f"CALL lakehouse.system.rewrite_manifests('demodb.yellow_taxi_2022')").show()

def evolve_schema(spark):
    # demonstration: Schema evolution
    # Add column fare_per_mile to the table
    spark.sql('ALTER TABLE lakehouse.demodb.yellow_taxi_2022 ADD COLUMN(fare_per_mile double)')
    # describe the table
    spark.sql('describe table lakehouse.demodb.yellow_taxi_2022').show(25)

def clean_database(spark):
    # clean-up the demo database
    spark.sql('drop table if exists lakehouse.demodb.testTable purge')
    spark.sql('drop table if exists lakehouse.demodb.zipcodes purge')
    spark.sql('drop table if exists lakehouse.demodb.yellow_taxi_2022 purge')
    spark.sql('drop database if exists lakehouse.demodb cascade')

def main():
    try:
        spark = init_spark()
        create_database(spark)
        list_databases(spark)
        basic_iceberg_table_operations(spark)
        # demonstration: Ingest parquet and csv data into a watsonx.data Iceberg table
        create_table_from_parquet_data(spark)
        ingest_from_csv_temp_table(spark)
        # load data for the month of February to June into the table yellow_taxi_2022 created above
        ingest_monthly_data(spark)
        # demonstration: Table maintenance
        perform_table_maintenance_operations(spark)
        # demonstration: Schema evolution
        evolve_schema(spark)
    finally:
        # clean-up the demo database
        clean_database(spark)
        spark.stop()

if __name__ == '__main__':
main()

  1. Sparkアプリケーションをサブミットするには、パラメータ値を指定して以下のcurlコマンドを実行する。 次の例は、iceberg.py アプリケーションをサブミットするコマンドです。

curl --request POST   --url https://<region>/lakehouse/api/v2/spark_engines/<spark_engine_id>/applications    --header 'Authorization: Bearer <token>'   --header 'Content-Type: application/json'   --header 'Lhinstanceid: <instance_id>'   --data '{
  "application_details": {
  "conf": {
      "spark.hadoop.fs.s3a.bucket.<wxd-data-bucket-name>.endpoint": "<wxd-data-bucket-endpoint>",
      "spark.hadoop.fs.cos.<COS_SERVICE_NAME>.endpoint": "<COS_ENDPOINT>",
      "spark.hadoop.fs.cos.<COS_SERVICE_NAME>.secret.key": "<COS_SECRET_KEY>",
      "spark.hadoop.fs.cos.<COS_SERVICE_NAME>.access.key": "<COS_ACCESS_KEY>"
      "spark.sql.catalogImplementation": "hive",
      "spark.sql.iceberg.vectorization.enabled":"false",
        "spark.sql.catalog.<wxd-bucket-catalog-name>":"org.apache.iceberg.spark.SparkCatalog",
      "spark.sql.catalog.<wxd-bucket-catalog-name>.type":"hive",
      "spark.sql.catalog.<wxd-bucket-catalog-name>.uri":"thrift://<wxd-catalog-metastore-host>",
      "spark.hive.metastore.client.auth.mode":"PLAIN",
      "spark.hive.metastore.client.plain.username":"<username>",
      "spark.hive.metastore.client.plain.password":"xxx",
      "spark.hive.metastore.use.SSL":"true",
      "spark.hive.metastore.truststore.type":"JKS",
      "spark.hive.metastore.truststore.path":"<truststore_path>",
      "spark.hive.metastore.truststore.password":"changeit",
        "spark.hadoop.fs.s3a.bucket.<wxd-data-bucket-name>.aws.credentials.provider":"com.ibm.iae.s3.credentialprovider.WatsonxCredentialsProvider",
        "spark.hadoop.fs.s3a.bucket.<wxd-data-bucket-name>.custom.signers":"WatsonxAWSV4Signer:com.ibm.iae.s3.credentialprovider.WatsonxAWSV4Signer",
        "spark.hadoop.fs.s3a.bucket.<wxd-data-bucket-name>.s3.signing-algorithm":"WatsonxAWSV4Signer",
        "spark.hadoop.wxd.cas.endpoint":"<cas_endpoint>/cas/v1/signature",
        "spark.hadoop.wxd.instanceId":"<instance_crn>",
        "spark.hadoop.wxd.apiKey":"Basic xxx",
        "spark.wxd.api.endpoint":"<wxd-endpoint>",
        "spark.driver.extraClassPath":"opt/ibm/connectors/wxd/spark-authz/cpg-client-1.0-jar-with-dependencies.jar:/opt/ibm/connectors/wxd/spark-authz/ibmsparkacextension_2.12-1.0.jar",
        "spark.sql.extensions":"<required-storage-support-extension>,authz.IBMSparkACExtension"

    },
    "application": "cos://<BUCKET_NAME>.<COS_SERVICE_NAME>/<python_file_name>",
  }
}

パラメーター値:

  • <region> インスタンスがプロビジョニングされるリージョン。 例: us-south 地域。
  • <spark_engine_id> Spark インスタンスの一意な識別子。 IDの取得方法については 、「Sparkエンジンの詳細の管理 」を参照してください。
  • <token> サービスインスタンスのアクセストークンを取得する。 トークンの生成方法の詳細については 、「トークンの生成」 を参照してください。
  • <instance_id> watsonx.data クラスタ・インスタンス URL のインスタンス ID。 例、 crn:v1:staging:public:lakehouse:us-south:a/7bb9e380dc0c4bc284592b97d5095d3c:5b602d6a-847a-469d-bece-0a29124588c0::。
  • <wxd-data-bucket-name> :インフラストラクチャマネージャーからスパークエンジンに関連付けられたデータバケットの名前。
  • <wxd-data-bucket-endpoint>: 上記のデータバケットにアクセスするためのエンドポイントのホスト名。 例: s3.us-south.cloud-object-storage.appdomain.cloud は、us-southリージョンのクラウドオブジェクトストレージバケットです。
  • <wxd-bucket-catalog-name>: データバケットに関連付けられたカタログ名。
  • <wxd-catalog-metastore-host>: 登録されたバケツに関連付けられたメタストア。
  • <cos_bucket_endpoint> : メタストアのホストの値を指定します。 詳細は ストレージの詳細 を参照。
  • <access_key> : access_key_idを指定します。 詳細は ストレージの詳細 を参照。
  • <secret_key> : secret_access_keyを指定します。 詳細は ストレージの詳細 を参照。
  • <truststore_path> : Trustore証明書がアップロードされるCOSパスを指定します。 例えば、cos://di-bucket.di-test/1902xx-truststore.jks のように。 トラストアの生成に関する詳細は、自己署名証明書のインポート を参照のこと。
  • <cas_endpoint> : データアクセスサービス(DAS)エンドポイント。 DASエンドポイントを取得するには、DASエンドポイントを取得する を参照してください。
  • <username> : watsonx.data インスタンスのユーザー名。 ほら、ibmlhapikey。
  • <apikey> : base64 でエンコードされた `ibmlhapikey_<user_id>:<IAM_APIKEY> である。ここで、<user_id> は IBM Cloud ID であり、そのユーザの apikey がデータバケットへのアクセスに使われる。 API キーを生成するには、watsonx.data コンソールにログインし、[Profile] > [Profile and Settings] > [API Keys] に移動して新しい API キーを生成します。
  • <OBJECT_NAME>: IBM Cloud Object Storage。
  • <BUCKET_NAME>: アプリケーションファイルが存在するストレージバケット。
  • <COS_SERVICE_NAME>: クラウドオブジェクトストレージのサービス名。
  • <python file name> Spark アプリケーションのファイル名。

制限事項

  • ユーザはスキーマとテーブルを作成するためのフルアクセス権を持っていなければならない。
  • データ ポリシーを作成するには、カタログを Presto エンジンに関連付ける必要があります。
  • 存在しないスキーマを表示しようとすると、nullpointer問題がスローされます。
  • Iceberg、 Hive、HudiカタログのSparkアクセス制御エクステンションを有効にできます。