Projects

Python으로 비동기와 멀티스레딩을 활용한 작업 관리 API 구축

monkeykim 2024. 11. 23. 01:06

이번 포스팅에서는 FastAPI를 기반으로, 이미지 스타일 학습 요청을 처리하는 API를 설계하면서, 비동기 프로그래밍과 멀티 스레딩을 어떻게 활용할 수 있는지에 대해 다뤄보겠습니다.


프로젝트 개요

이 프로젝트는 이미지 스타일 학습 작업 관리 API를 구현한 사례입니다. 유저는 이미지 스타일 학습 요청을 API로 보내고, 작업이 완료되면 학습된 스타일 모델을 받을 수 있습니다. 주요 기능은 다음과 같습니다:

  • 사용자가 학습 요청을 보내면 작업이 학습 대기열에 추가됩니다.
  • 백그라운드에서 대기열을 지속적으로 확인하고, 작업을 하나씩 처리합니다.
  • 학습은 별도의 스레드에서 실행되어 메인 이벤트 루프의 차단을 방지합니다.
  • 사용자는 학습 상태를 조회하거나 작업을 취소할 수 있습니다.

사용 기술

1. FastAPI

  • Python 기반의 웹 프레임워크로, 간결하고 직관적인 RESTful API를 설계할 수 있습니다.
  • 비동기 프로그래밍 지원을 통해 API 서버 구축이 가능합니다.

2. asyncio

  • Python의 비동기 프로그래밍 라이브러리로, 이벤트 루프를 통해 병렬 처리를 지원합니다.
  • 해당 프로젝트에서는 asyncio.create_task()를 사용하여 백그라운드에서 작업을 처리합니다.

3. threading

  • Python 표준 라이브러리로, 멀티스레딩을 지원합니다.
  • 장시간 실행되는 작업(예: 모델 학습)을 별도의 스레드에서 처리하여 메인 스레드의 차단을 방지합니다.

1. FastAPI 서버 초기화 및 백그라운드 작업 관리 설정

FastAPI 서버가 실행될 때, 학습 작업을 처리하는 백그라운드 작업이 시작됩니다. 이 작업은 대기열(train_job_list)을 지속적으로 확인하고, 대기 중인 작업이 있으면 처리합니다.

 

서버 초기화

from fastapi import FastAPI
import asyncio

app = FastAPI()

# 전역 변수
train_job_list = []  # 작업 대기열
is_training = False  # 현재 학습 진행 상태

@app.on_event("startup")
async def startup_event():
    # 서버가 시작될 때 백그라운드 작업 실행
    asyncio.create_task(startTrainJob())

 

백그라운드 작업 관리

async def startTrainJob():
    global is_training, train_job_list

    while True:
        if not is_training and len(train_job_list) > 0:
            print("Starting a new training job...")
            await generateStyle(train_job_list[0])
        else:
            print("No pending jobs or already training.")
            await asyncio.sleep(10)
  • asyncio.create_task()
    • startTrainJob을 백그라운드에서 실행하여 대기열을 지속적으로 확인합니다.
  • while True:
    • 대기열이 비어 있으면 일정 시간 대기(asyncio.sleep)
    • 대기열에 작업이 있으면 학습을 시작(generateStyle 호출)

2. 대기열에 작업 추가하기

사용자가 학습 요청을 보내면, 작업이 대기열에 추가됩니다.

from fastapi.responses import JSONResponse
from pydantic import BaseModel

class StyleGenerateBody(BaseModel):
    id: int
    styleName: str
    baseModel: str
    styleRefBase64: str

@app.post("/queueTrain")
def queueTrain(body: StyleGenerateBody):
    global train_job_list
    job = {
        "id": body.id,
        "style_name": body.styleName,
        "base_model": body.baseModel,
        "style_ref_base64": body.styleRefBase64,
        "status": "PENDING"
    }
    train_job_list.append(job)
    return JSONResponse({"message": "Job added to queue", "queue_length": len(train_job_list)})
  • 요청 데이터는 StyleGenerateBody 모델로 검증됩니다.
  • 새로운 job이 생성되어 train_job_list에 추가됩니다.
  • 추가된 job의 메시지, 대기열 길이를 반환합니다.

3. 학습 작업 실행 (멀티스레딩 활용)

대기열에서 꺼낸 작업은 별도의 스레드에서 실행됩니다.

import threading
import base64
import os

async def generateStyle(job):
    global is_training
    is_training = True

    # Base64 이미지 디코딩 및 저장
    image_data = base64.b64decode(job["style_ref_base64"])
    file_path = f"./styles/{job['id']}.png"
    os.makedirs(os.path.dirname(file_path), exist_ok=True)
    with open(file_path, "wb") as f:
        f.write(image_data)

    # 학습 실행
    def train_model():
        print(f"Training model for job {job['id']}...")
        # 학습 로직 추가 (예: 이미지 생성 모델 학습)
        print(f"Training completed for job {job['id']}")

    thread = threading.Thread(target=train_model)
    thread.start()
    thread.join()

    is_training = False
  • Base64 처리
    • 사용자가 보낸 이미지를 디코딩하여 저장합니다.
  • 멀티스레딩
    • threading.Thread로 학습 작업을 메인 스레드와 분리하여 실행합니다.
    • join()으로 학습이 완료될 때까지 기다립니다.

4. 작업 상태 확인

사용자는 Job id를 통해 현재 상태를 확인할 수 있습니다.

@app.post("/getTrainStatus")
def getTrainStatus(body: dict):
    job = next((j for j in train_job_list if j["id"] == body["id"]), None)
    if job:
        return {"status": job["status"]}
    return {"status": "NOT_FOUND"}

 

  • next()를 사용해 대기열에서 작업을 검색합니다.
  • Job이 존재하면 상태를 반환하고, 없으면 "NOT_FOUND"을 반환합니다.

5. 작업 취소 및 대기열 관리

사용자는 특정 작업을 취소할 수 있습니다. 대기열에서 작업을 제거하면 됩니다.

@app.post("/stopOrDeleteTrain")
def stopOrDeleteTrain(body: dict):
    global train_job_list
    train_job_list = [job for job in train_job_list if job["id"] != body["id"]]
    return {"message": "Job removed from queue"}

 

 

  • 대기열에서 제거할 작업만 제외하고 새 리스트를 생성합니다.
  • 이 방식은 안전하며, 대기열을 순회 중에 수정하는 문제를 방지합니다.

 

'Projects' 카테고리의 다른 글

Fastify와 Python 서버 간 작업 처리  (0) 2024.11.27