MLOps/Airflow

[Airflow] DAG 실행과 스케줄링

monkeykim 2024. 11. 13. 00:10

1. DAG 파일 로드 및 파싱

Airflow에서 모든 워크플로는 DAG(Directed Acyclic Graph) 형태로 정의됩니다. 이 DAG은 Python 코드로 작성되며, 작업의 순서와 의존성을 설정하는 방향성 비순환 그래프입니다. Airflow의 스케줄러(Scheduler)는 설정된 디렉토리를 주기적으로 확인하여 새로 생성되거나 수정된 DAG 파일이 있는지 감지합니다.(이로 인해 코드를 작성할 때 메모리 효율적으로 작성을 해야 함)

스케줄러는 DAG 파일을 로드하고 파싱하여 DAG 객체를 생성하며, 이를 메타데이터 데이터베이스에 저장합니다. 이때 DAG에 정의된 모든 작업(Task)과 그 의존 관계도 함께 등록됩니다. 덕분에 Airflow는 DAG의 실행 시점을 정확히 파악하고 DAG이 완료될 때까지 순서대로 작업을 실행할 수 있게 됩니다.


2. DAG의 스케줄 확인

Airflow에서 DAG의 실행 주기는 schedule_interval 속성으로 정의됩니다. 예를 들어, 매일, 매시간, 또는 특정 요일마다 실행되도록 설정할 수 있습니다. 스케줄러는 메타데이터 데이터베이스에 저장된 DAG의 스케줄 정보를 주기적으로 확인하여 실행 시간이 도래한 DAG을 감지하고, 실행 준비를 합니다.

스케줄러는 DAG의 실행 조건에 맞춰 DAG 인스턴스를 생성합니다. 이 DAG 인스턴스에는 DAG이 실행될 Task와 그 의존성이 포함되어 있습니다. 이로써 Airflow는 DAG 실행 시점과 Task 간의 순서를 명확히 관리할 수 있습니다.


3. DAG 인스턴스 생성과 Task 상태 관리

스케줄러가 실행할 시간이 된 DAG을 감지하면, DAG 인스턴스를 생성하고 각 Task의 상태를 '대기 중(None)'으로 설정합니다. Task는 여러 개의 연산자로 구성되며, 각 연산자는 Bash, Python, Email 등 다양한 작업을 수행할 수 있습니다.

DAG 내에서 Task는 서로 의존성을 가질 수 있습니다. 예를 들어 Task B가 Task A에 의존하는 경우, 스케줄러는 Task A가 성공적으로 완료되기 전까지는 Task B를 실행하지 않습니다. 스케줄러는 DAG의 의존성을 확인하여, 실행 준비가 된 Task의 상태를 'Queued'로 변경하여 실행 대기 상태로 둡니다.


4. Task 큐잉 및 Executor와의 상호작용

Airflow 스케줄러는 'Queued' 상태의 Task들을 작업 큐에 추가합니다. 이때 Airflow의 Executor가 큐의 Task를 실행하는 역할을 맡습니다. Executor는 여러 종류가 있으며, 다음과 같이 나뉩니다:

  • LocalExecutor: 한 서버에서 여러 프로세스를 통해 병렬 작업을 실행하는 방식입니다.
  • CeleryExecutor: 여러 워커를 통해 작업을 분산 처리할 수 있어 대규모 워크플로에서 유용합니다.

Executor는 큐에서 Task를 가져와 실제로 실행하고, 성공하면 상태를 'Success'로, 실패하면 'Failed'로 메타데이터 데이터베이스에 기록합니다.


5. Task 상태 업데이트와 다음 Task 실행

스케줄러는 지속적으로 메타데이터 데이터베이스를 확인하여 각 Task의 상태가 업데이트되었는지 감지합니다. 만약 선행 Task가 성공적으로 완료된 경우, 스케줄러는 해당 Task의 의존성을 확인하고 다음 Task를 'Queued' 상태로 전환하여 Executor가 처리할 수 있도록 준비합니다.

이 과정은 DAG의 모든 Task가 완료될 때까지 반복되며, 각 Task의 완료 여부에 따라 의존 관계를 기반으로 순차적으로 실행됩니다.


6. 스케줄러의 지속적인 상태 확인과 재실행

스케줄러는 DAG의 상태와 스케줄을 주기적으로 확인하며, 다음 실행 시점이 도래한 DAG을 큐에 추가합니다. 특정 Task가 실패한 경우에는 재시도 설정에 따라 스케줄러가 자동으로 다시 실행하거나, 웹 UI를 통해 사용자가 직접 재실행을 요청할 수 있습니다.

스케줄러는 모든 DAG의 상태를 추적하고, 주기적으로 DAG의 실행 조건을 확인하여, 매일 또는 지정된 주기마다 DAG을 실행할 수 있도록 관리합니다.


7. DAG 종료와 로그 관리

모든 Task가 완료되면 DAG 실행이 종료되며, 실행 기록은 메타데이터 데이터베이스에 저장됩니다. 각 Task는 로그를 생성하며, 이 로그는 Airflow 웹 UI를 통해 확인할 수 있습니다. 덕분에 사용자는 언제든지 각 Task의 실행 내역과 오류 원인을 손쉽게 파악할 수 있습니다.


Airflow의 작동 원리 요약

Airflow는 DAG 파일을 로드하고 파싱하여 워크플로를 관리하고, 스케줄러와 Executor의 협력으로 모든 Task가 효율적으로 실행되도록 제어합니다. Airflow의 내부 작동 원리는 다음과 같은 흐름으로 요약할 수 있습니다:

  1. DAG 파일을 로드하고 파싱하여 메타데이터 데이터베이스에 저장합니다.
  2. 스케줄러가 DAG의 스케줄을 확인하고 실행 시간이 된 DAG을 인스턴스로 생성합니다.
  3. Task 의존성을 관리하며 'Queued' 상태로 Task를 작업 큐에 추가합니다.
  4. Executor가 큐의 Task를 실행하고 상태를 업데이트합니다.
  5. 스케줄러는 모든 Task가 완료될 때까지 상태를 주기적으로 확인하며 다음 Task를 실행합니다.
  6. DAG이 완료되면 메타데이터 데이터베이스에 기록되며, 로그는 웹 UI에서 확인할 수 있습니다.