MLOps/Airflow

[Airflow] ExternalTaskSensor: DAG 간 의존성 설정과 활용

monkeykim 2024. 11. 24. 17:27

DAG 간 의존성을 설정하는 방법

Airflow에서는 DAG 간 의존성을 설정하는 데 두 가지 주요 방법을 제공합니다.

1. TriggerDagRunOperator

  • 한 DAG의 특정 작업이 완료되었을 때 다른 DAG을 실행하도록 설정할 수 있습니다.
  • DAG 간의 순차적 실행이 필요할 때 유용합니다.

2. ExternalTaskSensor

  • 외부 DAG의 특정 작업(Task)이 완료된 후 현재 DAG의 작업(Task)을 실행하고 싶을 때 사용됩니다.
  • 작업 간의 동기화를 보장하며, 복잡한 DAG 네트워크에서 의존성을 쉽게 관리할 수 있습니다.

https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/external_task/index.html

 

airflow.sensors.external_task — Airflow Documentation

 

airflow.apache.org


ExternalTaskSensor 주요 파라미터

ExternalTaskSensor는 외부 DAG 및 작업 상태를 모니터링하기 위한 다양한 파라미터를 제공합니다. 다음은 주요 파라미터와 활용법입니다.

  • external_dag_id
    • 모니터링할 대상 DAG의 ID를 지정합니다.
    • 예: external_dag_id='target_dag'
  • external_task_id / external_task_ids
    • 대상 DAG 내에서 모니터링할 특정 작업의 ID를 지정합니다.
    • 여러 작업을 지정하려면 external_task_ids에 리스트 형태로 전달합니다.
    • 예: external_task_id='task_a' 또는 external_task_ids=['task_a', 'task_b']
  • external_task_group_id
    • TaskGroup 단위로 모니터링이 필요할 경우 사용합니다.
  • allowed_states
    • 모니터링 대상 작업이 완료된 상태를 정의합니다. 기본값은 ['success']입니다.
    • 예: allowed_states=[State.SUCCESS]
  • execution_delta
    • DAG 실행 시간 간의 차이를 나타냅니다.
    • 현재 DAG이 과거의 특정 작업을 참조해야 할 경우 양의 timedelta 값을 전달합니다.
    • 예: execution_delta=timedelta(hours=6)
  • execution_date_fn
    • execution_delta 대신 커스텀 함수로 실행 날짜를 계산할 수 있습니다.
  • check_existence
    • 대상 DAG이나 작업(Task)이 존재하지 않을 경우 센서 실행을 중단합니다.
  • poll_interval
    • 상태를 주기적으로 확인하는 시간 간격(초)입니다.
  • deferrable
    • 센서를 비동기적으로 실행하여 리소스 사용을 줄일 수 있습니다.

ExternalTaskSensor 예제

ExternalTaskSensor를 사용하는 DAG

from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor
import pendulum
from datetime import timedelta
from airflow.utils.state import State

with DAG(
    dag_id='dag_external_task_sensor',
    schedule="0 7 * * *",
    start_date=pendulum.datetime(2024, 11, 15, tz="Asia/Seoul"),
    catchup=False
) as dag:

    external_task_sensor_a = ExternalTaskSensor(
        task_id='external_task_sensor_a',
        external_dag_id='dag_branch_python_operator',
        external_task_id='task_a',
        allowed_states=[State.SUCCESS],
        execution_delta=timedelta(hours=6),
        poke_interval=10
    )

모니터링 대상 DAG

from airflow import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator
import pendulum

with DAG(
    dag_id="dag_branch_python_operator",
    schedule="0 1 * * *",
    start_date=pendulum.datetime(2024, 3, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:

    def select_random():
        import random
        return random.choice(['task_a', 'task_b'])

    branch_task = BranchPythonOperator(
        task_id='branch_task',
        python_callable=select_random
    )

    task_a = PythonOperator(task_id='task_a', python_callable=lambda: print("Task A"))
    task_b = PythonOperator(task_id='task_b', python_callable=lambda: print("Task B"))

    branch_task >> [task_a, task_b]