Spark 用例入门
本主题提供了使用 Python 样本对 watsonx.data 运行 Spark 用例的过程。 所有样本都是使用 Spark Python API 编写的。
先决条件
- 供应 IBM® watsonx.data 实例。
- 配置 IBM Analytics Engine 实例。
- Cloud Object Storage 铲斗连接细节。
- 为了使您的 Spark 应用程序能够与watsonx.data目录和存储,您必须具有 Metastore 管理员角色。 如果没有 Metastore 管理员权限,您将无法使用 Native Spark 引擎将数据导入存储。 有关 Spark 配置的更多信息,请参阅 与watsonx.data目录和存储。
关于样本用例
样本文件演示了以下功能:
-
从 watsonx.data 访问表
样本 Python 文件 中的 在 Lakehouse 目录中创建数据库 部分在配置的 watsonx.data 实例中创建数据库 demodb,该实例具有名为 lakehouse 的目录。demodb 配置为将所有数据和元数据存储在 Cloud Object Storage(COS) 存储区 lakehouse-bucket 下。 它还会创建一个 iceberg 表 testTable 并对其进行访问。
-
将数据采集到 watsonx.data
通过 样本 python 文件 中的 将拼合数据采集到 lakehouse 表中 部分,可以将来自源 Cloud Object Storage 存储区 source-bucket 的拼合和 CSV 格式的数据采集到 watsonx.data 表中。 将拼合格式的样本数据从源 COS 存储区 source-bucket 插入到 watsonx.data 表 yellow_taxi_2022 中 (请参阅 步骤 以将样本数据插入到源 COS 存储区中)。 它还显示将 CSV 格式的数据从 COS 存储区 source-bucket 采集到数据库 demodb 中的表 zipcode 中。
-
在 watsonx.data 中修改模式
样本 Python 文件 中的 模式演进 部分允许您修改 watsonx.data中的数据。
-
在 watsonx.data 中执行表维护活动
表维护有助于保持 watsonx.data 表的性能。 Iceberg 提供现成可用的表维护过程,允许以声明式方式执行强大的表优化。 以下样本演示如何使用 Spark 执行一些表维护操作。 有关 Iceberg Spark 表维护操作的更多信息,请参阅 表操作。
将样本数据插入 COS 存储区
要将数据插入 COS,请执行以下步骤。
-
创建 COS 存储区 (例如,source-bucket) 以存储要采集到 watsonx.data 实例中的样本数据。 有关创建 COS 存储桶的信息,请参阅 开始使用 IBM Cloud Object Storage。
作为 Object Storage 的用户,您不仅需要知道配置 Object Storage 的 API 密钥或 HMAC 密钥,还需要知道连接到 IBM Analytics Engine 的服务端点。 请参阅 选择区域和端点,了解根据 Object Storage 存储桶类型(如区域与跨区域)使用端点的更多信息。 您还可以通过在 IBM Cloud 面板上选择服务并单击导航窗格中的 Endpoint 来查看 Object Storage 服务的跨区域端点。 始终选择 直接端点。 直接端点提供更好的性能,不会产生费用。 美南交叉区域的端点示例为
s3.direct.us.cloud-object-storage.appdomain.cloud
。 -
从以下链接下载样本 csv 文件 (例如,zipcodes.csv) 和 parquet 样本数据 (例如,2022 年的 6 个月出租车数据)。
-
安装 IBM Cloud Object Storage 插件。 有关如何安装插件的详细信息,请参阅 IBM Cloud Object Storage CLI。
-
使用 COS CLI 将样本数据上载到 COS 存储区。
ibmcloud cos upload --bucket <cos_bucket_name> --key <source_file_name> --file <path_to_source_file>
参数值:
- <cos_bucket_name>: name of the bucket created in step1.
- <source_file_name>: the name of the sample data file that you downloaded. 此处,key zipcodes.csv 是文件名 (请参阅以下示例)。
- <path_to_source_file>: the path to the location in your machine where the file resides. 此处,path/zipcodes.csv 是文件路径 (请参阅以下示例)。
例如:
ibmcloud cos upload --bucket source-bucket --key zipcodes.csv --file <path/zipcodes.csv>
运行样本用例
执行以下步骤以运行 Spark 样本 Python 文件。
Spark 样本 Python 文件
from pyspark.sql import SparkSession
import os
def init_spark():
spark = SparkSession.builder \
.appName("lh-hms-cloud") \
.config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.endpoint" ,"s3.direct.us-south.cloud-object-storage.appdomain.cloud") \
.config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.access.key" ,"<lakehouse-bucket-access-key>") \
.config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.secret.key" ,"<lakehouse-bucket-secret-key>") \
.config("spark.hadoop.fs.s3a.bucket.source-bucket.endpoint" ,"s3.direct.us-south.cloud-object-storage.appdomain.cloud") \
.config("spark.hadoop.fs.s3a.bucket.source-bucket.access.key" ,"<source-bucket-access-key>") \
.config("spark.hadoop.fs.s3a.bucket.source-bucket.secret.key" ,"<source-bucket-secret-key>") \
.enableHiveSupport() \
.getOrCreate()
return spark
def create_database(spark):
# Create a database in the lakehouse catalog
spark.sql("create database if not exists lakehouse.demodb LOCATION 's3a://lakehouse-bucket/'")
def list_databases(spark):
# list the database under lakehouse catalog
spark.sql("show databases from lakehouse").show()
def basic_iceberg_table_operations(spark):
# demonstration: Create a basic Iceberg table, insert some data and then query table
spark.sql("create table if not exists lakehouse.demodb.testTable(id INTEGER, name VARCHAR(10), age INTEGER, salary DECIMAL(10, 2)) using iceberg").show()
spark.sql("insert into lakehouse.demodb.testTable values(1,'Alan',23,3400.00),(2,'Ben',30,5500.00),(3,'Chen',35,6500.00)")
spark.sql("select * from lakehouse.demodb.testTable").show()
def create_table_from_parquet_data(spark):
# load parquet data into dataframe
df = spark.read.option("header",True).parquet("s3a://source-bucket/nyc-taxi/yellow_tripdata_2022-01.parquet")
# write the dataframe into an Iceberg table
df.writeTo("lakehouse.demodb.yellow_taxi_2022").create()
# describe the table created
spark.sql('describe table lakehouse.demodb.yellow_taxi_2022').show(25)
# query the table
spark.sql('select * from lakehouse.demodb.yellow_taxi_2022').count()
def ingest_from_csv_temp_table(spark):
# load csv data into a dataframe
csvDF = spark.read.option("header",True).csv("s3a://source-bucket/zipcodes.csv")
csvDF.createOrReplaceTempView("tempCSVTable")
# load temporary table into an Iceberg table
spark.sql('create or replace table lakehouse.demodb.zipcodes using iceberg as select * from tempCSVTable')
# describe the table created
spark.sql('describe table lakehouse.demodb.zipcodes').show(25)
# query the table
spark.sql('select * from lakehouse.demodb.zipcodes').show()
def ingest_monthly_data(spark):
df_feb = spark.read.option("header",True).parquet("s3a://source-bucket//nyc-taxi/yellow_tripdata_2022-02.parquet")
df_march = spark.read.option("header",True).parquet("s3a://source-bucket//nyc-taxi/yellow_tripdata_2022-03.parquet")
df_april = spark.read.option("header",True).parquet("s3a://source-bucket//nyc-taxi/yellow_tripdata_2022-04.parquet")
df_may = spark.read.option("header",True).parquet("s3a://source-bucket//nyc-taxi/yellow_tripdata_2022-05.parquet")
df_june = spark.read.option("header",True).parquet("s3a://source-bucket//nyc-taxi/yellow_tripdata_2022-06.parquet")
df_q1_q2 = df_feb.union(df_march).union(df_april).union(df_may).union(df_june)
df_q1_q2.write.insertInto("lakehouse.demodb.yellow_taxi_2022")
def perform_table_maintenance_operations(spark):
# Query the metadata files table to list underlying data files
spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.demodb.yellow_taxi_2022.files").show()
# There are many smaller files compact them into files of 200MB each using the
# `rewrite_data_files` Iceberg Spark procedure
spark.sql(f"CALL lakehouse.system.rewrite_data_files(table => 'demodb.yellow_taxi_2022', options => map('target-file-size-bytes','209715200'))").show()
# Again, query the metadata files table to list underlying data files; 6 files are compacted
# to 3 files
spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.demodb.yellow_taxi_2022.files").show()
# List all the snapshots
# Expire earlier snapshots. Only latest one with compacted data is required
# Again, List all the snapshots to see only 1 left
spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").show()
#retain only the latest one
latest_snapshot_committed_at = spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").tail(1)[0].committed_at
print (latest_snapshot_committed_at)
spark.sql(f"CALL lakehouse.system.expire_snapshots(table => 'demodb.yellow_taxi_2022',older_than => TIMESTAMP '{latest_snapshot_committed_at}',retain_last => 1)").show()
spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").show()
# Removing Orphan data files
spark.sql(f"CALL lakehouse.system.remove_orphan_files(table => 'demodb.yellow_taxi_2022')").show(truncate=False)
# Rewriting Manifest Files
spark.sql(f"CALL lakehouse.system.rewrite_manifests('demodb.yellow_taxi_2022')").show()
def evolve_schema(spark):
# demonstration: Schema evolution
# Add column fare_per_mile to the table
spark.sql('ALTER TABLE lakehouse.demodb.yellow_taxi_2022 ADD COLUMN(fare_per_mile double)')
# describe the table
spark.sql('describe table lakehouse.demodb.yellow_taxi_2022').show(25)
def clean_database(spark):
# clean-up the demo database
spark.sql('drop table if exists lakehouse.demodb.testTable purge')
spark.sql('drop table if exists lakehouse.demodb.zipcodes purge')
spark.sql('drop table if exists lakehouse.demodb.yellow_taxi_2022 purge')
spark.sql('drop database if exists lakehouse.demodb cascade')
def main():
try:
spark = init_spark()
create_database(spark)
list_databases(spark)
basic_iceberg_table_operations(spark)
# demonstration: Ingest parquet and csv data into a watsonx.data Iceberg table
create_table_from_parquet_data(spark)
ingest_from_csv_temp_table(spark)
# load data for the month of February to June into the table yellow_taxi_2022 created above
ingest_monthly_data(spark)
# demonstration: Table maintenance
perform_table_maintenance_operations(spark)
# demonstration: Schema evolution
evolve_schema(spark)
finally:
# clean-up the demo database
clean_database(spark)
spark.stop()
if __name__ == '__main__':
main()
- 保存以下样本 Python 文件。
- 将 Python 文件上载到 Cloud Object Storage 存储区。 您必须在 Cloud Object Storage 存储区中维护 Spark 应用程序及其依赖关系,而不能将其与数据存储区混合。
- 为 IBM Analytics Engine 令牌生成 IAM 令牌。 有关如何生成 IAM 令牌的详细信息,请参阅 IAM 令牌。
- 运行以下 curl 命令以提交 Spark 应用程序:
curl https://api.<region>.ae.cloud.ibm.com/v3/analytics_engines/<iae-instance-guid>/spark_applications -H "Authorization: Bearer <iam-bearer-token>" -X POST -d '{
"application_details": {
"application": "s3a://<application_bucket>/lakehouse-hms-test-cloud-doc-sample.py",
"conf": {
"spark.hadoop.fs.s3a.bucket.<application-bucket>.endpoint": "https://s3.direct.us-south.cloud-object-storage.appdomain.cloud",
"spark.hadoop.fs.s3a.bucket.<application-bucket>.access.key": "<hmac_access_key_for_application-bucket>",
"spark.hadoop.fs.s3a.bucket.<application-bucket>.secret.key": "<hmac_secret_key_for_application-bucket>"
}
}
}'
此样本在 us-south 区域中的 Cloud Object Storage 存储区上进行测试。 根据 Cloud Object Storage 存储区所在的区域,更改 Cloud Object Storage 端点配置中的区域。 建议在供应 Analytics Engine 实例的区域中供应 COS 存储区。
如果提交的Spark申请持续三天以上处于运行状态,则该申请将被自动终止。
与watsonx.data目录和存储
为了使您的 Spark 应用程序能够与watsonx.data目录和存储,将以下配置添加到您的应用程序有效负载:
spark.hive.metastore.client.plain.username=ibmlhapikey
spark.hive.metastore.client.plain.password=<api-key-of-the-user-which-has-metastore-admin-role>
spark.hadoop.wxd.apiKey=Basic base64(ibmlhapikey_ibmcloudid:apikey)