MLOps/Airflow

[Airflow] BashOperator와 XCom을 활용하는 방법

monkeykim 2024. 11. 6. 00:26

Airflow에서 BashOperator는 유용하며, PythonOperator와 마찬가지로 XCom과 함께 태스크 간의 데이터를 주고 받아, 워크플로우 내에서 데이터를 효율적으로 활용할 수 있습니다. 이번 글에서는 BashOperator와 XCom의 상호작용 방식을 설명하고, BashOperator에서 데이터를 주고 받는 여러 방법을 말씀드리겠습니다.


 

BashOperator에서 XCom 사용하기

BashOperator는 bash_command나 env 파라미터에 템플릿 문법을 사용하여 데이터를 주고받을 수 있습니다. 이 과정에서 XCom을 활용해 다른 태스크로부터 데이터를 가져오거나, 데이터를 전달하는 작업이 가능합니다.

기본 개념: XCom Push와 Pull

XCom은 Airflow 태스크 간에 데이터를 주고받는 데 사용됩니다. PythonOperator에서는 kwargs에서 ti 인스턴스를 꺼내어 xcom_push, xcom_pull을 사용할 수 있지만, BashOperator에서는 이를 템플릿 문법을 통해 쉽게 활용할 수 있습니다. 예를 들어 ti.xcom_push나 ti.xcom_pull과 같은 형태로 ti 인스턴스를 템플릿에서 바로 호출하여 값을 주고받을 수 있습니다.

BashOperator 예제

다음은 XCom을 활용한 간단한 예제 코드입니다. 두 개의 BashOperator를 사용하여 첫 번째 태스크에서는 메시지를 XCom에 저장하고, 두 번째 태스크에서는 해당 메시지를 가져와 사용하는 방법을 보여줍니다.

from airflow import DAG
import pendulum
from airflow.operators.bash import BashOperator

with DAG(
    dag_id='dags_bash_with_xcom',
    schedule='10 0 * * *',
    start_date=pendulum.datetime(2024, 3, 1, tz='Asia/Seoul'),
    catchup=False
) as dag:
    # 첫 번째 태스크: XCom에 메시지를 저장
    bash_push = BashOperator(
        task_id='bash_push',
        bash_command="echo START && "
                     "echo XCOM_PUSHED "
                     "{{ ti.xcom_push(key='bash_pushed', value='first_bash_message') }} && "
                     "echo COMPLETE"
    )

    # 두 번째 태스크: 첫 번째 태스크에서 XCom에 저장된 값을 불러옴
    bash_pull = BashOperator(
        task_id='bash_pull',
        env={'PUSHED_VALUE': "{{ ti.xcom_pull(key='bash_pushed')}}",
             'RETURN_VALUE': "{{ ti.xcom_pull(task_ids='bash_push')}}"},
        bash_command="echo $PUSHED_VALUE && echo $RETURN_VALUE",
        do_xcom_push=False
    )

    bash_push >> bash_pull

코드 설명

  1. bash_push 태스크: 첫번째 BashOperator인 bash_push는 XCom을 사용하여 "first_bash_message"라는 메시지를 key인 "bash_pushed"와 함께 전달합니다. ti.xcom_push를 통해 bash_pushed 키에 메시지를 저장합니다.
    • return_value: XCom의 return_value는 태스크의 마지막 출력값을 자동으로 가져오는 기능을 합니다. 따라서 ti.xcom_pull에서 task_ids만 지정하고 key를 생략하면 return_value를 기본값으로 가져오게 됩니다.
  2. bash_pull 태스크: 두번째 BashOperator인 bash_pull은 첫번째 태스크인 bash_push에서 저장된 값을 가져옵니다.
    • PUSHED_VALUE: bash_pushed 키를 사용하여 value를 불러온다.
    • RETURN_VALUE: task_ids='bash_push'로 첫번째 태스크의 마지막 출력값을 가져온다.
  3. do_xcom_push=False: BashOperator는 기본적으로 마지막 출력값을 자동으로 XCom에 저장합니다. 이 값을 XCom에 저장하고 싶지 않을 경우, do_xcom_push=False를 설정하여 방지할 수 있습니다.