Utilisation du scénario d'utilisation AWS EMR for Spark
Cette rubrique fournit la procédure d'exécution d'applications Spark à partir d' Amazon Web Services Elastic MapReduce (AWS EMR) pour obtenir les cas d'utilisation Spark IBM® watsonx.data:
- ingestion de données
- requête de données
- maintenance des tables
Prérequis
- Mettez à disposition l'instance IBM® watsonx.data.
- Créez un catalogue avec un compartiment S3.
- Obtenez les données d'identification du compartiment S3.
- Configurez le cluster EMR sur AWS. Pour plus d'informations, voir Configuration d'un cluster EMR.
- Extrayez les informations suivantes de IBM® watsonx.data:
- URL MDS de watsonx.data Pour plus d'informations sur l'obtention des informations d'identification MDS, voir Obtention des informations d'identification du service de métadonnées(MDS).
- MDS Credentials from watsonx.data Pour plus d'informations sur l'obtention des informations d'identification MDS, voir Obtention des informations d'identification du service de métadonnées(MDS).
Présentation
Pour utiliser des données source qui résident dans des compartiments AWS S3, vous pouvez effectuer l'une des opérations suivantes:
- configurer l'instance watsonx.data sur AWS
- configurer une instance IBM Cloud basée sur watsonx.data et inclure un catalogue basé sur un compartiment AWS S3.
Les moteurs de requête watsonx.data peuvent exécuter des requêtes sur des données à partir de compartiments AWS S3. Dans les deux cas, vous pouvez exécuter les opérations d'ingestion de données et de maintenance de schéma Iceberg à l'aide d' AWS EMR Spark.
A propos de l'exemple de cas d'utilisation
L'exemple de fichier python (amazon-lakehouse.py) illustre la création de schéma (amazonschema), de tables et de données d'ingestion. Il prend également en charge les opérations de maintenance de table. Pour plus d'informations sur les fonctionnalités de l'exemple, voir A propos de l'exemple de cas d'utilisation
Exécution de l'exemple de cas d'utilisation
Suivez les étapes pour exécuter l'exemple de fichier Python Spark.
-
Connectez-vous au cluster AWS EMR. Pour plus d'informations sur l'utilisation de SSH pour la connexion au cluster EMR, voir Configuration du cluster EMR.
-
Sauvegardez l'exemple de fichier Python suivant.
Exemple de fichier Python Spark
from pyspark.sql import SparkSession import os def init_spark(): spark = SparkSession.builder.appName("lh-hms-cloud")\ .enableHiveSupport().getOrCreate() return spark def create_database(spark): # Create a database in the lakehouse catalog spark.sql("create database if not exists lakehouse.amazonschema LOCATION 's3a://lakehouse-bucket-amz/'") def list_databases(spark): # list the database under lakehouse catalog spark.sql("show databases from lakehouse").show() def basic_iceberg_table_operations(spark): # demonstration: Create a basic Iceberg table, insert some data and then query table spark.sql("create table if not exists lakehouse.amazonschema.testTable(id INTEGER, name VARCHAR(10), age INTEGER, salary DECIMAL(10, 2)) using iceberg").show() spark.sql("insert into lakehouse.amazonschema.testTable values(1,'Alan',23,3400.00),(2,'Ben',30,5500.00),(3,'Chen',35,6500.00)") spark.sql("select * from lakehouse.amazonschema.testTable").show() def create_table_from_parquet_data(spark): # load parquet data into dataframce df = spark.read.option("header",True).parquet("s3a://source-bucket-amz/nyc-taxi/yellow_tripdata_2022-01.parquet") # write the dataframe into an Iceberg table df.writeTo("lakehouse.amazonschema.yellow_taxi_2022").create() # describe the table created spark.sql('describe table lakehouse.amazonschema.yellow_taxi_2022').show(25) # query the table spark.sql('select * from lakehouse.amazonschema.yellow_taxi_2022').count() def ingest_from_csv_temp_table(spark): # load csv data into a dataframe csvDF = spark.read.option("header",True).csv("s3a://source-bucket-amz/zipcodes.csv") csvDF.createOrReplaceTempView("tempCSVTable") # load temporary table into an Iceberg table spark.sql('create or replace table lakehouse.amazonschema.zipcodes using iceberg as select * from tempCSVTable') # describe the table created spark.sql('describe table lakehouse.amazonschema.zipcodes').show(25) # query the table spark.sql('select * from lakehouse.amazonschema.zipcodes').show() def ingest_monthly_data(spark): df_feb = spark.read.option("header",True).parquet("s3a://source-bucket-amz//nyc-taxi/yellow_tripdata_2022-02.parquet") df_march = spark.read.option("header",True).parquet("s3a://source-bucket-amz//nyc-taxi/yellow_tripdata_2022-03.parquet") df_april = spark.read.option("header",True).parquet("s3a://source-bucket-amz//nyc-taxi/yellow_tripdata_2022-04.parquet") df_may = spark.read.option("header",True).parquet("s3a://source-bucket-amz//nyc-taxi/yellow_tripdata_2022-05.parquet") df_june = spark.read.option("header",True).parquet("s3a://source-bucket-amz//nyc-taxi/yellow_tripdata_2022-06.parquet") df_q1_q2 = df_feb.union(df_march).union(df_april).union(df_may).union(df_june) df_q1_q2.write.insertInto("lakehouse.amazonschema.yellow_taxi_2022") def perform_table_maintenance_operations(spark): # Query the metadata files table to list underlying data files spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.amazonschema.yellow_taxi_2022.files").show() # There are many smaller files compact them into files of 200MB each using the # `rewrite_data_files` Iceberg Spark procedure spark.sql(f"CALL lakehouse.system.rewrite_data_files(table => 'amazonschema.yellow_taxi_2022', options => map('target-file-size-bytes','209715200'))").show() # Again, query the metadata files table to list underlying data files; 6 files are compacted # to 3 files spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.amazonschema.yellow_taxi_2022.files").show() # List all the snapshots # Expire earlier snapshots. Only latest one with comacted data is required # Again, List all the snapshots to see only 1 left spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.amazonschema.yellow_taxi_2022.snapshots").show() #retain only the latest one latest_snapshot_committed_at = spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.amazonschema.yellow_taxi_2022.snapshots").tail(1)[0].committed_at print (latest_snapshot_committed_at) spark.sql(f"CALL lakehouse.system.expire_snapshots(table => 'amazonschema.yellow_taxi_2022',older_than => TIMESTAMP '{latest_snapshot_committed_at}',retain_last => 1)").show() spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.amazonschema.yellow_taxi_2022.snapshots").show() # Removing Orphan data files spark.sql(f"CALL lakehouse.system.remove_orphan_files(table => 'amazonschema.yellow_taxi_2022')").show(truncate=False) # Rewriting Manifest Files spark.sql(f"CALL lakehouse.system.rewrite_manifests('amazonschema.yellow_taxi_2022')").show() def evolve_schema(spark): # demonstration: Schema evolution # Add column fare_per_mile to the table spark.sql('ALTER TABLE lakehouse.amazonschema.yellow_taxi_2022 ADD COLUMN(fare_per_mile double)') # describe the table spark.sql('describe table lakehouse.amazonschema.yellow_taxi_2022').show(25) def clean_database(spark): # clean-up the demo database spark.sql('drop table if exists lakehouse.amazonschema.testTable purge') spark.sql('drop table if exists lakehouse.amazonschema.zipcodes purge') spark.sql('drop table if exists lakehouse.amazonschema.yellow_taxi_2022 purge') spark.sql('drop database if exists lakehouse.amazonschema cascade') def main(): try: spark = init_spark() clean_database(spark) create_database(spark) list_databases(spark) basic_iceberg_table_operations(spark) # demonstration: Ingest parquet and csv data into a wastonx.data Iceberg table create_table_from_parquet_data(spark) ingest_from_csv_temp_table(spark) # load data for the month of Feburary to June into the table yellow_taxi_2022 created above ingest_monthly_data(spark) # demonstration: Table maintenance perform_table_maintenance_operations(spark) # demonstration: Schema evolution evolve_schema(spark) finally: # clean-up the demo database #clean_database(spark) spark.stop() if __name__ == '__main__': main()
-
Exécutez les commandes suivantes pour télécharger le fichier JAR du service de métadonnées sur votre poste de travail :
Le fichier JAR doit être présent dans l'emplacement
/home/hadoop
sur tous les noeuds du cluster. Notez les adressesspark.driver.extraClassPath
etspark.executor.extraClassPath
.wget https://github.com/IBM-Cloud/IBM-Analytics-Engine/raw/master/wxd-connectors/hms-connector/hive-exec-2.3.9-core.jar wget https://github.com/IBM-Cloud/IBM-Analytics-Engine/raw/master/wxd-connectors/hms-connector/hive-common-2.3.9.jar wget https://github.com/IBM-Cloud/IBM-Analytics-Engine/raw/master/wxd-connectors/hms-connector/hive-metastore-2.3.9.jar
-
Configurez les détails de la connexion MDS dans le cluster AWS EMR pour vous connecter au watsonx.data Metadata Service (MDS). Un exemple de commande pour utiliser spark-submit à partir d'un cluster basé sur EMR-6.12.0 (Spark 3.4.1 ) est le suivant :
Exécutez la commande à partir d'EMR sur le cluster EC2 pour soumettre l'exemple de travail Spark.
spark-submit \ --deploy-mode cluster \ --jars https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.4_2.12/1.4.0/iceberg-spark-runtime-3.4_2.12-1.4.0.jar,/usr/lib/hadoop/hadoop-aws.jar,/usr/share/aws/aws-java-sdk/aws-java-sdk-bundle*.jar,/usr/lib/hadoop-lzo/lib/* \ --conf spark.sql.catalogImplementation=hive \ --conf spark.driver.extraClassPath=/home/hadoop/hive-common-2.3.9.jar:/home/hadoop/hive-metastore-2.3.9.jar:/home/hadoop/hive-exec-2.3.9-core.jar \ --conf spark.executor.extraClassPath=/home/hadoop/hive-common-2.3.9.jar:/home/hadoop/hive-metastore-2.3.9.jar:/home/hadoop/hive-exec-2.3.9-core.jar \ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ --conf spark.sql.iceberg.vectorization.enabled=false \ --conf spark.sql.catalog.lakehouse=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.lakehouse.type=hive \ --conf spark.hive.metastore.uris==<<change_endpoint>> \ --conf spark.hive.metastore.client.auth.mode=PLAIN \ --conf spark.hive.metastore.client.plain.username=ibmlhapikey \ --conf spark.hive.metastore.client.plain.password=<<change_pswd>> \ --conf spark.hive.metastore.use.SSL=true \ --conf spark.hive.metastore.truststore.type=JKS \ --conf spark.hive.metastore.truststore.path=file:///etc/pki/java/cacerts \ --conf spark.hive.metastore.truststore.password=changeit \ amazon-lakehouse.py
Valeurs des paramètres :
- <<change_endpoint>> : Le point d'accès URI du service de métadonnées pour accéder au métastore. Pour plus d'informations sur l'obtention des informations d'identification MDS, voir Obtention des informations d'identification du service de métadonnées(MDS).
- <<change_pswd>> : Le mot de passe pour accéder au métastore. Pour plus d'informations sur l'obtention des informations d'identification MDS, voir Obtention des informations d'identification du service de métadonnées(MDS).
Pour exécuter le fichier python Spark en utilisant le cluster EMR-6.15.0 (Spark 3.4), téléchargez les jars iceberg à partir de l'emplacement et suivez la même procédure.