Airflow
Dag

Direct Acycle Graph

  • Airflow에서 ETL을 부르는 명칭

DAG는 Task로 구성된다.

  • 예를 3개의 태스크로 구성된다면 Extract, Tranform, Load로 구성
  • 태스크 간에는 실행 순서가 주어져야 한다.

DAG 지정 가능 파라미터들

  • 이름 dag_id, 설명
  • 실행 시간과 주기
  • 태그

루프만 없으면 된다.

Create DAG Instance

from airflow import DAG

test_dag = DAG(
	dag_id = "test_dag"
	schedule = "0 9 * * *"
	description = "business owner : Hojoon"
	tags = ['Marketing', 'ETL']
	start_date = datetime(2025, 3, 1, hour=0, minute=0)
	default_args = default_args
)

crontab syntax

(* * * * *)

  • minute
  • hour
  • day of the month
  • month of the year
  • day of the week

Parameter

  • max_active_runs

  • 동시에 실행 가능한 DAG의 수

    • 증분 업데이트 DAG의 경우 과거 데이터의 빠른 backfill을 위해 다수의 DAG를 동시에 실행해야할 수도 있음. 몇 개의 DAG의 동시 실행을 허용할 지를 이 파라미터로 결정, 개별 DAG 실행을 DAG RUN이라고 부른다.
    • 이는 Airflow Worker 수와 DAG가 어떻게 구현 방식에 따라 가능 여부가 결정된다. 기본 값은 16 (CPU가 10가 밖에 없으면 16개가 못돈다.)
  • max_active_tasks

  • 동시에 실행 가능한 태스크의 수

    • DAG가 하나 실행될 때 한번에 동시 실행 가능한 태스크 수
    • 기본값은 32
  • catchup

    • 기본값은 True인데, False를 추천한다.

default_args : 모든 태스크에 필요한 기본 정보

defaut_args = {
	'owner':'keeyon',
	'email':['keeyonghan@hotmail.com'],
	'retries':1,
	'retry_delay':'timedelta(minutes=3)',
}

Task 구현

  • 만일 파이썬 함수를 하나의 태스크로 사용하고 싶다면
      1. PythonOperator
    • @task decorator
from airflow.operators.python import PythonOperator
 
print_hello = PythonOperator(
	dag = dag,
	task_id='print_hello',
	python_callable=print_hello
)
 
@task
def print_hello():
	print('hello')
	return 'hello!'
 
def print_hello(**ctx):
	print('hello!')
	return

t1, t2, t3라는 세 개의 태스크가 있다면??

3개를 일렬로?

t1 >> t2 >> t3

t1의 실행이 끝나면 t2와 t2를 병렬로 실행

t1 >> t2 t1 >> t3

t1과 t2를 먼저 병렬로 실행하고 끝나면 다음에 t3를 실행

t1 >> t3 t2 >> t3

DAG 실행하기

airflow dags list
airflow tasks list {dag_id}
airflow tasks test {dag_id} print_hello 2025-01-09
airflow dags test {dag_id} 2025-01-10