Let's start by creating a Hello World workflow, which does nothing other than send "Hello World!" to the log.
A DAG file, which is basically just a Python script, is a configuration file specifying the DAG’s structure as code.
The following steps show how to write an Airflow DAG (workflow):
Step 1: Creating a Python file
mkdir -p ${AIRFLOW_HOME}/dags
HelloWorld Application Structure:
${AIRFLOW_HOME}
├── airflow.cfg
├── airflow.db
├── dags                    <- Your DAGs directory
│   └── hello_world_dag.py  <- Your DAG definition file
└── unittests.cfg
Add the following content, step by step, to the hello_world_dag.py file.
vi ${AIRFLOW_HOME}/dags/hello_world_dag.py
Step 2: Importing the modules
# datetime
from datetime import timedelta, datetime

# The DAG object
from airflow import DAG

# Operators
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
Step 3: Default Arguments for the DAG
# initializing the default arguments
default_args = {
    'owner': 'Ranga',
    'start_date': datetime(2022, 3, 4),
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}
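The retry settings above mean a failed task attempt is retried up to 3 times, 5 minutes apart. A quick stdlib sketch of the resulting retry schedule (the failure time is a made-up example, not anything Airflow produces):

```python
from datetime import datetime, timedelta

# Hypothetical first failure time, plus the retry settings from default_args.
first_failure = datetime(2022, 3, 4, 12, 0)
retry_delay = timedelta(minutes=5)
retries = 3

# Each retry fires retry_delay after the previous attempt.
retry_times = [first_failure + retry_delay * (i + 1) for i in range(retries)]
print([t.strftime('%H:%M') for t in retry_times])  # → ['12:05', '12:10', '12:15']
```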
Step 4: Instantiate a DAG
# Instantiate a DAG object
hello_world_dag = DAG(
    'hello_world_dag',
    default_args=default_args,
    description='Hello World DAG',
    schedule_interval='* * * * *',
    catchup=False,
    tags=['example', 'helloworld']
)
Example usage:
- Daily schedule:
  - `schedule_interval='@daily'`
  - `schedule_interval='0 0 * * *'`
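Airflow also accepts other schedule presets besides `@daily`; the small reference below maps the common presets to their cron equivalents as documented by Airflow:

```python
# Airflow schedule presets and their cron equivalents (per the Airflow docs).
PRESET_TO_CRON = {
    '@hourly':  '0 * * * *',   # once an hour, on the hour
    '@daily':   '0 0 * * *',   # once a day at midnight
    '@weekly':  '0 0 * * 0',   # once a week at midnight on Sunday
    '@monthly': '0 0 1 * *',   # once a month at midnight on the 1st
    '@yearly':  '0 0 1 1 *',   # once a year at midnight on January 1
}

print(PRESET_TO_CRON['@daily'])  # → 0 0 * * *
```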
Step 5: Creating a callable function
# python callable function
def print_hello():
    return 'Hello World!'
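Because the callable is plain Python, it can be sanity-checked on its own before being wired into a task; the PythonOperator will log whatever the function returns:

```python
# The callable from the DAG; PythonOperator logs its return value.
def print_hello():
    return 'Hello World!'

# Quick standalone check, no Airflow needed.
assert print_hello() == 'Hello World!'
print(print_hello())  # → Hello World!
```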
Step 6: Creating Tasks
# Creating first task
start_task = DummyOperator(task_id='start_task', dag=hello_world_dag)

# Creating second task
hello_world_task = PythonOperator(task_id='hello_world_task', python_callable=print_hello, dag=hello_world_dag)

# Creating third task
end_task = DummyOperator(task_id='end_task', dag=hello_world_dag)
Step 7: Setting up Dependencies
# Set the order of execution of tasks.
start_task >> hello_world_task >> end_task
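The `>>` operator chains tasks left to right, so for this linear chain Airflow runs start_task, then hello_world_task, then end_task. A minimal stdlib sketch of that ordering (illustrative stand-in functions only; this is not Airflow's scheduler):

```python
# Illustrative sketch: run a linear task chain in dependency order.
# These stand-in functions are hypothetical; in Airflow, operators play this role.
def start_task():
    return 'start'

def hello_world_task():
    return 'Hello World!'

def end_task():
    return 'end'

# The equivalent of start_task >> hello_world_task >> end_task.
chain = [start_task, hello_world_task, end_task]
results = [task() for task in chain]
print(results)  # → ['start', 'Hello World!', 'end']
```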
Step 8: Verifying the final DAG code
After combining all the elements of the DAG, our final code should look like this:
cat ${AIRFLOW_HOME}/dags/hello_world_dag.py
# datetime
from datetime import timedelta, datetime

# The DAG object
from airflow import DAG

# Operators
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

# initializing the default arguments
default_args = {
    'owner': 'Ranga',
    'start_date': datetime(2022, 3, 4),
    'retries': 3,
    'retry_delay': timedelta(minutes=5)
}

# Instantiate a DAG object
hello_world_dag = DAG(
    'hello_world_dag',
    default_args=default_args,
    description='Hello World DAG',
    schedule_interval='* * * * *',
    catchup=False,
    tags=['example', 'helloworld']
)

# python callable function
def print_hello():
    return 'Hello World!'

# Creating first task
start_task = DummyOperator(task_id='start_task', dag=hello_world_dag)

# Creating second task
hello_world_task = PythonOperator(task_id='hello_world_task', python_callable=print_hello, dag=hello_world_dag)

# Creating third task
end_task = DummyOperator(task_id='end_task', dag=hello_world_dag)

# Set the order of execution of tasks.
start_task >> hello_world_task >> end_task
This file creates a simple DAG using two operator types: the DummyOperator, which does nothing, and a PythonOperator, which calls the print_hello function when its task is executed.
Step 9: Test the Pipeline
Check that the DAG file contains valid Python code by executing it:
python3 ${AIRFLOW_HOME}/dags/hello_world_dag.py
Step 10: Running the DAG
source ~/install/airflow-tutorial/airflow_venv/bin/activate
export AIRFLOW_HOME=~/install/airflow-tutorial/airflow
airflow webserver \
    --pid ${AIRFLOW_HOME}/logs/airflow-webserver.pid \
    --stdout ${AIRFLOW_HOME}/logs/airflow-webserver.out \
    --stderr ${AIRFLOW_HOME}/logs/airflow-webserver.out \
    -l ${AIRFLOW_HOME}/logs/airflow-webserver.log \
    -p 8080 -D

airflow scheduler \
    --pid ${AIRFLOW_HOME}/logs/airflow-scheduler.pid \
    --stdout ${AIRFLOW_HOME}/logs/airflow-scheduler.out \
    --stderr ${AIRFLOW_HOME}/logs/airflow-scheduler.out \
    -l ${AIRFLOW_HOME}/logs/airflow-scheduler.log \
    -D
Happy Learning.