Airflow
Yfinace_Project
Functional Scope
  • Yahoo Finance에서 주식 및 금융 데이터를 손쉽게 다운로드함
  • 함수인자
    • tickers : AAPL
    • start : YYYY-MM-DD
    • end : YYYY-MM-DD
    • interval : 기본은 하루
  • 함수 리턴값
    • pandas.DataFrame 형식으로 리턴
    • 포함된 날짜에 주식시장이 휴무인 경우 해당 날짜의 데이터는 없음
  • 기억할 점
    • tickers로 다수의 주식 심볼 이름을 지정 가능. 다수 지정이 가능하지만 우리는 하나만 지정할 예정
    • startend를 지정하지 않으면 가용한 모든 데이터를 읽어온다.

V1 DAG (Full Refresh)

  • 가용한 엔비디아 주식 가격 정보를 읽어올 예정
    • data, open, close, high, low, volume, symbol
  • 매번 다시 다 읽어다가 업데이트 하는 전체 업데이트로 구현
  • Internal Table Stage와 COPY INTO 사용

V2 (Incremental)

  • 전체 업데이트의 문제점
    • 읽어오는데 시간이 오래 걸리는 큰 데이터라면 현실적이지 않음
  • DAG 실행시간을 모니터링하다가 어느 시점에는 증분 업데이트로 변경 필요
  • DAG의 tags를 사용해서 fullrefresh인지 incremental인지 구분 필요

증분 업데이트 첫 번째 버전

