Airflow에서는 매크로 변수(macros)를 사용해 DAG 실행 시점에 필요한 날짜 연산을 동적으로 수행할 수 있습니다. 매크로 변수는 Airflow의 템플릿 엔진(Jinja)을 통해 작업에 필요한 시간 계산을 간편하게 해주는 유용한 도구입니다. 매크로 변수를 사용하면 복잡한 날짜 계산이 필요한 작업도 쉽게 설정할 수 있으며, 데이터 처리 주기에 맞춰 시작일과 종료일을 유연하게 설정할 수 있습니다.
매크로 변수란?
Airflow의 매크로 변수는 템플릿에서 다양한 날짜 연산을 지원하는 도구로, Python의 datetime 및 dateutil 라이브러리에 익숙하다면 더욱 효과적으로 사용할 수 있습니다. 주요 매크로 모듈은 다음과 같습니다:
- macros.datetime: 날짜와 시간 처리를 위한 모듈
- macros.timedelta: 시간 차이 계산을 위한 모듈
- macros.dateutil: 날짜 연산을 돕는 모듈
- macros.uuid, macros.random: 고유 ID와 랜덤 값 생성 지원
BashOperator에서 매크로 변수 활용하기
BashOperator는 배치 처리 시 날짜 범위를 설정하거나 환경 변수로 날짜 값을 전달해야 하는 경우에 유용하게 사용됩니다. 이때 매크로 변수를 사용해 동적으로 날짜를 계산할 수 있습니다.
월말 마지막 날을 기준으로 데이터 추출
아래는 BashOperator에서 매월 말일을 기준으로 데이터를 추출하는 예시입니다. data_interval_start는 전월 말일을, data_interval_end는 작업 실행일의 하루 전을 나타내도록 설정합니다.
from airflow import DAG
from airflow.operators.bash import BashOperator
import pendulum
with DAG(
dag_id='dag_bash_with_macro_eg1',
schedule='10 0 L * *', # 매월 마지막 날에 실행
start_date=pendulum.datetime(2024, 3, 1, tz='Asia/Seoul'),
catchup=False
) as dag:
bash_t1 = BashOperator(
task_id='bash_t1',
env={
'START_DATE': '{{ data_interval_start.in_timezone("Asia/Seoul") | ds }}',
'END_DATE': '{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelta.relativedelta(days=1)) | ds }}'
},
bash_command='echo "START_DATE: $START_DATE" && echo "END_DATE: $END_DATE"'
)
설명: 이 예시는 매월 마지막 날에 실행되며, 전월 마지막 날부터 실행일 전일까지의 데이터를 추출합니다.
매월 두 번째 토요일을 기준으로 데이터 추출
아래는 매월 두 번째 토요일에 배치가 실행되며, 2주 전 월요일부터 2주 전 토요일까지의 데이터를 처리합니다.
from airflow import DAG
from airflow.operators.bash import BashOperator
import pendulum
with DAG(
dag_id='dag_bash_with_macro_eg2',
schedule='10 0 * * 6#2', # 매월 두 번째 토요일
start_date=pendulum.datetime(2024, 3, 1, tz='Asia/Seoul'),
catchup=False
) as dag:
bash_t2 = BashOperator(
task_id='bash_t2',
env={
'START_DATE': '{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelta.relativedelta(days=19)) | ds }}',
'END_DATE': '{{ (data_interval_end.in_timezone("Asia/Seoul") - macros.dateutil.relativedelta.relativedelta(days=14)) | ds }}'
},
bash_command='echo "START_DATE: $START_DATE" && echo "END_DATE: $END_DATE"'
)
설명: 매월 두 번째 토요일에 실행되며, data_interval_end를 기준으로 19일 전부터 14일 전까지의 데이터를 대상으로 작업을 수행합니다.
PythonOperator에서 매크로 변수 활용하기
PythonOperator는 templates_dict 옵션을 통해 매크로 변수 설정이 가능합니다. **kwargs를 통해 템플릿 변수들이 자동으로 함수에 전달되므로, 각 템플릿 변수에 직접 접근할 수 있습니다.
예를 들어, 3월 15일에 DAG를 실행할 경우, 이 DAG가 2월 1일부터 2월 28일까지의 데이터를 한 번에 처리하는 상황을 가정해 보겠습니다.
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.decorators import task
import pendulum
with DAG(
dag_id='dag_python_with_macro',
schedule='10 0 * * *',
start_date=pendulum.datetime(2024, 3, 1, tz='Asia/Seoul'),
catchup=False
) as dag:
@task(task_id='task_using_macro',
templates_dict={
'start_date': '{{ (data_interval_end.in_timezone("Asia/Seoul") + macros.dateutil.relativedelta.relativedelta(months=-1, day=1)) | ds }}',
'end_date': '{{ (data_interval_end.in_timezone("Asia/Seoul").replace(day=1) + macros.dateutil.relativedelta.relativedelta(days=-1)) | ds }}'
})
def get_datetime_macro(**kwargs):
templates_dict = kwargs.get('templates_dict') or {}
if templates_dict:
start_date = templates_dict.get('start_date') or 'start_date 없음'
end_date = templates_dict.get('end_date') or 'end_date 없음'
print(f"Start Date: {start_date}")
print(f"End Date: {end_date}")
get_datetime_macro()
설명: 이 예시는 매크로 변수를 사용해 2월의 시작일과 종료일을 설정하여 3월 15일에 실행되는 배치에서 2월 전체 데이터를 한 번에 처리하도록 합니다.
Airflow의 매크로 변수는 주기적 데이터 처리 시점 설정에 매우 유용합니다. BashOperator와 PythonOperator에서 모두 활용 가능하며, 템플릿 변수와 매크로 모듈을 조합해 복잡한 날짜 연산을 간편하게 구현할 수 있습니다. 작업의 기간 설정과 연관된 연산이 필요한 경우 macros.datetime, macros.dateutil 등을 활용해 효율적인 데이터 파이프라인을 구축할 수 있습니다.
'MLOps > Airflow' 카테고리의 다른 글
[Airflow] BashOperator와 XCom을 활용하는 방법 (0) | 2024.11.06 |
---|---|
[Airflow] XCom 사용하기 (0) | 2024.11.03 |
[Airflow] Jinja 템플릿 활용 (0) | 2024.11.03 |
[Airflow] op_args와 op_kwargs 사용하기 (2) | 2024.11.03 |
[Airflow] @task 데코레이터 사용하기 (0) | 2024.10.31 |