MLOps/Airflow

[Airflow] File Sensor에 대하여

monkeykim 2024. 11. 21. 01:32
File Sensor란?

Airflow의 FileSensor는 파일 시스템에서 특정 파일이나 디렉토리가 존재하는지 감지하기 위해 사용됩니다. 워크플로우에서 다른 작업이 파일의 존재 여부에 의존할 때 매우 유용합니다. FileSensor는 주기적으로 파일을 확인하며, 파일이 존재하면 성공으로 처리되고, 존재하지 않으면 설정된 간격 동안 재시도합니다.


FileSensor의 주요 매개변수

  1. fs_conn_id (File Connection ID)
    • Airflow에서 설정된 파일 연결 ID를 참조합니다.
    • 기본적으로 FileSystemHook을 통해 연결을 관리합니다.
  2. filepath (파일 경로)
    • 파일의 상대 경로를 지정합니다.
    • fs_conn_id로 설정된 기본 경로를 기준으로 상대적인 위치를 입력합니다.
  3. recursive (재귀 탐색)
    • True로 설정하면 디렉토리를 재귀적으로 탐색하여 파일을 찾습니다.
  4. poke_interval
    • 센서가 파일의 존재를 확인하는 간격(초 단위)입니다.
  5. timeout
    • 파일이 감지될 때까지 기다리는 최대 시간(초 단위)입니다.
    • 이 시간이 지나면 센서는 실패로 간주됩니다.
  6. mode
    • 센서의 동작 모드를 설정합니다.
      • poke: 센서가 활성화 상태에서 지속적으로 파일을 확인합니다.
      • reschedule: 확인 후 휴면 상태로 전환되며, 자원을 절약합니다.

FileSystemHook(FSHook)

FileSystemHook은 파일 시스템과 연결을 관리하는 Airflow의 Hook입니다. fs_conn_id를 참조하여 파일 시스템 경로에 접근할 수 있도록 도와줍니다. 이 Hook은 FileSensor와 함께 사용되며, 파일 시스템에서 작업을 간단히 수행할 수 있는 메소드를 제공합니다.

FSHook 공식 문서 확인

 

airflow.hooks.filesystem — Airflow Documentation

 

airflow.apache.org


glob 모듈이란?

Python의 glob 모듈은 파일 시스템 경로를 기반으로 특정 패턴에 매칭되는 파일이나 디렉토리를 찾아 리스트로 반환합니다. Airflow에서 파일 경로를 다룰 때 유용하게 활용할 수 있습니다.

from glob import glob

# 디렉토리 내 모든 파일 찾기
print(glob('/home/kim/*'))

# 특정 파일 찾기
print(glob('/home/kim/docker-compose.yaml'))

# 하위 디렉토리까지 모두 탐색
print(glob('/home/kim/**', recursive=True))

FileSensor 사용 예제

from airflow import DAG
from airflow.sensors.filesystem import FileSensor
import pendulum

# DAG 정의
with DAG(
    dag_id='dag_file_sensor',
    schedule="30 6 * * *",  # 매일 오전 6:30에 실행
    start_date=pendulum.datetime(2024, 11, 15, tz="Asia/Seoul"),
    catchup=False
) as dag:
    # FileSensor 정의
    rainfall_sensor = FileSensor(
        task_id='rainfall_sensor',
        fs_conn_id='conn_file_airflow_files',  # 파일 연결 ID
        filepath='ListRainfallService/ListRainfallService.csv',  # 파일 경로
        recursive=False,  # 재귀 탐색 비활성화
        poke_interval=60,  # 파일 확인 간격: 60초
        timeout=60*60*24,  # 최대 대기 시간: 24시간
        mode='reschedule'  # 자원 절약 모드
    )

코드 설명

  1. fs_conn_id 설정Airflow web ui의 connection 탭에서 Connection Type을 File (path)로 변경한 뒤, Connection Id와 Path를 입력합니다. Path에는 파일 경로를 입력하면 됩니다.
  2. 파일 경로 설정
    FileSensor는 fs_conn_id로 지정된 연결에서 상대 경로로 파일을 찾습니다. 예를 들어, filepath='ListRainfallService/ListRainfallService.csv'는 연결된 기본 디렉토리에서 ListRainfallService 폴더 아래의 파일을 의미합니다.
  3. 파일 존재 확인
    설정된 poke_interval 간격으로 파일의 존재 여부를 확인합니다. 재귀 탐색이 필요한 경우 recursive=True로 설정합니다.
  4. 타임아웃 처리
    파일이 설정된 timeout 시간 내에 존재하지 않으면 센서가 실패로 종료됩니다.
  5. 모드 선택
    reschedule 모드는 파일 확인 후 비활성 상태로 전환되어, poke 모드보다 효율적으로 자원을 사용합니다.

Connection 등록

 

file sensor 결과 로그