IBM Cloud Docs
Travailler avec le catalogue Delta Lake

Travailler avec le catalogue Delta Lake

Cette rubrique décrit la procédure d'exécution d'une application Spark qui ingère des données dans un catalogue Delta Lake.

  1. Créer un stockage avec le catalogue Delta Lake pour stocker les données utilisées dans l'application Spark. Pour créer un stockage avec le catalogue Delta Lake, voir Ajout d'une paire stockage-catalogue.

  2. Associez le stockage au moteur Spark externe. Pour plus d'informations, voir Association d'un catalogue à un moteur.

  3. Créez Cloud Object Storage (COS) pour stocker l'application Spark. Pour créer Cloud Object Storage et un compartiment, voir Création d'un compartiment de stockage.

  4. Enregistrez Cloud Object Storage dans watsonx.data. Pour plus d'informations, voir Ajout d'une paire stockage-catalogue.

  5. Sauvegardez l'application Spark suivante (fichierPython ) sur votre machine locale. Ici, delta_demo.py.

    L'application Spark Python présente les fonctionnalités suivantes:

    • Il crée une base de données dans le catalogue Delta Lake (que vous avez créé pour stocker les données). Ici, <database_name>.
    • Il crée une table dans la base de données <database_name>, à savoir <table_name>.
    • Il insère des données dans <table_name> et effectue une opération de requête SELECT.
    • Elle supprime la table et le schéma après utilisation.
    from pyspark.sql import SparkSession
    import os
    
    def init_spark():
        spark = SparkSession.builder.appName("lh-hms-cloud")\
        .enableHiveSupport().getOrCreate()
    
        return spark
    
    def main():
        spark = init_spark()
        spark.sql("show databases").show()
        spark.sql("create database if not exists spark_catalog.<database_name> LOCATION 's3a://<data_storage_name>/'").show()
        spark.sql("create table if not exists spark_catalog.<database_name>.<table_name> (id bigint, name string, location string) USING DELTA").show()
        spark.sql("insert into spark_catalog.<database_name>.<table_name> VALUES (1, 'Sam','Kochi'), (2, 'Tom','Bangalore'), (3, 'Bob','Chennai'), (4, 'Alex','Bangalore')").show()
        spark.sql("select * from spark_catalog.<database_name>.<table_name>").show()
        spark.sql("drop table spark_catalog.<database_name>.<table_name>").show()
        spark.sql("drop schema spark_catalog.<database_name> CASCADE").show()
        spark.stop()
    
    if __name__ == '__main__':
        main()
    
  6. Téléchargez l'application Spark dans COS, voir Téléchargement de données.

  7. Pour soumettre l'application Spark avec des données résidant dans Cloud Object Storage, spécifiez les valeurs de paramètre et exécutez la commande curl suivante:

    curl --request POST \
    --url https://api.<region>.ae.cloud.ibm.com/v3/analytics_engines/<iae-instance-guid>/spark_applications \
    --header 'Authorization: Bearer <token>' \
    --header 'Content-Type: application/json' \
    --header 'LhInstanceId: <instance_id>' \
    --data '{
        "conf": {
                "spark.hadoop.fs.s3a.bucket.<data_storage_name>.access.key" : "<data_bucket_access_key>",
                "spark.hadoop.fs.s3a.bucket.<data_storage_name>.secret.key" : "<data_bucket_secret_key>",
                "spark.hadoop.fs.s3a.bucket.<data_storage_name>.endpoint": "<your_data_bucket_direct_endpoint>",
                "spark.sql.catalogImplementation" : "hive",
                "spark.sql.extensions" : "io.delta.sql.DeltaSparkSessionExtension",
                "spark.serializer" : "org.apache.spark.serializer.KryoSerializer",
                "spark.hadoop.hive.metastore.schema.verification" : "false",
                "spark.hadoop.hive.metastore.schema.verification.record.version" : "false",
                "spark.hadoop.datanucleus.schema.autoCreateTables" : "false",
                "spark.sql.catalog.spark_catalog" : "org.apache.spark.sql.delta.catalog.DeltaCatalog",
                "spark.sql.catalog.spark_catalog.type" : "hive",
                "spark.hive.metastore.uris" : "<metastore URL>",
                "spark.hive.metastore.use.SSL" : "true",
                "spark.hive.metastore.truststore.path" : "file:///opt/ibm/jdk/lib/security/cacerts",
                "spark.hive.metastore.truststore.password" : "changeit",
                "spark.hive.metastore.truststore.type" : "JKS",
                "spark.hive.metastore.client.auth.mode" : "PLAIN",
                "spark.hive.metastore.client.plain.username" : "ibmlhapikey",
                "spark.hive.metastore.client.plain.password" : "<wxd_api_key>",
                "spark.hadoop.fs.s3a.path.style.access" : "true"
        },
    
        "application": "s3a://<data_storage_name>/delta_demo.py"
        }
    

Valeurs des paramètres :

  1. Une fois que vous avez soumis l'application Spark, vous recevez un message de confirmation avec l'ID de l'application et la version de Spark. Sauvegardez-le pour référence.
  2. Connectez-vous au cluster watsonx.data et accédez à la page des détails du moteur. Dans l'onglet Applications, utilisez l'ID application pour répertorier l'application et suivre les étapes. Pour plus d'informations, voir Affichage et gestion des applications.