본문 바로가기
Artificial Intelligence/60. Python

[PYTHON] AI 데이터 파이프라인 최적화를 위한 3가지 병렬 처리 선택 방법과 성능 차이 해결책

by Papa Martino V 2026. 4. 13.
728x90

Multiprocessing vs Threading vs Asyncio
Multiprocessing vs Threading vs Asyncio

1. AI 워크로드의 병목, 어떻게 돌파할 것인가?

현대 AI 서비스의 핵심은 모델 자체의 성능만큼이나 데이터 파이프라인(Data Pipeline)의 처리 속도에 달려 있습니다. 수천 개의 이미지 리샘플링, 기가바이트 단위의 텍스트 토큰화, 실시간 API 호출 등 AI 워크플로우는 CPU 집약적인 연산과 I/O 바운드 작업이 복합적으로 얽혀 있습니다. Python 개발자들은 이때 세 가지 선택지 앞에 놓입니다: multiprocessing, threading, 그리고 asyncio. 하지만 잘못된 선택은 Python의 GIL(Global Interpreter Lock)에 의한 성능 저하를 초래하거나, 오히려 컨텍스트 스위칭 비용으로 인해 속도를 늦추기도 합니다. 본 포스팅에서는 AI 데이터 파이프라인의 각 단계에 맞는 최적의 병렬화 전략과 실무에서 직면하는 성능 병목을 해결하는 7가지 Example을 심도 있게 분석합니다.


2. Multiprocessing vs Threading vs Asyncio 핵심 차이 분석

각 기술은 해결하려는 문제의 성격이 완전히 다릅니다. AI 파이프라인 설계 전, 아래 표를 통해 기술적 메커니즘을 반드시 이해해야 합니다.

비교 항목 Multiprocessing Threading Asyncio
핵심 메커니즘 개별 프로세스 생성 (OS 레벨) 단일 프로세스 내 경량 스레드 단일 스레드 내 이벤트 루프
GIL 영향 완벽하게 우회 가능 제한적 (I/O 작업 시만 해제) 동일 (협력적 멀티태스킹)
메모리 소모 높음 (메모리 복제 발생) 보통 (메모리 공유) 매우 낮음 (비동기 제어)
최적의 워크로드 CPU Bound (행렬 연산, 전처리) I/O Bound (파일 읽기/쓰기) Network Bound (API 호출, 크롤링)
데이터 통신 IPC (Queue, Pipe) 필요 공유 변수 접근 (Lock 필수) 공유 상태 유지 용이

3. AI 파이프라인 가속화를 위한 실전 구현 Example 7선

실제 대규모 데이터 로딩부터 임베딩 서버 통신까지, AI 프로젝트에서 즉시 적용 가능한 Python 코드입니다.

Example 1: Multiprocessing을 이용한 고해상도 이미지 전처리 가속

GIL을 우회하여 멀티코어 CPU를 100% 활용하는 CPU 집약적 연산의 정석입니다.

import multiprocessing
from PIL import Image
import os

def resize_image(img_path):
    # 이미지 리사이징은 CPU 연산량이 큼
    with Image.open(img_path) as img:
        img = img.resize((512, 512))
        img.save(f"processed_{img_path}")

if __name__ == "__main__":
    paths = [f"img_{i}.jpg" for i in range(1000)]
    # 프로세스 풀을 사용하여 병렬 처리
    with multiprocessing.Pool(processes=multiprocessing.cpu_count()) as pool:
        pool.map(resize_image, paths)
        

Example 2: Threading을 이용한 로그 파일 실시간 수집 및 쓰기

파일 시스템 I/O 대기 시간 동안 다른 스레드가 동작하게 하여 효율을 높이는 방법입니다.

import threading
import queue

log_queue = queue.Queue()

def log_writer():
    while True:
        data = log_queue.get()
        if data is None: break
        with open("ai_logs.txt", "a") as f:
            f.write(data + "\n")
        log_queue.task_done()

# 로깅 전용 스레드 시작
writer_thread = threading.Thread(target=log_writer)
writer_thread.start()
        

Example 3: Asyncio를 활용한 대규모 임베딩 API 비동기 호출

수천 개의 텍스트를 외부 API(OpenAI 등)로 보낼 때 네트워크 대기 시간을 최소화하는 핵심 해결책입니다.

