IBM Cloud Docs
Standard Spark 示例

Standard Spark 示例

您可以使用以下代码样本来了解如何在不同情况下使用 Spark。

要了解如何访问 Object Storage,请参阅 了解 Object Storage 凭证

使用已声明的凭证从 Object Storage 读取 CSV 文件

以下代码样本显示如何创建用于将数据从 CSV 文件读取到 Python DataFrame的 Python 脚本。 Python 脚本和 CSV 文件都位于 Object Storage中。

您可以使用在提交 Spark 应用程序时指定的相同 IBM Cloud Object Storage 凭证,或者使用在创建 IBM Analytics Engine 服务实例以从应用程序中的 Object Storage 中读取时设置为缺省配置的凭证。

名为 read-employees.py的应用程序的示例。 插入 Object Storage 存储区名称和服务名称。 服务名称是提供给 Object Storage 实例的任何名称:

from pyspark.sql import SparkSession

def init_spark():
  spark = SparkSession.builder.appName("read-write-cos-test").getOrCreate()
  sc = spark.sparkContext
  return spark,sc

def read_employees(spark,sc):
  print("Hello 1"  , spark )
  employeesDF = spark.read.option("header",True).csv("cos://cosbucketname.cosservicename/employees.csv")
  print("Hello 2" , employeesDF)
  employeesDF.createOrReplaceTempView("empTable")
  seniors = spark.sql("SELECT empTable.NAME FROM empTable WHERE empTable.BAND >= 6")
  print("Hello 3", seniors)
  seniors.show()

def main():
  spark,sc = init_spark()
  read_employees(spark,sc)

if __name__ == '__main__':
  main()

名为 read-employees.py的应用程序读取的名为 employees.csv 的 CSV 文件的示例:

NAME,BAND,DEPT
Abraham,8,EG
Barack,6,RT
Clinton,5,GG
Hoover,4,FF
Kennedy,7,NN
Truman,3,TT

要运行名为 read-employees.py 的应用程序,该应用程序从 employees.csv POST 读取数据,请使用以下名为 read-employees-submit.json的 JSON 有效内容脚本。 插入 CSV 文件所在的 Object Storage 存储区和服务名称,修改端点路径并插入访问密钥和密钥。

{
  "application_details": {
    "application": "cos://cosbucketname.cosservicename/read-employees.py",
    "conf": {
      "spark.hadoop.fs.cos.cosservicename.endpoint": "https://s3.direct.us-south.cloud-object-storage.appdomain.cloud/<CHANGME-according-to-instance>",
      "spark.hadoop.fs.cos.cosservicename.access.key": "<CHANGEME>",
      "spark.hadoop.fs.cos.cosservicename.secret.key": "<CHANGEME>"
      }
  }
}

使用 IAM API 密钥或其他 HMAC 凭证从 Object Storage 读取 CSV 文件

以下代码样本显示如何创建用于将数据从 CSV 文件读取到 Python DataFrame的 Python 脚本。 Python 脚本和 CSV 文件都位于 Object Storage中。

此示例显示如何使用 IAM API 密钥访问 IBM Cloud Object Storage 。

名为 read-employees-iam-key-cos.py的应用程序的示例。 插入 CSV 文件所在的 Object Storage 存储区名称并修改端点路径。

from pyspark.sql import SparkSession

