IBM Cloud Docs
Ejemplos de Spark estándar

Ejemplos de Spark estándar

Puede utilizar los siguientes ejemplos de código para aprender a utilizar Spark en diferentes situaciones.

Para comprender cómo acceder a Object Storage, consulte Descripción de las credenciales de Object Storage.

Lectura de un archivo CSV de Object Storage utilizando mediante credenciales ya indicadas

Los siguientes ejemplos de código muestran cómo crear un script de Python que lee datos de un archivo CSV a Python DataFrame. Tanto el script de Python como el archivo CSV se encuentran en Object Storage.

Puede utilizar las mismas credenciales de IBM Cloud Object Storage que ha especificado al enviar la aplicación de Spark o que se han establecido como configuración predeterminada al crear la instancia de servicio de IBM Analytics Engine para leer desde Object Storage dentro de la aplicación.

Ejemplo de la aplicación denominada read-employees.py. Inserte el nombre de grupo y el nombre de servicio de Object Storage . El nombre de servicio es cualquier nombre asignado a la instancia de Object Storage :

from pyspark.sql import SparkSession

def init_spark():
  spark = SparkSession.builder.appName("read-write-cos-test").getOrCreate()
  sc = spark.sparkContext
  return spark,sc

def read_employees(spark,sc):
  print("Hello 1"  , spark )
  employeesDF = spark.read.option("header",True).csv("cos://cosbucketname.cosservicename/employees.csv")
  print("Hello 2" , employeesDF)
  employeesDF.createOrReplaceTempView("empTable")
  seniors = spark.sql("SELECT empTable.NAME FROM empTable WHERE empTable.BAND >= 6")
  print("Hello 3", seniors)
  seniors.show()

def main():
  spark,sc = init_spark()
  read_employees(spark,sc)

if __name__ == '__main__':
  main()

Ejemplo del archivo CSV denominado employees.csv leído por la aplicación denominada read-employees.py:

NAME,BAND,DEPT
Abraham,8,EG
Barack,6,RT
Clinton,5,GG
Hoover,4,FF
Kennedy,7,NN
Truman,3,TT

Para ejecutar la aplicación denominada read-employees.py que lee datos de employees.csv, utilice POST con el siguiente script de carga útil JSON denominado read-employees-submit.json. Inserte el nombre del servicio y del grupo de Object Storage donde se encuentra el archivo CSV, modifique la vía de acceso de punto final e inserte la clave de acceso y la clave secreta.

{
  "application_details": {
    "application": "cos://cosbucketname.cosservicename/read-employees.py",
    "conf": {
      "spark.hadoop.fs.cos.cosservicename.endpoint": "https://s3.direct.us-south.cloud-object-storage.appdomain.cloud/<CHANGME-according-to-instance>",
      "spark.hadoop.fs.cos.cosservicename.access.key": "<CHANGEME>",
      "spark.hadoop.fs.cos.cosservicename.secret.key": "<CHANGEME>"
      }
  }
}

Lectura de un archivo CSV desde Object Storage utilizando la clave de API de IAM u otras credenciales HMAC

Los siguientes ejemplos de código muestran cómo crear un script de Python que lee datos de un archivo CSV a Python DataFrame. Tanto el script de Python como el archivo CSV se encuentran en Object Storage.

En este ejemplo, se muestra cómo acceder a IBM Cloud Object Storage utilizando la clave de API de IAM.

Ejemplo de la aplicación denominada read-employees-iam-key-cos.py. Inserte el nombre de grupo de Object Storage donde se encuentra el archivo CSV y modifique la vía de acceso de punto final.

from pyspark.sql import SparkSession

