V3
-
두 개의 태스크로 나누기
- 효율성 증대
- extract => transform_load
-
태스크간 데이터를 파일로 넘기되 파일 경로를 고정
- country_capital_YYYYMMDD_HHMMSS.csv
확장성있는 태스크 간의 정보 전달 방법은??
- 태스크를 개별적으로 실행하려면 태스크간 데이터 의존성이 약해야 한다.
뭔가 데이터를 메모리로 전달하고 테이블로 읽게 하는 것은 의존성이 강한 것이다.
- 파일의 형태(로컬 파일이나 클라우드 상의 파일)로 주고 받되 그 위치를 고정된 파일 경로 사용
- extract : CSV 파일을 읽어 정해진 파일 경로에 그 내용을 저장
- transform_load : 정해진 파일 경로에서 해당 파일을 읽어서 테이블로 적재
- 태스크간 데이터 이동 방법 파일로 이동하되 파일 경로를 고정
- 중요한 포인트는 파일 경로가 고정되어야 두 태스크간의 파일을 통한 데이터 전달이 가능
- 하나의 DAG는 경우에 따라 여러 RUN가 동시에 실행 가능
- 따라서 country_capital.csv 처럼 고정된 이름을 쓸 수 없음
- 대신 country_capital_YYYY_MMDD_HHMMSS가 태스크간 공유가 되어야함
- 단 동일한 YYYYMMDD_HHMMSS가 태스크간 공유되어야 한다.
- 이를 위해 logical_date라는 Airflow 컨텍스트 변수를 사용할 예정
- 중요한 포인트는 파일 경로가 고정되어야 두 태스크간의 파일을 통한 데이터 전달이 가능
FLOW
각 DAG 실행에는 타임스탬프가 하나 부여된다.
- logical
- extrct -> 해당 DAG run에서 사용될 이름으로 파일 저장
- transform_load 해당 DAG run에서 사용될 이름으로 파일 읽어 snowflake 테이블로 적재
일관된 파일 명명 : country_capital_YYYYMMDD_HHMMSS.csv
from.airflow.operators.python import get_current_context
def get_file_path(context):
tmp_dir = "/tmp"
date = context['logical_date']
timestamp = date.strftime("%Y%m%d_%H%M%S")
file_path = os.path.join(tmp_dir, f"country_capital_{timestamp}.csv")
return file_path
file_path = get_file_path(get_current_context())
주요 포인트:
- get_current_context 함수:
- 현재 실행 중인 태스크 인스턴스의 실행 컨텍스트(예: 실행 날짜, DAG 정보, 태스크 인스턴스 등)를 담은 딕셔너리를 반환합니다.
- 사용 목적:
- PythonOperator 같은 오퍼레이터 내에서 실행되는 파이썬 함수가 자신의 실행 환경(예: 날짜, 실행 ID 등)을 참조할 수 있도록 도와줍니다. 예를 들어, 동적으로 파일명을 생성하거나, 이전 태스크에서 생성된 데이터를 활용할 때 유용합니다.
V4
이전 V3에서의 문제점
- 다수의 레코드를 하나씩 INSERT INTO로 적재하는 것이 비효율적
- 모든 데이터 웨어하우스와 데이터 레이크는 벌크 적재를 지원
- snowflake에서는 COPY INTO 명이 그에 해당
- 다른 이야기지만 워낙 많은 레코드들을 적재해야 하기에 기본키를 체크를 안함
- 이는 stage라는 중간 매개체를 통해 레코드 단위가 아닌 파일 단위로 이뤄짐
- snowflake에서는 COPY INTO 명이 그에 해당
- 반대로 Snowflake에 있는 데이터를 stsge로 다운로드 받을 수도 있음 : Unload
Stage
- Snowflake에서 파일을 임시 저장하여 데이터를 로드/언로드할 때 사용하는 스토리지
- 외부 데이터 소스와 snowflake 테이블 간의 중간 다리 역할
- 주요 명령어 :
- PUT : 로컬 시스템에서 파일을 스테이지로 업로드
- LIST : 스테이지에 저장된 파일 목록 확인
- REMOVE : 스테이지에서 파일 삭제
- GET : 스테이지에서 파일을 로컬 시스템으로 다운로드
- 장점 :
- 데이터 로드 및 언로드 프로세스를 최적화
- 파일 압축 지원으로 저장공간과 전송 속도 최적화
Stage 종류
- External Stage
- 클라우드 스토리지를 Stage로 사용하는 경우로 큰 데이터 처리가 필요한 경우 사용
- AWS S3, Google Cloud Storage, Azure Blob Storage
- Internal Stage (Snowflake가 관리하는 클라우드 스토리지)
- User Stage : 사용자 개인별 Stage
- Table Stage : 테이블별로 자동 생성되는 Stage
- Named Stage : 사용자가 명시적으로 생성한 커스텀 Stage
External Stage
- 클라우드 스토리지 연결
- AWS S3
- Google Cloud Storage
- Microsoft Azure Blob Storage
- 특징
- CREATE STAGE 명령으로 명시적으로 생성
- 클라우드 스토리지 액세스 권한 필요
- 대용량 데이터 처리에 적합
- 클라우드내 다른 시스템과의 통합 용이
- 비용 효율적인 장기 저장 가능
Internal Stage
-
User Stage
- 각 사용자별로 할당된 개인 저장 공간으로 사용자 계정 생성시 자동으로 생성된다.
- @~로 시작하는 경로 사용
- 임시 데이터 로드에 적합
-
Table Stage
- 특정 테이블과 연결된 스테이지로 테이블 생성시 자동으로 생성된다.
- @%로 시작하는 경로 사용
- 단일 테이블 데이터 로딩에 주로 사용
-
Named Stage (@)
- CREATE STAGE로 명시적 생성
- 여러 사용자 / 프로세스가 공유 가능
- 영구적인 특성
- 반복전인 데이터 로드에 적합
COPY INTO
-
목적 :
- 스테이지 파일을 Snowflake 테이블로 로드
- 데이터 타입 검증 및 변환 수행
- 다양한 파일 형식 지원
-
주요 기능
- 압축 파일 자동 처리
- 병렬 로딩으로 성능 향상
- 오류 처리 및 검증 옵션 제공
- 중복 로드 방지를 위한 이력 관리
-
뒤에서 Internal Table Stage와 함께 사용하는 예를 볼 예정
실무 적용 팁
- 스테이지 선택
- 소규모 임시 데이터 : User Stage
- 단일 테이블 로드 : Table Stage
- 정기 배치 작업 : Named Stage
- 대용량 / 외부 연동 : External Stage
- 데이터 로딩
- 명시적 파일 포맷 정의로 일관성 유지
- 실제 로드 이전에 VALIDATION_MODE 로 검증
- 로드이력 모니러팅
- 대용량 파일 분할 로딩 고려
V4 Copy into
copy_query =f ""COPY INTO{target_table}
from {target_stage} --
FILE_FORMAT = (
TYPE = 'CSV'
FIELD_OPTIONALLY_ENCLOSED_BY = ""
SKIP_HEADER = 1
)
""
cur.execute(copy_query)
문제점
- 테이블 스테이지에 계속해서 파일이 쌓임
cur.execute
-> 중복 로드 문제 발생 가능성
- 스토리지 비용 죽제
- Remove 해줘야 한다.
중복 로드를 막으려면??
COPY INTO force 파라미터의 값에 따라 다름
-> 기본값은 false
-
전체 스테이지가 아닌 단일 파일만 COPY INTO에서 사용
-
사용한 파일은 스테이지에서 제거
-
max_active_runs를 1로 세팅
- 전체 업데이트의 경우 이를 1로 하지 않을 이유가 없음
- 만일 파일 이름을 고정한다면 COPY_INTO의 force 파라미터를 true로 설정
V5
- 이 데이터 셋 자체는 증분 업데이트가 불필요하지만 가상의 상황을 가정해보자
- 앞서 배운 Snowflake의 MERGE를 사용해서 다시 구현
- 제대로 하려면 데이터 소스단에서 바뀐 레코드만 받을 수 있는 방법이 있어야 한다.
- 읽는 부분을 변경된 레코드만 읽게 수정하면 증분 업데이트 구현끝
- MERGE를 사용하는 경우 Transaction을 쓸 필요가 없음
- MERGE는 이 자체가 하나의 Statement이기 때문에 중간에 실패하면 RollBack한다.
구현 사항
- 스테이지와 COPY INTO를 사용해서 테이블을 적재하는 코드를 함수로 만듬
populate_table_via_stage
- 변경 레코드들이 있는 스테이징 테이블과 최종 타겟 테이블 두 개를 놓고 MERGE 수행
- 두 테이블은 동일한 스키마를 가져야 한다.
CREATE TEMP TABLE
스테이징 테이블 LIKE 타겟 테이블
의도 상황
- 증분 테스트 흉내 내기
- 아래 CSV 파일로 실행했을 때 3개의 레코드의 내용이 바뀌고 1개의 레코드가 추가되는지 확인
country | capital |
---|---|
Abkhazia | Sukhumi2 |
Afghanistan | Kabul2 |
Zimbabwe | Harafr2 |
Mars | Planet |
위 테이블처럼 수도에 2를 붙여서 업데이트 시킨 로우 3개와 Mars라는 원래 존재 하지 않던 나라의 로우를 추가해볼 것 이다.
V6
Helper 함수들을 별도의 모듈로 빼보자
- util.py를 만들고 거기에 V5에서 아래 함수를 옮겨보자
- get_file_path
- return_snowflake_conn
- populate_table_via_stage
- country_capital_to_snowflake_v6.py
- import util
- 위 함수 호출하는 부분 앞에 .util 추가
- 임시 파일을 저장하는 폴더 위치를 data_dir이란 Airflow Variable로 유지
V7
Internal Stage 기반 COPY INTO의 문제는??
- 데이터가 외부 클라우드 스토리지에 이미 있는 ㄴ경우
- 굳이 이를 다시 Snowflake Stage로 업로드하는 것은 비효율적
- 이때 사용할 수 있는 것이 External Stage
- AWS S3를 External Stage로 사용해 보는 예를 설명
- 복잡한 부분은 AWS S3 접근 권한을 설정하는 부분으로 AWS 지식이 필요
- S3 :
- AWS의 클라우드 스토리지
- Top 레베 폴더부터 만들고 시작해야 하는데 이를 버킷이라고 부른다.
- IAM (Identity and Access Management)
- AWS 리소스에 대한 접근을 안전하게 통제할 수 있는 사용자와 권한 관리 서비스
S3 사용 External Stage 기반 COPY INTO 절차
- airflow-bootcamp-test-bucket 버킷을 예로 들어볼 예정
- AWS IAM으로 snowflake_s3라는 사용자를 만들고 위 버킷에만 읽기쓰기 권한 지정
- 이때 Custom Policy 사용 필요
- 보안 관점에서 훨씬 더 안전한 방식 : 리소스 별로 전용 사용자 계정을 생성 (혹은 Iam ROLE을 생성 후 사용)
- External Stage (my_s3_stage)를 생성하여 위 버킷을 지정
- 이 버켓이 적재 레코드가 들어있는 파일들을 업로드
- COPY INTO의 소스로 위 스테이지를 지정