airflow 20

[Airflow] SimpleHttpOperator에 대해 알아보기

Airflow Operator와 Provider OperatorAirflow 오퍼레이터는 DAG의 작업을 정의하는 빌딩 블록입니다. 오퍼레이터는 다음과 같은 범주로 나뉩니다:액션 오퍼레이터: 특정 작업을 수행 (예: BashOperator, PythonOperator).전송 오퍼레이터: 시스템 간 데이터 이동 (예: S3ToRedshiftOperator).센서 오퍼레이터: 특정 조건을 기다림 (예: S3KeySensor).https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/operators.html Operators — Airflow Documentation airflow.apache.orgProvider프로바이더는 Airflow의 기..

MLOps/Airflow 2024.11.10

[Airflow] Trigger Rule, Task Group, Edge Label, DAG 간 의존성 관리

Trigger Rule: 상위 태스크의 상태에 따라 하위 태스크의 실행을 제어합니다.Task Group: 태스크를 논리적으로 조직하여 가독성을 높이고 유지 보수를 쉽게 합니다.Edge Label: 태스크 간 의존성을 주석 처리하여 DAG 시각화를 더 명확히 합니다.DAG 간 의존성 관리: TriggerDagRunOperator와 ExternalTaskSensor를 사용하여 여러 DAG 간의 의존성을 관리합니다.Airflow에서의 트리거 규칙 이해하기Trigger Rule이란?Airflow에서 Trigger Rule은 상위 태스크의 상태에 따라 하위 태스크가 실행될지 여부를 결정하는 규칙입니다. 기본적으로는 모든 상위 태스크가 성공해야 하위 태스크가 실행됩니다(all_success). 하지만 상위 태스크 ..

MLOps/Airflow 2024.11.10

[Airflow] BranchPythonOperator로 Task 분기 처리하기

데이터 파이프라인을 구성할 때, 상황에 따라 특정 Task만 실행해야 하는 경우가 자주 발생합니다. Airflow에서는 BranchPythonOperator를 사용하여 Task의 분기 처리를 할 수 있고 @task.branch 데코레이터와 BaseBranchOperator를 상속하여 직접 커스터마이징하는 방법도 존재합니다.이 글에서는 세가지 방법을 사용하여 Task를 분기하는 방법을 코드 예제와 함께 설명하겠습니다.BranchPythonOperator로 Task 분기 처리하기BranchPythonOperator는 특정 조건에 따라 실행할 Task의 ID를 리턴하여 분기 처리를 수행합니다. 이때, 함수의 리턴값이 분기처리의 핵심입니다. BranchPythonOperator에서 리턴된 값이 후속 Task의 ..

MLOps/Airflow 2024.11.08

[Airflow] 전역 변수 Variable 이용하기

Airflow의 XCom은 특정 DAG 내부의 Task 간 데이터 공유에 적합하지만, 전역적으로 DAG 간에 데이터를 공유하기 위해서는 Variable을 사용할 수 있습니다. Variable은 모든 DAG에서 접근 가능한 전역 변수로, 필요에 따라 설정된 값을 DAG에서 불러와 사용하게 합니다.Variable 설정하기1. Airflow 서비스의 Admin 탭에 들어간 뒤, Variables를 선택합니다. 2. + 버튼을 클릭합니다. 3. Variable로 지정할 Key와 Value를 적어주고 Save를 클릭합니다. Description은 optional입니다. 4. 저장된 Variable을 확인합니다.Variable 변수 사용Variable 설정 및 사용Variable은 메타DB에 저장되며, Jinja ..

MLOps/Airflow 2024.11.07

[Airflow] BashOperator와 XCom을 활용하는 방법

Airflow에서 BashOperator는 유용하며, PythonOperator와 마찬가지로 XCom과 함께 태스크 간의 데이터를 주고 받아, 워크플로우 내에서 데이터를 효율적으로 활용할 수 있습니다. 이번 글에서는 BashOperator와 XCom의 상호작용 방식을 설명하고, BashOperator에서 데이터를 주고 받는 여러 방법을 말씀드리겠습니다. BashOperator에서 XCom 사용하기BashOperator는 bash_command나 env 파라미터에 템플릿 문법을 사용하여 데이터를 주고받을 수 있습니다. 이 과정에서 XCom을 활용해 다른 태스크로부터 데이터를 가져오거나, 데이터를 전달하는 작업이 가능합니다.기본 개념: XCom Push와 PullXCom은 Airflow 태스크 간에 데이터를..

MLOps/Airflow 2024.11.06

[Airflow] XCom 사용하기

