AWS EMR for Spark 사용 사례
이 주제에서는 IBM® watsonx.data Spark 유스 케이스를 달성하기 위해 Amazon Web Services Elastic MapReduce (AWS EMR) 에서 Spark 애플리케이션을 실행하는 프로시저를 제공합니다.
- 데이터 통합
- 데이터 쿼리
- 테이블 유지보수
전제조건
- IBM® watsonx.data 인스턴스를 프로비저닝하십시오.
- S3 버킷을 사용하여 카탈로그를 작성하십시오.
- S3 버킷 신임 정보를 가져오십시오.
- AWS에서 EMR 클러스터를 설정하십시오. 자세한 정보는 EMR 클러스터 설정 을 참조하십시오.
- IBM® watsonx.data에서 다음 정보를 페치하십시오.
- watsonx.data MDS URL. MDS 자격 증명 가져오기에 대한 자세한 내용은 메타데이터 서비스(MDS)자격 증명 가져오기를 참조하세요.
- watsonx.data MDS 자격 증명. MDS 자격 증명 가져오기에 대한 자세한 내용은 메타데이터 서비스(MDS)자격 증명 가져오기를 참조하세요.
개요
AWS S3 버킷에 있는 소스 데이터에 대해 작업하기 위해 다음 방법 중 하나를 수행할 수 있습니다.
- AWS 에서 watsonx.data 인스턴스 설정
- IBM Cloud 기반 watsonx.data 인스턴스를 구성하고 AWS S3 버킷 기반 카탈로그를 포함합니다.
watsonx.data 조회 엔진은 AWS S3 버킷의 데이터에 대한 조회를 실행할 수 있습니다. 두 경우 모두 AWS EMR Spark를 사용하여 데이터 수집 및 Iceberg 기반 스키마 유지보수 조작을 실행할 수 있습니다.
샘플 유스 케이스 정보
샘플 python 파일 (amazon-lakehouse.py) 은 스키마 (amazonschema), 테이블 및 수집 데이터 작성을 보여줍니다. 또한 테이블 유지보수 조작을 지원합니다. 샘플의 기능에 대한 자세한 정보는 샘플 유스 케이스 정보 를 참조하십시오.
샘플 유스 케이스 실행
단계에 따라 Spark 샘플 python 파일을 실행하십시오.
-
AWS EMR 클러스터에 연결하십시오. SSH를 사용하여 EMR 클러스터에 연결하는 방법에 대한 자세한 정보는 EMR 클러스터 설정 을 참조하십시오.
-
다음 샘플 python 파일을 저장하십시오.
Spark 샘플 python 파일
from pyspark.sql import SparkSession import os def init_spark(): spark = SparkSession.builder.appName("lh-hms-cloud")\ .enableHiveSupport().getOrCreate() return spark def create_database(spark): # Create a database in the lakehouse catalog spark.sql("create database if not exists lakehouse.amazonschema LOCATION 's3a://lakehouse-bucket-amz/'") def list_databases(spark): # list the database under lakehouse catalog spark.sql("show databases from lakehouse").show() def basic_iceberg_table_operations(spark): # demonstration: Create a basic Iceberg table, insert some data and then query table spark.sql("create table if not exists lakehouse.amazonschema.testTable(id INTEGER, name VARCHAR(10), age INTEGER, salary DECIMAL(10, 2)) using iceberg").show() spark.sql("insert into lakehouse.amazonschema.testTable values(1,'Alan',23,3400.00),(2,'Ben',30,5500.00),(3,'Chen',35,6500.00)") spark.sql("select * from lakehouse.amazonschema.testTable").show() def create_table_from_parquet_data(spark): # load parquet data into dataframce df = spark.read.option("header",True).parquet("s3a://source-bucket-amz/nyc-taxi/yellow_tripdata_2022-01.parquet") # write the dataframe into an Iceberg table df.writeTo("lakehouse.amazonschema.yellow_taxi_2022").create() # describe the table created spark.sql('describe table lakehouse.amazonschema.yellow_taxi_2022').show(25) # query the table spark.sql('select * from lakehouse.amazonschema.yellow_taxi_2022').count() def ingest_from_csv_temp_table(spark): # load csv data into a dataframe csvDF = spark.read.option("header",True).csv("s3a://source-bucket-amz/zipcodes.csv") csvDF.createOrReplaceTempView("tempCSVTable") # load temporary table into an Iceberg table spark.sql('create or replace table lakehouse.amazonschema.zipcodes using iceberg as select * from tempCSVTable') # describe the table created spark.sql('describe table lakehouse.amazonschema.zipcodes').show(25) # query the table spark.sql('select * from lakehouse.amazonschema.zipcodes').show() def ingest_monthly_data(spark): df_feb = spark.read.option("header",True).parquet("s3a://source-bucket-amz//nyc-taxi/yellow_tripdata_2022-02.parquet") df_march = spark.read.option("header",True).parquet("s3a://source-bucket-amz//nyc-taxi/yellow_tripdata_2022-03.parquet") df_april = spark.read.option("header",True).parquet("s3a://source-bucket-amz//nyc-taxi/yellow_tripdata_2022-04.parquet") df_may = spark.read.option("header",True).parquet("s3a://source-bucket-amz//nyc-taxi/yellow_tripdata_2022-05.parquet") df_june = spark.read.option("header",True).parquet("s3a://source-bucket-amz//nyc-taxi/yellow_tripdata_2022-06.parquet") df_q1_q2 = df_feb.union(df_march).union(df_april).union(df_may).union(df_june) df_q1_q2.write.insertInto("lakehouse.amazonschema.yellow_taxi_2022") def perform_table_maintenance_operations(spark): # Query the metadata files table to list underlying data files spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.amazonschema.yellow_taxi_2022.files").show() # There are many smaller files compact them into files of 200MB each using the # `rewrite_data_files` Iceberg Spark procedure spark.sql(f"CALL lakehouse.system.rewrite_data_files(table => 'amazonschema.yellow_taxi_2022', options => map('target-file-size-bytes','209715200'))").show() # Again, query the metadata files table to list underlying data files; 6 files are compacted # to 3 files spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.amazonschema.yellow_taxi_2022.files").show() # List all the snapshots # Expire earlier snapshots. Only latest one with comacted data is required # Again, List all the snapshots to see only 1 left spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.amazonschema.yellow_taxi_2022.snapshots").show() #retain only the latest one latest_snapshot_committed_at = spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.amazonschema.yellow_taxi_2022.snapshots").tail(1)[0].committed_at print (latest_snapshot_committed_at) spark.sql(f"CALL lakehouse.system.expire_snapshots(table => 'amazonschema.yellow_taxi_2022',older_than => TIMESTAMP '{latest_snapshot_committed_at}',retain_last => 1)").show() spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.amazonschema.yellow_taxi_2022.snapshots").show() # Removing Orphan data files spark.sql(f"CALL lakehouse.system.remove_orphan_files(table => 'amazonschema.yellow_taxi_2022')").show(truncate=False) # Rewriting Manifest Files spark.sql(f"CALL lakehouse.system.rewrite_manifests('amazonschema.yellow_taxi_2022')").show() def evolve_schema(spark): # demonstration: Schema evolution # Add column fare_per_mile to the table spark.sql('ALTER TABLE lakehouse.amazonschema.yellow_taxi_2022 ADD COLUMN(fare_per_mile double)') # describe the table spark.sql('describe table lakehouse.amazonschema.yellow_taxi_2022').show(25) def clean_database(spark): # clean-up the demo database spark.sql('drop table if exists lakehouse.amazonschema.testTable purge') spark.sql('drop table if exists lakehouse.amazonschema.zipcodes purge') spark.sql('drop table if exists lakehouse.amazonschema.yellow_taxi_2022 purge') spark.sql('drop database if exists lakehouse.amazonschema cascade') def main(): try: spark = init_spark() clean_database(spark) create_database(spark) list_databases(spark) basic_iceberg_table_operations(spark) # demonstration: Ingest parquet and csv data into a wastonx.data Iceberg table create_table_from_parquet_data(spark) ingest_from_csv_temp_table(spark) # load data for the month of Feburary to June into the table yellow_taxi_2022 created above ingest_monthly_data(spark) # demonstration: Table maintenance perform_table_maintenance_operations(spark) # demonstration: Schema evolution evolve_schema(spark) finally: # clean-up the demo database #clean_database(spark) spark.stop() if __name__ == '__main__': main()
-
다음 명령을 실행하여 해당 위치에서 워크스테이션으로 메타데이터 서비스 JAR 파일을 다운로드합니다:
JAR 파일은 클러스터의 모든 노드에서
/home/hadoop
위치에 있어야 합니다.spark.driver.extraClassPath
및spark.executor.extraClassPath
을 참고하세요.wget https://github.com/IBM-Cloud/IBM-Analytics-Engine/raw/master/wxd-connectors/hms-connector/hive-exec-2.3.9-core.jar wget https://github.com/IBM-Cloud/IBM-Analytics-Engine/raw/master/wxd-connectors/hms-connector/hive-common-2.3.9.jar wget https://github.com/IBM-Cloud/IBM-Analytics-Engine/raw/master/wxd-connectors/hms-connector/hive-metastore-2.3.9.jar
-
AWS EMR 클러스터에서 watsonx.data 연결하도록 MDS 연결 세부 정보를 구성합니다 메타데이터 서비스(MDS)에 연결합니다. EMR-6.12.0 (Spark 3.4.1 ) 기반 클러스터에서 spark-submit을 사용하는 예제 명령은 다음과 같습니다:
EC2 클러스터의 EMR에서 명령을 실행하여 샘플 spark 작업을 제출하십시오.
spark-submit \ --deploy-mode cluster \ --jars https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-spark-runtime-3.4_2.12/1.4.0/iceberg-spark-runtime-3.4_2.12-1.4.0.jar,/usr/lib/hadoop/hadoop-aws.jar,/usr/share/aws/aws-java-sdk/aws-java-sdk-bundle*.jar,/usr/lib/hadoop-lzo/lib/* \ --conf spark.sql.catalogImplementation=hive \ --conf spark.driver.extraClassPath=/home/hadoop/hive-common-2.3.9.jar:/home/hadoop/hive-metastore-2.3.9.jar:/home/hadoop/hive-exec-2.3.9-core.jar \ --conf spark.executor.extraClassPath=/home/hadoop/hive-common-2.3.9.jar:/home/hadoop/hive-metastore-2.3.9.jar:/home/hadoop/hive-exec-2.3.9-core.jar \ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \ --conf spark.sql.iceberg.vectorization.enabled=false \ --conf spark.sql.catalog.lakehouse=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.lakehouse.type=hive \ --conf spark.hive.metastore.uris==<<change_endpoint>> \ --conf spark.hive.metastore.client.auth.mode=PLAIN \ --conf spark.hive.metastore.client.plain.username=ibmlhapikey \ --conf spark.hive.metastore.client.plain.password=<<change_pswd>> \ --conf spark.hive.metastore.use.SSL=true \ --conf spark.hive.metastore.truststore.type=JKS \ --conf spark.hive.metastore.truststore.path=file:///etc/pki/java/cacerts \ --conf spark.hive.metastore.truststore.password=changeit \ amazon-lakehouse.py
매개변수 값:
- <<변경_엔드포인트>> : 메타스토어에 액세스하기 위한 메타데이터 서비스 URI 엔드포인트입니다. MDS 자격 증명 가져오기에 대한 자세한 내용은 메타데이터 서비스(MDS)자격 증명 가져오기를 참조하세요.
- <<변경_pswd>> : 메타스토어에 액세스하기 위한 비밀번호입니다. MDS 자격 증명 가져오기에 대한 자세한 내용은 메타데이터 서비스(MDS)자격 증명 가져오기를 참조하세요.
EMR-6.15.0 (Spark 3.4) 클러스터를 사용하여 Spark 파이썬 파일을 실행하려면 해당 위치에서 빙산 항아리를 다운로드하고 동일한 절차를 따르세요.