使用不同的表格式
适用于:火花机 面筋加速火花机
本主题介绍了运行 Spark 应用程序的过程,该程序将数据摄取到不同的表格式,如 Apache Hudi、Apache Iceberg 或 Delta Lake 目录。
-
创建具有所需目录的存储(目录可以是 Apache Hudi、Apache Iceberg 或 Delta Lake ),以存储 Spark 应用程序中使用的数据。 要创建存储器,请参阅 添加存储器-目录对。
-
将存储器与本机 Spark 引擎相关联。 有关更多信息,请参阅 使目录与引擎相关联。
-
创建 Cloud Object Storage (COS) 以存储 Spark 应用程序。 要创建 Cloud Object Storage 和存储区,请参阅 创建存储区。
-
在 watsonx.data中注册 Cloud Object Storage。 有关更多信息,请参阅 添加存储器-目录对。
-
根据您选择的目录,将以下 Spark 应用程序 (Python 文件) 保存到本地计算机。 在此处,
iceberg_demo.py
,hudi_demo.py
或delta_demo.py
并将 Spark 应用程序上载到 COS,请参阅 上载数据。 -
要提交包含驻留在 Cloud Object Storage中的数据的 Spark 应用程序,请指定参数值并从下表运行 curl 命令。
- Apache Iceberg
示例文件演示了以下功能:
-
从watsonx.data访问表格
-
向watsonx.data输入数据
-
修改watsonx.data中的模式
在watsonx.data 中执行表格维护活动。
必须将数据插入到 COS 存储区中。 更多信息,请参阅 将样本数据插入 COS 数据桶。
Python应用程序:冰山Python文件
提交Python应用程序的Curl命令:
curl --request POST \ --url https://<wxd_host_name>/lakehouse/api/v2/spark_engines/<spark_engine_id>/applications \ --header 'Authorization: Bearer <token>' \ --header 'Content-Type: application/json' \ --header 'LhInstanceId: <instance_id>' \ --data '{ "application_details": { "conf": { "spark.hadoop.wxd.apiKey":"Basic <user-authentication-string>" }, "application": "s3a://<application-bucket-name>/iceberg.py" } }'
参数值:
-
<wxd_host_name>
:watsonx.data云实例的主机名。 -
<instance_id>
:来自 watsonx.data 实例 URL 的实例 ID。 例如,1609968977179454。 -
<spark_engine_id>
:本地 Spark 引擎的引擎 ID。 -
<token>
:不记名标记。 有关生成令牌的更多信息,请参阅 IAM 令牌。 -
<user-authentication-string>
:值必须是用户 ID 和 API 密钥的 base 64 编码字符串。 有关格式的更多信息,请参阅注释。 -
Apache胡迪
PythonSpark 应用程序演示了以下功能:
-
它会在 Apache Hudi 目录(您创建的用于存储数据的目录)中创建一个数据库。 这里,"
<database_name>
. -
它在“
<database_name>
数据库中创建了一个表,即”<table_name>
。 -
它将数据插入 "
<table_name>
并执行 SELECT 查询操作。 -
使用后,它会删除表和模式。
Python应用程序:
from pyspark.sql import SparkSession def init_spark(): spark = SparkSession.builder .appName("CreateHudiTableInCOS") .enableHiveSupport() .getOrCreate() return spark def main(): try: spark = init_spark() spark.sql("show databases").show() spark.sql("create database if not exists spark_catalog.<database_name> LOCATION 's3a://<data_storage_name>/'").show() spark.sql("create table if not exists spark_catalog.<database_name>.<table_name> (id bigint, name string, location string) USING HUDI OPTIONS ('primaryKey' 'id', hoodie.write.markers.type= 'direct', hoodie.embed.timeline.server= 'false')").show() spark.sql("insert into <database_name>.<table_name> VALUES (1, 'Sam','Kochi'), (2, 'Tom','Bangalore'), (3, 'Bob','Chennai'), (4, 'Alex','Bangalore')").show() spark.sql("select * from spark_catalog.<database_name>.<table_name>").show() spark.sql("drop table spark_catalog.<database_name>.<table_name>").show() spark.sql("drop schema spark_catalog.<database_name> CASCADE").show() inally: spark.stop() if __name__ == '__main__': main()
参数值:
<database_name>
:指定要创建的数据库名称。<table_name>
:指定要创建的表的名称。<data_storage_name>
:指定所创建的 Apache Hudi 存储的名称。
用于提交 Python 应用程序的 Curl 命令
curl --request POST --url https://<wxd_host_name>/lakehouse/api/v2/spark_engines/<spark_engine_id>/applications --header 'Authorization: Bearer <token>' --header 'Content-Type: application/json' --header 'LhInstanceId: <instance_id>' --data '{ \n \n "application_details": { "conf": { "spark.sql.catalog.spark_catalog.type": "hive", "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog", "spark.hadoop.wxd.apiKey":"Basic <user-authentication-string>" }, "application": "s3a://<data_storage_name>/hudi_demo.py" }}
参数值:
-
<wxd_host_name>
:watsonx.data云实例的主机名。 -
<instance_id>
:来自 watsonx.data 实例 URL 的实例 ID。 例如,1609968977179454。 -
<spark_engine_id>
:本地 Spark 引擎的引擎 ID。 -
<token>
:不记名标记。 有关生成令牌的更多信息,请参阅 IAM 令牌。 -
<user-authentication-string>
:值必须是用户 ID 和 API 密钥的 base 64 编码字符串。 有关格式的更多信息,请参阅注释。 -
Delta Lake
PythonSpark 应用程序演示了以下功能:
-
它会在 Delta Lake 目录(您创建的用于存储数据的目录)中创建一个数据库。 这里,"
<database_name>
. -
它在“
<database_name>
数据库中创建了一个表,即”<table_name>
。 -
它将数据插入 "
<table_name>
并执行 SELECT 查询操作。 -
使用后,它会删除表和模式。
Python应用程序:
from pyspark.sql import SparkSession import os def init_spark(): spark = SparkSession.builder.appName("lh-hms-cloud") .enableHiveSupport().getOrCreate() return spark def main(): spark = init_spark() spark.sql("show databases").show() spark.sql("create database if not exists spark_catalog.<database_name> LOCATION 's3a://<data_storage_name>/'").show() spark.sql("create table if not exists spark_catalog.<database_name>.<table_name> (id bigint, name string, location string) USING DELTA").show() spark.sql("insert into spark_catalog.<database_name>.<table_name> VALUES (1, 'Sam','Kochi'), (2, 'Tom','Bangalore'), (3, 'Bob','Chennai'), (4, 'Alex','Bangalore')").show() spark.sql("select * from spark_catalog.<database_name>.<table_name>").show() spark.sql("drop table spark_catalog.<database_name>.<table_name>").show() spark.sql("drop schema spark_catalog.<database_name> CASCADE").show() spark.stop() if __name__ == '__main__': main()
参数值:
<database_name>
:指定要创建的数据库名称。<table_name>
:指定要创建的表的名称。<data_storage_name>
:指定所创建的 Apache Hudi 存储的名称。
用于提交 Python 应用程序的 Curl 命令
curl --request POST --url https://<wxd_host_name>/lakehouse/api/v2/spark_engines/<spark_engine_id>/applications --header 'Authorization: Bearer <token>' --header 'Content-Type: application/json' --header 'LhInstanceId: <instance_id>' --data '{ "application_details": { "conf": { "spark.sql.catalog.spark_catalog" : "org.apache.spark.sql.delta.catalog.DeltaCatalog", "spark.sql.catalog.spark_catalog.type" : "hive", "spark.hadoop.wxd.apiKey":"<user-authentication-string>" }, "application": "s3a://<database_name>/delta_demo.py" } }
参数值
<wxd_host_name>
:watsonx.data云实例的主机名。<instance_id>
实例 ID:watsonx.data 集群实例 URL 中的实例 ID。 例如,1609968977179454。<spark_engine_id>
:本地 Spark 引擎的引擎 ID。<token>
:不记名标记。 有关生成令牌的更多信息,请参阅 IAM 令牌。<user-authentication-string>
:值必须是用户 ID 和 API 密钥的 base 64 编码字符串。 有关格式的更多信息,请参阅以下说明。
<user-authentication-string>
的值必须采用格式echo -n 'ibmlhapikey_<username>:<user_apikey>' | base64
。 此处,<user_id>
是其 API 密钥用于访问数据存储区的用户的 IBM Cloud 标识。 此处的<IAM_APIKEY>
是访问对象存储区的用户的 API 密钥。 要生成 API 密钥,请登录到 watsonx.data 控制台,浏览至“概要文件> 概要文件和设置> API 密钥”并生成新的 API 密钥。 如果生成新的 API 密钥,那么旧的 API 密钥将变为无效。 -
提交 Spark 应用程序后,您将收到包含应用程序标识和 Spark 版本的确认消息。 将其保存以供参考。
-
登录到 watsonx.data 集群,访问“引擎详细信息”页面。 在“应用程序”选项卡中,使用应用程序标识列出应用程序并跟踪阶段。 有关更多信息,请参阅 查看和管理应用程序。