
현대의 데이터 주도 기업에서 데이터 파이프라인(ETL)의 안정성은 비즈니스 의사결정의 핵심입니다. 하지만 Upstream 소스의 갑작스러운 변경이나 예기치 않은 데이터 품질 저하는 다운스트림의 분석 모델과 대시보드를 무너뜨리는 주범입니다. 본 포스팅에서는 단순히 데이터를 옮기는 것을 넘어, **Python** 기반 ETL 과정에서 데이터의 신뢰성을 보장하는 **Great Expectations (GE)** 라이브러리의 독창적인 활용 방법과 실무적인 해결책을 깊이 있게 다룹니다.
1. 데이터 무결성을 위한 사전 방어 전략: 스키마 검증의 중요성
많은 데이터 엔지니어들이 ETL 스크립트를 작성할 때 로직 자체에 집중하지만, 정작 유입되는 데이터의 '형태(Schema)'가 올바른지 검증하는 것에는 소홀한 경우가 많습니다. Schema Drift(소스 데이터 구조의 미묘한 변화)는 조용히 발생하며, 그 결과는 비즈니스 로직 오류라는 참사로 이어집니다. **Great Expectations**는 단순한 assertion을 넘어 데이터에 대한 '기대(Expectation)'를 선언하고 이를 자동화된 문서 및 레포트로 관리할 수 있는 유일무이한 툴입니다.
Pandas vs Great Expectations 검증 방식 비교
| 비교 항목 | 전통적인 Pandas Assertion | Great Expectations 활용 |
|---|---|---|
| 검증 선언 방식 | 하드코딩된 if/assert 문 | 선언적 API (Expectations) |
| 문서화 | 개발자의 주석에 의존 | Data Docs 자동 생성 (HTML) |
| 결과 레포팅 | 에러 로그 출력에 그침 | Validation Result JSON 및 시각화 |
| 확장성 | 복잡한 규칙 구현 시 가독성 저하 | 커스텀 Expectation 및 프로파일링 지원 |
| ETL 통합 | try-except 문으로 수동 제어 | Checkpoint를 통한 유연한 파이프라인 통합 |
2. 실무 개발자를 위한 Great Expectations 활용 예제 (7가지)
개발자가 현업 ETL 파이프라인(Pandas, Spark 등)에 즉시 복사하여 붙여넣을 수 있는, Great Expectations의 핵심 기능을 활용한 실무 해결 예제들입니다.
Example 1: 필수 컬럼 존재 여부 및 순서 검증
소스 데이터에서 비즈니스 로직에 필수적인 컬럼들이 빠짐없이 존재하는지, 그리고 정의된 순서대로 유입되는지 검증하여 Schema Drift를 방지합니다.
import pandas as pd
import great_expectations as ge
# Sample Data loading (ETL의 Extract 단계)
df_raw = pd.DataFrame({
'user_id': [1, 2, 3],
'event_type': ['click', 'view', 'purchase'],
'ts': ['2026-04-19 10:00:00', '2026-04-19 10:01:00', '2026-04-19 10:02:00']
})
# Great Expectations DataFrame으로 변환
ge_df = ge.from_pandas(df_raw)
# 1. 컬럼 목록 및 순서 검증 (정확히 일치해야 함)
required_columns = ['user_id', 'event_type', 'ts']
validation_result = ge_df.expect_table_columns_to_match_ordered_list(column_list=required_columns)
if not validation_result["success"]:
print(f"Schema Validation Failed: {validation_result['exception_info']}")
# 파이프라인 중단 로직 추가 가능
else:
print("Column Schema Validated Successfully.")
Example 2: 컬럼별 데이터 타입(Dtype) 무결성 보장
데이터 전처리(Transform) 단계 전, 각 컬럼이 비즈니스 로직 연산에 적합한 데이터 타입(Integer, String 등)을 가지고 있는지 확인합니다.
# user_id는 Integer, event_type은 String(Object)이어야 함을 검증
validation_results = []
validation_results.append(ge_df.expect_column_values_to_be_in_type_list('user_id', ['int64', 'int']))
validation_results.append(ge_df.expect_column_values_to_be_in_type_list('event_type', ['object', 'str']))
# 결과 통합 확인
for result in validation_results:
if not result["success"]:
print(f"Type Validation Failed for column {result['expectation_config']['kwargs']['column']}")
Example 3: 필수값(Non-null) 및 고유값(Unique) 제약 조건 검증
PK 역할을 하는 컬럼에 NULL이 유입되거나 중복 데이터가 발생하는 경우를 탐지하여 데이터 품질 저하를 막습니다.
# 1. user_id는 절대 NULL일 수 없음
ge_df.expect_column_values_to_be_not_null(column='user_id')
# 2. (비즈니스 규칙에 따라) ts별 user_id는 유니크해야 함 (간접적인 PK 검증)
ge_df.expect_column_unique_value_count_to_be_between(column='user_id', min_value=3, max_value=3)
# 또는 더 직관적인 방법
ge_df.expect_column_values_to_be_unique(column='user_id')
Example 4: 허용된 범주의 데이터 유입 검증 (Set Membership)
이벤트 타입이나 상태 코드처럼 미리 정의된 카테고리값(Categorical Data) 외에 오타나 잘못된 데이터가 소스에 포함되어 유입되는 것을 방지합니다.
# 허용된 이벤트 타입 정의
allowed_events = ['click', 'view', 'purchase', 'login']
# event_type 컬럼의 값이 허용된 세트에 포함되는지 검증
validation_result = ge_df.expect_column_values_to_be_in_set(
column='event_type',
value_set=allowed_events
)
if not validation_result["success"]:
unexpected_count = validation_result['result']['unexpected_count']
unexpected_list = validation_result['result']['unexpected_list']
print(f"Invalid event types found ({unexpected_count} rows): {unexpected_list}")
Example 5: 수치 데이터의 비즈니스 논리 범위 검증 (Range Check)
가격, 수량, 나이 등 수치형 데이터가 상식적이거나 비즈니스 규칙에 정의된 범위(예: 음수 가격 금지) 내에 존재하는지 확인합니다.
# (가정) 데이터에 'price' 컬럼이 추가되었다고 가정
df_raw['price'] = [1000, 2500, -500] # -500은 잘못된 데이터
ge_df_price = ge.from_pandas(df_raw)
# 가격은 0보다 크거나 같아야 함
validation_result = ge_df_price.expect_column_values_to_be_between(
column='price',
min_value=0,
strict_min=False
)
if not validation_result["success"]:
# 잘못된 데이터가 발생한 비율 확인
print(f"Error percentage: {validation_result['result']['unexpected_percent']}%")
Example 6: 정규표현식을 활용한 문자열 포맷 검증 (Pattern Matching)
이메일, 전화번호, 날짜 형태의 문자열이 약속된 정규표현식 포맷을 따르고 있는지 ETL 진입 단계에서 강력하게 검증합니다.
# ts 컬럼이 YYYY-MM-DD HH:MM:SS 포맷인지 검증하는 정규표현식
date_regex = r"^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}$"
validation_result = ge_df.expect_column_values_to_match_regex(
column='ts',
regex=date_regex
)
if not validation_result["success"]:
print("Timestamp format is invalid.")
Example 7: 프로덕션 ETL 파이프라인 통합 (Checkpoint 활용)
실제 프로덕션 환경에서는 코드 내에 직접 Expectation을 선언하기보다, 저장된 Suite를 불러와 Checkpoint를 통해 실행하고 결과에 따라 알림이나 파이프라인 제어를 수행하는 해결책을 사용합니다.
import great_expectations as ge
from great_expectations.core.batch import BatchRequest
from great_expectations.checkpoint import SimpleCheckpoint
# 1. Data Context 로드 (GE 구성 파일들이 있는 경로)
context = ge.get_context()
# 2. 검증할 데이터 세팅 (ETL Extract 데이터)
# (참고: 실제 환경에서는 DataConnector 설정을 통해 S3, DB 등 소스에서 직접 BatchRequest 생성)
batch_request = BatchRequest(
datasource_name="my_filesystem_datasource",
data_connector_name="default_inferred_data_connector_name",
data_asset_name="users_data.csv",
)
# 3. 미리 정의된 Expectation Suite 이름
suite_name = "users.schema.suite"
# 4. Checkpoint 설정 및 실행
checkpoint_config = {
"name": "my_etl_checkpoint",
"config_version": 1.0,
"class_name": "SimpleCheckpoint",
"run_name_template": "%Y%m%d-%H%M%S-etl-run",
"validations": [
{
"batch_request": batch_request,
"expectation_suite_name": suite_name,
}
],
}
context.add_or_update_checkpoint(**checkpoint_config)
results = context.run_checkpoint(checkpoint_name="my_etl_checkpoint")
# 5. 검증 결과에 따른 ETL 흐름 제어
if not results["success"]:
print("Critical: Data Validation Failed. Aborting ETL Pipeline.")
# Slack 알림 전송 또는 Airflow DAG Fail 처리 로직
raise ValueError("Data quality is too low for processing.")
else:
print("Data Validation Passed. Proceeding to Transform stage.")
# ETL Transform 단계로 진행
3. ETL 파이프라인에서 Schema Drift 해결 전략
소스 데이터의 스키마 변경은 ETL 장애의 가장 큰 원인입니다. Great Expectations를 활용하여 이를 현명하게 해결하는 3가지 방법은 다음과 같습니다.
- **Fail-Fast 전략 (강력한 차단):** Example 1, 2와 같이 컬럼 이름, 순서, 타입을 파이프라인 진입로에서 엄격하게 검증합니다. 스키마가 일치하지 않으면 **즉시 파이프라인을 중단(Fail-fast)**시키고 Upstream 담당자에게 알림을 보냅니다. 이는 잘못된 데이터가 Downstream으로 흘러가 더 큰 오류를 일으키는 것을 막는 가장 효과적인 방법입니다.
- **경고 및 모니터링 (유연한 대응):** 모든 스키마 변경이 장애로 이어지는 것은 아닙니다. 새로운 컬럼이 추가된 경우처럼 로직에 영향이 없다면, `mostly` 인자(G.E API 지원)를 활용하여 검증 성공 임계치를 조절하거나, Fail 대신 경고(Data Docs 시각화)만 남기고 파이프라인을 계속 진행시킬 수 있습니다. 이를 통해 불필요한 파이프라인 중단을 줄입니다.
- **자동화된 프로파일링 및 Suite 업데이트:** Great Expectations의 `OnboardingDataAssistant`나 프로파일러를 활용하여, 소스 데이터의 변경된 스키마를 주기적으로 자동 분석합니다. 변경된 스키마에 맞춰 Expectation Suite를 자동으로 업데이트하거나, 변경 제안 레포트를 엔지니어에게 제공하여 ETL 코드를 빠르게 수정할 수 있도록 해결책을 제시합니다.
4. 결론: Great Expectations 데이터 스키마 검증 도구 활용
ETL 과정에서 데이터 스키마를 검증하는 것은 이제 '선택'이 아닌 '필수'입니다. **Great Expectations**는 Python 데이터 엔지니어에게 데이터 품질을 선언적으로 정의하고, 자동화된 문서화(Data Docs)를 통해 협업을 촉진하며, Checkpoint를 통해 프로덕션 파이프라인에 유연하게 통합할 수 있는 강력한 해결책을 제공합니다. 단순히 코드를 작성하는 것을 넘어 데이터의 '기대'를 관리하는 것, 그것이 완벽한 ETL 파이프라인을 구축하는 차이점입니다.
참고 문헌 및 출처
- Great Expectations Official Documentation
- Python Data Science Handbook by Jake VanderPlas
- Data Engineering Teams by Jesse Anderson
- Apache Airflow Documentation (ETL Integration patterns)
'Artificial Intelligence > 60. Python' 카테고리의 다른 글
| [PYTHON] 전처리 파이프라인에서 GPU 가속(RAPIDS) 도입 시 가성비 분석 및 3가지 효율 해결 방법 (0) | 2026.04.23 |
|---|---|
| [PYTHON] 고차원 데이터 시각화를 위한 t-SNE vs UMAP 2가지 알고리즘 성능 및 해석 차이 해결 방법 (0) | 2026.04.23 |
| [PYTHON] 고차원 카테고리 데이터 해결을 위한 Target Encoding 오버피팅 방지 7가지 방법 (0) | 2026.04.23 |
| [PYTHON] 시계열 결측치 0으로 해결하는 MICE 알고리즘의 한계와 3가지 대안 방법 (0) | 2026.04.23 |
| [PYTHON] 효율적인 데이터 라벨링을 위한 Active Learning 샘플링 전략 7가지 해결 방법 (0) | 2026.04.23 |