Airflow
Task
  • DAG를 구성하는 컴포넌트로 구체적으로 무엇인가 작업을 하는 수행
  • 태스크를 구성하는 요소
    • 태스크 ID, 오너, 실패시 재시도 회수 등등
    • 구체적인 수행 작업 (뒤에 설명할 오퍼레이터에 의해 결정된다.)
  • ETL을 하는 DAG라면 세 개의 태스크로 구성 가능
  • Airflow의 오퍼레이터로 만들어진다.
    • Airflow에서 이미 다양한 종류의 오퍼레이터를 제공한다.
    • 커스텀해서 직접 개발도 가능하다.

![[SCR-20250216-tdcx.png]]

t1 >> t2 
t1 >> t3
t2 >> t4
t3 >> t4
t5 >> t4

만약 그림이 없다면?? 코드만 보고 작성이 안된다.

t1 >> [t2,t3] >> t4 << t5  

위 그림과 똑같다. 같은 레벨에 있는 태스크는 리스트에 넣어서 표현할 수 있다.

t1 >> [t2,t3] >> t4 
t5 >> t4

![[SCR-20250216-teik.png]]

t1 >> [t2,t3] >> t4
t5 >> t4
[t4,t7] >> t6 >> t8 
 
from airflow import DAG
 
import pendulum
 
import datetime
from airflow.operators.empty import EmptyOperator
 
with DAG(
dag_id="dags_conn_test",
schedule=None,
start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
 
 
	t1 = EmptyOperator(
		task_id="t1"
	)
 
	t2 = EmptyOperator(
		task_id="t2"
	) 
 
	t3 = EmptyOperator(
		task_id="t3"
	)
	
	t4 = EmptyOperator(
		task_id="t4"
	)  
	
	t5 = EmptyOperator(
		task_id="t5"
	)
	
	t6 = EmptyOperator(
		task_id="t6"
	)
	
	t7 = EmptyOperator(
		task_id="t7"
	)
	
	t8 = EmptyOperator(
		task_id="t8"
	)
	
	t1 >> [t2, t3] >> t4
	t5 >> t4
	[t4, t7] >> t6 >> t8

DAG 에서 태스크들이 어떻게 실행이 될까??

  • Airflow 스케줄러는 DAG단위가 아닌 태스크 단위로 스케줄을 함
    • 스케줄해야하는 태스크들의 수가 Airflow의 용량(Worker의 CPU의 합) 보다 큰 경우 대기해야 함
  • 태스크를 많이 만들면 전체 DAG이 실행되는데 오래 걸리고 스케줄러에 부하가 감
  • 태스크를 너무 적게 만들면 모듈화가 안되고 실패시 재실행하는데 시간이 오래 걸림
  • 태스크를 논리적인 단위로 적절하게 분리해야 여러모로 편리
    • 디버깅과 재실행이 쉬워짐
    • 태스크당 다른 개발자 할당, 태스크당 다른 리소스 할당 가능

Task Decorator 사용 3개의 태스크간 데이터 전달

  • 태스크간 주고 받는 데이터는 Airflow 메타 데이터 DB에 기록된다.
  • 정확히는 Xcom이란 방식을 사용한다.
    • DAG_RUN_ID, TASK_ID, Key가 기본키
      • DAG_RUN_ID : DAG RUN 마다 주어지는 ID
      • Key : "return_value" 등등 다양한 키가 존재
    • 이 경우 큰 데이터를 주고 받을 수 없음

에어플로우안에 Xcom 이라는 테이블이 있고 태스크간에 데이터를 읽을 때 Xcom이라는 테이블에서 값을 읽어서 리턴 해주고 왔다갔다 한다~

with DAG (
	data = extract(csv_url)
	lines = transform (data)
	load (cur, lines, target_table)
)

Airflow taks 명령어의 제약점

  • airflow tasks 명령은 실행 결과를 메타 데이터 DB에 쓰지 않음

    • 그래서 앞서처럼 Xcom으로 태스크간 데이터를 주고 받는 경우에는 동작하지 않음
    • 이 명령은 테스트용으로 제공된다.
  • airflow dags 명령은 실행 결과를 메타 데이터 DB에 쓴다.

    • airflow dags test DAG_ID
  • Xcom 사용시 airflow tasks 명령으로 개별 태스크 테스트 불가하다.

  • 웹 UI에서 개별 실행은 전혀 문제없음 (메타데이터 기록!)