Travailler avec le catalogue Delta Lake
Cette rubrique décrit la procédure d'exécution d'une application Spark qui ingère des données dans un catalogue Delta Lake.
-
Créer un stockage avec le catalogue Delta Lake pour stocker les données utilisées dans l'application Spark. Pour créer un stockage avec le catalogue Delta Lake, voir Ajout d'une paire stockage-catalogue.
-
Associez le stockage au moteur Spark externe. Pour plus d'informations, voir Association d'un catalogue à un moteur.
-
Créez Cloud Object Storage (COS) pour stocker l'application Spark. Pour créer Cloud Object Storage et un compartiment, voir Création d'un compartiment de stockage.
-
Enregistrez Cloud Object Storage dans watsonx.data. Pour plus d'informations, voir Ajout d'une paire stockage-catalogue.
-
Sauvegardez l'application Spark suivante (fichierPython ) sur votre machine locale. Ici,
delta_demo.py
.L'application Spark Python présente les fonctionnalités suivantes:
- Il crée une base de données dans le catalogue Delta Lake (que vous avez créé pour stocker les données). Ici,
<database_name>
. - Il crée une table dans la base de données
<database_name>
, à savoir<table_name>
. - Il insère des données dans
<table_name>
et effectue une opération de requête SELECT. - Elle supprime la table et le schéma après utilisation.
from pyspark.sql import SparkSession import os def init_spark(): spark = SparkSession.builder.appName("lh-hms-cloud")\ .enableHiveSupport().getOrCreate() return spark def main(): spark = init_spark() spark.sql("show databases").show() spark.sql("create database if not exists spark_catalog.<database_name> LOCATION 's3a://<data_storage_name>/'").show() spark.sql("create table if not exists spark_catalog.<database_name>.<table_name> (id bigint, name string, location string) USING DELTA").show() spark.sql("insert into spark_catalog.<database_name>.<table_name> VALUES (1, 'Sam','Kochi'), (2, 'Tom','Bangalore'), (3, 'Bob','Chennai'), (4, 'Alex','Bangalore')").show() spark.sql("select * from spark_catalog.<database_name>.<table_name>").show() spark.sql("drop table spark_catalog.<database_name>.<table_name>").show() spark.sql("drop schema spark_catalog.<database_name> CASCADE").show() spark.stop() if __name__ == '__main__': main()
- Il crée une base de données dans le catalogue Delta Lake (que vous avez créé pour stocker les données). Ici,
-
Téléchargez l'application Spark dans COS, voir Téléchargement de données.
-
Pour soumettre l'application Spark avec des données résidant dans Cloud Object Storage, spécifiez les valeurs de paramètre et exécutez la commande curl suivante:
curl --request POST \ --url https://api.<region>.ae.cloud.ibm.com/v3/analytics_engines/<iae-instance-guid>/spark_applications \ --header 'Authorization: Bearer <token>' \ --header 'Content-Type: application/json' \ --header 'LhInstanceId: <instance_id>' \ --data '{ "conf": { "spark.hadoop.fs.s3a.bucket.<data_storage_name>.access.key" : "<data_bucket_access_key>", "spark.hadoop.fs.s3a.bucket.<data_storage_name>.secret.key" : "<data_bucket_secret_key>", "spark.hadoop.fs.s3a.bucket.<data_storage_name>.endpoint": "<your_data_bucket_direct_endpoint>", "spark.sql.catalogImplementation" : "hive", "spark.sql.extensions" : "io.delta.sql.DeltaSparkSessionExtension", "spark.serializer" : "org.apache.spark.serializer.KryoSerializer", "spark.hadoop.hive.metastore.schema.verification" : "false", "spark.hadoop.hive.metastore.schema.verification.record.version" : "false", "spark.hadoop.datanucleus.schema.autoCreateTables" : "false", "spark.sql.catalog.spark_catalog" : "org.apache.spark.sql.delta.catalog.DeltaCatalog", "spark.sql.catalog.spark_catalog.type" : "hive", "spark.hive.metastore.uris" : "<metastore URL>", "spark.hive.metastore.use.SSL" : "true", "spark.hive.metastore.truststore.path" : "file:///opt/ibm/jdk/lib/security/cacerts", "spark.hive.metastore.truststore.password" : "changeit", "spark.hive.metastore.truststore.type" : "JKS", "spark.hive.metastore.client.auth.mode" : "PLAIN", "spark.hive.metastore.client.plain.username" : "ibmlhapikey", "spark.hive.metastore.client.plain.password" : "<wxd_api_key>", "spark.hadoop.fs.s3a.path.style.access" : "true" }, "application": "s3a://<data_storage_name>/delta_demo.py" }
Valeurs des paramètres :
-
<region>
: région dans laquelle vous mettez à disposition l'instance de moteur d'analyse. -
<iae-instance-guid>
: identificateur global unique de l'instance du moteur d'analyse. Pour cela, voir Extraction des détails d'une instance sans serveur. -
<token>
: Le jeton du porteur. Pour plus d'informations sur la génération du jeton, voir Jeton IAM. -
<your_data_bucket_direct_endpoint>
: noeud final direct permettant d'accéder au compartiment de données. Exemple: s3.us-south.cloud-object-storage.appdomain.cloud pour un compartiment Cloud Object Storage dans la région us-south. Pour plus d'informations, voir Données d'identification du service. -
<data_bucket_access_key>
: clé d'accès pour le stockage d'objets Cloud (stockage de données). Pour plus d'informations, voir Création de données d'identification HMAC à l'aide de l'interface de ligne de commande. -
<data_bucket_secret_key>
: clé secrète pour le stockage d'objets Cloud (stockage de données). Pour plus d'informations, voir Création de données d'identification HMAC à l'aide de l'interface de ligne de commande. -
<metastore URL>
: L' URL du catalogue. Pour plus d'informations, voir Obtenir le point de terminaison MDS. -
<wxd_api_key>
: pour générer une clé d'API, connectez-vous à la console watsonx.data et accédez à Profil > Profil et paramètres > Clés d'API et générez une nouvelle clé d'API.
- Une fois que vous avez soumis l'application Spark, vous recevez un message de confirmation avec l'ID de l'application et la version de Spark. Sauvegardez-le pour référence.
- Connectez-vous au cluster watsonx.data et accédez à la page des détails du moteur. Dans l'onglet Applications, utilisez l'ID application pour répertorier l'application et suivre les étapes. Pour plus d'informations, voir Affichage et gestion des applications.