IBM Cloud Docs
使用 Spark 访问控制扩展加强 Spark 应用程序的提交

使用 Spark 访问控制扩展加强 Spark 应用程序的提交

当您提交使用在 watsonx.data 中注册的外部存储桶的 Spark 应用程序时,Spark 访问控制扩展允许额外的授权,从而提高安全性。 如果在 spark 配置中启用扩展,则只有授权用户才能通过 Spark 作业访问和操作 watsonx.data 目录。

您可以为 Iceberg、Hive 和 Hudi 目录启用 Spark 访问控制扩展。

您可以使用Ranger或访问管理系统(AMS)数据策略来授予或拒绝用户、用户组、目录(Iceberg、Hive 和Hudi)、架构、表格和列的访问权限。 除了数据级授权,存储权限也在考虑之列。 如需了解有关在目录(Iceberg、Hive 和Hudi)、数据集、架构和表格中使用AMS的更多信息,请参阅 管理角色和权限。 有关如何创建 Ranger 策略(在 Hadoop SQL 服务下定义)以及在目录(Iceberg、Hive 和 Hudi)、存储桶、架构和表格中启用这些策略的更多信息,请参阅 《 管理 Ranger 策略 》。

先决条件

  • 创建 Cloud Object Storage,以存储 Spark 应用程序中使用的数据。 要创建 Cloud Object Storage和存储桶,请参阅 创建存储桶。 您可以提供两个存储桶,数据存储桶用于存储 watsonx.data 表,应用程序存储桶用于维护 Spark 应用程序代码。
  • 在 watsonx.data 中注册 Cloud Object Storage 存储桶。 有关更多信息,请参阅 添加桶目录对
  • 将 Spark 应用程序上传到存储器,请参阅 上传数据
  • 您必须具有 IAM 管理员角色或 MetastoreAdmin 角色,才能在 watsonx.data 内创建模式或表。

过程

Spark 访问控制扩展支持外部 Spark 引擎。

  1. 要启用 Spark 访问控制扩展,必须使用 add authz.IBMSparkACExtension to spark.sql.extensions 更新 Spark 配置。

  2. 将以下 Python 应用程序保存为 iceberg.py。

冰山就是一个例子。 您还可以使用 Hive 和 Hudi 目录。


from pyspark.sql import SparkSession
import os

def init_spark():
    spark = SparkSession.builder \
        .appName("lh-spark-app") \
        .enableHiveSupport() \
        .getOrCreate()
    return spark

def create_database(spark):
    # Create a database in the lakehouse catalog
    spark.sql("create database if not exists lakehouse.demodb LOCATION 's3a://lakehouse-bucket/'")

def list_databases(spark):
    # list the database under lakehouse catalog
    spark.sql("show databases from lakehouse").show()

def basic_iceberg_table_operations(spark):
    # demonstration: Create a basic Iceberg table, insert some data and then query table
    spark.sql("create table if not exists lakehouse.demodb.testTable(id INTEGER, name VARCHAR(10), age INTEGER, salary DECIMAL(10, 2)) using iceberg").show()
    spark.sql("insert into lakehouse.demodb.testTable values(1,'Alan',23,3400.00),(2,'Ben',30,5500.00),(3,'Chen',35,6500.00)")
    spark.sql("select * from lakehouse.demodb.testTable").show()

def create_table_from_parquet_data(spark):
    # load parquet data into dataframe
    df = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-01.parquet")
    # write the dataframe into an Iceberg table
    df.writeTo("lakehouse.demodb.yellow_taxi_2022").create()
    # describe the table created
    spark.sql('describe table lakehouse.demodb.yellow_taxi_2022').show(25)
    # query the table
    spark.sql('select * from lakehouse.demodb.yellow_taxi_2022').count()

def ingest_from_csv_temp_table(spark):
    # load csv data into a dataframe
    csvDF = spark.read.option("header",True).csv("file:///spark-vol/zipcodes.csv")
    csvDF.createOrReplaceTempView("tempCSVTable")
    # load temporary table into an Iceberg table
    spark.sql('create or replace table lakehouse.demodb.zipcodes using iceberg as select * from tempCSVTable')
    # describe the table created
    spark.sql('describe table lakehouse.demodb.zipcodes').show(25)
    # query the table
    spark.sql('select * from lakehouse.demodb.zipcodes').show()

def ingest_monthly_data(spark):
    df_feb = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-02.parquet")
    df_march = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-03.parquet")
    df_april = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-04.parquet")
    df_may = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-05.parquet")
    df_june = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-06.parquet")
    df_q1_q2 = df_feb.union(df_march).union(df_april).union(df_may).union(df_june)
    df_q1_q2.write.insertInto("lakehouse.demodb.yellow_taxi_2022")

