使用 Spark 访问控制扩展加强 Spark 应用程序的提交
适用于:火花机
当您提交使用在 watsonx.data 中注册的外部存储桶的 Spark 应用程序时,Spark 访问控制扩展允许额外的授权,从而提高安全性。 如果在 spark 配置中启用扩展,则只有授权用户才能通过 Spark 作业访问和操作 watsonx.data 目录。
您可以为 Iceberg、Hive 和 Hudi 目录启用 Spark 访问控制扩展。
您可以使用Ranger或访问管理系统(AMS)数据策略来授予或拒绝用户、用户组、目录(Iceberg、Hive 和Hudi)、架构、表格和列的访问权限。 除了数据级授权,存储权限也在考虑之列。 如需了解有关在目录(Iceberg、Hive 和Hudi)、数据集、架构和表格中使用AMS的更多信息,请参阅 管理角色和权限。 有关如何创建 Ranger 策略(在 Hadoop SQL 服务下定义)以及在目录(Iceberg、Hive 和 Hudi)、存储桶、架构和表格中启用这些策略的更多信息,请参阅 《 管理 Ranger 策略 》。
先决条件
- 创建 Cloud Object Storage,以存储 Spark 应用程序中使用的数据。 要创建 Cloud Object Storage和存储桶,请参阅 创建存储桶。 您可以提供两个存储桶,数据存储桶用于存储 watsonx.data 表,应用程序存储桶用于维护 Spark 应用程序代码。
- 在 watsonx.data 中注册 Cloud Object Storage 存储桶。 有关更多信息,请参阅 添加桶目录对。
- 将 Spark 应用程序上传到存储器,请参阅 上传数据。
- 您必须具有 IAM 管理员角色或 MetastoreAdmin 角色,才能在 watsonx.data 内创建模式或表。
过程
Spark 访问控制扩展支持本地 Spark 引擎。
-
要启用 Spark 访问控制扩展,必须使用
add authz.IBMSparkACExtension to spark.sql.extensions
更新 Spark 配置。 -
将以下 Python 应用程序保存为 iceberg.py。
冰山就是一个例子。 您还可以使用 Hive 和 Hudi 目录。
from pyspark.sql import SparkSession
import os
def init_spark():
spark = SparkSession.builder \
.appName("lh-spark-app") \
.enableHiveSupport() \
.getOrCreate()
return spark
def create_database(spark):
# Create a database in the lakehouse catalog
spark.sql("create database if not exists lakehouse.demodb LOCATION 's3a://lakehouse-bucket/'")
def list_databases(spark):
# list the database under lakehouse catalog
spark.sql("show databases from lakehouse").show()
def basic_iceberg_table_operations(spark):
# demonstration: Create a basic Iceberg table, insert some data and then query table
spark.sql("create table if not exists lakehouse.demodb.testTable(id INTEGER, name VARCHAR(10), age INTEGER, salary DECIMAL(10, 2)) using iceberg").show()
spark.sql("insert into lakehouse.demodb.testTable values(1,'Alan',23,3400.00),(2,'Ben',30,5500.00),(3,'Chen',35,6500.00)")
spark.sql("select * from lakehouse.demodb.testTable").show()
def create_table_from_parquet_data(spark):
# load parquet data into dataframe
df = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-01.parquet")
# write the dataframe into an Iceberg table
df.writeTo("lakehouse.demodb.yellow_taxi_2022").create()
# describe the table created
spark.sql('describe table lakehouse.demodb.yellow_taxi_2022').show(25)
# query the table
spark.sql('select * from lakehouse.demodb.yellow_taxi_2022').count()
def ingest_from_csv_temp_table(spark):
# load csv data into a dataframe
csvDF = spark.read.option("header",True).csv("file:///spark-vol/zipcodes.csv")
csvDF.createOrReplaceTempView("tempCSVTable")
# load temporary table into an Iceberg table
spark.sql('create or replace table lakehouse.demodb.zipcodes using iceberg as select * from tempCSVTable')
# describe the table created
spark.sql('describe table lakehouse.demodb.zipcodes').show(25)
# query the table
spark.sql('select * from lakehouse.demodb.zipcodes').show()
def ingest_monthly_data(spark):
df_feb = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-02.parquet")
df_march = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-03.parquet")
df_april = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-04.parquet")
df_may = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-05.parquet")
df_june = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-06.parquet")
df_q1_q2 = df_feb.union(df_march).union(df_april).union(df_may).union(df_june)
df_q1_q2.write.insertInto("lakehouse.demodb.yellow_taxi_2022")
def perform_table_maintenance_operations(spark):
# Query the metadata files table to list underlying data files
spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.demodb.yellow_taxi_2022.files").show()
# There are many smaller files compact them into files of 200MB each using the
# `rewrite_data_files` Iceberg Spark procedure
spark.sql(f"CALL lakehouse.system.rewrite_data_files(table => 'demodb.yellow_taxi_2022', options => map('target-file-size-bytes','209715200'))").show()
# Again, query the metadata files table to list underlying data files; 6 files are compacted
# to 3 files
spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.demodb.yellow_taxi_2022.files").show()
# List all the snapshots
# Expire earlier snapshots. Only latest one with compacted data is required
# Again, List all the snapshots to see only 1 left
spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").show()
#retain only the latest one
latest_snapshot_committed_at = spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").tail(1)[0].committed_at
print (latest_snapshot_committed_at)
spark.sql(f"CALL lakehouse.system.expire_snapshots(table => 'demodb.yellow_taxi_2022',older_than => TIMESTAMP '{latest_snapshot_committed_at}',retain_last => 1)").show()
spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").show()
# Removing Orphan data files
spark.sql(f"CALL lakehouse.system.remove_orphan_files(table => 'demodb.yellow_taxi_2022')").show(truncate=False)
# Rewriting Manifest Files
spark.sql(f"CALL lakehouse.system.rewrite_manifests('demodb.yellow_taxi_2022')").show()
def evolve_schema(spark):
# demonstration: Schema evolution
# Add column fare_per_mile to the table
spark.sql('ALTER TABLE lakehouse.demodb.yellow_taxi_2022 ADD COLUMN(fare_per_mile double)')
# describe the table
spark.sql('describe table lakehouse.demodb.yellow_taxi_2022').show(25)
def clean_database(spark):
# clean-up the demo database
spark.sql('drop table if exists lakehouse.demodb.testTable purge')
spark.sql('drop table if exists lakehouse.demodb.zipcodes purge')
spark.sql('drop table if exists lakehouse.demodb.yellow_taxi_2022 purge')
spark.sql('drop database if exists lakehouse.demodb cascade')
def main():
try:
spark = init_spark()
create_database(spark)
list_databases(spark)
basic_iceberg_table_operations(spark)
# demonstration: Ingest parquet and csv data into a watsonx.data Iceberg table
create_table_from_parquet_data(spark)
ingest_from_csv_temp_table(spark)
# load data for the month of February to June into the table yellow_taxi_2022 created above
ingest_monthly_data(spark)
# demonstration: Table maintenance
perform_table_maintenance_operations(spark)
# demonstration: Schema evolution
evolve_schema(spark)
finally:
# clean-up the demo database
clean_database(spark)
spark.stop()
if __name__ == '__main__':
main()
- 要提交 Spark 应用程序,请指定参数值并运行以下 curl 命令。 下面的示例显示了提交 iceberg.py 应用程序的命令。
curl --request POST --url https://<region>/lakehouse/api/v2/spark_engines/<spark_engine_id>/applications \
--header 'Authorization: Bearer <token>' --header 'Content-Type: application/json' --header 'Lhinstanceid: <instance_id>' --data '{
"application_details": {
"conf": {
"spark.hadoop.fs.s3a.bucket.<wxd-data-bucket-name>.endpoint": "<wxd-data-bucket-endpoint>",
"spark.hadoop.fs.cos.<COS_SERVICE_NAME>.endpoint": "<COS_ENDPOINT>",
"spark.hadoop.fs.cos.<COS_SERVICE_NAME>.secret.key": "<COS_SECRET_KEY>",
"spark.hadoop.fs.cos.<COS_SERVICE_NAME>.access.key": "<COS_ACCESS_KEY>"
"spark.hadoop.wxd.apiKey":"Basic xxx",
"spark.sql.extensions":"<required-storage-support-extension>,authz.IBMSparkACExtension"
},
"application": "cos://<BUCKET_NAME>.<COS_SERVICE_NAME>/<python_file_name>",
}
}
参数值:
<token>
获取服务实例的访问令牌。 有关生成令牌的更多信息,请参阅 生成令牌。<instance_id>
实例 ID:watsonx.data 集群实例 URL 中的实例 ID。 例如,crn:v1:staging:public:lakehouse:us-south:a/7bb9e380dc0c4bc284592b97d5095d3c:5b602d6a-847a-469d-bece-0a29124588c0::。<wxd-data-bucket-endpoint>
:用于访问上述数据桶的端点主机名。 例如,s3.us-south.cloud-object-storage.appdomain.cloud用于 us-south 区域的云对象存储桶。<COS_SERVICE_NAME>
:提供云对象存储服务名称。<COS_ENDPOINT>
提供公共端点。 更多信息,请参阅 终端。<access_key>
: 提供 access_key_id。 更多信息,请参阅 资质证书。<secret_key>
: 提供 secret_access_key 密钥。 更多信息,请参阅 资质证书。<BUCKET_NAME>
:应用程序文件所在的存储桶。<python_file_name>
Spark:Spark 应用程序文件名。
限制:
- 用户必须拥有创建模式和表的全部权限。
- 要创建数据策略,必须将目录与 Presto 引擎关联。
- 如果尝试显示不存在的模式,系统会抛出 nullpointer 问题。
- 您可以为 Iceberg、Hive 和 Hudi 目录启用 Spark 访问控制扩展。