들어가며
Airflow를 활용하여 2014년부터 2024년까지의 기상 데이터를 공공 API를 통해 요청하고, PostgreSQL 데이터베이스에 저장하는 과정을 설명합니다.
- 디렉토리 구조 및 common 폴더: 프로젝트 디렉토리 구조와 공통으로 사용될 파일 정의
- Airflow DAG 설계: 데이터 적재 작업을 정의.
- Custom Operator 작성: API 요청과 데이터 적재 로직 구현.
- PostgreSQL 데이터베이스와 스키마 자동 생성.
프로젝트 디렉토리 구조
아래는 현재까지의 프로젝트 디렉토리 구조입니다:
├── dags/
│ ├── climaml_historical_weather_data_dag.py # DAG 정의 파일
├── plugins/
│ ├── operators/
│ │ ├── climaml_fetch_historical_weather_data_operator.py # Custom Operator
│ ├── common/
│ │ ├── climaml_column_mapping.py # 컬럼 매핑 정의
│ │ ├── climaml_data_utils.py # API 요청 및 데이터 처리 유틸리티
├────────
common/climaml_column_mapping.py
이 파일은 API 응답 데이터의 컬럼을 PostgreSQL 테이블 컬럼 이름에 매핑합니다.
# 공공 데이터 포털에서 사용하는 원본데이터 키를
# postgresql에 저장할 컬럼 이름으로 변환하기 위한 매핑 변수
# 요청하는 API 키가 변경이 되면 이 부분에서만 변경을 해주면 된다.
# API가 업데이트되어 avgTa라는 키가 averageTemperature로 바뀐다면 SELECTED_COLUMNS에서만 수정하면 됨.
SELECTED_COLUMNS = {
'stnId': 'stn_id', # 관측소 ID
'tm': 'tm', # 일시
'avgTa': 'avg_ta', # 평균 기온 (°C)
'minTa': 'min_ta', # 최저 기온 (°C)
'maxTa': 'max_ta', # 최고 기온 (°C)
'sumRn': 'sum_rn', # 일 강수량 (mm)
'avgWs': 'avg_ws', # 평균 풍속 (m/s)
'avgRhm': 'avg_rhm', # 평균 상대 습도 (%)
'avgTd': 'avg_td', # 평균 이슬점 온도 (°C)
'avgPs': 'avg_ps', # 평균 현지 기압 (hPa)
'ssDur': 'ss_dur' # 가조 시간 (hr)
}
common/climaml_data_utils.py
이 파일은 API 요청과 데이터를 DataFrame으로 변환하는 유틸리티 함수입니다.
import requests
import pandas as pd
from common.climaml_column_mapping import SELECTED_COLUMNS
def fetch_weather_data(params_base, station_ids, url):
all_data = []
for station_id in station_ids:
params = params_base.copy()
params['stnIds'] = station_id
response = requests.get(url, params=params)
if response.status_code == 200: # 요청 성공
print(f"[INFO] Successfully fetched data for station {station_id}.")
try:
data = response.json() # JSON 변환 시도
except ValueError as e:
print(f"[ERROR] Failed to parse JSON response for station {station_id}. Error: {e}")
print(f"[DEBUG] Response content: {response.text[:500]}") # 응답 내용 일부 출력
continue # 다음 관측소로 넘어감
# 데이터 추출 및 필터링
items = data.get('response', {}).get('body', {}).get('items', {}).get('item', [])
if not items:
print(f"[INFO] No data found for station {station_id}.")
continue
filtered_data = [
{new_key: item.get(old_key, None) for old_key, new_key in SELECTED_COLUMNS.items()}
for item in items
]
all_data.extend(filtered_data)
else: # 요청 실패
print(f"[ERROR] Failed to fetch data for station {station_id}. Status code: {response.status_code}")
print(f"[DEBUG] Response text: {response.text[:500]}")
continue
return pd.DataFrame(all_data)
1. Airflow DAG 설계
- 2014년 11월 17일부터 2024년 11월 16일까지의 데이터 요청.
- 공공 데이터 포털 API를 통해 기상 데이터 수집.
- PostgreSQL에 데이터를 적재하며 스키마를 자동 생성.
코드 예제: Airflow DAG 정의
from airflow import DAG
from operators.climaml_fetch_historical_weather_data_operator import ClimamlFetchHistoricalWeatherDataOperator
import pendulum
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id='climaml_historical_weather_data_dag',
default_args=default_args,
description='10년치 기상 데이터를 수집하고 PostgreSQL에 적재합니다.',
schedule_interval=None,
start_date=pendulum.datetime(2024, 11, 17, tz="Asia/Seoul"),
catchup=False,
) as dag:
# 사용할 관측소
station_ids = [
'108', # 서울
'105', # 강릉
'114', # 원주
'112', # 인천
'119', # 수원
'127', # 충주
'131', # 청주
'137', # 상주
'283', # 경주
'146', # 전주
'247', # 남원
'184', # 제주
'189', # 서귀포
]
climaml_fetch_historical_data = ClimamlFetchHistoricalWeatherDataOperator(
task_id='climaml_fetch_historical_data',
conn_id='conn-db-postgres-custom', # connection id
start_date='2014-11-17',
end_date='2024-11-16',
station_ids=station_ids,
)
climaml_fetch_historical_data
2. Custom Operator 작성
Airflow에서 작업을 보다 효율적으로 수행하기 위해 Custom Operator를 작성했습니다. 이 Operator는 다음의 역할을 수행합니다:
- API 요청: 지정된 날짜와 관측소 번호를 기반으로 데이터를 요청.
- PostgreSQL 데이터베이스에 적재: 요청한 데이터를 처리하고 DB에 저장.
코드 예제: Custom Operator
from airflow.models import BaseOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from common.climaml_data_utils import fetch_weather_data
import pendulum
import pandas as pd
import numpy as np
class ClimamlFetchHistoricalWeatherDataOperator(BaseOperator):
def __init__(self, conn_id, start_date, end_date, station_ids, **kwargs):
super().__init__(**kwargs)
self.conn_id = conn_id
self.start_date = pendulum.parse(start_date) # 시작 날짜
self.end_date = pendulum.parse(end_date) # 종료 날짜
self.station_ids = station_ids
def execute(self, context):
postgres_hook = PostgresHook(postgres_conn_id=self.conn_id)
engine = postgres_hook.get_sqlalchemy_engine()
current_start = self.start_date
current_end = self.end_date
while current_start <= current_end:
# end_of_period를 90일씩 증가시키며, end_date보다 커지게 되면, min 메소드로 end_date를 전달
end_of_period = min(current_start.add(days=90), current_end)
params_base = {
'serviceKey': Variable.get("data_go_kr"), # Variable에 api key 등록
'pageNo': '1',
'numOfRows': '999', # 1000개는 요청할 수 없다고 함
'dataType': 'JSON',
'dataCd': 'ASOS',
'dateCd': 'DAY',
'startDt': current_start.format('YYYYMMDD'),
'endDt': end_of_period.format('YYYYMMDD'),
}
df = fetch_weather_data(params_base=params_base, station_ids=self.station_ids, url="http://apis.data.go.kr/1360000/AsosDalyInfoService/getWthrDataList")
df.drop_duplicates(inplace=True) # 중복된 data frame 제거
df.replace("", np.nan, inplace=True) # 빈값은 nan으로 변경
df.to_sql('clima_ml_weather_data', engine, if_exists='append', index=False) # db에 저장
current_start = end_of_period.add(days=1)
코드 풀이
- PostgreSQL 연결
- PostgresHook: PostgreSQL과 연결합니다.
- SQLAlchemy 엔진 생성: postgres_hook.get_sqlalchemy_engine()을 사용하여 SQLAlchemy의 Engine 객체를 반환합니다. 이 객체는 DB와 상호작용할 때 사용됩니다.
- 데이터 저장
- to_sql: Transform된 Dataframe을 PostgreSQL DB에 저장합니다.
- clima_ml_weather_data: 데이터가 저장될 Table 이름입니다.
- engine: 데이터베이스 연결 객체입니다.
- if_exists = 'append': 기존에 테이블이 존재하면 데이터를 추가합니다.
- index=False: 데이터프레임의 인덱스는 저장하지 않습니다.
- to_sql: Transform된 Dataframe을 PostgreSQL DB에 저장합니다.
- 다음 날짜 범위로 이동
- current_start를 end_of_period의 다음 날로 이동하여 루프를 다시 시작할 때 새로운 날짜 범위( + 90일)에 대해 데이터를 처리합니다.
3. PostgreSQL Hook 및 스키마 자동 생성
데이터베이스를 직접 관리하지 않아도, Pandas의 to_sql 메서드와 Airflow의 PostgreSQL Hook을 활용해 스키마를 자동 생성했습니다.
engine = postgres_hook.get_sqlalchemy_engine()
df.to_sql('clima_ml_weather_data', engine, if_exists='append', index=False)
결과
- DAG 실행: 2014년부터 2024년까지 데이터를 성공적으로 적재 (47,488 row).
- 자동화된 작업: DAG과 Custom Operator를 통해 재사용 가능한 작업 흐름 완성.
- 확장성 확보: 향후 새로운 관측소 추가나 기간 확장도 쉽게 처리 가능.
앞으로 할 일
- 매일 기상데이터를 업데이트 하는 Custom Operater 작성
- 목적: 매일 기상 데이터를 API에서 가져와 clima_ml_weather_data DB에 추가
- 구현 방향:
- Airflow 스케줄러로 하루에 한 번 실행
- 이전 날의 데이터를 요청하도록 날짜를 자동으로 계산
- 데이터 정제 및 컬럼 매핑 (동일한 작업)
- PostgreSQL Hook으로 DB에 적재 (동일한 작업)
- AI/ML 모델 학습 및 관리
- LSTM 모델을 사용하여 ETL한 기상 데이터를 학습
- MLflow를 이용하여 모델을 관리하고 성능을 추적
- 특정 기간 후 새로운 데이터를 바탕으로 재학습
'Projects > ClimaML' 카테고리의 다른 글
[ClimaML] 커스텀 센서로 안정성 높은 데이터 파이프라인 구축 (1) | 2024.11.25 |
---|---|
[Postgresql] Open API 데이터 ETL 플로우 (0) | 2024.11.04 |
[API] 공공 데이터 포털에서 기상 데이터 받아오기 (3) | 2024.11.04 |
[Postgresql] Linux(Ubuntu)에서 설치 (0) | 2024.11.03 |