Airflow에서는 DAG 내의 여러 Task 간에 데이터를 주고받을 수 있도록 XCom(Cross Communication) 기능을 제공합니다. XCom은 작은 크기의 데이터를 공유하기 위한 용도로 설계되었으며, Task 간에 값을 전달하거나 처리 결과를 공유하는 데 유용합니다. 주로 1GB 미만의 데이터에 적합하며, 대용량 데이터는 AWS S3, HDFS 등의 외부 솔루션을 통해 관리하는 것이 좋습니다.XCom이란?XCom은 DAG 내에서 Task 간에 데이터를 주고받기 위한 기술로, Task의 중간 결과를 다른 Task의 입력으로 사용할 수 있게 해줍니다. XCom에 저장된 데이터는 Airflow의 메타DB의 xcom 테이블에 저장되며, 이를 통해 여러 Task 간에 값을 전달할 수 있습니다.XCo..

MLOps/Airflow 2024.11.03

[Airflow] Macro 변수 사용

Airflow에서는 매크로 변수(macros)를 사용해 DAG 실행 시점에 필요한 날짜 연산을 동적으로 수행할 수 있습니다. 매크로 변수는 Airflow의 템플릿 엔진(Jinja)을 통해 작업에 필요한 시간 계산을 간편하게 해주는 유용한 도구입니다. 매크로 변수를 사용하면 복잡한 날짜 계산이 필요한 작업도 쉽게 설정할 수 있으며, 데이터 처리 주기에 맞춰 시작일과 종료일을 유연하게 설정할 수 있습니다.매크로 변수란?Airflow의 매크로 변수는 템플릿에서 다양한 날짜 연산을 지원하는 도구로, Python의 datetime 및 dateutil 라이브러리에 익숙하다면 더욱 효과적으로 사용할 수 있습니다. 주요 매크로 모듈은 다음과 같습니다:macros.datetime: 날짜와 시간 처리를 위한 모듈macro..

MLOps/Airflow 2024.11.03

[Airflow] @task 데코레이터 사용하기

Airflow에서는 파이썬 함수를 간단히 데코레이터를 통해 task로 변환할 수 있는 @task 데코레이터를 제공합니다. 이를 통해 PythonOperator를 직접 정의하는 것보다 효율적이고 간결하게 DAG 내의 task를 작성할 수 있습니다.데코레이터(Decorator)란?데코레이터는 함수를 '감싸서' 기능을 추가하는 파이썬 기능입니다. 함수를 인자로 전달하거나 함수 내부에 함수를 정의하는 것이 가능하기 때문에, 함수를 쉽게 확장할 수 있습니다. @데코레이터명으로 사용하며, Airflow에서는 이를 활용해 task를 정의합니다. Airflow에서의 @task 데코레이터 활용하기Airflow의 @task 데코레이터를 사용하면, 파이썬 함수 정의만으로 손쉽게 task를 생성할 수 있습니다. 이는 Airf..

MLOps/Airflow 2024.10.31

[Airflow] Python Operator로 외부 모듈을 Import하는 방법

1. Python Operator와 모듈 경로 이해하기Airflow에서 PythonOperator를 사용해 작업을 정의할 때, 외부 함수가 필요하면 해당 모듈을 DAG 내에서 import할 수 있습니다. 이때 import 경로를 정확히 지정하는 것이 중요합니다.Python 경로 설정 기본 원칙실행 중인 Python 파일과 동일한 디렉토리에 있는 모듈은 자동으로 import됩니다.pip로 설치한 라이브러리들은 자동으로 sys.path에 포함됩니다.sys.path 변수에 경로를 추가해, Python이 모듈을 찾을 수 있도록 설정할 수 있습니다.Airflow의 기본 sys.path 구성Airflow는 dags와 plugins 폴더를 자동으로 sys.path에 포함합니다. 따라서 이 두 폴더에 외부 모듈을 배치..

MLOps/Airflow 2024.10.30

[Airflow]Python Operator

Airflow의 python operator는 파이썬 함수를 직접적으로 실행할 수 있도록 해주는 오퍼레이터이다.이를 통해 특정 로직을 처리하거나 데이터를 가공하는 파이썬 함수를 작성한 뒤, ETL 파이프라인을 통해 작업을 간편하게 실행할 수 있다.PythonOperator 예제1. Airflow DAG 설정from airflow import DAGfrom airflow.operators.python_operator import PythonOperatorfrom datetime import datetime# 간단한 Python 함수 정의def my_python_function(): print("Hello from Python Operator!")# Airflow DAG 정의with DAG( 'p..

MLOps/Airflow 2024.10.26