MLOps/Airflow 23

[Airflow] SimpleHttpOperator에 대해 알아보기

Airflow Operator와 Provider OperatorAirflow 오퍼레이터는 DAG의 작업을 정의하는 빌딩 블록입니다. 오퍼레이터는 다음과 같은 범주로 나뉩니다:액션 오퍼레이터: 특정 작업을 수행 (예: BashOperator, PythonOperator).전송 오퍼레이터: 시스템 간 데이터 이동 (예: S3ToRedshiftOperator).센서 오퍼레이터: 특정 조건을 기다림 (예: S3KeySensor).https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/operators.html Operators — Airflow Documentation airflow.apache.orgProvider프로바이더는 Airflow의 기..

MLOps/Airflow 2024.11.10

[Airflow] Trigger Rule, Task Group, Edge Label, DAG 간 의존성 관리

Trigger Rule: 상위 태스크의 상태에 따라 하위 태스크의 실행을 제어합니다.Task Group: 태스크를 논리적으로 조직하여 가독성을 높이고 유지 보수를 쉽게 합니다.Edge Label: 태스크 간 의존성을 주석 처리하여 DAG 시각화를 더 명확히 합니다.DAG 간 의존성 관리: TriggerDagRunOperator와 ExternalTaskSensor를 사용하여 여러 DAG 간의 의존성을 관리합니다.Airflow에서의 트리거 규칙 이해하기Trigger Rule이란?Airflow에서 Trigger Rule은 상위 태스크의 상태에 따라 하위 태스크가 실행될지 여부를 결정하는 규칙입니다. 기본적으로는 모든 상위 태스크가 성공해야 하위 태스크가 실행됩니다(all_success). 하지만 상위 태스크 ..

MLOps/Airflow 2024.11.10

[Airflow] BranchPythonOperator로 Task 분기 처리하기

데이터 파이프라인을 구성할 때, 상황에 따라 특정 Task만 실행해야 하는 경우가 자주 발생합니다. Airflow에서는 BranchPythonOperator를 사용하여 Task의 분기 처리를 할 수 있고 @task.branch 데코레이터와 BaseBranchOperator를 상속하여 직접 커스터마이징하는 방법도 존재합니다.이 글에서는 세가지 방법을 사용하여 Task를 분기하는 방법을 코드 예제와 함께 설명하겠습니다.BranchPythonOperator로 Task 분기 처리하기BranchPythonOperator는 특정 조건에 따라 실행할 Task의 ID를 리턴하여 분기 처리를 수행합니다. 이때, 함수의 리턴값이 분기처리의 핵심입니다. BranchPythonOperator에서 리턴된 값이 후속 Task의 ..

MLOps/Airflow 2024.11.08

[Airflow] 전역 변수 Variable 이용하기

Airflow의 XCom은 특정 DAG 내부의 Task 간 데이터 공유에 적합하지만, 전역적으로 DAG 간에 데이터를 공유하기 위해서는 Variable을 사용할 수 있습니다. Variable은 모든 DAG에서 접근 가능한 전역 변수로, 필요에 따라 설정된 값을 DAG에서 불러와 사용하게 합니다.Variable 설정하기1. Airflow 서비스의 Admin 탭에 들어간 뒤, Variables를 선택합니다. 2. + 버튼을 클릭합니다. 3. Variable로 지정할 Key와 Value를 적어주고 Save를 클릭합니다. Description은 optional입니다. 4. 저장된 Variable을 확인합니다.Variable 변수 사용Variable 설정 및 사용Variable은 메타DB에 저장되며, Jinja ..

MLOps/Airflow 2024.11.07

[Airflow] 서로 다른 Operator 간 XCom 사용 (Python, Bash, Email)

Airflow에서는 각 Operator가 작업의 중간 결과나 데이터를 공유할 때 XCom(Cross-Communication)을 사용합니다. XCom을 활용하면 서로 다른 Operator 간에도 데이터를 전달할 수 있어 데이터 처리의 유연성을 높일 수 있습니다. 여기서는 Python Operator에서 생성한 데이터를 Bash Operator에서 활용하고, 반대로 Bash Operator에서 생성한 데이터를 Python Operator로 전달하는 방법과 Python Operator에서 생성한 데이터를 Email Operator에 전달하여 이메일에 값이 잘 전달되었는지 확인하는 과정을 가지겠습니다. Python Operator → Bash Operator로 XCom 전달아래 코드는 Python Operat..

MLOps/Airflow 2024.11.07

[Airflow] BashOperator와 XCom을 활용하는 방법

