Einführung in den Spark-Anwendungsfall
Dieser Abschnitt enthält die Prozedur zum Ausführen von Spark-Anwendungsfällen für watsonx.data unter Verwendung von Python-Beispielen. Alle Beispiele werden mithilfe von Spark Python-APIs geschrieben.
Voraussetzungen
- Sie stellen eine IBM® watsonx.data-Instanz bereit.
- Konfigurieren Sie eine IBM Analytics Engine-Instanz.
- Cloud Object Storage details zur Eimerverbindung.
- Damit Ihre Spark-Anwendung mit demwatsonx.data Katalog und Speicher, Sie müssen über die Metastore-Administratorrolle verfügen. Ohne Metastore-Administratorrechte können Sie mit der Native Spark-Engine keine Daten in den Speicher einspeisen. Weitere Informationen zur Spark-Konfiguration finden Sie unter Zusammenarbeit mit demwatsonx.data Katalog und Lagerung.
Informationen zum Beispielanwendungsfall
Die Beispieldatei veranschaulicht die folgenden Funktionen:
-
Zugriff auf Tabellen über watsonx.data
Der Abschnitt Create a database in Lakehouse catalog aus der Beispiel-Python-Datei erstellt eine Datenbank demodb in der konfigurierten watsonx.data-Instanz mit einem Katalog namens lakehouse. demodb ist so konfiguriert, dass alle Daten und Metadaten unter dem Cloud Object Storage-Bucket (COS-Bucket) lakehouse-bucket gespeichert werden. Außerdem wird eine Eisberg-Tabelle testTable erstellt und auf sie zugegriffen.
-
Aufnehmen von Daten in watsonx.data
Der Abschnitt Ingest parquet data into a lakehouse table aus der Python-Beispieldatei ermöglicht Ihnen, Daten im Parquet-und CSV-Format aus einem Cloud Object Storage-Quellenbucket quellenbucket in eine watsonx.data-Tabelle aufzunehmen. Beispieldaten im Parquet-Format werden aus dem Quellen-COS-Bucket source-bucket in die watsonx.data-Tabelle yellow_taxi_2022 eingefügt (siehe Schritte zum Einfügen von Beispieldaten in das Quellen-COS-Bucket). Außerdem werden Daten im CSV-Format aus dem COS-Bucket source-bucket in die Tabelle zipcode in der Datenbank demodb aufgenommen.
-
Schema in watsonx.data ändern
Im Abschnitt Schema evolution der Beispiel-Python-Datei können Sie Daten in watsonx.dataändern.
-
Ausführen von Tabellenverwaltungsaktivitäten in watsonx.data
Die Tabellenverwaltung unterstützt Sie dabei, die Leistung der Tabelle watsonx.data aufrechtzuerhalten. Iceberg stellt ohne Vorbereitungs-oder Anpassungsaufwand Prozeduren für die Tabellenverwaltung bereit, die die Ausführung leistungsfähiger Tabellenoptimierungen auf deklarative Weise ermöglichen. Das folgende Beispiel veranschaulicht, wie einige Tabellenverwaltungsoperationen mit Spark ausgeführt werden. Weitere Informationen zu den Iceberg Spark-Tabellenwartungsoperationen finden Sie unter Tabellenoperationen.
Beispieldaten in COS-Bucket einfügen
Führen Sie die folgenden Schritte aus, um Daten in COS einzufügen:
-
Erstellen Sie ein COS-Bucket (z. B. source-bucket), um Beispieldaten zu speichern, die in die watsonx.data-Instanz aufgenommen werden sollen. Informationen zum Erstellen von COS-Buckets finden Sie unter Einstieg mit IBM Cloud Object Storage.
Als Benutzer von Object Storage müssen Sie nicht nur den API-Schlüssel oder die HMAC-Schlüssel kennen, um Object Storage zu konfigurieren, sondern auch die IBM Analytics Engine-Dienstendpunkte, um eine Verbindung zu Object Storage herzustellen. Unter Auswahl von Regionen und Endpunkten finden Sie weitere Informationen zu den Endpunkten, die auf der Grundlage Ihres Object Storage Bucket-Typs zu verwenden sind, z. B. regional oder regionsübergreifend. Sie können auch die Endpunkte für Ihren Object Storage-Dienst in verschiedenen Regionen anzeigen, indem Sie den Dienst in Ihrem IBM Cloud-Dashboard auswählen und im Navigationsbereich auf Endpunkt klicken. Wählen Sie immer den direkten Endpunkt. Direkte Endpunkte bieten eine bessere Leistung und verursachen keine Gebühren. Ein Beispiel für einen Endpunkt für die Region US-South Cross ist
s3.direct.us.cloud-object-storage.appdomain.cloud
. -
Laden Sie die CSV-Beispieldatei (z. B. zipcodes.csv) und Parquet-Beispieldaten (z. B. Taxi-Daten für sechs Monate für das Jahr 2022) von den folgenden Links herunter:
-
Installieren Sie das IBM Cloud Object Storage-Plug-in. Weitere Informationen zur Installation des Plug-ins finden Sie unter IBM Cloud Object Storage CLI.
-
Verwenden Sie die COS-Befehlszeilenschnittstelle, um die Beispieldaten in das COS-Bucket hochzuladen.
ibmcloud cos upload --bucket <cos_bucket_name> --key <source_file_name> --file <path_to_source_file>
Parameterwerte:
- < cos_bucket_name>: Name des Buckets, das in step1erstellt wurde.
- < name_der_quellendatei>: Der Name der heruntergeladenen Beispieldatendatei. Hier steht key zipcodes.csv für den Dateinamen (siehe folgendes Beispiel).
- < Pfad_zu_Quellendatei>: Der Pfad zu der Position auf Ihrer Maschine, an der sich die Datei befindet In diesem Beispiel ist path/zipcodes.csv der Dateipfad (siehe folgendes Beispiel).
Beispiel:
ibmcloud cos upload --bucket source-bucket --key zipcodes.csv --file <path/zipcodes.csv>
Beispielanwendungsfall ausführen
Führen Sie die Schritte zum Ausführen der Spark-Beispieldatei Python aus.
Spark-Beispieldatei Python
from pyspark.sql import SparkSession
import os
def init_spark():
spark = SparkSession.builder \
.appName("lh-hms-cloud") \
.config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.endpoint" ,"s3.direct.us-south.cloud-object-storage.appdomain.cloud") \
.config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.access.key" ,"<lakehouse-bucket-access-key>") \
.config("spark.hadoop.fs.s3a.bucket.lakehouse-bucket.secret.key" ,"<lakehouse-bucket-secret-key>") \
.config("spark.hadoop.fs.s3a.bucket.source-bucket.endpoint" ,"s3.direct.us-south.cloud-object-storage.appdomain.cloud") \
.config("spark.hadoop.fs.s3a.bucket.source-bucket.access.key" ,"<source-bucket-access-key>") \
.config("spark.hadoop.fs.s3a.bucket.source-bucket.secret.key" ,"<source-bucket-secret-key>") \
.enableHiveSupport() \
.getOrCreate()
return spark
def create_database(spark):
# Create a database in the lakehouse catalog
spark.sql("create database if not exists lakehouse.demodb LOCATION 's3a://lakehouse-bucket/'")
def list_databases(spark):
# list the database under lakehouse catalog
spark.sql("show databases from lakehouse").show()
def basic_iceberg_table_operations(spark):
# demonstration: Create a basic Iceberg table, insert some data and then query table
spark.sql("create table if not exists lakehouse.demodb.testTable(id INTEGER, name VARCHAR(10), age INTEGER, salary DECIMAL(10, 2)) using iceberg").show()
spark.sql("insert into lakehouse.demodb.testTable values(1,'Alan',23,3400.00),(2,'Ben',30,5500.00),(3,'Chen',35,6500.00)")
spark.sql("select * from lakehouse.demodb.testTable").show()
def create_table_from_parquet_data(spark):
# load parquet data into dataframe
df = spark.read.option("header",True).parquet("s3a://source-bucket/nyc-taxi/yellow_tripdata_2022-01.parquet")
# write the dataframe into an Iceberg table
df.writeTo("lakehouse.demodb.yellow_taxi_2022").create()
# describe the table created
spark.sql('describe table lakehouse.demodb.yellow_taxi_2022').show(25)
# query the table
spark.sql('select * from lakehouse.demodb.yellow_taxi_2022').count()
def ingest_from_csv_temp_table(spark):
# load csv data into a dataframe
csvDF = spark.read.option("header",True).csv("s3a://source-bucket/zipcodes.csv")
csvDF.createOrReplaceTempView("tempCSVTable")
# load temporary table into an Iceberg table
spark.sql('create or replace table lakehouse.demodb.zipcodes using iceberg as select * from tempCSVTable')
# describe the table created
spark.sql('describe table lakehouse.demodb.zipcodes').show(25)
# query the table
spark.sql('select * from lakehouse.demodb.zipcodes').show()
def ingest_monthly_data(spark):
df_feb = spark.read.option("header",True).parquet("s3a://source-bucket//nyc-taxi/yellow_tripdata_2022-02.parquet")
df_march = spark.read.option("header",True).parquet("s3a://source-bucket//nyc-taxi/yellow_tripdata_2022-03.parquet")
df_april = spark.read.option("header",True).parquet("s3a://source-bucket//nyc-taxi/yellow_tripdata_2022-04.parquet")
df_may = spark.read.option("header",True).parquet("s3a://source-bucket//nyc-taxi/yellow_tripdata_2022-05.parquet")
df_june = spark.read.option("header",True).parquet("s3a://source-bucket//nyc-taxi/yellow_tripdata_2022-06.parquet")
df_q1_q2 = df_feb.union(df_march).union(df_april).union(df_may).union(df_june)
df_q1_q2.write.insertInto("lakehouse.demodb.yellow_taxi_2022")
def perform_table_maintenance_operations(spark):
# Query the metadata files table to list underlying data files
spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.demodb.yellow_taxi_2022.files").show()
# There are many smaller files compact them into files of 200MB each using the
# `rewrite_data_files` Iceberg Spark procedure
spark.sql(f"CALL lakehouse.system.rewrite_data_files(table => 'demodb.yellow_taxi_2022', options => map('target-file-size-bytes','209715200'))").show()
# Again, query the metadata files table to list underlying data files; 6 files are compacted
# to 3 files
spark.sql("SELECT file_path, file_size_in_bytes FROM lakehouse.demodb.yellow_taxi_2022.files").show()
# List all the snapshots
# Expire earlier snapshots. Only latest one with compacted data is required
# Again, List all the snapshots to see only 1 left
spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").show()
#retain only the latest one
latest_snapshot_committed_at = spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").tail(1)[0].committed_at
print (latest_snapshot_committed_at)
spark.sql(f"CALL lakehouse.system.expire_snapshots(table => 'demodb.yellow_taxi_2022',older_than => TIMESTAMP '{latest_snapshot_committed_at}',retain_last => 1)").show()
spark.sql("SELECT committed_at, snapshot_id, operation FROM lakehouse.demodb.yellow_taxi_2022.snapshots").show()
# Removing Orphan data files
spark.sql(f"CALL lakehouse.system.remove_orphan_files(table => 'demodb.yellow_taxi_2022')").show(truncate=False)
# Rewriting Manifest Files
spark.sql(f"CALL lakehouse.system.rewrite_manifests('demodb.yellow_taxi_2022')").show()
def evolve_schema(spark):
# demonstration: Schema evolution
# Add column fare_per_mile to the table
spark.sql('ALTER TABLE lakehouse.demodb.yellow_taxi_2022 ADD COLUMN(fare_per_mile double)')
# describe the table
spark.sql('describe table lakehouse.demodb.yellow_taxi_2022').show(25)
def clean_database(spark):
# clean-up the demo database
spark.sql('drop table if exists lakehouse.demodb.testTable purge')
spark.sql('drop table if exists lakehouse.demodb.zipcodes purge')
spark.sql('drop table if exists lakehouse.demodb.yellow_taxi_2022 purge')
spark.sql('drop database if exists lakehouse.demodb cascade')
def main():
try:
spark = init_spark()
create_database(spark)
list_databases(spark)
basic_iceberg_table_operations(spark)
# demonstration: Ingest parquet and csv data into a watsonx.data Iceberg table
create_table_from_parquet_data(spark)
ingest_from_csv_temp_table(spark)
# load data for the month of February to June into the table yellow_taxi_2022 created above
ingest_monthly_data(spark)
# demonstration: Table maintenance
perform_table_maintenance_operations(spark)
# demonstration: Schema evolution
evolve_schema(spark)
finally:
# clean-up the demo database
clean_database(spark)
spark.stop()
if __name__ == '__main__':
main()
- Speichern Sie die folgende Python-Beispieldatei.
- Laden Sie die Python-Datei in das Bucket Cloud Object Storage hoch. Sie müssen die Spark-Anwendungen und ihre Abhängigkeiten in einem Cloud Object Storage-Bucket verwalten und nicht mit Datenbuckets kombinieren.
- Generieren Sie das IAM-Token für das Token IBM Analytics Engine. Weitere Informationen über die Erstellung eines IAM-Tokens finden Sie unter IAM-Token.
- Führen Sie den folgenden curl-Befehl aus, um die Spark-Anwendung zu übergeben:
curl https://api.<region>.ae.cloud.ibm.com/v3/analytics_engines/<iae-instance-guid>/spark_applications -H "Authorization: Bearer <iam-bearer-token>" -X POST -d '{
"application_details": {
"application": "s3a://<application_bucket>/lakehouse-hms-test-cloud-doc-sample.py",
"conf": {
"spark.hadoop.fs.s3a.bucket.<application-bucket>.endpoint": "https://s3.direct.us-south.cloud-object-storage.appdomain.cloud",
"spark.hadoop.fs.s3a.bucket.<application-bucket>.access.key": "<hmac_access_key_for_application-bucket>",
"spark.hadoop.fs.s3a.bucket.<application-bucket>.secret.key": "<hmac_secret_key_for_application-bucket>"
}
}
}'
Dieses Beispiel wurde mit den Cloud Object Storage-Buckets in der Region us-south getestet. Ändern Sie die Region in der Cloud Object Storage-Endpunktkonfiguration entsprechend der Region, in der sich Ihre Cloud Object Storage-Buckets befinden. Es wird empfohlen, die COS-Buckets in der Region bereitzustellen, in der die Analytics Engine-Instanz bereitgestellt wird.
Wenn der eingereichte Spark-Antrag länger als drei Tage im Status "in Bearbeitung" verbleibt, wird er automatisch beendet.
Zusammenarbeit mit demwatsonx.data Katalog und Lagerung
Damit Ihre Spark-Anwendung mit demwatsonx.data Katalog und Speicher, fügen Sie Ihrer Anwendungsnutzlast die folgende Konfiguration hinzu:
spark.hive.metastore.client.plain.username=ibmlhapikey
spark.hive.metastore.client.plain.password=<api-key-of-the-user-which-has-metastore-admin-role>
spark.hadoop.wxd.apiKey=Basic base64(ibmlhapikey_ibmcloudid:apikey)