def perform_table_maintenance_operations(spark):
    # Query the metadata files table to list underlying data files
    spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.demodb.yellow_taxi_2022.files").show()
    # There are many smaller files compact them into files of 200MB each using the
    # `rewrite_data_files` Iceberg Spark procedure
    spark.sql(f"CALL lakehouse.system.rewrite_data_files(table => 'demodb.yellow_taxi_2022', options => map('target-file-size-bytes','209715200'))").show()
    # Again, query the metadata files table to list underlying data files; 6 files are compacted
    # to 3 files
    spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.demodb.yellow_taxi_2022.files").show()
    # List all the snapshots
    # Expire earlier snapshots. Only latest one with compacted data is required
    # Again, List all the snapshots to see only 1 left
    spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").show()
    #retain only the latest one
    latest_snapshot_committed_at = spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").tail(1)[0].committed_at
    print (latest_snapshot_committed_at)
    spark.sql(f"CALL lakehouse.system.expire_snapshots(table => 'demodb.yellow_taxi_2022',older_than => TIMESTAMP '{latest_snapshot_committed_at}',retain_last => 1)").show()
    spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").show()
    # Removing Orphan data files
    spark.sql(f"CALL lakehouse.system.remove_orphan_files(table => 'demodb.yellow_taxi_2022')").show(truncate=False)
    # Rewriting Manifest Files
    spark.sql(f"CALL lakehouse.system.rewrite_manifests('demodb.yellow_taxi_2022')").show()

def evolve_schema(spark):
    # demonstration: Schema evolution
    # Add column fare_per_mile to the table
    spark.sql('ALTER TABLE lakehouse.demodb.yellow_taxi_2022 ADD COLUMN(fare_per_mile double)')
    # describe the table
    spark.sql('describe table lakehouse.demodb.yellow_taxi_2022').show(25)

def clean_database(spark):
    # clean-up the demo database
    spark.sql('drop table if exists lakehouse.demodb.testTable purge')
    spark.sql('drop table if exists lakehouse.demodb.zipcodes purge')
    spark.sql('drop table if exists lakehouse.demodb.yellow_taxi_2022 purge')
    spark.sql('drop database if exists lakehouse.demodb cascade')

def main():
    try:
        spark = init_spark()
        create_database(spark)
        list_databases(spark)
        basic_iceberg_table_operations(spark)
        # demonstration: Ingest parquet and csv data into a watsonx.data Iceberg table
        create_table_from_parquet_data(spark)
        ingest_from_csv_temp_table(spark)
        # load data for the month of February to June into the table yellow_taxi_2022 created above
        ingest_monthly_data(spark)
        # demonstration: Table maintenance
        perform_table_maintenance_operations(spark)
        # demonstration: Schema evolution
        evolve_schema(spark)
    finally:
        # clean-up the demo database
        clean_database(spark)
        spark.stop()

if __name__ == '__main__':
main()

  1. 要提交 Spark 应用程序,请指定参数值并运行以下 curl 命令。 下面的示例显示了提交 iceberg.py 应用程序的命令。

