Airflow
Country Capital Dag Improve

V3

  • 두 개의 태스크로 나누기

    • 효율성 증대
    • extract => transform_load
  • 태스크간 데이터를 파일로 넘기되 파일 경로를 고정

    • country_capital_YYYYMMDD_HHMMSS.csv

확장성있는 태스크 간의 정보 전달 방법은??

  • 태스크를 개별적으로 실행하려면 태스크간 데이터 의존성이 약해야 한다.

뭔가 데이터를 메모리로 전달하고 테이블로 읽게 하는 것은 의존성이 강한 것이다.

  • 파일의 형태(로컬 파일이나 클라우드 상의 파일)로 주고 받되 그 위치를 고정된 파일 경로 사용
  • extract : CSV 파일을 읽어 정해진 파일 경로에 그 내용을 저장
  • transform_load : 정해진 파일 경로에서 해당 파일을 읽어서 테이블로 적재
  • 태스크간 데이터 이동 방법 파일로 이동하되 파일 경로를 고정
    • 중요한 포인트는 파일 경로가 고정되어야 두 태스크간의 파일을 통한 데이터 전달이 가능
      • 하나의 DAG는 경우에 따라 여러 RUN가 동시에 실행 가능
    • 따라서 country_capital.csv 처럼 고정된 이름을 쓸 수 없음
    • 대신 country_capital_YYYY_MMDD_HHMMSS가 태스크간 공유가 되어야함
      • 단 동일한 YYYYMMDD_HHMMSS가 태스크간 공유되어야 한다.
      • 이를 위해 logical_date라는 Airflow 컨텍스트 변수를 사용할 예정

FLOW

각 DAG 실행에는 타임스탬프가 하나 부여된다.

  • logical
  1. extrct -> 해당 DAG run에서 사용될 이름으로 파일 저장
  2. transform_load 해당 DAG run에서 사용될 이름으로 파일 읽어 snowflake 테이블로 적재

일관된 파일 명명 : country_capital_YYYYMMDD_HHMMSS.csv

from.airflow.operators.python import get_current_context

def get_file_path(context):
	tmp_dir = "/tmp" 
	date = context['logical_date']
	timestamp = date.strftime("%Y%m%d_%H%M%S")
	file_path = os.path.join(tmp_dir, f"country_capital_{timestamp}.csv")
 
	return file_path
 
file_path = get_file_path(get_current_context())
 

주요 포인트:

  • get_current_context 함수:
    • 현재 실행 중인 태스크 인스턴스의 실행 컨텍스트(예: 실행 날짜, DAG 정보, 태스크 인스턴스 등)를 담은 딕셔너리를 반환합니다.
  • 사용 목적:
    • PythonOperator 같은 오퍼레이터 내에서 실행되는 파이썬 함수가 자신의 실행 환경(예: 날짜, 실행 ID 등)을 참조할 수 있도록 도와줍니다. 예를 들어, 동적으로 파일명을 생성하거나, 이전 태스크에서 생성된 데이터를 활용할 때 유용합니다.

V4

이전 V3에서의 문제점

  • 다수의 레코드를 하나씩 INSERT INTO로 적재하는 것이 비효율적
  • 모든 데이터 웨어하우스와 데이터 레이크는 벌크 적재를 지원
    • snowflake에서는 COPY INTO 명이 그에 해당
      • 다른 이야기지만 워낙 많은 레코드들을 적재해야 하기에 기본키를 체크를 안함
    • 이는 stage라는 중간 매개체를 통해 레코드 단위가 아닌 파일 단위로 이뤄짐
  • 반대로 Snowflake에 있는 데이터를 stsge로 다운로드 받을 수도 있음 : Unload

