IBM Cloud Docs
AWS EMR für Spark-Anwendungsfall verwenden

AWS EMR für Spark-Anwendungsfall verwenden

In diesem Abschnitt wird die Prozedur zum Ausführen von Spark-Anwendungen über Amazon Web Services Elastic MapReduce (AWS EMR) beschrieben, um die Spark-Anwendungsfälle von IBM® watsonx.data zu erreichen:

  • Datenaufnahme
  • Daten abfragen
  • Tabellenpflege

Voraussetzungen

Übersicht

Um mit Quellendaten zu arbeiten, die sich in AWS S3-Buckets befinden, haben Sie eine der folgenden Möglichkeiten:

  • watsonx.data-Instanz auf AWS
  • IBM Cloud-basierte watsonx.data-Instanz konfigurieren und einen auf AWS S3-Bucket basierenden Katalog einschließen.

Die Abfrageengines watsonx.data können Abfragen für Daten aus AWS S3-Buckets ausführen. In beiden Fällen können Sie die Datenaufnahme und die Iceberg-basierten Schemaverwaltungsoperationen mithilfe von AWS EMR Spark ausführen.

Informationen zum Beispielanwendungsfall

Die Python-Beispieldatei (amazon-lakehouse.py) veranschaulicht die Erstellung von Schemas (amazonschema), Tabellen und die Aufnahme von Daten. Es unterstützt auch Tabellenverwaltungsoperationen. Weitere Informationen zu den Funktionen im Beispiel finden Sie unter Informationen zum Beispielanwendungsfall.

Beispielanwendungsfall ausführen

