Orchestrating with Apache Airflow
Apache Airflow is an open-source platform that helps you author, schedule, and monitor workflows. A workflow is defined as a directed acyclic graph (DAG) made up of multiple tasks written in Python. Each task represents an independent unit of work, such as running a script, querying a database, or calling an API. The Airflow architecture supports scaling and parallel execution, which makes it suitable for managing complex, data-intensive pipelines.
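For illustration only, the following minimal sketch (not part of the procedure in this topic) shows how a DAG wires two Python tasks into a dependency. The DAG ID, task IDs, and callables are hypothetical placeholders.

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def extract():
    # placeholder task body
    print("extract data")

def load():
    # placeholder task body
    print("load data")

# Hypothetical DAG: two tasks, one dependency, no schedule.
example_dag = DAG(
    'example_minimal_dag',
    start_date=datetime(2024, 1, 1),
    schedule_interval=None,
    catchup=False,
)

extract_task = PythonOperator(task_id='extract', python_callable=extract, dag=example_dag)
load_task = PythonOperator(task_id='load', python_callable=load, dag=example_dag)

# load_task runs only after extract_task succeeds.
extract_task >> load_task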
Apache Airflow supports the following use cases:
- ETL or ELT pipelines: Extract data from various sources, transform it, and load it into a data warehouse.
- Data warehousing: Schedule regular updates and data transformations in a data warehouse.
- Data processing: Coordinate distributed data processing tasks across different systems.
Prerequisites
- An active standalone Apache Airflow instance.
- User API key for watsonx.data (username and api_key). For example, "username": "yourid@example.com", "api_key": "sfw....cv23".
- CRN of the watsonx.data instance (wxd_instance_id). Get the instance ID from the watsonx.data information page.
- Spark engine ID (spark_engine_id) of an active Spark engine.
- Presto external URL (presto_ext_url) from an active Presto engine.
- Location of the SSL certificate trusted by the system (if applicable).
- Catalog (catalog_name) associated with the Spark and Presto engines.
- Name of the bucket (bucket_name) associated with the selected catalog.
- The pandas and presto-python-client packages, installed with the following command:
  pip install pandas presto-python-client
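The sample DAG later in this topic keeps these values in its default_args for simplicity. If you prefer not to hardcode credentials in the DAG file, one option is to store them as Airflow Variables and read them at parse time; the variable names in this sketch are assumptions, not values defined by watsonx.data.

# Set the values once, for example:
#   airflow variables set wxd_api_key sfw....cv23
from airflow.models import Variable

wxd_username = Variable.get('wxd_username')        # assumed variable name
wxd_api_key = Variable.get('wxd_api_key')          # assumed variable name
wxd_instance_id = Variable.get('wxd_instance_id')  # assumed variable name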
Procedure
- The task considered in this use case is ingesting data into Presto. To do that, create a Spark application that ingests Iceberg data into a watsonx.data catalog. Here, we use the sample Python file ingestion-job.py:

from pyspark.sql import SparkSession
import os, sys

def init_spark():
    spark = SparkSession.builder.appName("ingestion-demo").enableHiveSupport().getOrCreate()
    return spark

def create_database(spark, bucket_name, catalog):
    spark.sql("create database if not exists {}.demodb LOCATION 's3a://{}/demodb'".format(catalog, bucket_name))

def list_databases(spark, catalog):
    # list the databases under the lakehouse catalog
    spark.sql("show databases from {}".format(catalog)).show()

def basic_iceberg_table_operations(spark, catalog):
    # demonstration: create a basic Iceberg table, insert some data, and then query the table
    print("creating table")
    spark.sql("create table if not exists {}.demodb.testTable(id INTEGER, name VARCHAR(10), age INTEGER, salary DECIMAL(10, 2)) using iceberg".format(catalog)).show()
    print("table created")
    spark.sql("insert into {}.demodb.testTable values(1,'Alan',23,3400.00),(2,'Ben',30,5500.00),(3,'Chen',35,6500.00)".format(catalog))
    print("data inserted")
    spark.sql("select * from {}.demodb.testTable".format(catalog)).show()

def clean_database(spark, catalog):
    # clean up the demo database
    spark.sql("drop table if exists {}.demodb.testTable purge".format(catalog))
    spark.sql("drop database if exists {}.demodb cascade".format(catalog))

def main(wxdDataBucket, wxdDataCatalog):
    try:
        spark = init_spark()
        create_database(spark, wxdDataBucket, wxdDataCatalog)
        list_databases(spark, wxdDataCatalog)
        basic_iceberg_table_operations(spark, wxdDataCatalog)
    finally:
        # clean up the demo database
        clean_database(spark, wxdDataCatalog)
        spark.stop()

if __name__ == '__main__':
    main(sys.argv[1], sys.argv[2])

The application takes two arguments, the bucket name and the catalog name; the DAG created later in this procedure passes them when it submits the application.
- Upload the file to the storage bucket named bucket_name. For more information, see Adding some objects to your data bucket. If you want to upload the file programmatically, see the sketch after this step.
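The following sketch uploads the file with boto3 against the S3-compatible endpoint of the bucket. The endpoint URL and HMAC credentials are placeholders that you must replace with the values for your own object store; the bucket name matches the sample value used in the DAG below.

import boto3

# Placeholder endpoint and HMAC credentials for the bucket's object store.
s3 = boto3.client(
    's3',
    endpoint_url='https://<your-object-storage-endpoint>',
    aws_access_key_id='<ACCESS_KEY>',
    aws_secret_access_key='<SECRET_KEY>',
)

# Upload ingestion-job.py to the bucket associated with the catalog (bucket_name).
s3.upload_file('ingestion-job.py', 'my-wxd-bucket', 'ingestion-job.py')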
- Design the DAG workflow in Python and save the Python file to the $AIRFLOW_HOME/dags/ directory of your Apache Airflow installation (the default value of AIRFLOW_HOME is ~/airflow). The following is a sample workflow whose tasks ingest data into Presto in watsonx.data and then query the data back from watsonx.data. Save the file as wxd_pipeline.py with the following content:

from datetime import timedelta, datetime
from time import sleep
import prestodb
import pandas as pd
import base64
import os  # type: ignore

# The DAG object
from airflow import DAG
# Operators
from airflow.operators.python_operator import PythonOperator  # type: ignore
import requests

# Initializing the default arguments
default_args = {
    'owner': 'IBM watsonx.data',
    'start_date': datetime(2024, 3, 4),
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'wxd_endpoint': 'https://us-south.lakehouse.cloud.ibm.com',  # Host endpoint
    'wxd_instance_id': 'crn:...::',  # watsonx.data CRN
    'wxd_username': 'yourid@example.com',  # your email id
    'wxd_api_key': 'sfw....cv23',  # IBM IAM API key
    'spark_engine_id': 'spark6',  # Spark engine ID
    'catalog_name': 'my_iceberg_catalog',  # Catalog name where data will be ingested
    'bucket_name': 'my-wxd-bucket',  # Bucket name (not display name) associated with the above catalog
    'presto_eng_host': '2ce72...d59.cise...5s20.lakehouse.appdomain.cloud',  # Presto engine hostname (without protocol and port)
    'presto_eng_port': 30912  # Presto engine port (numbers only)
}

# Instantiate a DAG object
wxd_pipeline_dag = DAG('wxd_ingestion_pipeline_saas',
    default_args=default_args,
    description='watsonx.data ingestion pipeline',
    schedule_interval=None,
    is_paused_upon_creation=True,
    catchup=False,
    max_active_runs=1,
    tags=['wxd', 'watsonx.data']
)

# Workaround: Enable if you want to disable SSL verification
os.environ['NO_PROXY'] = '*'

# Get access token
def get_access_token():
    try:
        url = f"https://iam.cloud.ibm.com/oidc/token"
        headers = {
            'Content-Type': 'application/x-www-form-urlencoded',
            'Accept': 'application/json',
        }
        data = {
            'grant_type': 'urn:ibm:params:oauth:grant-type:apikey',
            'apikey': default_args['wxd_api_key'],
        }
        response = requests.post('https://iam.cloud.ibm.com/identity/token', headers=headers, data=data)
        return response.json()['access_token']
    except Exception as inst:
        print('Error in getting access token')
        print(inst)
        raise

def _ingest_via_spark_engine():
    try:
        print('ingest_via_spark_engine')
        url = f"{default_args['wxd_endpoint']}/lakehouse/api/v2/spark_engines/{default_args['spark_engine_id']}/applications"
        headers = {'Content-type': 'application/json',
                   'Authorization': f'Bearer {get_access_token()}',
                   'AuthInstanceId': default_args['wxd_instance_id']}
        auth_str = base64.b64encode(f'ibmlhapikey_{default_args["wxd_username"]}:{default_args["wxd_api_key"]}'.encode('ascii')).decode("ascii")
        response = requests.post(url, None, {
            "application_details": {
                "conf": {
                    "spark.executor.cores": "1",
                    "spark.executor.memory": "1G",
                    "spark.driver.cores": "1",
                    "spark.driver.memory": "1G",
                    "spark.hadoop.wxd.apikey": f"Basic {auth_str}"
                },
                "application": f"s3a://{default_args['bucket_name']}/ingestion-job.py",
                "arguments": [
                    default_args['bucket_name'],
                    default_args['catalog_name']
                ],
            }
        }, headers=headers, verify=False)
        print("Response", response.content)
        return response.json()['id']
    except Exception as inst:
        print(inst)
        raise ValueError('Task failed due to', inst)

def _wait_until_job_is_complete(**context):
    try:
        print('wait_until_job_is_complete')
        application_id = context['task_instance'].xcom_pull(task_ids='ingest_via_spark_engine')
        print(application_id)
        while True:
            url = f"{default_args['wxd_endpoint']}/lakehouse/api/v2/spark_engines/{default_args['spark_engine_id']}/applications/{application_id}"
            headers = {'Content-type': 'application/json',
                       'Authorization': f'Bearer {get_access_token()}',
                       'AuthInstanceId': default_args['wxd_instance_id']}
            response = requests.get(url, headers=headers, verify=False)
            print(response.content)
            data = response.json()
            if data['state'] == 'finished':
                break
            elif data['state'] in ['stopped', 'failed', 'killed']:
                raise ValueError("Job failed: ", data)
            print('Job is not completed, sleeping for 10secs')
            sleep(10)
    except Exception as inst:
        print(inst)
        raise ValueError('Task failed due to', inst)

def _query_presto():
    try:
        with prestodb.dbapi.connect(
                host=default_args['presto_eng_host'],
                port=default_args['presto_eng_port'],
                user=default_args['wxd_username'],
                catalog='tpch',
                schema='tiny',
                http_scheme='https',
                auth=prestodb.auth.BasicAuthentication(f'ibmlhapikey_{default_args["wxd_username"]}', default_args["wxd_api_key"])
        ) as conn:
            df = pd.read_sql_query(f"select * from {default_args['catalog_name']}.demodb.testTable limit 5", conn)
            with pd.option_context('display.max_rows', None, 'display.max_columns', None):
                print("\n", df.head())
    except Exception as inst:
        print(inst)
        raise ValueError('Query failed due to ', inst)

def start_job():
    print('Validating default arguments')
    if 'wxd_endpoint' not in default_args:
        raise ValueError('wxd_endpoint is mandatory')
    if 'wxd_username' not in default_args:
        raise ValueError('wxd_username is mandatory')
    if 'wxd_instance_id' not in default_args:
        raise ValueError('wxd_instance_id is mandatory')
    if 'wxd_api_key' not in default_args:
        raise ValueError('wxd_api_key is mandatory')
    if 'spark_engine_id' not in default_args:
        raise ValueError('spark_engine_id is mandatory')

start = PythonOperator(task_id='start_task', python_callable=start_job, dag=wxd_pipeline_dag)
ingest_via_spark_engine = PythonOperator(task_id='ingest_via_spark_engine', python_callable=_ingest_via_spark_engine, dag=wxd_pipeline_dag)
wait_until_ingestion_is_complete = PythonOperator(task_id='wait_until_ingestion_is_complete', python_callable=_wait_until_job_is_complete, dag=wxd_pipeline_dag)
query_via_presto = PythonOperator(task_id='query_via_presto', python_callable=_query_presto, dag=wxd_pipeline_dag)

start >> ingest_via_spark_engine >> wait_until_ingestion_is_complete >> query_via_presto
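Optionally, before enabling the DAG in the console, you can check that the file parses without import errors. The following sketch loads the dags folder with Airflow's DagBag; the path assumes the default AIRFLOW_HOME of ~/airflow.

import os
from airflow.models import DagBag

# Parse the dags folder without loading Airflow's bundled example DAGs.
dag_bag = DagBag(dag_folder=os.path.expanduser('~/airflow/dags'), include_examples=False)

# Any syntax or import problems in wxd_pipeline.py are reported here.
print('Import errors:', dag_bag.import_errors)

# Once the file parses cleanly, the DAG appears under its dag_id.
print('DAGs found:', list(dag_bag.dags))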
- Log in to Apache Airflow.
- Search for the wxd_pipeline.py job (the DAG ID in this example is wxd_ingestion_pipeline_saas) and enable the DAG from the Apache Airflow console page. The workflow runs successfully.