Stage

  • Snowflake에서 파일을 임시 저장하여 데이터를 로드/언로드할 때 사용하는 스토리지
    • 외부 데이터 소스와 snowflake 테이블 간의 중간 다리 역할
  • 주요 명령어 :
    • PUT : 로컬 시스템에서 파일을 스테이지로 업로드
    • LIST : 스테이지에 저장된 파일 목록 확인
    • REMOVE : 스테이지에서 파일 삭제
    • GET : 스테이지에서 파일을 로컬 시스템으로 다운로드
  • 장점 :
    • 데이터 로드 및 언로드 프로세스를 최적화
    • 파일 압축 지원으로 저장공간과 전송 속도 최적화

Stage 종류

  • External Stage
    • 클라우드 스토리지를 Stage로 사용하는 경우로 큰 데이터 처리가 필요한 경우 사용
    • AWS S3, Google Cloud Storage, Azure Blob Storage
  • Internal Stage (Snowflake가 관리하는 클라우드 스토리지)
    • User Stage : 사용자 개인별 Stage
    • Table Stage : 테이블별로 자동 생성되는 Stage
    • Named Stage : 사용자가 명시적으로 생성한 커스텀 Stage

External Stage

  • 클라우드 스토리지 연결
    • AWS S3
    • Google Cloud Storage
    • Microsoft Azure Blob Storage
  • 특징
    • CREATE STAGE 명령으로 명시적으로 생성
    • 클라우드 스토리지 액세스 권한 필요
    • 대용량 데이터 처리에 적합
    • 클라우드내 다른 시스템과의 통합 용이
    • 비용 효율적인 장기 저장 가능

Internal Stage

  • User Stage

    • 각 사용자별로 할당된 개인 저장 공간으로 사용자 계정 생성시 자동으로 생성된다.
    • @~로 시작하는 경로 사용
    • 임시 데이터 로드에 적합
  • Table Stage

    • 특정 테이블과 연결된 스테이지로 테이블 생성시 자동으로 생성된다.
    • @%로 시작하는 경로 사용
    • 단일 테이블 데이터 로딩에 주로 사용
  • Named Stage (@)

    • CREATE STAGE로 명시적 생성
    • 여러 사용자 / 프로세스가 공유 가능
    • 영구적인 특성
    • 반복전인 데이터 로드에 적합

COPY INTO

  • 목적 :

    • 스테이지 파일을 Snowflake 테이블로 로드
    • 데이터 타입 검증 및 변환 수행
    • 다양한 파일 형식 지원
  • 주요 기능

    • 압축 파일 자동 처리
    • 병렬 로딩으로 성능 향상
    • 오류 처리 및 검증 옵션 제공
    • 중복 로드 방지를 위한 이력 관리
  • 뒤에서 Internal Table Stage와 함께 사용하는 예를 볼 예정

실무 적용 팁

  • 스테이지 선택
    • 소규모 임시 데이터 : User Stage
    • 단일 테이블 로드 : Table Stage
    • 정기 배치 작업 : Named Stage
    • 대용량 / 외부 연동 : External Stage
  • 데이터 로딩
    • 명시적 파일 포맷 정의로 일관성 유지
    • 실제 로드 이전에 VALIDATION_MODE 로 검증
    • 로드이력 모니러팅
    • 대용량 파일 분할 로딩 고려

V4 Copy into

copy_query =f ""COPY INTO{target_table}
	from {target_stage} --
	FILE_FORMAT = (
		TYPE = 'CSV'
		FIELD_OPTIONALLY_ENCLOSED_BY = ""
		SKIP_HEADER = 1
	)
""
 
cur.execute(copy_query)

문제점

  • 테이블 스테이지에 계속해서 파일이 쌓임
    • cur.execute -> 중복 로드 문제 발생 가능성
  • 스토리지 비용 죽제
    • Remove 해줘야 한다.

중복 로드를 막으려면??

COPY INTO force 파라미터의 값에 따라 다름 -> 기본값은 false

  • 전체 스테이지가 아닌 단일 파일만 COPY INTO에서 사용

  • 사용한 파일은 스테이지에서 제거

  • max_active_runs를 1로 세팅

    • 전체 업데이트의 경우 이를 1로 하지 않을 이유가 없음
    • 만일 파일 이름을 고정한다면 COPY_INTO의 force 파라미터를 true로 설정

