본문 바로가기
Artificial Intelligence/60. Python

[PYTHON] 분산 환경 Ray 데이터 셔플링 성능 최적화 해결 방법 3가지와 7개 실무 예제

by Papa Martino V 2026. 4. 28.
728x90

Data Shuffling
Data Shuffling

 

빅데이터 시대에 대규모 데이터셋을 처리하는 것은 단순한 로직의 문제가 아니라 인프라 자원의 효율적 활용의 문제입니다. 특히 파이썬 생태계에서 분산 컴퓨팅의 표준으로 자리 잡은 Ray를 활용할 때, 가장 큰 병목 현상이 발생하는 지점은 다름 아닌 '데이터 셔플링(Data Shuffling)' 단계입니다. 본 포스팅에서는 Ray의 Ray Data 라이브러리를 중심으로 분산 환경에서 데이터 셔플링 성능을 비약적으로 향상시킬 수 있는 구체적인 최적화 전략과 실무에 즉시 적용 가능한 코드를 심도 있게 다룹니다.


1. 왜 분산 환경에서 셔플링이 병목이 되는가?

셔플링은 repartition, group_by, sort와 같은 연산을 수행할 때 데이터 노드 간에 데이터를 재배치하는 과정을 의미합니다. 이 과정에서 발생하는 주요 오버헤드는 다음과 같습니다.

  • Network I/O: 노드 간 데이터 이동 시 발생하는 네트워크 대역폭 소모
  • Serialization/Deserialization: 객체를 직렬화하여 전송하고 다시 복원하는 CPU 부하
  • Object Store Memory: Ray의 공유 메모리인 Plasma Object Store의 압박

2. 셔플링 방식 비교: Simple vs. Push-based

Ray는 기본적으로 두 가지 셔플링 메커니즘을 제공합니다. 데이터의 크기와 클러스터의 구조에 따라 적절한 선택이 필요합니다.

비교 항목 Simple Shuffling (Pull-based) Push-based Shuffling
작동 원리 리듀서가 매퍼의 데이터를 가져옴 매퍼가 데이터를 능동적으로 전송/병합
메모리 사용량 중간 파일 생성으로 인한 스파이크 발생 가능 스트리밍 방식으로 메모리 효율적 관리
적합한 데이터 규모 중소규모 데이터셋 (100GB 미만) 대규모 테라바이트(TB)급 데이터셋
확장성 노드 수가 많아질수록 성능 저하 수천 개의 노드에서도 안정적 성능 유지

3. Ray 데이터 셔플링 최적화 3대 핵심 전략

전략 1: 전송 최소화를 위한 Local 집계 활용

모든 데이터를 셔플링하기 전에 각 노드에서 가능한 수준의 사전 집계(Map-side combine)를 수행하여 네트워크 전송량을 최소화합니다.

전략 2: 객체 저장소(Object Store) 튜닝

Ray의 공유 메모리 크기를 적절히 설정하고, Spilling(디스크 쓰기)이 발생하지 않도록 메모리 사용 임계값을 조정해야 합니다.

전략 3: 파티션 크기(Block Size) 최적화

너무 작은 파티션은 스케줄링 오버헤드를 유발하고, 너무 큰 파티션은 Out-of-Memory(OOM) 오류를 일으킵니다. 일반적으로 100MB ~ 1GB 사이의 블록 크기를 권장합니다.


4. 실무 적용 가능한 개발자용 예제 (7가지)

아래 예제들은 Ray 2.0 이상 버전의 ray.data API를 기준으로 작성되었습니다.

Example 1: 최적의 파티션 개수 자동 설정

import ray
import ray.data

# Ray 초기화
ray.init(ignore_reinit_error=True)

# 데이터 로드 시 병렬 처리 성능을 위해 num_blocks를 CPU 코어 수의 2~4배로 설정
ds = ray.data.read_parquet("s3://my-bucket/large_data/")
num_cpus = int(ray.cluster_resources().get("CPU", 1))
optimized_ds = ds.repartition(num_blocks=num_cpus * 4)

