Airflow에서는 DAG 내의 여러 Task 간에 데이터를 주고받을 수 있도록 XCom(Cross Communication) 기능을 제공합니다. XCom은 작은 크기의 데이터를 공유하기 위한 용도로 설계되었으며, Task 간에 값을 전달하거나 처리 결과를 공유하는 데 유용합니다. 주로 1GB 미만의 데이터에 적합하며, 대용량 데이터는 AWS S3, HDFS 등의 외부 솔루션을 통해 관리하는 것이 좋습니다.
XCom이란?
XCom은 DAG 내에서 Task 간에 데이터를 주고받기 위한 기술로, Task의 중간 결과를 다른 Task의 입력으로 사용할 수 있게 해줍니다. XCom에 저장된 데이터는 Airflow의 메타DB의 xcom 테이블에 저장되며, 이를 통해 여러 Task 간에 값을 전달할 수 있습니다.
XCom의 주요 메서드: xcom_push와 xcom_pull
- xcom_push: 현재 Task의 실행 중 필요한 값을 XCom에 저장합니다.
- xcom_pull: 다른 Task가 xcom_push로 저장한 값을 가져옵니다.
예시: xcom_push와 xcom_pull 활용하기
다음 예제는 두 개의 Task에서 xcom_push를 사용해 데이터를 저장하고, 세 번째 Task에서 xcom_pull을 통해 해당 데이터를 가져오는 방법을 보여줍니다.
from airflow import DAG
from airflow.decorators import task
import pendulum
with DAG(
dag_id='dag_python_with_xcom_eg1',
schedule='30 6 * * *',
start_date=pendulum.datetime(2024, 3, 1, tz='Asia/Seoul'),
catchup=False
) as dag:
@task(task_id='python_xcom_push_task1')
def xcom_push1(**kwargs):
ti = kwargs['ti']
ti.xcom_push(key='result1', value='value1')
ti.xcom_push(key='result2', value=[1, 2, 3])
@task(task_id='python_xcom_push_task2')
def xcom_push2(**kwargs):
ti = kwargs['ti']
ti.xcom_push(key='result1', value='value2')
ti.xcom_push(key='result2', value=[1, 2, 3, 4])
@task(task_id='python_xcom_pull_task')
def xcom_pull(**kwargs):
ti = kwargs['ti']
value1 = ti.xcom_pull(key='result1')
value2 = ti.xcom_pull(key='result2', task_ids='python_xcom_push_task1')
print("Pulled result1:", value1)
print("Pulled result2 from task1:", value2)
xcom_push1() >> xcom_push2() >> xcom_pull()
설명: 이 예시에서 xcom_push1과 xcom_push2 Task는 각각 동일한 키(result1, result2)로 데이터를 XCom에 저장합니다. xcom_pull Task에서는 result1 값을 기본적으로 가장 마지막에 push된 xcom_push2의 데이터를 가져오며, task_ids를 지정해 특정 Task(python_xcom_push_task1)에서 result2 값을 가져올 수 있습니다.
Python 함수의 return 값으로 XCom 사용하기
Python 함수의 return 값도 XCom으로 자동 저장됩니다. 이 방식은 Task decorator와 함께 사용되며, Task의 출력이 다른 Task의 입력으로 자연스럽게 전달될 수 있습니다.
예시: return 값을 통한 XCom 사용
아래는 Python Task의 return 값이 자동으로 XCom에 저장되고, 이를 다른 Task에서 가져오는 예시입니다.
from airflow import DAG
from airflow.decorators import task
import pendulum
with DAG(
dag_id='dag_python_with_xcom_eg2',
schedule='30 6 * * *',
start_date=pendulum.datetime(2024, 3, 1, tz='Asia/Seoul'),
catchup=False
) as dag:
@task(task_id='python_xcom_push_by_return')
def xcom_push_result(**kwargs):
return 'Success'
@task(task_id='python_xcom_pull_1')
def xcom_pull_1(**kwargs):
ti = kwargs['ti']
value1 = ti.xcom_pull(task_ids='python_xcom_push_by_return')
print("Value fetched via xcom_pull:", value1)
@task(task_id='python_xcom_pull_2')
def xcom_pull_2(status, **kwargs):
print("Value received as function argument:", status)
python_xcom_push_by_return = xcom_push_result()
xcom_pull_2(python_xcom_push_by_return)
python_xcom_push_by_return >> xcom_pull_1()
설명:
- xcom_push_result Task는 return을 통해 값을 전달하므로 자동으로 XCom에 push됩니다.
- xcom_pull_1 Task는 ti.xcom_pull 메서드를 사용하여 xcom_push_result Task의 return 값을 가져옵니다.
- xcom_pull_2 Task는 직접 인자로 xcom_push_result의 return 값을 받아옵니다.
XCom의 push와 pull 정리
- XCom push 방법
- ti.xcom_push 메서드로 값을 명시적으로 push하기
- Python 함수의 return 값을 통해 자동으로 push하기
- XCom pull 방법
- ti.xcom_pull 메서드를 사용해 값 가져오기
- return 값을 다른 Task의 입력으로 사용해 전달받기
중요 사항
Task decorator를 사용하면 함수의 입력과 출력 관계만으로도 Task 간의 흐름을 정의할 수 있습니다. Airflow는 이를 기반으로 Task 간의 종속 관계를 자동으로 설정하므로, 데이터 흐름을 쉽게 관리할 수 있습니다.
'MLOps > Airflow' 카테고리의 다른 글
[Airflow] 서로 다른 Operator 간 XCom 사용 (Python, Bash, Email) (0) | 2024.11.07 |
---|---|
[Airflow] BashOperator와 XCom을 활용하는 방법 (0) | 2024.11.06 |
[Airflow] Macro 변수 사용 (0) | 2024.11.03 |
[Airflow] Jinja 템플릿 활용 (0) | 2024.11.03 |
[Airflow] op_args와 op_kwargs 사용하기 (2) | 2024.11.03 |