Airflow
Dynamic Dags

Dependency in DAGS

  • 주기적 실행 : schedule로 지정
  • 다른 DAG에 의해 트리거
    • Explicit Trigger : DAG A가 분명하게 Dag B를 트리거 (TriggerDagRunOperator)
    • Reactive Trigger : DAB B가 DAG A가 끝나기를 대기 (External Task Sensor)
    • 알아두면 좋은 상황에 따라 다른 태스크 실행 방식들
      • 조건에 따라 다른 태스크로 분기(BranchPython Operator)
      • 과거 데이터 Backfill시에는 불필요한 태스크 처리 (LatestOnly Operator)
        • 어떤 경우에는 앞단이 실패해도 동작해야 하는 경우가 있을 수 있음

Jinja Template 소개

  • Jinja 템플릿은 Python에서 널리 사용되는 템플릿 엔진
    • Django 템플릿 엔진에서 영감을 받아 개발
    • Jinja를 사용하면 프레젠테이션 로직과 애플리케이션 로직을 분리하여 동적으로 HTML 생성
    • Flask에서 사용됨
  • 변수는 이중 중괄호로 {{}} 감싸서 사용 <h1> 안녕하세요. {{name}}님! <h1>
  • 제어문은 퍼센트 기호 {{% %}}로 표시

예 1) logical_date을 코드 내에서 쉽게 사용 :{{ds}}

  • 가능한 모든 시스템 변수는 여기를 참조

BashOperator를 사용하여 템플릿 작업 정의

task1 = BashOperator (
	task_id = 'task1',
	bash_command='echo "{{ds}}"',
	dag = dag
)

동적 매개변수가 있는 다른 템플릿 작업 정의

  • 예 2 ) 파라미터 등으로 넘어온 변수를 쉽게 사용
task2 = Bashoperator(
	task_id = 'task2',
	bash_command='echo "안녕하세요",{{params.name}}!"',
	params = {'name':'John'}, #사용자 정의 가능한 매개변수
	dag = dag
)

Airflow에서 사용 가능한 Jinja 변수 살펴보기

- {{ds}}
- {{ds_nodash}}
- {{ts}}
- {{dag}}
- {{task}}
- {{dag_run}}
- {{var.value}}
- {{var.json}}
- {{conn}}

Explicit trigger

  • TriggerDagRunOperator
  • DAG A가 명시적으로 DAG B를 트리거
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
 
trigger_B = TriggerDagRunOperator(
	task_id = "trigger_B",
	trigger_dag_id = '트리거하려는 DAG 이름',
	conf = {'path':'opt/ml/conf'},
	logical_date = "{{ds}}",
	reset_dag_run = True,
	wait_for_completion=True
)

Reactive trigger

  • ExternalTaskSensor
  • DAG B가 DAG A의 태스크가 끝나기를 대기
    • 이 경우 DAG A는 이 사실을 모른다.

Sensor란 무엇인가??

  • Sensor는 특정 조건이 충족될 때까지 대기하는 Operator

  • Sensor는 외부 리소스의 가용성이나 특정 조건의 완료와 같은 상황 동기화에 유용

  • Airflow는 몇 가지 내장 Sensor를 제공

    • FileSensor: 지정된 위치에 파일이 생길 때까지 대기
    • HttpSensor: HTTP 요청을 수행하고 지정된 응답이 대기
    • SqlSensor: SQL 데이터베이스에서 특정 조건을 충족할 때까지 대기
    • TimeSensor: 특정 시간에 도달할 때까지 워크플로우를 일시 중지
    • ExternalTaskSensor: 다른 Airflow DAG의 특정 작업 완료를 대기
  • 기본적으로는 주기적으로 조건이 충족되었는지 체크하는 것

    • mode 파라미터가 이를 결정: reschedule 혹은 poke
    • Poke: worker를 하나 붙잡고 poke간에 sleep을 하는 방법
    • RESCHEDULE: worker를 릴리스하고 다시 잡아서 poke를 하는 방법

ExternalTaskSensor (1)

  • DAG B의 ExternalTaskSensor 태스크가 DAG A의 특정 태스크가 끝났는지 체크한다.
    • 먼저 두 DAG가 동일한 schedule_interval을 사용해야 한다.
    • 이 경우 두 태스크들의 Logical Date이 동일해야 한다. 아니면 매칭이 안된다.
from airflow.sensors.exteran_task_import ExteranlTaskSensor
 
waitinf_for_end_of_dag_a = ExteranlTaskSensor(
	task_id='waiting_for_end_of_dag_a',
	external_dag_id='DAG 이름',
	external_task_id='end',
	timeout=5*60,
	mode='reschedule'
)