Airflow
Jinja Template
  • 문서에서 특정 양식으로 작성된 값을 런타임실 실제 값으로 치환해주는 처리 엔진
  • 템플릿 엔진은 여러 솔루션이 존재하며 그 중 진자 템플릿은 파이썬 언어에서 사용하는 엔진
from jinja2 import Template
template = Template 'my name is {{name}})'
new_template = template.render(name='hjkim')
print(new_template)

1. Jinja 템플릿

  • Jinja 템플릿, 어디서 쓰이나?

    • 파이썬 기반 웹 프레임워크 Flask, Django 에서 주로 사용 (주로 HTML 템플릿 저장 후 화면에 보여질 때 실제 값으로 변환해서 출력)
  • SQL 작성시에도 활용 가능

select *
from tables
where base_dt ={{}};

2. Airflow에서

  • 오퍼레이터 파라미터 입력시 중괄호 {{}} 2개를 이용하면,

    Airflow에서 기본적으로 제공하는 변수들을 치환된 값으로 입력할 수 있음 (날짜, DAG ID)

{{data_interval_start}}

에어플로우 공식문서 (opens in a new tab)

모든 오퍼레이터 , 모든 파라미터에 Template 변수 적용이 가능한 것이 아니다. !!

3. Operators

공식문서를 보다보면 마지막에 templated 라고 적혀있으면 템플릿 엔진은 적용할 수 있다는 것이다 .

  • bash_command (str)
  • env
from airflow import DAG
import pendulum
import datetime
from airflow.operators.bash import BashOperator

with DAG(
dag_id="dags_bash_with_template",
schedule="10 0 \* \* \*",
start_date=pendulum.datetime(2025, 2, 21, tz="Asia/Seoul"),
catchup=False
) as dag:

bash_t1 = BashOperator(
task_id='bash_t1',
bash_command='echo "data_interval_end: {{ data_interval_end }} "'
)

bash_t2 = BashOperator(
task_id='bash_t2',
env={
'START_DATE': '{{ data_interval_start | ds }}',
'END_DATE': '{{ data_interval_end | ds }}'
},
bash_command='echo $START_DATE && echo $END_DATE'
)

bash_t1 >> bash_t2

Python Operator with Templates

from airflow import DAG
import pendulum
import datetime
from airflow.operators.python import PythonOperator
from airflow.decorators import task
 
with DAG(
	dag_id="dags_python_template",
	schedule="30 9 * * *",
	start_date=pendulum.datetime(2025, 2, 12, tz="Asia/Seoul"),
	catchup=False
) as dag:
 
def python_function(start_date, end_date, **kwargs):
	print(start_date)
	print(end_date)
 
python_t1 = PythonOperator(
	task_id="python_t1",
	python_callable=python_function,
	op_kwargs={'start_date': '{{data_interval_start | ds}}', 'end_date': '{{data_interval_end | ds}}'}
)
 
@task(task_id='python_t2')
def python_function2(\*\*kwargs):
print(kwargs)
print('ds:' + kwargs['ds'])
print('ts:' + kwargs['ts'])
print('date_interval_start:' + str(kwargs['data_interval_start']))
print('date_interval_end:' + str(kwargs['data_interval_end']))
print('task_instance:' + str(kwargs['ti']))

python_t1 >> python_function2()