Projects/ClimaML

[Postgresql] Open API 데이터 ETL 플로우

monkeykim 2024. 11. 4. 02:25

Open API 데이터를 추출하고, 원하는 데이터 형식으로 변환하고, 그것을 데이터 베이스에 적재하는 ETL 플로우에 대해 알아보겠습니다. 저장할 DB는 PostgreSQL을 사용하였습니다. 

PostgreSQL을 선택한 이유?

MLOps 프로젝트의 기상 데이터 적재와 관리 용도로 사용할 데이터베이스(DB)로 데이터 양, 접근 방식, 쿼리 성능 등을 고려하여 선택하였습니다.

10년치 기상 데이터라면 수십 GB 이하로 예상이 되기 때문에 오픈소스인 PostgreSQL이나 MySQL으로도 충분히 감당할 수 있다고 생각하였습니다. 또한 구조화된 스키마를 가지고 있어, 각 지역 및 날짜별 기상 데이터를 정리할 수 있을 것이라고 판단했습니다. 그리고 PostgreSQL이 최근 각광받고 있다는 점때문에 PostgreSQL을 선택하였습니다.


ETL

1. API로 Data fetch하기

import requests

url = 'http://apis.data.go.kr/1360000/AsosDalyInfoService/getWthrDataList'
params = {'serviceKey': 'serviceKey',
          'pageNo': '1',  # 페이지 번호
          'numOfRows': '10',  # 한 페이지 결과 수
          'dataType': 'JSON',  # 요청 자료 형식(XML/JSON)
          'dataCd': 'ASOS',  # 자료 분류 코드
          'dateCd': 'DAY',  # 날짜 분류 코드
          'startDt': '20140101',  # 조회 기간 시작일(YYYYMMDD)
          'endDt': '20140601',  # 조회 기간 종료일(YYYYMMDD) (전일(D-1)까지 제공)
          'stnIds': '108'  # 종관기상관측 지점 번호
          }
response = requests.get(url, params=params)
data = response.json()

 

2. Pandas를 활용하여 fetch한 데이터 변환하기

data = response.json()

items = data['response']['body']['items']['item']

# 필요한 기상 변수 선택 및 컬럼명 매핑 (db 컬럼의 이름과 매핑)
selected_columns = {
    'stnId': 'stn_id',          # 지점 번호
    'tm': 'tm',                 # 일시
    'avgTa': 'avg_ta',          # 평균 기온(°C)
    'minTa': 'min_ta',          # 최저 기온(°C)
    'maxTa': 'max_ta',          # 최고 기온(°C)
    'sumRn': 'sum_rn',          # 일 강수량(mm)
    'avgWs': 'avg_ws',          # 평균 풍속(m/s)
    'avgRhm': 'avg_rhm',        # 평균 상대 습도(%)
    'avgTd': 'avg_td',          # 평균 이슬점 온도(°C)
    'avgPs': 'avg_ps',          # 평균 현지 기압(hPa)
    'ssDur': 'ss_dur'           # 가조 시간(hr)
}

# 각 item에서 필요한 컬럼만 추출하고 이름을 매핑하여 DataFrame 생성
filtered_data = [{new_key: item.get(old_key, None) for old_key, new_key in selected_columns.items()} for item in items]
df = pd.DataFrame(filtered_data)

# 빈 문자열을 NaN으로 변환
df.replace("", np.nan, inplace=True)
print(df)

JSON 데이터에서 필요한 기상 변수만 골라 DataFrame으로 정리합니다.

df 출력 결과

3. Postgresql에 적재하기

engine = create_engine('postgresql://username:password@localhost:5432/database_name')
df.to_sql('weather_data', engine, if_exists='append', index=False)
print("데이터가 PostgreSQL에 성공적으로 적재되었습니다.")

PostgreSQL에 데이터를 적재하려면 SQLAlchemy를 통해 연결을 설정하고, Pandas의 to_sql 기능을 사용해 DataFrame을 데이터베이스에 적재할 수 있습니다.

  • if_exists='append': 데이터가 이미 존재하면 기존 테이블에 추가합니다.
  • index=False: DataFrame의 인덱스를 테이블에 저장하지 않습니다.

pgAdmin을 이용하여 테이블 생성하기

