Airflow Operator와 Provider
Operator
Airflow 오퍼레이터는 DAG의 작업을 정의하는 빌딩 블록입니다. 오퍼레이터는 다음과 같은 범주로 나뉩니다:
- 액션 오퍼레이터: 특정 작업을 수행 (예: BashOperator, PythonOperator).
- 전송 오퍼레이터: 시스템 간 데이터 이동 (예: S3ToRedshiftOperator).
- 센서 오퍼레이터: 특정 조건을 기다림 (예: S3KeySensor).
https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/operators.html
Provider
프로바이더는 Airflow의 기능을 확장하여 외부 시스템과 상호작용할 수 있게 하는 패키지입니다. 예를 들어:
- AWS 프로바이더: AWS 서비스와의 상호작용
- Google Cloud 프로바이더: Google Cloud 서비스와의 상호작용
- HTTP 프로바이더: HTTP 요청 전송
- 데이터베이스 프로바이더: PostgreSQL, MySQL과 같은 데이터베이스와 상호작용
SimpleHttpOperator를 활용하여 서울시 공공 데이터 API 호출하기
SimpleHttpOperator란?
SimpleHttpOperator는 Airflow에서 HTTP 요청을 보내고 응답을 받을 수 있는 오퍼레이터입니다. REST API 호출을 통해 데이터를 처리할 때 유용하며, 요청 결과는 텍스트 형식으로 반환됩니다. API를 사용해 데이터를 수집하고자 할 때 간편하게 사용할 수 있습니다.
SimpleHttpOperator의 주요 기능
- http_conn_id: Airflow에 등록된 HTTP 연결 ID를 지정합니다.
- endpoint: 요청을 보낼 API의 엔드포인트를 설정합니다.
- method: GET, POST와 같은 HTTP 메소드를 지정합니다.
- headers: 요청 헤더를 설정하여 Content-Type, 인코딩 방식 등을 지정할 수 있습니다.
Airflow에서 API 커넥션 등록하기
- Airflow UI에 접속합니다.
- 상단 메뉴에서 어드민(Admin) > Connections로 이동합니다.
- Conn Id에 openapi.seoul.go.kr 등 연결 식별자를 입력하고, Conn Type으로 HTTP를 선택한 후 설정을 완료합니다.
API 키를 Variable로 관리하기
API 키를 Airflow Variable에 저장하면 보안성을 유지하면서도 코드에서 간편하게 참조할 수 있습니다.
- Airflow UI에 접속합니다.
- 어드민(Admin) > Variables로 이동하여 API 키를 등록합니다.
- API 키가 필요한 모든 DAG에서 Variable을 참조하면 코드에 키를 직접 노출하지 않아도 됩니다.
SimpleHttpOperator를 이용한 API 호출 예제 코드
SimpleHttpOperator를 사용하여 서울시 공공 데이터를 요청하고, xcom을 통해 응답을 후속 Task에 전달합니다.
import pendulum
from airflow import DAG
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.decorators import task
with DAG(
dag_id="dag_simple_http_operator",
schedule="30 6 * * *",
start_date=pendulum.datetime(2024, 3, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
''' 서울시 강우량 정보 '''
rain_fall_info = SimpleHttpOperator(
task_id='rain_fall_info',
http_conn_id='openapi.seoul.go.kr',
endpoint='{{var.value.apikey_openapi_seoul_go_kr}}/json/ListRainfallService/1/5/',
method='GET',
headers={'Content-Type': 'application/json',
'charset': 'utf-8',
'Accept': '*/*'
}
)
@task(task_id='process_rainfall_data')
def process_rainfall_data(**kwargs):
ti = kwargs['ti']
result = ti.xcom_pull(task_ids='rain_fall_info') # xcom에서 응답 데이터를 가져옴
import json
from pprint import pprint
pprint(json.loads(result))
rain_fall_info >> process_rainfall_data()
- http_conn_id: Airflow에 등록한 커넥션 ID를 사용하여 HTTP 연결을 설정합니다.
- endpoint: Variable로 등록된 API 키를 포함한 엔드포인트를 지정합니다.
- xcom_pull: SimpleHttpOperator는 응답 결과를 xcom에 저장하며, 후속 태스크에서는 xcom_pull 메서드를 사용해 응답 값을 가져올 수 있습니다.
XCom을 통한 응답 데이터 조회
SimpleHttpOperator의 응답 결과는 자동으로 xcom에 저장됩니다. 이 데이터를 후속 태스크에서 사용하려면 xcom_pull을 통해 가져옵니다.
마무리
Airflow의 SimpleHttpOperator를 사용하면 간단하게 HTTP 요청을 통해 데이터를 가져올 수 있으며, xcom을 통해 다른 태스크에 데이터를 전달할 수 있습니다. 특히 워크플로우 구축 시 API 키를 Variable에 저장하고, Airflow 커넥션을 통해 보안성을 유지하면서 효율적으로 데이터 파이프라인을 관리할 수 있습니다.
'MLOps > Airflow' 카테고리의 다른 글
[Airflow] DAG 실행과 스케줄링 (2) | 2024.11.13 |
---|---|
[Airflow] Custom Operator 만들기 (0) | 2024.11.11 |
[Airflow] Trigger Rule, Task Group, Edge Label, DAG 간 의존성 관리 (0) | 2024.11.10 |
[Airflow] BranchPythonOperator로 Task 분기 처리하기 (0) | 2024.11.08 |
[Airflow] 전역 변수 Variable 이용하기 (0) | 2024.11.07 |