print(f"최적화된 파티션 수: {optimized_ds.num_blocks()}")

Example 2: Push-based Shuffle 활성화

# 대규모 데이터 정렬 시 Push-based 셔플링 강제 활성화
# 환경 변수 또는 실행 옵션을 통해 설정 가능
ctx = ray.data.DataContext.get_current()
ctx.use_push_based_shuffle = True

ds = ray.data.read_csv("large_dataset.csv")
# 글로벌 정렬 수행 시 Push-based 메커니즘이 동작함
sorted_ds = ds.sort("user_id")

Example 3: 복합 키 기반 고속 그룹화 (Map-side Reduction)

# 데이터 전처리 중 그룹화를 수행할 때 집계 함수를 미리 정의하여 셔플링 부하 감소
def aggregate_locally(batch):
    # 각 블록 내에서 1차 집계 수행
    return batch.groupby("category").sum()

ds = ray.data.read_parquet("sales_data/")
# 셔플링 전 map_batches를 활용한 사전 처리
optimized_agg = ds.map_batches(aggregate_locally).groupby("category").sum()

Example 4: 공유 메모리 OOM 방지를 위한 Resource 제한

# 특정 태스크가 과도한 메모리를 사용하지 않도록 리소스 제약 조건 부여
@ray.remote(memory=2 * 1024 * 1024 * 1024) # 2GB 제한
def shuffle_safe_task(block):
    # 무거운 전처리 로직
    return block

# 데이터셋 파이프라인에 적용
ds = ray.data.from_items([i for i in range(100)])
ds = ds.map_batches(shuffle_safe_task, batch_size=50)

Example 5: 데이터 로드 시 필터 푸시다운(Filter Pushdown)

# 셔플링 대상 자체를 줄이기 위해 소스 단계에서 데이터 필터링
ds = ray.data.read_parquet(
    "s3://logs/",
    filter=(ray.data.preprocessor.Placeholder("status") == "200")
)
# 필요한 열만 선택하여 직렬화 비용 감소
ds = ds.select_columns(["user_id", "timestamp", "action"])

Example 6: 분산 셔플링 중 모니터링 및 로깅

# Ray Dashboard 외에 코드상에서 실행 통계 확인
ds = ray.data.range(1000000).repartition(100)
ds.fully_executed() # 모든 셔플 연산 완료 대기

# 실행 통계 출력 (어느 단계에서 병목이 생기는지 확인 가능)
print(ds.stats())

Example 7: 사용자 정의 파티셔닝(Custom Partitioning)

# 특정 비즈니스 로직에 따라 데이터를 물리적으로 배치하여 차후 셔플링 방지
def custom_partitioner(row):
    return hash(row["region"]) % 10

ds = ray.data.read_parquet("global_data/")
# 특정 키값 기반으로 데이터를 재배치하여 로컬리티 향상
sharded_ds = ds.repartition(num_blocks=10, shuffle=True)

5. 결론 및 고도화 팁

Ray를 이용한 분산 데이터 전처리는 단순히 코드를 돌리는 것이 아니라, 컴퓨팅 자원과 데이터 이동의 밸런스를 맞추는 예술에 가깝습니다. 셔플링 성능을 높이기 위해서는 무조건적인 자원 투입보다는 데이터 소스 단계에서의 필터링, 적절한 파티션 크기 설정, 그리고 대규모 작업 시 Push-based 셔플링 활용이 필수적입니다.

 

전문가 팁: Ray 2.x 시리즈로 올라오면서 Streaming Executor가 기본값이 되었습니다. 이는 메모리 소비를 획기적으로 줄여주지만, 매우 큰 객체를 셔플링할 때는 여전히 object_store_memory 설정을 체크해야 합니다.


내용 출처 및 참고 문헌

  • Ray Official Documentation: Ray Data - Optimizing Shuffles (2024)
  • Anyscale Engineering Blog: Push-based Shuffle in Ray for Large Scale Data Processing
  • Python Software Foundation: Multiprocessing and Distributed Systems in Python
728x90