import asyncio
import aiohttp

async def fetch_embedding(session, text):
    async with session.post("https://api.openai.com/v1/embeddings", json={"input": text}) as resp:
        return await resp.json()

async def main():
    texts = ["AI is great", "Python is fast", ...]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_embedding(session, t) for t in texts]
        results = await asyncio.gather(*tasks)

asyncio.run(main())
        

Example 4: Multiprocessing + Threading 혼합 파이프라인 설계

CPU로 이미지를 처리하면서 동시에 스레드로 결과를 DB에 저장하는 복합 워크플로우입니다.

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

# CPU 전처리는 프로세스로, DB 저장은 스레드로 분리
def process_full_pipeline(item):
    # 1. CPU 연산 (Process)
    processed_data = heavy_math(item)
    # 2. I/O 작업 (Thread)
    with ThreadPoolExecutor(max_workers=2) as executor:
        executor.submit(save_to_db, processed_data)
        

Example 5: Asyncio Queue를 이용한 실시간 추론 요청 스로틀링

서버에 요청이 폭주할 때 에러를 방지하기 위해 비동기 큐로 처리량을 조절하는 기법입니다.

async def worker(q):
    while True:
        req = await q.get()
        # 추론 수행
        await asyncio.sleep(0.1) 
        q.task_done()

# 동시 처리량을 10개로 제한
queue = asyncio.Queue(maxsize=10)
        

Example 6: Multiprocessing Shared Memory를 통한 데이터 복사 오버헤드 해결

프로세스 간 대용량 NumPy 배열을 전달할 때 발생하는 복사 비용을 줄이는 최적화 방법입니다.

from multiprocessing import shared_memory
import numpy as np

# 공유 메모리에 1GB 데이터 할당
shm = shared_memory.SharedMemory(create=True, size=10**9)
existing_shm = shared_memory.SharedMemory(name=shm.name)
# 프로세스 간 포인터만 공유하여 데이터 접근
        

Example 7: Asyncio Loop 내에서 Blocking 연산 실행(Run in Executor)

비동기 루프가 CPU 연산으로 인해 멈추는(Blocking) 현상을 방지하는 실무 팁입니다.

import asyncio
import time

def blocking_cpu_task():
    time.sleep(5) # CPU 집약적 작업 가정
    return "Done"

async def main():
    loop = asyncio.get_running_loop()
    # 비동기 루프를 방해하지 않고 프로세스 풀에서 실행
    result = await loop.run_in_executor(None, blocking_cpu_task)
    print(result)

asyncio.run(main())
        

4. AI 프로젝트 상황별 최적의 선택 가이드라인

기술의 우위가 아닌 상황의 적합성을 판단해야 합니다.

  • 상황 1: 대용량 이미지/비디오 데이터 증강(Augmentation) -> Multiprocessing. GIL을 우회하여 모든 CPU 코어를 소진해야 합니다.
  • 상황 2: 벡터 DB 저장 및 클라우드 스토리지 업로드 -> Threading. 네트워크 응답을 기다리는 동안 다른 파일을 준비하는 것이 효율적입니다.
  • 상황 3: 마이크로서비스 간 수만 건의 gRPC/HTTP 통신 -> Asyncio. 수천 개의 연결을 최소한의 리소스로 유지할 수 있습니다.

5. 결론: 하이브리드 병렬 처리의 가치

결국 완벽한 단 하나의 기술은 없습니다. 현대적인 AI 데이터 파이프라인은 데이터 로딩(Multiprocessing) - 통신(Asyncio) - 로깅(Threading)이 유기적으로 결합된 하이브리드 형태를 띱니다. Python의 내부 메커니즘인 GIL을 적이 아닌 동료로 삼아, 각 연산의 성격에 맞는 도구를 선택할 때 비로소 성능의 극한을 끌어낼 수 있습니다.

내용 출처

  • Python Official Documentation: "Concurrency in Python - multiprocessing, threading, asyncio".
  • Real Python: "Speed Up Your Python Program With Concurrency".
  • High Performance Python (Mikael Plitt & Ian Ozsvald, 2nd Edition).
  • PyCon Engineering Blog: "Understanding GIL and its impact on Data Science".
728x90