IBM Cloud Docs
Spark 应用程序 REST API

Spark 应用程序 REST API

IBM Analytics Engine 无服务器套餐提供 REST API 以提交和管理 Spark 应用程序。 支持以下操作:

  1. 获取所需的凭证并设置许可权
  2. 提交 Spark 应用程序
  3. 检索已提交的 Spark 应用程序的状态
  4. 检索已提交的 Spark 应用程序的详细信息
  5. 停止正在运行的 Spark 应用程序

有关可用 API 的描述,请参阅 无服务器套餐的IBM Analytics Engine REST API

本主题中的以下部分显示了每个 Spark 应用程序管理 API 的样本。

必需的凭证和许可权

在可以提交 Spark 应用程序之前,您需要获取认证凭证,并在 Analytics Engine 无服务器实例上设置正确的许可权。

  1. 您需要在供应实例时记下的服务实例的 GUID。 如果未记下 GUID,请参阅 检索无服务器实例的 GUID
  2. 您必须具有正确的许可权才能执行必需的操作。 请参阅 用户许可权
  3. Spark 应用程序 REST API 使用基于 IAM 的认证和授权。

提交 Spark 应用程序

Analytics Engine Serverless 为您提供用于提交 Spark 应用程序的 REST 接口。 传递到 REST API 的有效内容映射到 spark-submit 命令支持的各种命令行参数。 有关更多详细信息,请参阅 用于提交 Spark 应用程序的参数

提交 Spark 应用程序时,需要引用应用程序文件。 为了帮助您快速入门并了解如何使用 AE 无服务器 Spark API,本部分以使用在提交应用程序 API 有效内容中引用的预捆绑 Spark 应用程序文件的示例开头。 后续部分显示如何运行存储在 Object Storage 存储区中的应用程序。

引用预捆绑文件

提供的样本应用程序显示如何在作业有效内容中引用 .py 字数应用程序文件和数据文件。

要了解如何快速开始使用预先捆绑的样本应用程序文件:

  1. 如果尚未生成 IAM 令牌,请生成该令牌。 请参阅 检索 IAM 访问令牌
  2. 将令牌导出到变量中:
    export token=<token generated>
    
  3. 准备有效内容 JSON 文件。 例如,submit-spark-quick-start-app.json:
    {
      "application_details": {
        "application": "/opt/ibm/spark/examples/src/main/python/wordcount.py",
        "arguments": ["/opt/ibm/spark/examples/src/main/resources/people.txt"]
        }
    }
    
  4. 提交 Spark 应用程序:
    curl -X POST https://api.us-south.ae.cloud.ibm.com/v3/analytics_engines/<instance_id>/spark_applications --header "Authorization: Bearer $token" -H "content-type: application/json"  -d @submit-spark-quick-start-app.json
    

从 Object Storage 存储区引用文件

要从 Object Storage 存储区引用 Spark 应用程序文件,您需要创建一个存储区,将该文件添加到该存储区,然后从有效内容 JSON 文件引用该文件。

有效内容 JSON 文件中 IBM Cloud Object Storage 实例的端点应该是专用端点。 直接端点提供比公共端点更好的性能,并且不会因任何传出或传入带宽而产生费用。

要提交 Spark 应用程序,请执行以下操作:

  1. 为应用程序文件创建存储区。 有关创建存储区的详细信息,请参阅 存储区操作

  2. 将应用程序文件添加到新创建的存储区。 请参阅 上载对象,以将应用程序文件添加到存储区。

  3. 如果尚未生成 IAM 令牌,请生成该令牌。 请参阅 检索 IAM 访问令牌

  4. 将令牌导出到变量中:

    export token=<token generated>
    
  5. 准备有效内容 JSON 文件。 例如,submit-spark-app.json:

    {
      "application_details": {
         "application": "cos://<application-bucket-name>.<cos-reference-name>/my_spark_application.py",
         "arguments": ["arg1", "arg2"],
         "conf": {
            "spark.hadoop.fs.cos.<cos-reference-name>.endpoint": "https://s3.direct.us-south.cloud-object-storage.appdomain.cloud",
            "spark.hadoop.fs.cos.<cos-reference-name>.access.key": "<access_key>",
            "spark.hadoop.fs.cos.<cos-reference-name>.secret.key": "<secret_key>",
            "spark.app.name": "MySparkApp"
         }
      }
    }
    

    注:

    • 您可以通过有效内容中的 "conf" 部分传递 Spark 应用程序配置值。 有关更多详细信息,请参阅 用于提交 Spark 应用程序的参数
    • 样本有效内容的 "conf" 部分中的 <cos-reference-name> 是提供给 IBM Cloud Object Storage 实例的任何名称,您在 "application" 参数的 URL 中引用该名称。 请参阅 了解 Object Storage 凭证
    • 提交 Spark 应用程序可能需要大约一分钟时间。 确保在客户机代码中设置足够的超时。
    • 记录响应中返回的 "id"。 您需要此值来执行操作,例如,获取应用程序的状态,检索应用程序的详细信息或删除应用程序。
  6. 提交 Spark 应用程序:

    curl -X POST https://api.us-south.ae.cloud.ibm.com/v3/analytics_engines/<instance_id>/spark_applications --header "Authorization: Bearer $token" -H "content-type: application/json"  -d @submit-spark-app.json
    

    样本响应:

    {
      "id": "87e63712-a823-4aa1-9f6e-7291d4e5a113",
      "state": "accepted"
    }
    
  7. 如果为实例启用了正向日志记录,那么可以在转发到 IBM Log Analysis的平台日志中查看应用程序输出。 有关详细信息,请参阅 配置和查看日志

