본문 바로가기
Artificial Intelligence/60. Python

[PYTHON] AI 파이프라인 성능 최적화 : multiprocessing, asyncio, threading 배치 방법과 3가지 핵심 차이

by Papa Martino V 2026. 4. 23.
728x90

multiprocessing, asyncio, threading
multiprocessing, asyncio, threading

 

 

2026년 현재, 대규모 언어 모델(LLM)과 실시간 고해상도 이미지 추론이 보편화되면서 AI 파이프라인의 효율성은 서비스의 생존과 직결되고 있습니다. 파이썬 환경에서 동시성을 처리하는 세 가지 도구인 multiprocessing, asyncio, threading은 각각의 고유한 동작 메커니즘을 가집니다. 이를 잘못 배치할 경우, 병렬 처리를 시도했음에도 불구하고 오히려 컨텍스트 스위칭 오버헤드로 인해 성능이 저하되는 현상을 겪게 됩니다. 본 가이드에서는 복잡한 AI 파이프라인의 각 단계(데이터 수집, 전처리, 모델 추론, API 서빙)에 어떤 기술을 배치해야 가장 최적의 처리량(Throughput)을 확보할 수 있는지 상세히 분석합니다.


1. 파이썬 동시성 도구별 메커니즘 및 AI 워크로드 적합도 비교

각 기술이 자원을 사용하는 방식과 AI 작업군에 따른 유리한 지점을 표로 정리하였습니다.

비교 항목 Multiprocessing Threading Asyncio
병렬성 수준 진정한 병렬 처리 (True Parallelism) 동시성 (Concurrency, GIL 제한) 단일 스레드 비동기 (Cooperative)
주요 자원 멀티 코어 CPU 공유 메모리 (Shared Memory) 이벤트 루프 (Event Loop)
메모리 오버헤드 매우 높음 (프로세스 복제) 낮음 매우 낮음 (경량 코루틴)
데이터 전송 방식 IPC (Pickle/Serialization 필요) 직접 접근 가능 (Race Condition 주의) 객체 공유 (비동기 안전성 필요)
AI 파이프라인 배치 무거운 전처리, CPU 연산 DB 쿼리, 파일 I/O LLM API 호출, 외부 서버 통신

2. AI 파이프라인 단계별 최적 배치 전략

① 데이터 수집 및 외부 API 연동: Asyncio

AI 모델이 외부 API(예: OpenAI, Anthropic)를 호출하거나 수천 개의 웹 페이지를 크롤링해야 하는 단계에서는 asyncio가 압도적으로 유리합니다. 수천 개의 네트워크 대기 상태(Wait)를 단일 스레드에서 효율적으로 관리할 수 있기 때문입니다.

② 데이터 전처리 및 증강: Multiprocessing

이미지 리사이징, 텍스트 토큰화, 복잡한 수치 계산은 CPU 바운드 작업입니다. 파이썬의 GIL(Global Interpreter Lock) 때문에 threading은 이 단계에서 무용지물입니다. 멀티 코어를 모두 활용할 수 있는 multiprocessing을 배치해야 합니다.

③ 모델 로딩 및 공유 자원 관리: Threading

여러 모델 가중치를 메모리에 하나만 올려두고 여러 스레드가 동시에 참조해야 할 때(I/O 바운드가 섞인 경우)는 threading이 효율적입니다. 프로세스 간 복제가 일어나지 않아 메모리 낭비를 줄일 수 있습니다.


3. 실무 엔지니어를 위한 단계별 구현 예제 (7가지)

현업에서 바로 복사하여 사용할 수 있는 AI 파이프라인 가속 코드 샘플입니다.

#1. Asyncio를 활용한 대규모 LLM API 비동기 호출 해결

import asyncio
import aiohttp

async def fetch_llm_response(session, prompt):
    async with session.post("https://api.ai-provider.com/v1/chat", json={"prompt": prompt}) as resp:
        return await resp.json()

async def process_bulk_prompts(prompts):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_llm_response(session, p) for p in prompts]
        return await asyncio.gather(*tasks)

# 결과: 수백 개의 프롬프트를 동시에 전송하여 대기 시간 최소화

