Airflow를 사용하다 보면 DAG 간의 의존성을 관리해야 하는 상황이 자주 발생합니다. 특히 Trigger DAG Run Operator나 External Task Sensor를 사용해 DAG 간 연결을 만들다 보면, 강한 연결 구조로 인해 관리가 복잡해지고 확장성이 떨어지는 문제가 생길 수 있습니다. 이런 문제를 해결하기 위해 Airflow는 Dataset 기능을 제공합니다.
Dataset의 필요성
기존 문제점: 강한 연결 구조
DAG 간 의존성을 Trigger Operator와 External Sensor로 처리하면, 연결 관리에 많은 노력이 필요합니다. 이러한 방식은 강한 연결(Strong Coupling) 구조를 형성하기 때문에, DAG가 많아질수록 확장성과 유연성이 떨어집니다.
해결책: 약한 연결 구조
Dataset 기능은 큐 시스템의 Publish/Subscribe 구조를 차용하여 DAG 간 의존성을 관리합니다. 이 방식은 DAG 간의 직접적인 연결을 줄이고 약한 연결(Loose Coupling) 구조를 만들어 확장성과 유지보수성을 크게 개선합니다.
Dataset 기능 개요
- Produce/Consume 구조
- 특정 Task가 완료되면, 이를 특정 키(Key) 값으로 Publish합니다.
- 다른 DAG는 해당 키를 Consume하여 트리거됩니다.
- Pub/Sub 모델 적용
- Dataset은 DAG 간 의존성을 설정할 수 있는 Pub/Sub 모델을 제공합니다.
- DAG는 고유한 Key 값을 가지는 Dataset을 Publish하며, 다른 DAG는 이를 구독(Consume)합니다.
- 스케줄 관리
- Consume DAG은 별도의 스케줄을 정의하지 않고, 구독 중인 Dataset에 의해 자동으로 트리거됩니다.
- Dataset에 의해 시작된 DAG의 Run ID는 dataset_triggered_{trigger된 시간} 형식으로 표현됩니다.
- UI 모니터링
- Airflow UI의 Datasets 메뉴에서 Dataset 현황을 쉽게 모니터링할 수 있습니다.
Dataset 활용 코드 예시
1. Dataset 생성 및 Producer DAG 작성
Producer 1
dag_dataset_producer_1 DAG은 "dag_dataset_producer_1"이라는 Key 값을 가진 Dataset을 Publish합니다.
from airflow import Dataset
from airflow import DAG
from airflow.operators.bash import BashOperator
import pendulum
dataset_dag_dataset_producer_1 = Dataset("dag_dataset_producer_1") # Publish할 Dataset Key
with DAG(
dag_id='dag_dataset_producer_1',
schedule='0 7 * * *',
start_date=pendulum.datetime(2024, 11, 25, tz='Asia/Seoul'),
catchup=False
) as dag:
bask_task = BashOperator(
task_id='bash_task',
outlets=[dataset_dag_dataset_producer_1], # Publish할 Dataset 명시
bash_command='echo "producer_1 job done"'
)
Producer 2
dag_dataset_producer_2 DAG은 "dag_dataset_producer_2"라는 Key 값을 가진 Dataset을 Publish합니다.
from airflow import Dataset
from airflow import DAG
from airflow.operators.bash import BashOperator
import pendulum
dataset_dag_dataset_producer_2 = Dataset("dag_dataset_producer_2") # Publish할 Dataset Key
with DAG(
dag_id='dag_dataset_producer_2',
schedule='0 7 * * *',
start_date=pendulum.datetime(2024, 11, 25, tz='Asia/Seoul'),
catchup=False
) as dag:
bask_task = BashOperator(
task_id='bash_task',
outlets=[dataset_dag_dataset_producer_2], # Publish할 Dataset 명시
bash_command='echo "producer_2 job done"'
)
2. Dataset Consumer DAG 작성
Consumer 1
dag_dataset_consumer_1 DAG은 dag_dataset_producer_1 Dataset을 구독합니다. Producer 1 DAG이 완료되면 자동으로 트리거됩니다.
from airflow import Dataset
from airflow import DAG
from airflow.operators.bash import BashOperator
import pendulum
dataset_dag_dataset_producer_1 = Dataset("dag_dataset_producer_1") # 구독할 Dataset Key
with DAG(
dag_id='dag_dataset_consumer_1',
schedule=[dataset_dag_dataset_producer_1], # 구독할 Dataset 명시
start_date=pendulum.datetime(2024, 11, 25, tz='Asia/Seoul'),
catchup=False
) as dag:
bask_task = BashOperator(
task_id='bash_task',
bash_command='echo {{ ti.run_id }} && echo "producer_1이 완료되면 수행됨"'
)
Consumer 2
dag_dataset_consumer_2 DAG은 dag_dataset_producer_1과 dag_dataset_producer_2 두 개의 Dataset을 구독합니다. 두 Producer DAG이 모두 완료된 이후에 실행됩니다.
from airflow import Dataset
from airflow import DAG
from airflow.operators.bash import BashOperator
import pendulum
dataset_dag_dataset_producer_1 = Dataset("dag_dataset_producer_1")
dataset_dag_dataset_producer_2 = Dataset("dag_dataset_producer_2")
with DAG(
dag_id='dag_dataset_consumer_2',
schedule=[dataset_dag_dataset_producer_1, dataset_dag_dataset_producer_2], # 구독할 Dataset 명시
start_date=pendulum.datetime(2024, 11, 25, tz='Asia/Seoul'),
catchup=False
) as dag:
bask_task = BashOperator(
task_id='bash_task',
bash_command='echo {{ ti.run_id }} && echo "producer_1과 producer_2가 완료되면 수행됨"'
)
두 개의 Dataset을 구독하고 있으므로 0 of 2 datasets updated로 나타납니다.
produce 1, 2와 Consumer 1, 2의 그래프
결론
Airflow Dataset 기능은 DAG 간 의존성을 보다 효율적으로 관리할 수 있는 도구입니다. Pub/Sub 구조를 통해 강한 연결을 약한 연결로 전환하고, DAG 간의 확장성을 대폭 개선할 수 있습니다. 또한, Dataset을 활용하면 DAG의 스케줄링과 의존성 관리가 간소화되며, Airflow UI를 통해 Dataset 현황을 모니터링할 수 있습니다.
'MLOps > Airflow' 카테고리의 다른 글
[Airflow] default_args 알아보기 (0) | 2024.12.04 |
---|---|
[Airflow] ExternalTaskSensor: DAG 간 의존성 설정과 활용 (0) | 2024.11.24 |
[Airflow] File Sensor에 대하여 (1) | 2024.11.21 |
[Airflow] Sensor에 대해 알아보자 (0) | 2024.11.19 |
[Airflow] Provider 패키지 설치 (2) | 2024.11.17 |