Exemples standard de Spark
Vous pouvez utiliser les exemples de code suivants pour apprendre à utiliser Spark dans différentes situations.
Pour savoir comment accéder à Object Storage, voir Présentation des données d'identification Object Storage.
Lecture d'un fichier CSV à partir de Object Storage à l'aide des données d'identification déjà indiquées
Les exemples de code suivants montrent comment créer un script Python qui lit les données d'un fichier CSV vers un DataFrame Python. Le script Python et le fichier CSV se trouvent dans Object Storage.
Vous pouvez utiliser les mêmes données d'identification IBM Cloud Object Storage que celles que vous avez spécifiées lors de la soumission de l'application Spark ou qui ont été définies en tant que configuration par défaut lorsque vous avez créé l'instance de service IBM Analytics Engine à lire Object Storage dans l'application.
Exemple de l'application appelée read-employees.py
. Insérez le nom de compartiment et le nom de service Object Storage . Le nom de service correspond à tout nom donné à votre instance Object Storage :
from pyspark.sql import SparkSession
def init_spark():
spark = SparkSession.builder.appName("read-write-cos-test").getOrCreate()
sc = spark.sparkContext
return spark,sc
def read_employees(spark,sc):
print("Hello 1" , spark )
employeesDF = spark.read.option("header",True).csv("cos://cosbucketname.cosservicename/employees.csv")
print("Hello 2" , employeesDF)
employeesDF.createOrReplaceTempView("empTable")
seniors = spark.sql("SELECT empTable.NAME FROM empTable WHERE empTable.BAND >= 6")
print("Hello 3", seniors)
seniors.show()
def main():
spark,sc = init_spark()
read_employees(spark,sc)
if __name__ == '__main__':
main()
Exemple du fichier CSV appelé employees.csv
qui est lu par l'application intitulée read-employees.py
:
NAME,BAND,DEPT
Abraham,8,EG
Barack,6,RT
Clinton,5,GG
Hoover,4,FF
Kennedy,7,NN
Truman,3,TT
Pour exécuter l'application appelée read-employees.py
qui lit les données à partir de employees.csv
POST, le script de contenu JSON a appelé read-employees-submit.json
. Insérez le compartiment Object Storage
et le nom de service dans lequel se trouve le fichier CSV, modifiez le chemin du point de terminaison final et insérez votre clé d'accès et la clé secrète.
{
"application_details": {
"application": "cos://cosbucketname.cosservicename/read-employees.py",
"conf": {
"spark.hadoop.fs.cos.cosservicename.endpoint": "https://s3.direct.us-south.cloud-object-storage.appdomain.cloud/<CHANGME-according-to-instance>",
"spark.hadoop.fs.cos.cosservicename.access.key": "<CHANGEME>",
"spark.hadoop.fs.cos.cosservicename.secret.key": "<CHANGEME>"
}
}
}
Lecture d'un fichier CSV à partir de Object Storage à l'aide de la clé de l'API IAM ou des données d'identification HMAC différentes
Les exemples de code suivants montrent comment créer un script Python qui lit les données d'un fichier CSV vers un DataFrame Python. Le script Python et le fichier CSV se trouvent dans Object Storage.
Cet exemple montre comment accéder à IBM Cloud Object Storage à l'aide de la clé de l'API IAM.
Exemple de l'application appelée read-employees-iam-key-cos.py
. Insérez le nom de compartiment Object Storage dans lequel se trouve le fichier CSV et modifiez le chemin du point de terminaison.
from pyspark.sql import SparkSession
def init_spark():
spark = SparkSession.builder.appName("read-write-cos-test").getOrCreate()
sc = spark.sparkContext
hconf=sc._jsc.hadoopConfiguration()
hconf.set("fs.cos.testcos.endpoint", "s3.direct.us-south.cloud-object-storage.appdomain.cloud/CHANGEME-according-to-instance="
("fs.cos.testcos.iam.api.key","<CHANGEME>")
return spark,sc
def read_employees(spark,sc):
print("Hello1 " , spark )
employeesDF = spark.read.option("header",True).csv("cos://cosbucketname.cosservicename/employees.csv")
print("Hello2" , employeesDF)
employeesDF.createOrReplaceTempView("empTable")
juniors = spark.sql("SELECT empTable.NAME FROM empTable WHERE empTable.BAND < 6")
print("Hello3", juniors)
juniors.show()
def main():
spark,sc = init_spark()
read_employees(spark,sc)
if __name__ == '__main__':
main()
Ensuite, publier (POST) le script JSON de contenu suivant appelé read-employees-iam-key-cos-submit.json
avec votre clé d'accès et votre mot de passe. Insérez le nom de compartiment Object Storage dans lequel se trouve le fichier
CSV et modifiez le chemin du point de terminaison.
{
"application_details": {
"application": "cos://cosbucketname.cosservicename/read-employees-iam-key-cos.py",
"conf": {
"spark.hadoop.fs.cos.cosservicename.endpoint": "https://s3.direct.us-south.cloud-object-storage.appdomain.cloud/CHANGME-according-to-instance",
"spark.hadoop.fs.cos.cosservicename.access.key": "<CHANGEME>",
"spark.hadoop.fs.cos.cosservicename.secret.key": "<CHANGEME>"
}
}
}
Lecture et écriture dans Object Storage à l'aide d'IAM dans Scala
- Créez un projet Eclipse au format suivant :

-
Ajoutez l'application suivante appelée
ScalaReadWriteIAMCOSExample.scala
au dossieranalyticsengine
. Insérez votre clé d'API IAM et le nom de compartiment Object Storage .package com.ibm.analyticsengine import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.SparkContext._ object ScalaReadWriteIAMCOSExample { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("Spark Scala Example") val sc = new SparkContext(sparkConf) val prefix="fs.cos.cosservicename" sc.hadoopConfiguration.set(prefix + ".endpoint", "s3.direct.us-south.cloud-object-storage.appdomain.cloud") sc.hadoopConfiguration.set(prefix + ".iam.api.key","<CHANGEME>") val data = Array("Sweden", "England", "France", "Tokyo") val myData = sc.parallelize(data) myData.saveAsTextFile("cos://cosbucketname.cosservicename/2021sep25-1.data") val myRDD=sc.textFile("cos://cosbucketname.cosservicename/2021sep25-1.data") myRDD.collect().foreach(println) } }
-
Placez
SparkScalaExample.sbt
dans le dossiersrc
:name := "ScalaReadWriteIAMCOSExample" version := "1.0" scalaVersion := "2.12.10" libraryDependencies += "org.apache.spark" %% "spark-core" % "3.1.2"
-
Créez le projet Scala à l'aide de sbt. Téléchargez le fichier jar résultant (
scalareadwriteiamcosexample_2.12-1.0.jar
) dans Object Storage.cd SparkScalaExample sbt package
Téléchargez le fichier jar vers le stockage d'objets.
-
Ensuite, publier (POST) le script JSON de contenu suivant appelé
read-write-cos-scala-submit.json
avec votre clé d'accès et votre mot de passe. Insérez le nom de compartiment Object Storage dans lequel se trouve le fichier CSV et modifiez le chemin du point de terminaison.{ "application_details": { "application": "cos://cosbucketname.cosservicename/scalareadwriteiamcosexample_2.12-1.0.jar", "class":"com.ibm.analyticsengine.ScalaReadWriteIAMCOSExample", "conf": { "spark.hadoop.fs.cos.cosservicename.endpoint": "https://s3.direct.us-south.cloud-object-storage.appdomain.cloud-CHANGME-according-to-instance", "spark.hadoop.fs.cos.cosservicename.access.key": "<CHANGEME>", "spark.hadoop.fs.cos.cosservicename.secret.key": "<CHANGEME>" } } }
Soumission des détails de l'application avec la clé d'API IAM
L'exemple suivant présente les détails de l'application appelée read-employees.py
qui lit les données dans employees.csv
dans Object Storage à l'aide de la clé de l'API IAM. Insérez le compartiment Object Storage et
le nom de service dans lequel se trouve le fichier CSV et la clé d'API.
{
"application_details": {
"application": "cos://cosbucketname.cosservicename/read-employees.py",
"conf": {
"spark.hadoop.fs.cos.cosservicename.endpoint": "https://s3.direct.us-south.cloud-object-storage.appdomain.cloud-CHANGME-according-to-instance",
"spark.hadoop.fs.cos.cosservicename.iam.api.key": "CHANGME"
}
}
}