AWS EMR für Spark-Anwendungsfall verwenden
In diesem Abschnitt wird die Prozedur zum Ausführen von Spark-Anwendungen über Amazon Web Services Elastic MapReduce (AWS EMR) beschrieben, um die Spark-Anwendungsfälle von IBM® watsonx.data zu erreichen:
- Datenaufnahme
- Daten abfragen
- Tabellenpflege
Voraussetzungen
- IBM® watsonx.data-Instanz bereitstellen.
- Erstellen Sie einen Katalog mit dem Bucket S3.
- S3-Bucketberechtigungsnachweise abrufen.
- Richten Sie einen EMR-Cluster in AWSein. Weitere Informationen finden Sie unter EMR-Cluster einrichten.
- Rufen Sie die folgenden Informationen von IBM® watsonx.dataab:
- MDS URL von watsonx.data. Weitere Informationen zum Abrufen der MDS-Anmeldeinformationen finden Sie unter Abrufen der Anmeldeinformationen für den Metadatendienst(MDS).
- MDS-Berechtigungsnachweise von watsonx.data. Weitere Informationen zum Abrufen der MDS-Anmeldeinformationen finden Sie unter Abrufen der Anmeldeinformationen für den Metadatendienst(MDS).
Übersicht
Um mit Quellendaten zu arbeiten, die sich in AWS S3-Buckets befinden, haben Sie eine der folgenden Möglichkeiten:
- watsonx.data-Instanz auf AWS
- IBM Cloud-basierte watsonx.data-Instanz konfigurieren und einen auf AWS S3-Bucket basierenden Katalog einschließen.
Die Abfrageengines watsonx.data können Abfragen für Daten aus AWS S3-Buckets ausführen. In beiden Fällen können Sie die Datenaufnahme und die Iceberg-basierten Schemaverwaltungsoperationen mithilfe von AWS EMR Spark ausführen.
Informationen zum Beispielanwendungsfall
Die Python-Beispieldatei (amazon-lakehouse.py) veranschaulicht die Erstellung von Schemas (amazonschema), Tabellen und die Aufnahme von Daten. Es unterstützt auch Tabellenverwaltungsoperationen. Weitere Informationen zu den Funktionen im Beispiel finden Sie unter Informationen zum Beispielanwendungsfall.
Beispielanwendungsfall ausführen
Führen Sie die Schritte zum Ausführen der Python-Beispieldatei für Spark aus.
-
Stellen Sie eine Verbindung zum AWS EMR-Cluster her. Weitere Informationen zur Verwendung von SSH für die Verbindung zum EMR-Cluster finden Sie unter EMR-Cluster einrichten.
-
Speichern Sie die folgende Python-Beispieldatei.
Python-Beispieldatei für Spark
from pyspark.sql import SparkSession import os def init_spark(): spark = SparkSession.builder.appName("lh-hms-cloud")\ .enableHiveSupport().getOrCreate() return spark def create_database(spark): # Create a database in the lakehouse catalog spark.sql("create database if not exists lakehouse.amazonschema LOCATION 's3a://lakehouse-bucket-amz/'") def list_databases(spark): # list the database under lakehouse catalog spark.sql("show databases from lakehouse").show() def basic_iceberg_table_operations(spark): # demonstration: Create a basic Iceberg table, insert some data and then query table spark.sql("create table if not exists lakehouse.amazonschema.testTable(id INTEGER, name VARCHAR(10), age INTEGER, salary DECIMAL(10, 2)) using iceberg").show() spark.sql("insert into lakehouse.amazonschema.testTable values(1,'Alan',23,3400.00),(2,'Ben',30,5500.00),(3,'Chen',35,6500.00)") spark.sql("select * from lakehouse.amazonschema.testTable").show() def create_table_from_parquet_data(spark): # load parquet data into dataframce df = spark.read.option("header",True).parquet("s3a://source-bucket-amz/nyc-taxi/yellow_tripdata_2022-01.parquet") # write the dataframe into an Iceberg table df.writeTo("lakehouse.amazonschema.yellow_taxi_2022").create() # describe the table created spark.sql('describe table lakehouse.amazonschema.yellow_taxi_2022').show(25) # query the table spark.sql('select * from lakehouse.amazonschema.yellow_taxi_2022').count() def ingest_from_csv_temp_table(spark): # load csv data into a dataframe csvDF = spark.read.option("header",True).csv("s3a://source-bucket-amz/zipcodes.csv") csvDF.createOrReplaceTempView("tempCSVTable") # load temporary table into an Iceberg table spark.sql('create or replace table lakehouse.amazonschema.zipcodes using iceberg as select * from tempCSVTable') # describe the table created spark.sql('describe table lakehouse.amazonschema.zipcodes').show(25) # query the table spark.sql('select * from lakehouse.amazonschema.zipcodes').show() def ingest_monthly_data(spark): df_feb = spark.read.option("header",True).parquet("s3a://source-bucket-amz//nyc-taxi/yellow_tripdata_2022-02.parquet") df_march = spark.read.option("header",True).parquet("s3a://source-bucket-amz//nyc-taxi/yellow_tripdata_2022-03.parquet") df_april = spark.read.option("header",True).parquet("s3a://source-bucket-amz//nyc-taxi/yellow_tripdata_2022-04.parquet") df_may = spark.read.option("header",True).parquet("s3a://source-bucket-amz//nyc-taxi/yellow_tripdata_2022-05.parquet") df_june = spark.read.option("header",True).parquet("s3a://source-bucket-amz//nyc-taxi/yellow_tripdata_2022-06.parquet") df_q1_q2 = df_feb.union(df_march).union(df_april).union(df_may).union(df_june) df_q1_q2.write.insertInto("lakehouse.amazonschema.yellow_taxi_2022") def perform_table_maintenance_operations(spark): # Query the metadata files table to list underlying data files spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.amazonschema.yellow_taxi_2022.files").show() # There are many smaller files compact them into files of 200MB each using the # `rewrite_data_files` Iceberg Spark procedure spark.sql(f"CALL lakehouse.system.rewrite_data_files(table => 'amazonschema.yellow_taxi_2022', options => map('target-file-size-bytes','209715200'))").show() # Again, query the metadata files table to list underlying data files; 6 files are compacted # to 3 files spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.amazonschema.yellow_taxi_2022.files").show() # List all the snapshots # Expire earlier snapshots. Only latest one with comacted data is required # Again, List all the snapshots to see only 1 left spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.amazonschema.yellow_taxi_2022.snapshots").show() #retain only the latest one latest_snapshot_committed_at = spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.amazonschema.yellow_taxi_2022.snapshots").tail(1)[0].committed_at print (latest_snapshot_committed_at) spark.sql(f"CALL lakehouse.system.expire_snapshots(table => 'amazonschema.yellow_taxi_2022',older_than => TIMESTAMP '{latest_snapshot_committed_at}',retain_last => 1)").show() spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.amazonschema.yellow_taxi_2022.snapshots").show() # Removing Orphan data files spark.sql(f"CALL lakehouse.system.remove_orphan_files(table => 'amazonschema.yellow_taxi_2022')").show(truncate=False) # Rewriting Manifest Files spark.sql(f"CALL lakehouse.system.rewrite_manifests('amazonschema.yellow_taxi_2022')").show() def evolve_schema(spark): # demonstration: Schema evolution # Add column fare_per_mile to the table spark.sql('ALTER TABLE lakehouse.amazonschema.yellow_taxi_2022 ADD COLUMN(fare_per_mile double)') # describe the table spark.sql('describe table lakehouse.amazonschema.yellow_taxi_2022').show(25) def clean_database(spark): # clean-up the demo database spark.sql('drop table if exists lakehouse.amazonschema.testTable purge') spark.sql('drop table if exists lakehouse.amazonschema.zipcodes purge') spark.sql('drop table if exists lakehouse.amazonschema.yellow_taxi_2022 purge') spark.sql('drop database if exists lakehouse.amazonschema cascade') def main(): try: spark = init_spark() clean_database(spark) create_database(spark) list_databases(spark) basic_iceberg_table_operations(spark) # demonstration: Ingest parquet and csv data into a wastonx.data Iceberg table create_table_from_parquet_data(spark) ingest_from_csv_temp_table(spark) # load data for the month of Feburary to June into the table yellow_taxi_2022 created above ingest_monthly_data(spark) # demonstration: Table maintenance perform_table_maintenance_operations(spark) # demonstration: Schema evolution evolve_schema(spark) finally: # clean-up the demo database #clean_database(spark) spark.stop() if __name__ == '__main__': main()
-
Führen Sie die folgenden Befehle aus, um die JAR-Datei des Metadatendienstes von diesem Ort auf Ihre Arbeitsstation herunterzuladen:
Die JAR-Datei muss auf allen Knoten des Clusters an der Position
/home/hadoop
vorhanden sein. Notieren Sie sich diespark.driver.extraClassPath
undspark.executor.extraClassPath
.wget https://github.com/IBM-Cloud/IBM-Analytics-Engine/raw/master/wxd-connectors/hms-connector/hive-exec-2.3.9-core.jar wget https://github.com/IBM-Cloud/IBM-Analytics-Engine/raw/master/wxd-connectors/hms-connector/hive-common-2.3.9.jar wget https://github.com/IBM-Cloud/IBM-Analytics-Engine/raw/master/wxd-connectors/hms-connector/hive-metastore-2.3.9.jar
-
Konfigurieren Sie die MDS-Verbindungsdetails im AWS EMR-Cluster für die Verbindung mit dem watsonx.data Metadaten-Service (MDS) herzustellen. Ein Beispielbefehl zur Verwendung von spark-submit von einem EMR-6.12.0 (Spark 3.4.1 ) basierten Cluster lautet wie folgt:
Führen Sie den Befehl von EMR auf dem EC2-Cluster aus, um den Spark-Beispieljob zu übergeben.
spark-submit \ --deploy-mode cluster \ --jars https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.4_2.12/1.4.0/iceberg-spark-runtime-3.4_2.12-1.4.0.jar,/usr/lib/hadoop/hadoop-aws.jar,/usr/share/aws/aws-java-sdk/aws-java-sdk-bundle*.jar,/usr/lib/hadoop-lzo/lib/* \ --conf spark.sql.catalogImplementation=hive \ --conf spark.driver.extraClassPath=/home/hadoop/hive-common-2.3.9.jar:/home/hadoop/hive-metastore-2.3.9.jar:/home/hadoop/hive-exec-2.3.9-core.jar \ --conf spark.executor.extraClassPath=/home/hadoop/hive-common-2.3.9.jar:/home/hadoop/hive-metastore-2.3.9.jar:/home/hadoop/hive-exec-2.3.9-core.jar \ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ --conf spark.sql.iceberg.vectorization.enabled=false \ --conf spark.sql.catalog.lakehouse=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.lakehouse.type=hive \ --conf spark.hive.metastore.uris==<<change_endpoint>> \ --conf spark.hive.metastore.client.auth.mode=PLAIN \ --conf spark.hive.metastore.client.plain.username=ibmlhapikey \ --conf spark.hive.metastore.client.plain.password=<<change_pswd>> \ --conf spark.hive.metastore.use.SSL=true \ --conf spark.hive.metastore.truststore.type=JKS \ --conf spark.hive.metastore.truststore.path=file:///etc/pki/java/cacerts \ --conf spark.hive.metastore.truststore.password=changeit \ amazon-lakehouse.py
Parameterwerte:
- <<change_endpoint>> : Der Metadatendienst-URI-Endpunkt für den Zugriff auf den Metaspeicher. Weitere Informationen zum Abrufen der MDS-Anmeldeinformationen finden Sie unter Abrufen der Anmeldeinformationen für den Metadatendienst(MDS).
- <<change_pswd>> : Das Passwort für den Zugriff auf den Metaspeicher. Weitere Informationen zum Abrufen der MDS-Anmeldeinformationen finden Sie unter Abrufen der Anmeldeinformationen für den Metadatendienst(MDS).
Um die Spark-Python-Datei mit dem EMR-6.15.0 (Spark 3.4)-Cluster auszuführen, laden Sie die Iceberg-Jars von diesem Ort herunter und folgen Sie demselben Verfahren.