Airflow에서는 DAG 작성 시 Jinja 템플릿을 통해 특정 파라미터에 동적으로 값을 할당할 수 있습니다. Jinja는 주로 웹 프레임워크에서 HTML 템플릿 렌더링에 사용되는 엔진이지만, Airflow에서는 SQL, Bash 스크립트 등 다양한 작업에서 파라미터 값 설정을 유연하게 해주는 역할을 합니다. 이 글에서는 Jinja 템플릿이 무엇인지, Airflow에서 어떤 식으로 활용되는지 구체적으로 알아보겠습니다.
Jinja 템플릿이란?
Jinja 템플릿은 문서에서 특정 양식으로 작성된 값을 런타임에 실제 값으로 치환해주는 처리 엔진입니다. Python 기반의 여러 프레임워크에서 지원되며, 대표적으로 Flask나 Django에서 HTML 템플릿을 화면에 렌더링할 때 사용됩니다. Airflow에서는 SQL 작성, BashOperator의 명령 설정 등 다양한 작업에 활용할 수 있습니다.
Airflow에서 Jinja 템플릿 사용법
Airflow의 Operator에서 Jinja 템플릿을 사용하면 DAG 실행 시점에 특정 파라미터를 동적으로 설정할 수 있습니다. 템플릿 변수는 중괄호 {{ }}로 감싸서 사용하며, {% %}를 이용해 조건문이나 반복문도 쓸 수 있습니다.
예를 들어, 다음과 같은 기본 변수들이 제공됩니다:
- {{ ds }}: 작업 실행 날짜 (YYYY-MM-DD 형식)
- {{ ds_nodash }}: 작업 실행 날짜 (YYYYMMDD 형식)
- {{ ts }}: 작업 시작 시간
- {{ data_interval_start }}, {{ data_interval_end }}: 작업의 데이터 처리 시작/종료 시간
이 외에도 Airflow는 템플릿에서 사용할 수 있는 다양한 변수를 제공하여, 데이터 파이프라인을 유연하게 설계할 수 있습니다.
주의사항
모든 Operator와 파라미터에 템플릿 변수를 적용할 수 있는 것은 아닙니다. 템플릿 지원 여부는 해당 Operator의 template_fields 속성에서 확인할 수 있습니다.
공식 Docs: https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html
Airflow 날짜 개념과 템플릿 변수 이해하기
Airflow에서는 배치가 실행된 시간과 데이터의 실제 기간이 다를 수 있습니다. 예를 들어, 25일 00시에 시작된 배치가 24일의 데이터를 처리하는 경우, data_interval_start는 24일 00:00, data_interval_end는 24일 23:59로 설정됩니다. 이는 데이터 수집 시점을 기준으로 날짜를 다르게 설정할 수 있게 해 주며, ETL 작업에서 유용하게 사용됩니다.
- 현재 작업 중인 배치의 종료 시점: data_interval_end
- 이전 배치의 시작 시점: data_interval_start
BashOperator에서 템플릿 사용 예시
BashOperator의 bash_command, env, cwd 파라미터는 템플릿 적용이 가능합니다. 다음 예시는 data_interval_start와 data_interval_end 변수를 사용해 주기적인 작업에서 시간 값을 활용하는 방법을 보여줍니다.
from airflow import DAG
from airflow.operators.bash import BashOperator
import pendulum
with DAG(
dag_id='dag_bash_with_template',
schedule='30 6 * * *',
start_date=pendulum.datetime(2024, 3, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
bash_t1 = BashOperator(
task_id='bash_t1',
bash_command='echo "data_interval_end: {{ data_interval_end }}"'
)
bash_t2 = BashOperator(
task_id='bash_t2',
env={
'START_DATE': '{{ data_interval_start | ds }}',
'END_DATE': '{{ data_interval_end | ds }}'
},
bash_command='echo $START_DATE && echo $END_DATE'
)
bash_t1 >> bash_t2
설명: data_interval_start와 data_interval_end를 사용하여 작업이 실행된 데이터의 시작 시간과 종료 시간을 출력합니다. op_args 및 op_kwargs와 함께 템플릿을 활용하면, 다양한 작업에서 동적 파라미터 설정이 가능합니다.
PythonOperator에서 템플릿 변수 사용 예시
PythonOperator는 **kwargs를 통해 템플릿 변수들이 자동으로 전달되어, 함수 내에서 쉽게 참조할 수 있습니다.
from airflow import DAG
import pendulum
from airflow.operators.python import PythonOperator
from airflow.decorators import task
with DAG(
dag_id='dag_python_template',
schedule='30 6 * * *',
start_date=pendulum.datetime(2024, 3, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
@task(task_id='python_t2')
def python_function2(**kwargs):
print("Execution Date:", kwargs['ds'])
print("Execution Time:", kwargs['ts'])
print("Data Interval Start:", kwargs['data_interval_start'])
print("Data Interval End:", kwargs['data_interval_end'])
python_function2()
**kwargs를 통해 템플릿 변수에 직접 접근하여 출력할 수 있습니다. Airflow가 제공하는 다양한 시간 변수들은 ds, ts, data_interval_start, data_interval_end 등으로 바로 활용할 수 있습니다.
'MLOps > Airflow' 카테고리의 다른 글
[Airflow] XCom 사용하기 (0) | 2024.11.03 |
---|---|
[Airflow] Macro 변수 사용 (0) | 2024.11.03 |
[Airflow] op_args와 op_kwargs 사용하기 (2) | 2024.11.03 |
[Airflow] @task 데코레이터 사용하기 (0) | 2024.10.31 |
[Airflow] Python Operator로 외부 모듈을 Import하는 방법 (0) | 2024.10.30 |