이번 포스팅에서는 Docker Compose로 PostgreSQL 컨테이너를 설정하고, Airflow의 Hook을 활용해 데이터베이스 작업을 간소화하는 방법을 살펴보겠습니다.
PostgreSQL 컨테이너 설정
Docker Compose를 활용해 PostgreSQL 컨테이너를 설정합니다. 아래는 docker-compose.yaml 파일에서 PostgreSQL 설정의 주요 부분입니다.
services:
postgres_custom:
image: postgres:13
environment:
POSTGRES_USER: kim
POSTGRES_PASSWORD: kim
POSTGRES_DB: kim
TZ: Asia/Seoul
volumes:
- postgres-custom-db-volume:/var/lib/postgresql/data
ports:
- 5432:5432
networks:
network_custom:
ipv4_address: 172.28.0.3
networks:
network_custom:
driver: bridge
ipam:
driver: default
config:
- subnet: 172.28.0.0/16
gateway: 172.28.0.1
주요 설정
- 고정 IP 할당
- networks를 통해 고정 IP를 부여합니다. 예: 172.28.0.3.
- 동일 네트워크에 속한 컨테이너 간 통신이 가능하게 합니다.
- 환경 변수
- 데이터베이스 접속 정보 (POSTGRES_USER, POSTGRES_PASSWORD, POSTGRES_DB)를 설정합니다.
- 데이터 유지
- volumes를 통해 컨테이너 재시작 후에도 데이터가 유지되도록 설정합니다.
컨테이너 시작
docker-compose up
이제 PostgreSQL이 설정된 고정 IP에서 실행됩니다.
Airflow와 PostgreSQL 연동
Airflow에서 PostgreSQL 작업을 수행하려면 PythonOperator와 psycopg2를 이용해 간단히 구현할 수 있습니다.
PostgreSQL로 데이터 Insert
아래는 Airflow에서 PostgreSQL에 데이터를 삽입하는 DAG입니다.
from airflow import DAG
from airflow.operators.python import PythonOperator
import pendulum
with DAG(
dag_id="dag_python_with_postgres",
schedule="30 6 * * *",
start_date=pendulum.datetime(2024, 3, 1, tz="Asia/Seoul"),
catchup=False,
) as dag:
def insert_postgres(ip, port, dbname, user, password, **kwargs):
import psycopg2
from contextlib import closing
with closing(psycopg2.connect(host=ip, port=port, dbname=dbname, user=user, password=password)) as conn:
with closing(conn.cursor()) as cursor:
sql = "INSERT INTO my_table (col1, col2) VALUES (%s, %s);"
cursor.execute(sql, ("value1", "value2"))
conn.commit()
insert_task = PythonOperator(
task_id="insert_postgres",
python_callable=insert_postgres,
op_args=["172.28.0.3", "5432", "kim", "kim", "kim"],
)
insert_task
코드 설명
- psycopg2와 연결
- psycopg2.connect: PostgreSQL 데이터베이스와 연결하여 세션을 생성합니다.
- closing: 연결과 커서를 자동으로 닫아주는 contextlib 유틸리티입니다.
- 명시적으로 conn.close()와 cursor.close()를 호출하지 않아도 됩니다.
- cursor
- 세션 내에서 쿼리를 실행하고 결과를 가져오는 객체입니다.
- cursor.execute(sql, params): SQL 쿼리를 실행합니다.
- 예: cursor.execute("SELECT * FROM table_name WHERE id = %s", (1,))
- commit
- 데이터베이스에 변경 사항을 적용합니다. INSERT, UPDATE와 같은 DML 작업 후 반드시 호출해야 합니다.
위 코드의 문제점
- 접속 정보의 노출
- host, user, password와 같은 민감한 정보가 코드에 포함됩니다.
- 유지보수 어려움
- 접속 정보 변경 시 모든 DAG을 수정해야 합니다.
Airflow Hook을 활용한 개선
Airflow는 Hook을 활용해 외부 시스템과의 연결을 간편하게 처리할 수 있습니다. PostgreSQL을 연동하기 위해 PostgresHook을 사용할 수 있습니다.
PostgresHook을 활용한 데이터 삽입
from airflow import DAG
from airflow.operators.python import PythonOperator
import pendulum
from airflow.providers.postgres.hooks.postgres import PostgresHook
with DAG(
dag_id="dag_python_with_postgres_hook",
schedule="30 6 * * *",
start_date=pendulum.datetime(2024, 3, 1, tz="Asia/Seoul"),
catchup=False,
) as dag:
def insert_with_hook(postgres_conn_id, **kwargs):
hook = PostgresHook(postgres_conn_id=postgres_conn_id)
with hook.get_conn() as conn:
with conn.cursor() as cursor:
sql = "INSERT INTO my_table (col1, col2) VALUES (%s, %s);"
cursor.execute(sql, ("value1", "value2"))
conn.commit()
insert_task = PythonOperator(
task_id="insert_with_hook",
python_callable=insert_with_hook,
op_kwargs={"postgres_conn_id": "conn-db-postgres-custom"},
)
insert_task
장점
- 접속 정보 관리
- 접속 정보는 Airflow UI Connection에서 관리하며, DAG 코드에는 노출되지 않습니다.
- 유지보수 용이
- 접속 정보 변경 시 Airflow UI에서 Connection만 수정하면 됩니다.
Custom Hook을 활용한 Bulk Load
Airflow Hook을 확장하여 커스텀 로직을 추가할 수 있습니다. 아래는 CSV 데이터를 PostgreSQL에 적재하기 위한 Custom Hook의 예제입니다.
먼저 Custom Hook을 작성해줍니다.
from airflow.hooks.base import BaseHook
import psycopg2
import pandas as pd
class CustomPostgresHook(BaseHook):
def __init__(self, postgres_conn_id, **kwargs):
self.postgres_conn_id = postgres_conn_id
def get_conn(self):
airflow_conn = BaseHook.get_connection(self.postgres_conn_id) # connection 정보 반환
self.host = airflow_conn.host
self.user = airflow_conn.login
self.password = airflow_conn.password
self.dbname = airflow_conn.schema
self.port = airflow_conn.port
self.postgres_conn = psycopg2.connect(host=self.host, user=self.user, password=self.password, dbname=self.dbname, port=self.port)
return self.postgres_conn # postgres connection session 정보 반환
def bulk_load(self, table_name, file_name, delimiter: str, is_header:bool, is_replace: bool):
from sqlalchemy import create_engine
self.log.info('적재 대상 파일:' + file_name)
self.log.info('테이블:' + table_name)
self.get_conn()
header = 0 if is_header else None # is_header = true면 0, false면 None
if_exists = 'replace' if is_replace else 'append' # is_replace = true면 replace, false면 append
try:
file_df = pd.read_csv(file_name, header=header, delimiter=delimiter, encoding='utf-8')
except UnicodeDecodeError:
self.log.info("UTF-8 인코딩 실패. EUC-KR로 재시도합니다.")
file_df = pd.read_csv(file_name, header=header, delimiter=delimiter, encoding='euc-kr')
for col in file_df.columns:
try:
# string인 경우에만 처리
file_df[col] = file_df[col].str.replace('\r\n', '')
self.log.info(f'{table_name}.{col}: 개행문자 제거')
except:
# string 문자열이 아닐 경우 continue
continue
self.log.info('적재 건수:' + str(len(file_df)))
uri = f'postgresql://{self.user}:{self.password}@{self.host}/{self.dbname}'
engine = create_engine(uri)
file_df.to_sql(name=table_name,
con=engine,
schema='public',
if_exists=if_exists,
index=False
)
- BaseHook을 상속받고 Custom Hook을 작성해야 합니다.
- BaseHook.get_connection(self.postgres_conn_id): Airflow ui에 작성한 Connection 정보를 리턴합니다.
- @classmethod: 인스턴스화를 하지 않고도 해당 클래스의 메소드로 접근이 가능합니다. 여기서 get_connection는 @classmethod로 감싸져있습니다.
- get_conn(): postgres connection session 정보를 반환합니다.
Custom hook을 수행할 DAG을 작성합니다.
import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator
from hooks.custom_postgres_hook import CustomPostgresHook
with DAG(
dag_id="dag_python_with_custom_hook_bulk_load",
schedule="30 6 * * *",
start_date=pendulum.datetime(2024, 3, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
def insrt_postgres(postgres_conn_id, table_name, file_name, **kwargs):
custom_progress_hook = CustomPostgresHook(postgres_conn_id=postgres_conn_id)
custom_progress_hook.bulk_load(table_name=table_name, file_name=file_name, delimiter=',', is_header=True, is_replace=False)
insrt_postgres = PythonOperator(
task_id='insrt_postgres',
python_callable=insrt_postgres,
op_kwargs={
'postgres_conn_id': 'conn-db-postgres-custom',
'table_name': 'TbListRainfallService_bulk1',
'file_name': '/opt/airflow/files/ListRainfallService/ListRainfallService.csv'
}
)
insrt_postgres
결과 이미지
'MLOps > Airflow' 카테고리의 다른 글
[Airflow] Sensor에 대해 알아보자 (0) | 2024.11.19 |
---|---|
[Airflow] Provider 패키지 설치 (2) | 2024.11.17 |
[Airflow] Docker Compose 파일 해석 (1) | 2024.11.14 |
[Airflow] DAG 실행과 스케줄링 (2) | 2024.11.13 |
[Airflow] Custom Operator 만들기 (0) | 2024.11.11 |