IBM Cloud Docs
使用本地 Spark 引擎提交 Spark 应用程序

使用本地 Spark 引擎提交 Spark 应用程序

适用于火花机 面筋加速火花机

本主题介绍了IBM Cloud watsonx.data本地Spark引擎提交Spark应用程序的过程。

先决条件

  • 创建对象存储:要存储 Spark 应用程序和相关输出,请创建一个存储桶。 要创建 Cloud Object Storage和存储桶,请参阅 创建存储桶。 应用程序和数据保持独立存储。 只注册 watsonx.data 的数据桶。

  • 注册 Cloud Object Storage:在 watsonx.data 中注册 Cloud Object Storage桶。 要注册 Cloud Object Storage 存储桶,请参阅 添加存储桶目录对

    您可以创建不同的 Cloud Object Storage桶来存储应用程序代码和输出。 注册存储输入数据的数据桶和 watsonx.data 表。 您不需要注册存储桶,存储桶通过 watsonx.data 维护应用程序代码。

支持的存储器

  • Azure 数据湖存储(ADLS)

    Azure 数据湖存储 (ADLS) 已被弃用,并将在即将发布的版本中移除。Gen1 您必须过渡到 ADLS Gen2,因为 ADLS Gen1 将不再可用。

  • Amazon S3

  • Google Cloud Storage (GCS)

  • Cloud Object Storage (COS)

在不访问 watsonx.data 目录的情况下提交 Spark 应用程序

您可以通过运行 CURL 命令来提交 Spark 应用程序。 完成以下步骤以提交 Python 应用程序。

运行以下 curl 命令,提交字数统计应用程序。

   curl --request POST --url https://<region>.lakehouse.cloud.ibm.com/lakehouse/api/v2/spark_engines/<spark_engine_id>/applications --header 'Authorization: Bearer <token>' --header 'Content-Type: application/json' --header 'AuthInstanceID: <crn_instance>' --data '{
       "application_details": {
           "application": "/opt/ibm/spark/examples/src/main/python/wordcount.py",
           "arguments": [
               "/opt/ibm/spark/examples/src/main/resources/people.txt"
           ]
       }
   }'

参数:

  • <crn_instance>:watsonx.data 实例的 CRN。
  • <region>:配置 Spark 实例的区域。
  • <spark_engine_id>:Spark 引擎的引擎 ID。
  • <token>:不记名标记。 有关生成令牌的更多信息,请参阅 生成不记名令牌

通过访问 watsonx.data 目录提交 Spark 应用程序

要访问与 Spark 引擎关联的目录中的数据,并对该目录执行一些基本操作,请执行以下操作:

运行以下 cURL 命令:

curl --request POST --url https://<region>.lakehouse.cloud.ibm.com/lakehouse/api/v2/spark_engines/<spark_engine_id>/applications --header 'Authorization: Bearer <token>' --header 'Content-Type: application/json' --header 'AuthInstanceID: <crn_instance>' --data '{
    "application_details": {
        "conf": {
            "spark.hadoop.wxd.apiKey": "Basic <encoded-api-key>"
        },
        "application": "<storage>://<application-bucket-name>/iceberg.py"
    }
}'

参数值:

  • <encoded-api-key>:值的格式必须是 echo -n"ibmlhapikey_<user_id>:<user’s api key>" | base64。 这里,<user_id> 是用户的 IBM Cloud ID,其 api 密钥用于访问数据桶。 这里的 <IAM_APIKEY> 是访问对象存储桶的用户的 API 密钥。 要生成 API 密钥,请登录 watsonx.data控制台并导航至“配置文件”>“配置文件和设置”>“API 密钥”,然后生成一个新的 API 密钥。
  • <storage> 数值取决于您选择的存储类型。 对于 Amazon S3 或云对象存储(COS),必须是 s3a ;对于ADLS,必须是 abfss ;对于GCS存储,必须是 gs
  • <application_bucket_name>:包含应用程序代码的对象存储的名称。 如果该存储未在 watsonx.data 中注册,则必须传递该存储的证书。

冰山目录操作示例 Python 应用程序

下面是 Python 应用程序示例,用于对存储在冰山目录中的数据执行基本操作:

from pyspark.sql import SparkSession
import os
from datetime import datetime
def init_spark():
    spark = SparkSession.builder.appName("lh-hms-cloud").enableHiveSupport().getOrCreate()
    return spark
def create_database(spark,bucket_name,catalog):
    spark.sql(f"create database if not exists {catalog}.<db_name> LOCATION 's3a://{bucket_name}/'")
def list_databases(spark,catalog):
    spark.sql(f"show databases from {catalog}").show()
def basic_iceberg_table_operations(spark,catalog):
    spark.sql(f"create table if not exists {catalog}.<db_name>.<table_name>(id INTEGER, name
    VARCHAR(10), age INTEGER, salary DECIMAL(10, 2)) using iceberg").show()
    spark.sql(f"insert into {catalog}.<db_name>.<table_name>
    values(1,'Alan',23,3400.00),(2,'Ben',30,5500.00),(3,'Chen',35,6500.00)")
    spark.sql(f"select * from {catalog}.<db_name>.<table_name>").show()
def clean_database(spark,catalog):
    spark.sql(f'drop table if exists {catalog}.<db_name>.<table_name> purge')
    spark.sql(f'drop database if exists {catalog}.<db_name> cascade')
def main():
    try:
        spark = init_spark()
        create_database(spark,"<wxd-data-bucket-name>","<wxd-data-bucket-catalog-name>")
        list_databases(spark,"<wxd-data-bucket-catalog-name>")
        basic_iceberg_table_operations(spark,"<wxd-data-bucket-catalog-name>")
    finally:
        clean_database(spark,"<wxd-data-bucket-catalog-name>")
        spark.stop()
if __name__ == '__main__':
    main()