IBM Cloud Docs
Apache Airflow를 사용한 오케스트레이션

Apache Airflow를 사용한 오케스트레이션

Apache Airflow는 워크플로우를 생성, 예약 및 모니터링할 수 있는 오픈 소스 플랫폼입니다. 워크플로는 Python 코드를 사용하여 작성된 여러 작업으로 구성된 방향성 비순환 그래프(DAG)로 정의됩니다. 각 작업은 스크립트 실행, 데이터베이스 쿼리 또는 API 호출과 같은 개별적인 작업 단위를 나타냅니다. Airflow 아키텍처는 확장 및 병렬 실행을 지원하므로 복잡하고 데이터 집약적인 파이프라인을 관리하는 데 적합합니다.

Apache 에어플로우가 지원하는 사용 사례는 다음과 같습니다:

  • ETL 또는 ELT 파이프라인: 다양한 소스에서 데이터를 추출하고, 변환하여 데이터 웨어하우스에 로드합니다.
  • 데이터 웨어하우징: 데이터 웨어하우스에서 정기적인 업데이트 및 데이터 변환을 예약합니다.
  • 데이터 처리: 여러 시스템에서 분산된 데이터 처리 작업을 오케스트레이션합니다.

전제조건

  • Apache Airflow 독립형 활성 인스턴스.
  • watsonx.data 사용자 API 키(사용자 이름 및 api_key). 예를 들어, ' username: ' yourid@example.com ' 및 ' api_key' : ' sfw....cv23' 입니다.
  • watsonx.data(wxd_instance_id)에 대한 CRN입니다. watsonx.data 정보 페이지에서 인스턴스 ID를 가져옵니다.
  • 활성 스파크 엔진의 스파크 엔진 ID(spark_engine_id)입니다.
  • Presto 활성 Presto 엔진의 외부 URL(presto_ext_url).
  • 시스템에서 신뢰하는 SSL 인증서 위치(해당되는 경우).
  • 스파크 및 Presto 엔진과 관련된 카탈로그(카탈로그_이름).
  • 선택한 카탈로그와 연결된 버킷의 이름입니다. (버킷_이름).
  • 다음 명령을 사용하여 패키지, 판다 및 Presto-python-client를 설치합니다: pip install pandas presto-python-client.

