Projects/ClimaML

[Airflow] 기상 데이터 적재 자동화: 2014년부터 2024년까지

monkeykim 2024. 11. 18. 00:04

들어가며

Airflow를 활용하여 2014년부터 2024년까지의 기상 데이터를 공공 API를 통해 요청하고, PostgreSQL 데이터베이스에 저장하는 과정을 설명합니다.

  1. 디렉토리 구조 및 common 폴더: 프로젝트 디렉토리 구조와 공통으로 사용될 파일 정의
  2. Airflow DAG 설계: 데이터 적재 작업을 정의.
  3. Custom Operator 작성: API 요청과 데이터 적재 로직 구현.
  4. PostgreSQL 데이터베이스와 스키마 자동 생성.

프로젝트 디렉토리 구조

아래는 현재까지의 프로젝트 디렉토리 구조입니다:

 

├── dags/
│   ├── climaml_historical_weather_data_dag.py  # DAG 정의 파일
├── plugins/
│   ├── operators/
│   │   ├── climaml_fetch_historical_weather_data_operator.py  # Custom Operator
│   ├── common/
│   │   ├── climaml_column_mapping.py  # 컬럼 매핑 정의
│   │   ├── climaml_data_utils.py  # API 요청 및 데이터 처리 유틸리티
├────────

 


common/climaml_column_mapping.py

이 파일은 API 응답 데이터의 컬럼을 PostgreSQL 테이블 컬럼 이름에 매핑합니다.

# 공공 데이터 포털에서 사용하는 원본데이터 키를 
# postgresql에 저장할 컬럼 이름으로 변환하기 위한 매핑 변수
# 요청하는 API 키가 변경이 되면 이 부분에서만 변경을 해주면 된다.
# API가 업데이트되어 avgTa라는 키가 averageTemperature로 바뀐다면 SELECTED_COLUMNS에서만 수정하면 됨.

SELECTED_COLUMNS = {
    'stnId': 'stn_id',          # 관측소 ID
    'tm': 'tm',                 # 일시
    'avgTa': 'avg_ta',          # 평균 기온 (°C)
    'minTa': 'min_ta',          # 최저 기온 (°C)
    'maxTa': 'max_ta',          # 최고 기온 (°C)
    'sumRn': 'sum_rn',          # 일 강수량 (mm)
    'avgWs': 'avg_ws',          # 평균 풍속 (m/s)
    'avgRhm': 'avg_rhm',        # 평균 상대 습도 (%)
    'avgTd': 'avg_td',          # 평균 이슬점 온도 (°C)
    'avgPs': 'avg_ps',          # 평균 현지 기압 (hPa)
    'ssDur': 'ss_dur'           # 가조 시간 (hr)
}

common/climaml_data_utils.py

이 파일은 API 요청과 데이터를 DataFrame으로 변환하는 유틸리티 함수입니다.

import requests
import pandas as pd
from common.climaml_column_mapping import SELECTED_COLUMNS


def fetch_weather_data(params_base, station_ids, url):
    all_data = []
    for station_id in station_ids:
        params = params_base.copy()
        params['stnIds'] = station_id
        response = requests.get(url, params=params)

        if response.status_code == 200:  # 요청 성공
            print(f"[INFO] Successfully fetched data for station {station_id}.")
            try:
                data = response.json()  # JSON 변환 시도
            except ValueError as e:
                print(f"[ERROR] Failed to parse JSON response for station {station_id}. Error: {e}")
                print(f"[DEBUG] Response content: {response.text[:500]}")  # 응답 내용 일부 출력
                continue  # 다음 관측소로 넘어감
            
            # 데이터 추출 및 필터링
            items = data.get('response', {}).get('body', {}).get('items', {}).get('item', [])
            if not items:
                print(f"[INFO] No data found for station {station_id}.")
                continue

            filtered_data = [
                {new_key: item.get(old_key, None) for old_key, new_key in SELECTED_COLUMNS.items()}
                for item in items
            ]
            all_data.extend(filtered_data)
        else:  # 요청 실패
            print(f"[ERROR] Failed to fetch data for station {station_id}. Status code: {response.status_code}")
            print(f"[DEBUG] Response text: {response.text[:500]}")
            continue

    return pd.DataFrame(all_data)

1. Airflow DAG 설계

  • 2014년 11월 17일부터 2024년 11월 16일까지의 데이터 요청.
  • 공공 데이터 포털 API를 통해 기상 데이터 수집.
  • PostgreSQL에 데이터를 적재하며 스키마를 자동 생성.

코드 예제: Airflow DAG 정의

from airflow import DAG
from operators.climaml_fetch_historical_weather_data_operator import ClimamlFetchHistoricalWeatherDataOperator
import pendulum

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='climaml_historical_weather_data_dag',
    default_args=default_args,
    description='10년치 기상 데이터를 수집하고 PostgreSQL에 적재합니다.',
    schedule_interval=None,
    start_date=pendulum.datetime(2024, 11, 17, tz="Asia/Seoul"),
    catchup=False,
) as dag:

	# 사용할 관측소
    station_ids = [
        '108',  # 서울
        '105',  # 강릉
        '114',  # 원주
        '112',  # 인천
        '119',  # 수원
        '127',  # 충주
        '131',  # 청주
        '137',  # 상주
        '283',  # 경주
        '146',  # 전주
        '247',  # 남원
        '184',  # 제주
        '189',  # 서귀포
    ]

    climaml_fetch_historical_data = ClimamlFetchHistoricalWeatherDataOperator(
        task_id='climaml_fetch_historical_data',
        conn_id='conn-db-postgres-custom',  # connection id
        start_date='2014-11-17',
        end_date='2024-11-16',
        station_ids=station_ids,
    )
    climaml_fetch_historical_data