#2. Multiprocessing 기반 대용량 이미지 전처리 방법

from multiprocessing import Pool
import cv2

def preprocess_image(img_path):
    img = cv2.imread(img_path)
    # 복잡한 필터링 및 리사이징 연산 (CPU Intensive)
    img = cv2.resize(img, (224, 224))
    return img

if __name__ == '__main__':
    with Pool(processes=8) as pool:
        processed_imgs = pool.map(preprocess_image, image_paths)

#3. Threading을 이용한 백그라운드 모델 가중치 업데이트

import threading
import time

def background_model_update(model):
    while True:
        # 1시간마다 새로운 가중치 체크 및 로드 (I/O Bound)
        time.sleep(3600)
        model.reload_weights()

update_thread = threading.Thread(target=background_model_update, args=(my_model,), daemon=True)
update_thread.start()

#4. Asyncio + Threading 혼합: 블로킹 함수 비동기화

비동기 프레임워크 내에서 CPU 연산이 포함된 라이브러리(예: Pandas)를 사용할 때 이벤트 루프가 멈추지 않게 합니다.

import asyncio
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=5)

async def async_data_analysis(df):
    loop = asyncio.get_event_loop()
    # Pandas 연산을 별도 스레드에서 실행하여 비동기 루프 보존
    result = await loop.run_in_executor(executor, df.mean)
    return result

#5. Multiprocessing Shared Memory를 이용한 데이터 복사 해결

프로세스 간 대량의 텐서 데이터를 전달할 때 발생하는 직렬화 오버헤드를 방지합니다.

from multiprocessing import shared_memory
import numpy as np

# 공유 메모리에 텐서 할당
shm = shared_memory.SharedMemory(create=True, size=1024*1024)
shared_tensor = np.ndarray((1024, 1024), dtype=np.float32, buffer=shm.buf)
# 워커 프로세스에서 복사 없이 바로 접근 가능

#6. Gunicorn + Uvicorn을 이용한 멀티 프로세스 서빙 아키텍처

AI 모델 서빙 시 단일 프로세스의 GIL 한계를 극복하기 위해 워커 프로세스를 확장합니다.

# 2026년 권장 배포 환경 설정
# -w: CPU 코어 수 기반 프로세스 생성, -k: 비동기 워커 사용
gunicorn app:main -w 4 -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000

#7. Ray를 이용한 분산 처리 파이프라인 통합

가장 현대적인 방법으로, multiprocessing의 복잡한 통신 로직을 추상화하여 분산 환경에서 AI 작업을 실행합니다.

import ray

ray.init()

@ray.remote
def distributed_inference(batch):
    # 각 워커 노드에서 독립된 인터프리터로 실행
    return model.predict(batch)

futures = [distributed_inference.remote(b) for b in batches]
results = ray.get(futures)

4. 결론: AI 엔지니어를 위한 최종 의사결정 트리

  1. 작업이 단순히 '기다리는 것'(API, DB, Network)인가? -> asyncio를 최우선 고려하세요.
  2. 작업이 '숫자를 계산하거나 이미지를 변환'하는 것인가? -> multiprocessing으로 코어를 점유하세요.
  3. 메모리에 무거운 모델을 하나만 올리고 가벼운 I/O를 병렬로 처리해야 하는가? -> threading을 사용하세요.
  4. 2026년의 최신 트렌드: RayDask 같은 상위 프레임워크를 통해 위 세 가지를 하이브리드로 자동 관리하는 구조가 가장 권장됩니다.

성능 최적화는 이론보다 측정에서 시작됩니다. cProfile이나 Py-Spy 같은 도구로 병목 구간이 GIL 경쟁(Lock Contention)인지, I/O Wait인지 먼저 파악하시기 바랍니다.


내용 출처 및 참고문헌:

  • Python Developer Documentation: "Concurrency and Parallelism Guide."
  • Fowler, M. (2024). "Patterns of Distributed Systems in Python AI Pipelines."
  • FastAPI & Uvicorn Performance Benchmarks 2026.
  • Ray Project: "Whitepaper on Distributed Computing for AI."
728x90