Airflow
Dynamic Tasks

BranchPythonOperator란?

  • 상황에 따라 뒤에 실행되어야할 태스크를 동적으로 결정해주는 오퍼레이터
    • 미리 정해준 Operator들 중에 선택하는 형태로 돌아감
from airflow.operoators.python import BranchPythonOperator
 
def skip_or_count_trigger():
	if Variable.get("mode", "dev") == "dev"
		return []
	else:
		return ["trigger_b"]
 
branching = BranchPythonOperator(
	task_id='branching',
	python_callable=skip_or_count_trigger,
)

Latest Only Operator

  • Time-sensitive한 태스크들이 과거 데이터의 backfill시 실행되는 것을 막기 위함
  • 현재 시간이 지금 태스크가 처리하는 logical_date보다 미래이고 다음
  • logical_date보다는 과거인 경우에만 뒤로 실행을 이어가고 아니면 여기서 중단

t1 >> backfill이면 실행중단 t3 or t4

from airflow improt DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.latest_only import LatestOnlyOperator
from datetime import datetime
from datetime import timedata
 
 
with DAG (
	dag_id = 'Learn_LatestOnlyOperator',
	schedule=timedleta(hours=48),
	start_date=datetime(2025,1,14),
	catchup=True
) as dag:
	t1 = EmptyOperator(task_id = 'taks1')
	t2 = EmptyOperator(task_id = 'latest_only')
	t3 = EmptyOperator(task_id = 'task3')
	t4 = EmptyOperator(task_id = 'task4)
 
t1 >> t2 >> t3 >> t4

![[SCR-20250316-sjny.png]]

Trigger Rules

  • Upstream 태스크의 성공 실패 상황에 따라 뒷단 태스크의 실행여부를 결정하고 싶다면??
    • 보통 앞단이 하나라도 실패하면 뒷 단의 태스크는 실행불가
  • Operator에 trigger_rule이란 파라미터로 결정 가능
    • trigger_rule은 태스크에 주어지는 파라미터로 다음과 같은 값이 가능
    • all_success (기본값), all_failed, all_done, one_failed, one_success, none_failed, none_failed_min_one_success

Trigger Rule 사용 예

from airflow.utils.trigger_rule_import TriggerRule
 
with DAG ("trigger_rules", default_args = default_args, schedule=timedleta(1)) as dag: 
	t1 = BashOperator(task_id = "print_date", bash_command="date")
	t2 = BashOperator(task_id = "sleep 5 ", bash_command="sleep 5")
	t3 = BashOperator(task_id = "exit", bash_command="exit 1") // -> 실패
	t4 = BashOperator(
		task_id = 'final_task',
		bash_command='echo DONE!',
		trigger_rule=TriggerRule.ALL_DONE
	)
[t1,t2,t3] >> t4

![[SCR-20250323-sjqj.png]]