Spark 应用程序 REST API
IBM Analytics Engine 无服务器套餐提供 REST API 以提交和管理 Spark 应用程序。 支持以下操作:
有关可用 API 的描述,请参阅 无服务器套餐的IBM Analytics Engine REST API。
本主题中的以下部分显示了每个 Spark 应用程序管理 API 的样本。
必需的凭证和许可权
在可以提交 Spark 应用程序之前,您需要获取认证凭证,并在 Analytics Engine 无服务器实例上设置正确的许可权。
- 您需要在供应实例时记下的服务实例的 GUID。 如果未记下 GUID,请参阅 检索无服务器实例的 GUID。
- 您必须具有正确的许可权才能执行必需的操作。 请参阅 用户许可权。
- Spark 应用程序 REST API 使用基于 IAM 的认证和授权。
提交 Spark 应用程序
Analytics Engine Serverless 为您提供用于提交 Spark 应用程序的 REST 接口。 传递到 REST API 的有效内容映射到 spark-submit
命令支持的各种命令行参数。 有关更多详细信息,请参阅 用于提交 Spark 应用程序的参数。
提交 Spark 应用程序时,需要引用应用程序文件。 为了帮助您快速入门并了解如何使用 AE 无服务器 Spark API,本部分以使用在提交应用程序 API 有效内容中引用的预捆绑 Spark 应用程序文件的示例开头。 后续部分显示如何运行存储在 Object Storage 存储区中的应用程序。
引用预捆绑文件
提供的样本应用程序显示如何在作业有效内容中引用 .py
字数应用程序文件和数据文件。
要了解如何快速开始使用预先捆绑的样本应用程序文件:
- 如果尚未生成 IAM 令牌,请生成该令牌。 请参阅 检索 IAM 访问令牌。
- 将令牌导出到变量中:
export token=<token generated>
- 准备有效内容 JSON 文件。 例如,submit-spark-quick-start-app.json:
{ "application_details": { "application": "/opt/ibm/spark/examples/src/main/python/wordcount.py", "arguments": ["/opt/ibm/spark/examples/src/main/resources/people.txt"] } }
- 提交 Spark 应用程序:
curl -X POST https://api.us-south.ae.cloud.ibm.com/v3/analytics_engines/<instance_id>/spark_applications --header "Authorization: Bearer $token" -H "content-type: application/json" -d @submit-spark-quick-start-app.json
从 Object Storage 存储区引用文件
要从 Object Storage 存储区引用 Spark 应用程序文件,您需要创建一个存储区,将该文件添加到该存储区,然后从有效内容 JSON 文件引用该文件。
有效内容 JSON 文件中 IBM Cloud Object Storage 实例的端点应该是专用端点。 直接端点提供比公共端点更好的性能,并且不会因任何传出或传入带宽而产生费用。
要提交 Spark 应用程序,请执行以下操作:
-
为应用程序文件创建存储区。 有关创建存储区的详细信息,请参阅 存储区操作。
-
将应用程序文件添加到新创建的存储区。 请参阅 上载对象,以将应用程序文件添加到存储区。
-
如果尚未生成 IAM 令牌,请生成该令牌。 请参阅 检索 IAM 访问令牌。
-
将令牌导出到变量中:
export token=<token generated>
-
准备有效内容 JSON 文件。 例如,
submit-spark-app.json
:{ "application_details": { "application": "cos://<application-bucket-name>.<cos-reference-name>/my_spark_application.py", "arguments": ["arg1", "arg2"], "conf": { "spark.hadoop.fs.cos.<cos-reference-name>.endpoint": "https://s3.direct.us-south.cloud-object-storage.appdomain.cloud", "spark.hadoop.fs.cos.<cos-reference-name>.access.key": "<access_key>", "spark.hadoop.fs.cos.<cos-reference-name>.secret.key": "<secret_key>", "spark.app.name": "MySparkApp" } } }
注:
- 您可以通过有效内容中的
"conf"
部分传递 Spark 应用程序配置值。 有关更多详细信息,请参阅 用于提交 Spark 应用程序的参数。 - 样本有效内容的
"conf"
部分中的<cos-reference-name>
是提供给 IBM Cloud Object Storage 实例的任何名称,您在"application"
参数的 URL 中引用该名称。 请参阅 了解 Object Storage 凭证。 - 提交 Spark 应用程序可能需要大约一分钟时间。 确保在客户机代码中设置足够的超时。
- 记录响应中返回的
"id"
。 您需要此值来执行操作,例如,获取应用程序的状态,检索应用程序的详细信息或删除应用程序。
- 您可以通过有效内容中的
-
提交 Spark 应用程序:
curl -X POST https://api.us-south.ae.cloud.ibm.com/v3/analytics_engines/<instance_id>/spark_applications --header "Authorization: Bearer $token" -H "content-type: application/json" -d @submit-spark-app.json
样本响应:
{ "id": "87e63712-a823-4aa1-9f6e-7291d4e5a113", "state": "accepted" }
-
如果为实例启用了正向日志记录,那么可以在转发到 IBM Log Analysis的平台日志中查看应用程序输出。 有关详细信息,请参阅 配置和查看日志。
将 Spark 配置传递到应用程序
您可以使用有效内容中的 "conf"
部分来传递 Spark 应用程序配置。 如果在实例级别指定了 Spark 配置,那么这些配置将由在实例上运行的 Spark 应用程序继承,但可以在提交 Spark 应用程序时通过在有效内容中包含 "conf"
部分来覆盖这些配置。
请参阅 Analytics Engine Serverless 中的 Spark 配置。
用于提交 Spark 应用程序的参数
下表列出了 spark-submit
命令参数与其等效项之间的映射,这些参数将传递到 Spark 应用程序提交 REST API 有效内容的 "application_details"
部分。
spark-submit 命令参数 | Analytics Engine Spark 提交 REST API 的有效内容 |
---|---|
<application binary passed as spark-submit command parameter> |
application_details -> application |
<application-arguments> |
application_details -> arguments |
class |
application_details -> class |
jars |
application_details -> jars |
name |
application_details -> name 或 application_details -> conf -> spark.app.name |
packages |
application_details -> packages |
repositories |
application_details -> repositories |
files |
application_details -> files |
archives |
application_details -> archives |
driver-cores |
application_details -> conf -> spark.driver.cores |
driver-memory |
application_details -> conf -> spark.driver.memory |
driver-java-options |
application_details -> conf -> spark.driver.defaultJavaOptions |
driver-library-path |
application_details -> conf -> spark.driver.extraLibraryPath |
driver-class-path |
application_details -> conf -> spark.driver.extraClassPath |
executor-cores |
application_details -> conf -> spark.executor.cores |
executor-memory |
application_details -> conf -> spark.executor.memory |
num-executors |
application_details -> conf -> ae.spark.executor.count |
pyFiles |
application_details -> conf -> spark.submit.pyFiles |
<environment-variables> |
application_details -> env -> {"key1" : "value1", "key2" : "value2", ..... " } |
获取已提交应用程序的状态
要获取已提交应用程序的状态,请输入:
curl -X GET https://api.us-south.ae.cloud.ibm.com/v3/analytics_engines/<instance_id>/spark_applications/<application_id>/state --header "Authorization: Bearer $token"
样本响应:
{
"id": "a9a6f328-56d8-4923-8042-97652fff2af3",
"state": "finished",
"start_time": "2020-11-25T14:14:31.311+0000",
"finish_time": "2020-11-25T14:30:43.625+0000"
}
获取已提交应用程序的详细信息
要获取已提交申请的详细信息,请输入:
curl -X GET https://api.us-south.ae.cloud.ibm.com/v3/analytics_engines/<instance_id>/spark_applications/<application_id> --header "Authorization: Bearer $token"
样本响应:
{
"id": "ecd608d5-xxxx-xxxx-xxxx-08e27456xxxx",
"spark_application_id": "null",
"application_details": {
"application": "cos://sbn-test-bucket-serverless-1.mycosservice/my_spark_application.py",
"conf": {
"spark.hadoop.fs.cos.mycosservice.endpoint": "https://s3.direct.us-south.cloud-object-storage.appdomain.cloud",
"spark.hadoop.fs.cos.mycosservice.access.key": "xxxx",
"spark.app.name": "MySparkApp",
"spark.hadoop.fs.cos.mycosservice.secret.key": "xxxx"
},
"arguments": [
"arg1",
"arg2"
]
},
"state": "failed",
"submission_time": "2021-11-30T18:29:21+0000"
}
停止已提交的应用程序
要停止已提交的应用程序,请运行以下命令:
curl -X DELETE https://api.us-south.ae.cloud.ibm.com/v3/analytics_engines/<instance_id>/spark_applications/<application_id> --header "Authorization: Bearer $token"
如果删除成功,那么返回 204 – No Content
。 应用程序的状态设置为 STOPPED。
此 API 具有幂等性。 如果尝试停止已完成或已停止的应用程序,那么它仍将返回 204。
您可以使用此 API 来停止处于以下状态的应用程序: accepted
,waiting
,submitted
和 running
。
提交应用程序时传递运行时 Spark 版本
提交应用程序时,可以使用有效内容 JSON 脚本中 "application_details"
下的 "runtime"
部分来传递 Spark 运行时版本。 通过 "runtime"
部分传递的 Spark 版本将覆盖在实例级别设置的缺省运行时 Spark 版本。 要了解有关缺省运行时版本的更多信息,请参阅 缺省 Spark 运行时。
用于在 Spark 中运行应用程序的 "runtime"
部分的示例 3.3:
{
"application_details": {
"application": "/opt/ibm/spark/examples/src/main/python/wordcount.py",
"arguments": [
"/opt/ibm/spark/examples/src/main/resources/people.txt"
],
"runtime": {
"spark_version": "3.3"
}
}
}
使用环境变量
提交应用程序时,可以使用有效内容 JSON 脚本中 "application_details"
下的 "env"
部分来传递特定于环境的信息,这将确定应用程序的结果,例如要使用的数据集或任何私钥值。
有效内容中 "env"
部分的示例:
{
"application_details": {
"application": "cos://<application-bucket-name>.<cos-reference-name>/my_spark_application.py",
"arguments": ["arg1", "arg2"],
"conf": {
"spark.hadoop.fs.cos.<cos-reference-name>.endpoint": "https://s3.direct.us-south.cloud-object-storage.appdomain.cloud",
"spark.hadoop.fs.cos.<cos-reference-name>.access.key": "<access_key>",
"spark.hadoop.fs.cos.<cos-reference-name>.secret.key": "<secret_key>",
"spark.app.name": "MySparkApp"
},
"env": {
"key1": "value1",
"key2": "value2",
"key3": "value3"
}
}
}
使用 "application_details"
> "env"
设置的环境变量 (如此处所述) 将可供执行程序和驱动程序代码访问。
还可以使用 "spark.executorEnv.[EnvironmentVariableName]"
配置 (application_details> env) 来设置环境变量。 但是,它们只能供在执行程序上运行的任务访问,而不能供驱动程序访问。
用于访问使用 "os.getenv"
调用传递的环境变量的 pyspark 应用程序的示例。
from pyspark.sql.types import IntegerType
import os
def init_spark():
spark = SparkSession.builder.appName("spark-env-test").getOrCreate()
sc = spark.sparkContext
return spark,sc
def returnExecutorEnv(x):
# Attempt to access environment variable from a task running on executor
return os.getenv("TESTENV1")
def main():
spark,sc = init_spark()
# dummy dataframe
df=spark.createDataFrame([("1","one")])
df.show()
df.rdd.map(lambda x: (x[0],returnExecutorEnv(x[0]))).toDF().show()
# Attempt to access environment variable on driver
print (os.getenv("TESTENV1"))
spark.stop()
if __name__ == '__main__':
main()
使用非缺省语言版本运行 Spark 应用程序
Spark 运行时支持使用以下语言编写的 Spark 应用程序:
- Scala
- Python
- R
Spark 运行时版本随附缺省运行时语言版本。 IBM 扩展对新语言版本的支持,并除去现有语言版本以避免运行时出现任何安全漏洞。 当出现新的语言版本时,系统还会提供稳定的时间来转换工作负载。 通过传递指向应用程序的语言版本的环境变量,可以使用语言版本来测试工作负载。
样本 Python 代码:
{
"application_details": {
"application": "/opt/ibm/spark/examples/src/main/python/wordcount.py",
"arguments": [
"/opt/ibm/spark/examples/src/main/resources/people.txt"
],
"env": {
"RUNTIME_PYTHON_ENV": "python310"
}
}
}
样本 R 代码:
{
"application_details": {
"env": {
"RUNTIME_R_ENV": "r42"
},
"application": "/opt/ibm/spark/examples/src/main/r/dataframe.R"
}
}
了解更多
管理 Spark 应用程序时,请遵循建议的 最佳实践。