IBM Cloud Docs
Utilisation du scénario d'utilisation AWS EMR for Spark

Utilisation du scénario d'utilisation AWS EMR for Spark

Cette rubrique fournit la procédure d'exécution d'applications Spark à partir d' Amazon Web Services Elastic MapReduce (AWS EMR) pour obtenir les cas d'utilisation Spark IBM® watsonx.data:

  • ingestion de données
  • requête de données
  • maintenance des tables

Prérequis

Présentation

Pour utiliser des données source qui résident dans des compartiments AWS S3, vous pouvez effectuer l'une des opérations suivantes:

  • configurer l'instance watsonx.data sur AWS
  • configurer une instance IBM Cloud basée sur watsonx.data et inclure un catalogue basé sur un compartiment AWS S3.

Les moteurs de requête watsonx.data peuvent exécuter des requêtes sur des données à partir de compartiments AWS S3. Dans les deux cas, vous pouvez exécuter les opérations d'ingestion de données et de maintenance de schéma Iceberg à l'aide d' AWS EMR Spark.

A propos de l'exemple de cas d'utilisation

L'exemple de fichier python (amazon-lakehouse.py) illustre la création de schéma (amazonschema), de tables et de données d'ingestion. Il prend également en charge les opérations de maintenance de table. Pour plus d'informations sur les fonctionnalités de l'exemple, voir A propos de l'exemple de cas d'utilisation

Exécution de l'exemple de cas d'utilisation