curl --request POST   --url https://<region>/lakehouse/api/v2/spark_engines/<spark_engine_id>/applications    --header 'Authorization: Bearer <token>'   --header 'Content-Type: application/json'   --header 'Lhinstanceid: <instance_id>'   --data '{
  "application_details": {
  "conf": {
      "spark.hadoop.fs.s3a.bucket.<wxd-data-bucket-name>.endpoint": "<wxd-data-bucket-endpoint>",
      "spark.hadoop.fs.cos.<COS_SERVICE_NAME>.endpoint": "<COS_ENDPOINT>",
      "spark.hadoop.fs.cos.<COS_SERVICE_NAME>.secret.key": "<COS_SECRET_KEY>",
      "spark.hadoop.fs.cos.<COS_SERVICE_NAME>.access.key": "<COS_ACCESS_KEY>"
      "spark.sql.catalogImplementation": "hive",
      "spark.sql.iceberg.vectorization.enabled":"false",
        "spark.sql.catalog.<wxd-bucket-catalog-name>":"org.apache.iceberg.spark.SparkCatalog",
      "spark.sql.catalog.<wxd-bucket-catalog-name>.type":"hive",
      "spark.sql.catalog.<wxd-bucket-catalog-name>.uri":"thrift://<wxd-catalog-metastore-host>",
      "spark.hive.metastore.client.auth.mode":"PLAIN",
      "spark.hive.metastore.client.plain.username":"<username>",
      "spark.hive.metastore.client.plain.password":"xxx",
      "spark.hive.metastore.use.SSL":"true",
      "spark.hive.metastore.truststore.type":"JKS",
      "spark.hive.metastore.truststore.path":"<truststore_path>",
      "spark.hive.metastore.truststore.password":"changeit",
        "spark.hadoop.fs.s3a.bucket.<wxd-data-bucket-name>.aws.credentials.provider":"com.ibm.iae.s3.credentialprovider.WatsonxCredentialsProvider",
        "spark.hadoop.fs.s3a.bucket.<wxd-data-bucket-name>.custom.signers":"WatsonxAWSV4Signer:com.ibm.iae.s3.credentialprovider.WatsonxAWSV4Signer",
        "spark.hadoop.fs.s3a.bucket.<wxd-data-bucket-name>.s3.signing-algorithm":"WatsonxAWSV4Signer",
        "spark.hadoop.wxd.cas.endpoint":"<cas_endpoint>/cas/v1/signature",
        "spark.hadoop.wxd.instanceId":"<instance_crn>",
        "spark.hadoop.wxd.apiKey":"Basic xxx",
        "spark.wxd.api.endpoint":"<wxd-endpoint>",
        "spark.driver.extraClassPath":"opt/ibm/connectors/wxd/spark-authz/cpg-client-1.0-jar-with-dependencies.jar:/opt/ibm/connectors/wxd/spark-authz/ibmsparkacextension_2.12-1.0.jar",
        "spark.sql.extensions":"<required-storage-support-extension>,authz.IBMSparkACExtension"

    },
    "application": "cos://<BUCKET_NAME>.<COS_SERVICE_NAME>/<python_file_name>",
  }
}

参数值:

  • <region>:实例所在的区域。 例如,us-south 地区。
  • <spark_engine_id>:Spark 实例的唯一标识符。 有关如何检索ID的信息,请参阅 Spark引擎的详细信息
  • <token> 获取服务实例的访问令牌。 有关生成令牌的更多信息,请参阅 生成令牌
  • <instance_id> 实例 ID:来自 watsonx.data 集群实例 URL 的实例 ID。 例如,crn:v1:staging:public:lakehouse:us-south:a/7bb9e380dc0c4bc284592b97d5095d3c:5b602d6a-847a-469d-bece-0a29124588c0::。
  • <wxd-data-bucket-name>:基础设施管理器中与Spark引擎关联的数据桶名称。
  • <wxd-data-bucket-endpoint>:用于访问上述数据桶的端点主机名。 例如,s3.us-south.cloud-object-storage.appdomain.cloud用于 us-south 区域的云对象存储桶。
  • <wxd-bucket-catalog-name>:与数据桶关联的目录名称。
  • <wxd-catalog-metastore-host>:与已注册的邮筒相关联的元存储。
  • <cos_bucket_endpoint> : 提供 Metastore 主机值。 有关详细信息,请参阅 存储详细信息
  • <access_key> : 提供 access_key_id。 有关详细信息,请参阅 存储详细信息
  • <secret_key> : 提供 secret_access_key 密钥。 有关详细信息,请参阅 存储详细信息
  • <truststore_path>:提供上传信任核心证书的 COS 路径。 例如 cos://di-bucket.di-test/1902xx-truststore.jks。 有关生成信任核心的更多信息,请参阅 导入自签名证书
  • <cas_endpoint> : 数据访问服务 (DAS) 端点。 要获取 DAS 端点,请参阅 获取 DAS 端点
  • <username> : watsonx.data 实例的用户名。 这里,ibmlhapikey。
  • <apikey>:base64编码的`ibmlhapikey_<user_id>:<IAM_APIKEY>。这里,<user_id> 是用户的 IBM Cloud id,其 apikey 用于访问数据桶。 要生成 API 密钥,请登录 watsonx.data 控制台,并导航至配置文件 > 配置文件和设置 > API 密钥,然后生成一个新的 API 密钥。
  • <OBJECT_NAME>: IBM Cloud Object Storage 名称。
  • <BUCKET_NAME>:应用程序文件所在的存储桶。
  • <COS_SERVICE_NAME>:云对象存储服务名称。
  • <python file name> Spark:Spark 应用程序文件名。

限制:

  • 用户必须拥有创建模式和表的全部权限。
  • 要创建数据策略,必须将目录与 Presto 引擎关联。
  • 如果尝试显示不存在的模式,系统会抛出 nullpointer 问题。
  • 您可以为 Iceberg、Hive 和 Hudi 目录启用 Spark 访问控制扩展。