Direct Acycle Graph
- Airflow에서 ETL을 부르는 명칭
DAG는 Task로 구성된다.
- 예를 3개의 태스크로 구성된다면 Extract, Tranform, Load로 구성
- 태스크 간에는 실행 순서가 주어져야 한다.
DAG 지정 가능 파라미터들
- 이름 dag_id, 설명
- 실행 시간과 주기
- 태그
루프만 없으면 된다.
Create DAG Instance
from airflow import DAG
test_dag = DAG(
dag_id = "test_dag"
schedule = "0 9 * * *"
description = "business owner : Hojoon"
tags = ['Marketing', 'ETL']
start_date = datetime(2025, 3, 1, hour=0, minute=0)
default_args = default_args
)
crontab syntax
(* * * * *)
- minute
- hour
- day of the month
- month of the year
- day of the week
Parameter
-
max_active_runs
-
동시에 실행 가능한 DAG의 수
- 증분 업데이트 DAG의 경우 과거 데이터의 빠른 backfill을 위해 다수의 DAG를 동시에 실행해야할 수도 있음. 몇 개의 DAG의 동시 실행을 허용할 지를 이 파라미터로 결정, 개별 DAG 실행을 DAG RUN이라고 부른다.
- 이는 Airflow Worker 수와 DAG가 어떻게 구현 방식에 따라 가능 여부가 결정된다. 기본 값은 16 (CPU가 10가 밖에 없으면 16개가 못돈다.)
-
max_active_tasks
-
동시에 실행 가능한 태스크의 수
- DAG가 하나 실행될 때 한번에 동시 실행 가능한 태스크 수
- 기본값은 32
-
catchup
- 기본값은 True인데, False를 추천한다.
default_args : 모든 태스크에 필요한 기본 정보
defaut_args = {
'owner':'keeyon',
'email':['keeyonghan@hotmail.com'],
'retries':1,
'retry_delay':'timedelta(minutes=3)',
}
Task 구현
- 만일 파이썬 함수를 하나의 태스크로 사용하고 싶다면
-
- PythonOperator
- @task decorator
-
from airflow.operators.python import PythonOperator
print_hello = PythonOperator(
dag = dag,
task_id='print_hello',
python_callable=print_hello
)
@task
def print_hello():
print('hello')
return 'hello!'
def print_hello(**ctx):
print('hello!')
return
t1, t2, t3라는 세 개의 태스크가 있다면??
3개를 일렬로?
t1 >> t2 >> t3
t1의 실행이 끝나면 t2와 t2를 병렬로 실행
t1 >> t2
t1 >> t3
t1과 t2를 먼저 병렬로 실행하고 끝나면 다음에 t3를 실행
t1 >> t3
t2 >> t3
DAG 실행하기
airflow dags list
airflow tasks list {dag_id}
airflow tasks test {dag_id} print_hello 2025-01-09
airflow dags test {dag_id} 2025-01-10