MLOps/Airflow

[Airflow] PostgreSQL 연동 - Docker Compose를 활용한 설정과 Custom Hook 사용

monkeykim 2024. 11. 16. 22:18

이번 포스팅에서는 Docker Compose로 PostgreSQL 컨테이너를 설정하고, Airflow의 Hook을 활용해 데이터베이스 작업을 간소화하는 방법을 살펴보겠습니다.


PostgreSQL 컨테이너 설정

Docker Compose를 활용해 PostgreSQL 컨테이너를 설정합니다. 아래는 docker-compose.yaml 파일에서 PostgreSQL 설정의 주요 부분입니다.

services:
  postgres_custom:
    image: postgres:13
    environment:
      POSTGRES_USER: kim
      POSTGRES_PASSWORD: kim
      POSTGRES_DB: kim
      TZ: Asia/Seoul
    volumes:
      - postgres-custom-db-volume:/var/lib/postgresql/data
    ports:
      - 5432:5432
    networks:
      network_custom:
        ipv4_address: 172.28.0.3

networks:
  network_custom:
    driver: bridge
    ipam:
      driver: default
      config:
        - subnet: 172.28.0.0/16
          gateway: 172.28.0.1

주요 설정

  1. 고정 IP 할당
    • networks를 통해 고정 IP를 부여합니다. 예: 172.28.0.3.
    • 동일 네트워크에 속한 컨테이너 간 통신이 가능하게 합니다.
  2. 환경 변수
    • 데이터베이스 접속 정보 (POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_DB)를 설정합니다.
  3. 데이터 유지
    • volumes를 통해 컨테이너 재시작 후에도 데이터가 유지되도록 설정합니다.

컨테이너 시작

docker-compose up

이제 PostgreSQL이 설정된 고정 IP에서 실행됩니다.


Airflow와 PostgreSQL 연동

Airflow에서 PostgreSQL 작업을 수행하려면 PythonOperator와 psycopg2를 이용해 간단히 구현할 수 있습니다.

PostgreSQL로 데이터 Insert

아래는 Airflow에서 PostgreSQL에 데이터를 삽입하는 DAG입니다.

from airflow import DAG
from airflow.operators.python import PythonOperator
import pendulum

with DAG(
    dag_id="dag_python_with_postgres",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2024, 3, 1, tz="Asia/Seoul"),
    catchup=False,
) as dag:

    def insert_postgres(ip, port, dbname, user, password, **kwargs):
        import psycopg2
        from contextlib import closing

        with closing(psycopg2.connect(host=ip, port=port, dbname=dbname, user=user, password=password)) as conn:
            with closing(conn.cursor()) as cursor:
                sql = "INSERT INTO my_table (col1, col2) VALUES (%s, %s);"
                cursor.execute(sql, ("value1", "value2"))
                conn.commit()

    insert_task = PythonOperator(
        task_id="insert_postgres",
        python_callable=insert_postgres,
        op_args=["172.28.0.3", "5432", "kim", "kim", "kim"],
    )

    insert_task

코드 설명

  1. psycopg2와 연결
    • psycopg2.connect: PostgreSQL 데이터베이스와 연결하여 세션을 생성합니다.
    • closing: 연결과 커서를 자동으로 닫아주는 contextlib 유틸리티입니다.
      • 명시적으로 conn.close()와 cursor.close()를 호출하지 않아도 됩니다.
  2. cursor
    • 세션 내에서 쿼리를 실행하고 결과를 가져오는 객체입니다.
    • cursor.execute(sql, params): SQL 쿼리를 실행합니다.
    • 예: cursor.execute("SELECT * FROM table_name WHERE id = %s", (1,))
  3. commit
    • 데이터베이스에 변경 사항을 적용합니다. INSERT, UPDATE와 같은 DML 작업 후 반드시 호출해야 합니다.

위 코드의 문제점

  1. 접속 정보의 노출
    • host, user, password와 같은 민감한 정보가 코드에 포함됩니다.
  2. 유지보수 어려움
    • 접속 정보 변경 시 모든 DAG을 수정해야 합니다.

Airflow Hook을 활용한 개선

Airflow는 Hook을 활용해 외부 시스템과의 연결을 간편하게 처리할 수 있습니다. PostgreSQL을 연동하기 위해 PostgresHook을 사용할 수 있습니다.

PostgresHook을 활용한 데이터 삽입

from airflow import DAG
from airflow.operators.python import PythonOperator
import pendulum
from airflow.providers.postgres.hooks.postgres import PostgresHook

