File Sensor란?
Airflow의 FileSensor는 파일 시스템에서 특정 파일이나 디렉토리가 존재하는지 감지하기 위해 사용됩니다. 워크플로우에서 다른 작업이 파일의 존재 여부에 의존할 때 매우 유용합니다. FileSensor는 주기적으로 파일을 확인하며, 파일이 존재하면 성공으로 처리되고, 존재하지 않으면 설정된 간격 동안 재시도합니다.
FileSensor의 주요 매개변수
- fs_conn_id (File Connection ID)
- Airflow에서 설정된 파일 연결 ID를 참조합니다.
- 기본적으로 FileSystemHook을 통해 연결을 관리합니다.
- filepath (파일 경로)
- 파일의 상대 경로를 지정합니다.
- fs_conn_id로 설정된 기본 경로를 기준으로 상대적인 위치를 입력합니다.
- recursive (재귀 탐색)
- True로 설정하면 디렉토리를 재귀적으로 탐색하여 파일을 찾습니다.
- poke_interval
- 센서가 파일의 존재를 확인하는 간격(초 단위)입니다.
- timeout
- 파일이 감지될 때까지 기다리는 최대 시간(초 단위)입니다.
- 이 시간이 지나면 센서는 실패로 간주됩니다.
- mode
- 센서의 동작 모드를 설정합니다.
- poke: 센서가 활성화 상태에서 지속적으로 파일을 확인합니다.
- reschedule: 확인 후 휴면 상태로 전환되며, 자원을 절약합니다.
- 센서의 동작 모드를 설정합니다.
FileSystemHook(FSHook)
FileSystemHook은 파일 시스템과 연결을 관리하는 Airflow의 Hook입니다. fs_conn_id를 참조하여 파일 시스템 경로에 접근할 수 있도록 도와줍니다. 이 Hook은 FileSensor와 함께 사용되며, 파일 시스템에서 작업을 간단히 수행할 수 있는 메소드를 제공합니다.
glob 모듈이란?
Python의 glob 모듈은 파일 시스템 경로를 기반으로 특정 패턴에 매칭되는 파일이나 디렉토리를 찾아 리스트로 반환합니다. Airflow에서 파일 경로를 다룰 때 유용하게 활용할 수 있습니다.
from glob import glob
# 디렉토리 내 모든 파일 찾기
print(glob('/home/kim/*'))
# 특정 파일 찾기
print(glob('/home/kim/docker-compose.yaml'))
# 하위 디렉토리까지 모두 탐색
print(glob('/home/kim/**', recursive=True))
FileSensor 사용 예제
from airflow import DAG
from airflow.sensors.filesystem import FileSensor
import pendulum
# DAG 정의
with DAG(
dag_id='dag_file_sensor',
schedule="30 6 * * *", # 매일 오전 6:30에 실행
start_date=pendulum.datetime(2024, 11, 15, tz="Asia/Seoul"),
catchup=False
) as dag:
# FileSensor 정의
rainfall_sensor = FileSensor(
task_id='rainfall_sensor',
fs_conn_id='conn_file_airflow_files', # 파일 연결 ID
filepath='ListRainfallService/ListRainfallService.csv', # 파일 경로
recursive=False, # 재귀 탐색 비활성화
poke_interval=60, # 파일 확인 간격: 60초
timeout=60*60*24, # 최대 대기 시간: 24시간
mode='reschedule' # 자원 절약 모드
)
코드 설명
- fs_conn_id 설정Airflow web ui의 connection 탭에서 Connection Type을 File (path)로 변경한 뒤, Connection Id와 Path를 입력합니다. Path에는 파일 경로를 입력하면 됩니다.
- 파일 경로 설정
FileSensor는 fs_conn_id로 지정된 연결에서 상대 경로로 파일을 찾습니다. 예를 들어, filepath='ListRainfallService/ListRainfallService.csv'는 연결된 기본 디렉토리에서 ListRainfallService 폴더 아래의 파일을 의미합니다. - 파일 존재 확인
설정된 poke_interval 간격으로 파일의 존재 여부를 확인합니다. 재귀 탐색이 필요한 경우 recursive=True로 설정합니다. - 타임아웃 처리
파일이 설정된 timeout 시간 내에 존재하지 않으면 센서가 실패로 종료됩니다. - 모드 선택
reschedule 모드는 파일 확인 후 비활성 상태로 전환되어, poke 모드보다 효율적으로 자원을 사용합니다.
'MLOps > Airflow' 카테고리의 다른 글
[Airflow] Dataset: DAG 간 의존성 관리 (0) | 2024.12.03 |
---|---|
[Airflow] ExternalTaskSensor: DAG 간 의존성 설정과 활용 (0) | 2024.11.24 |
[Airflow] Sensor에 대해 알아보자 (0) | 2024.11.19 |
[Airflow] Provider 패키지 설치 (2) | 2024.11.17 |
[Airflow] PostgreSQL 연동 - Docker Compose를 활용한 설정과 Custom Hook 사용 (0) | 2024.11.16 |