Ejemplos de Spark estándar
Puede utilizar los siguientes ejemplos de código para aprender a utilizar Spark en diferentes situaciones.
Para comprender cómo acceder a Object Storage, consulte Descripción de las credenciales de Object Storage.
Lectura de un archivo CSV de Object Storage utilizando mediante credenciales ya indicadas
Los siguientes ejemplos de código muestran cómo crear un script de Python que lee datos de un archivo CSV a Python DataFrame. Tanto el script de Python como el archivo CSV se encuentran en Object Storage.
Puede utilizar las mismas credenciales de IBM Cloud Object Storage que ha especificado al enviar la aplicación de Spark o que se han establecido como configuración predeterminada al crear la instancia de servicio de IBM Analytics Engine para leer desde Object Storage dentro de la aplicación.
Ejemplo de la aplicación denominada read-employees.py
. Inserte el nombre de grupo y el nombre de servicio de Object Storage . El nombre de servicio es cualquier nombre asignado a la instancia de Object Storage :
from pyspark.sql import SparkSession
def init_spark():
spark = SparkSession.builder.appName("read-write-cos-test").getOrCreate()
sc = spark.sparkContext
return spark,sc
def read_employees(spark,sc):
print("Hello 1" , spark )
employeesDF = spark.read.option("header",True).csv("cos://cosbucketname.cosservicename/employees.csv")
print("Hello 2" , employeesDF)
employeesDF.createOrReplaceTempView("empTable")
seniors = spark.sql("SELECT empTable.NAME FROM empTable WHERE empTable.BAND >= 6")
print("Hello 3", seniors)
seniors.show()
def main():
spark,sc = init_spark()
read_employees(spark,sc)
if __name__ == '__main__':
main()
Ejemplo del archivo CSV denominado employees.csv
leído por la aplicación denominada read-employees.py
:
NAME,BAND,DEPT
Abraham,8,EG
Barack,6,RT
Clinton,5,GG
Hoover,4,FF
Kennedy,7,NN
Truman,3,TT
Para ejecutar la aplicación denominada read-employees.py
que lee datos de employees.csv
, utilice POST con el siguiente script de carga útil JSON denominado read-employees-submit.json
. Inserte el nombre
del servicio y del grupo de Object Storage donde se encuentra el archivo CSV, modifique la vía de acceso de punto final e inserte la clave de acceso y la clave secreta.
{
"application_details": {
"application": "cos://cosbucketname.cosservicename/read-employees.py",
"conf": {
"spark.hadoop.fs.cos.cosservicename.endpoint": "https://s3.direct.us-south.cloud-object-storage.appdomain.cloud/<CHANGME-according-to-instance>",
"spark.hadoop.fs.cos.cosservicename.access.key": "<CHANGEME>",
"spark.hadoop.fs.cos.cosservicename.secret.key": "<CHANGEME>"
}
}
}
Lectura de un archivo CSV desde Object Storage utilizando la clave de API de IAM u otras credenciales HMAC
Los siguientes ejemplos de código muestran cómo crear un script de Python que lee datos de un archivo CSV a Python DataFrame. Tanto el script de Python como el archivo CSV se encuentran en Object Storage.
En este ejemplo, se muestra cómo acceder a IBM Cloud Object Storage utilizando la clave de API de IAM.
Ejemplo de la aplicación denominada read-employees-iam-key-cos.py
. Inserte el nombre de grupo de Object Storage donde se encuentra el archivo CSV y modifique la vía de acceso de punto final.
from pyspark.sql import SparkSession
def init_spark():
spark = SparkSession.builder.appName("read-write-cos-test").getOrCreate()
sc = spark.sparkContext
hconf=sc._jsc.hadoopConfiguration()
hconf.set("fs.cos.testcos.endpoint", "s3.direct.us-south.cloud-object-storage.appdomain.cloud/CHANGEME-according-to-instance="
("fs.cos.testcos.iam.api.key","<CHANGEME>")
return spark,sc
def read_employees(spark,sc):
print("Hello1 " , spark )
employeesDF = spark.read.option("header",True).csv("cos://cosbucketname.cosservicename/employees.csv")
print("Hello2" , employeesDF)
employeesDF.createOrReplaceTempView("empTable")
juniors = spark.sql("SELECT empTable.NAME FROM empTable WHERE empTable.BAND < 6")
print("Hello3", juniors)
juniors.show()
def main():
spark,sc = init_spark()
read_employees(spark,sc)
if __name__ == '__main__':
main()
A continuación, utilice POST con el siguiente script JSON de carga útil denominado read-employees-iam-key-cos-submit.json
con la clave de acceso y la contraseña. Inserte el nombre de grupo de Object Storage donde se encuentra el
archivo CSV y modifique la vía de acceso de punto final.
{
"application_details": {
"application": "cos://cosbucketname.cosservicename/read-employees-iam-key-cos.py",
"conf": {
"spark.hadoop.fs.cos.cosservicename.endpoint": "https://s3.direct.us-south.cloud-object-storage.appdomain.cloud/CHANGME-according-to-instance",
"spark.hadoop.fs.cos.cosservicename.access.key": "<CHANGEME>",
"spark.hadoop.fs.cos.cosservicename.secret.key": "<CHANGEME>"
}
}
}
Lectura y escritura en Object Storage utilizando IAM en Scala
- Cree un proyecto de Eclipse con el siguiente formato:

-
Añada la siguiente aplicación denominada
ScalaReadWriteIAMCOSExample.scala
a la carpetaanalyticsengine
. Inserte la clave de API de IAM y el nombre de grupo de Object Storage.package com.ibm.analyticsengine import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.SparkContext._ object ScalaReadWriteIAMCOSExample { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("Spark Scala Example") val sc = new SparkContext(sparkConf) val prefix="fs.cos.cosservicename" sc.hadoopConfiguration.set(prefix + ".endpoint", "s3.direct.us-south.cloud-object-storage.appdomain.cloud") sc.hadoopConfiguration.set(prefix + ".iam.api.key","<CHANGEME>") val data = Array("Sweden", "England", "France", "Tokyo") val myData = sc.parallelize(data) myData.saveAsTextFile("cos://cosbucketname.cosservicename/2021sep25-1.data") val myRDD=sc.textFile("cos://cosbucketname.cosservicename/2021sep25-1.data") myRDD.collect().foreach(println) } }
-
Coloque
SparkScalaExample.sbt
en la carpetasrc
:name := "ScalaReadWriteIAMCOSExample" version := "1.0" scalaVersion := "2.12.10" libraryDependencies += "org.apache.spark" %% "spark-core" % "3.1.2"
-
Cree el proyecto de Scala mediante sbt. Cargue el archivo jar (
scalareadwriteiamcosexample_2.12-1.0.jar
) resultante en Object Storage.cd SparkScalaExample sbt package
Cargue el archivo jar en Object Storage.
-
A continuación, utilice POST con el siguiente script JSON de carga útil denominado
read-write-cos-scala-submit.json
con la clave de acceso y la contraseña. Inserte el nombre de grupo de Object Storage donde se encuentra el archivo CSV y modifique la vía de acceso de punto final.{ "application_details": { "application": "cos://cosbucketname.cosservicename/scalareadwriteiamcosexample_2.12-1.0.jar", "class":"com.ibm.analyticsengine.ScalaReadWriteIAMCOSExample", "conf": { "spark.hadoop.fs.cos.cosservicename.endpoint": "https://s3.direct.us-south.cloud-object-storage.appdomain.cloud-CHANGME-according-to-instance", "spark.hadoop.fs.cos.cosservicename.access.key": "<CHANGEME>", "spark.hadoop.fs.cos.cosservicename.secret.key": "<CHANGEME>" } } }
Envío de detalles de aplicación con la clave de API de IAM
En el ejemplo siguiente, se muestran los detalles de aplicación de la aplicación denominada read-employees.py
, que lee datos de employees.csv
en Object Storage utilizando la clave de API de IAM. Inserte el nombre de
servicio y de grupo de Object Storage donde se encuentran el archivo CSV y la clave de API.
{
"application_details": {
"application": "cos://cosbucketname.cosservicename/read-employees.py",
"conf": {
"spark.hadoop.fs.cos.cosservicename.endpoint": "https://s3.direct.us-south.cloud-object-storage.appdomain.cloud-CHANGME-according-to-instance",
"spark.hadoop.fs.cos.cosservicename.iam.api.key": "CHANGME"
}
}
}