IBM Cloud Docs
使用 AWS EMR for Spark 使用案例

使用 AWS EMR for Spark 使用案例

在 watsonx.data 中註冊外部 Spark 引擎的選項在此版本中已被廢棄,並將在 2.3 版本中移除。watsonx.data 已包含內建的 Spark 引擎,您可以直接配置和使用,包括 Gluten-accelerated Spark 引擎和原生的 watsonx.data Spark 引擎。

本主題提供從 Amazon Web Services Elastic MapReduce (AWS EMR) 執行 Spark 應用程式的程序,以達到 IBM® watsonx.data Spark 使用案例:

  • 資料吸收
  • 資料查詢
  • 表格維護

必要條件

  • 佈建 IBM® watsonx.data 實例。

  • 使用 S3 儲存區建立型錄。

  • 取得 S3 儲存區認證。

  • 在 AWS上設定 EMR 叢集。 如需相關資訊,請參閱 設定 EMR 叢集

  • 從 IBM® watsonx.data提取下列資訊:

    從 watsonx.data 版本 2.2.0 開始,使用 ibmlhapikeyibmlhtoken 作為使用者名稱的驗證已被廢棄。 這些格式將在未來的版本中逐步淘汰。 為確保與即將推出的版本相容,請使用新格式:ibmlhapikey_usernameibmlhtoken_username

概觀

若要使用位於 AWS S3 儲存區中的來源資料,您可以執行下列一種方式:

  • 在 AWS 上設定 watsonx.data 實例
  • 配置 IBM Cloud 型 watsonx.data 實例,並包括 AWS S3 儲存區型型型錄。

watsonx.data 查詢引擎可以對 AWS S3 儲存區中的資料執行查詢。 在這兩種情況下,您都可以使用 AWS EMR Spark 來執行資料吸收及 Iceberg 型綱目維護作業。

關於範例使用案例

範例 Python 檔案 (amazon-lakehouse.py) 會示範建立綱目 (amazonschema)、表格及汲取資料。 它也支援表格維護作業。 如需範例中功能的相關資訊,請參閱 關於範例使用案例

執行範例使用案例

