Orchestrierung mit Apache Airflow
Apache Airflow ist eine Open-Source-Plattform, mit der Sie Arbeitsabläufe erstellen, planen und überwachen können. Arbeitsabläufe werden als gerichtete azyklische Graphen (DAGs) definiert, die aus mehreren Aufgaben bestehen, die mit Python-Code geschrieben werden. Jede Aufgabe stellt eine diskrete Arbeitseinheit dar, z. B. die Ausführung eines Skripts, die Abfrage einer Datenbank oder den Aufruf einer API. Die Airflow-Architektur unterstützt Skalierung und parallele Ausführung und eignet sich daher für die Verwaltung komplexer, datenintensiver Pipelines.
Apache airflow unterstützt die folgenden Anwendungsfälle:
- ETL- oder ELT-Pipelines: Extraktion von Daten aus verschiedenen Quellen, Umwandlung und Laden in das Data Warehouse.
- Data Warehousing: Planung regelmäßiger Aktualisierungen und Datenumwandlungen in einem Data Warehouse.
- Datenverarbeitung: Orchestrierung von verteilten Datenverarbeitungsaufgaben über verschiedene Systeme hinweg.
Voraussetzungen
- Apache Airflow als eigenständige aktive Instanz.
- Benutzer-API-Schlüssel für watsonx.data (username und api_key). Zum Beispiel: "
username
: "yourid@example.com
und "api_key
: "sfw....cv23
. - CRN für watsonx.data (wxd_instance_id). Holen Sie die Instanz-ID aus der Informationsseite watsonx.data.
- Spark-Engine-ID von einer aktiven Spark-Engine (spark_engine_id).
- Presto externe URL von einer aktiven Presto Engine (presto_ext_url).
- Ort des SSL-Zertifikats, dem das System vertraut (falls zutreffend).
- Mit Spark- und Presto verbundener Katalog (catalog_name).
- Name des Bereichs, der mit dem ausgewählten Katalog verbunden ist. (bucket_name).
- Installieren Sie die Pakete, Pandas und Presto-python-client mit dem Befehl:
pip install pandas presto-python-client
.
Vorgehensweise
-
Der Anwendungsfall betrachtet eine Aufgabe zur Aufnahme von Daten in Presto. Erstellen Sie dazu eine Spark-Anwendung, die Iceberg-Daten in den watsonx.data einspeist. Hier wird das Beispiel Python Datei ingestion-job.py betrachtet.
from pyspark.sql import SparkSession import os, sys def init_spark(): spark = SparkSession.builder.appName("ingestion-demo").enableHiveSupport().getOrCreate() return spark def create_database(spark,bucket_name,catalog): spark.sql("create database if not exists {}.demodb LOCATION 's3a://{}/demodb'".format(catalog,bucket_name)) def list_databases(spark,catalog): # list the database under lakehouse catalog spark.sql("show databases from {}".format(catalog)).show() def basic_iceberg_table_operations(spark,catalog): # demonstration: Create a basic Iceberg table, insert some data and then query table print("creating table") spark.sql("create table if not exists {}.demodb.testTable(id INTEGER, name VARCHAR(10), age INTEGER, salary DECIMAL(10, 2)) using iceberg".format(catalog)).show() print("table created") spark.sql("insert into {}.demodb.testTable values(1,'Alan',23,3400.00),(2,'Ben',30,5500.00),(3,'Chen',35,6500.00)".format(catalog)) print("data inserted") spark.sql("select * from {}.demodb.testTable".format(catalog)).show() def clean_database(spark,catalog): # clean-up the demo database spark.sql("drop table if exists {}.demodb.testTable purge".format(catalog)) spark.sql("drop database if exists {}.demodb cascade".format(catalog)) def main(wxdDataBucket, wxdDataCatalog): try: spark = init_spark() create_database(spark,wxdDataBucket,wxdDataCatalog) list_databases(spark,wxdDataCatalog) basic_iceberg_table_operations(spark,wxdDataCatalog) finally: # clean-up the demo database clean_database(spark,wxdDataCatalog) spark.stop() if __name__ == '__main__': main(sys.argv[1],sys.argv[2])
-
Laden Sie die Datei mit dem Namen
bucket_name
in den Speicher hoch. Weitere Informationen finden Sie unter Hinzufügen einiger Objekte zu Ihren Buckets. -
Entwerfen Sie einen DAG-Workflow mit Python und speichern Sie die Python im Apache Airflow "
$AIRFLOW_HOME/dags/
(der Standardwert von AIRFLOW_HOME ist auf ~/airflow eingestellt).Nachfolgend ein Beispiel für einen Arbeitsablauf, bei dem Aufgaben zur Aufnahme von Daten in Presto in watsonx.data und zur Abfrage von Daten aus watsonx.data ausgeführt werden. Speichern Sie die Datei mit dem folgenden Inhalt als
wxd_pipeline.py
.from datetime import timedelta, datetime from time import sleep import prestodb import pandas as pd import base64 import os # type: ignore # The DAG object from airflow import DAG # Operators from airflow.operators.python_operator import PythonOperator # type: ignore import requests # Initializing the default arguments default_args = { 'owner': 'IBM watsonx.data', 'start_date': datetime(2024, 3, 4), 'retries': 3, 'retry_delay': timedelta(minutes=5), 'wxd_endpoint': 'https://us-south.lakehouse.cloud.ibm.com', # Host endpoint 'wxd_instance_id': 'crn:...::', # watsonx.data CRN 'wxd_username': 'yourid@example.com', # your email id 'wxd_api_key': 'sfw....cv23', # IBM IAM Api Key 'spark_engine_id': 'spark6', # Spark Engine id 'catalog_name': 'my_iceberg_catalog', # Catalog name where data will be ingestion 'bucket_name': 'my-wxd-bucket', # Bucket name (not display name) associated with the above catalog 'presto_eng_host': '2ce72...d59.cise...5s20.lakehouse.appdomain.cloud', # Presto engine hostname (without protocol and port) 'presto_eng_port': 30912 # Presto engine port (in numbers only) } # Instantiate a DAG object wxd_pipeline_dag = DAG('wxd_ingestion_pipeline_saas', default_args=default_args, description='watsonx.data ingestion pipeline', schedule_interval=None, is_paused_upon_creation=True, catchup=False, max_active_runs=1, tags=['wxd', 'watsonx.data'] ) # Workaround: Enable if you want to disable SSL verification os.environ['NO_PROXY'] = '*' # Get access token def get_access_token(): try: url = f"https://iam.cloud.ibm.com/oidc/token" headers = { 'Content-Type': 'application/x-www-form-urlencoded', 'Accept': 'application/json', } data = { 'grant_type': 'urn:ibm:params:oauth:grant-type:apikey', 'apikey': default_args['wxd_api_key'], } response = requests.post('https://iam.cloud.ibm.com/identity/token', headers=headers, data=data) return response.json()['access_token'] except Exception as inst: print('Error in getting access token') print(inst) exit def _ingest_via_spark_engine(): try: print('ingest__via_spark_engine') url = f"{default_args['wxd_endpoint']}/lakehouse/api/v2/spark_engines/{default_args['spark_engine_id']}/applications" headers = {'Content-type': 'application/json', 'Authorization': f'Bearer {get_access_token()}', 'AuthInstanceId': default_args['wxd_instance_id']} auth_str = base64.b64encode(f'ibmlhapikey_{default_args["wxd_username"]}:{default_args["wxd_api_key"]}'.encode('ascii')).decode("ascii") response = requests.post(url, None, { "application_details": { "conf": { "spark.executor.cores": "1", "spark.executor.memory": "1G", "spark.driver.cores": "1", "spark.driver.memory": "1G", "spark.hadoop.wxd.apikey": f"Basic {auth_str}" }, "application": f"s3a://{default_args['bucket_name']}/ingestion-job.py", "arguments": [ default_args['bucket_name'], default_args['catalog_name'] ], } } , headers=headers, verify=False) print("Response", response.content) return response.json()['id'] except Exception as inst: print(inst) raise ValueError('Task failed due to', inst) def _wait_until_job_is_complete(**context): try: print('wait_until_job_is_complete') application_id = context['task_instance'].xcom_pull(task_ids='ingest_via_spark_engine') print(application_id) while True: url = f"{default_args['wxd_endpoint']}/lakehouse/api/v2/spark_engines/{default_args['spark_engine_id']}/applications/{application_id}" headers = {'Content-type': 'application/json', 'Authorization': f'Bearer {get_access_token()}', 'AuthInstanceId': default_args['wxd_instance_id']} response = requests.get(url, headers=headers, verify=False) print(response.content) data = response.json() if data['state'] == 'finished': break elif data['state'] in ['stopped', 'failed', 'killed']: raise ValueError("Job failed: ", data) print('Job is not completed, sleeping for 10secs') sleep(10) except Exception as inst: print(inst) raise ValueError('Task failed due to', inst) def _query_presto(): try: with prestodb.dbapi.connect( host=default_args['presto_eng_host'], port=default_args['presto_eng_port'], user=default_args['wxd_username'], catalog='tpch', schema='tiny', http_scheme='https', auth=prestodb.auth.BasicAuthentication(f'ibmlhapikey_{default_args["wxd_username"]}', default_args["wxd_api_key"]) ) as conn: df = pd.read_sql_query(f"select * from {default_args['catalog_name']}.demodb.testTable limit 5", conn) with pd.option_context('display.max_rows', None, 'display.max_columns', None): print("\n", df.head()) except Exception as inst: print(inst) raise ValueError('Query faield due to ', inst) def start_job(): print('Validating default arguments') if 'wxd_endpoint' not in default_args: raise ValueError('wxd_endpoint is mandatory') if 'wxd_username' not in default_args: raise ValueError('wxd_username is mandatory') if 'wxd_instance_id' not in default_args: raise ValueError('wxd_instance_id is mandatory') if 'wxd_api_key' not in default_args: raise ValueError('wxd_api_key is mandatory') if 'spark_engine_id' not in default_args: raise ValueError('spark_engine_id is mandatory') start = PythonOperator(task_id='start_task', python_callable=start_job, dag=wxd_pipeline_dag) ingest_via_spark_engine = PythonOperator(task_id='ingest_via_spark_engine', python_callable=_ingest_via_spark_engine, dag=wxd_pipeline_dag) wait_until_ingestion_is_complete = PythonOperator(task_id='wait_until_ingestion_is_complete', python_callable=_wait_until_job_is_complete, dag=wxd_pipeline_dag) query_via_presto = PythonOperator(task_id='query_via_presto', python_callable=_query_presto, dag=wxd_pipeline_dag) start >> ingest_via_spark_engine >> wait_until_ingestion_is_complete >> query_via_presto
-
Melden Sie sich bei Apache Airflow an.
-
Suchen Sie nach
wxd_pipeline.py
Job, aktivieren Sie die DAG auf der Apache Airflow Konsolenseite. Der Workflow wird erfolgreich ausgeführt.