본문 바로가기
Artificial Intelligence/60. Python

[PYTHON] 스트리밍 데이터 처리 시 Kafka와 Python 모델의 3가지 결합 방법 및 지연 시간 해결

by Papa Martino V 2026. 4. 21.
728x90

Kafka-Python
Kafka-Python

 

현대의 데이터 아키텍처는 정적인 배치(Batch) 처리에서 실시간 스트리밍(Streaming) 처리로 급격히 전환되고 있습니다. 특히 금융 사기 탐지, 실시간 추천 엔진, IoT 센서 모니터링 분야에서 Apache Kafka는 데이터의 혈맥 역할을 합니다. 하지만 많은 개발자가 고성능 분산 메시지 큐인 Kafka와 유연하지만 상대적으로 느린 Python 머신러닝 모델을 결합할 때 처리량(Throughput) 저하와 지연 시간(Latency) 문제에 직면합니다. 본 포스팅에서는 Python 개발자를 위해 Kafka와 예측 모델을 결합하는 최적의 아키텍처 패턴 3가지를 분석하고, 실무에서 즉시 사용 가능한 7가지 고성능 구현 예제를 제공합니다.


1. Kafka와 Python 모델 결합 시 발생하는 주요 문제와 해석 차이

단순히 kafka-python 라이브러리로 데이터를 가져와 모델의 predict() 함수를 호출하는 방식은 소규모 테스트에서는 작동하지만, 운영 환경에서는 반드시 병목 현상을 일으킵니다. 이를 해결하기 위해 '처리 방식'의 구조적 차이를 이해해야 합니다.

지연 시간의 주범: 데이터 직렬화와 GIL

  • Serialization Overhead: JSON이나 Pickle 포맷은 직렬화/역직렬화 과정에서 CPU 비용이 큽니다.
  • GIL(Global Interpreter Lock): Python의 멀티스레딩 한계로 인해 CPU 집약적인 모델 연산이 동시성을 확보하기 어렵습니다.
  • Backpressure: Kafka의 Produce 속도가 Python Consumer의 처리 속도보다 빠를 때 발생하는 데이터 누적 문제입니다.

2. Kafka-Python 스트리밍 결합 방식 성능 비교

시스템의 요구 사항(실시간성 vs 처리량)에 따라 적합한 결합 방식을 선택할 수 있도록 비교 표를 구성했습니다.

비교 항목 In-Process 결합 (Consumer 내 삽입) Sidecar/Microservice (API 호출) Stream Processing (Faust/PySpark)
구현 난이도 매우 낮음 보통 높음
지연 시간(Latency) 최저 (네트워크 홉 없음) 높음 (HTTP/gRPC 오버헤드) 낮음 (분산 병렬 처리)
확장성 제한적 (Consumer 그룹 의존) 매우 우수 (독립적 스케일링) 우수 (클러스터 단위 확장)
추천 시나리오 가벼운 모델, 단일 노드 분석 대형 모델 (LLM, GPU 활용) 복잡한 상태 유지 분석(Stateful)
장애 전파 모델 장애가 Consumer 정지로 이어짐 격리되어 있어 상대적으로 안전 프레임워크가 장애 복구 관리

3. 실무 가속을 위한 Python & Kafka 구현 예제 (7 Examples)

가장 널리 쓰이는 confluent-kafkaFaust 라이브러리를 기준으로, 실무 개발자가 바로 적용할 수 있는 7가지 해결 방법을 소개합니다.

Example 1: 고성능 Consumer와 간단한 모델 예측 (Basic)

가장 기초적인 패턴으로, Consumer 루프 내에서 모델을 호출합니다.

from confluent_kafka import Consumer
import joblib

# 모델 로드
model = joblib.load('my_model.pkl')

conf = {'bootstrap.servers': 'localhost:9092', 'group.id': 'ml-group', 'auto.offset.reset': 'earliest'}
consumer = Consumer(conf)
consumer.subscribe(['input-topic'])

while True:
    msg = consumer.poll(1.0)
    if msg is None: continue
    
    # 데이터 변환 및 예측
    data = float(msg.value().decode('utf-8'))
    prediction = model.predict([[data]])
    print(f"Result: {prediction}")

Example 2: Multiprocessing을 활용한 병렬 예측 처리

GIL 문제를 해결하기 위해 여러 프로세스에 모델 추론 작업을 분산합니다.

