본문 바로가기
Artificial Intelligence/60. Python

[PYTHON] Race Condition 해결을 위한 2가지 핵심 동기화 기법 : Lock과 Semaphore 활용 방법

by Papa Martino V 2026. 3. 29.
728x90

Race Condition
Race Condition

 

파이썬 멀티스레딩 환경에서 데이터 무결성을 보장하고 경쟁 상태(Race Condition)를 방지하는 실무 전략


1. 경쟁 상태(Race Condition)의 본질과 동기화의 필요성

현대 소프트웨어 개발에서 성능 최적화를 위해 멀티스레딩(Multithreading)은 필수적인 요소입니다. 하지만 파이썬의 Global Interpreter Lock(GIL)이 존재함에도 불구하고, 공유 자원에 여러 스레드가 동시에 접근할 때 발생하는 Race Condition(경쟁 상태)은 프로그램의 논리적 오류와 데이터 손상을 초래하는 고질적인 문제입니다. 경쟁 상태란 두 개 이상의 스레드가 공유 변수나 리소스에 접근하여 동시에 수정하려고 할 때, 실행 순서에 따라 결과값이 달라지는 현상을 말합니다. 이를 해결하기 위해서는 특정 시점에 단 하나의 스레드만 자원을 사용하도록 제한하는 동기화(Synchronization) 메커니즘이 필요합니다. 본 가이드에서는 가장 대표적인 해결책인 LockSemaphore의 차이점과 실무 적용 노하우를 상세히 다룹니다.

2. Lock과 Semaphore 핵심 비교 분석

동기화 객체를 선택하기 전, 각 기법의 특성을 정확히 이해하는 것이 중요합니다.

비교 항목 Lock (Mutex) Semaphore (세마포어)
핵심 개념 상호 배제 (Mutual Exclusion) 제한된 개수의 자원 공유
접근 허용 수 오직 1개의 스레드 N개의 스레드 (설정 가능)
소유권 Lock을 획득한 스레드만 해제 가능 획득한 스레드가 아니어도 해제 가능
주요 용도 공유 변수 수정, 파일 쓰기 보호 DB 커넥션 풀 제한, 서버 부하 조절
복잡도 매우 낮음 중간 (Deadlock 위험 주의)

3. 실무 적용을 위한 7가지 Sample Examples

개발자가 현업에서 즉시 활용할 수 있는 파이썬 스레딩 동기화 예제입니다.

Example 1: Lock을 이용한 공유 카운터 업데이트 (가장 기본적인 해결 방법)

공유 변수를 안전하게 증가시키는 방법입니다. with 문을 사용하여 안전하게 해제합니다.


import threading

class SecureCounter:
    def __init__(self):
        self.value = 0
        self.lock = threading.Lock()

    def increment(self):
        with self.lock:
            # Critical Section 시작
            current = self.value
            self.value = current + 1
            # Critical Section 종료

counter = SecureCounter()
threads = [threading.Thread(target=counter.increment) for _ in range(100)]
for t in threads: t.start()
for t in threads: t.join()
print(f"Final Value: {counter.value}")
        

Example 2: Semaphore를 이용한 동시 HTTP 요청 제한

특정 API에 과부하를 주지 않기 위해 동시 실행 스레드 수를 3개로 제한하는 방법입니다.


import threading
import time

# 동시에 3개만 허용
pool_limit = threading.Semaphore(3)

def fetch_resource(id):
    with pool_limit:
        print(f"Resource {id} fetching...")
        time.sleep(1)
        print(f"Resource {id} done.")

for i in range(10):
    threading.Thread(target=fetch_resource, args=(i,)).start()
        

Example 3: RLock(Recursive Lock)을 이용한 재귀 함수 동기화

동일한 스레드가 이미 획득한 Lock을 다시 요청할 때 발생하는 Deadlock을 방지합니다.


import threading

class ReentrantTask:
    def __init__(self):
        self.lock = threading.RLock()

    def outer_task(self):
        with self.lock:
            print("Outer task started")
            self.inner_task()

    def inner_task(self):
        with self.lock:
            print("Inner task executed under same lock")

task = ReentrantTask()
threading.Thread(target=task.outer_task).start()
        

Example 4: BoundedSemaphore를 이용한 자원 반환 감시

Semaphore의 release()가 acquire() 횟수보다 많아지는 논리적 오류를 방지합니다.


import threading

# 초기값 1인 한정 세마포어
b_sem = threading.BoundedSemaphore(1)

def safe_release():
    b_sem.acquire()
    b_sem.release()
    try:
        b_sem.release()  # 예외 발생: ValueError
    except ValueError:
        print("Semaphore release count exceeded!")

safe_release()
        

Example 5: Lock을 활용한 스레드 세이프(Thread-safe) 싱글톤 패턴

멀티스레드 환경에서 인스턴스가 단 하나만 생성되도록 보장합니다.


import threading

class Singleton:
    _instance = None
    _lock = threading.Lock()

    def __new__(cls):
        if not cls._instance:
            with cls._lock:
                if not cls._instance:
                    cls._instance = super(Singleton, cls).__new__(cls)
        return cls._instance

s1 = Singleton()
s2 = Singleton()
print(f"Is same instance? {s1 is s2}")
        

Example 6: 조건부 Lock (Try-Lock) 활용법

Lock이 풀릴 때까지 무한정 기다리지 않고, 즉시 다른 작업을 수행하는 방법입니다.


import threading
import time

resource_lock = threading.Lock()

def non_blocking_task():
    got_lock = resource_lock.acquire(blocking=False)
    if got_lock:
        try:
            print("Lock acquired, processing data...")
            time.sleep(2)
        finally:
            resource_lock.release()
    else:
        print("Lock busy, doing other work instead.")

threading.Thread(target=non_blocking_task).start()
threading.Thread(target=non_blocking_task).start()
        

Example 7: Semaphore 기반의 생산자-소비자 패턴 (Buffer Control)

버퍼의 크기를 제한하여 시스템 메모리 폭주를 방지하는 실무 구조입니다.


import threading
import queue
import time

buffer = queue.Queue()
# 버퍼 최대 크기를 5로 제한하는 세마포어
empty_slots = threading.Semaphore(5)

def producer():
    for i in range(10):
        empty_slots.acquire()
        buffer.put(f"Data-{i}")
        print(f"Produced Data-{i}")

def consumer():
    while True:
        data = buffer.get()
        print(f"Consumed {data}")
        empty_slots.release()
        time.sleep(0.5)

threading.Thread(target=producer).start()
threading.Thread(target=consumer, daemon=True).start()
        

4. 결론 및 주의사항

동기화는 강력한 도구이지만, 과도한 Lock 사용은 Deadlock(교착 상태)이나 스레드 경합으로 인한 성능 저하를 유발합니다. 항상 with 구문을 사용하여 예외 발생 시에도 Lock이 해제되도록 보장하고, 가능한 Critical Section의 범위를 최소화하는 것이 운영 환경의 핵심입니다.

내용 출처

  • Python 공식 문서 (threading module): https://docs.python.org/3/library/threading.html
  • Operating System Concepts (Abraham Silberschatz 저) - Synchronization 파트
  • Python Software Foundation - GIL(Global Interpreter Lock) Guide
728x90