MLOps/Airflow

[Airflow] Dataset: DAG 간 의존성 관리

monkeykim 2024. 12. 3. 00:20

Airflow를 사용하다 보면 DAG 간의 의존성을 관리해야 하는 상황이 자주 발생합니다. 특히 Trigger DAG Run Operator External Task Sensor를 사용해 DAG 간 연결을 만들다 보면, 강한 연결 구조로 인해 관리가 복잡해지고 확장성이 떨어지는 문제가 생길 수 있습니다. 이런 문제를 해결하기 위해 Airflow는 Dataset 기능을 제공합니다.


Dataset의 필요성

기존 문제점: 강한 연결 구조

DAG 간 의존성을 Trigger Operator와 External Sensor로 처리하면, 연결 관리에 많은 노력이 필요합니다. 이러한 방식은 강한 연결(Strong Coupling) 구조를 형성하기 때문에, DAG가 많아질수록 확장성과 유연성이 떨어집니다.

해결책: 약한 연결 구조

Dataset 기능은 큐 시스템의 Publish/Subscribe 구조를 차용하여 DAG 간 의존성을 관리합니다. 이 방식은 DAG 간의 직접적인 연결을 줄이고 약한 연결(Loose Coupling) 구조를 만들어 확장성과 유지보수성을 크게 개선합니다.


Dataset 기능 개요

  1. Produce/Consume 구조
    • 특정 Task가 완료되면, 이를 특정 키(Key) 값으로 Publish합니다.
    • 다른 DAG는 해당 키를 Consume하여 트리거됩니다.
  2. Pub/Sub 모델 적용
    • Dataset은 DAG 간 의존성을 설정할 수 있는 Pub/Sub 모델을 제공합니다.
    • DAG는 고유한 Key 값을 가지는 Dataset을 Publish하며, 다른 DAG는 이를 구독(Consume)합니다.
  3. 스케줄 관리
    • Consume DAG은 별도의 스케줄을 정의하지 않고, 구독 중인 Dataset에 의해 자동으로 트리거됩니다.
    • Dataset에 의해 시작된 DAG의 Run ID는 dataset_triggered_{trigger된 시간} 형식으로 표현됩니다.
  4. UI 모니터링
    • Airflow UI의 Datasets 메뉴에서 Dataset 현황을 쉽게 모니터링할 수 있습니다.

datasets graph


Dataset 활용 코드 예시

1. Dataset 생성 및 Producer DAG 작성

Producer 1

dag_dataset_producer_1 DAG은 "dag_dataset_producer_1"이라는 Key 값을 가진 Dataset을 Publish합니다.

from airflow import Dataset
from airflow import DAG
from airflow.operators.bash import BashOperator
import pendulum

dataset_dag_dataset_producer_1 = Dataset("dag_dataset_producer_1")  # Publish할 Dataset Key

with DAG(
    dag_id='dag_dataset_producer_1',
    schedule='0 7 * * *',
    start_date=pendulum.datetime(2024, 11, 25, tz='Asia/Seoul'),
    catchup=False
) as dag:
    bask_task = BashOperator(
        task_id='bash_task',
        outlets=[dataset_dag_dataset_producer_1],  # Publish할 Dataset 명시
        bash_command='echo "producer_1 job done"'
    )

Producer 2

dag_dataset_producer_2 DAG은 "dag_dataset_producer_2"라는 Key 값을 가진 Dataset을 Publish합니다.

from airflow import Dataset
from airflow import DAG
from airflow.operators.bash import BashOperator
import pendulum

dataset_dag_dataset_producer_2 = Dataset("dag_dataset_producer_2")  # Publish할 Dataset Key

with DAG(
    dag_id='dag_dataset_producer_2',
    schedule='0 7 * * *',
    start_date=pendulum.datetime(2024, 11, 25, tz='Asia/Seoul'),
    catchup=False
) as dag:
    bask_task = BashOperator(
        task_id='bash_task',
        outlets=[dataset_dag_dataset_producer_2],  # Publish할 Dataset 명시
        bash_command='echo "producer_2 job done"'
    )

2. Dataset Consumer DAG 작성

Consumer 1

dag_dataset_consumer_1 DAG은 dag_dataset_producer_1 Dataset을 구독합니다. Producer 1 DAG이 완료되면 자동으로 트리거됩니다.

from airflow import Dataset
from airflow import DAG
from airflow.operators.bash import BashOperator
import pendulum

dataset_dag_dataset_producer_1 = Dataset("dag_dataset_producer_1")  # 구독할 Dataset Key

with DAG(
    dag_id='dag_dataset_consumer_1',
    schedule=[dataset_dag_dataset_producer_1],  # 구독할 Dataset 명시
    start_date=pendulum.datetime(2024, 11, 25, tz='Asia/Seoul'),
    catchup=False
) as dag:
    bask_task = BashOperator(
        task_id='bash_task',
        bash_command='echo {{ ti.run_id }} && echo "producer_1이 완료되면 수행됨"'
    )

Consumer 2

dag_dataset_consumer_2 DAG은 dag_dataset_producer_1과 dag_dataset_producer_2 두 개의 Dataset을 구독합니다. 두 Producer DAG이 모두 완료된 이후에 실행됩니다.

from airflow import Dataset
from airflow import DAG
from airflow.operators.bash import BashOperator
import pendulum

dataset_dag_dataset_producer_1 = Dataset("dag_dataset_producer_1")
dataset_dag_dataset_producer_2 = Dataset("dag_dataset_producer_2")

with DAG(
    dag_id='dag_dataset_consumer_2',
    schedule=[dataset_dag_dataset_producer_1, dataset_dag_dataset_producer_2],  # 구독할 Dataset 명시
    start_date=pendulum.datetime(2024, 11, 25, tz='Asia/Seoul'),
    catchup=False
) as dag:
    bask_task = BashOperator(
        task_id='bash_task',
        bash_command='echo {{ ti.run_id }} && echo "producer_1과 producer_2가 완료되면 수행됨"'
    )

두 개의 Dataset을 구독하고 있으므로 0 of 2 datasets updated로 나타납니다. 

 


produce 1, 2와 Consumer 1, 2의 그래프

 


결론

Airflow Dataset 기능은 DAG 간 의존성을 보다 효율적으로 관리할 수 있는 도구입니다. Pub/Sub 구조를 통해 강한 연결을 약한 연결로 전환하고, DAG 간의 확장성을 대폭 개선할 수 있습니다. 또한, Dataset을 활용하면 DAG의 스케줄링과 의존성 관리가 간소화되며, Airflow UI를 통해 Dataset 현황을 모니터링할 수 있습니다.