Orquestación mediante Apache Airflow
Apache Airflow es una plataforma de código abierto que permite crear, programar y supervisar el flujo de trabajo. Los flujos de trabajo se definen como grafos acíclicos dirigidos (DAG) que constan de múltiples tareas escritas utilizando Python. Cada tarea representa una unidad discreta de trabajo, como ejecutar un script, consultar una base de datos o llamar a una API. La arquitectura de Airflow admite el escalado y la ejecución en paralelo, lo que la hace idónea para gestionar canalizaciones complejas y con gran cantidad de datos.
Apache airflow soporta los siguientes casos de uso :
- ETL o ELT Pipelines : Extraer datos de diversas fuentes, transformarlos y cargarlos en el almacén de datos.
- Almacenamiento de datos : Programación de actualizaciones periódicas y transformaciones de datos en un almacén de datos.
- Procesamiento de datos : Orquestación de tareas distribuidas de procesamiento de datos en distintos sistemas.
Requisitos previos
- Instancia activa autónoma de Apache Airflow.
- Claves API de usuario para watsonx.data (username y api_key). Por ejemplo, "
username
: "yourid@example.com
y "api_key
: "sfw....cv23
. - CRN para watsonx.data (wxd_instance_id). Obtiene el ID de instancia de la página de información watsonx.data.
- Id de motor Spark de un motor Spark activo (spark_engine_id).
- Presto url externa de un motor Presto activo (presto_ext_url).
- Ubicación del certificado SSL en el que confía el sistema (si procede).
- Catálogo asociado a los motores Spark y Presto (nombre_catálogo).
- Nombre del bucket asociado al catálogo seleccionado. (nombre_cubo).
- Instala los paquetes, Pandas y Presto-python-client usando el comando:
pip install pandas presto-python-client
.
Procedimiento
-
El caso de uso considera una tarea de ingesta de datos a Presto. Para ello, cree una aplicación Spark que ingiera datos de Iceberg en el catálogo watsonx.data. Aquí se considera el fichero de ejemplo Python ingestion-job.py.
from pyspark.sql import SparkSession import os, sys def init_spark(): spark = SparkSession.builder.appName("ingestion-demo").enableHiveSupport().getOrCreate() return spark def create_database(spark,bucket_name,catalog): spark.sql("create database if not exists {}.demodb LOCATION 's3a://{}/demodb'".format(catalog,bucket_name)) def list_databases(spark,catalog): # list the database under lakehouse catalog spark.sql("show databases from {}".format(catalog)).show() def basic_iceberg_table_operations(spark,catalog): # demonstration: Create a basic Iceberg table, insert some data and then query table print("creating table") spark.sql("create table if not exists {}.demodb.testTable(id INTEGER, name VARCHAR(10), age INTEGER, salary DECIMAL(10, 2)) using iceberg".format(catalog)).show() print("table created") spark.sql("insert into {}.demodb.testTable values(1,'Alan',23,3400.00),(2,'Ben',30,5500.00),(3,'Chen',35,6500.00)".format(catalog)) print("data inserted") spark.sql("select * from {}.demodb.testTable".format(catalog)).show() def clean_database(spark,catalog): # clean-up the demo database spark.sql("drop table if exists {}.demodb.testTable purge".format(catalog)) spark.sql("drop database if exists {}.demodb cascade".format(catalog)) def main(wxdDataBucket, wxdDataCatalog): try: spark = init_spark() create_database(spark,wxdDataBucket,wxdDataCatalog) list_databases(spark,wxdDataCatalog) basic_iceberg_table_operations(spark,wxdDataCatalog) finally: # clean-up the demo database clean_database(spark,wxdDataCatalog) spark.stop() if __name__ == '__main__': main(sys.argv[1],sys.argv[2])
-
Sube el archivo al almacenamiento con el nombre,
bucket_name
. Para obtener más información, consulte Añada algunos objetos a sus cubos. -
Diseñe un flujo de trabajo DAG utilizando Python y guarde el archivo Python en la ubicación del directorio de Apache Airflow, directorio '
$AIRFLOW_HOME/dags/
(El valor por defecto de AIRFLOW_HOME es ~/airflow).A continuación se muestra un ejemplo de flujo de trabajo, que ejecuta tareas para ingestar datos a Presto en watsonx.data, y consultar datos desde watsonx.data. Guarde el archivo con el siguiente contenido, como
wxd_pipeline.py
.from datetime import timedelta, datetime from time import sleep import prestodb import pandas as pd import base64 import os # type: ignore # The DAG object from airflow import DAG # Operators from airflow.operators.python_operator import PythonOperator # type: ignore import requests # Initializing the default arguments default_args = { 'owner': 'IBM watsonx.data', 'start_date': datetime(2024, 3, 4), 'retries': 3, 'retry_delay': timedelta(minutes=5), 'wxd_endpoint': 'https://us-south.lakehouse.cloud.ibm.com', # Host endpoint 'wxd_instance_id': 'crn:...::', # watsonx.data CRN 'wxd_username': 'yourid@example.com', # your email id 'wxd_api_key': 'sfw....cv23', # IBM IAM Api Key 'spark_engine_id': 'spark6', # Spark Engine id 'catalog_name': 'my_iceberg_catalog', # Catalog name where data will be ingestion 'bucket_name': 'my-wxd-bucket', # Bucket name (not display name) associated with the above catalog 'presto_eng_host': '2ce72...d59.cise...5s20.lakehouse.appdomain.cloud', # Presto engine hostname (without protocol and port) 'presto_eng_port': 30912 # Presto engine port (in numbers only) } # Instantiate a DAG object wxd_pipeline_dag = DAG('wxd_ingestion_pipeline_saas', default_args=default_args, description='watsonx.data ingestion pipeline', schedule_interval=None, is_paused_upon_creation=True, catchup=False, max_active_runs=1, tags=['wxd', 'watsonx.data'] ) # Workaround: Enable if you want to disable SSL verification os.environ['NO_PROXY'] = '*' # Get access token def get_access_token(): try: url = f"https://iam.cloud.ibm.com/oidc/token" headers = { 'Content-Type': 'application/x-www-form-urlencoded', 'Accept': 'application/json', } data = { 'grant_type': 'urn:ibm:params:oauth:grant-type:apikey', 'apikey': default_args['wxd_api_key'], } response = requests.post('https://iam.cloud.ibm.com/identity/token', headers=headers, data=data) return response.json()['access_token'] except Exception as inst: print('Error in getting access token') print(inst) exit def _ingest_via_spark_engine(): try: print('ingest__via_spark_engine') url = f"{default_args['wxd_endpoint']}/lakehouse/api/v2/spark_engines/{default_args['spark_engine_id']}/applications" headers = {'Content-type': 'application/json', 'Authorization': f'Bearer {get_access_token()}', 'AuthInstanceId': default_args['wxd_instance_id']} auth_str = base64.b64encode(f'ibmlhapikey_{default_args["wxd_username"]}:{default_args["wxd_api_key"]}'.encode('ascii')).decode("ascii") response = requests.post(url, None, { "application_details": { "conf": { "spark.executor.cores": "1", "spark.executor.memory": "1G", "spark.driver.cores": "1", "spark.driver.memory": "1G", "spark.hadoop.wxd.apikey": f"Basic {auth_str}" }, "application": f"s3a://{default_args['bucket_name']}/ingestion-job.py", "arguments": [ default_args['bucket_name'], default_args['catalog_name'] ], } } , headers=headers, verify=False) print("Response", response.content) return response.json()['id'] except Exception as inst: print(inst) raise ValueError('Task failed due to', inst) def _wait_until_job_is_complete(**context): try: print('wait_until_job_is_complete') application_id = context['task_instance'].xcom_pull(task_ids='ingest_via_spark_engine') print(application_id) while True: url = f"{default_args['wxd_endpoint']}/lakehouse/api/v2/spark_engines/{default_args['spark_engine_id']}/applications/{application_id}" headers = {'Content-type': 'application/json', 'Authorization': f'Bearer {get_access_token()}', 'AuthInstanceId': default_args['wxd_instance_id']} response = requests.get(url, headers=headers, verify=False) print(response.content) data = response.json() if data['state'] == 'finished': break elif data['state'] in ['stopped', 'failed', 'killed']: raise ValueError("Job failed: ", data) print('Job is not completed, sleeping for 10secs') sleep(10) except Exception as inst: print(inst) raise ValueError('Task failed due to', inst) def _query_presto(): try: with prestodb.dbapi.connect( host=default_args['presto_eng_host'], port=default_args['presto_eng_port'], user=default_args['wxd_username'], catalog='tpch', schema='tiny', http_scheme='https', auth=prestodb.auth.BasicAuthentication(f'ibmlhapikey_{default_args["wxd_username"]}', default_args["wxd_api_key"]) ) as conn: df = pd.read_sql_query(f"select * from {default_args['catalog_name']}.demodb.testTable limit 5", conn) with pd.option_context('display.max_rows', None, 'display.max_columns', None): print("\n", df.head()) except Exception as inst: print(inst) raise ValueError('Query faield due to ', inst) def start_job(): print('Validating default arguments') if 'wxd_endpoint' not in default_args: raise ValueError('wxd_endpoint is mandatory') if 'wxd_username' not in default_args: raise ValueError('wxd_username is mandatory') if 'wxd_instance_id' not in default_args: raise ValueError('wxd_instance_id is mandatory') if 'wxd_api_key' not in default_args: raise ValueError('wxd_api_key is mandatory') if 'spark_engine_id' not in default_args: raise ValueError('spark_engine_id is mandatory') start = PythonOperator(task_id='start_task', python_callable=start_job, dag=wxd_pipeline_dag) ingest_via_spark_engine = PythonOperator(task_id='ingest_via_spark_engine', python_callable=_ingest_via_spark_engine, dag=wxd_pipeline_dag) wait_until_ingestion_is_complete = PythonOperator(task_id='wait_until_ingestion_is_complete', python_callable=_wait_until_job_is_complete, dag=wxd_pipeline_dag) query_via_presto = PythonOperator(task_id='query_via_presto', python_callable=_query_presto, dag=wxd_pipeline_dag) start >> ingest_via_spark_engine >> wait_until_ingestion_is_complete >> query_via_presto
-
Inicie sesión en Apache Airflow.
-
Busque el trabajo
wxd_pipeline.py
, active el DAG desde la página de la consola Apache Airflow. El flujo de trabajo se ejecuta correctamente.