본문 바로가기
Artificial Intelligence/60. Python

[PYTHON] 데이터 파이프라인의 Lineage 추적을 위한 OpenLineage 적용 방법 및 3가지 해결 차이

by Papa Martino V 2026. 4. 21.
728x90

Data Lineage
Data Lineage

 

현대의 데이터 아키텍처는 수많은 소스 시스템, 복잡한 ETL 프로세스, 그리고 다양한 BI 도구들이 얽혀 있는 거대한 그물망과 같습니다. 데이터 엔지니어에게 가장 고통스러운 질문은 "이 대시보드의 숫자가 왜 틀렸나요?"가 아니라, "이 데이터가 도대체 어디서부터 흘러들어왔나요?"라는 질문일 것입니다. 데이터의 기원과 변경 이력을 추적하는 데이터 리니지(Data Lineage)는 이제 가시성을 넘어 가용성과 신뢰성의 핵심 지표가 되었습니다. 본 포스팅에서는 파이썬(Python) 환경에서 OpenLineage 표준을 적용하여 분산된 데이터 흐름을 투명하게 관리하는 구체적인 방법과, 기존의 수동 추적 방식 대비 3가지 결정적인 기술 해결 차이를 상세히 다룹니다. 실무 개발자가 즉시 통합 가능한 7가지 이상의 엔지니어링 예제를 통해 강력한 데이터 거버넌스를 구축해 보십시오.


1. 데이터 리니지의 필연성과 OpenLineage의 독창적 가치

데이터 리니지는 데이터의 생애 주기를 기록하는 지도입니다. 소스 테이블에서 타겟 테이블로 이동하는 단순한 경로뿐만 아니라, 어떤 쿼리가 실행되었는지, 어떤 파이썬 함수가 데이터를 변형했는지, 실행 시간은 얼마나 걸렸는지에 대한 메타데이터를 포함합니다.

OpenLineage는 이러한 메타데이터 수집을 위한 오픈 소스 표준 규격입니다. 특정 벤더에 종속되지 않고 Airflow, Spark, dbt 등 다양한 도구에서 발생하는 이벤트를 하나의 통합된 형태로 수집할 수 있다는 것이 가장 큰 장점입니다.


2. 수동 메타데이터 관리 vs OpenLineage 자동 추적 차이 비교

데이터 파이프라인 운영 시 발생하는 문제들에 대해 OpenLineage가 제공하는 해결 방식의 차이를 표로 정리하였습니다.

비교 항목 수동/하드코딩 리니지 관리 OpenLineage 기반 자동화 해결되는 핵심 차이
메타데이터 수집 문서화 또는 주석에 의존 런타임 시 API를 통한 자동 수집 실시간 가시성 확보
변경 사항 반영 코드 수정 시 매번 수동 업데이트 코드 변경 시 리니지 자동 갱신 휴먼 에러 원천 차단
도구 간 통합 각 도구별 파편화된 로그 발생 표준 포맷(JSON)으로 통합 관리 엔드투엔드(E2E) 추적 가능
데이터 영향도 분석 테이블 의존성 파악에 며칠 소요 그래프 분석으로 몇 초 내 파악 장애 복구 시간(MTTR) 단축
버전 관리 과거 데이터 흐름 확인 불가 실행 단위(Job/Run)별 이력 저장 감사(Audit) 및 규제 대응 해결

3. 실무 적용을 위한 OpenLineage & Python 통합 예제 (7 Examples)

이 섹션에서는 파이썬 환경에서 OpenLineage 이벤트를 생성하고 전송하는 7가지 실전 패턴을 소개합니다. 주로 openlineage-python 클라이언트를 활용합니다.

Example 1: 파이썬 스크립트에서 커스텀 Job 이벤트 전송

독립적인 파이썬 처리 로직에서 리니지 데이터를 수동으로 생성하여 Marquez 등의 백엔드로 전송하는 기초 예제입니다.


from openlineage.client import OpenLineageClient, Job, Run, RunEvent, RunState
from openlineage.client.facet import SqlJobFacet
import uuid
import datetime

# 1. 클라이언트 설정 (Marquez 등 메타데이터 서버 주소)
client = OpenLineageClient.from_environment()

# 2. 작업 정의
job = Job(namespace="sales_data_pipeline", name="daily_summary_job")
run = Run(runId=str(uuid.uuid4()))

# 3. 시작 이벤트 전송
client.emit(
    RunEvent(
        eventType=RunState.START,
        eventTime=datetime.datetime.now().isoformat(),
        run=run,
        job=job,
        producer="my_python_producer_v1"
    )
)

# [데이터 처리 로직 수행]

# 4. 완료 이벤트 전송
client.emit(
    RunEvent(
        eventType=RunState.COMPLETE,
        eventTime=datetime.datetime.now().isoformat(),
        run=run,
        job=job,
        producer="my_python_producer_v1"
    )
)

Example 2: Input/Output 데이터셋(Dataset) 상세 정의

