Use Airflow templates

Templating allows you to pass dynamic information into task instances at runtime. For example, you can run the following command to print the day of the week every time you run a task:

BashOperator(
    task_id="print_day_of_week",
    bash_command="echo Today is {{ execution_date.format('dddd') }}",
)

In this example, the value in the double curly braces {{ }} is the templated code that is evaluated at runtime. If you execute this code on a Wednesday, the BashOperator prints Today is Wednesday. Templates have numerous applications. For example, you can use templating to create a new directory named after a task's execution date for storing daily data (/data/path/20210824). Alternatively, you can select a specific partition (/data/path/yyyy=2021/mm=08/dd=24) so that only the relevant data for a given execution date is scanned.
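
As a quick illustration, here is a minimal sketch of the first pattern. The /data/path directory and the daily.csv file are hypothetical; ds_nodash is the built-in variable holding the run date without dashes:

from airflow.operators.bash import BashOperator

# Hypothetical task: copy a daily file into a directory named after the run date,
# e.g. /data/path/20210824 for a run on 2021-08-24.
copy_daily_data = BashOperator(
    task_id="copy_daily_data",
    bash_command="mkdir -p /data/path/{{ ds_nodash }} && cp daily.csv /data/path/{{ ds_nodash }}/",
)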

Airflow leverages Jinja, a Python templating framework, as its templating engine. In this guide, you'll learn the following:

  • How to apply Jinja templates in your code.
  • Which variables and functions are available when templating.
  • Which operator fields can be templated and which cannot.
  • How to validate templates.
  • How to apply custom variables and functions when templating.
  • How to render templates to strings and native Python code.

Assumed knowledge

To get the most out of this guide, you should have an understanding of:

  • Airflow operators. See Operators 101.
  • Jinja templating. See Jinja basics.

Runtime variables in Airflow

Templating in Airflow works the same as Jinja templating in Python. You enclose the code you want evaluated between double curly braces, and the expression is evaluated at runtime.

This table lists some of the most commonly used Airflow variables that you can use in templates:

Variable name    Description
execution_date   Starting datetime of the DAG run interval
ds               execution_date formatted as "2022-12-27"
ds_nodash        execution_date formatted as "20221227"
next_ds          Datetime of the next execution_date (the end of the current interval)

For a complete list of the available variables, see the Airflow Templates reference.
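
As a quick sketch, the following task prints a few of these variables; the exact values depend on the run's logical date:

from airflow.operators.bash import BashOperator

# For a daily schedule whose interval starts on 2022-12-27, this prints:
# ds=2022-12-27 ds_nodash=20221227 next_ds=2022-12-28
print_dates = BashOperator(
    task_id="print_dates",
    bash_command="echo ds={{ ds }} ds_nodash={{ ds_nodash }} next_ds={{ next_ds }}",
)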

Templateable fields and scripts

Templates cannot be applied to all arguments of an operator. Two attributes in the BaseOperator define where you can use templated values:

  • template_fields: Defines which operator arguments can use templated values.
  • template_ext: Defines which file extensions can use templated values.

The following example shows a simplified version of the BashOperator:

class BashOperator(BaseOperator):
    template_fields = ('bash_command', 'env')  # defines which fields are templateable
    template_ext = ('.sh', '.bash')  # defines which file extensions are templateable

    def __init__(
        self,
        *,
        bash_command,
        env=None,
        output_encoding='utf-8',
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.bash_command = bash_command  # templateable (can also be a path to a .sh or .bash script)
        self.env = env  # templateable
        self.output_encoding = output_encoding  # not templateable

The template_fields attribute holds a list of attributes that can use templated values. You can also find this list in the Airflow documentation or in the task details view of the Airflow UI.

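You can also inspect the attribute directly in Python. A minimal sketch; the exact contents of the tuple can vary between Airflow and provider versions:

from airflow.operators.bash import BashOperator

# template_fields is a class-level attribute, so no task instance is needed.
print(BashOperator.template_fields)  # e.g. ('bash_command', 'env')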

template_ext contains a list of file extensions that can be read and templated at runtime. For example, instead of providing a Bash command to bash_command, you could provide a .sh script that contains a templated value:

run_this = BashOperator(
    task_id="run_this",
    bash_command="script.sh",  # .sh extension can be read and templated
)

The BashOperator takes the contents of the following script, templates it, and executes it:

# script.sh
echo "Today is {{ execution_date.format('dddd') }}"

Templating from files speeds up development because an integrated development environment (IDE) can apply language-specific syntax highlighting to the script. This wouldn't be possible if your script were defined as a big string inside your DAG code.

By default, Airflow searches for the location of your scripts relative to the directory the DAG file is defined in. So, if your DAG is stored in /path/to/dag.py and your script is stored in /path/to/scripts/script.sh, you would update the value of bash_command in the previous example to scripts/script.sh.
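
A minimal sketch of that layout, using the hypothetical paths from the paragraph above:

from airflow.operators.bash import BashOperator

# DAG file:  /path/to/dag.py
# Script:    /path/to/scripts/script.sh
run_this = BashOperator(
    task_id="run_this",
    bash_command="scripts/script.sh",  # resolved relative to the directory of the DAG file
)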

Alternatively, you can set a base path for templates at the DAG-level with the template_searchpath argument. For example, the following DAG would look for script.sh at /tmp/script.sh:

with DAG(..., template_searchpath="/tmp") as dag:
    run_this = BashOperator(task_id="run_this", bash_command="script.sh")

Validate templates

The output of templates can be checked in both the Airflow UI and Airflow CLI. One advantage of the Airflow CLI is that you don't need to run any tasks before seeing the result.

The Airflow CLI command airflow tasks render renders all templateable attributes of a given task. Given a dag_id, task_id, and an arbitrary execution_date, the command output is similar to the following example:

$ airflow tasks render example_dag run_this 2021-01-01

# ----------------------------------------------------------
# property: bash_command
# ----------------------------------------------------------
echo "Today is Friday"

# ----------------------------------------------------------
# property: env
# ----------------------------------------------------------
None

For this command to work, Airflow needs access to a metadata database. To set up a local SQLite database, run the following commands:

cd <your-project-directory>
export AIRFLOW_HOME=$(pwd)
airflow db init # generates airflow.db, airflow.cfg, and webserver_config.py in your project dir

# airflow tasks render [dag_id] [task_id] [execution_date]

If you use the Astro CLI, a Postgres metadata database is automatically configured for you after running astro dev start in your project directory. From there, you can run astro dev run tasks render <parameters> to test your templated values.

For most templates, this is sufficient. However, if the templating logic reaches an external system, for example an Airflow variable stored in your production metadata database, you must have connectivity to that system.

To view the result of templated attributes after running a task in the Airflow UI, click the task and then click Rendered.


The Rendered Template view shows the output of all templated attributes for that task instance.


Using custom functions and variables in templates

As discussed previously, there are several variables available during templating. However, a Jinja environment is not the same as the Airflow runtime. You can think of a Jinja environment as a very stripped-down Python environment, which, among other things, means that modules cannot be imported. For example, this won't work in a Jinja template:

from datetime import datetime

BashOperator(
    task_id="print_now",
    bash_command="echo It is currently {{ datetime.now() }}",  # raises jinja2.exceptions.UndefinedError: 'datetime' is undefined
)

However, it is possible to inject functions into your Jinja environment. In Airflow, several standard Python modules are injected by default for templating, under the name macros. For example, the previous code example can be updated to use macros.datetime:

BashOperator(
    task_id="print_now",
    bash_command="echo It is currently {{ macros.datetime.now() }}",  # It is currently 2021-08-30 13:51:55.820299
)

Besides pre-injected functions, you can also use self-defined variables and functions in your templates. Airflow provides a convenient way to inject these into the Jinja environment. In the following example, a function is added to the DAG to print the number of days since May 1st, 2015:

def days_to_now(starting_date):
    return (datetime.now() - starting_date).days

To use this inside a Jinja template, you can pass a dict to user_defined_macros in the DAG. For example:

def days_to_now(starting_date):
    return (datetime.now() - starting_date).days


with DAG(
    dag_id="demo_template",
    start_date=datetime(2021, 1, 1),
    schedule=None,
    user_defined_macros={
        "starting_date": datetime(2015, 5, 1),  # Macro can be a variable
        "days_to_now": days_to_now,  # Macro can also be a function
    },
) as dag:
    print_days = BashOperator(
        task_id="print_days",
        bash_command="echo Days since {{ starting_date }} is {{ days_to_now(starting_date) }}",  # Call user defined macros
    )
    # Days since 2015-05-01 00:00:00 is 2313

It's also possible to inject functions as Jinja filters using user_defined_filters. You can use filters as pipe operations. The following example performs the same work as the previous example, only this time filters are used:

with DAG(
    dag_id="bash_script_template",
    start_date=datetime(2021, 1, 1),
    schedule=None,
    user_defined_filters={"days_to_now": days_to_now},  # Set user_defined_filters to use the function as a pipe operation
    user_defined_macros={"starting_date": datetime(2015, 5, 1)},
) as dag:
    print_days = BashOperator(
        task_id="print_days",
        bash_command="echo Days since {{ starting_date }} is {{ starting_date | days_to_now }}",  # Pipe value to function
    )
    # Days since 2015-05-01 00:00:00 is 2313

Functions injected with user_defined_filters and user_defined_macros are both usable in the Jinja environment. While they achieve the same result, Astronomer recommends using filters when you need to apply multiple custom functions because filter formatting improves the readability of your code. You can see this when comparing the two techniques side by side:

"{{ name | striptags | title }}" # chained filters are read naturally from left to right
"{{ title(striptags(name)) }}" # multiple functions are more difficult to interpret because reading right to left

Render native Python code

By default, Jinja templates always render to Python strings. Sometimes it's desirable to render templates to native Python objects instead, because the code you're calling doesn't work with strings. For example:

def sum_numbers(*args):
    total = 0
    for val in args:
        total += val
    return total

sum_numbers(1, 2, 3)  # returns 6
sum_numbers("1", "2", "3")  # TypeError: unsupported operand type(s) for +=: 'int' and 'str'

Consider a scenario where you're passing a list of values to this function by triggering a DAG with a config that holds some numbers:

with DAG(dag_id="failing_template", start_date=datetime.datetime(2021, 1, 1), schedule=None) as dag:
    sumnumbers = PythonOperator(
        task_id="sumnumbers",
        python_callable=sum_numbers,
        op_args="{{ dag_run.conf['numbers'] }}",
    )

You would trigger the DAG with the following JSON as the DAG run configuration:

{"numbers": [1,2,3]}

The rendered value is a string. Since the sum_numbers function unpacks the given string, it ends up trying to add up every character in the string:

('[', '1', ',', ' ', '2', ',', ' ', '3', ']')

This is not going to work, so you must tell Jinja to return a native Python list instead of a string. Jinja supports this with Environments. The default Jinja environment outputs strings, but you can configure a NativeEnvironment to render templates as native Python code.

Support for Jinja's NativeEnvironment was added in Airflow 2.1.0 with the render_template_as_native_obj argument on the DAG class. This argument takes a boolean value which determines whether to render templates with Jinja's default Environment or NativeEnvironment. For example:

def sum_numbers(*args):
    total = 0
    for val in args:
        total += val
    return total


with DAG(
    dag_id="native_templating",
    start_date=datetime.datetime(2021, 1, 1),
    schedule=None,
    render_template_as_native_obj=True,  # Render templates using Jinja NativeEnvironment
) as dag:
    sumnumbers = PythonOperator(
        task_id="sumnumbers",
        python_callable=sum_numbers,
        op_args="{{ dag_run.conf['numbers'] }}",
    )

Passing the same JSON configuration {"numbers": [1,2,3]} now renders a list of integers which the sum_numbers function processes correctly:

[2021-08-26 11:53:12,872] {python.py:151} INFO - Done. Returned value was: 6

The Jinja environment must be configured at the DAG level. This means that all tasks in a DAG render their templates either with the default Jinja environment or with the NativeEnvironment.


FAQs

How do you use Airflow templating?

Templating in Airflow works the same as Jinja templating in Python. You enclose the code you want evaluated between double curly braces, and the expression is evaluated at runtime. For a complete list of the available variables, see the Airflow Templates reference.

How do you write a simple DAG in Airflow?

The following are the steps to write an Airflow DAG or workflow (a minimal sketch follows the list):
  1. Creating a Python file.
  2. Importing the modules.
  3. Setting default arguments for the DAG.
  4. Instantiating a DAG.
  5. Creating a callable function.
  6. Creating tasks.
  7. Setting up dependencies.
  8. Verifying the final DAG code.
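
A minimal sketch of those steps, assuming a recent Airflow 2 version (where the schedule argument replaced schedule_interval); the DAG id and task logic are hypothetical placeholders:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


def say_hello():
    print("hello")


with DAG(
    dag_id="simple_example",  # hypothetical DAG id
    start_date=datetime(2023, 1, 1),
    schedule=None,
) as dag:
    hello = PythonOperator(task_id="say_hello", python_callable=say_hello)
    done = BashOperator(task_id="done", bash_command="echo done")

    hello >> done  # set the dependency: say_hello runs before done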

Do I need to know Python for Airflow?

Airflow itself is Python-based, but it can execute programs written in any language. For instance, the first stage of your workflow might run a C++ program to perform image analysis, followed by a Python program that transfers the results to S3.

How does Airflow look for DAGs?

Airflow loads DAGs from Python source files, which it looks for inside its configured DAG_FOLDER . It will take each file, execute it, and then load any DAG objects from that file. This means you can define multiple DAGs per Python file, or even spread one very complex DAG across multiple Python files using imports.

What is {{ ds }} in Airflow?

Airflow defines some Jinja filters that can be used to format values. For example, using {{ execution_date | ds }} will output the execution_date in the YYYY-MM-DD format.
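
For example, a minimal sketch combining the ds variable and the ds filter (the filter form assumes an Airflow version that ships the built-in ds template filter):

from airflow.operators.bash import BashOperator

# Both expressions print the run date in YYYY-MM-DD format.
print_ds = BashOperator(
    task_id="print_ds",
    bash_command="echo {{ ds }} and {{ execution_date | ds }}",
)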

Why does Airflow use DAGs?

In Airflow, a DAG – or a Directed Acyclic Graph – is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies. A DAG is defined in a Python script, which represents the DAG's structure (tasks and their dependencies) as code.

How do I write a Python script in Airflow?

  1. Step 1: Make the Imports. The first step is to import the necessary classes. ...
  2. Step 2: Create the Airflow Python DAG object. The second step is to create the Airflow Python DAG object after the imports have been completed. ...
  3. Step 3: Add the Tasks.

In what language is an Airflow DAG written?

An Airflow DAG is defined in a Python file and is composed of the following components: A DAG definition, operators, and operator relationships.

How do you make a DAG in Airflow step by step?

Steps To Create an Airflow DAG
  1. Importing the right modules for your DAG.
  2. Create default arguments for the DAG.
  3. Creating a DAG Object.
  4. Creating tasks.
  5. Setting up dependencies for the DAG.

Do data engineers use Airflow?

Apache Airflow is an open-source workflow authoring, scheduling, and monitoring application. It's one of the most reliable systems for orchestrating processes or pipelines that Data Engineers employ.

Do data scientists use Airflow?

Airbnb created Airflow in 2014 to help manage its data processing needs and has since become a far-reaching tool for data scientists across the industry. Airflow allows you to define workflows as directed acyclic graphs (DAGs) of tasks and provides a rich set of operators to perform those tasks.

When should you not use Airflow?

Use cases for which Airflow is a bad option:
  1. If you need to share data between tasks.
  2. If you need versioning of your data pipelines (Airflow doesn't support that).
  3. If you would like to parallelize your Python code with Dask (Prefect supports Dask Distributed out of the box).

How many DAGs can Airflow handle?

DAGs are defined in standard Python files that are placed in Airflow's DAG_FOLDER . Airflow will execute the code in each file to dynamically build the DAG objects. You can have as many DAGs as you want, each describing an arbitrary number of tasks.

How many DAGs can Airflow run at once?

concurrency: The maximum number of task instances allowed to run concurrently across all active DAG runs for a given DAG. This lets you allow one DAG to run 32 tasks at once while another DAG is set to run 16 tasks at once.
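
A minimal sketch of capping one DAG at 32 concurrent tasks; note that recent Airflow versions renamed the concurrency argument to max_active_tasks, so use whichever name matches your version (the DAG id here is hypothetical):

from datetime import datetime

from airflow import DAG

with DAG(
    dag_id="capped_dag",  # hypothetical DAG id
    start_date=datetime(2023, 1, 1),
    schedule=None,
    max_active_tasks=32,  # older Airflow versions use concurrency=32 instead
) as dag:
    ...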

How do you trigger an Airflow DAG automatically?

In the Airflow web interface, on the DAGs page, in the Links column for your DAG, click the Trigger Dag button. (Optional) Specify the DAG run configuration. Click Trigger.

What is the use of templating in Python?

Templating, and in particular web templating, is a way to represent data in different forms. These forms are often (but not always) intended to be readable, even attractive, to a human audience. Frequently, templating solutions involve a document (the template) and data.

How do I send an email from an Airflow DAG?

How to use the EmailOperator in an Airflow DAG (a minimal sketch follows the list):
  1. System requirements :
  2. Step 1: Connecting to Gmail and logging in.
  3. Step 2: Enable IMAP for the SMTP.
  4. Step 3: Update SMTP details in Airflow.
  5. Step 4: Importing modules.
  6. Step 5: Default Arguments.
  7. Step 6: Instantiate a DAG.
  8. Step 7: Set the Tasks.
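
A minimal sketch of the task itself, assuming the SMTP configuration from steps 1 through 4 is already in place; the recipient and message are hypothetical:

from airflow.operators.email import EmailOperator

send_email = EmailOperator(
    task_id="send_email",
    to="you@example.com",  # hypothetical recipient
    subject="Airflow notification",
    html_content="The DAG finished successfully.",
)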

How do I activate a DAG in Airflow?

In the Airflow web interface, on the DAGs page, use the pause/unpause toggle next to the DAG name to activate (unpause) it. A DAG is only scheduled once it is unpaused.
