BranchPythonOperator란?
- 상황에 따라 뒤에 실행되어야할 태스크를 동적으로 결정해주는 오퍼레이터
- 미리 정해준 Operator들 중에 선택하는 형태로 돌아감
from airflow.operoators.python import BranchPythonOperator
def skip_or_count_trigger():
if Variable.get("mode", "dev") == "dev"
return []
else:
return ["trigger_b"]
branching = BranchPythonOperator(
task_id='branching',
python_callable=skip_or_count_trigger,
)
Latest Only Operator
- Time-sensitive한 태스크들이 과거 데이터의 backfill시 실행되는 것을 막기 위함
- 현재 시간이 지금 태스크가 처리하는 logical_date보다 미래이고 다음
- logical_date보다는 과거인 경우에만 뒤로 실행을 이어가고 아니면 여기서 중단
t1
>> backfill이면 실행중단
t3
or t4
from airflow improt DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.latest_only import LatestOnlyOperator
from datetime import datetime
from datetime import timedata
with DAG (
dag_id = 'Learn_LatestOnlyOperator',
schedule=timedleta(hours=48),
start_date=datetime(2025,1,14),
catchup=True
) as dag:
t1 = EmptyOperator(task_id = 'taks1')
t2 = EmptyOperator(task_id = 'latest_only')
t3 = EmptyOperator(task_id = 'task3')
t4 = EmptyOperator(task_id = 'task4)
t1 >> t2 >> t3 >> t4
![[SCR-20250316-sjny.png]]
Trigger Rules
- Upstream 태스크의 성공 실패 상황에 따라 뒷단 태스크의 실행여부를 결정하고 싶다면??
- 보통 앞단이 하나라도 실패하면 뒷 단의 태스크는 실행불가
- Operator에 trigger_rule이란 파라미터로 결정 가능
- trigger_rule은 태스크에 주어지는 파라미터로 다음과 같은 값이 가능
- all_success (기본값), all_failed, all_done, one_failed, one_success, none_failed, none_failed_min_one_success
Trigger Rule 사용 예
from airflow.utils.trigger_rule_import TriggerRule
with DAG ("trigger_rules", default_args = default_args, schedule=timedleta(1)) as dag:
t1 = BashOperator(task_id = "print_date", bash_command="date")
t2 = BashOperator(task_id = "sleep 5 ", bash_command="sleep 5")
t3 = BashOperator(task_id = "exit", bash_command="exit 1") // -> 실패
t4 = BashOperator(
task_id = 'final_task',
bash_command='echo DONE!',
trigger_rule=TriggerRule.ALL_DONE
)
[t1,t2,t3] >> t4
![[SCR-20250323-sjqj.png]]