使用本地 Spark 引擎提交 Spark 应用程序
适用于:火花机 面筋加速火花机
本主题介绍了IBM Cloud watsonx.data本地Spark引擎提交Spark应用程序的过程。
先决条件
-
创建对象存储:要存储 Spark 应用程序和相关输出,请创建一个存储桶。 要创建 Cloud Object Storage和存储桶,请参阅 创建存储桶。 应用程序和数据保持独立存储。 只注册 watsonx.data 的数据桶。
-
注册 Cloud Object Storage:在 watsonx.data 中注册 Cloud Object Storage桶。 要注册 Cloud Object Storage 存储桶,请参阅 添加存储桶目录对。
您可以创建不同的 Cloud Object Storage桶来存储应用程序代码和输出。 注册存储输入数据的数据桶和 watsonx.data 表。 您不需要注册存储桶,存储桶通过 watsonx.data 维护应用程序代码。
支持的存储器
-
Azure 数据湖存储(ADLS)
Azure 数据湖存储 (ADLS) 已被弃用,并将在即将发布的版本中移除。Gen1 您必须过渡到 ADLS Gen2,因为 ADLS Gen1 将不再可用。
-
Amazon S3
-
Google Cloud Storage (GCS)
-
Cloud Object Storage (COS)
在不访问 watsonx.data 目录的情况下提交 Spark 应用程序
您可以通过运行 CURL 命令来提交 Spark 应用程序。 完成以下步骤以提交 Python 应用程序。
运行以下 curl 命令,提交字数统计应用程序。
curl --request POST --url https://<region>.lakehouse.cloud.ibm.com/lakehouse/api/v2/spark_engines/<spark_engine_id>/applications --header 'Authorization: Bearer <token>' --header 'Content-Type: application/json' --header 'AuthInstanceID: <crn_instance>' --data '{
"application_details": {
"application": "/opt/ibm/spark/examples/src/main/python/wordcount.py",
"arguments": [
"/opt/ibm/spark/examples/src/main/resources/people.txt"
]
}
}'
参数:
<crn_instance>
:watsonx.data 实例的 CRN。<region>
:配置 Spark 实例的区域。<spark_engine_id>
:Spark 引擎的引擎 ID。<token>
:不记名标记。 有关生成令牌的更多信息,请参阅 生成不记名令牌。
通过访问 watsonx.data 目录提交 Spark 应用程序
要访问与 Spark 引擎关联的目录中的数据,并对该目录执行一些基本操作,请执行以下操作:
运行以下 cURL 命令:
curl --request POST --url https://<region>.lakehouse.cloud.ibm.com/lakehouse/api/v2/spark_engines/<spark_engine_id>/applications --header 'Authorization: Bearer <token>' --header 'Content-Type: application/json' --header 'AuthInstanceID: <crn_instance>' --data '{
"application_details": {
"conf": {
"spark.hadoop.wxd.apiKey": "Basic <encoded-api-key>"
},
"application": "<storage>://<application-bucket-name>/iceberg.py"
}
}'
参数值:
<encoded-api-key>
:值的格式必须是echo -n"ibmlhapikey_<user_id>:<user’s api key>" | base64
。 这里,<user_id> 是用户的 IBM Cloud ID,其 api 密钥用于访问数据桶。 这里的<IAM_APIKEY>
是访问对象存储桶的用户的 API 密钥。 要生成 API 密钥,请登录 watsonx.data控制台并导航至“配置文件”>“配置文件和设置”>“API 密钥”,然后生成一个新的 API 密钥。<storage>
数值取决于您选择的存储类型。 对于 Amazon S3 或云对象存储(COS),必须是s3a
;对于ADLS,必须是abfss
;对于GCS存储,必须是gs
。<application_bucket_name>
:包含应用程序代码的对象存储的名称。 如果该存储未在 watsonx.data 中注册,则必须传递该存储的证书。
冰山目录操作示例 Python 应用程序
下面是 Python 应用程序示例,用于对存储在冰山目录中的数据执行基本操作:
from pyspark.sql import SparkSession
import os
from datetime import datetime
def init_spark():
spark = SparkSession.builder.appName("lh-hms-cloud").enableHiveSupport().getOrCreate()
return spark
def create_database(spark,bucket_name,catalog):
spark.sql(f"create database if not exists {catalog}.<db_name> LOCATION 's3a://{bucket_name}/'")
def list_databases(spark,catalog):
spark.sql(f"show databases from {catalog}").show()
def basic_iceberg_table_operations(spark,catalog):
spark.sql(f"create table if not exists {catalog}.<db_name>.<table_name>(id INTEGER, name
VARCHAR(10), age INTEGER, salary DECIMAL(10, 2)) using iceberg").show()
spark.sql(f"insert into {catalog}.<db_name>.<table_name>
values(1,'Alan',23,3400.00),(2,'Ben',30,5500.00),(3,'Chen',35,6500.00)")
spark.sql(f"select * from {catalog}.<db_name>.<table_name>").show()
def clean_database(spark,catalog):
spark.sql(f'drop table if exists {catalog}.<db_name>.<table_name> purge')
spark.sql(f'drop database if exists {catalog}.<db_name> cascade')
def main():
try:
spark = init_spark()
create_database(spark,"<wxd-data-bucket-name>","<wxd-data-bucket-catalog-name>")
list_databases(spark,"<wxd-data-bucket-catalog-name>")
basic_iceberg_table_operations(spark,"<wxd-data-bucket-catalog-name>")
finally:
clean_database(spark,"<wxd-data-bucket-catalog-name>")
spark.stop()
if __name__ == '__main__':
main()