이번 포스팅에서는 FastAPI를 기반으로, 이미지 스타일 학습 요청을 처리하는 API를 설계하면서, 비동기 프로그래밍과 멀티 스레딩을 어떻게 활용할 수 있는지에 대해 다뤄보겠습니다.
프로젝트 개요
이 프로젝트는 이미지 스타일 학습 작업 관리 API를 구현한 사례입니다. 유저는 이미지 스타일 학습 요청을 API로 보내고, 작업이 완료되면 학습된 스타일 모델을 받을 수 있습니다. 주요 기능은 다음과 같습니다:
- 사용자가 학습 요청을 보내면 작업이 학습 대기열에 추가됩니다.
- 백그라운드에서 대기열을 지속적으로 확인하고, 작업을 하나씩 처리합니다.
- 학습은 별도의 스레드에서 실행되어 메인 이벤트 루프의 차단을 방지합니다.
- 사용자는 학습 상태를 조회하거나 작업을 취소할 수 있습니다.
사용 기술
1. FastAPI
- Python 기반의 웹 프레임워크로, 간결하고 직관적인 RESTful API를 설계할 수 있습니다.
- 비동기 프로그래밍 지원을 통해 API 서버 구축이 가능합니다.
2. asyncio
- Python의 비동기 프로그래밍 라이브러리로, 이벤트 루프를 통해 병렬 처리를 지원합니다.
- 해당 프로젝트에서는 asyncio.create_task()를 사용하여 백그라운드에서 작업을 처리합니다.
3. threading
- Python 표준 라이브러리로, 멀티스레딩을 지원합니다.
- 장시간 실행되는 작업(예: 모델 학습)을 별도의 스레드에서 처리하여 메인 스레드의 차단을 방지합니다.
1. FastAPI 서버 초기화 및 백그라운드 작업 관리 설정
FastAPI 서버가 실행될 때, 학습 작업을 처리하는 백그라운드 작업이 시작됩니다. 이 작업은 대기열(train_job_list)을 지속적으로 확인하고, 대기 중인 작업이 있으면 처리합니다.
서버 초기화
from fastapi import FastAPI
import asyncio
app = FastAPI()
# 전역 변수
train_job_list = [] # 작업 대기열
is_training = False # 현재 학습 진행 상태
@app.on_event("startup")
async def startup_event():
# 서버가 시작될 때 백그라운드 작업 실행
asyncio.create_task(startTrainJob())
백그라운드 작업 관리
async def startTrainJob():
global is_training, train_job_list
while True:
if not is_training and len(train_job_list) > 0:
print("Starting a new training job...")
await generateStyle(train_job_list[0])
else:
print("No pending jobs or already training.")
await asyncio.sleep(10)
- asyncio.create_task()
- startTrainJob을 백그라운드에서 실행하여 대기열을 지속적으로 확인합니다.
- while True:
- 대기열이 비어 있으면 일정 시간 대기(asyncio.sleep)
- 대기열에 작업이 있으면 학습을 시작(generateStyle 호출)
2. 대기열에 작업 추가하기
사용자가 학습 요청을 보내면, 작업이 대기열에 추가됩니다.
from fastapi.responses import JSONResponse
from pydantic import BaseModel
class StyleGenerateBody(BaseModel):
id: int
styleName: str
baseModel: str
styleRefBase64: str
@app.post("/queueTrain")
def queueTrain(body: StyleGenerateBody):
global train_job_list
job = {
"id": body.id,
"style_name": body.styleName,
"base_model": body.baseModel,
"style_ref_base64": body.styleRefBase64,
"status": "PENDING"
}
train_job_list.append(job)
return JSONResponse({"message": "Job added to queue", "queue_length": len(train_job_list)})
- 요청 데이터는 StyleGenerateBody 모델로 검증됩니다.
- 새로운 job이 생성되어 train_job_list에 추가됩니다.
- 추가된 job의 메시지, 대기열 길이를 반환합니다.
3. 학습 작업 실행 (멀티스레딩 활용)
대기열에서 꺼낸 작업은 별도의 스레드에서 실행됩니다.
import threading
import base64
import os
async def generateStyle(job):
global is_training
is_training = True
# Base64 이미지 디코딩 및 저장
image_data = base64.b64decode(job["style_ref_base64"])
file_path = f"./styles/{job['id']}.png"
os.makedirs(os.path.dirname(file_path), exist_ok=True)
with open(file_path, "wb") as f:
f.write(image_data)
# 학습 실행
def train_model():
print(f"Training model for job {job['id']}...")
# 학습 로직 추가 (예: 이미지 생성 모델 학습)
print(f"Training completed for job {job['id']}")
thread = threading.Thread(target=train_model)
thread.start()
thread.join()
is_training = False
- Base64 처리
- 사용자가 보낸 이미지를 디코딩하여 저장합니다.
- 멀티스레딩
- threading.Thread로 학습 작업을 메인 스레드와 분리하여 실행합니다.
- join()으로 학습이 완료될 때까지 기다립니다.
4. 작업 상태 확인
사용자는 Job id를 통해 현재 상태를 확인할 수 있습니다.
@app.post("/getTrainStatus")
def getTrainStatus(body: dict):
job = next((j for j in train_job_list if j["id"] == body["id"]), None)
if job:
return {"status": job["status"]}
return {"status": "NOT_FOUND"}
- next()를 사용해 대기열에서 작업을 검색합니다.
- Job이 존재하면 상태를 반환하고, 없으면 "NOT_FOUND"을 반환합니다.
5. 작업 취소 및 대기열 관리
사용자는 특정 작업을 취소할 수 있습니다. 대기열에서 작업을 제거하면 됩니다.
@app.post("/stopOrDeleteTrain")
def stopOrDeleteTrain(body: dict):
global train_job_list
train_job_list = [job for job in train_job_list if job["id"] != body["id"]]
return {"message": "Job removed from queue"}
- 대기열에서 제거할 작업만 제외하고 새 리스트를 생성합니다.
- 이 방식은 안전하며, 대기열을 순회 중에 수정하는 문제를 방지합니다.
'Projects' 카테고리의 다른 글
Fastify와 Python 서버 간 작업 처리 (0) | 2024.11.27 |
---|