Airflow에서는 각 Operator가 작업의 중간 결과나 데이터를 공유할 때 XCom(Cross-Communication)을 사용합니다. XCom을 활용하면 서로 다른 Operator 간에도 데이터를 전달할 수 있어 데이터 처리의 유연성을 높일 수 있습니다. 여기서는 Python Operator에서 생성한 데이터를 Bash Operator에서 활용하고, 반대로 Bash Operator에서 생성한 데이터를 Python Operator로 전달하는 방법과 Python Operator에서 생성한 데이터를 Email Operator에 전달하여 이메일에 값이 잘 전달되었는지 확인하는 과정을 가지겠습니다.
Python Operator → Bash Operator로 XCom 전달
아래 코드는 Python Operator에서 반환한 result_dict 데이터를 XCom을 통해 Bash Operator에 전달하는 예시입니다. Bash Operator는 이를 환경 변수로 받아 사용합니다.
@task(task_id='python_push')
def python_push_xcom():
result_dict = {'status': 'Good', 'data': [1, 2, 3], 'options_cnt': 100}
return result_dict # xcom으로 result_dict의 값이 push가 됨
bash_pull = BashOperator(
task_id='bash_pull',
env={
'STATUS': '{{ ti.xcom_pull(task_ids="python_push")["status"] }}',
'DATA': '{{ ti.xcom_pull(task_ids="python_push")["data"] }}',
'OPTIONS_CNT': '{{ ti.xcom_pull(task_ids="python_push")["options_cnt"]}}'
},
bash_command='echo $STATUS && echo $DATA && echo $OPTIONS_CNT'
)
python_push_xcom() >> bash_pull
Bash Operator → Python Operator로 XCom 전달
반대로, Bash Operator에서 XCom을 통해 데이터를 Python Operator로 전달할 수도 있습니다. Bash Operator에서 ti.xcom_push를 사용해 데이터를 전달하고, Python Operator에서 ti.xcom_pull을 통해 이를 가져옵니다.
bash_push = BashOperator(
task_id='bash_push',
bash_command='echo PUSH START '
'{{ti.xcom_push(key="bash_pushed", value=200)}} && '
'echo PUSH COMPLETE'
)
@task(task_id='python_pull')
def python_pull_xcom(**kwargs):
ti = kwargs['ti']
status_value = ti.xcom_pull(key='bash_pushed')
return_value = ti.xcom_pull(task_ids='bash_push')
print('status_value:' + str(status_value))
print('return_value:' + return_value)
bash_push >> python_pull_xcom()
Python Operator에서 XCom 생성 후 Email Operator에서 사용
Python Operator에서 작업의 성공 여부를 반환하고, Email Operator에서 이를 메일 본문에 활용할 수 있습니다.
@task(task_id='someting_task')
def some_logic():
from random import choice
return choice(['Success', 'Fail']) # return 한 값이 xcom으로 전달됨
send_email = EmailOperator(
task_id='send_email',
to='kwj102501@naver.com',
subject='{{ data_interval_end.in_timezone("Asia/Seoul") | ds }} some logic 처리결과',
html_content='{{ data_interval_end.in_timezone("Asia/Seoul") | ds }} 처리 결과는 <br> \
{{ ti.xcom_pull(task_ids="someting_task")}} 했습니다 <br>'
)
some_logic() >> send_email
Email Operator에서는 to, subject, html_content 등 주요 파라미터에 대해 Jinja 템플릿 문법을 사용할 수 있습니다. 필요한 데이터를 XCom에서 템플릿 문법을 통해 직접 불러와 이메일에 적용할 수 있어 간결하게 관리할 수 있습니다.
다양한 오퍼레이터를 사용하면서 xcom data를 어떻게 사용하는지 궁금하다면, 그 오퍼레이터가 가지고 있는 파라미터 중에서 template 문법을 쓸 수 있는 파라미터인지 공식문서에서 확인을 하고 파라미터에 대한 내용을 작성하면 된다.
'MLOps > Airflow' 카테고리의 다른 글
[Airflow] BranchPythonOperator로 Task 분기 처리하기 (0) | 2024.11.08 |
---|---|
[Airflow] 전역 변수 Variable 이용하기 (0) | 2024.11.07 |
[Airflow] BashOperator와 XCom을 활용하는 방법 (0) | 2024.11.06 |
[Airflow] XCom 사용하기 (0) | 2024.11.03 |
[Airflow] Macro 변수 사용 (0) | 2024.11.03 |