본문 바로가기
Artificial Intelligence/60. Python

[PYTHON] 완벽한 ETL 파이프라인 구축을 위한 Great Expectations 데이터 스키마 검증 방법 3가지 및 해결책

by Papa Martino V 2026. 4. 23.
728x90

Great Expectations
Great Expectations

 

 

현대의 데이터 주도 기업에서 데이터 파이프라인(ETL)의 안정성은 비즈니스 의사결정의 핵심입니다. 하지만 Upstream 소스의 갑작스러운 변경이나 예기치 않은 데이터 품질 저하는 다운스트림의 분석 모델과 대시보드를 무너뜨리는 주범입니다. 본 포스팅에서는 단순히 데이터를 옮기는 것을 넘어, **Python** 기반 ETL 과정에서 데이터의 신뢰성을 보장하는 **Great Expectations (GE)** 라이브러리의 독창적인 활용 방법과 실무적인 해결책을 깊이 있게 다룹니다.


1. 데이터 무결성을 위한 사전 방어 전략: 스키마 검증의 중요성

많은 데이터 엔지니어들이 ETL 스크립트를 작성할 때 로직 자체에 집중하지만, 정작 유입되는 데이터의 '형태(Schema)'가 올바른지 검증하는 것에는 소홀한 경우가 많습니다. Schema Drift(소스 데이터 구조의 미묘한 변화)는 조용히 발생하며, 그 결과는 비즈니스 로직 오류라는 참사로 이어집니다. **Great Expectations**는 단순한 assertion을 넘어 데이터에 대한 '기대(Expectation)'를 선언하고 이를 자동화된 문서 및 레포트로 관리할 수 있는 유일무이한 툴입니다.

Pandas vs Great Expectations 검증 방식 비교

비교 항목 전통적인 Pandas Assertion Great Expectations 활용
검증 선언 방식 하드코딩된 if/assert 문 선언적 API (Expectations)
문서화 개발자의 주석에 의존 Data Docs 자동 생성 (HTML)
결과 레포팅 에러 로그 출력에 그침 Validation Result JSON 및 시각화
확장성 복잡한 규칙 구현 시 가독성 저하 커스텀 Expectation 및 프로파일링 지원
ETL 통합 try-except 문으로 수동 제어 Checkpoint를 통한 유연한 파이프라인 통합

2. 실무 개발자를 위한 Great Expectations 활용 예제 (7가지)

개발자가 현업 ETL 파이프라인(Pandas, Spark 등)에 즉시 복사하여 붙여넣을 수 있는, Great Expectations의 핵심 기능을 활용한 실무 해결 예제들입니다.

Example 1: 필수 컬럼 존재 여부 및 순서 검증

소스 데이터에서 비즈니스 로직에 필수적인 컬럼들이 빠짐없이 존재하는지, 그리고 정의된 순서대로 유입되는지 검증하여 Schema Drift를 방지합니다.


import pandas as pd
import great_expectations as ge

# Sample Data loading (ETL의 Extract 단계)
df_raw = pd.DataFrame({
    'user_id': [1, 2, 3],
    'event_type': ['click', 'view', 'purchase'],
    'ts': ['2026-04-19 10:00:00', '2026-04-19 10:01:00', '2026-04-19 10:02:00']
})

# Great Expectations DataFrame으로 변환
ge_df = ge.from_pandas(df_raw)

# 1. 컬럼 목록 및 순서 검증 (정확히 일치해야 함)
required_columns = ['user_id', 'event_type', 'ts']
validation_result = ge_df.expect_table_columns_to_match_ordered_list(column_list=required_columns)

if not validation_result["success"]:
    print(f"Schema Validation Failed: {validation_result['exception_info']}")
    # 파이프라인 중단 로직 추가 가능
else:
    print("Column Schema Validated Successfully.")

Example 2: 컬럼별 데이터 타입(Dtype) 무결성 보장

데이터 전처리(Transform) 단계 전, 각 컬럼이 비즈니스 로직 연산에 적합한 데이터 타입(Integer, String 등)을 가지고 있는지 확인합니다.


