使用 AWS EMR for Spark 用例
本主题提供了从 Amazon Web Services Elastic MapReduce (AWS EMR) 运行 Spark 应用程序以实现 IBM® watsonx.data Spark 用例的过程:
- 数据并入
- 数据查询
- 表维护
先决条件
- 供应 IBM® watsonx.data 实例。
- 使用 S3 存储区创建目录。
- 获取 S3 存储区凭证。
- 在 AWS上设置 EMR 集群。 有关更多信息,请参阅 设置 EMR 集群。
- 从 IBM® watsonx.data访存以下信息:
- 来自 watsonx.data 的 MDS URL。 有关获取 MDS 凭据的更多信息,请参阅 获取元数据服务(MDS)凭据。
- 来自 watsonx.data 的 MDS 证书。 有关获取 MDS 凭据的更多信息,请参阅 获取元数据服务(MDS)凭据。
概述
要处理驻留在 AWS S3 存储区中的源数据,可以执行以下任一方法:
- 在 AWS 上设置 watsonx.data 实例
- 配置基于 IBM Cloud 的 watsonx.data 实例,并包含基于 AWS S3 存储区的目录。
watsonx.data 查询引擎可以对 AWS S3 存储区中的数据运行查询。 在这两种情况下,您都可以使用 AWS EMR Spark 来运行数据采集和基于 Iceberg 的模式维护操作。
关于样本用例
样本 python 文件 (amazon-lakehouse.py) 演示了创建模式 (amazonschema),表和采集数据。 它还支持表维护操作。 有关样本中的功能的更多信息,请参阅 关于样本用例
运行样本用例
执行以下步骤以运行 Spark 样本 Python 文件。
-
连接到 AWS EMR 集群。 有关使用 SSH 连接到 EMR 集群的更多信息,请参阅 设置 EMR 集群。
-
保存以下样本 Python 文件。
Spark 样本 Python 文件
from pyspark.sql import SparkSession import os def init_spark(): spark = SparkSession.builder.appName("lh-hms-cloud")\ .enableHiveSupport().getOrCreate() return spark def create_database(spark): # Create a database in the lakehouse catalog spark.sql("create database if not exists lakehouse.amazonschema LOCATION 's3a://lakehouse-bucket-amz/'") def list_databases(spark): # list the database under lakehouse catalog spark.sql("show databases from lakehouse").show() def basic_iceberg_table_operations(spark): # demonstration: Create a basic Iceberg table, insert some data and then query table spark.sql("create table if not exists lakehouse.amazonschema.testTable(id INTEGER, name VARCHAR(10), age INTEGER, salary DECIMAL(10, 2)) using iceberg").show() spark.sql("insert into lakehouse.amazonschema.testTable values(1,'Alan',23,3400.00),(2,'Ben',30,5500.00),(3,'Chen',35,6500.00)") spark.sql("select * from lakehouse.amazonschema.testTable").show() def create_table_from_parquet_data(spark): # load parquet data into dataframce df = spark.read.option("header",True).parquet("s3a://source-bucket-amz/nyc-taxi/yellow_tripdata_2022-01.parquet") # write the dataframe into an Iceberg table df.writeTo("lakehouse.amazonschema.yellow_taxi_2022").create() # describe the table created spark.sql('describe table lakehouse.amazonschema.yellow_taxi_2022').show(25) # query the table spark.sql('select * from lakehouse.amazonschema.yellow_taxi_2022').count() def ingest_from_csv_temp_table(spark): # load csv data into a dataframe csvDF = spark.read.option("header",True).csv("s3a://source-bucket-amz/zipcodes.csv") csvDF.createOrReplaceTempView("tempCSVTable") # load temporary table into an Iceberg table spark.sql('create or replace table lakehouse.amazonschema.zipcodes using iceberg as select * from tempCSVTable') # describe the table created spark.sql('describe table lakehouse.amazonschema.zipcodes').show(25) # query the table spark.sql('select * from lakehouse.amazonschema.zipcodes').show() def ingest_monthly_data(spark): df_feb = spark.read.option("header",True).parquet("s3a://source-bucket-amz//nyc-taxi/yellow_tripdata_2022-02.parquet") df_march = spark.read.option("header",True).parquet("s3a://source-bucket-amz//nyc-taxi/yellow_tripdata_2022-03.parquet") df_april = spark.read.option("header",True).parquet("s3a://source-bucket-amz//nyc-taxi/yellow_tripdata_2022-04.parquet") df_may = spark.read.option("header",True).parquet("s3a://source-bucket-amz//nyc-taxi/yellow_tripdata_2022-05.parquet") df_june = spark.read.option("header",True).parquet("s3a://source-bucket-amz//nyc-taxi/yellow_tripdata_2022-06.parquet") df_q1_q2 = df_feb.union(df_march).union(df_april).union(df_may).union(df_june) df_q1_q2.write.insertInto("lakehouse.amazonschema.yellow_taxi_2022") def perform_table_maintenance_operations(spark): # Query the metadata files table to list underlying data files spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.amazonschema.yellow_taxi_2022.files").show() # There are many smaller files compact them into files of 200MB each using the # `rewrite_data_files` Iceberg Spark procedure spark.sql(f"CALL lakehouse.system.rewrite_data_files(table => 'amazonschema.yellow_taxi_2022', options => map('target-file-size-bytes','209715200'))").show() # Again, query the metadata files table to list underlying data files; 6 files are compacted # to 3 files spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.amazonschema.yellow_taxi_2022.files").show() # List all the snapshots # Expire earlier snapshots. Only latest one with comacted data is required # Again, List all the snapshots to see only 1 left spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.amazonschema.yellow_taxi_2022.snapshots").show() #retain only the latest one latest_snapshot_committed_at = spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.amazonschema.yellow_taxi_2022.snapshots").tail(1)[0].committed_at print (latest_snapshot_committed_at) spark.sql(f"CALL lakehouse.system.expire_snapshots(table => 'amazonschema.yellow_taxi_2022',older_than => TIMESTAMP '{latest_snapshot_committed_at}',retain_last => 1)").show() spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.amazonschema.yellow_taxi_2022.snapshots").show() # Removing Orphan data files spark.sql(f"CALL lakehouse.system.remove_orphan_files(table => 'amazonschema.yellow_taxi_2022')").show(truncate=False) # Rewriting Manifest Files spark.sql(f"CALL lakehouse.system.rewrite_manifests('amazonschema.yellow_taxi_2022')").show() def evolve_schema(spark): # demonstration: Schema evolution # Add column fare_per_mile to the table spark.sql('ALTER TABLE lakehouse.amazonschema.yellow_taxi_2022 ADD COLUMN(fare_per_mile double)') # describe the table spark.sql('describe table lakehouse.amazonschema.yellow_taxi_2022').show(25) def clean_database(spark): # clean-up the demo database spark.sql('drop table if exists lakehouse.amazonschema.testTable purge') spark.sql('drop table if exists lakehouse.amazonschema.zipcodes purge') spark.sql('drop table if exists lakehouse.amazonschema.yellow_taxi_2022 purge') spark.sql('drop database if exists lakehouse.amazonschema cascade') def main(): try: spark = init_spark() clean_database(spark) create_database(spark) list_databases(spark) basic_iceberg_table_operations(spark) # demonstration: Ingest parquet and csv data into a wastonx.data Iceberg table create_table_from_parquet_data(spark) ingest_from_csv_temp_table(spark) # load data for the month of Feburary to June into the table yellow_taxi_2022 created above ingest_monthly_data(spark) # demonstration: Table maintenance perform_table_maintenance_operations(spark) # demonstration: Schema evolution evolve_schema(spark) finally: # clean-up the demo database #clean_database(spark) spark.stop() if __name__ == '__main__': main()
-
运行以下命令将元数据服务 JAR 文件从该 位置 下载到工作站:
JAR 文件必须存在于集群的所有节点上的
/home/hadoop
位置中。 记下spark.driver.extraClassPath
和spark.executor.extraClassPath
。wget https://github.com/IBM-Cloud/IBM-Analytics-Engine/raw/master/wxd-connectors/hms-connector/hive-exec-2.3.9-core.jar wget https://github.com/IBM-Cloud/IBM-Analytics-Engine/raw/master/wxd-connectors/hms-connector/hive-common-2.3.9.jar wget https://github.com/IBM-Cloud/IBM-Analytics-Engine/raw/master/wxd-connectors/hms-connector/hive-metastore-2.3.9.jar
-
在 AWS EMR 集群中配置 MDS 连接详细信息,以连接到 watsonx.data 元数据服务 (MDS)。 在基于 EMR-6.12.0 (Spark 3.4.1 ) 的集群中使用 spark-submit 的示例命令如下:
从 EC2 集群上的 EMR 运行命令以提交样本 spark 作业。
spark-submit \ --deploy-mode cluster \ --jars https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.4_2.12/1.4.0/iceberg-spark-runtime-3.4_2.12-1.4.0.jar,/usr/lib/hadoop/hadoop-aws.jar,/usr/share/aws/aws-java-sdk/aws-java-sdk-bundle*.jar,/usr/lib/hadoop-lzo/lib/* \ --conf spark.sql.catalogImplementation=hive \ --conf spark.driver.extraClassPath=/home/hadoop/hive-common-2.3.9.jar:/home/hadoop/hive-metastore-2.3.9.jar:/home/hadoop/hive-exec-2.3.9-core.jar \ --conf spark.executor.extraClassPath=/home/hadoop/hive-common-2.3.9.jar:/home/hadoop/hive-metastore-2.3.9.jar:/home/hadoop/hive-exec-2.3.9-core.jar \ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ --conf spark.sql.iceberg.vectorization.enabled=false \ --conf spark.sql.catalog.lakehouse=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.lakehouse.type=hive \ --conf spark.hive.metastore.uris==<<change_endpoint>> \ --conf spark.hive.metastore.client.auth.mode=PLAIN \ --conf spark.hive.metastore.client.plain.username=ibmlhapikey \ --conf spark.hive.metastore.client.plain.password=<<change_pswd>> \ --conf spark.hive.metastore.use.SSL=true \ --conf spark.hive.metastore.truststore.type=JKS \ --conf spark.hive.metastore.truststore.path=file:///etc/pki/java/cacerts \ --conf spark.hive.metastore.truststore.password=changeit \ amazon-lakehouse.py
参数值:
- <<change_endpoint>> : 元数据服务 URI 端点,用于访问元存储。 有关获取 MDS 凭据的更多信息,请参阅 获取元数据服务(MDS)凭据。
- <<change_pswd>> : 访问元数据存储的密码。 有关获取 MDS 凭据的更多信息,请参阅 获取元数据服务(MDS)凭据。
要使用EMR-6.15.0(Spark3.4) 集群运行 Spark python 文件,请从该 位置 下载 iceberg jars,并按照同样的 步骤 操作。