IBM Cloud Docs

Using AWS EMR for Spark use cases

This topic provides the procedure to run Spark applications from Amazon Web Services Elastic MapReduce (AWS EMR) to achieve the following IBM® watsonx.data Spark use cases:

  • Data ingestion
  • Data querying
  • Table maintenance

Prerequisites

  • Provision an IBM® watsonx.data instance.
  • Create a catalog that uses an S3 bucket.
  • Get the S3 bucket credentials.
  • Set up an EMR cluster on AWS. For more information, see Setting up an EMR cluster.
  • Fetch the following information from IBM® watsonx.data: the metadata service (MDS) endpoint and the API key used to authenticate, which the spark-submit command later in this topic references as <<change_endpoint>> and <<change_pswd>>.

Overview

To process source data that resides in an AWS S3 bucket, you can take either of the following approaches:

  • Set up a watsonx.data instance on AWS.
  • Configure an IBM Cloud-based watsonx.data instance with a catalog that is backed by an AWS S3 bucket.

The watsonx.data query engines can run queries against the data in the AWS S3 bucket. In both cases, you can use AWS EMR Spark to run data ingestion and Iceberg-based schema maintenance operations.
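
As a minimal sketch (not part of the sample file), the MDS and Iceberg catalog settings that the spark-submit command later in this topic passes as --conf options can also be set programmatically when you create the Spark session. Here, <mds-endpoint> and <api-key> are placeholders for the values that you fetch from watsonx.data:

    from pyspark.sql import SparkSession

    # Sketch: set the same MDS and Iceberg catalog options that the
    # spark-submit command in this topic passes as --conf flags.
    # <mds-endpoint> and <api-key> are placeholders, not real values.
    spark = SparkSession.builder.appName("lh-hms-cloud") \
        .config("spark.sql.catalogImplementation", "hive") \
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
        .config("spark.sql.catalog.lakehouse", "org.apache.iceberg.spark.SparkCatalog") \
        .config("spark.sql.catalog.lakehouse.type", "hive") \
        .config("spark.hive.metastore.uris", "<mds-endpoint>") \
        .config("spark.hive.metastore.client.auth.mode", "PLAIN") \
        .config("spark.hive.metastore.client.plain.username", "ibmlhapikey") \
        .config("spark.hive.metastore.client.plain.password", "<api-key>") \
        .config("spark.hive.metastore.use.SSL", "true") \
        .enableHiveSupport() \
        .getOrCreate()

    # Quick connectivity check against the watsonx.data metadata service
    spark.sql("show databases from lakehouse").show()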

About the sample use case

The sample Python file (amazon-lakehouse.py) demonstrates creating a schema (amazonschema), creating tables, and ingesting data. It also covers table maintenance operations. For more information about the functions in the sample, see About the sample use case.

Running the sample use case

Complete the following steps to run the Spark sample Python file.

  1. Connect to the AWS EMR cluster. For more information about connecting to the EMR cluster by using SSH, see Setting up an EMR cluster.

  2. Save the following sample Python file.

    Spark sample Python file

    
    from pyspark.sql import SparkSession

    def init_spark():
        # Create a Spark session with Hive metastore support enabled
        spark = SparkSession.builder.appName("lh-hms-cloud")\
            .enableHiveSupport().getOrCreate()
        return spark
    
    def create_database(spark):
        # Create a database in the lakehouse catalog
        spark.sql("create database if not exists lakehouse.amazonschema LOCATION 's3a://lakehouse-bucket-amz/'")
    
    def list_databases(spark):
        # list the database under lakehouse catalog
        spark.sql("show databases from lakehouse").show()
    
    def basic_iceberg_table_operations(spark):
        # demonstration: Create a basic Iceberg table, insert some data and then query table
        spark.sql("create table if not exists lakehouse.amazonschema.testTable(id INTEGER, name VARCHAR(10), age INTEGER, salary DECIMAL(10, 2)) using iceberg").show()
        spark.sql("insert into lakehouse.amazonschema.testTable values(1,'Alan',23,3400.00),(2,'Ben',30,5500.00),(3,'Chen',35,6500.00)")
        spark.sql("select * from lakehouse.amazonschema.testTable").show()
    
    def create_table_from_parquet_data(spark):
        # load parquet data into a dataframe
        df = spark.read.option("header",True).parquet("s3a://source-bucket-amz/nyc-taxi/yellow_tripdata_2022-01.parquet")
        # write the dataframe into an Iceberg table
        df.writeTo("lakehouse.amazonschema.yellow_taxi_2022").create()
        # describe the table created
        spark.sql('describe table lakehouse.amazonschema.yellow_taxi_2022').show(25)
        # query the table
        spark.sql('select * from lakehouse.amazonschema.yellow_taxi_2022').count()
    
    def ingest_from_csv_temp_table(spark):
        # load csv data into a dataframe
        csvDF = spark.read.option("header",True).csv("s3a://source-bucket-amz/zipcodes.csv")
        csvDF.createOrReplaceTempView("tempCSVTable")
        # load temporary table into an Iceberg table
        spark.sql('create or replace table lakehouse.amazonschema.zipcodes using iceberg as select * from tempCSVTable')
        # describe the table created
        spark.sql('describe table lakehouse.amazonschema.zipcodes').show(25)
        # query the table
        spark.sql('select * from lakehouse.amazonschema.zipcodes').show()
    
    def ingest_monthly_data(spark):
        df_feb = spark.read.option("header",True).parquet("s3a://source-bucket-amz/nyc-taxi/yellow_tripdata_2022-02.parquet")
        df_march = spark.read.option("header",True).parquet("s3a://source-bucket-amz/nyc-taxi/yellow_tripdata_2022-03.parquet")
        df_april = spark.read.option("header",True).parquet("s3a://source-bucket-amz/nyc-taxi/yellow_tripdata_2022-04.parquet")
        df_may = spark.read.option("header",True).parquet("s3a://source-bucket-amz/nyc-taxi/yellow_tripdata_2022-05.parquet")
        df_june = spark.read.option("header",True).parquet("s3a://source-bucket-amz/nyc-taxi/yellow_tripdata_2022-06.parquet")
    
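        # Combine the monthly dataframes and append them to the yellow_taxi_2022 Iceberg table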
        df_q1_q2 = df_feb.union(df_march).union(df_april).union(df_may).union(df_june)
        df_q1_q2.write.insertInto("lakehouse.amazonschema.yellow_taxi_2022")
    
    def perform_table_maintenance_operations(spark):
        # Query the metadata files table to list underlying data files
        spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.amazonschema.yellow_taxi_2022.files").show()
    
        # There are many small files; compact them into files of 200MB each using the
        # `rewrite_data_files` Iceberg Spark procedure
        spark.sql("CALL lakehouse.system.rewrite_data_files(table => 'amazonschema.yellow_taxi_2022', options => map('target-file-size-bytes','209715200'))").show()
    
        # Again, query the metadata files table to list underlying data files; 6 files are compacted
        # to 3 files
        spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.amazonschema.yellow_taxi_2022.files").show()
    
        # List all the snapshots
        # Expire earlier snapshots. Only the latest one, with compacted data, is required
        # Again, List all the snapshots to see only 1 left
        spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.amazonschema.yellow_taxi_2022.snapshots").show()
        # Retain only the latest snapshot
        latest_snapshot_committed_at = spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.amazonschema.yellow_taxi_2022.snapshots").tail(1)[0].committed_at
        print (latest_snapshot_committed_at)
        spark.sql(f"CALL lakehouse.system.expire_snapshots(table => 'amazonschema.yellow_taxi_2022',older_than => TIMESTAMP '{latest_snapshot_committed_at}',retain_last => 1)").show()
        spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.amazonschema.yellow_taxi_2022.snapshots").show()
    
        # Remove orphan data files
        spark.sql("CALL lakehouse.system.remove_orphan_files(table => 'amazonschema.yellow_taxi_2022')").show(truncate=False)

        # Rewrite manifest files
        spark.sql("CALL lakehouse.system.rewrite_manifests('amazonschema.yellow_taxi_2022')").show()
    
    
    def evolve_schema(spark):
        # demonstration: Schema evolution
        # Add column fare_per_mile to the table
        spark.sql('ALTER TABLE lakehouse.amazonschema.yellow_taxi_2022 ADD COLUMN(fare_per_mile double)')
        # describe the table
        spark.sql('describe table lakehouse.amazonschema.yellow_taxi_2022').show(25)
    
    
    def clean_database(spark):
        # clean-up the demo database
        spark.sql('drop table if exists lakehouse.amazonschema.testTable purge')
        spark.sql('drop table if exists lakehouse.amazonschema.zipcodes purge')
        spark.sql('drop table if exists lakehouse.amazonschema.yellow_taxi_2022 purge')
        spark.sql('drop database if exists lakehouse.amazonschema cascade')
    
    def main():
        try:
            spark = init_spark()
            clean_database(spark)
    
            create_database(spark)
            list_databases(spark)
    
            basic_iceberg_table_operations(spark)
    
            # demonstration: Ingest parquet and csv data into a watsonx.data Iceberg table
            create_table_from_parquet_data(spark)
            ingest_from_csv_temp_table(spark)
    
            # load data for the months of February through June into the yellow_taxi_2022 table created above
            ingest_monthly_data(spark)
    
            # demonstration: Table maintenance
            perform_table_maintenance_operations(spark)
    
            # demonstration: Schema evolution
            evolve_schema(spark)
        finally:
            # clean-up the demo database
            #clean_database(spark)
            spark.stop()
    
    if __name__ == '__main__':
        main()
    
    
  3. Run the following commands to download the metadata service JAR files from this location to your workstation:

    The JAR files must be present in the /home/hadoop location on all nodes of the cluster. Note the spark.driver.extraClassPath and spark.executor.extraClassPath settings in the spark-submit command, which point to these files.

    
    wget https://github.com/IBM-Cloud/IBM-Analytics-Engine/raw/master/wxd-connectors/hms-connector/hive-exec-2.3.9-core.jar
    wget https://github.com/IBM-Cloud/IBM-Analytics-Engine/raw/master/wxd-connectors/hms-connector/hive-common-2.3.9.jar
    wget https://github.com/IBM-Cloud/IBM-Analytics-Engine/raw/master/wxd-connectors/hms-connector/hive-metastore-2.3.9.jar
    
  4. Configure the MDS connection details in the AWS EMR cluster to connect to the watsonx.data metadata service (MDS). An example spark-submit command in an EMR-6.12.0 (Spark 3.4.1) based cluster is as follows:

    Run the command from the EMR on EC2 cluster to submit the sample Spark job.

    
    spark-submit \
    --deploy-mode cluster \
    --jars https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.4_2.12/1.4.0/iceberg-spark-runtime-3.4_2.12-1.4.0.jar,/usr/lib/hadoop/hadoop-aws.jar,/usr/share/aws/aws-java-sdk/aws-java-sdk-bundle*.jar,/usr/lib/hadoop-lzo/lib/* \
    --conf spark.sql.catalogImplementation=hive \
    --conf spark.driver.extraClassPath=/home/hadoop/hive-common-2.3.9.jar:/home/hadoop/hive-metastore-2.3.9.jar:/home/hadoop/hive-exec-2.3.9-core.jar \
    --conf spark.executor.extraClassPath=/home/hadoop/hive-common-2.3.9.jar:/home/hadoop/hive-metastore-2.3.9.jar:/home/hadoop/hive-exec-2.3.9-core.jar \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.iceberg.vectorization.enabled=false \
    --conf spark.sql.catalog.lakehouse=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.lakehouse.type=hive \
    --conf spark.hive.metastore.uris=<<change_endpoint>> \
    --conf spark.hive.metastore.client.auth.mode=PLAIN \
    --conf spark.hive.metastore.client.plain.username=ibmlhapikey \
    --conf spark.hive.metastore.client.plain.password=<<change_pswd>> \
    --conf spark.hive.metastore.use.SSL=true \
    --conf spark.hive.metastore.truststore.type=JKS \
    --conf spark.hive.metastore.truststore.path=file:///etc/pki/java/cacerts \
    --conf spark.hive.metastore.truststore.password=changeit \
    amazon-lakehouse.py
    

Parameter values:

  • <<change_endpoint>>: the metadata service (MDS) endpoint of the watsonx.data instance.
  • <<change_pswd>>: the API key of the user (ibmlhapikey) that connects to MDS.

To run the Spark Python file with an EMR-6.15.0 (Spark 3.4) cluster, download the Iceberg JARs from this location and follow the same steps.
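
After the spark-submit job completes, you can verify the results with a short PySpark check modeled on the sample's list_databases function. This is a sketch; it assumes a Spark session configured as shown earlier and that the clean-up call in the sample's finally block remains commented out, so the demo tables persist:

    # Sketch: confirm the demo schema and ingested data are visible through MDS
    spark.sql("show databases from lakehouse").show()
    spark.sql("select count(*) from lakehouse.amazonschema.yellow_taxi_2022").show()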