def init_spark():
  spark = SparkSession.builder.appName("read-write-cos-test").getOrCreate()
  sc = spark.sparkContext
  hconf=sc._jsc.hadoopConfiguration()
  hconf.set("fs.cos.testcos.endpoint", "s3.direct.us-south.cloud-object-storage.appdomain.cloud/CHANGEME-according-to-instance="
("fs.cos.testcos.iam.api.key","<CHANGEME>")
  return spark,sc


def read_employees(spark,sc):
  print("Hello1 "  , spark )
  employeesDF = spark.read.option("header",True).csv("cos://cosbucketname.cosservicename/employees.csv")
  print("Hello2" , employeesDF)
  employeesDF.createOrReplaceTempView("empTable")
  juniors = spark.sql("SELECT empTable.NAME FROM empTable WHERE empTable.BAND < 6")
  print("Hello3", juniors)
  juniors.show()


def main():
  spark,sc = init_spark()
  read_employees(spark,sc)


if __name__ == '__main__':
  main()

A continuación, utilice POST con el siguiente script JSON de carga útil denominado read-employees-iam-key-cos-submit.json con la clave de acceso y la contraseña. Inserte el nombre de grupo de Object Storage donde se encuentra el archivo CSV y modifique la vía de acceso de punto final.

{
  "application_details": {
    "application": "cos://cosbucketname.cosservicename/read-employees-iam-key-cos.py",
    "conf": {
      "spark.hadoop.fs.cos.cosservicename.endpoint": "https://s3.direct.us-south.cloud-object-storage.appdomain.cloud/CHANGME-according-to-instance",
      "spark.hadoop.fs.cos.cosservicename.access.key": "<CHANGEME>",
      "spark.hadoop.fs.cos.cosservicename.secret.key": "<CHANGEME>"
      }
  }
}

Lectura y escritura en Object Storage utilizando IAM en Scala

  1. Cree un proyecto de Eclipse con el siguiente formato:

Muestra el formato de proyecto Eclipse que se debe utilizar para desarrollar el jar de Scala.
Figura 1. Eclipse

  1. Añada la siguiente aplicación denominada ScalaReadWriteIAMCOSExample.scala a la carpeta analyticsengine. Inserte la clave de API de IAM y el nombre de grupo de Object Storage.

    package com.ibm.analyticsengine
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._
    
    object ScalaReadWriteIAMCOSExample {
      def main(args: Array[String]) {
        val sparkConf = new SparkConf().setAppName("Spark Scala Example")
        val sc = new SparkContext(sparkConf)
            val prefix="fs.cos.cosservicename"
            sc.hadoopConfiguration.set(prefix + ".endpoint", "s3.direct.us-south.cloud-object-storage.appdomain.cloud")
            sc.hadoopConfiguration.set(prefix + ".iam.api.key","<CHANGEME>")
            val data = Array("Sweden", "England", "France", "Tokyo")
            val myData = sc.parallelize(data)
            myData.saveAsTextFile("cos://cosbucketname.cosservicename/2021sep25-1.data")
            val myRDD=sc.textFile("cos://cosbucketname.cosservicename/2021sep25-1.data")
            myRDD.collect().foreach(println)
        }
    }
    
  2. Coloque SparkScalaExample.sbt en la carpeta src:

    name := "ScalaReadWriteIAMCOSExample"
    
    version := "1.0"
    
    scalaVersion := "2.12.10"
    
    libraryDependencies += "org.apache.spark" %% "spark-core" % "3.1.2"
    
  3. Cree el proyecto de Scala mediante sbt. Cargue el archivo jar (scalareadwriteiamcosexample_2.12-1.0.jar) resultante en Object Storage.

    cd SparkScalaExample
    sbt package
    

    Cargue el archivo jar en Object Storage.

  4. A continuación, utilice POST con el siguiente script JSON de carga útil denominado read-write-cos-scala-submit.json con la clave de acceso y la contraseña. Inserte el nombre de grupo de Object Storage donde se encuentra el archivo CSV y modifique la vía de acceso de punto final.

    {
      "application_details": {
        "application": "cos://cosbucketname.cosservicename/scalareadwriteiamcosexample_2.12-1.0.jar",
        "class":"com.ibm.analyticsengine.ScalaReadWriteIAMCOSExample",
        "conf": {
          "spark.hadoop.fs.cos.cosservicename.endpoint": "https://s3.direct.us-south.cloud-object-storage.appdomain.cloud-CHANGME-according-to-instance",
          "spark.hadoop.fs.cos.cosservicename.access.key": "<CHANGEME>",
          "spark.hadoop.fs.cos.cosservicename.secret.key": "<CHANGEME>"
          }
        }
    }
    

Envío de detalles de aplicación con la clave de API de IAM

En el ejemplo siguiente, se muestran los detalles de aplicación de la aplicación denominada read-employees.py, que lee datos de employees.csv en Object Storage utilizando la clave de API de IAM. Inserte el nombre de servicio y de grupo de Object Storage donde se encuentran el archivo CSV y la clave de API.

{
  "application_details": {
    "application": "cos://cosbucketname.cosservicename/read-employees.py",
    "conf": {
      "spark.hadoop.fs.cos.cosservicename.endpoint": "https://s3.direct.us-south.cloud-object-storage.appdomain.cloud-CHANGME-according-to-instance",
      "spark.hadoop.fs.cos.cosservicename.iam.api.key": "CHANGME"
      }
  }
}