# user_id는 Integer, event_type은 String(Object)이어야 함을 검증
validation_results = []
validation_results.append(ge_df.expect_column_values_to_be_in_type_list('user_id', ['int64', 'int']))
validation_results.append(ge_df.expect_column_values_to_be_in_type_list('event_type', ['object', 'str']))

# 결과 통합 확인
for result in validation_results:
    if not result["success"]:
        print(f"Type Validation Failed for column {result['expectation_config']['kwargs']['column']}")

Example 3: 필수값(Non-null) 및 고유값(Unique) 제약 조건 검증

PK 역할을 하는 컬럼에 NULL이 유입되거나 중복 데이터가 발생하는 경우를 탐지하여 데이터 품질 저하를 막습니다.


# 1. user_id는 절대 NULL일 수 없음
ge_df.expect_column_values_to_be_not_null(column='user_id')

# 2. (비즈니스 규칙에 따라) ts별 user_id는 유니크해야 함 (간접적인 PK 검증)
ge_df.expect_column_unique_value_count_to_be_between(column='user_id', min_value=3, max_value=3)
# 또는 더 직관적인 방법
ge_df.expect_column_values_to_be_unique(column='user_id')

Example 4: 허용된 범주의 데이터 유입 검증 (Set Membership)

이벤트 타입이나 상태 코드처럼 미리 정의된 카테고리값(Categorical Data) 외에 오타나 잘못된 데이터가 소스에 포함되어 유입되는 것을 방지합니다.


# 허용된 이벤트 타입 정의
allowed_events = ['click', 'view', 'purchase', 'login']

# event_type 컬럼의 값이 허용된 세트에 포함되는지 검증
validation_result = ge_df.expect_column_values_to_be_in_set(
    column='event_type',
    value_set=allowed_events
)

if not validation_result["success"]:
    unexpected_count = validation_result['result']['unexpected_count']
    unexpected_list = validation_result['result']['unexpected_list']
    print(f"Invalid event types found ({unexpected_count} rows): {unexpected_list}")

Example 5: 수치 데이터의 비즈니스 논리 범위 검증 (Range Check)

가격, 수량, 나이 등 수치형 데이터가 상식적이거나 비즈니스 규칙에 정의된 범위(예: 음수 가격 금지) 내에 존재하는지 확인합니다.


# (가정) 데이터에 'price' 컬럼이 추가되었다고 가정
df_raw['price'] = [1000, 2500, -500] # -500은 잘못된 데이터
ge_df_price = ge.from_pandas(df_raw)

# 가격은 0보다 크거나 같아야 함
validation_result = ge_df_price.expect_column_values_to_be_between(
    column='price',
    min_value=0,
    strict_min=False
)

if not validation_result["success"]:
    # 잘못된 데이터가 발생한 비율 확인
    print(f"Error percentage: {validation_result['result']['unexpected_percent']}%")

Example 6: 정규표현식을 활용한 문자열 포맷 검증 (Pattern Matching)

이메일, 전화번호, 날짜 형태의 문자열이 약속된 정규표현식 포맷을 따르고 있는지 ETL 진입 단계에서 강력하게 검증합니다.


# ts 컬럼이 YYYY-MM-DD HH:MM:SS 포맷인지 검증하는 정규표현식
date_regex = r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$"

validation_result = ge_df.expect_column_values_to_match_regex(
    column='ts',
    regex=date_regex
)

if not validation_result["success"]:
    print("Timestamp format is invalid.")

Example 7: 프로덕션 ETL 파이프라인 통합 (Checkpoint 활용)

실제 프로덕션 환경에서는 코드 내에 직접 Expectation을 선언하기보다, 저장된 Suite를 불러와 Checkpoint를 통해 실행하고 결과에 따라 알림이나 파이프라인 제어를 수행하는 해결책을 사용합니다.


import great_expectations as ge
from great_expectations.core.batch import BatchRequest
from great_expectations.checkpoint import SimpleCheckpoint

# 1. Data Context 로드 (GE 구성 파일들이 있는 경로)
context = ge.get_context()

