Create your first Airflow DAG (2024)

Let's start by creating a Hello World workflow, which does nothing other than send "Hello World!" to the log.

A DAG file, which is basically just a Python script, is a configuration file specifying the DAG’s structure as code.

The following are the steps to write an Airflow DAG or workflow:

  1. Creating a Python file
  2. Importing the modules
  3. Default Arguments for the DAG
  4. Instantiate a DAG
  5. Creating a callable function
  6. Creating Tasks
  7. Setting up Dependencies
  8. Verifying the final DAG code
  9. Test the Pipeline
  10. Running the DAG

Step 1: Creating a Python file

  • Create the ${AIRFLOW_HOME}/dags directory if it is not present. Under the ${AIRFLOW_HOME}/dags directory we can put the scripts to be scheduled (Python scripts, shell scripts, SQL files, etc.).

mkdir -p ${AIRFLOW_HOME}/dags 

  • Create a new Python file hello_world_dag.py inside the ${AIRFLOW_HOME}/dags/ directory.

HelloWorld Application Structure:

${AIRFLOW_HOME}
├── airflow.cfg
├── airflow.db
├── dags                       <- Your DAGs directory
│   └── hello_world_dag.py     <- Your DAG definition file
└── unittests.cfg

Add the content from the following steps to the hello_world_dag.py file.

vi ${AIRFLOW_HOME}/dags/hello_world_dag.py

Step 2: Importing the modules

  • Import the timedelta and datetime classes from the datetime module to help schedule the DAGs.
  • Import the DAG class from the airflow package to instantiate the DAG object.
  • Import the DummyOperator from the airflow.operators.dummy_operator module.
  • Import the PythonOperator from the airflow.operators.python_operator module.

# datetime
from datetime import timedelta, datetime

# The DAG object
from airflow import DAG

# Operators
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
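Note: the import paths above are the legacy ones; on recent Airflow 2.x releases they still work but raise deprecation warnings. If you are on a newer version, the equivalent imports would look roughly like this (EmptyOperator is the replacement for DummyOperator from Airflow 2.4 onwards):

# Equivalent imports on recent Airflow 2.x versions (adjust to your installed version)
from datetime import timedelta, datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator     # replaces DummyOperator (Airflow 2.4+)
from airflow.operators.python import PythonOperator   # new home of PythonOperator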

Step 3: Default Arguments for the DAG

  • Default arguments are passed to a DAG as a default_args dictionary.
  • This makes it easy to apply a common parameter to many operators without having to type it many times.

# initializing the default arguments
default_args = {
    'owner': 'Ranga',
    'start_date': datetime(2022, 3, 4),
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}
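These defaults apply to every task in the DAG, but any single operator can still override them. As a hypothetical variation (not part of the final DAG, and using the DAG object and callable defined in the next steps):

# Hypothetical task overriding the DAG-level default of 3 retries
fragile_task = PythonOperator(
    task_id='fragile_task',
    python_callable=print_hello,   # callable defined in Step 5
    retries=5,                     # overrides default_args['retries'] for this task only
    dag=hello_world_dag)           # DAG object instantiated in Step 4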

Step 4: Instantiate a DAG

  • Next we will instantiate a DAG object by passing the "dag_id" string, which is the unique identifier of the DAG.
  • It is recommended to keep the Python file name and the "dag_id" the same, so we will assign the "dag_id" as "hello_world_dag".

# Instantiate a DAG object
hello_world_dag = DAG(
    'hello_world_dag',
    default_args=default_args,
    description='Hello World DAG',
    schedule_interval='* * * * *',
    catchup=False,
    tags=['example', 'helloworld'])

Example usage:

- Daily schedule:
  - `schedule_interval='@daily'`
  - `schedule_interval='0 0 * * *'`

  • By default, Airflow will run tasks for all past intervals since the start_date up to the current time. This behavior can be disabled by setting the catchup parameter of a DAG to False, in which case Airflow will only start executing tasks from the current interval, as the sketch below illustrates.
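For instance, here is a hypothetical DAG (not part of this tutorial's code) that shows the difference catchup makes with a daily schedule:

# Hypothetical illustration of catchup (not part of hello_world_dag)
catchup_demo = DAG(
    'catchup_demo',
    start_date=datetime(2022, 3, 1),
    schedule_interval='@daily',
    catchup=True)   # Airflow creates one DAG run per missed daily interval since 2022-03-01;
                    # with catchup=False, only the most recent interval would be scheduled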

Step 5: Creating a callable function

  • We also need to create a function that will be called by the PythonOperator, as shown below:

# python callable function
def print_hello():
    return 'Hello World!'
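The callable can also take arguments. As a hypothetical variation (not used in the final DAG), the PythonOperator introduced in the next step could pass them via its op_kwargs parameter:

# Hypothetical variant: a callable that accepts an argument
def print_greeting(name):
    return f'Hello, {name}!'

# Arguments are supplied through op_kwargs (assumes hello_world_dag from Step 4)
greeting_task = PythonOperator(
    task_id='greeting_task',
    python_callable=print_greeting,
    op_kwargs={'name': 'Airflow'},
    dag=hello_world_dag)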

Step 6: Creating Tasks

  • An object instantiated from an operator is called a task. There are various types of operators available, but we will first focus on the DummyOperator and the PythonOperator.
  • A PythonOperator is used to call a Python function inside your DAG. We will create a PythonOperator object that calls a function which returns 'Hello World!' when it is called.
  • Like a DAG object has a "dag_id", a PythonOperator object has a "task_id" which acts as its identifier.
  • It also has a "python_callable" parameter which takes the name of the function to be called as its input.

# Creating first task
start_task = DummyOperator(task_id='start_task', dag=hello_world_dag)

# Creating second task
hello_world_task = PythonOperator(task_id='hello_world_task', python_callable=print_hello, dag=hello_world_dag)

# Creating third task
end_task = DummyOperator(task_id='end_task', dag=hello_world_dag)

Step 7: Setting up Dependencies

  • Set the dependencies or the order in which the tasks should be executed.
  • We can set the dependencies of the tasks by writing the task names along with >> or << to indicate the downstream or upstream flow respectively.
  • Here are a few ways you can define dependencies between them; the tutorial DAG uses the bitshift form, and equivalent alternatives are sketched after the code below:

# Set the order of execution of tasks.
start_task >> hello_world_task >> end_task
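Equivalent ways to express the same ordering (all of these produce the same graph; only one form is needed):

# Upstream (<<) form, equivalent to the bitshift line above
end_task << hello_world_task << start_task

# Explicit method calls, also equivalent
start_task.set_downstream(hello_world_task)
hello_world_task.set_downstream(end_task)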

Step 8: Verifying the final DAG code

After putting together all the elements of the DAG, our final code should look like this:

cat ${AIRFLOW_HOME}/dags/hello_world_dag.py

# datetime
from datetime import timedelta, datetime

# The DAG object
from airflow import DAG

# Operators
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

# initializing the default arguments
default_args = {
    'owner': 'Ranga',
    'start_date': datetime(2022, 3, 4),
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

# Instantiate a DAG object
hello_world_dag = DAG(
    'hello_world_dag',
    default_args=default_args,
    description='Hello World DAG',
    schedule_interval='* * * * *',
    catchup=False,
    tags=['example', 'helloworld'])

# python callable function
def print_hello():
    return 'Hello World!'

# Creating first task
start_task = DummyOperator(task_id='start_task', dag=hello_world_dag)

# Creating second task
hello_world_task = PythonOperator(task_id='hello_world_task', python_callable=print_hello, dag=hello_world_dag)

# Creating third task
end_task = DummyOperator(task_id='end_task', dag=hello_world_dag)

# Set the order of execution of tasks.
start_task >> hello_world_task >> end_task

This file creates a simple DAG with just two operator types: the DummyOperator, which does nothing, and a PythonOperator, which calls the print_hello function when its task is executed.

Step 9: Test the Pipeline

Check that the DAG file contains valid Python code by executing it with Python:

python3 ${AIRFLOW_HOME}/dags/hello_world_dag.py 
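If the script raises no exceptions, the code is syntactically valid. Assuming the airflow CLI is available in the same environment (flags may differ slightly across versions), you can also let Airflow verify that it can parse the DAG and run one task in isolation:

# List the DAGs Airflow has parsed; hello_world_dag should appear
airflow dags list

# List the tasks inside our DAG
airflow tasks list hello_world_dag

# Run a single task for a given logical date, without the scheduler
airflow tasks test hello_world_dag hello_world_task 2022-03-04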

Step 10: Running the DAG

  • Activate the virtual environment.

source ~/install/airflow-tutorial/airflow_venv/bin/activate
export AIRFLOW_HOME=~/install/airflow-tutorial/airflow

  • Start the airflow webserver and scheduler.

airflow webserver \
    --pid ${AIRFLOW_HOME}/logs/airflow-webserver.pid \
    --stdout ${AIRFLOW_HOME}/logs/airflow-webserver.out \
    --stderr ${AIRFLOW_HOME}/logs/airflow-webserver.out \
    -l ${AIRFLOW_HOME}/logs/airflow-webserver.log \
    -p 8080 -D

airflow scheduler \
    --pid ${AIRFLOW_HOME}/logs/airflow-scheduler.pid \
    --stdout ${AIRFLOW_HOME}/logs/airflow-scheduler.out \
    --stderr ${AIRFLOW_HOME}/logs/airflow-scheduler.out \
    -l ${AIRFLOW_HOME}/logs/airflow-scheduler.log \
    -D
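Once both services are up, note that newly added DAGs are paused by default (unless configured otherwise), so the DAG has to be unpaused before the scheduler will run it. You can use the toggle next to the DAG name in the web UI at http://localhost:8080, or the CLI:

# Unpause the DAG so the scheduler starts picking it up
airflow dags unpause hello_world_dag

# Optionally trigger a run immediately instead of waiting for the schedule
airflow dags trigger hello_world_dag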

[Screenshot 1]

  • The DAG should run successfully. In order to check the graph view or the tree view, you can hover over the links and select the Graph or Tree option.

[Screenshot 2]

  • You can also view the task's execution information using logs. To do so, simply click on the task, and you should see the following dialog box:

[Screenshot 3]

  • Next, click on the Log button, and you will be redirected to the task's log.

[Screenshot 4]

Happy Learning.


FAQs

How do you create the first DAG in Airflow? ›

To create a DAG in Airflow, you'll typically follow these steps:
  1. Import necessary modules: You'll need to import airflow modules like `DAG`, `operators`, and `tasks`.
  2. Define default arguments: Set default arguments that will be shared among all the tasks in your DAG, such as start date, owner, and retries.

How do I create a new DAG in Airflow UI? ›

Creating an Airflow DAG using the Pipeline UI
  1. Go to Jobs > Create Job. Under Job details, select Airflow. ...
  2. Specify a name for the job.
  3. Under DAG File select the Editor option.
  4. Click Create. You are redirected to the job Editor tab.
  5. Build your Airflow pipeline. ...
  6. When you are done with building your pipeline, click Save.

How do I add a DAG? ›

To add or update a DAG, move the Python .py file for the DAG to the /dags folder in the environment's bucket. In Google Cloud console, go to the Environments page. In the list of environments, find a row with the name of your environment and in the DAGs folder column click the DAGs link.
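For example, on Cloud Composer (which the answer above describes) the upload can also be done from the command line; a sketch, with a purely illustrative bucket name:

# Copy the DAG file into the Composer environment's bucket (bucket name is hypothetical)
gsutil cp hello_world_dag.py gs://us-central1-example-env-bucket/dags/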

How do I publish a DAG in Airflow? ›

Deploying New DAG Code in Apache Airflow
  1. Write Your DAG: First, you need to write your DAG in Python. ...
  2. Place Your DAG in the DAGs Folder: Once your DAG is written, you need to place it in the DAGs folder. ...
  3. Update Airflow: After placing your DAG in the DAGs folder, you need to update Airflow.

In what language is an Airflow DAG written? ›

An Airflow DAG is defined in a Python file and is composed of the following components: DAG definition. Airflow operators. Operator relationships.

Where do you write DAG in Airflow? ›

Airflow will only load DAGs that appear in the top level of a DAG file. This means you cannot just declare a function with @dag - you must also call it at least once in your DAG file and assign the result to a top-level object, as the sketch below illustrates.
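A minimal sketch of that decorator style (the TaskFlow API available in Airflow 2.x; separate from this tutorial's hello_world_dag):

from datetime import datetime

from airflow.decorators import dag, task

@dag(schedule_interval='@daily', start_date=datetime(2022, 3, 4), catchup=False)
def hello_taskflow():
    @task
    def say_hello():
        return 'Hello World!'

    say_hello()

# Calling the decorated function at top level creates the DAG object that Airflow loads
hello_taskflow()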

Can I create DAG from Airflow UI? ›

You can add notes to task instances and DAG runs from the Grid view in the Airflow UI. This feature is useful if you need to share contextual information about a DAG or task run with your team, such as why a specific run failed. Go to the Grid view of the DAG you want to annotate.

What does DAG stand for? ›

A directed acyclic graph (DAG) is a conceptual representation of a series of activities. The order of the activities is depicted by a graph, which is visually presented as a set of circles, each representing an activity, some of which are connected by lines, representing the flow from one activity to another.

How do you trigger Airflow DAG using API? ›

How to Trigger Airflow DAG Using REST API
  1. Enable REST API in Airflow. In the config, enable authentication and set it to basic authentication. ...
  2. Build your POST request. I'm using Postman in this demo for the API requests. Define headers as follows:
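As a sketch of such a request against the stable REST API (host, port, and credentials here are illustrative; adjust them to your deployment):

# Trigger a run of hello_world_dag via the stable REST API (basic auth assumed)
curl -X POST "http://localhost:8080/api/v1/dags/hello_world_dag/dagRuns" \
    -H "Content-Type: application/json" \
    --user "admin:admin" \
    -d '{"conf": {}}'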

What is Airflow DAG used for? ›

In Airflow, a DAG – or a Directed Acyclic Graph – is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies. A DAG is defined in a Python script, which represents the DAG's structure (tasks and their dependencies) as code.

How do I know if my Airflow DAG is running? ›

DAGs that have a currently running DAG run can be shown on the UI dashboard in the “Running” tab. Similarly, DAGs whose latest DAG run is marked as failed can be found on the “Failed” tab.
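The same information is available from the CLI; assuming the environment from the tutorial above:

# Show recent DAG runs and their state (running, success, failed) for our DAG
airflow dags list-runs -d hello_world_dag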

How do you trigger Airflow? ›

The Airflow scheduler is designed to run as a persistent service in an Airflow production environment. To kick it off, all you need to do is execute airflow scheduler. It will use the configuration specified in airflow.cfg.

When not to use Apache Airflow? ›

Apache Airflow's Python dependencies can sometimes conflict with those required by custom task code. By default, Airflow operates within a single Python environment, which can lead to issues when different tasks have conflicting dependencies.
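One common mitigation is to isolate a task's dependencies in a throwaway virtual environment with the PythonVirtualenvOperator; a minimal sketch (the package pin and the DAG reference are illustrative):

from airflow.operators.python import PythonVirtualenvOperator

def use_pinned_pandas():
    # Imports happen inside the callable so they resolve in the task's own virtualenv
    import pandas as pd
    return pd.__version__

isolated_task = PythonVirtualenvOperator(
    task_id='isolated_task',
    python_callable=use_pinned_pandas,
    requirements=['pandas==1.5.3'],      # dependencies private to this task
    system_site_packages=False,
    dag=hello_world_dag)                 # assumes the DAG from the tutorial above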

Does Airflow require coding? ›

However, the philosophy of Airflow is to define workflows as code so coding will always be required.

How do I host an Airflow server? ›

Configuration: You can also configure the Airflow web server and scheduler using the airflow.cfg configuration file, which is located in the Airflow home directory. You can set various configuration options such as the web server port, the scheduler interval, and the number of workers to run the DAGs.

Which component of Airflow initiates the DAG run? ›

The scheduler, by default, will kick off a DAG Run for any data interval that has not been run since the last data interval (or has been cleared). This concept is called Catchup.

How do you reset the DAG state in Airflow? ›

Clear all tasks

To rerun multiple DAGs, click Browse > DAG Runs, select the DAGs to rerun, and in the Actions list select Clear the state.
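The equivalent from the CLI, assuming the tutorial environment (the date range is illustrative):

# Clear task instances for hello_world_dag in a date range so they get scheduled again
airflow tasks clear hello_world_dag -s 2022-03-04 -e 2022-03-05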

How do you trigger DAG from cloud function? ›

Deploy a Cloud Function that triggers the DAG
  1. Trigger Type. Cloud Storage.
  2. Event Type. Finalize / Create.
  3. Bucket. Select a bucket that must trigger this function.
  4. Retry on failure. We recommend to disable this option for the purposes of this example.
