Standard Spark 示例
您可以使用以下代码样本来了解如何在不同情况下使用 Spark。
要了解如何访问 Object Storage,请参阅 了解 Object Storage 凭证。
使用已声明的凭证从 Object Storage 读取 CSV 文件
以下代码样本显示如何创建用于将数据从 CSV 文件读取到 Python DataFrame的 Python 脚本。 Python 脚本和 CSV 文件都位于 Object Storage中。
您可以使用在提交 Spark 应用程序时指定的相同 IBM Cloud Object Storage 凭证,或者使用在创建 IBM Analytics Engine 服务实例以从应用程序中的 Object Storage 中读取时设置为缺省配置的凭证。
名为 read-employees.py
的应用程序的示例。 插入 Object Storage 存储区名称和服务名称。 服务名称是提供给 Object Storage 实例的任何名称:
from pyspark.sql import SparkSession
def init_spark():
spark = SparkSession.builder.appName("read-write-cos-test").getOrCreate()
sc = spark.sparkContext
return spark,sc
def read_employees(spark,sc):
print("Hello 1" , spark )
employeesDF = spark.read.option("header",True).csv("cos://cosbucketname.cosservicename/employees.csv")
print("Hello 2" , employeesDF)
employeesDF.createOrReplaceTempView("empTable")
seniors = spark.sql("SELECT empTable.NAME FROM empTable WHERE empTable.BAND >= 6")
print("Hello 3", seniors)
seniors.show()
def main():
spark,sc = init_spark()
read_employees(spark,sc)
if __name__ == '__main__':
main()
名为 read-employees.py
的应用程序读取的名为 employees.csv
的 CSV 文件的示例:
NAME,BAND,DEPT
Abraham,8,EG
Barack,6,RT
Clinton,5,GG
Hoover,4,FF
Kennedy,7,NN
Truman,3,TT
要运行名为 read-employees.py
的应用程序,该应用程序从 employees.csv
POST 读取数据,请使用以下名为 read-employees-submit.json
的 JSON 有效内容脚本。 插入 CSV 文件所在的 Object Storage 存储区和服务名称,修改端点路径并插入访问密钥和密钥。
{
"application_details": {
"application": "cos://cosbucketname.cosservicename/read-employees.py",
"conf": {
"spark.hadoop.fs.cos.cosservicename.endpoint": "https://s3.direct.us-south.cloud-object-storage.appdomain.cloud/<CHANGME-according-to-instance>",
"spark.hadoop.fs.cos.cosservicename.access.key": "<CHANGEME>",
"spark.hadoop.fs.cos.cosservicename.secret.key": "<CHANGEME>"
}
}
}
使用 IAM API 密钥或其他 HMAC 凭证从 Object Storage 读取 CSV 文件
以下代码样本显示如何创建用于将数据从 CSV 文件读取到 Python DataFrame的 Python 脚本。 Python 脚本和 CSV 文件都位于 Object Storage中。
此示例显示如何使用 IAM API 密钥访问 IBM Cloud Object Storage 。
名为 read-employees-iam-key-cos.py
的应用程序的示例。 插入 CSV 文件所在的 Object Storage 存储区名称并修改端点路径。
from pyspark.sql import SparkSession
def init_spark():
spark = SparkSession.builder.appName("read-write-cos-test").getOrCreate()
sc = spark.sparkContext
hconf=sc._jsc.hadoopConfiguration()
hconf.set("fs.cos.testcos.endpoint", "s3.direct.us-south.cloud-object-storage.appdomain.cloud/CHANGEME-according-to-instance="
("fs.cos.testcos.iam.api.key","<CHANGEME>")
return spark,sc
def read_employees(spark,sc):
print("Hello1 " , spark )
employeesDF = spark.read.option("header",True).csv("cos://cosbucketname.cosservicename/employees.csv")
print("Hello2" , employeesDF)
employeesDF.createOrReplaceTempView("empTable")
juniors = spark.sql("SELECT empTable.NAME FROM empTable WHERE empTable.BAND < 6")
print("Hello3", juniors)
juniors.show()
def main():
spark,sc = init_spark()
read_employees(spark,sc)
if __name__ == '__main__':
main()
然后,使用访问密钥和密码发布以下名为 read-employees-iam-key-cos-submit.json
的有效内容 JSON 脚本。 插入 CSV 文件所在的 Object Storage 存储区名称并修改端点路径。
{
"application_details": {
"application": "cos://cosbucketname.cosservicename/read-employees-iam-key-cos.py",
"conf": {
"spark.hadoop.fs.cos.cosservicename.endpoint": "https://s3.direct.us-south.cloud-object-storage.appdomain.cloud/CHANGME-according-to-instance",
"spark.hadoop.fs.cos.cosservicename.access.key": "<CHANGEME>",
"spark.hadoop.fs.cos.cosservicename.secret.key": "<CHANGEME>"
}
}
}
在 Scala 中使用 IAM 读写 Object Storage
- 创建以下格式的 Eclipse 项目:

-
将名为
ScalaReadWriteIAMCOSExample.scala
的以下应用程序添加到analyticsengine
文件夹。 插入 IAM API 密钥和 Object Storage 存储区名称。package com.ibm.analyticsengine import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.SparkContext._ object ScalaReadWriteIAMCOSExample { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("Spark Scala Example") val sc = new SparkContext(sparkConf) val prefix="fs.cos.cosservicename" sc.hadoopConfiguration.set(prefix + ".endpoint", "s3.direct.us-south.cloud-object-storage.appdomain.cloud") sc.hadoopConfiguration.set(prefix + ".iam.api.key","<CHANGEME>") val data = Array("Sweden", "England", "France", "Tokyo") val myData = sc.parallelize(data) myData.saveAsTextFile("cos://cosbucketname.cosservicename/2021sep25-1.data") val myRDD=sc.textFile("cos://cosbucketname.cosservicename/2021sep25-1.data") myRDD.collect().foreach(println) } }
-
将
SparkScalaExample.sbt
放入src
文件夹中:name := "ScalaReadWriteIAMCOSExample" version := "1.0" scalaVersion := "2.12.10" libraryDependencies += "org.apache.spark" %% "spark-core" % "3.1.2"
-
使用 sbt 构建 Scala 项目。 将生成的 jar (
scalareadwriteiamcosexample_2.12-1.0.jar
) 上载到 Object Storage。cd SparkScalaExample sbt package
将 jar 上载到 Object Storage。
-
然后,使用访问密钥和密码发布以下名为
read-write-cos-scala-submit.json
的有效内容 JSON 脚本。 插入 CSV 文件所在的 Object Storage 存储区名称并修改端点路径。{ "application_details": { "application": "cos://cosbucketname.cosservicename/scalareadwriteiamcosexample_2.12-1.0.jar", "class":"com.ibm.analyticsengine.ScalaReadWriteIAMCOSExample", "conf": { "spark.hadoop.fs.cos.cosservicename.endpoint": "https://s3.direct.us-south.cloud-object-storage.appdomain.cloud-CHANGME-according-to-instance", "spark.hadoop.fs.cos.cosservicename.access.key": "<CHANGEME>", "spark.hadoop.fs.cos.cosservicename.secret.key": "<CHANGEME>" } } }
使用 IAM API 密钥提交应用程序详细信息
以下示例显示名为 read-employees.py
的应用程序的应用程序详细信息,该应用程序使用 IAM API 密钥从 Object Storage 中的 employees.csv
读取数据。 插入 CSV 文件所在的 Object Storage 存储区和服务名称以及 API 密钥。
{
"application_details": {
"application": "cos://cosbucketname.cosservicename/read-employees.py",
"conf": {
"spark.hadoop.fs.cos.cosservicename.endpoint": "https://s3.direct.us-south.cloud-object-storage.appdomain.cloud-CHANGME-according-to-instance",
"spark.hadoop.fs.cos.cosservicename.iam.api.key": "CHANGME"
}
}
}