# 2. 검증할 데이터 세팅 (ETL Extract 데이터)
# (참고: 실제 환경에서는 DataConnector 설정을 통해 S3, DB 등 소스에서 직접 BatchRequest 생성)
batch_request = BatchRequest(
    datasource_name="my_filesystem_datasource",
    data_connector_name="default_inferred_data_connector_name",
    data_asset_name="users_data.csv",
)

# 3. 미리 정의된 Expectation Suite 이름
suite_name = "users.schema.suite"

# 4. Checkpoint 설정 및 실행
checkpoint_config = {
    "name": "my_etl_checkpoint",
    "config_version": 1.0,
    "class_name": "SimpleCheckpoint",
    "run_name_template": "%Y%m%d-%H%M%S-etl-run",
    "validations": [
        {
            "batch_request": batch_request,
            "expectation_suite_name": suite_name,
        }
    ],
}
context.add_or_update_checkpoint(**checkpoint_config)
results = context.run_checkpoint(checkpoint_name="my_etl_checkpoint")

# 5. 검증 결과에 따른 ETL 흐름 제어
if not results["success"]:
    print("Critical: Data Validation Failed. Aborting ETL Pipeline.")
    # Slack 알림 전송 또는 Airflow DAG Fail 처리 로직
    raise ValueError("Data quality is too low for processing.")
else:
    print("Data Validation Passed. Proceeding to Transform stage.")
    # ETL Transform 단계로 진행

3. ETL 파이프라인에서 Schema Drift 해결 전략

소스 데이터의 스키마 변경은 ETL 장애의 가장 큰 원인입니다. Great Expectations를 활용하여 이를 현명하게 해결하는 3가지 방법은 다음과 같습니다.

  1. **Fail-Fast 전략 (강력한 차단):** Example 1, 2와 같이 컬럼 이름, 순서, 타입을 파이프라인 진입로에서 엄격하게 검증합니다. 스키마가 일치하지 않으면 **즉시 파이프라인을 중단(Fail-fast)**시키고 Upstream 담당자에게 알림을 보냅니다. 이는 잘못된 데이터가 Downstream으로 흘러가 더 큰 오류를 일으키는 것을 막는 가장 효과적인 방법입니다.
  2. **경고 및 모니터링 (유연한 대응):** 모든 스키마 변경이 장애로 이어지는 것은 아닙니다. 새로운 컬럼이 추가된 경우처럼 로직에 영향이 없다면, `mostly` 인자(G.E API 지원)를 활용하여 검증 성공 임계치를 조절하거나, Fail 대신 경고(Data Docs 시각화)만 남기고 파이프라인을 계속 진행시킬 수 있습니다. 이를 통해 불필요한 파이프라인 중단을 줄입니다.
  3. **자동화된 프로파일링 및 Suite 업데이트:** Great Expectations의 `OnboardingDataAssistant`나 프로파일러를 활용하여, 소스 데이터의 변경된 스키마를 주기적으로 자동 분석합니다. 변경된 스키마에 맞춰 Expectation Suite를 자동으로 업데이트하거나, 변경 제안 레포트를 엔지니어에게 제공하여 ETL 코드를 빠르게 수정할 수 있도록 해결책을 제시합니다.

4. 결론: Great Expectations 데이터 스키마 검증 도구 활용

ETL 과정에서 데이터 스키마를 검증하는 것은 이제 '선택'이 아닌 '필수'입니다. **Great Expectations**는 Python 데이터 엔지니어에게 데이터 품질을 선언적으로 정의하고, 자동화된 문서화(Data Docs)를 통해 협업을 촉진하며, Checkpoint를 통해 프로덕션 파이프라인에 유연하게 통합할 수 있는 강력한 해결책을 제공합니다. 단순히 코드를 작성하는 것을 넘어 데이터의 '기대'를 관리하는 것, 그것이 완벽한 ETL 파이프라인을 구축하는 차이점입니다.


참고 문헌 및 출처

  • Great Expectations Official Documentation 
  • Python Data Science Handbook by Jake VanderPlas
  • Data Engineering Teams by Jesse Anderson
  • Apache Airflow Documentation (ETL Integration patterns)
728x90