with DAG(
    dag_id="dag_python_with_postgres_hook",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2024, 3, 1, tz="Asia/Seoul"),
    catchup=False,
) as dag:

    def insert_with_hook(postgres_conn_id, **kwargs):
        hook = PostgresHook(postgres_conn_id=postgres_conn_id)
        with hook.get_conn() as conn:
            with conn.cursor() as cursor:
                sql = "INSERT INTO my_table (col1, col2) VALUES (%s, %s);"
                cursor.execute(sql, ("value1", "value2"))
                conn.commit()

    insert_task = PythonOperator(
        task_id="insert_with_hook",
        python_callable=insert_with_hook,
        op_kwargs={"postgres_conn_id": "conn-db-postgres-custom"},
    )

    insert_task

장점

  1. 접속 정보 관리
    • 접속 정보는 Airflow UI Connection에서 관리하며, DAG 코드에는 노출되지 않습니다.
  2. 유지보수 용이
    • 접속 정보 변경 시 Airflow UI에서 Connection만 수정하면 됩니다.

Custom Hook을 활용한 Bulk Load

Airflow Hook을 확장하여 커스텀 로직을 추가할 수 있습니다. 아래는 CSV 데이터를 PostgreSQL에 적재하기 위한 Custom Hook의 예제입니다.

 

먼저 Custom Hook을 작성해줍니다.

from airflow.hooks.base import BaseHook
import psycopg2
import pandas as pd


class CustomPostgresHook(BaseHook):
    def __init__(self, postgres_conn_id, **kwargs):
        self.postgres_conn_id = postgres_conn_id

    def get_conn(self):
        airflow_conn = BaseHook.get_connection(self.postgres_conn_id)  # connection 정보 반환
        self.host = airflow_conn.host
        self.user = airflow_conn.login
        self.password = airflow_conn.password
        self.dbname = airflow_conn.schema
        self.port = airflow_conn.port

        self.postgres_conn = psycopg2.connect(host=self.host, user=self.user, password=self.password, dbname=self.dbname, port=self.port)
        return self.postgres_conn  # postgres connection session 정보 반환
    
    def bulk_load(self, table_name, file_name, delimiter: str, is_header:bool, is_replace: bool):
        from sqlalchemy import create_engine

        self.log.info('적재 대상 파일:' + file_name)
        self.log.info('테이블:' + table_name)
        self.get_conn()
        header = 0 if is_header else None # is_header = true면 0, false면 None
        if_exists = 'replace' if is_replace else 'append' # is_replace = true면 replace, false면 append
        
        try:
            file_df = pd.read_csv(file_name, header=header, delimiter=delimiter, encoding='utf-8')
        except UnicodeDecodeError:
            self.log.info("UTF-8 인코딩 실패. EUC-KR로 재시도합니다.")
            file_df = pd.read_csv(file_name, header=header, delimiter=delimiter, encoding='euc-kr')

        for col in file_df.columns:
            try:
                # string인 경우에만 처리
                file_df[col] = file_df[col].str.replace('\r\n', '')
                self.log.info(f'{table_name}.{col}: 개행문자 제거')
            except:
                # string 문자열이 아닐 경우 continue
                continue

        self.log.info('적재 건수:' + str(len(file_df)))
        uri = f'postgresql://{self.user}:{self.password}@{self.host}/{self.dbname}'
        engine = create_engine(uri)
        file_df.to_sql(name=table_name,
                       con=engine,
                       schema='public',
                       if_exists=if_exists,
                       index=False
                       )
  • BaseHook을 상속받고 Custom Hook을 작성해야 합니다.
  • BaseHook.get_connection(self.postgres_conn_id): Airflow ui에 작성한 Connection 정보를 리턴합니다.
    • @classmethod: 인스턴스화를 하지 않고도 해당 클래스의 메소드로 접근이 가능합니다. 여기서 get_connection는 @classmethod로 감싸져있습니다.
  • get_conn(): postgres connection session 정보를 반환합니다.

Custom hook을 수행할 DAG을 작성합니다.

import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator
from hooks.custom_postgres_hook import CustomPostgresHook

with DAG(
    dag_id="dag_python_with_custom_hook_bulk_load",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2024, 3, 1, tz="Asia/Seoul"),
    catchup=False
) as dag:
    def insrt_postgres(postgres_conn_id, table_name, file_name, **kwargs):
        custom_progress_hook = CustomPostgresHook(postgres_conn_id=postgres_conn_id)
        custom_progress_hook.bulk_load(table_name=table_name, file_name=file_name, delimiter=',', is_header=True, is_replace=False)
    
    insrt_postgres = PythonOperator(
        task_id='insrt_postgres',
        python_callable=insrt_postgres,
        op_kwargs={
            'postgres_conn_id': 'conn-db-postgres-custom',
            'table_name': 'TbListRainfallService_bulk1',
            'file_name': '/opt/airflow/files/ListRainfallService/ListRainfallService.csv'
        }
    )

    insrt_postgres

 

결과 이미지

Task log
적재된 data