from multiprocessing import Pool
import numpy as np

def predict_worker(batch_data):
    # 각 프로세스에서 실행될 모델 로직
    return model.predict(batch_data)

# Consumer에서 메시지를 모아 Batch로 Worker에게 전달
with Pool(processes=4) as pool:
    results = pool.map(predict_worker, data_chunks)

Example 3: Faust를 이용한 상태 유지(Stateful) 스트리밍 분석

Kafka Streams의 Python 판인 Faust를 활용하여 실시간 윈도우 집계를 수행합니다.

import faust

app = faust.App('ml-stream', broker='kafka://localhost:9092')
topic = app.topic('sensor-data', value_type=float)

@app.agent(topic)
async def process(stream):
    async for value in stream:
        # 비동기적으로 모델 예측 수행
        result = await my_async_model.predict(value)
        yield result

Example 4: Avro 직렬화를 통한 데이터 스키마 관리 및 속도 향상

JSON 대신 이진 포맷인 Avro를 사용하여 네트워크 오버헤드를 줄입니다.

from confluent_kafka.schema_registry.avro import AvroDeserializer

# Schema Registry 연결 및 역직렬화 설정
avro_deserializer = AvroDeserializer(schema_registry_client, schema_str)

# Consumer 설정에 적용하여 고속 데이터 처리 가능

Example 5: 비동기 API(FastAPI)와 Kafka 결합 (Sidecar Pattern)

모델을 별도의 서비스로 분리하고 Kafka Consumer가 이를 호출하는 방식입니다.

import httpx

async def call_model_api(data):
    async with httpx.AsyncClient() as client:
        response = await client.post("http://model-service/predict", json=data)
        return response.json()

# Kafka Consumer 내에서 await call_model_api(msg_data) 호출

Example 6: Kafka Producer의 Batching 설정을 통한 지연 시간 해결

예측 결과를 다시 Kafka로 보낼 때 성능을 극대화하는 설정값입니다.

conf = {
    'bootstrap.servers': 'localhost:9092',
    'linger.ms': 10,           # 10ms 동안 메시지를 모아 보냄
    'batch.size': 32768,       # 32KB 단위로 배치 전송
    'compression.type': 'snappy' # 압축을 통해 처리량 증대
}
producer = Producer(conf)

Example 7: GPU 가속 추론을 위한 TensorRT와 Kafka 연동

대규모 딥러닝 모델의 실시간 처리를 위해 GPU 큐를 관리하는 방식입니다.

# Consumer는 GPU 메모리 버퍼에 데이터를 적재만 하고
# 별도 스레드가 TensorRT 엔진을 통해 일괄 추론 수행
input_buffer.append(kafka_msg)
if len(input_buffer) >= 64:
    results = trt_engine.execute(input_buffer)

4. 고가용성 스트리밍 환경을 위한 3단계 문제 해결 전략

  1. Rebalance 방지: 모델 추론 시간이 max.poll.interval.ms보다 길어지면 Consumer가 그룹에서 제외됩니다. 이를 위해 예측 로직은 비동기로 처리하거나 설정을 넉넉히 조정하십시오.
  2. Dead Letter Queue (DLQ): 예측에 실패한(포맷 오류 등) 데이터는 별도의 에러 토픽으로 보내 전체 파이프라인이 멈추지 않도록 해결하십시오.
  3. Monitoring: Consumer Lag(데이터 밀림 정도)을 Prometheus 등으로 실시간 모니터링하여 인스턴스 확장 시점을 파악하십시오.

5. 결론: 어떤 방식을 선택할 것인가?

데이터의 양이 적고 모델이 가볍다면 Example 1(In-Process)로 충분합니다. 그러나 확장성이 중요한 대규모 서비스라면 Example 5(API 분리)Example 3(Faust)을 권장합니다. 특히 2026년 현재는 데이터의 정합성과 속도를 동시에 잡기 위해 Avro멀티프로세싱의 조합이 가장 안정적인 선택지로 꼽힙니다.


내용 출처 및 기술 참조

  • Apache Kafka Official Documentation (kafka.apache.org)
  • Confluent Guide: Putting Machine Learning Models into Production
  • Faust Streaming Library for Python (GitHub)
  • "Designing Data-Intensive Applications" by Martin Kleppmann
  • Python Global Interpreter Lock (GIL) - Real Python Reference
728x90