DAG 간 의존성을 설정하는 방법
Airflow에서는 DAG 간 의존성을 설정하는 데 두 가지 주요 방법을 제공합니다.
1. TriggerDagRunOperator
- 한 DAG의 특정 작업이 완료되었을 때 다른 DAG을 실행하도록 설정할 수 있습니다.
- DAG 간의 순차적 실행이 필요할 때 유용합니다.
2. ExternalTaskSensor
- 외부 DAG의 특정 작업(Task)이 완료된 후 현재 DAG의 작업(Task)을 실행하고 싶을 때 사용됩니다.
- 작업 간의 동기화를 보장하며, 복잡한 DAG 네트워크에서 의존성을 쉽게 관리할 수 있습니다.
https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/external_task/index.html
ExternalTaskSensor 주요 파라미터
ExternalTaskSensor는 외부 DAG 및 작업 상태를 모니터링하기 위한 다양한 파라미터를 제공합니다. 다음은 주요 파라미터와 활용법입니다.
- external_dag_id
- 모니터링할 대상 DAG의 ID를 지정합니다.
- 예: external_dag_id='target_dag'
- external_task_id / external_task_ids
- 대상 DAG 내에서 모니터링할 특정 작업의 ID를 지정합니다.
- 여러 작업을 지정하려면 external_task_ids에 리스트 형태로 전달합니다.
- 예: external_task_id='task_a' 또는 external_task_ids=['task_a', 'task_b']
- external_task_group_id
- TaskGroup 단위로 모니터링이 필요할 경우 사용합니다.
- allowed_states
- 모니터링 대상 작업이 완료된 상태를 정의합니다. 기본값은 ['success']입니다.
- 예: allowed_states=[State.SUCCESS]
- execution_delta
- DAG 실행 시간 간의 차이를 나타냅니다.
- 현재 DAG이 과거의 특정 작업을 참조해야 할 경우 양의 timedelta 값을 전달합니다.
- 예: execution_delta=timedelta(hours=6)
- execution_date_fn
- execution_delta 대신 커스텀 함수로 실행 날짜를 계산할 수 있습니다.
- check_existence
- 대상 DAG이나 작업(Task)이 존재하지 않을 경우 센서 실행을 중단합니다.
- poll_interval
- 상태를 주기적으로 확인하는 시간 간격(초)입니다.
- deferrable
- 센서를 비동기적으로 실행하여 리소스 사용을 줄일 수 있습니다.
ExternalTaskSensor 예제
ExternalTaskSensor를 사용하는 DAG
from airflow import DAG
from airflow.sensors.external_task import ExternalTaskSensor
import pendulum
from datetime import timedelta
from airflow.utils.state import State
with DAG(
dag_id='dag_external_task_sensor',
schedule="0 7 * * *",
start_date=pendulum.datetime(2024, 11, 15, tz="Asia/Seoul"),
catchup=False
) as dag:
external_task_sensor_a = ExternalTaskSensor(
task_id='external_task_sensor_a',
external_dag_id='dag_branch_python_operator',
external_task_id='task_a',
allowed_states=[State.SUCCESS],
execution_delta=timedelta(hours=6),
poke_interval=10
)
모니터링 대상 DAG
from airflow import DAG
from airflow.operators.python import BranchPythonOperator, PythonOperator
import pendulum
with DAG(
dag_id="dag_branch_python_operator",
schedule="0 1 * * *",
start_date=pendulum.datetime(2024, 3, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
def select_random():
import random
return random.choice(['task_a', 'task_b'])
branch_task = BranchPythonOperator(
task_id='branch_task',
python_callable=select_random
)
task_a = PythonOperator(task_id='task_a', python_callable=lambda: print("Task A"))
task_b = PythonOperator(task_id='task_b', python_callable=lambda: print("Task B"))
branch_task >> [task_a, task_b]
'MLOps > Airflow' 카테고리의 다른 글
[Airflow] default_args 알아보기 (0) | 2024.12.04 |
---|---|
[Airflow] Dataset: DAG 간 의존성 관리 (0) | 2024.12.03 |
[Airflow] File Sensor에 대하여 (1) | 2024.11.21 |
[Airflow] Sensor에 대해 알아보자 (0) | 2024.11.19 |
[Airflow] Provider 패키지 설치 (2) | 2024.11.17 |