MLOps/Airflow

[Airflow] 서로 다른 Operator 간 XCom 사용 (Python, Bash, Email)

monkeykim 2024. 11. 7. 01:11

Airflow에서는 각 Operator가 작업의 중간 결과나 데이터를 공유할 때 XCom(Cross-Communication)을 사용합니다. XCom을 활용하면 서로 다른 Operator 간에도 데이터를 전달할 수 있어 데이터 처리의 유연성을 높일 수 있습니다. 여기서는 Python Operator에서 생성한 데이터를 Bash Operator에서 활용하고, 반대로 Bash Operator에서 생성한 데이터를 Python Operator로 전달하는 방법과 Python Operator에서 생성한 데이터를 Email Operator에 전달하여 이메일에 값이 잘 전달되었는지 확인하는 과정을 가지겠습니다.

 

Python Operator → Bash Operator로 XCom 전달
아래 코드는 Python Operator에서 반환한 result_dict 데이터를 XCom을 통해 Bash Operator에 전달하는 예시입니다. Bash Operator는 이를 환경 변수로 받아 사용합니다.

@task(task_id='python_push')
def python_push_xcom():
    result_dict = {'status': 'Good', 'data': [1, 2, 3], 'options_cnt': 100}
    return result_dict  # xcom으로 result_dict의 값이 push가 됨

bash_pull = BashOperator(
    task_id='bash_pull',
    env={
        'STATUS': '{{ ti.xcom_pull(task_ids="python_push")["status"] }}',
        'DATA': '{{ ti.xcom_pull(task_ids="python_push")["data"] }}',
        'OPTIONS_CNT': '{{ ti.xcom_pull(task_ids="python_push")["options_cnt"]}}'
    },
    bash_command='echo $STATUS && echo $DATA && echo $OPTIONS_CNT'
)

python_push_xcom() >> bash_pull

Bash Operator → Python Operator로 XCom 전달
반대로, Bash Operator에서 XCom을 통해 데이터를 Python Operator로 전달할 수도 있습니다. Bash Operator에서 ti.xcom_push를 사용해 데이터를 전달하고, Python Operator에서 ti.xcom_pull을 통해 이를 가져옵니다.

bash_push = BashOperator(
    task_id='bash_push',
    bash_command='echo PUSH START '
                 '{{ti.xcom_push(key="bash_pushed", value=200)}} && '
                 'echo PUSH COMPLETE'
)

@task(task_id='python_pull')
def python_pull_xcom(**kwargs):
    ti = kwargs['ti']
    status_value = ti.xcom_pull(key='bash_pushed')
    return_value = ti.xcom_pull(task_ids='bash_push')
    print('status_value:' + str(status_value))
    print('return_value:' + return_value)

bash_push >> python_pull_xcom()

Python Operator에서 XCom 생성 후 Email Operator에서 사용

Python Operator에서 작업의 성공 여부를 반환하고, Email Operator에서 이를 메일 본문에 활용할 수 있습니다.

@task(task_id='someting_task')
def some_logic():
    from random import choice
    return choice(['Success', 'Fail'])  # return 한 값이 xcom으로 전달됨

send_email = EmailOperator(
    task_id='send_email',
    to='kwj102501@naver.com',
    subject='{{ data_interval_end.in_timezone("Asia/Seoul") | ds }} some logic 처리결과',
    html_content='{{ data_interval_end.in_timezone("Asia/Seoul") | ds }} 처리 결과는 <br> \
                 {{ ti.xcom_pull(task_ids="someting_task")}} 했습니다 <br>'
)

some_logic() >> send_email

 

Email Operator에서는 to, subject, html_content 등 주요 파라미터에 대해 Jinja 템플릿 문법을 사용할 수 있습니다. 필요한 데이터를 XCom에서 템플릿 문법을 통해 직접 불러와 이메일에 적용할 수 있어 간결하게 관리할 수 있습니다.


다양한 오퍼레이터를 사용하면서 xcom data를 어떻게 사용하는지 궁금하다면, 그 오퍼레이터가 가지고 있는 파라미터 중에서 template 문법을 쓸 수 있는 파라미터인지 공식문서에서 확인을 하고 파라미터에 대한 내용을 작성하면 된다.

Email operator의 예