使用 AWS EMR for Spark 使用案例
在 watsonx.data 中註冊外部 Spark 引擎的選項在此版本中已被廢棄,並將在 2.3 版本中移除。watsonx.data 已包含內建的 Spark 引擎,您可以直接配置和使用,包括 Gluten-accelerated Spark 引擎和原生的 watsonx.data Spark 引擎。
本主題提供從 Amazon Web Services Elastic MapReduce (AWS EMR) 執行 Spark 應用程式的程序,以達到 IBM® watsonx.data Spark 使用案例:
- 資料吸收
- 資料查詢
- 表格維護
必要條件
-
佈建 IBM® watsonx.data 實例。
-
使用 S3 儲存區建立型錄。
-
取得 S3 儲存區認證。
-
在 AWS上設定 EMR 叢集。 如需相關資訊,請參閱 設定 EMR 叢集。
-
從 IBM® watsonx.data提取下列資訊:
- 來自 watsonx.data 的 MDS URL。 有關獲取 MDS 憑證的更多信息,請參閱 獲取元資料服務(MDS)憑證。
- 來自 watsonx.data 的 MDS 憑證。 有關獲取 MDS 憑證的更多信息,請參閱 獲取元資料服務(MDS)憑證。
從 watsonx.data 版本 2.2.0 開始,使用
ibmlhapikey和ibmlhtoken作為使用者名稱的驗證已被廢棄。 這些格式將在未來的版本中逐步淘汰。 為確保與即將推出的版本相容,請使用新格式:ibmlhapikey_username和ibmlhtoken_username。
概觀
若要使用位於 AWS S3 儲存區中的來源資料,您可以執行下列一種方式:
- 在 AWS 上設定 watsonx.data 實例
- 配置 IBM Cloud 型 watsonx.data 實例,並包括 AWS S3 儲存區型型型錄。
watsonx.data 查詢引擎可以對 AWS S3 儲存區中的資料執行查詢。 在這兩種情況下,您都可以使用 AWS EMR Spark 來執行資料吸收及 Iceberg 型綱目維護作業。
關於範例使用案例
範例 Python 檔案 (amazon-lakehouse.py) 會示範建立綱目 (amazonschema)、表格及汲取資料。 它也支援表格維護作業。 如需範例中功能的相關資訊,請參閱 關於範例使用案例
執行範例使用案例
遵循步驟來執行 Spark 範例 Python 檔案。
-
連接至 AWS EMR 叢集。 如需使用 SSH 來連接至 EMR 叢集的相關資訊,請參閱 設定 EMR 叢集。
-
儲存下列範例 Python 檔案。
Spark 範例 Python 檔案
from pyspark.sql import SparkSession import os def init_spark(): spark = SparkSession.builder.appName("lh-hms-cloud")\ .enableHiveSupport().getOrCreate() return spark def create_database(spark): # Create a database in the lakehouse catalog spark.sql("create database if not exists lakehouse.amazonschema LOCATION 's3a://lakehouse-bucket-amz/'") def list_databases(spark): # list the database under lakehouse catalog spark.sql("show databases from lakehouse").show() def basic_iceberg_table_operations(spark): # demonstration: Create a basic Iceberg table, insert some data and then query table spark.sql("create table if not exists lakehouse.amazonschema.testTable(id INTEGER, name VARCHAR(10), age INTEGER, salary DECIMAL(10, 2)) using iceberg").show() spark.sql("insert into lakehouse.amazonschema.testTable values(1,'Alan',23,3400.00),(2,'Ben',30,5500.00),(3,'Chen',35,6500.00)") spark.sql("select * from lakehouse.amazonschema.testTable").show() def create_table_from_parquet_data(spark): # load parquet data into dataframce df = spark.read.option("header",True).parquet("s3a://source-bucket-amz/nyc-taxi/yellow_tripdata_2022-01.parquet") # write the dataframe into an Iceberg table df.writeTo("lakehouse.amazonschema.yellow_taxi_2022").create() # describe the table created spark.sql('describe table lakehouse.amazonschema.yellow_taxi_2022').show(25) # query the table spark.sql('select * from lakehouse.amazonschema.yellow_taxi_2022').count() def ingest_from_csv_temp_table(spark): # load csv data into a dataframe csvDF = spark.read.option("header",True).csv("s3a://source-bucket-amz/zipcodes.csv") csvDF.createOrReplaceTempView("tempCSVTable") # load temporary table into an Iceberg table spark.sql('create or replace table lakehouse.amazonschema.zipcodes using iceberg as select * from tempCSVTable') # describe the table created spark.sql('describe table lakehouse.amazonschema.zipcodes').show(25) # query the table spark.sql('select * from lakehouse.amazonschema.zipcodes').show() def ingest_monthly_data(spark): df_feb = spark.read.option("header",True).parquet("s3a://source-bucket-amz//nyc-taxi/yellow_tripdata_2022-02.parquet") df_march = spark.read.option("header",True).parquet("s3a://source-bucket-amz//nyc-taxi/yellow_tripdata_2022-03.parquet") df_april = spark.read.option("header",True).parquet("s3a://source-bucket-amz//nyc-taxi/yellow_tripdata_2022-04.parquet") df_may = spark.read.option("header",True).parquet("s3a://source-bucket-amz//nyc-taxi/yellow_tripdata_2022-05.parquet") df_june = spark.read.option("header",True).parquet("s3a://source-bucket-amz//nyc-taxi/yellow_tripdata_2022-06.parquet") df_q1_q2 = df_feb.union(df_march).union(df_april).union(df_may).union(df_june) df_q1_q2.write.insertInto("lakehouse.amazonschema.yellow_taxi_2022") def perform_table_maintenance_operations(spark): # Query the metadata files table to list underlying data files spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.amazonschema.yellow_taxi_2022.files").show() # There are many smaller files compact them into files of 200MB each using the # `rewrite_data_files` Iceberg Spark procedure spark.sql(f"CALL lakehouse.system.rewrite_data_files(table => 'amazonschema.yellow_taxi_2022', options => map('target-file-size-bytes','209715200'))").show() # Again, query the metadata files table to list underlying data files; 6 files are compacted # to 3 files spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.amazonschema.yellow_taxi_2022.files").show() # List all the snapshots # Expire earlier snapshots. Only latest one with comacted data is required # Again, List all the snapshots to see only 1 left spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.amazonschema.yellow_taxi_2022.snapshots").show() #retain only the latest one latest_snapshot_committed_at = spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.amazonschema.yellow_taxi_2022.snapshots").tail(1)[0].committed_at print (latest_snapshot_committed_at) spark.sql(f"CALL lakehouse.system.expire_snapshots(table => 'amazonschema.yellow_taxi_2022',older_than => TIMESTAMP '{latest_snapshot_committed_at}',retain_last => 1)").show() spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.amazonschema.yellow_taxi_2022.snapshots").show() # Removing Orphan data files spark.sql(f"CALL lakehouse.system.remove_orphan_files(table => 'amazonschema.yellow_taxi_2022')").show(truncate=False) # Rewriting Manifest Files spark.sql(f"CALL lakehouse.system.rewrite_manifests('amazonschema.yellow_taxi_2022')").show() def evolve_schema(spark): # demonstration: Schema evolution # Add column fare_per_mile to the table spark.sql('ALTER TABLE lakehouse.amazonschema.yellow_taxi_2022 ADD COLUMN(fare_per_mile double)') # describe the table spark.sql('describe table lakehouse.amazonschema.yellow_taxi_2022').show(25) def clean_database(spark): # clean-up the demo database spark.sql('drop table if exists lakehouse.amazonschema.testTable purge') spark.sql('drop table if exists lakehouse.amazonschema.zipcodes purge') spark.sql('drop table if exists lakehouse.amazonschema.yellow_taxi_2022 purge') spark.sql('drop database if exists lakehouse.amazonschema cascade') def main(): try: spark = init_spark() clean_database(spark) create_database(spark) list_databases(spark) basic_iceberg_table_operations(spark) # demonstration: Ingest parquet and csv data into a wastonx.data Iceberg table create_table_from_parquet_data(spark) ingest_from_csv_temp_table(spark) # load data for the month of Feburary to June into the table yellow_taxi_2022 created above ingest_monthly_data(spark) # demonstration: Table maintenance perform_table_maintenance_operations(spark) # demonstration: Schema evolution evolve_schema(spark) finally: # clean-up the demo database #clean_database(spark) spark.stop() if __name__ == '__main__': main() -
執行以下命令將元資料服務 JAR 檔案從該 位置 下載到您的工作站:
該 JAR 檔必須存在於叢集所有節點上的
/home/hadoop位置中。 記下spark.driver.extraClassPath和spark.executor.extraClassPath。wget https://github.com/IBM-Cloud/IBM-Analytics-Engine/raw/master/wxd-connectors/hms-connector/hive-exec-2.3.9-core.jar wget https://github.com/IBM-Cloud/IBM-Analytics-Engine/raw/master/wxd-connectors/hms-connector/hive-common-2.3.9.jar wget https://github.com/IBM-Cloud/IBM-Analytics-Engine/raw/master/wxd-connectors/hms-connector/hive-metastore-2.3.9.jar -
在 AWS EMR 叢集中配置 MDS 連線詳細資訊以連接到 watsonx.data 元資料服務 (MDS)。 從基於 EMR-6.12.0 (Spark 3.4.1 ) 的叢集使用 Spark-submit 的範例指令如下:
從 EC2 叢集上的 EMR 執行指令,以提交範例 Spark 工作。
spark-submit \ --deploy-mode cluster \ --jars https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.4_2.12/1.4.0/iceberg-spark-runtime-3.4_2.12-1.4.0.jar,/usr/lib/hadoop/hadoop-aws.jar,/usr/share/aws/aws-java-sdk/aws-java-sdk-bundle*.jar,/usr/lib/hadoop-lzo/lib/* \ --conf spark.sql.catalogImplementation=hive \ --conf spark.driver.extraClassPath=/home/hadoop/hive-common-2.3.9.jar:/home/hadoop/hive-metastore-2.3.9.jar:/home/hadoop/hive-exec-2.3.9-core.jar \ --conf spark.executor.extraClassPath=/home/hadoop/hive-common-2.3.9.jar:/home/hadoop/hive-metastore-2.3.9.jar:/home/hadoop/hive-exec-2.3.9-core.jar \ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ --conf spark.sql.iceberg.vectorization.enabled=false \ --conf spark.sql.catalog.lakehouse=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.lakehouse.type=hive \ --conf spark.hive.metastore.uris==<<change_endpoint>> \ --conf spark.hive.metastore.client.auth.mode=PLAIN \ --conf spark.hive.metastore.client.plain.username=ibmlhapikey \ --conf spark.hive.metastore.client.plain.password=<<change_pswd>> \ --conf spark.hive.metastore.use.SSL=true \ --conf spark.hive.metastore.truststore.type=JKS \ --conf spark.hive.metastore.truststore.path=file:///etc/pki/java/cacerts \ --conf spark.hive.metastore.truststore.password=changeit \ amazon-lakehouse.py
參數值:
- <<change_endpoint>>:用於存取元儲存的元資料服務 URI 端點。 有關獲取 MDS 憑證的更多信息,請參閱 獲取元資料服務(MDS)憑證。
- <<change_pswd>> : 存取 metastore 的密碼。 有關獲取 MDS 憑證的更多信息,請參閱 獲取元資料服務(MDS)憑證。
若要使用EMR-6.15.0 (Spark 3.4 ) 叢集執行 Spark python 文件,請從該 位置 下載 Iceberg jar,並依照相同的 流程 進行操作。