2. Custom Operator 작성

Airflow에서 작업을 보다 효율적으로 수행하기 위해 Custom Operator를 작성했습니다. 이 Operator는 다음의 역할을 수행합니다:

  • API 요청: 지정된 날짜와 관측소 번호를 기반으로 데이터를 요청.
  • PostgreSQL 데이터베이스에 적재: 요청한 데이터를 처리하고 DB에 저장.

코드 예제: Custom Operator

from airflow.models import BaseOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from common.climaml_data_utils import fetch_weather_data
import pendulum
import pandas as pd
import numpy as np

class ClimamlFetchHistoricalWeatherDataOperator(BaseOperator):
    def __init__(self, conn_id, start_date, end_date, station_ids, **kwargs):
        super().__init__(**kwargs)
        self.conn_id = conn_id
        self.start_date = pendulum.parse(start_date)  # 시작 날짜
        self.end_date = pendulum.parse(end_date)  # 종료 날짜
        self.station_ids = station_ids

    def execute(self, context):
        postgres_hook = PostgresHook(postgres_conn_id=self.conn_id)
        engine = postgres_hook.get_sqlalchemy_engine()

        current_start = self.start_date
        current_end = self.end_date

        while current_start <= current_end:
        	# end_of_period를 90일씩 증가시키며, end_date보다 커지게 되면, min 메소드로 end_date를 전달
            end_of_period = min(current_start.add(days=90), current_end)

            params_base = {
                'serviceKey': Variable.get("data_go_kr"),  # Variable에 api key 등록
                'pageNo': '1',
                'numOfRows': '999',  # 1000개는 요청할 수 없다고 함
                'dataType': 'JSON',
                'dataCd': 'ASOS',
                'dateCd': 'DAY',
                'startDt': current_start.format('YYYYMMDD'),
                'endDt': end_of_period.format('YYYYMMDD'),
            }

            df = fetch_weather_data(params_base=params_base, station_ids=self.station_ids, url="http://apis.data.go.kr/1360000/AsosDalyInfoService/getWthrDataList")
            df.drop_duplicates(inplace=True)  # 중복된 data frame 제거
            df.replace("", np.nan, inplace=True)  # 빈값은 nan으로 변경
            df.to_sql('clima_ml_weather_data', engine, if_exists='append', index=False)  # db에 저장

            current_start = end_of_period.add(days=1)

 

코드 풀이

  • PostgreSQL 연결
    • PostgresHook: PostgreSQL과 연결합니다.
    • SQLAlchemy 엔진 생성: postgres_hook.get_sqlalchemy_engine()을 사용하여 SQLAlchemy의 Engine 객체를 반환합니다. 이 객체는 DB와 상호작용할 때 사용됩니다.
  • 데이터 저장
    • to_sql: Transform된 Dataframe을 PostgreSQL DB에 저장합니다.
      • clima_ml_weather_data: 데이터가 저장될 Table 이름입니다.
      • engine: 데이터베이스 연결 객체입니다.
      • if_exists = 'append': 기존에 테이블이 존재하면 데이터를 추가합니다.
      • index=False: 데이터프레임의 인덱스는 저장하지 않습니다.
  • 다음 날짜 범위로 이동
    • current_start를 end_of_period의 다음 날로 이동하여 루프를 다시 시작할 때 새로운 날짜 범위( + 90일)에 대해 데이터를 처리합니다.

3. PostgreSQL Hook 및 스키마 자동 생성

데이터베이스를 직접 관리하지 않아도, Pandas의 to_sql 메서드와 Airflow의 PostgreSQL Hook을 활용해 스키마를 자동 생성했습니다.

engine = postgres_hook.get_sqlalchemy_engine()
df.to_sql('clima_ml_weather_data', engine, if_exists='append', index=False)

결과

  • DAG 실행: 2014년부터 2024년까지 데이터를 성공적으로 적재 (47,488 row).
  • 자동화된 작업: DAG과 Custom Operator를 통해 재사용 가능한 작업 흐름 완성.
  • 확장성 확보: 향후 새로운 관측소 추가나 기간 확장도 쉽게 처리 가능.

Airflow web ui (success task)
각 관측소 별 10년치 data를 DB에 적재 완료


앞으로 할 일

  1. 매일 기상데이터를 업데이트 하는 Custom Operater 작성
    • 목적: 매일 기상 데이터를 API에서 가져와 clima_ml_weather_data DB에 추가
    • 구현 방향:
      • Airflow 스케줄러로 하루에 한 번 실행
      • 이전 날의 데이터를 요청하도록 날짜를 자동으로 계산
      • 데이터 정제 및 컬럼 매핑 (동일한 작업)
      • PostgreSQL Hook으로 DB에 적재 (동일한 작업)
  2. AI/ML 모델 학습 및 관리
    • LSTM 모델을 사용하여 ETL한 기상 데이터를 학습
    • MLflow를 이용하여 모델을 관리하고 성능을 추적
    • 특정 기간 후 새로운 데이터를 바탕으로 재학습