- Trigger Rule: 상위 태스크의 상태에 따라 하위 태스크의 실행을 제어합니다.
- Task Group: 태스크를 논리적으로 조직하여 가독성을 높이고 유지 보수를 쉽게 합니다.
- Edge Label: 태스크 간 의존성을 주석 처리하여 DAG 시각화를 더 명확히 합니다.
- DAG 간 의존성 관리: TriggerDagRunOperator와 ExternalTaskSensor를 사용하여 여러 DAG 간의 의존성을 관리합니다.
Airflow에서의 트리거 규칙 이해하기
Trigger Rule이란?
Airflow에서 Trigger Rule은 상위 태스크의 상태에 따라 하위 태스크가 실행될지 여부를 결정하는 규칙입니다. 기본적으로는 모든 상위 태스크가 성공해야 하위 태스크가 실행됩니다(all_success). 하지만 상위 태스크 중 일부가 실패하거나 건너뛰어진 경우에도 하위 태스크가 실행되어야 하는 상황이 있습니다.
Trigger Rule이 중요한 이유
Trigger Rule을 사용하면 복잡한 워크플로우에서 다음과 같은 제어가 가능합니다:
- 다양한 조건에 따른 태스크 실행 흐름을 제어
- 예외와 실패를 유연하게 처리
- 불필요한 태스크 실행을 방지하여 워크플로우 최적화
Trigger Rule의 종류와 사용 예시
Airflow에서는 다양한 트리거 규칙을 제공합니다:
- all_success (기본값)
- 설명: 모든 상위 태스크가 성공하면 실행됩니다.
- 사용 예시: 정상적인 워크플로우에서 하위 태스크가 상위 태스크의 성공에 따라 실행되어야 할 때.
- all_failed
- 설명: 모든 상위 태스크가 실패했을 때 실행됩니다.
- 사용 예시: 모든 상위 태스크가 실패한 경우 오류를 알리는 태스크 실행에 활용.
- all_done
- 설명: 모든 상위 태스크가 완료되면 실행됩니다(성공 여부에 상관없음).
- 사용 예시: 상위 태스크 상태와 관계없이 로그 정리와 같은 후속 작업이 필요한 경우.
- all_skipped
- 설명: 상위 태스크가 모두 건너뛰어진 경우 실행됩니다.
- 사용 예시: 특정 조건에 따라 경로를 건너뛰고 대체 경로를 실행해야 할 때.
- one_success
- 설명: 상위 태스크 중 하나 이상이 성공하면 실행됩니다.
- 사용 예시: 여러 경로 중 하나라도 성공한 경우 후속 작업을 실행하는 데 유용합니다.
- one_failed
- 설명: 상위 태스크 중 하나 이상 실패하면 실행됩니다.
- 사용 예시: 상위 태스크 중 일부가 실패할 경우 알림 또는 롤백 작업을 실행해야 할 때.
- one_done
- 설명: 상위 태스크 중 하나 이상이 완료되면 실행됩니다(성공, 실패 무관).
- 사용 예시: 경로 중 하나라도 완료되면 후속 작업을 바로 수행해야 하는 경우.
- none_failed
- 설명: 상위 태스크에 실패가 없는 경우 실행됩니다.
- 사용 예시: 상위 태스크 중 실패가 없을 때만 후속 작업이 필요할 때.
- none_failed_or_skipped
- 설명: 상위 태스크에 실패나 건너뛰기가 없는 경우, 성공한 태스크가 있을 때 실행됩니다.
- 사용 예시: 성공적으로 처리된 상위 태스크가 있는 경우에만 실행해야 할 때.
- none_skipped
- 설명: 상위 태스크 중 하나도 건너뛰지 않은 경우 실행됩니다.
- 사용 예시: 정상적으로 진행된 상위 태스크가 있을 때 후속 작업이 필요할 때.
- always
- 설명: 상위 태스크의 상태와 관계없이 항상 실행됩니다.
- 사용 예시: 데이터 정리나 로그 백업 등 항상 수행해야 하는 작업에 유용합니다.
예제 1: all_done 트리거 규칙 사용
from airflow import DAG
import pendulum
from airflow.operators.bash import BashOperator
from airflow.decorators import task
from airflow.exceptions import AirflowException
with DAG(
dag_id="dag_with_trigger_rule_all_done",
schedule="10 9 * * *",
start_date=pendulum.datetime(2023, 11, 10, tz="Asia/Seoul"),
catchup=False,
) as dag:
bash_task = BashOperator(
task_id='bash_task',
bash_command='echo "Executing bash_task"',
)
@task(task_id='failing_task')
def failing_task():
raise AirflowException('Intentional Failure')
@task(task_id='successful_task')
def successful_task():
print('successful_task executed successfully')
@task(task_id='downstream_task', trigger_rule='all_done')
def downstream_task():
print('downstream_task executed regardless of upstream task states')
[bash_task, failing_task(), successful_task()] >> downstream_task()
예제 2: none_skipped 트리거 규칙 사용하기
from airflow import DAG
import pendulum
from airflow.decorators import task
with DAG(
dag_id="dag_with_trigger_rule_none_skipped",
schedule="10 9 * * *",
start_date=pendulum.datetime(2023, 11, 10, tz="Asia/Seoul"),
catchup=False,
) as dag:
@task.branch(task_id='branching_task')
def branching_task():
import random
options = ['option_a', 'option_b', 'option_c']
selected_option = random.choice(options)
if selected_option == 'option_a':
return 'task_a'
else:
return ['task_b', 'task_c']
@task(task_id='task_a')
def task_a():
print('Executing task_a')
@task(task_id='task_b')
def task_b():
print('Executing task_b')
@task(task_id='task_c')
def task_c():
print('Executing task_c')
@task(task_id='final_task', trigger_rule='none_skipped')
def final_task():
print('Executing final_task')
branching_task() >> [task_a(), task_b(), task_c()] >> final_task()
Task Group을 활용한 워크플로우 구성
Task Group이란?
Airflow의 Task Group은 DAG 내에서 태스크를 논리적으로 묶는 기능입니다. 복잡한 DAG에서 관련 태스크들을 그룹화하여 가독성을 높이고 유지 보수를 쉽게 할 수 있습니다.
- 복잡한 DAG를 단순화: 관련된 태스크를 그룹화하여 시각적 혼란을 줄일 수 있습니다.
- 계층적 구성: 태스크 그룹은 다른 태스크 그룹 내에 중첩될 수 있습니다.
- 재사용성: 공통 기능을 갖는 태스크 그룹을 함수로 정의하여 여러 DAG에서 재사용할 수 있습니다.
@task_group 데코레이터 사용하기
@task_group 데코레이터를 사용하면 태스크 그룹을 손쉽게 생성할 수 있습니다.
from airflow import DAG
import pendulum
from airflow.decorators import task, task_group
with DAG(
dag_id="dag_with_task_groups",
schedule="10 9 * * *",
start_date=pendulum.datetime(2023, 11, 10, tz="Asia/Seoul"),
catchup=False,
) as dag:
@task_group(group_id='data_extraction_group')
def data_extraction():
"""데이터 추출을 위한 태스크 그룹"""
@task(task_id='extract_data')
def extract_data():
print('Data extracted')
@task(task_id='validate_data')
def validate_data():
print('Data validated')
extract_data() >> validate_data()
@task_group(group_id='data_processing_group')
def data_processing():
"""데이터 처리를 위한 태스크 그룹"""
@task(task_id='process_data')
def process_data():
print('Data processed')
@task(task_id='store_data')
def store_data():
print('Data stored')
process_data() >> store_data()
data_extraction() >> data_processing()
DAG 시각화 개선을 위한 Edge Label 사용
Edge Label이란?
Edge Label은 DAG에서 태스크 간의 연결(엣지)을 주석 처리하여 태스크 간의 관계를 명확히 설명하는 데 사용됩니다. 이를 통해 다음과 같은 장점이 있습니다:
- 복잡한 DAG의 가독성을 향상
- 워크플로우 흐름에 대한 추가 설명 제공
엣지 라벨 사용 방법
airflow.utils.edgemodifier 모듈의 Label 클래스를 사용하여 엣지에 라벨을 추가할 수 있습니다.
from airflow import DAG
import pendulum
from airflow.operators.empty import EmptyOperator
from airflow.utils.edgemodifier import Label
with DAG(
dag_id="dag_with_edge_labels",
schedule="10 9 * * *",
start_date=pendulum.datetime(2023, 11, 10, tz="Asia/Seoul"),
catchup=False,
) as dag:
start = EmptyOperator(task_id='start')
middle = EmptyOperator(task_id='middle')
end = EmptyOperator(task_id='end')
start >> Label('Begin processing') >> middle >> Label('Finish processing') >> end
DAG 간 의존성 관리
복잡한 워크플로우에서는 서로 다른 DAG 간의 의존성을 관리해야 할 때가 있습니다. 이를 위해 Airflow는 TriggerDagRunOperator와 ExternalTaskSensor 두 가지 방법을 제공합니다.
TriggerDagRunOperator 사용하기
TriggerDagRunOperator는 현재 DAG가 다른 DAG를 트리거할 수 있도록 합니다.
from airflow import DAG
import pendulum
from airflow.operators.bash import BashOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
with DAG(
dag_id="dag_triggering_another_dag",
schedule="10 9 * * *",
start_date=pendulum.datetime(2023, 11, 10, tz="Asia/Seoul"),
catchup=False,
) as dag:
start_task = BashOperator(
task_id='start_task',
bash_command='echo "Starting DAG"',
)
trigger_task = TriggerDagRunOperator(
task_id='trigger_other_dag',
trigger_dag_id='dag_to_be_triggered',
execution_date='{{ ds }}',
reset_dag_run=True,
wait_for_completion=False,
)
start_task >> trigger_task
ExternalTaskSensor 사용하기
ExternalTaskSensor는 다른 DAG의 특정 태스크가 완료될 때까지 대기한 후 다음 작업을 수행합니다.
from airflow import DAG
import pendulum
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.operators.bash import BashOperator
with DAG(
dag_id="dag_waiting_for_other_dag",
schedule="15 9 * * *",
start_date=pendulum.datetime(2023, 11, 10, tz="Asia/Seoul"),
catchup=False,
) as dag:
wait_for_task = ExternalTaskSensor(
task_id='wait_for_task_in_other_dag',
external_dag_id='dag_to_be_waited_on',
external_task_id='task_to_wait_for',
timeout=600,
poke_interval=60,
mode='poke',
)
proceed_task = BashOperator(
task_id='proceed_task',
bash_command='echo "Proceeding after external task completion"',
)
wait_for_task >> proceed_task
'MLOps > Airflow' 카테고리의 다른 글
[Airflow] Custom Operator 만들기 (0) | 2024.11.11 |
---|---|
[Airflow] SimpleHttpOperator에 대해 알아보기 (1) | 2024.11.10 |
[Airflow] BranchPythonOperator로 Task 분기 처리하기 (0) | 2024.11.08 |
[Airflow] 전역 변수 Variable 이용하기 (0) | 2024.11.07 |
[Airflow] 서로 다른 Operator 간 XCom 사용 (Python, Bash, Email) (0) | 2024.11.07 |