프로시저

  1. 이 사용 사례에서는 Presto 데이터를 수집하는 작업을 고려합니다. 이렇게 하려면 watsonx.data 카탈로그에 Iceberg 데이터를 수집하는 Spark 애플리케이션을 만드세요. 여기서는 샘플 Python 파일 ingestion-job.py가 고려됩니다.

    from pyspark.sql import SparkSession
    import os, sys
    
    def init_spark():
        spark = SparkSession.builder.appName("ingestion-demo").enableHiveSupport().getOrCreate()
        return spark
    
    def create_database(spark,bucket_name,catalog):
        spark.sql("create database if not exists {}.demodb LOCATION 's3a://{}/demodb'".format(catalog,bucket_name))
    
    def list_databases(spark,catalog):
        # list the database under lakehouse catalog
        spark.sql("show databases from {}".format(catalog)).show()
    
    def basic_iceberg_table_operations(spark,catalog):
        # demonstration: Create a basic Iceberg table, insert some data and then query table
        print("creating table")
        spark.sql("create table if not exists {}.demodb.testTable(id INTEGER, name VARCHAR(10), age INTEGER, salary DECIMAL(10, 2)) using iceberg".format(catalog)).show()
        print("table created")
        spark.sql("insert into {}.demodb.testTable values(1,'Alan',23,3400.00),(2,'Ben',30,5500.00),(3,'Chen',35,6500.00)".format(catalog))
        print("data inserted")
        spark.sql("select * from {}.demodb.testTable".format(catalog)).show()
    
    
    
    def clean_database(spark,catalog):
        # clean-up the demo database
        spark.sql("drop table if exists {}.demodb.testTable purge".format(catalog))
        spark.sql("drop database if exists {}.demodb cascade".format(catalog))
    
    def main(wxdDataBucket, wxdDataCatalog):
        try:
            spark = init_spark()
    
            create_database(spark,wxdDataBucket,wxdDataCatalog)
            list_databases(spark,wxdDataCatalog)
            basic_iceberg_table_operations(spark,wxdDataCatalog)
    
    
        finally:
            # clean-up the demo database
            clean_database(spark,wxdDataCatalog)
            spark.stop()
    
    if __name__ == '__main__':
        main(sys.argv[1],sys.argv[2])
    
    
  2. bucket_name 이라는 이름으로 파일을 스토리지에 업로드합니다. 자세한 내용은 버킷에 객체 추가하기 를 참조하세요.

  3. Python 사용하여 DAG 워크플로우를 설계하고 Python 파일을 Apache Airflow 디렉토리 위치인 ' $AIRFLOW_HOME/dags/ 디렉토리에 저장합니다(AIRFLOW_HOME의 기본값은 ~/airflow로 설정되어 있습니다).

    다음은 watsonx.data에서 Presto으로 데이터를 수집하고 watsonx.data에서 데이터를 쿼리하는 작업을 실행하는 워크플로우의 예입니다. 다음 콘텐츠가 포함된 파일을 wxd_pipeline.py 로 저장합니다.

    
    from datetime import timedelta, datetime
    from time import sleep
    import prestodb
    import pandas as pd
    import base64
    import os # type: ignore
    
    # The DAG object
    from airflow import DAG
    
    # Operators
    from airflow.operators.python_operator import PythonOperator # type: ignore
    import requests
    
    # Initializing the default arguments
    default_args = {
        'owner': 'IBM watsonx.data',
        'start_date': datetime(2024, 3, 4),
        'retries': 3,
        'retry_delay': timedelta(minutes=5),
        'wxd_endpoint': 'https://us-south.lakehouse.cloud.ibm.com', # Host endpoint
        'wxd_instance_id': 'crn:...::', # watsonx.data CRN
        'wxd_username': 'yourid@example.com', # your email id
        'wxd_api_key': 'sfw....cv23', # IBM IAM Api Key
        'spark_engine_id': 'spark6', # Spark Engine id
        'catalog_name': 'my_iceberg_catalog', # Catalog name where data will be ingestion
        'bucket_name': 'my-wxd-bucket', # Bucket name (not display name) associated with the above catalog
        'presto_eng_host': '2ce72...d59.cise...5s20.lakehouse.appdomain.cloud', # Presto engine hostname (without protocol and port)
        'presto_eng_port': 30912 # Presto engine port (in numbers only)
    }
    
    # Instantiate a DAG object
    wxd_pipeline_dag = DAG('wxd_ingestion_pipeline_saas',
            default_args=default_args,
            description='watsonx.data ingestion pipeline',
            schedule_interval=None,
            is_paused_upon_creation=True,
            catchup=False,
            max_active_runs=1,
            tags=['wxd', 'watsonx.data']
    )
    
    # Workaround: Enable if you want to disable SSL verification
    os.environ['NO_PROXY'] = '*'
    
    
    # Get access token
    def get_access_token():
        try:
            url = f"https://iam.cloud.ibm.com/oidc/token"
            headers = {
                'Content-Type': 'application/x-www-form-urlencoded',
                'Accept': 'application/json',
            }
    
            data = {
                'grant_type': 'urn:ibm:params:oauth:grant-type:apikey',
                'apikey': default_args['wxd_api_key'],
            }
    
    
            response = requests.post('https://iam.cloud.ibm.com/identity/token', headers=headers, data=data)
    
            return response.json()['access_token']
        except Exception as inst:
            print('Error in getting access token')
            print(inst)
            exit
    
    
    def _ingest_via_spark_engine():
        try:
            print('ingest__via_spark_engine')
            url = f"{default_args['wxd_endpoint']}/lakehouse/api/v2/spark_engines/{default_args['spark_engine_id']}/applications"
    
            headers = {'Content-type': 'application/json', 'Authorization': f'Bearer {get_access_token()}', 'AuthInstanceId': default_args['wxd_instance_id']}
            auth_str = base64.b64encode(f'ibmlhapikey_{default_args["wxd_username"]}:{default_args["wxd_api_key"]}'.encode('ascii')).decode("ascii")
    
            response = requests.post(url, None, {
                "application_details": {
                    "conf": {
                        "spark.executor.cores": "1",
                        "spark.executor.memory": "1G",
                        "spark.driver.cores": "1",
                        "spark.driver.memory": "1G",
                        "spark.hadoop.wxd.apikey": f"Basic {auth_str}"
                    },
                    "application": f"s3a://{default_args['bucket_name']}/ingestion-job.py",
                    "arguments": [
                        default_args['bucket_name'],
                        default_args['catalog_name']
                    ],
                }
            } , headers=headers, verify=False)
    
            print("Response", response.content)
            return response.json()['id']
        except Exception as inst:
            print(inst)
            raise ValueError('Task failed due to', inst)
    
    
    def _wait_until_job_is_complete(**context):
        try:
            print('wait_until_job_is_complete')
            application_id = context['task_instance'].xcom_pull(task_ids='ingest_via_spark_engine')
            print(application_id)
    
            while True:
                url = f"{default_args['wxd_endpoint']}/lakehouse/api/v2/spark_engines/{default_args['spark_engine_id']}/applications/{application_id}"
                headers = {'Content-type': 'application/json', 'Authorization': f'Bearer {get_access_token()}', 'AuthInstanceId': default_args['wxd_instance_id']}
    
                response = requests.get(url, headers=headers, verify=False)
                print(response.content)
    
                data = response.json()
    
                if data['state'] == 'finished':
                    break
                elif data['state'] in ['stopped', 'failed', 'killed']:
                    raise ValueError("Job failed: ", data)
    
                print('Job is not completed, sleeping for 10secs')
                sleep(10)
        except Exception as inst:
            print(inst)
            raise ValueError('Task failed due to', inst)
    
    
    def _query_presto():
        try:
            with prestodb.dbapi.connect(
                host=default_args['presto_eng_host'],
                port=default_args['presto_eng_port'],
                user=default_args['wxd_username'],
                catalog='tpch',
                schema='tiny',
                http_scheme='https',
                auth=prestodb.auth.BasicAuthentication(f'ibmlhapikey_{default_args["wxd_username"]}', default_args["wxd_api_key"])
            ) as conn:
                df = pd.read_sql_query(f"select * from {default_args['catalog_name']}.demodb.testTable limit 5", conn)
    
                with pd.option_context('display.max_rows', None, 'display.max_columns', None):
                    print("\n", df.head())
        except Exception as inst:
            print(inst)
            raise ValueError('Query faield due to ', inst)
    
    
    def start_job():
        print('Validating default arguments')
    
        if 'wxd_endpoint' not in default_args:
            raise ValueError('wxd_endpoint is mandatory')
    
        if 'wxd_username' not in default_args:
            raise ValueError('wxd_username is mandatory')
    
        if 'wxd_instance_id' not in default_args:
            raise ValueError('wxd_instance_id is mandatory')
    
        if 'wxd_api_key' not in default_args:
            raise ValueError('wxd_api_key is mandatory')
    
        if 'spark_engine_id' not in default_args:
            raise ValueError('spark_engine_id is mandatory')
    
    
    start = PythonOperator(task_id='start_task', python_callable=start_job, dag=wxd_pipeline_dag)
    
    ingest_via_spark_engine = PythonOperator(task_id='ingest_via_spark_engine', python_callable=_ingest_via_spark_engine, dag=wxd_pipeline_dag)
    wait_until_ingestion_is_complete = PythonOperator(task_id='wait_until_ingestion_is_complete', python_callable=_wait_until_job_is_complete, dag=wxd_pipeline_dag)
    query_via_presto = PythonOperator(task_id='query_via_presto', python_callable=_query_presto, dag=wxd_pipeline_dag)
    
    start >> ingest_via_spark_engine >> wait_until_ingestion_is_complete >> query_via_presto
    
    
  4. Apache Airflow 로그인합니다.

  5. wxd_pipeline.py 작업을 검색하고, Apache Airflow 콘솔 페이지에서 DAG를 활성화합니다. 워크플로우가 성공적으로 실행됩니다.