Migliorare la presentazione delle applicazioni Spark utilizzando l'estensione del controllo degli accessi di Spark
Quando si presenta un'applicazione Spark che utilizza i bucket di archiviazione esterni registrati in watsonx.data, l'estensione del controllo degli accessi di Spark consente un'autorizzazione aggiuntiva, migliorando così la sicurezza. Se si abilita l'estensione nella configurazione di spark, solo gli utenti autorizzati possono accedere e gestire i cataloghi watsonx.data attraverso i lavori Spark.
È possibile abilitare l'estensione di controllo degli accessi di Spark per i cataloghi Iceberg, Hive e Hudi.
È possibile utilizzare le politiche dei dati di Ranger o Access Management System (AMS) per concedere o negare l'accesso a utenti, gruppi di utenti, cataloghi (Iceberg, Hive, Hudi), schemi, tabelle e colonne. Oltre all'autorizzazione a livello di dati, si considera anche il privilegio di archiviazione. Per ulteriori informazioni relative all'utilizzo di AMS su cataloghi (Iceberg, Hive e Hudi), bucket, schemi e tabelle, vedere Gestione di ruoli e privilegi. Per ulteriori informazioni su come creare criteri Ranger (definiti in Servizio SQL Hadoop ) e abilitarli su cataloghi (Iceberg, Hive e Hudi), bucket, schemi e tabelle, vedere Gestione dei criteri Ranger.
Prerequisiti
- Creare Cloud Object Storage per memorizzare i dati utilizzati nell'applicazione Spark. Per creare Cloud Object Storage e un bucket, vedere Creazione di un bucket di archiviazione. È possibile fornire due bucket, data-bucket per memorizzare le tabelle watsonx.data e application bucket per mantenere il codice dell'applicazione Spark.
- Registra il bucket Cloud Object Storage in watsonx.data. Per ulteriori informazioni, vedere Aggiungi una coppia di cataloghi di bucket.
- Caricare l'applicazione Spark sullo storage, vedere Caricamento dei dati.
- È necessario avere il ruolo di amministratore IAM o il ruolo MetastoreAdmin, per creare uno schema o una tabella all'interno di watsonx.data.
Procedura
L'estensione del controllo degli accessi di Spark supporta il motore Spark esterno.
-
Per abilitare l'estensione del controllo degli accessi di Spark, è necessario aggiornare la configurazione di Spark con
add authz.IBMSparkACExtension to spark.sql.extensions
. -
Salvate la seguente applicazione Python come iceberg.py.
Iceberg è considerato un esempio. È possibile utilizzare anche i cataloghi Hive e Hudi.
from pyspark.sql import SparkSession
import os
def init_spark():
spark = SparkSession.builder \
.appName("lh-spark-app") \
.enableHiveSupport() \
.getOrCreate()
return spark
def create_database(spark):
# Create a database in the lakehouse catalog
spark.sql("create database if not exists lakehouse.demodb LOCATION 's3a://lakehouse-bucket/'")
def list_databases(spark):
# list the database under lakehouse catalog
spark.sql("show databases from lakehouse").show()
def basic_iceberg_table_operations(spark):
# demonstration: Create a basic Iceberg table, insert some data and then query table
spark.sql("create table if not exists lakehouse.demodb.testTable(id INTEGER, name VARCHAR(10), age INTEGER, salary DECIMAL(10, 2)) using iceberg").show()
spark.sql("insert into lakehouse.demodb.testTable values(1,'Alan',23,3400.00),(2,'Ben',30,5500.00),(3,'Chen',35,6500.00)")
spark.sql("select * from lakehouse.demodb.testTable").show()
def create_table_from_parquet_data(spark):
# load parquet data into dataframe
df = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-01.parquet")
# write the dataframe into an Iceberg table
df.writeTo("lakehouse.demodb.yellow_taxi_2022").create()
# describe the table created
spark.sql('describe table lakehouse.demodb.yellow_taxi_2022').show(25)
# query the table
spark.sql('select * from lakehouse.demodb.yellow_taxi_2022').count()
def ingest_from_csv_temp_table(spark):
# load csv data into a dataframe
csvDF = spark.read.option("header",True).csv("file:///spark-vol/zipcodes.csv")
csvDF.createOrReplaceTempView("tempCSVTable")
# load temporary table into an Iceberg table
spark.sql('create or replace table lakehouse.demodb.zipcodes using iceberg as select * from tempCSVTable')
# describe the table created
spark.sql('describe table lakehouse.demodb.zipcodes').show(25)
# query the table
spark.sql('select * from lakehouse.demodb.zipcodes').show()
def ingest_monthly_data(spark):
df_feb = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-02.parquet")
df_march = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-03.parquet")
df_april = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-04.parquet")
df_may = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-05.parquet")
df_june = spark.read.option("header",True).parquet("file:///spark-vol/yellow_tripdata_2022-06.parquet")
df_q1_q2 = df_feb.union(df_march).union(df_april).union(df_may).union(df_june)
df_q1_q2.write.insertInto("lakehouse.demodb.yellow_taxi_2022")
def perform_table_maintenance_operations(spark):
# Query the metadata files table to list underlying data files
spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.demodb.yellow_taxi_2022.files").show()
# There are many smaller files compact them into files of 200MB each using the
# `rewrite_data_files` Iceberg Spark procedure
spark.sql(f"CALL lakehouse.system.rewrite_data_files(table => 'demodb.yellow_taxi_2022', options => map('target-file-size-bytes','209715200'))").show()
# Again, query the metadata files table to list underlying data files; 6 files are compacted
# to 3 files
spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.demodb.yellow_taxi_2022.files").show()
# List all the snapshots
# Expire earlier snapshots. Only latest one with compacted data is required
# Again, List all the snapshots to see only 1 left
spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").show()
#retain only the latest one
latest_snapshot_committed_at = spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").tail(1)[0].committed_at
print (latest_snapshot_committed_at)
spark.sql(f"CALL lakehouse.system.expire_snapshots(table => 'demodb.yellow_taxi_2022',older_than => TIMESTAMP '{latest_snapshot_committed_at}',retain_last => 1)").show()
spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").show()
# Removing Orphan data files
spark.sql(f"CALL lakehouse.system.remove_orphan_files(table => 'demodb.yellow_taxi_2022')").show(truncate=False)
# Rewriting Manifest Files
spark.sql(f"CALL lakehouse.system.rewrite_manifests('demodb.yellow_taxi_2022')").show()
def evolve_schema(spark):
# demonstration: Schema evolution
# Add column fare_per_mile to the table
spark.sql('ALTER TABLE lakehouse.demodb.yellow_taxi_2022 ADD COLUMN(fare_per_mile double)')
# describe the table
spark.sql('describe table lakehouse.demodb.yellow_taxi_2022').show(25)
def clean_database(spark):
# clean-up the demo database
spark.sql('drop table if exists lakehouse.demodb.testTable purge')
spark.sql('drop table if exists lakehouse.demodb.zipcodes purge')
spark.sql('drop table if exists lakehouse.demodb.yellow_taxi_2022 purge')
spark.sql('drop database if exists lakehouse.demodb cascade')
def main():
try:
spark = init_spark()
create_database(spark)
list_databases(spark)
basic_iceberg_table_operations(spark)
# demonstration: Ingest parquet and csv data into a watsonx.data Iceberg table
create_table_from_parquet_data(spark)
ingest_from_csv_temp_table(spark)
# load data for the month of February to June into the table yellow_taxi_2022 created above
ingest_monthly_data(spark)
# demonstration: Table maintenance
perform_table_maintenance_operations(spark)
# demonstration: Schema evolution
evolve_schema(spark)
finally:
# clean-up the demo database
clean_database(spark)
spark.stop()
if __name__ == '__main__':
main()
- Per inviare l'applicazione Spark, specificare i valori dei parametri ed eseguire il seguente comando curl. L'esempio seguente mostra il comando per inviare l'applicazione iceberg.py.
curl --request POST --url https://<region>/lakehouse/api/v2/spark_engines/<spark_engine_id>/applications --header 'Authorization: Bearer <token>' --header 'Content-Type: application/json' --header 'Lhinstanceid: <instance_id>' --data '{
"application_details": {
"conf": {
"spark.hadoop.fs.s3a.bucket.<wxd-data-bucket-name>.endpoint": "<wxd-data-bucket-endpoint>",
"spark.hadoop.fs.cos.<COS_SERVICE_NAME>.endpoint": "<COS_ENDPOINT>",
"spark.hadoop.fs.cos.<COS_SERVICE_NAME>.secret.key": "<COS_SECRET_KEY>",
"spark.hadoop.fs.cos.<COS_SERVICE_NAME>.access.key": "<COS_ACCESS_KEY>"
"spark.sql.catalogImplementation": "hive",
"spark.sql.iceberg.vectorization.enabled":"false",
"spark.sql.catalog.<wxd-bucket-catalog-name>":"org.apache.iceberg.spark.SparkCatalog",
"spark.sql.catalog.<wxd-bucket-catalog-name>.type":"hive",
"spark.sql.catalog.<wxd-bucket-catalog-name>.uri":"thrift://<wxd-catalog-metastore-host>",
"spark.hive.metastore.client.auth.mode":"PLAIN",
"spark.hive.metastore.client.plain.username":"<username>",
"spark.hive.metastore.client.plain.password":"xxx",
"spark.hive.metastore.use.SSL":"true",
"spark.hive.metastore.truststore.type":"JKS",
"spark.hive.metastore.truststore.path":"<truststore_path>",
"spark.hive.metastore.truststore.password":"changeit",
"spark.hadoop.fs.s3a.bucket.<wxd-data-bucket-name>.aws.credentials.provider":"com.ibm.iae.s3.credentialprovider.WatsonxCredentialsProvider",
"spark.hadoop.fs.s3a.bucket.<wxd-data-bucket-name>.custom.signers":"WatsonxAWSV4Signer:com.ibm.iae.s3.credentialprovider.WatsonxAWSV4Signer",
"spark.hadoop.fs.s3a.bucket.<wxd-data-bucket-name>.s3.signing-algorithm":"WatsonxAWSV4Signer",
"spark.hadoop.wxd.cas.endpoint":"<cas_endpoint>/cas/v1/signature",
"spark.hadoop.wxd.instanceId":"<instance_crn>",
"spark.hadoop.wxd.apikey":"Basic xxx",
"spark.wxd.api.endpoint":"<wxd-endpoint>",
"spark.driver.extraClassPath":"opt/ibm/connectors/wxd/spark-authz/cpg-client-1.0-jar-with-dependencies.jar:/opt/ibm/connectors/wxd/spark-authz/ibmsparkacextension_2.12-1.0.jar",
"spark.sql.extensions":"<required-storage-support-extension>,authz.IBMSparkACExtension"
},
"application": "cos://<BUCKET_NAME>.<COS_SERVICE_NAME>/<python_file_name>",
}
}
Valori di parametro:
<region>
regione in cui l'istanza è stata fornita. Esempio, regione dell'us-south
.<spark_engine_id>
l'identificatore univoco dell'istanza Spark. Per informazioni su come recuperare l'ID, vedere Gestione dei dettagli del motore Spark.<token>
per ottenere il token di accesso per l'istanza del servizio. Per ulteriori informazioni sulla generazione del token, vedere Generazione di un token.<instance_id>
l'ID dell'istanza dall' URL dell'istanza del cluster watsonx.data. Example, crn:v1:staging:public:lakehouse:us-south:a/7bb9e380dc0c4bc284592b97d5095d3c:5b602d6a-847a-469d-bece-0a29124588c0::.<wxd-data-bucket-name>
: Il nome del bucket di dati associato al motore a scoppio dal gestore dell'infrastruttura.<wxd-data-bucket-endpoint>
: Il nome dell'host dell'endpoint per l'accesso al bucket dei dati di cui sopra. Esempio, s3.us-south.cloud-object-storage.appdomain.cloud per un bucket di archiviazione di oggetti Cloud nella regione us-south.<wxd-bucket-catalog-name>
: Il nome del catalogo associato al bucket dei dati.<wxd-catalog-metastore-host>
: Il metastore associato al bucket registrato.<cos_bucket_endpoint>
: fornire il valore dell'host del metastore. Per ulteriori informazioni, vedere dettagli sulla memorizzazione.<access_key>
: fornire l'access_key_id. Per ulteriori informazioni, vedere dettagli sulla memorizzazione.<secret_key>
: fornire la chiave_di_accesso_segreta. Per ulteriori informazioni, vedere dettagli sulla memorizzazione.<truststore_path>
: fornire il percorso COS in cui viene caricato il certificato trustore. Ad esempiocos://di-bucket.di-test/1902xx-truststore.jks
. Per ulteriori informazioni sulla generazione del trustore, vedere Importazione di certificati autofirmati.<cas_endpoint>
: L'endpoint del Data Access Service (DAS). Per ottenere l'endpoint DAS, vedere Attribuzione dell'endpoint DAS.<username>
: Il nome utente dell'istanza watsonx.data. Qui, ibmlhapikey.<apikey>
: La codifica base64 `ibmlhapikey_<user_id>:<IAM_APIKEY>. Qui, <user_id> è l'id IBM Cloud dell'utente la cui apikey è usata per accedere al bucket dei dati. Per generare la chiave API, accedere alla console watsonx.data e navigare in Profilo > Profilo e impostazioni > Chiavi API e generare una nuova chiave API.<OBJECT_NAME>
: Il nome IBM Cloud Object Storage.<BUCKET_NAME>
: Il bucket di archiviazione in cui risiede il file dell'applicazione.<COS_SERVICE_NAME>
: Il nome del servizio Cloud object Storage.<python file name>
il nome del file dell'applicazione Spark.
Limitazioni:
- L'utente deve avere accesso completo alla creazione di schemi e tabelle.
- Per creare un criterio di dati, è necessario associare il catalogo al motore Presto.
- Se si tenta di visualizzare uno schema che non esiste, il sistema lancia un problema di nullpointer.
- È possibile abilitare l'estensione di controllo degli accessi di Spark per i cataloghi Iceberg, Hive e Hudi.