- 한번 과거 데이터가 적재되었으면 증분 업데이트로 변환
- 매일 그 전날 주식 데이터를 읽어오는 형태로 변경
- 구체적인 로직
  - 먼저 오늘 날짜 계산하고 그 기준으로 어제 날짜 계산
  - yf.download 함수를 호출할 때 위의 두 날짜를 사용
    - yf.download(symbol, start=yesterday, end=today)
  - 혹시라도 여러번 실행되는 경우를 대비해서 어제 날짜 레코드를 삭제
    - cur.execture(f"DELETE FROM stock_price WHERE date = '{yesterday}')
  - 다음으로 앞서 읽어온 하루 날짜 데이터를 stock_price로 푸쉬

이 버전의 DAG가 가지고 있는 문제는 뭐가 있을까??

만약 이 DAG가 일주일정도 돌았다고 가정해보자. 근데 나중에 화요일날 DAG가 실패한 상태거나, 잘못된 데이터를 획득했을 경우에 다시 돌려야 하는데 현재 DAG는 runtime 기준 이전 날짜만 돌리도록 구현되어 있기 때문에 backfill하는데 있어서 어려움이 있다.

  • 즉 백필에 아주 취약한 것이 두 번째 DAG의 문제이다.
  • logical_date를 사용하면 특정 날짜, 특정 시간을 타겟해서 가능하다.

어떻게 구현할 것인가??

  • 읽어올 데이터의 날짜를 계산하지 않고 Airflow에게 알려달라고 예정
    • 기본적으로 Airflow는 모든 DAG가 증분 업데이트를 한다고 가정함
  • Airflow는 DAG에 지정된 Schedule정보를 바탕으로 DAG의 실행주기를 알 수 있다.
    • Daily라는 오늘 실행될 떄 어제 데이터를 처리
    • Hourly라면 오늘 실행될 때 전 시간 데이터를 처리
    • Weekly라면 오늘 실행될 때 지난 한 주 데이터를 처리
  • DAG RUN마다 처리 날짜와 시간을 메타 DB에 logical_date이란 이름으로 저장
    • dag_run이란 테이블이 메타 데이터 DB에 존재
      • 사실 logical_date 는 execution_date란 이름으로 저장된다.
    • 즉 증분 업데이트 구현시 처리 날짜를 직접 계산하지 않고 logical_date를 읽어 문제 핵졀
    • Web UI에서 쉽게 재실행하여 backfill
      • 아니면 터미널에서 airflow dags backfill 명령 실행

DAG Run 기록 -> 메타데이터 DB (dag_run)

  • Dag Run 별로 기록
    • 실행 날짜 / 시간 정보 기록("logical_date", execution_date)
    • 실행 결과도 기록됨 ("state") : sucess, failed
  • 증분 업데이트 시 각 Dag Run은 독립적으로 실행 가능해야 함
    • 자원이 있는 한 최대 max_active_runs 수만큼의 DAG가 실행 가능
  • 전체 업데이트를 할 때는
    • max_active_runs를 1로 설정
    • catchup을 false로 설정

데모 실행

  • snowflake stock_price에서 2025-02-10 레코드 삭제 후 backfill
    • airflow dags test 명령으로 코드 변경 없이 backfill하고 확인
  • Airflow 메타데이터 DB인 Postgres에서 dag_run 테이블 확인 후 일부 레코드 삭제
  • airflow dags backfill 명령으로 위에서 삭제한 dag_run들 재실행
    • airflow dags backfill --start date 2025-02-02 --end-date 2025-02-09 --reset-dagruns YfinanceToSnowflake_inc_v2
  • Airflow Web UI에서 확인

Airflow Web UI를 이용한 Backfill 실행

편리한 경우 : 재실행해야 하는 dag_run이 몇 개 없을 경우

  • dag run이 많은 경우 airflow dags backfill --start-date 2025-02-01 --end-date 2025-02-09 --reset-dagruns YfinanceToSnowflake_inc_v2

  • 실행의 상태가 성공인 경우 range 사이에 끼어있어도 재실행되지 않는다. 그래서 dagruns reset 옵션을 넣어야 한다.

Backfill과 관련된 Airflow 변수들

  • start_date :
    • DAG 파라미터로 DAG가 처음 실행되는 날짜가 DAG가 처음 읽어와야 하는 데이터 날짜/시간. 실제 첫 실행날짜는 start_date + DAG의 실행주기
  • logical_date :
    • DAG가 읽어와야하는 데이터의 날짜와 시간. 증분 업데이트를 하는 경우에만 의미 있음.
  • catchup :
    • DAG 파라미터로 DAG가 처음 활성화된 시점이 start_date보다 먼 미래라면 그 사이에 실행이 안된 것들을 어떻게 할 것인지 결정해주는 파라미터. True가 디폴트 값이고 이 경우 실행 안된것들을 모두 따라잡으려함. False가 되면 실행안된 것들을 무시한다. 잘 모르겠으면 False로 실행

CatchUp

만일 DAG의 스케줄이 "30 1 * * *"이고 start_date 2025-01-01

1. 이 DAG는 언제 처음 실행이 될까??

2025-01-02 01:30:00

✅ 1번 질문 설명: "이 DAG는 언제 처음 실행이 될까?"

Airflow에서 DAG의 첫 실행일(실제 DAG Run이 생성되어 작업이 실행되는 시점)은 다음과 같은 규칙으로 정해집니다.

중요 규칙:

  • Airflow는 interval 기반 스케줄링 방식을 사용합니다.
  • 스케줄 표현식은 특정 시점이 아닌, **"특정 시간 간격(interval)이 끝나는 시점"**을 의미합니다.
  • 즉, Airflow에서 설정한 DAG의 start_date2025-01-01이고 cron 표현식이 30 1 * * *이면,
    • 첫 번째 실행 대상 간격(interval)은:
      2025-01-01 01:30:00 (exclusive) ~ 2025-01-02 01:30:00 (inclusive)
    • 이 간격(interval)이 끝난 직후인 2025-01-02 01:30:00에 DAG가 최초로 실행됩니다.

즉, 첫 실행일은 항상 start_date + schedule_interval이 됩니다.

Interval의 시작(논리적 시작)Interval의 끝(논리적 끝=logical_date)실제 DAG 실행시점
2025-01-01 01:30:002025-01-02 01:30:002025-01-02 01:30:00

즉, DAG는 설정한 날짜의 다음 스케줄링 시점에 처음 실행되는 겁니다.

2. 만일 이 DAG를 2025-01-15을 처음부터 활성화했다면 실행되지 못했던 날짜 RUN은 어떻게 될까??

  1. 만일 이 날짜들에 대해 DAG가 실행되기를 원하지 않는다면 무엇을 해야하는가??

catchup 변수에 따라 달려있는 것이다.

3. 2025-01-24일 이 DAG가 실행될 때 다음 변수의 값은 무엇으로 채워질까??

context['logical_date'] = 2025-01-23 01:30:00 context['ds'] = 2025-01-23

✅ 3번 질문 설명: "2025-01-24일 DAG가 실행될 때 context 변수의 값?"

Airflow에서는 context['logical_date']context['ds']의 값이 어떤 방식으로 설정되는지 명확히 알아야 합니다.

⭐️ logical_date란?

  • **DAG가 다루고 있는 데이터의 "논리적 기준 날짜"**입니다.
  • 실제 실행시간(execution_date)보다 항상 이전 간격(interval)의 끝나는 시점입니다.

이 DAG의 스케줄을 다시 생각해보면:

실행 대상 간격interval 끝(=logical_date)실제 DAG 실행 시점
2025-01-22 01:30 ~ 2025-01-23 01:302025-01-23 01:30:00 (logical_date)2025-01-23 01:30:00
2025-01-23 01:30 ~ 2025-01-24 01:302025-01-24 01:30:00 (logical_date)2025-01-24 01:30:00

즉,

  • 2025-01-24 01:30에 실제 DAG가 실행될 때 logical_date는 항상 이전 interval의 종료 시각, 즉 2025-01-24 01:30:00이 됩니다. 문제에서는 23일 값을 제시했는데, 이게 맞는 이유는 만약 실행 시각을 표현할 때 Airflow가 주는 컨텍스트의 개념을 조금 더 구체적으로 이해해야 합니다.

다만 실제로 Airflow 2.x부터는 명칭이 바뀌면서 보다 명확해졌습니다:

  • Airflow 2.x 이상 버전에서는:
    • logical_date는 interval의 종료 시점을 의미합니다.
    • dslogical_date에서 날짜만 추출한 값입니다.
  • DAG는 항상 "interval 끝나는 시점(logical_date)"에 실행되고, 작업은 보통 이전 날(interval)의 데이터를 처리합니다.

그래서 문제에서 주어진 상황(2025-01-24 DAG가 실행될 때)의 정확한 의미를 다시 해석하면:

**"2025-01-24 01:30:00에 DAG가 실행될 때"**는 실제로 DAG가 실행되는 이 시점의 Airflow context는 다음과 같이 됩니다:

  • context['logical_date'] = 2025-01-24 01:30:00 (현재 실행 interval이 끝나는 시점)
  • context['ds'] = "2025-01-24"

만약 문제의 원본이 "2025-01-24 01:30:00에 실행된 DAG의 논리적 날짜가 이전 날로 표기되었다"는 예시로 작성된 것이라면, 그것은 Airflow 1.x의 execution_date 기반으로 설명된 경우일 가능성이 있습니다.

그러나 Airflow 2.x 이후로는 정확히 이와 같이 해석됩니다:

  • DAG는 항상 interval이 끝날 때 실행됨.
  • interval이 끝나는 시점 = logical_date