def init_spark():
  spark = SparkSession.builder.appName("read-write-cos-test").getOrCreate()
  sc = spark.sparkContext
  hconf=sc._jsc.hadoopConfiguration()
  hconf.set("fs.cos.testcos.endpoint", "s3.direct.us-south.cloud-object-storage.appdomain.cloud/CHANGEME-according-to-instance="
("fs.cos.testcos.iam.api.key","<CHANGEME>")
  return spark,sc


def read_employees(spark,sc):
  print("Hello1 "  , spark )
  employeesDF = spark.read.option("header",True).csv("cos://cosbucketname.cosservicename/employees.csv")
  print("Hello2" , employeesDF)
  employeesDF.createOrReplaceTempView("empTable")
  juniors = spark.sql("SELECT empTable.NAME FROM empTable WHERE empTable.BAND < 6")
  print("Hello3", juniors)
  juniors.show()


def main():
  spark,sc = init_spark()
  read_employees(spark,sc)


if __name__ == '__main__':
  main()

然后,使用访问密钥和密码发布以下名为 read-employees-iam-key-cos-submit.json 的有效内容 JSON 脚本。 插入 CSV 文件所在的 Object Storage 存储区名称并修改端点路径。

{
  "application_details": {
    "application": "cos://cosbucketname.cosservicename/read-employees-iam-key-cos.py",
    "conf": {
      "spark.hadoop.fs.cos.cosservicename.endpoint": "https://s3.direct.us-south.cloud-object-storage.appdomain.cloud/CHANGME-according-to-instance",
      "spark.hadoop.fs.cos.cosservicename.access.key": "<CHANGEME>",
      "spark.hadoop.fs.cos.cosservicename.secret.key": "<CHANGEME>"
      }
  }
}

在 Scala 中使用 IAM 读写 Object Storage

  1. 创建以下格式的 Eclipse 项目:

显示用于开发 Scala jar 的 Eclipse 项目格式。
图 1。 Eclipse 项目文件夹结构

  1. 将名为 ScalaReadWriteIAMCOSExample.scala 的以下应用程序添加到 analyticsengine 文件夹。 插入 IAM API 密钥和 Object Storage 存储区名称。

    package com.ibm.analyticsengine
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._
    
    object ScalaReadWriteIAMCOSExample {
      def main(args: Array[String]) {
        val sparkConf = new SparkConf().setAppName("Spark Scala Example")
        val sc = new SparkContext(sparkConf)
            val prefix="fs.cos.cosservicename"
            sc.hadoopConfiguration.set(prefix + ".endpoint", "s3.direct.us-south.cloud-object-storage.appdomain.cloud")
            sc.hadoopConfiguration.set(prefix + ".iam.api.key","<CHANGEME>")
            val data = Array("Sweden", "England", "France", "Tokyo")
            val myData = sc.parallelize(data)
            myData.saveAsTextFile("cos://cosbucketname.cosservicename/2021sep25-1.data")
            val myRDD=sc.textFile("cos://cosbucketname.cosservicename/2021sep25-1.data")
            myRDD.collect().foreach(println)
        }
    }
    
  2. SparkScalaExample.sbt 放入 src 文件夹中:

    name := "ScalaReadWriteIAMCOSExample"
    
    version := "1.0"
    
    scalaVersion := "2.12.10"
    
    libraryDependencies += "org.apache.spark" %% "spark-core" % "3.1.2"
    
  3. 使用 sbt 构建 Scala 项目。 将生成的 jar (scalareadwriteiamcosexample_2.12-1.0.jar) 上载到 Object Storage。

    cd SparkScalaExample
    sbt package
    

    将 jar 上载到 Object Storage。

  4. 然后,使用访问密钥和密码发布以下名为 read-write-cos-scala-submit.json 的有效内容 JSON 脚本。 插入 CSV 文件所在的 Object Storage 存储区名称并修改端点路径。

    {
      "application_details": {
        "application": "cos://cosbucketname.cosservicename/scalareadwriteiamcosexample_2.12-1.0.jar",
        "class":"com.ibm.analyticsengine.ScalaReadWriteIAMCOSExample",
        "conf": {
          "spark.hadoop.fs.cos.cosservicename.endpoint": "https://s3.direct.us-south.cloud-object-storage.appdomain.cloud-CHANGME-according-to-instance",
          "spark.hadoop.fs.cos.cosservicename.access.key": "<CHANGEME>",
          "spark.hadoop.fs.cos.cosservicename.secret.key": "<CHANGEME>"
          }
        }
    }
    

使用 IAM API 密钥提交应用程序详细信息

以下示例显示名为 read-employees.py 的应用程序的应用程序详细信息,该应用程序使用 IAM API 密钥从 Object Storage 中的 employees.csv 读取数据。 插入 CSV 文件所在的 Object Storage 存储区和服务名称以及 API 密钥。

{
  "application_details": {
    "application": "cos://cosbucketname.cosservicename/read-employees.py",
    "conf": {
      "spark.hadoop.fs.cos.cosservicename.endpoint": "https://s3.direct.us-south.cloud-object-storage.appdomain.cloud-CHANGME-according-to-instance",
      "spark.hadoop.fs.cos.cosservicename.iam.api.key": "CHANGME"
      }
  }
}