遵循步驟來執行 Spark 範例 Python 檔案。

  1. 連接至 AWS EMR 叢集。 如需使用 SSH 來連接至 EMR 叢集的相關資訊,請參閱 設定 EMR 叢集

  2. 儲存下列範例 Python 檔案。

    Spark 範例 Python 檔案

    
    from pyspark.sql import SparkSession
    import os
    
    def init_spark():
    spark = SparkSession.builder.appName("lh-hms-cloud")\
    .enableHiveSupport().getOrCreate()
    return spark
    
    def create_database(spark):
        # Create a database in the lakehouse catalog
        spark.sql("create database if not exists lakehouse.amazonschema LOCATION 's3a://lakehouse-bucket-amz/'")
    
    def list_databases(spark):
        # list the database under lakehouse catalog
        spark.sql("show databases from lakehouse").show()
    
    def basic_iceberg_table_operations(spark):
        # demonstration: Create a basic Iceberg table, insert some data and then query table
        spark.sql("create table if not exists lakehouse.amazonschema.testTable(id INTEGER, name VARCHAR(10), age INTEGER, salary DECIMAL(10, 2)) using iceberg").show()
        spark.sql("insert into lakehouse.amazonschema.testTable values(1,'Alan',23,3400.00),(2,'Ben',30,5500.00),(3,'Chen',35,6500.00)")
        spark.sql("select * from lakehouse.amazonschema.testTable").show()
    
    def create_table_from_parquet_data(spark):
        # load parquet data into dataframce
        df = spark.read.option("header",True).parquet("s3a://source-bucket-amz/nyc-taxi/yellow_tripdata_2022-01.parquet")
        # write the dataframe into an Iceberg table
        df.writeTo("lakehouse.amazonschema.yellow_taxi_2022").create()
        # describe the table created
        spark.sql('describe table lakehouse.amazonschema.yellow_taxi_2022').show(25)
        # query the table
        spark.sql('select * from lakehouse.amazonschema.yellow_taxi_2022').count()
    
    def ingest_from_csv_temp_table(spark):
        # load csv data into a dataframe
        csvDF = spark.read.option("header",True).csv("s3a://source-bucket-amz/zipcodes.csv")
        csvDF.createOrReplaceTempView("tempCSVTable")
        # load temporary table into an Iceberg table
        spark.sql('create or replace table lakehouse.amazonschema.zipcodes using iceberg as select * from tempCSVTable')
        # describe the table created
        spark.sql('describe table lakehouse.amazonschema.zipcodes').show(25)
        # query the table
        spark.sql('select * from lakehouse.amazonschema.zipcodes').show()
    
    def ingest_monthly_data(spark):
        df_feb = spark.read.option("header",True).parquet("s3a://source-bucket-amz//nyc-taxi/yellow_tripdata_2022-02.parquet")
        df_march = spark.read.option("header",True).parquet("s3a://source-bucket-amz//nyc-taxi/yellow_tripdata_2022-03.parquet")
        df_april = spark.read.option("header",True).parquet("s3a://source-bucket-amz//nyc-taxi/yellow_tripdata_2022-04.parquet")
        df_may = spark.read.option("header",True).parquet("s3a://source-bucket-amz//nyc-taxi/yellow_tripdata_2022-05.parquet")
        df_june = spark.read.option("header",True).parquet("s3a://source-bucket-amz//nyc-taxi/yellow_tripdata_2022-06.parquet")
    
        df_q1_q2 = df_feb.union(df_march).union(df_april).union(df_may).union(df_june)
        df_q1_q2.write.insertInto("lakehouse.amazonschema.yellow_taxi_2022")
    
    def perform_table_maintenance_operations(spark):
        # Query the metadata files table to list underlying data files
        spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.amazonschema.yellow_taxi_2022.files").show()
    
        # There are many smaller files compact them into files of 200MB each using the
        # `rewrite_data_files` Iceberg Spark procedure
        spark.sql(f"CALL lakehouse.system.rewrite_data_files(table => 'amazonschema.yellow_taxi_2022', options => map('target-file-size-bytes','209715200'))").show()
    
        # Again, query the metadata files table to list underlying data files; 6 files are compacted
        # to 3 files
        spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.amazonschema.yellow_taxi_2022.files").show()
    
        # List all the snapshots
        # Expire earlier snapshots. Only latest one with comacted data is required
        # Again, List all the snapshots to see only 1 left
        spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.amazonschema.yellow_taxi_2022.snapshots").show()
        #retain only the latest one
        latest_snapshot_committed_at = spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.amazonschema.yellow_taxi_2022.snapshots").tail(1)[0].committed_at
        print (latest_snapshot_committed_at)
        spark.sql(f"CALL lakehouse.system.expire_snapshots(table => 'amazonschema.yellow_taxi_2022',older_than => TIMESTAMP '{latest_snapshot_committed_at}',retain_last => 1)").show()
        spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.amazonschema.yellow_taxi_2022.snapshots").show()
    
        # Removing Orphan data files
        spark.sql(f"CALL lakehouse.system.remove_orphan_files(table => 'amazonschema.yellow_taxi_2022')").show(truncate=False)
    
        # Rewriting Manifest Files
        spark.sql(f"CALL lakehouse.system.rewrite_manifests('amazonschema.yellow_taxi_2022')").show()
    
    
    def evolve_schema(spark):
        # demonstration: Schema evolution
        # Add column fare_per_mile to the table
        spark.sql('ALTER TABLE lakehouse.amazonschema.yellow_taxi_2022 ADD COLUMN(fare_per_mile double)')
        # describe the table
        spark.sql('describe table lakehouse.amazonschema.yellow_taxi_2022').show(25)
    
    
    def clean_database(spark):
        # clean-up the demo database
        spark.sql('drop table if exists lakehouse.amazonschema.testTable purge')
        spark.sql('drop table if exists lakehouse.amazonschema.zipcodes purge')
        spark.sql('drop table if exists lakehouse.amazonschema.yellow_taxi_2022 purge')
        spark.sql('drop database if exists lakehouse.amazonschema cascade')
    
    def main():
        try:
            spark = init_spark()
            clean_database(spark)
    
            create_database(spark)
            list_databases(spark)
    
            basic_iceberg_table_operations(spark)
    
            # demonstration: Ingest parquet and csv data into a wastonx.data Iceberg table
            create_table_from_parquet_data(spark)
            ingest_from_csv_temp_table(spark)
    
            # load data for the month of Feburary to June into the table yellow_taxi_2022 created above
            ingest_monthly_data(spark)
    
            # demonstration: Table maintenance
            perform_table_maintenance_operations(spark)
    
            # demonstration: Schema evolution
            evolve_schema(spark)
        finally:
            # clean-up the demo database
            #clean_database(spark)
            spark.stop()
    
    if __name__ == '__main__':
      main()
    
    
  3. 執行以下命令將元資料服務 JAR 檔案從該 位置 下載到您的工作站:

    該 JAR 檔必須存在於叢集所有節點上的 /home/hadoop 位置中。 記下 spark.driver.extraClassPathspark.executor.extraClassPath

    
    wget https://github.com/IBM-Cloud/IBM-Analytics-Engine/raw/master/wxd-connectors/hms-connector/hive-exec-2.3.9-core.jar
    wget https://github.com/IBM-Cloud/IBM-Analytics-Engine/raw/master/wxd-connectors/hms-connector/hive-common-2.3.9.jar
    wget https://github.com/IBM-Cloud/IBM-Analytics-Engine/raw/master/wxd-connectors/hms-connector/hive-metastore-2.3.9.jar
    
  4. 在 AWS EMR 叢集中配置 MDS 連線詳細資訊以連接到 watsonx.data 元資料服務 (MDS)。 從基於 EMR-6.12.0 (Spark 3.4.1 ) 的叢集使用 Spark-submit 的範例指令如下:

    從 EC2 叢集上的 EMR 執行指令,以提交範例 Spark 工作。

    
    spark-submit \
    --deploy-mode cluster \
    --jars https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.4_2.12/1.4.0/iceberg-spark-runtime-3.4_2.12-1.4.0.jar,/usr/lib/hadoop/hadoop-aws.jar,/usr/share/aws/aws-java-sdk/aws-java-sdk-bundle*.jar,/usr/lib/hadoop-lzo/lib/* \
    --conf spark.sql.catalogImplementation=hive \
    --conf spark.driver.extraClassPath=/home/hadoop/hive-common-2.3.9.jar:/home/hadoop/hive-metastore-2.3.9.jar:/home/hadoop/hive-exec-2.3.9-core.jar \
    --conf spark.executor.extraClassPath=/home/hadoop/hive-common-2.3.9.jar:/home/hadoop/hive-metastore-2.3.9.jar:/home/hadoop/hive-exec-2.3.9-core.jar \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.iceberg.vectorization.enabled=false \
    --conf spark.sql.catalog.lakehouse=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.lakehouse.type=hive \
    --conf spark.hive.metastore.uris==<<change_endpoint>> \
    --conf spark.hive.metastore.client.auth.mode=PLAIN \
    --conf spark.hive.metastore.client.plain.username=ibmlhapikey \
    --conf spark.hive.metastore.client.plain.password=<<change_pswd>> \
    --conf spark.hive.metastore.use.SSL=true \
    --conf spark.hive.metastore.truststore.type=JKS \
    --conf spark.hive.metastore.truststore.path=file:///etc/pki/java/cacerts \
    --conf spark.hive.metastore.truststore.password=changeit \
    amazon-lakehouse.py
    

參數值:

若要使用EMR-6.15.0 (Spark 3.4 ) 叢集執行 Spark python 文件,請從該 位置 下載 Iceberg jar,並依照相同的 流程 進行操作。