将 Spark 配置传递到应用程序

您可以使用有效内容中的 "conf" 部分来传递 Spark 应用程序配置。 如果在实例级别指定了 Spark 配置,那么这些配置将由在实例上运行的 Spark 应用程序继承,但可以在提交 Spark 应用程序时通过在有效内容中包含 "conf" 部分来覆盖这些配置。

请参阅 Analytics Engine Serverless 中的 Spark 配置。

用于提交 Spark 应用程序的参数

下表列出了 spark-submit 命令参数与其等效项之间的映射,这些参数将传递到 Spark 应用程序提交 REST API 有效内容的 "application_details" 部分。

表 1. Spark-submit 命令参数与传递到有效内容的等效参数之间的映射
spark-submit 命令参数 Analytics Engine Spark 提交 REST API 的有效内容
<application binary passed as spark-submit command parameter> application_details -> application
<application-arguments> application_details -> arguments
class application_details -> class
jars application_details -> jars
name application_details-> nameapplication_details-> conf-> spark.app.name
packages application_details -> packages
repositories application_details -> repositories
files application_details -> files
archives application_details -> archives
driver-cores application_details -> conf -> spark.driver.cores
driver-memory application_details -> conf -> spark.driver.memory
driver-java-options application_details -> conf -> spark.driver.defaultJavaOptions
driver-library-path application_details -> conf -> spark.driver.extraLibraryPath
driver-class-path application_details -> conf -> spark.driver.extraClassPath
executor-cores application_details -> conf -> spark.executor.cores
executor-memory application_details -> conf -> spark.executor.memory
num-executors application_details -> conf -> ae.spark.executor.count
pyFiles application_details -> conf -> spark.submit.pyFiles
<environment-variables> application_details -> env -> {"key1" : "value1", "key2" : "value2", ..... "}

获取已提交应用程序的状态

要获取已提交应用程序的状态,请输入:

curl -X GET https://api.us-south.ae.cloud.ibm.com/v3/analytics_engines/<instance_id>/spark_applications/<application_id>/state --header "Authorization: Bearer $token"

样本响应:

{
    "id": "a9a6f328-56d8-4923-8042-97652fff2af3",
    "state": "finished",
    "start_time": "2020-11-25T14:14:31.311+0000",
    "finish_time": "2020-11-25T14:30:43.625+0000"
}

获取已提交应用程序的详细信息

要获取已提交申请的详细信息,请输入:

curl -X GET https://api.us-south.ae.cloud.ibm.com/v3/analytics_engines/<instance_id>/spark_applications/<application_id> --header "Authorization: Bearer $token"

样本响应:

{
  "id": "ecd608d5-xxxx-xxxx-xxxx-08e27456xxxx",
  "spark_application_id": "null",
  "application_details": {
      "application": "cos://sbn-test-bucket-serverless-1.mycosservice/my_spark_application.py",
      "conf": {
          "spark.hadoop.fs.cos.mycosservice.endpoint": "https://s3.direct.us-south.cloud-object-storage.appdomain.cloud",
          "spark.hadoop.fs.cos.mycosservice.access.key": "xxxx",
          "spark.app.name": "MySparkApp",
          "spark.hadoop.fs.cos.mycosservice.secret.key": "xxxx"
      },
      "arguments": [
          "arg1",
          "arg2"
      ]
  },
  "state": "failed",
    "submission_time": "2021-11-30T18:29:21+0000"
}

停止已提交的应用程序

要停止已提交的应用程序,请运行以下命令:

curl -X DELETE https://api.us-south.ae.cloud.ibm.com/v3/analytics_engines/<instance_id>/spark_applications/<application_id> --header "Authorization: Bearer $token"

如果删除成功,那么返回 204 – No Content。 应用程序的状态设置为 STOPPED。

此 API 具有幂等性。 如果尝试停止已完成或已停止的应用程序,那么它仍将返回 204。

您可以使用此 API 来停止处于以下状态的应用程序: acceptedwaitingsubmittedrunning

提交应用程序时传递运行时 Spark 版本

提交应用程序时,可以使用有效内容 JSON 脚本中 "application_details" 下的 "runtime" 部分来传递 Spark 运行时版本。 通过 "runtime" 部分传递的 Spark 版本将覆盖在实例级别设置的缺省运行时 Spark 版本。 要了解有关缺省运行时版本的更多信息,请参阅 缺省 Spark 运行时

用于在 Spark 中运行应用程序的 "runtime" 部分的示例 3.3:

{
    "application_details": {
        "application": "/opt/ibm/spark/examples/src/main/python/wordcount.py",
        "arguments": [
            "/opt/ibm/spark/examples/src/main/resources/people.txt"
            ],
        "runtime": {
            "spark_version": "3.3"
        }
    }
}

使用环境变量

提交应用程序时,可以使用有效内容 JSON 脚本中 "application_details" 下的 "env" 部分来传递特定于环境的信息,这将确定应用程序的结果,例如要使用的数据集或任何私钥值。

有效内容中 "env" 部分的示例:

{
    "application_details": {
        "application": "cos://<application-bucket-name>.<cos-reference-name>/my_spark_application.py",
        "arguments": ["arg1", "arg2"],
        "conf": {
            "spark.hadoop.fs.cos.<cos-reference-name>.endpoint": "https://s3.direct.us-south.cloud-object-storage.appdomain.cloud",
            "spark.hadoop.fs.cos.<cos-reference-name>.access.key": "<access_key>",
            "spark.hadoop.fs.cos.<cos-reference-name>.secret.key": "<secret_key>",
            "spark.app.name": "MySparkApp"
            },
        "env": {
            "key1": "value1",
            "key2": "value2",
            "key3": "value3"
            }
        }
}

使用 "application_details" > "env" 设置的环境变量 (如此处所述) 将可供执行程序和驱动程序代码访问。

还可以使用 "spark.executorEnv.[EnvironmentVariableName]" 配置 (application_details> env) 来设置环境变量。 但是,它们只能供在执行程序上运行的任务访问,而不能供驱动程序访问。

用于访问使用 "os.getenv" 调用传递的环境变量的 pyspark 应用程序的示例。

from pyspark.sql.types import IntegerType
import os

def init_spark():
  spark = SparkSession.builder.appName("spark-env-test").getOrCreate()
  sc = spark.sparkContext
  return spark,sc

def returnExecutorEnv(x):
    # Attempt to access environment variable from a task running on executor
    return os.getenv("TESTENV1")

def main():
  spark,sc = init_spark()

  # dummy dataframe
  df=spark.createDataFrame([("1","one")])
  df.show()
  df.rdd.map(lambda x: (x[0],returnExecutorEnv(x[0]))).toDF().show()
  # Attempt to access environment variable on driver
  print (os.getenv("TESTENV1"))
  spark.stop()

if __name__ == '__main__':
  main()

使用非缺省语言版本运行 Spark 应用程序

Spark 运行时支持使用以下语言编写的 Spark 应用程序:

  • Scala
  • Python
  • R

Spark 运行时版本随附缺省运行时语言版本。 IBM 扩展对新语言版本的支持,并除去现有语言版本以避免运行时出现任何安全漏洞。 当出现新的语言版本时,系统还会提供稳定的时间来转换工作负载。 通过传递指向应用程序的语言版本的环境变量,可以使用语言版本来测试工作负载。

样本 Python 代码:

 {
	"application_details": {
		"application": "/opt/ibm/spark/examples/src/main/python/wordcount.py",
		"arguments": [
			"/opt/ibm/spark/examples/src/main/resources/people.txt"
		],
		"env": {
			"RUNTIME_PYTHON_ENV": "python310"
		}
	}
}

样本 R 代码:

{
	"application_details": {
		"env": {
			"RUNTIME_R_ENV": "r42"
		},
		"application": "/opt/ibm/spark/examples/src/main/r/dataframe.R"
	}
}

了解更多

管理 Spark 应用程序时,请遵循建议的 最佳实践