Dag 4

[ClimaML] 커스텀 센서로 안정성 높은 데이터 파이프라인 구축

프로젝트 개요지난 시간 공공데이터 API를 활용하여 10년간의 기상 데이터를 PostgreSQL 데이터베이스에 적재하는 작업에 이어 이번 시간에는 매일 새로운 데이터를 수집하는 자동화된 데이터 파이프라인을 구축하는 프로젝트를 진행했습니다.이번 포스팅에서는 다음과 같은 내용을 다룹니다:매일 기상 데이터를 적재하는 자동화 DAG 작성안정성을 높이기 위한 커스텀 센서 작성진행 중 겪었던 문제와 해결 과정1. 매일 데이터 적재를 위한 자동화 DAG 작성요구사항매일 전날(D - 1) 데이터를 API에서 가져와 DB에 적재.Airflow DAG를 작성하여 스케줄링.구현된 DAGDAG는 매일 13시에 실행되며, 전날 데이터를 API에서 가져와 DB에 적재하는 작업을 수행합니다. 이렇게 스케줄링을 한 이유는 기상 AP..

Projects/ClimaML 2024.11.25

[Airflow] ExternalTaskSensor: DAG 간 의존성 설정과 활용

DAG 간 의존성을 설정하는 방법Airflow에서는 DAG 간 의존성을 설정하는 데 두 가지 주요 방법을 제공합니다.1. TriggerDagRunOperator한 DAG의 특정 작업이 완료되었을 때 다른 DAG을 실행하도록 설정할 수 있습니다.DAG 간의 순차적 실행이 필요할 때 유용합니다.2. ExternalTaskSensor외부 DAG의 특정 작업(Task)이 완료된 후 현재 DAG의 작업(Task)을 실행하고 싶을 때 사용됩니다.작업 간의 동기화를 보장하며, 복잡한 DAG 네트워크에서 의존성을 쉽게 관리할 수 있습니다.https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/external_task/index.html ai..

MLOps/Airflow 2024.11.24

[Airflow] DAG 실행과 스케줄링

1. DAG 파일 로드 및 파싱Airflow에서 모든 워크플로는 DAG(Directed Acyclic Graph) 형태로 정의됩니다. 이 DAG은 Python 코드로 작성되며, 작업의 순서와 의존성을 설정하는 방향성 비순환 그래프입니다. Airflow의 스케줄러(Scheduler)는 설정된 디렉토리를 주기적으로 확인하여 새로 생성되거나 수정된 DAG 파일이 있는지 감지합니다.(이로 인해 코드를 작성할 때 메모리 효율적으로 작성을 해야 함)스케줄러는 DAG 파일을 로드하고 파싱하여 DAG 객체를 생성하며, 이를 메타데이터 데이터베이스에 저장합니다. 이때 DAG에 정의된 모든 작업(Task)과 그 의존 관계도 함께 등록됩니다. 덕분에 Airflow는 DAG의 실행 시점을 정확히 파악하고 DAG이 완료될 때까..

MLOps/Airflow 2024.11.13

[Airflow] BranchPythonOperator로 Task 분기 처리하기

데이터 파이프라인을 구성할 때, 상황에 따라 특정 Task만 실행해야 하는 경우가 자주 발생합니다. Airflow에서는 BranchPythonOperator를 사용하여 Task의 분기 처리를 할 수 있고 @task.branch 데코레이터와 BaseBranchOperator를 상속하여 직접 커스터마이징하는 방법도 존재합니다.이 글에서는 세가지 방법을 사용하여 Task를 분기하는 방법을 코드 예제와 함께 설명하겠습니다.BranchPythonOperator로 Task 분기 처리하기BranchPythonOperator는 특정 조건에 따라 실행할 Task의 ID를 리턴하여 분기 처리를 수행합니다. 이때, 함수의 리턴값이 분기처리의 핵심입니다. BranchPythonOperator에서 리턴된 값이 후속 Task의 ..

MLOps/Airflow 2024.11.08