Airflow에서 BashOperator는 유용하며, PythonOperator와 마찬가지로 XCom과 함께 태스크 간의 데이터를 주고 받아, 워크플로우 내에서 데이터를 효율적으로 활용할 수 있습니다. 이번 글에서는 BashOperator와 XCom의 상호작용 방식을 설명하고, BashOperator에서 데이터를 주고 받는 여러 방법을 말씀드리겠습니다. BashOperator에서 XCom 사용하기BashOperator는 bash_command나 env 파라미터에 템플릿 문법을 사용하여 데이터를 주고받을 수 있습니다. 이 과정에서 XCom을 활용해 다른 태스크로부터 데이터를 가져오거나, 데이터를 전달하는 작업이 가능합니다.기본 개념: XCom Push와 PullXCom은 Airflow 태스크 간에 데이터를..

MLOps/Airflow 2024.11.06

[Airflow] XCom 사용하기

Airflow에서는 DAG 내의 여러 Task 간에 데이터를 주고받을 수 있도록 XCom(Cross Communication) 기능을 제공합니다. XCom은 작은 크기의 데이터를 공유하기 위한 용도로 설계되었으며, Task 간에 값을 전달하거나 처리 결과를 공유하는 데 유용합니다. 주로 1GB 미만의 데이터에 적합하며, 대용량 데이터는 AWS S3, HDFS 등의 외부 솔루션을 통해 관리하는 것이 좋습니다.XCom이란?XCom은 DAG 내에서 Task 간에 데이터를 주고받기 위한 기술로, Task의 중간 결과를 다른 Task의 입력으로 사용할 수 있게 해줍니다. XCom에 저장된 데이터는 Airflow의 메타DB의 xcom 테이블에 저장되며, 이를 통해 여러 Task 간에 값을 전달할 수 있습니다.XCo..

MLOps/Airflow 2024.11.03

[Airflow] Macro 변수 사용

Airflow에서는 매크로 변수(macros)를 사용해 DAG 실행 시점에 필요한 날짜 연산을 동적으로 수행할 수 있습니다. 매크로 변수는 Airflow의 템플릿 엔진(Jinja)을 통해 작업에 필요한 시간 계산을 간편하게 해주는 유용한 도구입니다. 매크로 변수를 사용하면 복잡한 날짜 계산이 필요한 작업도 쉽게 설정할 수 있으며, 데이터 처리 주기에 맞춰 시작일과 종료일을 유연하게 설정할 수 있습니다.매크로 변수란?Airflow의 매크로 변수는 템플릿에서 다양한 날짜 연산을 지원하는 도구로, Python의 datetime 및 dateutil 라이브러리에 익숙하다면 더욱 효과적으로 사용할 수 있습니다. 주요 매크로 모듈은 다음과 같습니다:macros.datetime: 날짜와 시간 처리를 위한 모듈macro..

MLOps/Airflow 2024.11.03

[Airflow] Jinja 템플릿 활용

Airflow에서는 DAG 작성 시 Jinja 템플릿을 통해 특정 파라미터에 동적으로 값을 할당할 수 있습니다. Jinja는 주로 웹 프레임워크에서 HTML 템플릿 렌더링에 사용되는 엔진이지만, Airflow에서는 SQL, Bash 스크립트 등 다양한 작업에서 파라미터 값 설정을 유연하게 해주는 역할을 합니다. 이 글에서는 Jinja 템플릿이 무엇인지, Airflow에서 어떤 식으로 활용되는지 구체적으로 알아보겠습니다.Jinja 템플릿이란?Jinja 템플릿은 문서에서 특정 양식으로 작성된 값을 런타임에 실제 값으로 치환해주는 처리 엔진입니다. Python 기반의 여러 프레임워크에서 지원되며, 대표적으로 Flask나 Django에서 HTML 템플릿을 화면에 렌더링할 때 사용됩니다. Airflow에서는 SQ..

MLOps/Airflow 2024.11.03

[Airflow] op_args와 op_kwargs 사용하기

Airflow는 데이터 파이프라인 작업을 DAG(Directed Acyclic Graph)로 정의하여 작업 순서를 지정할 수 있습니다. 이때 PythonOperator를 통해 Python 함수를 task로 사용할 수 있는데, 이 함수에 인자를 전달할 때 op_args와 op_kwargs 파라미터를 사용합니다.op_args: 위치 인자 전달op_args는 위치 기반 인자(positional arguments)를 전달하기 위한 리스트 형태의 파라미터입니다. 이를 통해 함수에 순서대로 값을 전달할 수 있습니다. 예시def my_task_with_args(arg1, *args): print(f"arg1: {arg1}") print(f"Additional args: {args}") PythonOp..

MLOps/Airflow 2024.11.03