Suivez les étapes pour exécuter l'exemple de fichier Python Spark.

  1. Connectez-vous au cluster AWS EMR. Pour plus d'informations sur l'utilisation de SSH pour la connexion au cluster EMR, voir Configuration du cluster EMR.

  2. Sauvegardez l'exemple de fichier Python suivant.

    Exemple de fichier Python Spark

    
    from pyspark.sql import SparkSession
    import os
    
    def init_spark():
    spark = SparkSession.builder.appName("lh-hms-cloud")\
    .enableHiveSupport().getOrCreate()
    return spark
    
    def create_database(spark):
        # Create a database in the lakehouse catalog
        spark.sql("create database if not exists lakehouse.amazonschema LOCATION 's3a://lakehouse-bucket-amz/'")
    
    def list_databases(spark):
        # list the database under lakehouse catalog
        spark.sql("show databases from lakehouse").show()
    
    def basic_iceberg_table_operations(spark):
        # demonstration: Create a basic Iceberg table, insert some data and then query table
        spark.sql("create table if not exists lakehouse.amazonschema.testTable(id INTEGER, name VARCHAR(10), age INTEGER, salary DECIMAL(10, 2)) using iceberg").show()
        spark.sql("insert into lakehouse.amazonschema.testTable values(1,'Alan',23,3400.00),(2,'Ben',30,5500.00),(3,'Chen',35,6500.00)")
        spark.sql("select * from lakehouse.amazonschema.testTable").show()
    
    def create_table_from_parquet_data(spark):
        # load parquet data into dataframce
        df = spark.read.option("header",True).parquet("s3a://source-bucket-amz/nyc-taxi/yellow_tripdata_2022-01.parquet")
        # write the dataframe into an Iceberg table
        df.writeTo("lakehouse.amazonschema.yellow_taxi_2022").create()
        # describe the table created
        spark.sql('describe table lakehouse.amazonschema.yellow_taxi_2022').show(25)
        # query the table
        spark.sql('select * from lakehouse.amazonschema.yellow_taxi_2022').count()
    
    def ingest_from_csv_temp_table(spark):
        # load csv data into a dataframe
        csvDF = spark.read.option("header",True).csv("s3a://source-bucket-amz/zipcodes.csv")
        csvDF.createOrReplaceTempView("tempCSVTable")
        # load temporary table into an Iceberg table
        spark.sql('create or replace table lakehouse.amazonschema.zipcodes using iceberg as select * from tempCSVTable')
        # describe the table created
        spark.sql('describe table lakehouse.amazonschema.zipcodes').show(25)
        # query the table
        spark.sql('select * from lakehouse.amazonschema.zipcodes').show()
    
    def ingest_monthly_data(spark):
        df_feb = spark.read.option("header",True).parquet("s3a://source-bucket-amz//nyc-taxi/yellow_tripdata_2022-02.parquet")
        df_march = spark.read.option("header",True).parquet("s3a://source-bucket-amz//nyc-taxi/yellow_tripdata_2022-03.parquet")
        df_april = spark.read.option("header",True).parquet("s3a://source-bucket-amz//nyc-taxi/yellow_tripdata_2022-04.parquet")
        df_may = spark.read.option("header",True).parquet("s3a://source-bucket-amz//nyc-taxi/yellow_tripdata_2022-05.parquet")
        df_june = spark.read.option("header",True).parquet("s3a://source-bucket-amz//nyc-taxi/yellow_tripdata_2022-06.parquet")
    
        df_q1_q2 = df_feb.union(df_march).union(df_april).union(df_may).union(df_june)
        df_q1_q2.write.insertInto("lakehouse.amazonschema.yellow_taxi_2022")
    
    def perform_table_maintenance_operations(spark):
        # Query the metadata files table to list underlying data files
        spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.amazonschema.yellow_taxi_2022.files").show()
    
        # There are many smaller files compact them into files of 200MB each using the
        # `rewrite_data_files` Iceberg Spark procedure
        spark.sql(f"CALL lakehouse.system.rewrite_data_files(table => 'amazonschema.yellow_taxi_2022', options => map('target-file-size-bytes','209715200'))").show()
    
        # Again, query the metadata files table to list underlying data files; 6 files are compacted
        # to 3 files
        spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.amazonschema.yellow_taxi_2022.files").show()
    
        # List all the snapshots
        # Expire earlier snapshots. Only latest one with comacted data is required
        # Again, List all the snapshots to see only 1 left
        spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.amazonschema.yellow_taxi_2022.snapshots").show()
        #retain only the latest one
        latest_snapshot_committed_at = spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.amazonschema.yellow_taxi_2022.snapshots").tail(1)[0].committed_at
        print (latest_snapshot_committed_at)
        spark.sql(f"CALL lakehouse.system.expire_snapshots(table => 'amazonschema.yellow_taxi_2022',older_than => TIMESTAMP '{latest_snapshot_committed_at}',retain_last => 1)").show()
        spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.amazonschema.yellow_taxi_2022.snapshots").show()
    
        # Removing Orphan data files
        spark.sql(f"CALL lakehouse.system.remove_orphan_files(table => 'amazonschema.yellow_taxi_2022')").show(truncate=False)
    
        # Rewriting Manifest Files
        spark.sql(f"CALL lakehouse.system.rewrite_manifests('amazonschema.yellow_taxi_2022')").show()
    
    
    def evolve_schema(spark):
        # demonstration: Schema evolution
        # Add column fare_per_mile to the table
        spark.sql('ALTER TABLE lakehouse.amazonschema.yellow_taxi_2022 ADD COLUMN(fare_per_mile double)')
        # describe the table
        spark.sql('describe table lakehouse.amazonschema.yellow_taxi_2022').show(25)
    
    
    def clean_database(spark):
        # clean-up the demo database
        spark.sql('drop table if exists lakehouse.amazonschema.testTable purge')
        spark.sql('drop table if exists lakehouse.amazonschema.zipcodes purge')
        spark.sql('drop table if exists lakehouse.amazonschema.yellow_taxi_2022 purge')
        spark.sql('drop database if exists lakehouse.amazonschema cascade')
    
    def main():
        try:
            spark = init_spark()
            clean_database(spark)
    
            create_database(spark)
            list_databases(spark)
    
            basic_iceberg_table_operations(spark)
    
            # demonstration: Ingest parquet and csv data into a wastonx.data Iceberg table
            create_table_from_parquet_data(spark)
            ingest_from_csv_temp_table(spark)
    
            # load data for the month of Feburary to June into the table yellow_taxi_2022 created above
            ingest_monthly_data(spark)
    
            # demonstration: Table maintenance
            perform_table_maintenance_operations(spark)
    
            # demonstration: Schema evolution
            evolve_schema(spark)
        finally:
            # clean-up the demo database
            #clean_database(spark)
            spark.stop()
    
    if __name__ == '__main__':
      main()
    
    
  3. Exécutez les commandes suivantes pour télécharger le fichier JAR du service de métadonnées sur votre poste de travail :

    Le fichier JAR doit être présent dans l'emplacement /home/hadoop sur tous les noeuds du cluster. Notez les adresses spark.driver.extraClassPath et spark.executor.extraClassPath.

    
    wget https://github.com/IBM-Cloud/IBM-Analytics-Engine/raw/master/wxd-connectors/hms-connector/hive-exec-2.3.9-core.jar
    wget https://github.com/IBM-Cloud/IBM-Analytics-Engine/raw/master/wxd-connectors/hms-connector/hive-common-2.3.9.jar
    wget https://github.com/IBM-Cloud/IBM-Analytics-Engine/raw/master/wxd-connectors/hms-connector/hive-metastore-2.3.9.jar
    
  4. Configurez les détails de la connexion MDS dans le cluster AWS EMR pour vous connecter au watsonx.data Metadata Service (MDS). Un exemple de commande pour utiliser spark-submit à partir d'un cluster basé sur EMR-6.12.0 (Spark 3.4.1 ) est le suivant :

    Exécutez la commande à partir d'EMR sur le cluster EC2 pour soumettre l'exemple de travail Spark.

    
    spark-submit \
    --deploy-mode cluster \
    --jars https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.4_2.12/1.4.0/iceberg-spark-runtime-3.4_2.12-1.4.0.jar,/usr/lib/hadoop/hadoop-aws.jar,/usr/share/aws/aws-java-sdk/aws-java-sdk-bundle*.jar,/usr/lib/hadoop-lzo/lib/* \
    --conf spark.sql.catalogImplementation=hive \
    --conf spark.driver.extraClassPath=/home/hadoop/hive-common-2.3.9.jar:/home/hadoop/hive-metastore-2.3.9.jar:/home/hadoop/hive-exec-2.3.9-core.jar \
    --conf spark.executor.extraClassPath=/home/hadoop/hive-common-2.3.9.jar:/home/hadoop/hive-metastore-2.3.9.jar:/home/hadoop/hive-exec-2.3.9-core.jar \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.iceberg.vectorization.enabled=false \
    --conf spark.sql.catalog.lakehouse=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.lakehouse.type=hive \
    --conf spark.hive.metastore.uris==<<change_endpoint>> \
    --conf spark.hive.metastore.client.auth.mode=PLAIN \
    --conf spark.hive.metastore.client.plain.username=ibmlhapikey \
    --conf spark.hive.metastore.client.plain.password=<<change_pswd>> \
    --conf spark.hive.metastore.use.SSL=true \
    --conf spark.hive.metastore.truststore.type=JKS \
    --conf spark.hive.metastore.truststore.path=file:///etc/pki/java/cacerts \
    --conf spark.hive.metastore.truststore.password=changeit \
    amazon-lakehouse.py
    

Valeurs des paramètres :

Pour exécuter le fichier python Spark en utilisant le cluster EMR-6.15.0 (Spark 3.4), téléchargez les jars iceberg à partir de l'emplacement et suivez la même procédure.