데이터가 어디서 오고 어디로 가는지 구체적인 URI를 명시하여 리니지 그래프를 연결합니다.


from openlineage.client.run import Dataset

inputs = [
    Dataset(namespace="s3://raw-zone", name="user_logs/2026/04/19")
]
outputs = [
    Dataset(namespace="postgres://warehouse_db", name="public.user_activity_summary")
]

# RunEvent 생성 시 inputs, outputs 리스트를 전달하여 데이터 흐름 기록

Example 3: Airflow와 OpenLineage 라이브러리 자동 통합

Airflow 환경에서 OPENLINEAGE_URL 설정만으로 모든 Operator의 리니지를 자동 추적하는 해결 방법입니다.


# Docker-compose 또는 환경 변수에 설정
# 별도의 파이썬 코드 수정 없이 Airflow가 런타임에 이벤트를 가로챔
export AIRFLOW__OPENLINEAGE__TRANSPORT='{"type": "http", "url": "http://marquez:5000"}'
export AIRFLOW__OPENLINEAGE__NAMESPACE='company_wide_airflow'

Example 4: SQL 파싱을 통한 리니지 추출 (SqlJobFacet)

실행되는 SQL 구문을 OpenLineage에 포함시켜 데이터 변환 로직을 투명하게 공개합니다.


sql_facet = SqlJobFacet(query="INSERT INTO target SELECT * FROM source WHERE id > 100")
job = Job(namespace="db_sync", name="move_records", facets={"sql": sql_facet})
# 이후 RunEvent 발송 시 해당 job 객체 사용

Example 5: 데이터 품질(Data Quality) 메트릭 포함하기

데이터 건수, Null 값 비율 등 품질 지표를 Facet으로 추가하여 리니지와 함께 모니터링합니다.


from openlineage.client.facet import DataQualityMetricsInputDatasetFacet

dq_facet = DataQualityMetricsInputDatasetFacet(
    rowCount=1500,
    columnMetrics={'age': {'nullCount': 0, 'distinctCount': 45}}
)
# Dataset 정의 시 facets 파라미터에 추가

Example 6: Pandas 처리 로직의 리니지 수동 래핑

Pandas를 이용한 전처리 과정 전후를 OpenLineage 이벤트로 감싸서 추적성을 확보합니다.


import pandas as pd

def tracked_transform(input_path, output_path):
    # START 이벤트 발송 로직 생략
    df = pd.read_csv(input_path)
    processed_df = df.groupby('category').sum()
    processed_df.to_parquet(output_path)
    # COMPLETE 이벤트와 함께 input_path/output_path를 Dataset으로 전송

Example 7: 에러 발생 시 FAIL 이벤트 전송 및 원인 기록

파이프라인 중단 시 Error Facet을 통해 어떤 예외가 발생했는지 리니지 백엔드에 기록합니다.


from openlineage.client.facet import ErrorMessageRunFacet

try:
    raise ValueError("필수 컬럼 'user_id'가 누락되었습니다.")
except Exception as e:
    error_facet = ErrorMessageRunFacet(
        message=str(e),
        programmingLanguage="python",
        stackTrace="...stack..."
    )
    # RunState.FAIL 이벤트 전송 시 facets에 포함

4. OpenLineage 기반 거버넌스 해결 시 주의사항

  1. 네트워크 오버헤드: 모든 미세한 스텝마다 이벤트를 발송하면 파이프라인 성능에 영향을 줄 수 있습니다. 주요 체크포인트 단위로 이벤트를 묶어서 발송하십시오.
  2. 네임스페이스 관리: 조직 내에서 일관된 네임스페이스 규칙(예: 서비스명_환경명)을 정하지 않으면 리니지 그래프가 파편화되어 연결되지 않습니다.
  3. 보안 데이터 노출: SQL Facet 전송 시 민감한 개인정보나 비밀번호가 포함되지 않도록 마스킹 처리가 선행되어야 합니다.

5. 결론: 리니지는 데이터 신뢰의 시작입니다

데이터 엔지니어링의 정점은 화려한 알고리즘이 아니라 "예측 가능하고 추적 가능한 관리 체계"에 있습니다. OpenLineage를 파이썬 파이프라인에 적용하는 것은 단순히 이력을 남기는 행위를 넘어, 데이터 팀 전체의 생산성을 높이고 데이터 민주화를 실현하는 해결 방법입니다. 지금 바로 여러분의 Airflow나 커스텀 ETL 스크립트에 이 표준을 도입하여 데이터의 '족보'를 완성해 보시기 바랍니다.


참고 문헌 및 출처

  • OpenLineage Official Documentation (openlineage.io)
  • Marquez: Metadata service for OpenLineage (GitHub/marquez-project)
  • "Data Management at Scale" by Piethein Strengholt
  • Astronomer Guide: Lineage with OpenLineage and Airflow
  • Python Client for OpenLineage API (PyPI: openlineage-python)
728x90