使用 Spark 访问控制扩展加强 Spark 应用程序的提交
当您提交使用在 watsonx.data 中注册的外部存储桶的 Spark 应用程序时,Spark 访问控制扩展允许额外的授权,从而提高安全性。 如果在 spark 配置中启用扩展,则只有授权用户才能通过 Spark 作业访问和操作 watsonx.data 目录。
您可以为 Iceberg、Hive 和 Hudi 目录启用 Spark 访问控制扩展。
您可以使用Ranger或访问管理系统(AMS)数据策略来授予或拒绝用户、用户组、目录(Iceberg、Hive 和Hudi)、架构、表格和列的访问权限。 除了数据级授权,存储权限也在考虑之列。 如需了解有关在目录(Iceberg、Hive 和Hudi)、数据集、架构和表格中使用AMS的更多信息,请参阅 管理角色和权限。 有关如何创建 Ranger 策略(在 Hadoop SQL 服务下定义)以及在目录(Iceberg、Hive 和 Hudi)、存储桶、架构和表格中启用这些策略的更多信息,请参阅 《 管理 Ranger 策略 》。
先决条件
- 创建 Cloud Object Storage,以存储 Spark 应用程序中使用的数据。 要创建 Cloud Object Storage和存储桶,请参阅 创建存储桶。 您可以提供两个存储桶,数据存储桶用于存储 watsonx.data 表,应用程序存储桶用于维护 Spark 应用程序代码。
- 在 watsonx.data 中注册 Cloud Object Storage 存储桶。 有关更多信息,请参阅 添加桶目录对。
- 将 Spark 应用程序上传到存储器,请参阅 上传数据。
- 您必须具有 IAM 管理员角色或 MetastoreAdmin 角色,才能在 watsonx.data 内创建模式或表。
过程
Spark 访问控制扩展支持外部 Spark 引擎。
-
要启用 Spark 访问控制扩展,必须使用
add authz.IBMSparkACExtension to spark.sql.extensions
更新 Spark 配置。 -
将以下 Python 应用程序保存为 iceberg.py。
冰山就是一个例子。 您还可以使用 Hive 和 Hudi 目录。
from pyspark.sql import SparkSession
import os
def init_spark():
spark = SparkSession.builder \
.appName("lh-spark-app") \
.enableHiveSupport() \
.getOrCreate()
return spark
def create_database(spark):
# Create a database in the lakehouse catalog
spark.sql("create database if not exists lakehouse.demodb LOCATION 's3a://lakehouse-bucket/'")
def list_databases(spark):
# list the database under lakehouse catalog
spark.sql("show databases from lakehouse").show()
def basic_iceberg_table_operations(spark):
# demonstration: Create a basic Iceberg table, insert some data and then query table
spark.sql("create table if not exists lakehouse.demodb.testTable(id INTEGER, name VARCHAR(10), age INTEGER, salary DECIMAL(10, 2)) using iceberg").show()
spark.sql("insert into lakehouse.demodb.testTable values(1,'Alan',23,3400.00),(2,'Ben',30,5500.00),(3,'Chen',35,6500.00)")
spark.sql("select * from lakehouse.demodb.testTable").show()
def create_table_from_parquet_data(spark):
# load parquet data into dataframe
df = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-01.parquet")
# write the dataframe into an Iceberg table
df.writeTo("lakehouse.demodb.yellow_taxi_2022").create()
# describe the table created
spark.sql('describe table lakehouse.demodb.yellow_taxi_2022').show(25)
# query the table
spark.sql('select * from lakehouse.demodb.yellow_taxi_2022').count()
def ingest_from_csv_temp_table(spark):
# load csv data into a dataframe
csvDF = spark.read.option("header",True).csv("file:///spark-vol/zipcodes.csv")
csvDF.createOrReplaceTempView("tempCSVTable")
# load temporary table into an Iceberg table
spark.sql('create or replace table lakehouse.demodb.zipcodes using iceberg as select * from tempCSVTable')
# describe the table created
spark.sql('describe table lakehouse.demodb.zipcodes').show(25)
# query the table
spark.sql('select * from lakehouse.demodb.zipcodes').show()
def ingest_monthly_data(spark):
df_feb = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-02.parquet")
df_march = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-03.parquet")
df_april = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-04.parquet")
df_may = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-05.parquet")
df_june = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-06.parquet")
df_q1_q2 = df_feb.union(df_march).union(df_april).union(df_may).union(df_june)
df_q1_q2.write.insertInto("lakehouse.demodb.yellow_taxi_2022")
def perform_table_maintenance_operations(spark):
# Query the metadata files table to list underlying data files
spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.demodb.yellow_taxi_2022.files").show()
# There are many smaller files compact them into files of 200MB each using the
# `rewrite_data_files` Iceberg Spark procedure
spark.sql(f"CALL lakehouse.system.rewrite_data_files(table => 'demodb.yellow_taxi_2022', options => map('target-file-size-bytes','209715200'))").show()
# Again, query the metadata files table to list underlying data files; 6 files are compacted
# to 3 files
spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.demodb.yellow_taxi_2022.files").show()
# List all the snapshots
# Expire earlier snapshots. Only latest one with compacted data is required
# Again, List all the snapshots to see only 1 left
spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").show()
#retain only the latest one
latest_snapshot_committed_at = spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").tail(1)[0].committed_at
print (latest_snapshot_committed_at)
spark.sql(f"CALL lakehouse.system.expire_snapshots(table => 'demodb.yellow_taxi_2022',older_than => TIMESTAMP '{latest_snapshot_committed_at}',retain_last => 1)").show()
spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").show()
# Removing Orphan data files
spark.sql(f"CALL lakehouse.system.remove_orphan_files(table => 'demodb.yellow_taxi_2022')").show(truncate=False)
# Rewriting Manifest Files
spark.sql(f"CALL lakehouse.system.rewrite_manifests('demodb.yellow_taxi_2022')").show()
def evolve_schema(spark):
# demonstration: Schema evolution
# Add column fare_per_mile to the table
spark.sql('ALTER TABLE lakehouse.demodb.yellow_taxi_2022 ADD COLUMN(fare_per_mile double)')
# describe the table
spark.sql('describe table lakehouse.demodb.yellow_taxi_2022').show(25)
def clean_database(spark):
# clean-up the demo database
spark.sql('drop table if exists lakehouse.demodb.testTable purge')
spark.sql('drop table if exists lakehouse.demodb.zipcodes purge')
spark.sql('drop table if exists lakehouse.demodb.yellow_taxi_2022 purge')
spark.sql('drop database if exists lakehouse.demodb cascade')
def main():
try:
spark = init_spark()
create_database(spark)
list_databases(spark)
basic_iceberg_table_operations(spark)
# demonstration: Ingest parquet and csv data into a watsonx.data Iceberg table
create_table_from_parquet_data(spark)
ingest_from_csv_temp_table(spark)
# load data for the month of February to June into the table yellow_taxi_2022 created above
ingest_monthly_data(spark)
# demonstration: Table maintenance
perform_table_maintenance_operations(spark)
# demonstration: Schema evolution
evolve_schema(spark)
finally:
# clean-up the demo database
clean_database(spark)
spark.stop()
if __name__ == '__main__':
main()
- 要提交 Spark 应用程序,请指定参数值并运行以下 curl 命令。 下面的示例显示了提交 iceberg.py 应用程序的命令。
curl --request POST --url https://<region>/lakehouse/api/v2/spark_engines/<spark_engine_id>/applications --header 'Authorization: Bearer <token>' --header 'Content-Type: application/json' --header 'Lhinstanceid: <instance_id>' --data '{
"application_details": {
"conf": {
"spark.hadoop.fs.s3a.bucket.<wxd-data-bucket-name>.endpoint": "<wxd-data-bucket-endpoint>",
"spark.hadoop.fs.cos.<COS_SERVICE_NAME>.endpoint": "<COS_ENDPOINT>",
"spark.hadoop.fs.cos.<COS_SERVICE_NAME>.secret.key": "<COS_SECRET_KEY>",
"spark.hadoop.fs.cos.<COS_SERVICE_NAME>.access.key": "<COS_ACCESS_KEY>"
"spark.sql.catalogImplementation": "hive",
"spark.sql.iceberg.vectorization.enabled":"false",
"spark.sql.catalog.<wxd-bucket-catalog-name>":"org.apache.iceberg.spark.SparkCatalog",
"spark.sql.catalog.<wxd-bucket-catalog-name>.type":"hive",
"spark.sql.catalog.<wxd-bucket-catalog-name>.uri":"thrift://<wxd-catalog-metastore-host>",
"spark.hive.metastore.client.auth.mode":"PLAIN",
"spark.hive.metastore.client.plain.username":"<username>",
"spark.hive.metastore.client.plain.password":"xxx",
"spark.hive.metastore.use.SSL":"true",
"spark.hive.metastore.truststore.type":"JKS",
"spark.hive.metastore.truststore.path":"<truststore_path>",
"spark.hive.metastore.truststore.password":"changeit",
"spark.hadoop.fs.s3a.bucket.<wxd-data-bucket-name>.aws.credentials.provider":"com.ibm.iae.s3.credentialprovider.WatsonxCredentialsProvider",
"spark.hadoop.fs.s3a.bucket.<wxd-data-bucket-name>.custom.signers":"WatsonxAWSV4Signer:com.ibm.iae.s3.credentialprovider.WatsonxAWSV4Signer",
"spark.hadoop.fs.s3a.bucket.<wxd-data-bucket-name>.s3.signing-algorithm":"WatsonxAWSV4Signer",
"spark.hadoop.wxd.cas.endpoint":"<cas_endpoint>/cas/v1/signature",
"spark.hadoop.wxd.instanceId":"<instance_crn>",
"spark.hadoop.wxd.apiKey":"Basic xxx",
"spark.wxd.api.endpoint":"<wxd-endpoint>",
"spark.driver.extraClassPath":"opt/ibm/connectors/wxd/spark-authz/cpg-client-1.0-jar-with-dependencies.jar:/opt/ibm/connectors/wxd/spark-authz/ibmsparkacextension_2.12-1.0.jar",
"spark.sql.extensions":"<required-storage-support-extension>,authz.IBMSparkACExtension"
},
"application": "cos://<BUCKET_NAME>.<COS_SERVICE_NAME>/<python_file_name>",
}
}
参数值:
<region>
:实例所在的区域。 例如,us-south
地区。<spark_engine_id>
:Spark 实例的唯一标识符。 有关如何检索ID的信息,请参阅 Spark引擎的详细信息。<token>
获取服务实例的访问令牌。 有关生成令牌的更多信息,请参阅 生成令牌。<instance_id>
实例 ID:来自 watsonx.data 集群实例 URL 的实例 ID。 例如,crn:v1:staging:public:lakehouse:us-south:a/7bb9e380dc0c4bc284592b97d5095d3c:5b602d6a-847a-469d-bece-0a29124588c0::。<wxd-data-bucket-name>
:基础设施管理器中与Spark引擎关联的数据桶名称。<wxd-data-bucket-endpoint>
:用于访问上述数据桶的端点主机名。 例如,s3.us-south.cloud-object-storage.appdomain.cloud用于 us-south 区域的云对象存储桶。<wxd-bucket-catalog-name>
:与数据桶关联的目录名称。<wxd-catalog-metastore-host>
:与已注册的邮筒相关联的元存储。<cos_bucket_endpoint>
: 提供 Metastore 主机值。 有关详细信息,请参阅 存储详细信息。<access_key>
: 提供 access_key_id。 有关详细信息,请参阅 存储详细信息。<secret_key>
: 提供 secret_access_key 密钥。 有关详细信息,请参阅 存储详细信息。<truststore_path>
:提供上传信任核心证书的 COS 路径。 例如cos://di-bucket.di-test/1902xx-truststore.jks
。 有关生成信任核心的更多信息,请参阅 导入自签名证书。<cas_endpoint>
: 数据访问服务 (DAS) 端点。 要获取 DAS 端点,请参阅 获取 DAS 端点。<username>
: watsonx.data 实例的用户名。 这里,ibmlhapikey。<apikey>
:base64编码的`ibmlhapikey_<user_id>:<IAM_APIKEY>。这里,<user_id> 是用户的 IBM Cloud id,其 apikey 用于访问数据桶。 要生成 API 密钥,请登录 watsonx.data 控制台,并导航至配置文件 > 配置文件和设置 > API 密钥,然后生成一个新的 API 密钥。<OBJECT_NAME>
: IBM Cloud Object Storage 名称。<BUCKET_NAME>
:应用程序文件所在的存储桶。<COS_SERVICE_NAME>
:云对象存储服务名称。<python file name>
Spark:Spark 应用程序文件名。
限制:
- 用户必须拥有创建模式和表的全部权限。
- 要创建数据策略,必须将目录与 Presto 引擎关联。
- 如果尝试显示不存在的模式,系统会抛出 nullpointer 问题。
- 您可以为 Iceberg、Hive 和 Hudi 目录启用 Spark 访问控制扩展。