MLOps/Airflow

[Airflow] Macro 변수 사용

monkeykim 2024. 11. 3. 16:39

Airflow에서는 매크로 변수(macros)를 사용해 DAG 실행 시점에 필요한 날짜 연산을 동적으로 수행할 수 있습니다. 매크로 변수는 Airflow의 템플릿 엔진(Jinja)을 통해 작업에 필요한 시간 계산을 간편하게 해주는 유용한 도구입니다. 매크로 변수를 사용하면 복잡한 날짜 계산이 필요한 작업도 쉽게 설정할 수 있으며, 데이터 처리 주기에 맞춰 시작일과 종료일을 유연하게 설정할 수 있습니다.


매크로 변수란?

Airflow의 매크로 변수는 템플릿에서 다양한 날짜 연산을 지원하는 도구로, Python의 datetime 및 dateutil 라이브러리에 익숙하다면 더욱 효과적으로 사용할 수 있습니다. 주요 매크로 모듈은 다음과 같습니다:

  • macros.datetime: 날짜와 시간 처리를 위한 모듈
  • macros.timedelta: 시간 차이 계산을 위한 모듈
  • macros.dateutil: 날짜 연산을 돕는 모듈
  • macros.uuid, macros.random: 고유 ID와 랜덤 값 생성 지원

BashOperator에서 매크로 변수 활용하기

BashOperator는 배치 처리 시 날짜 범위를 설정하거나 환경 변수로 날짜 값을 전달해야 하는 경우에 유용하게 사용됩니다. 이때 매크로 변수를 사용해 동적으로 날짜를 계산할 수 있습니다.

월말 마지막 날을 기준으로 데이터 추출

아래는 BashOperator에서 매월 말일을 기준으로 데이터를 추출하는 예시입니다. data_interval_start는 전월 말일을, data_interval_end는 작업 실행일의 하루 전을 나타내도록 설정합니다.

from airflow import DAG
from airflow.operators.bash import BashOperator
import pendulum

with DAG(
    dag_id='dag_bash_with_macro_eg1',
    schedule='10 0 L * *',  # 매월 마지막 날에 실행
    start_date=pendulum.datetime(2024, 3, 1, tz='Asia/Seoul'),
    catchup=False
) as dag:
    bash_t1 = BashOperator(
        task_id='bash_t1',
        env={
            'START_DATE': '{{ data_interval_start.in_timezone("Asia/Seoul") | ds }}',
            'END_DATE': '{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelta.relativedelta(days=1)) | ds }}'
        },
        bash_command='echo "START_DATE: $START_DATE" && echo "END_DATE: $END_DATE"'
    )

설명: 이 예시는 매월 마지막 날에 실행되며, 전월 마지막 날부터 실행일 전일까지의 데이터를 추출합니다.

 

매월 두 번째 토요일을 기준으로 데이터 추출

아래는 매월 두 번째 토요일에 배치가 실행되며, 2주 전 월요일부터 2주 전 토요일까지의 데이터를 처리합니다.

from airflow import DAG
from airflow.operators.bash import BashOperator
import pendulum

with DAG(
    dag_id='dag_bash_with_macro_eg2',
    schedule='10 0 * * 6#2',  # 매월 두 번째 토요일
    start_date=pendulum.datetime(2024, 3, 1, tz='Asia/Seoul'),
    catchup=False
) as dag:
    bash_t2 = BashOperator(
        task_id='bash_t2',
        env={
            'START_DATE': '{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelta.relativedelta(days=19)) | ds }}',
            'END_DATE': '{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelta.relativedelta(days=14)) | ds }}'
        },
        bash_command='echo "START_DATE: $START_DATE" && echo "END_DATE: $END_DATE"'
    )

설명: 매월 두 번째 토요일에 실행되며, data_interval_end를 기준으로 19일 전부터 14일 전까지의 데이터를 대상으로 작업을 수행합니다.


PythonOperator에서 매크로 변수 활용하기

PythonOperator는 templates_dict 옵션을 통해 매크로 변수 설정이 가능합니다. **kwargs를 통해 템플릿 변수들이 자동으로 함수에 전달되므로, 각 템플릿 변수에 직접 접근할 수 있습니다.

예를 들어, 3월 15일에 DAG를 실행할 경우, 이 DAG가 2월 1일부터 2월 28일까지의 데이터를 한 번에 처리하는 상황을 가정해 보겠습니다.

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.decorators import task
import pendulum

with DAG(
    dag_id='dag_python_with_macro',
    schedule='10 0 * * *',
    start_date=pendulum.datetime(2024, 3, 1, tz='Asia/Seoul'),
    catchup=False
) as dag:
    @task(task_id='task_using_macro',
    templates_dict={
        'start_date': '{{ (data_interval_end.in_timezone("Asia/Seoul") + macros.dateutil.relativedelta.relativedelta(months=-1, day=1)) | ds }}',
        'end_date': '{{ (data_interval_end.in_timezone("Asia/Seoul").replace(day=1) + macros.dateutil.relativedelta.relativedelta(days=-1)) | ds }}'
    })
    def get_datetime_macro(**kwargs):
        templates_dict = kwargs.get('templates_dict') or {}
        if templates_dict:
            start_date = templates_dict.get('start_date') or 'start_date 없음'
            end_date = templates_dict.get('end_date') or 'end_date 없음'
            print(f"Start Date: {start_date}")
            print(f"End Date: {end_date}")

    get_datetime_macro()

설명: 이 예시는 매크로 변수를 사용해 2월의 시작일과 종료일을 설정하여 3월 15일에 실행되는 배치에서 2월 전체 데이터를 한 번에 처리하도록 합니다.


Airflow의 매크로 변수는 주기적 데이터 처리 시점 설정에 매우 유용합니다. BashOperator와 PythonOperator에서 모두 활용 가능하며, 템플릿 변수와 매크로 모듈을 조합해 복잡한 날짜 연산을 간편하게 구현할 수 있습니다. 작업의 기간 설정과 연관된 연산이 필요한 경우 macros.datetime, macros.dateutil 등을 활용해 효율적인 데이터 파이프라인을 구축할 수 있습니다.