- DAG를 구성하는 컴포넌트로 구체적으로 무엇인가 작업을 하는 수행
- 태스크를 구성하는 요소
- 태스크 ID, 오너, 실패시 재시도 회수 등등
- 구체적인 수행 작업 (뒤에 설명할 오퍼레이터에 의해 결정된다.)
- ETL을 하는 DAG라면 세 개의 태스크로 구성 가능
- Airflow의 오퍼레이터로 만들어진다.
- Airflow에서 이미 다양한 종류의 오퍼레이터를 제공한다.
- 커스텀해서 직접 개발도 가능하다.
![[SCR-20250216-tdcx.png]]
t1 >> t2
t1 >> t3
t2 >> t4
t3 >> t4
t5 >> t4
만약 그림이 없다면?? 코드만 보고 작성이 안된다.
t1 >> [t2,t3] >> t4 << t5
위 그림과 똑같다. 같은 레벨에 있는 태스크는 리스트에 넣어서 표현할 수 있다.
t1 >> [t2,t3] >> t4
t5 >> t4
![[SCR-20250216-teik.png]]
t1 >> [t2,t3] >> t4
t5 >> t4
[t4,t7] >> t6 >> t8
from airflow import DAG
import pendulum
import datetime
from airflow.operators.empty import EmptyOperator
with DAG(
dag_id="dags_conn_test",
schedule=None,
start_date=pendulum.datetime(2023, 3, 1, tz="Asia/Seoul"),
catchup=False
) as dag:
t1 = EmptyOperator(
task_id="t1"
)
t2 = EmptyOperator(
task_id="t2"
)
t3 = EmptyOperator(
task_id="t3"
)
t4 = EmptyOperator(
task_id="t4"
)
t5 = EmptyOperator(
task_id="t5"
)
t6 = EmptyOperator(
task_id="t6"
)
t7 = EmptyOperator(
task_id="t7"
)
t8 = EmptyOperator(
task_id="t8"
)
t1 >> [t2, t3] >> t4
t5 >> t4
[t4, t7] >> t6 >> t8
DAG 에서 태스크들이 어떻게 실행이 될까??
- Airflow 스케줄러는 DAG단위가 아닌 태스크 단위로 스케줄을 함
- 스케줄해야하는 태스크들의 수가 Airflow의 용량(Worker의 CPU의 합) 보다 큰 경우 대기해야 함
- 태스크를 많이 만들면 전체 DAG이 실행되는데 오래 걸리고 스케줄러에 부하가 감
- 태스크를 너무 적게 만들면 모듈화가 안되고 실패시 재실행하는데 시간이 오래 걸림
- 태스크를 논리적인 단위로 적절하게 분리해야 여러모로 편리
- 디버깅과 재실행이 쉬워짐
- 태스크당 다른 개발자 할당, 태스크당 다른 리소스 할당 가능
Task Decorator 사용 3개의 태스크간 데이터 전달
- 태스크간 주고 받는 데이터는 Airflow 메타 데이터 DB에 기록된다.
- 정확히는 Xcom이란 방식을 사용한다.
- DAG_RUN_ID, TASK_ID, Key가 기본키
- DAG_RUN_ID : DAG RUN 마다 주어지는 ID
- Key : "return_value" 등등 다양한 키가 존재
- 이 경우 큰 데이터를 주고 받을 수 없음
- DAG_RUN_ID, TASK_ID, Key가 기본키
에어플로우안에 Xcom 이라는 테이블이 있고 태스크간에 데이터를 읽을 때 Xcom이라는 테이블에서 값을 읽어서 리턴 해주고 왔다갔다 한다~
with DAG (
data = extract(csv_url)
lines = transform (data)
load (cur, lines, target_table)
)