Airflow에서 BashOperator는 유용하며, PythonOperator와 마찬가지로 XCom과 함께 태스크 간의 데이터를 주고 받아, 워크플로우 내에서 데이터를 효율적으로 활용할 수 있습니다. 이번 글에서는 BashOperator와 XCom의 상호작용 방식을 설명하고, BashOperator에서 데이터를 주고 받는 여러 방법을 말씀드리겠습니다.
BashOperator에서 XCom 사용하기
BashOperator는 bash_command나 env 파라미터에 템플릿 문법을 사용하여 데이터를 주고받을 수 있습니다. 이 과정에서 XCom을 활용해 다른 태스크로부터 데이터를 가져오거나, 데이터를 전달하는 작업이 가능합니다.
기본 개념: XCom Push와 Pull
XCom은 Airflow 태스크 간에 데이터를 주고받는 데 사용됩니다. PythonOperator에서는 kwargs에서 ti 인스턴스를 꺼내어 xcom_push, xcom_pull을 사용할 수 있지만, BashOperator에서는 이를 템플릿 문법을 통해 쉽게 활용할 수 있습니다. 예를 들어 ti.xcom_push나 ti.xcom_pull과 같은 형태로 ti 인스턴스를 템플릿에서 바로 호출하여 값을 주고받을 수 있습니다.
BashOperator 예제
다음은 XCom을 활용한 간단한 예제 코드입니다. 두 개의 BashOperator를 사용하여 첫 번째 태스크에서는 메시지를 XCom에 저장하고, 두 번째 태스크에서는 해당 메시지를 가져와 사용하는 방법을 보여줍니다.
from airflow import DAG
import pendulum
from airflow.operators.bash import BashOperator
with DAG(
dag_id='dags_bash_with_xcom',
schedule='10 0 * * *',
start_date=pendulum.datetime(2024, 3, 1, tz='Asia/Seoul'),
catchup=False
) as dag:
# 첫 번째 태스크: XCom에 메시지를 저장
bash_push = BashOperator(
task_id='bash_push',
bash_command="echo START && "
"echo XCOM_PUSHED "
"{{ ti.xcom_push(key='bash_pushed', value='first_bash_message') }} && "
"echo COMPLETE"
)
# 두 번째 태스크: 첫 번째 태스크에서 XCom에 저장된 값을 불러옴
bash_pull = BashOperator(
task_id='bash_pull',
env={'PUSHED_VALUE': "{{ ti.xcom_pull(key='bash_pushed')}}",
'RETURN_VALUE': "{{ ti.xcom_pull(task_ids='bash_push')}}"},
bash_command="echo $PUSHED_VALUE && echo $RETURN_VALUE",
do_xcom_push=False
)
bash_push >> bash_pull
코드 설명
- bash_push 태스크: 첫번째 BashOperator인 bash_push는 XCom을 사용하여 "first_bash_message"라는 메시지를 key인 "bash_pushed"와 함께 전달합니다. ti.xcom_push를 통해 bash_pushed 키에 메시지를 저장합니다.
- return_value: XCom의 return_value는 태스크의 마지막 출력값을 자동으로 가져오는 기능을 합니다. 따라서 ti.xcom_pull에서 task_ids만 지정하고 key를 생략하면 return_value를 기본값으로 가져오게 됩니다.
- bash_pull 태스크: 두번째 BashOperator인 bash_pull은 첫번째 태스크인 bash_push에서 저장된 값을 가져옵니다.
- PUSHED_VALUE: bash_pushed 키를 사용하여 value를 불러온다.
- RETURN_VALUE: task_ids='bash_push'로 첫번째 태스크의 마지막 출력값을 가져온다.
- do_xcom_push=False: BashOperator는 기본적으로 마지막 출력값을 자동으로 XCom에 저장합니다. 이 값을 XCom에 저장하고 싶지 않을 경우, do_xcom_push=False를 설정하여 방지할 수 있습니다.
'MLOps > Airflow' 카테고리의 다른 글
[Airflow] 전역 변수 Variable 이용하기 (0) | 2024.11.07 |
---|---|
[Airflow] 서로 다른 Operator 간 XCom 사용 (Python, Bash, Email) (0) | 2024.11.07 |
[Airflow] XCom 사용하기 (0) | 2024.11.03 |
[Airflow] Macro 변수 사용 (0) | 2024.11.03 |
[Airflow] Jinja 템플릿 활용 (0) | 2024.11.03 |