V5

  • 이 데이터 셋 자체는 증분 업데이트가 불필요하지만 가상의 상황을 가정해보자
  • 앞서 배운 Snowflake의 MERGE를 사용해서 다시 구현
    • 제대로 하려면 데이터 소스단에서 바뀐 레코드만 받을 수 있는 방법이 있어야 한다.
    • 읽는 부분을 변경된 레코드만 읽게 수정하면 증분 업데이트 구현끝
  • MERGE를 사용하는 경우 Transaction을 쓸 필요가 없음
    • MERGE는 이 자체가 하나의 Statement이기 때문에 중간에 실패하면 RollBack한다.

구현 사항

  • 스테이지와 COPY INTO를 사용해서 테이블을 적재하는 코드를 함수로 만듬
    • populate_table_via_stage
  • 변경 레코드들이 있는 스테이징 테이블과 최종 타겟 테이블 두 개를 놓고 MERGE 수행
    • 두 테이블은 동일한 스키마를 가져야 한다.
    • CREATE TEMP TABLE 스테이징 테이블 LIKE 타겟 테이블

의도 상황

  • 증분 테스트 흉내 내기
    • 아래 CSV 파일로 실행했을 때 3개의 레코드의 내용이 바뀌고 1개의 레코드가 추가되는지 확인
countrycapital
AbkhaziaSukhumi2
AfghanistanKabul2
ZimbabweHarafr2
MarsPlanet

위 테이블처럼 수도에 2를 붙여서 업데이트 시킨 로우 3개와 Mars라는 원래 존재 하지 않던 나라의 로우를 추가해볼 것 이다.

V6

Helper 함수들을 별도의 모듈로 빼보자

  • util.py를 만들고 거기에 V5에서 아래 함수를 옮겨보자
    • get_file_path
    • return_snowflake_conn
    • populate_table_via_stage
  • country_capital_to_snowflake_v6.py
    • import util
    • 위 함수 호출하는 부분 앞에 .util 추가
    • 임시 파일을 저장하는 폴더 위치를 data_dir이란 Airflow Variable로 유지

V7

Internal Stage 기반 COPY INTO의 문제는??

  • 데이터가 외부 클라우드 스토리지에 이미 있는 ㄴ경우
    • 굳이 이를 다시 Snowflake Stage로 업로드하는 것은 비효율적
    • 이때 사용할 수 있는 것이 External Stage
  • AWS S3를 External Stage로 사용해 보는 예를 설명
    • 복잡한 부분은 AWS S3 접근 권한을 설정하는 부분으로 AWS 지식이 필요
    • S3 :
      • AWS의 클라우드 스토리지
      • Top 레베 폴더부터 만들고 시작해야 하는데 이를 버킷이라고 부른다.
    • IAM (Identity and Access Management)
      • AWS 리소스에 대한 접근을 안전하게 통제할 수 있는 사용자와 권한 관리 서비스

S3 사용 External Stage 기반 COPY INTO 절차

  • airflow-bootcamp-test-bucket 버킷을 예로 들어볼 예정
  • AWS IAM으로 snowflake_s3라는 사용자를 만들고 위 버킷에만 읽기쓰기 권한 지정
    • 이때 Custom Policy 사용 필요
    • 보안 관점에서 훨씬 더 안전한 방식 : 리소스 별로 전용 사용자 계정을 생성 (혹은 Iam ROLE을 생성 후 사용)
  • External Stage (my_s3_stage)를 생성하여 위 버킷을 지정
  • 이 버켓이 적재 레코드가 들어있는 파일들을 업로드
  • COPY INTO의 소스로 위 스테이지를 지정

Snowflake에서 S3 버킷을 접근하기 위한 IAM 사용자 생성