Führen Sie die Schritte zum Ausführen der Python-Beispieldatei für Spark aus.

  1. Stellen Sie eine Verbindung zum AWS EMR-Cluster her. Weitere Informationen zur Verwendung von SSH für die Verbindung zum EMR-Cluster finden Sie unter EMR-Cluster einrichten.

  2. Speichern Sie die folgende Python-Beispieldatei.

    Python-Beispieldatei für Spark

    
    from pyspark.sql import SparkSession
    import os
    
    def init_spark():
    spark = SparkSession.builder.appName("lh-hms-cloud")\
    .enableHiveSupport().getOrCreate()
    return spark
    
    def create_database(spark):
        # Create a database in the lakehouse catalog
        spark.sql("create database if not exists lakehouse.amazonschema LOCATION 's3a://lakehouse-bucket-amz/'")
    
    def list_databases(spark):
        # list the database under lakehouse catalog
        spark.sql("show databases from lakehouse").show()
    
    def basic_iceberg_table_operations(spark):
        # demonstration: Create a basic Iceberg table, insert some data and then query table
        spark.sql("create table if not exists lakehouse.amazonschema.testTable(id INTEGER, name VARCHAR(10), age INTEGER, salary DECIMAL(10, 2)) using iceberg").show()
        spark.sql("insert into lakehouse.amazonschema.testTable values(1,'Alan',23,3400.00),(2,'Ben',30,5500.00),(3,'Chen',35,6500.00)")
        spark.sql("select * from lakehouse.amazonschema.testTable").show()
    
    def create_table_from_parquet_data(spark):
        # load parquet data into dataframce
        df = spark.read.option("header",True).parquet("s3a://source-bucket-amz/nyc-taxi/yellow_tripdata_2022-01.parquet")
        # write the dataframe into an Iceberg table
        df.writeTo("lakehouse.amazonschema.yellow_taxi_2022").create()
        # describe the table created
        spark.sql('describe table lakehouse.amazonschema.yellow_taxi_2022').show(25)
        # query the table
        spark.sql('select * from lakehouse.amazonschema.yellow_taxi_2022').count()
    
    def ingest_from_csv_temp_table(spark):
        # load csv data into a dataframe
        csvDF = spark.read.option("header",True).csv("s3a://source-bucket-amz/zipcodes.csv")
        csvDF.createOrReplaceTempView("tempCSVTable")
        # load temporary table into an Iceberg table
        spark.sql('create or replace table lakehouse.amazonschema.zipcodes using iceberg as select * from tempCSVTable')
        # describe the table created
        spark.sql('describe table lakehouse.amazonschema.zipcodes').show(25)
        # query the table
        spark.sql('select * from lakehouse.amazonschema.zipcodes').show()
    
    def ingest_monthly_data(spark):
        df_feb = spark.read.option("header",True).parquet("s3a://source-bucket-amz//nyc-taxi/yellow_tripdata_2022-02.parquet")
        df_march = spark.read.option("header",True).parquet("s3a://source-bucket-amz//nyc-taxi/yellow_tripdata_2022-03.parquet")
        df_april = spark.read.option("header",True).parquet("s3a://source-bucket-amz//nyc-taxi/yellow_tripdata_2022-04.parquet")
        df_may = spark.read.option("header",True).parquet("s3a://source-bucket-amz//nyc-taxi/yellow_tripdata_2022-05.parquet")
        df_june = spark.read.option("header",True).parquet("s3a://source-bucket-amz//nyc-taxi/yellow_tripdata_2022-06.parquet")
    
        df_q1_q2 = df_feb.union(df_march).union(df_april).union(df_may).union(df_june)
        df_q1_q2.write.insertInto("lakehouse.amazonschema.yellow_taxi_2022")
    
    def perform_table_maintenance_operations(spark):
        # Query the metadata files table to list underlying data files
        spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.amazonschema.yellow_taxi_2022.files").show()
    
        # There are many smaller files compact them into files of 200MB each using the
        # `rewrite_data_files` Iceberg Spark procedure
        spark.sql(f"CALL lakehouse.system.rewrite_data_files(table => 'amazonschema.yellow_taxi_2022', options => map('target-file-size-bytes','209715200'))").show()
    
        # Again, query the metadata files table to list underlying data files; 6 files are compacted
        # to 3 files
        spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.amazonschema.yellow_taxi_2022.files").show()
    
        # List all the snapshots
        # Expire earlier snapshots. Only latest one with comacted data is required
        # Again, List all the snapshots to see only 1 left
        spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.amazonschema.yellow_taxi_2022.snapshots").show()
        #retain only the latest one
        latest_snapshot_committed_at = spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.amazonschema.yellow_taxi_2022.snapshots").tail(1)[0].committed_at
        print (latest_snapshot_committed_at)
        spark.sql(f"CALL lakehouse.system.expire_snapshots(table => 'amazonschema.yellow_taxi_2022',older_than => TIMESTAMP '{latest_snapshot_committed_at}',retain_last => 1)").show()
        spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.amazonschema.yellow_taxi_2022.snapshots").show()
    
        # Removing Orphan data files
        spark.sql(f"CALL lakehouse.system.remove_orphan_files(table => 'amazonschema.yellow_taxi_2022')").show(truncate=False)
    
        # Rewriting Manifest Files
        spark.sql(f"CALL lakehouse.system.rewrite_manifests('amazonschema.yellow_taxi_2022')").show()
    
    
    def evolve_schema(spark):
        # demonstration: Schema evolution
        # Add column fare_per_mile to the table
        spark.sql('ALTER TABLE lakehouse.amazonschema.yellow_taxi_2022 ADD COLUMN(fare_per_mile double)')
        # describe the table
        spark.sql('describe table lakehouse.amazonschema.yellow_taxi_2022').show(25)
    
    
    def clean_database(spark):
        # clean-up the demo database
        spark.sql('drop table if exists lakehouse.amazonschema.testTable purge')
        spark.sql('drop table if exists lakehouse.amazonschema.zipcodes purge')
        spark.sql('drop table if exists lakehouse.amazonschema.yellow_taxi_2022 purge')
        spark.sql('drop database if exists lakehouse.amazonschema cascade')
    
    def main():
        try:
            spark = init_spark()
            clean_database(spark)
    
            create_database(spark)
            list_databases(spark)
    
            basic_iceberg_table_operations(spark)
    
            # demonstration: Ingest parquet and csv data into a wastonx.data Iceberg table
            create_table_from_parquet_data(spark)
            ingest_from_csv_temp_table(spark)
    
            # load data for the month of Feburary to June into the table yellow_taxi_2022 created above
            ingest_monthly_data(spark)
    
            # demonstration: Table maintenance
            perform_table_maintenance_operations(spark)
    
            # demonstration: Schema evolution
            evolve_schema(spark)
        finally:
            # clean-up the demo database
            #clean_database(spark)
            spark.stop()
    
    if __name__ == '__main__':
      main()
    
    
  3. Führen Sie die folgenden Befehle aus, um die JAR-Datei des Metadatendienstes von diesem Ort auf Ihre Arbeitsstation herunterzuladen:

    Die JAR-Datei muss auf allen Knoten des Clusters an der Position /home/hadoop vorhanden sein. Notieren Sie sich die spark.driver.extraClassPath und spark.executor.extraClassPath.

    
    wget https://github.com/IBM-Cloud/IBM-Analytics-Engine/raw/master/wxd-connectors/hms-connector/hive-exec-2.3.9-core.jar
    wget https://github.com/IBM-Cloud/IBM-Analytics-Engine/raw/master/wxd-connectors/hms-connector/hive-common-2.3.9.jar
    wget https://github.com/IBM-Cloud/IBM-Analytics-Engine/raw/master/wxd-connectors/hms-connector/hive-metastore-2.3.9.jar
    
  4. Konfigurieren Sie die MDS-Verbindungsdetails im AWS EMR-Cluster für die Verbindung mit dem watsonx.data Metadaten-Service (MDS) herzustellen. Ein Beispielbefehl zur Verwendung von spark-submit von einem EMR-6.12.0 (Spark 3.4.1 ) basierten Cluster lautet wie folgt:

    Führen Sie den Befehl von EMR auf dem EC2-Cluster aus, um den Spark-Beispieljob zu übergeben.

    
    spark-submit \
    --deploy-mode cluster \
    --jars https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.4_2.12/1.4.0/iceberg-spark-runtime-3.4_2.12-1.4.0.jar,/usr/lib/hadoop/hadoop-aws.jar,/usr/share/aws/aws-java-sdk/aws-java-sdk-bundle*.jar,/usr/lib/hadoop-lzo/lib/* \
    --conf spark.sql.catalogImplementation=hive \
    --conf spark.driver.extraClassPath=/home/hadoop/hive-common-2.3.9.jar:/home/hadoop/hive-metastore-2.3.9.jar:/home/hadoop/hive-exec-2.3.9-core.jar \
    --conf spark.executor.extraClassPath=/home/hadoop/hive-common-2.3.9.jar:/home/hadoop/hive-metastore-2.3.9.jar:/home/hadoop/hive-exec-2.3.9-core.jar \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.iceberg.vectorization.enabled=false \
    --conf spark.sql.catalog.lakehouse=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.lakehouse.type=hive \
    --conf spark.hive.metastore.uris==<<change_endpoint>> \
    --conf spark.hive.metastore.client.auth.mode=PLAIN \
    --conf spark.hive.metastore.client.plain.username=ibmlhapikey \
    --conf spark.hive.metastore.client.plain.password=<<change_pswd>> \
    --conf spark.hive.metastore.use.SSL=true \
    --conf spark.hive.metastore.truststore.type=JKS \
    --conf spark.hive.metastore.truststore.path=file:///etc/pki/java/cacerts \
    --conf spark.hive.metastore.truststore.password=changeit \
    amazon-lakehouse.py
    

Parameterwerte:

Um die Spark-Python-Datei mit dem EMR-6.15.0 (Spark 3.4)-Cluster auszuführen, laden Sie die Iceberg-Jars von diesem Ort herunter und folgen Sie demselben Verfahren.