Projects/ClimaML

[ClimaML] 커스텀 센서로 안정성 높은 데이터 파이프라인 구축

monkeykim 2024. 11. 25. 00:16

프로젝트 개요

지난 시간 공공데이터 API를 활용하여 10년간의 기상 데이터를 PostgreSQL 데이터베이스에 적재하는 작업에 이어 이번 시간에는 매일 새로운 데이터를 수집하는 자동화된 데이터 파이프라인을 구축하는 프로젝트를 진행했습니다.

이번 포스팅에서는 다음과 같은 내용을 다룹니다:

  1. 매일 기상 데이터를 적재하는 자동화 DAG 작성
  2. 안정성을 높이기 위한 커스텀 센서 작성
  3. 진행 중 겪었던 문제와 해결 과정

1. 매일 데이터 적재를 위한 자동화 DAG 작성

요구사항

  • 매일 전날(D - 1) 데이터를 API에서 가져와 DB에 적재.
  • Airflow DAG를 작성하여 스케줄링.

구현된 DAG

DAG는 매일 13시에 실행되며, 전날 데이터를 API에서 가져와 DB에 적재하는 작업을 수행합니다. 이렇게 스케줄링을 한 이유는 기상 API 명세를 확인했을 때, 데이터의 갱신 주기가 전일 11시 이후이기 때문이었습니다. 시간 계산은 context의 logical_date를 통해 동적으로 날짜를 계산했습니다. 

from airflow import DAG
from datetime import timedelta
from operators.climaml_fetch_historical_weather_data_operator import ClimamlFetchHistoricalWeatherDataOperator
import pendulum

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    dag_id='climaml_daily_weather_data_dag',
    default_args=default_args,
    description='매일 전날 공공데이터의 기상 데이터를 요청한 후 데이터를 PostgreSQL에 적재합니다.',
    schedule="0 13 * * *",
    start_date=pendulum.datetime(2024, 11, 25, tz='Asia/Seoul'),
    catchup=False,
) as dag:

 

2. 안정성을 높이기 위한 커스텀 센서 작성

요구사항

  • API에서 요청 가능한 데이터가 없을 경우, 데이터 적재 작업을 중단해야 했습니다.
  • API의 응답 상태를 확인하는 센서를 작성.

커스텀 센서

센서는 전날 데이터를 미리 요청하여 유효성을 확인합니다.

from airflow.sensors.base import BaseSensorOperator
import requests

class ClimamlDataSensor(BaseSensorOperator):
    template_fields = ('check_date', 'api_key')

    def __init__(self, check_date, url, api_key, station_ids, **kwargs):
        super().__init__(**kwargs)
        self.check_date = check_date
        self.url = url
        self.api_key = api_key
        self.station_ids = station_ids

    def poke(self, context):
        params = {
            'serviceKey': self.api_key,
            'pageNo': '1',
            'numOfRows': '1',
            'dataType': 'JSON',
            'dataCd': 'ASOS',
            'dateCd': 'DAY',
            'startDt': self.check_date,
            'endDt': self.check_date,
            'stnIds': self.station_ids[0]
        }
        try:
            response = requests.get(self.url, params=params)
            data = response.json()
            result_code = data.get('response', {}).get('header', {}).get('resultCode')
            return result_code == "00"
        except Exception as e:
            self.log.error(f"API 요청 실패: {e}")
            return False

기상 데이터 API의 명세에 작성돼있는 에러 코드입니다. '00' 이 아닌 상황에 대해서는 return False를 주도록 Custom Sensor를 작성하였습니다.


매일 작업을 진행하는 DAG 전체 코드

from airflow import DAG
from datetime import datetime, timedelta
from operators.climaml_fetch_historical_weather_data_operator import ClimamlFetchHistoricalWeatherDataOperator
from sensors.climaml_data_sensor import ClimamlDataSensor
import pendulum

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    dag_id='climaml_daily_weather_data_dag',
    default_args=default_args,
    description='매일 전날 공공데이터의 기상 데이터를 요청한 후 조회가 가능하면 데이터를 가지고와 postgreSQL에 적재합니다.',
    schedule="0 13 * * *",
    start_date=pendulum.datetime(2024, 11, 25, tz='Asia/Seoul'),
    catchup=False
) as dag:
    # 관측소 번호와 이름
    station_ids = [
        '108',  # 서울
        '105',  # 강릉
        '114',  # 원주
        '112',  # 인천
        '119',  # 수원
        '127',  # 충주
        '131',  # 청주
        '137',  # 상주
        '283',  # 경주
        '146',  # 전주
        '247',  # 남원
        '184',  # 제주
        '189',  # 서귀포
    ]

    # 전일 데이터 요청이 가능한지 여부 확인
    check_date = "{{ (execution_date - macros.timedelta(days=1)).strftime('%Y%m%d') }}"
    climaml_check_api_avaliable = ClimamlDataSensor(
        task_id='climaml_check_api_avaliable',
        check_date=check_date,
        url='http://apis.data.go.kr/1360000/AsosDalyInfoService/getWthrDataList',
        api_key="{{ var.value.data_go_kr }}",
        station_ids=station_ids,
        poke_interval=300,  # 5분마다 확인
        timeout=3600,  # 최대 1시간 대기
    )

    climaml_fetch_daily_data = ClimamlFetchHistoricalWeatherDataOperator(
        task_id='climaml_fetch_daily_data',
        conn_id='conn-db-postgres-custom',
        station_ids=station_ids,
    )

    climaml_check_api_avaliable >> climaml_fetch_daily_data

DAG graph

 


4. 문제와 해결 과정

  1. 템플릿 필드 사용 관련 문제
    • 템플릿 필드를 사용할 때 Airflow 템플릿 엔진과 날짜 형식의 충돌 문제가 발생.
    • 해결: 템플릿 필드 대신 context의 logical_date를 활용.
  2. 센서를 통한 데이터 유효성 검사
    • 데이터가 없는 경우에도 적재를 시도하는 문제를 방지하기 위해 센서를 추가.

결론

이번 프로젝트를 통해 안정적이고 효율적인 데이터 파이프라인을 구축했습니다. 특히 Airflow의 커스텀 오퍼레이터와 센서를 활용하여 유연성과 안정성을 높일 수 있었습니다. 앞으로는 데이터를 활용한 예측 모델 개발에 집중할 예정입니다.

다음 블로그에서는 데이터 시각화 및 예측 모델을 소개하겠습니다.