pgAdmin은 PostgreSQL을 GUI로 관리할 수 있는 도구입니다. 저와 같이 PostgreSQL을 처음 접하신다면 pgAdmin을 사용하여 데이터베이스와 테이블을 생성하고 쿼리를 쉽게 실행할 수 있습니다.

 

Ubuntu에서 pgAdmin 설치 방법

1. pgAdmin 리포지토리 추가
우분투에 pgAdmin을 설치하려면 pgAdmin의 공식 리포지토리를 추가해야 합니다.

curl https://www.pgadmin.org/static/packages_pgadmin_org.pub | sudo apt-key add -
sudo sh -c 'echo "deb https://ftp.postgresql.org/pub/pgadmin/pgadmin4/apt/$(lsb_release -cs) pgadmin4 main" > /etc/apt/sources.list.d/pgadmin4.list && apt update'

 

2. pgAdmin 설치

pgAdmin은 데스크톱 모드와 웹 모드로 설치할 수 있습니다. 보통 웹 모드로 사용하기 때문에 pgadmin4-web 패키지를 설치하는 것을 추천합니다.

sudo apt install pgadmin4-web

 

 

3. pgAdmin 설정

설치 후 웹 인터페이스를 설정합니다. 설치 후 아래 명령어를 통해 초기 설정을 진행합니다.

sudo /usr/pgadmin4/bin/setup-web.sh

설정 중 이메일 주소와 비밀번호를 묻는 화면이 나타나는데, 이를 입력하여 pgAdmin에 로그인할 때 사용할 자격 증명을 설정합니다.

 

4. pgAdmin 실행

설정이 완료되면 pgAdmin은 웹 모드로 실행됩니다. 이제 브라우저에서 pgAdmin에 접속할 수 있습니다.

http://127.0.0.1/pgadmin4

 

4-1 PostgreSQL 사용자 비밀번호 초기화 또는 설정

PostgreSQL에 접근해 postgres 사용자 계정의 비밀번호를 설정하는 방법입니다.

 

4-2 PostgreSQL 계정으로 접속
우분투에서 postgres 사용자는 PostgreSQL의 기본 관리자 계정입니다. 다음 명령어를 사용해 postgres 계정으로 로그인합니다.

sudo -i -u postgres

 

4-3 PostgreSQL 셸(psql)에 접속

psql

 

4-4 비밀번호 변경 명령 실행
postgres 계정의 비밀번호를 새로 설정합니다. 비밀번호는 원하는 값으로 설정하세요.

ALTER USER postgres PASSWORD '새로운_비밀번호';

 

설정이 완료되면 다음과 같은 메시지가 나타납니다. 'ALTER ROLE'

 

4-5 PostgreSQL 셸 종료 & postgres 사용자 로그아웃

\q
exit

 

5. PostgreSQL 서버 연결

pgAdmin에서 PostgreSQL 서버를 추가하고 데이터베이스와 테이블을 관리할 수 있습니다.

  • 왼쪽 사이드바에서 Servers를 오른쪽 클릭하고, CreateServer를 선택합니다.
  • Connection 탭에서 Host namelocalhost로, 사용자 이름비밀번호는 4-1에서 설정한 값으로 입력합니다.

6. 연결 완료 후 데이터베이스를 생성

  • 왼쪽 패널에서 ServersPostgreSQL 서버Databases를 마우스 오른쪽 클릭하고 CreateDatabase를 선택합니다.
  • Database name에 데이터베이스 이름을 입력합니다.
  • Save를 클릭하여 데이터베이스를 생성합니다.

7. 테이블 생성

  • 생성한 데이터베이스를 클릭하고, Schemas  Tables를 마우스 오른쪽 클릭한 후 Create  Table을 선택합니다.
  • Name 필드에 테이블 이름을 입력합니다.
  • Columns 탭으로 이동하여 각 컬럼을 추가합니다.
    • id 컬럼: Data type을 serial로 설정하고 Primary Key로 지정합니다.
    • 컬럼을 Data type에 맞게 하나씩 추가합니다.
  • 설정이 완료되면 Save를 클릭하여 테이블을 생성합니다.

 

DB에 저장이 되었다면, Query Tool로 쉽게 쿼리를 실행할 수 있습니다.