Write Airflow DAGs  |  Cloud Composer  |  Google Cloud


This guide shows you how to write an Apache Airflow directed acyclic graph (DAG) that runs in a Cloud Composer environment.

Because Apache Airflow does not provide strong DAG and task isolation, we recommend that you use separate production and test environments to prevent DAG interference. For more information, see Testing DAGs.

Structuring an Airflow DAG

An Airflow DAG is defined in a Python file and is composed of the following components:

  • DAG definition
  • Airflow operators
  • Operator relationships

The following code snippets show examples of each component out of context.

A DAG definition

The following example demonstrates a DAG definition:

Airflow 2

import datetime

from airflow import models

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

Airflow 1

import datetime

from airflow import models

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

Operators and tasks

Operators describe the work to be done. A task is a specific instance of an operator.

Airflow 2

from airflow.operators import bash_operator
from airflow.operators import python_operator


def greeting():
    import logging

    logging.info("Hello World!")


# An instance of an operator is called a task. In this case, the
# hello_python task calls the "greeting" Python function.
hello_python = python_operator.PythonOperator(
    task_id="hello", python_callable=greeting
)

# Likewise, the goodbye_bash task calls a Bash script.
goodbye_bash = bash_operator.BashOperator(
    task_id="bye", bash_command="echo Goodbye."
)

Airflow 1

from airflow.operators import bash_operator
from airflow.operators import python_operator


def greeting():
    import logging

    logging.info("Hello World!")


# An instance of an operator is called a task. In this case, the
# hello_python task calls the "greeting" Python function.
hello_python = python_operator.PythonOperator(
    task_id="hello", python_callable=greeting
)

# Likewise, the goodbye_bash task calls a Bash script.
goodbye_bash = bash_operator.BashOperator(
    task_id="bye", bash_command="echo Goodbye."
)

Task relationships

Task relationships describe the order in which the work must be completed.

Airflow 2

# Define the order in which the tasks complete by using the >> and <<
# operators. In this example, hello_python executes before goodbye_bash.
hello_python >> goodbye_bash

Airflow 1

# Define the order in which the tasks complete by using the >> and <<
# operators. In this example, hello_python executes before goodbye_bash.
hello_python >> goodbye_bash

Full DAG workflow example in Python

The following workflow is a complete working DAG template that is composed of two tasks: a hello_python task and a goodbye_bash task:

Airflow 2

import datetime

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id="hello", python_callable=greeting
    )

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id="bye", bash_command="echo Goodbye."
    )

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

Airflow 1

import datetime

from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator

default_dag_args = {
    # The start_date describes when a DAG is valid / can be run. Set this to a
    # fixed point in time rather than dynamically, since it is evaluated every
    # time a DAG is parsed. See:
    # https://airflow.apache.org/faq.html#what-s-the-deal-with-start-date
    "start_date": datetime.datetime(2018, 1, 1),
}

# Define a DAG (directed acyclic graph) of tasks.
# Any task you create within the context manager is automatically added to the
# DAG object.
with models.DAG(
    "composer_sample_simple_greeting",
    schedule_interval=datetime.timedelta(days=1),
    default_args=default_dag_args,
) as dag:

    def greeting():
        import logging

        logging.info("Hello World!")

    # An instance of an operator is called a task. In this case, the
    # hello_python task calls the "greeting" Python function.
    hello_python = python_operator.PythonOperator(
        task_id="hello", python_callable=greeting
    )

    # Likewise, the goodbye_bash task calls a Bash script.
    goodbye_bash = bash_operator.BashOperator(
        task_id="bye", bash_command="echo Goodbye."
    )

    # Define the order in which the tasks complete by using the >> and <<
    # operators. In this example, hello_python executes before goodbye_bash.
    hello_python >> goodbye_bash

For more information about defining Airflow DAGs, see the Airflow tutorial and Airflow concepts.

Airflow operators

The following examples show a few popular Airflow operators. For an authoritative reference of Airflow operators, see the Operators and Hooks Reference and the Providers index.

BashOperator

Use the BashOperator to run command-line programs.

Airflow 2

from airflow.operators import bash

# Create BigQuery output dataset.
make_bq_dataset = bash.BashOperator(
    task_id="make_bq_dataset",
    # Executing 'bq' command requires Google Cloud SDK which comes
    # preinstalled in Cloud Composer.
    bash_command=f"bq ls {bq_dataset_name} || bq mk {bq_dataset_name}",
)

Airflow 1

from airflow.operators import bash_operator

# Create BigQuery output dataset.
make_bq_dataset = bash_operator.BashOperator(
    task_id="make_bq_dataset",
    # Executing 'bq' command requires Google Cloud SDK which comes
    # preinstalled in Cloud Composer.
    bash_command=f"bq ls {bq_dataset_name} || bq mk {bq_dataset_name}",
)

Cloud Composer runs the provided commands in a Bash script on an Airflow worker. The worker is a Debian-based Docker container and includes several packages.

PythonOperator

Use the PythonOperator to run arbitrary Python code.

Cloud Composer runs the Python code in a container that includes packages for the Cloud Composer image version used in your environment.

To install additional Python packages, see Installing Python Dependencies.

Google Cloud Operators

To run tasks that use Google Cloud products, use the Google Cloud Airflow operators. For example, BigQuery operators query and process data in BigQuery.

There are many more Airflow operators for Google Cloud and individual services provided by Google Cloud. See Google Cloud Operators for the full list.

Airflow 2

from airflow.providers.google.cloud.operators import bigquery
from airflow.providers.google.cloud.transfers import bigquery_to_gcs

bq_recent_questions_query = bigquery.BigQueryInsertJobOperator(
    task_id="bq_recent_questions_query",
    configuration={
        "query": {
            "query": RECENT_QUESTIONS_QUERY,
            "useLegacySql": False,
            "destinationTable": {
                "projectId": project_id,
                "datasetId": bq_dataset_name,
                "tableId": bq_recent_questions_table_id,
            },
        }
    },
    location=location,
)

Airflow 1

from airflow.contrib.operators import bigquery_operator

# Query recent StackOverflow questions.
bq_recent_questions_query = bigquery_operator.BigQueryOperator(
    task_id="bq_recent_questions_query",
    sql="""
    SELECT owner_display_name, title, view_count
    FROM `bigquery-public-data.stackoverflow.posts_questions`
    WHERE creation_date < CAST('{max_date}' AS TIMESTAMP)
        AND creation_date >= CAST('{min_date}' AS TIMESTAMP)
    ORDER BY view_count DESC
    LIMIT 100
    """.format(
        max_date=max_query_date, min_date=min_query_date
    ),
    use_legacy_sql=False,
    destination_dataset_table=bq_recent_questions_table_id,
)

EmailOperator

Use the EmailOperator to send email from a DAG. To send email from a Cloud Composer environment, you must configure your environment to use SendGrid.

Airflow 2

from airflow.operators import email

# Send email confirmation (you will need to set up the email operator
# See https://cloud.google.com/composer/docs/how-to/managing/creating#notification
# for more info on configuring the email operator in Cloud Composer)
email_summary = email.EmailOperator(
    task_id="email_summary",
    to="{{var.value.email}}",
    subject="Sample BigQuery notify data ready",
    html_content="""
    Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
    12AM. The most popular question was '{question_title}' with
    {view_count} views. Top 100 questions asked are now available at:
    {export_location}.
    """.format(
        min_date=min_query_date,
        max_date=max_query_date,
        question_title=(
            "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
            "key='return_value')[0][0] }}"
        ),
        view_count=(
            "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
            "key='return_value')[0][1] }}"
        ),
        export_location=output_file,
    ),
)

Airflow 1

from airflow.operators import email_operator

# Send email confirmation
email_summary = email_operator.EmailOperator(
    task_id="email_summary",
    to="{{var.value.email}}",
    subject="Sample BigQuery notify data ready",
    html_content="""
    Analyzed Stack Overflow posts data from {min_date} 12AM to {max_date}
    12AM. The most popular question was '{question_title}' with
    {view_count} views. Top 100 questions asked are now available at:
    {export_location}.
    """.format(
        min_date=min_query_date,
        max_date=max_query_date,
        question_title=(
            "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
            "key='return_value')[0][0] }}"
        ),
        view_count=(
            "{{ ti.xcom_pull(task_ids='bq_read_most_popular', "
            "key='return_value')[0][1] }}"
        ),
        export_location=output_file,
    ),
)

Notifications on operator failure

Set email_on_failure to True to send an email notification when an operator in the DAG fails. To send email notifications from a Cloud Composer environment, you must configure your environment to use SendGrid.

Airflow 2

from airflow import models

default_dag_args = {
    "start_date": yesterday,
    # Email whenever an Operator in the DAG fails.
    "email": "{{var.value.email}}",
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": project_id,
}

with models.DAG(
    "composer_sample_bq_notify",
    schedule_interval=datetime.timedelta(weeks=4),
    default_args=default_dag_args,
) as dag:

Airflow 1

from airflow import models

default_dag_args = {
    "start_date": yesterday,
    # Email whenever an Operator in the DAG fails.
    "email": "{{var.value.email}}",
    "email_on_failure": True,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "project_id": "{{var.value.gcp_project}}",
}

with models.DAG(
    "composer_sample_bq_notify",
    schedule_interval=datetime.timedelta(weeks=4),
    default_args=default_dag_args,
) as dag:

DAG workflow guidelines

  1. Place any custom Python libraries in a DAG's ZIP archive in a nested directory. Do not place libraries at the top level of the DAGs directory.

    When Airflow scans the dags/ folder, Airflow only checks for DAGs in Python modules that are in the top level of the DAGs folder and in the top level of a ZIP archive also located in the top-level dags/ folder. If Airflow encounters a Python module in a ZIP archive that does not contain both airflow and DAG substrings, Airflow stops processing the ZIP archive. Airflow returns only the DAGs found up to that point.

  2. Use Airflow 2 instead of Airflow 1.

    The Airflow community does not publish new minor or patch releases for Airflow 1 anymore.

  3. For fault tolerance, do not define multiple DAG objects in the same Python module.

  4. Do not use SubDAGs. Instead, group tasks inside DAGs.

  5. Place files that are required at DAG parse time into the dags/ folder, not in the data/ folder.

  6. Implement unit tests for your DAGs.

  7. Test developed or modified DAGs as recommended in the instructions for testing DAGs.

  8. Verify that developed DAGs do not increase DAG parse times too much.

  9. Airflow tasks can fail for multiple reasons. To avoid failures of whole DAG runs, we recommend enabling task retries. Setting maximum retries to 0 means that no retries are performed.

    We recommend overriding the default_task_retries option with a value for task retries other than 0. In addition, you can set the retries parameter at the task level.

  10. If you want to use GPUs in your Airflow tasks, create a separate GKE cluster based on nodes that use machines with GPUs. Use GKEStartPodOperator to run your tasks.

  11. Avoid running CPU- and memory-heavy tasks in the cluster's node pool where other Airflow components (schedulers, workers, web servers) are running. Instead, use KubernetesPodOperator or GKEStartPodOperator.

  12. When deploying DAGs into an environment, upload only the files that are absolutely necessary for interpreting and executing DAGs into the /dags folder.

  13. Limit the number of DAG files in the /dags folder.

    Airflow continuously parses DAGs in the /dags folder. Parsing loops through the DAGs folder, and the number of files that need to be loaded (with their dependencies) impacts the performance of DAG parsing and task scheduling. It is much more efficient to use 100 files with 100 DAGs each than 10,000 files with 1 DAG each, so this optimization is recommended. It is a balance between parsing time and the efficiency of DAG authoring and management.

    You can also consider packaging DAGs: for example, to deploy 10,000 DAG files you could create 100 ZIP files, each containing 100 DAG files.

    In addition to the hints above, if you have more than 10,000 DAG files then generating DAGs programmatically might be a good option. For example, you can implement a single Python DAG file that generates some number of DAG objects (for example, 20 or 100 DAG objects); a minimal sketch of this pattern follows this list.
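The following is a minimal sketch of programmatic DAG generation, not taken from the original guide; the table names, schedule, and DAG ID prefix are hypothetical, and the point is only the pattern of registering generated DAG objects in the module's globals() so that Airflow discovers them during parsing.

import datetime

from airflow import models
from airflow.operators import bash


def create_dag(table_name):
    """Build one aggregation DAG for a single table (hypothetical logic)."""
    with models.DAG(
        dag_id=f"aggregate_{table_name}",
        schedule_interval=datetime.timedelta(days=1),
        start_date=datetime.datetime(2024, 1, 1),
        catchup=False,
    ) as dag:
        bash.BashOperator(
            task_id="aggregate",
            bash_command=f"echo aggregating {table_name}",
        )
    return dag


# Register each generated DAG in the module's globals() so that Airflow
# finds it when it parses this file.
for table in ["sales", "clicks", "inventory"]:  # hypothetical table list
    globals()[f"aggregate_{table}"] = create_dag(table)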

Avoid using deprecated Airflow operators

Operators listed in the following table are deprecated. Avoid using them in your DAGs. Instead, use the up-to-date alternatives provided.

Deprecated Operator | Operator to Use
BigQueryExecuteQueryOperator | BigQueryInsertJobOperator
BigQueryPatchDatasetOperator | BigQueryUpdateTableOperator
DataflowCreateJavaJobOperator | BeamRunJavaPipelineOperator
DataflowCreatePythonJobOperator | BeamRunPythonPipelineOperator
DataprocScaleClusterOperator | DataprocUpdateClusterOperator
DataprocSubmitPigJobOperator | DataprocSubmitJobOperator
DataprocSubmitSparkSqlJobOperator | DataprocSubmitJobOperator
DataprocSubmitSparkJobOperator | DataprocSubmitJobOperator
DataprocSubmitHadoopJobOperator | DataprocSubmitJobOperator
DataprocSubmitPySparkJobOperator | DataprocSubmitJobOperator
MLEngineManageModelOperator | MLEngineCreateModelOperator, MLEngineGetModelOperator
MLEngineManageVersionOperator | MLEngineCreateVersionOperator, MLEngineSetDefaultVersionOperator, MLEngineListVersionsOperator, MLEngineDeleteVersionOperator
GCSObjectsWtihPrefixExistenceSensor | GCSObjectsWithPrefixExistenceSensor

FAQs for writing DAGs

How do I minimize code repetition if I want to run the same or similar tasks in multiple DAGs?

We suggest defining libraries and wrappers to minimize code repetition.

How do I reuse code between DAG files?

Put your utility functions in a local Python library and import the functions. You can reference the functions in any DAG located in the dags/ folder in your environment's bucket.
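For example, a minimal sketch assuming a hypothetical shared module at dags/dependencies/table_utils.py; in Cloud Composer the dags/ folder is on the Python path of Airflow components, so DAG files can import from a nested package inside it:

# dags/dependencies/table_utils.py  (hypothetical shared module)
def daily_table_name(source: str, ds: str) -> str:
    """Return a date-suffixed table name such as sales_20240101."""
    return f"{source}_{ds.replace('-', '')}"


# dags/example_reuse_dag.py  (hypothetical DAG file that reuses the module)
from dependencies.table_utils import daily_table_name

print(daily_table_name("sales", "2024-01-01"))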

How do I minimize the risk of different definitions arising?

For example, you have two teams that want to aggregate raw data into revenue metrics. The teams write two slightly different tasks that accomplish the same thing. Define libraries to work with the revenue data so that the DAG implementers must clarify the definition of revenue that's being aggregated.

How do I set dependencies between DAGs?

This depends on how you want to define the dependency.

If you have two DAGs (DAG A and DAG B) and you want DAG B to trigger after DAG A, you can put a TriggerDagRunOperator at the end of DAG A.
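As a rough sketch of this approach (the DAG IDs and task names are hypothetical), the final task of DAG A can be followed by a TriggerDagRunOperator that starts DAG B:

from airflow.operators.trigger_dagrun import TriggerDagRunOperator

# Define this task inside DAG A's context manager, after DAG A's other tasks.
trigger_dag_b = TriggerDagRunOperator(
    task_id="trigger_dag_b",
    trigger_dag_id="dag_b",  # hypothetical DAG ID of DAG B
)

# Run it last, for example: final_task_of_dag_a >> trigger_dag_b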

If DAG B depends only on an artifact that DAG A generates, such as a Pub/Sub message, then a sensor might work better.

If DAG B is integrated closely with DAG A, you might be able to merge the twoDAGs into one DAG.

How do I pass unique run IDs to a DAG and its tasks?

For example, you want to pass Dataproc cluster names and file paths.

You can generate a random unique ID by returning str(uuid.uuid4()) in a PythonOperator. This places the ID into XComs so that you can refer to the ID in other operators via templated fields.

Before generating a uuid, consider whether a DagRun-specific ID would be more valuable. You can also reference these IDs in Jinja substitutions by using macros.
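A minimal sketch of the uuid-plus-XCom pattern, with hypothetical task IDs; define both tasks inside a DAG context manager:

import uuid

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# The return value of the callable is pushed to XCom automatically.
generate_job_id = PythonOperator(
    task_id="generate_job_id",
    python_callable=lambda: str(uuid.uuid4()),
)

# Templated fields can pull the generated ID back from XCom.
use_job_id = BashOperator(
    task_id="use_job_id",
    bash_command="echo cluster-{{ ti.xcom_pull(task_ids='generate_job_id') }}",
)

generate_job_id >> use_job_id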

How do I separate tasks in a DAG?

Each task should be an idempotent unit of work. Consequently, you should avoid encapsulating a multi-step workflow within a single task, such as a complex program running in a PythonOperator.

Should I define multiple tasks in a single DAG to aggregate data from multiple sources?

For example, you have multiple tables with raw data and want to create daily aggregates for each table. The tasks are not dependent on each other. Should you create one task and DAG for each table or create one general DAG?

If you are okay with each task sharing the same DAG-level properties, such as schedule_interval, then it makes sense to define multiple tasks in a single DAG. Otherwise, to minimize code repetition, multiple DAGs can be generated from a single Python module by placing them into the module's globals().

How do I limit the number of concurrent tasks running in a DAG?

For example, you want to avoid exceeding API usage limits/quotas or avoid running too many simultaneous processes.

You can define Airflow pools in the Airflow web UI and associate tasks with existing pools in your DAGs.
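For example, a minimal sketch assuming a hypothetical pool named api_quota_pool has already been created in the Airflow web UI:

from airflow.operators.bash import BashOperator

# Tasks assigned to the same pool share its worker slots, which caps how many
# of them run at the same time.
call_external_api = BashOperator(
    task_id="call_external_api",
    bash_command="echo calling the external API",
    pool="api_quota_pool",  # hypothetical pool created in the Airflow web UI
)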

FAQs for using operators

Should I use the DockerOperator?

We do not recommend using the DockerOperator, unless it's used to launch containers on a remote Docker installation (not within an environment's cluster). In a Cloud Composer environment, the operator does not have access to Docker daemons.

Instead, use KubernetesPodOperator or GKEStartPodOperator. These operators launch Kubernetes pods into Kubernetes or GKE clusters, respectively. Note that we don't recommend launching pods into an environment's cluster, because this can lead to resource competition.
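A minimal sketch of the KubernetesPodOperator, with a hypothetical image and namespace; it assumes the cncf.kubernetes provider is installed, and the import path varies by provider version (older versions expose the operator under operators.kubernetes_pod):

from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator

# Runs the command in its own pod instead of inside an Airflow worker.
heavy_processing = KubernetesPodOperator(
    task_id="heavy_processing",
    name="heavy-processing",
    namespace="composer-user-workloads",  # hypothetical namespace
    image="us-docker.pkg.dev/example-project/repo/processor:latest",  # hypothetical image
    cmds=["python", "-c", "print('processing')"],
)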

Should I use the SubDagOperator?

We do not recommend using the SubDagOperator.

Use alternatives as suggested in Grouping Tasks instructions.

Should I run Python code only in PythonOperators to fully separate Python operators?

Depending on your goal, you have a few options.

If your only concern is maintaining separate Python dependencies, you can use the PythonVirtualenvOperator.

Consider using the KubernetesPodOperator. This operator allows you to define Kubernetes pods and run the pods in other clusters.
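A minimal sketch of the PythonVirtualenvOperator, with a hypothetical pinned dependency; note that imports belong inside the callable because the function runs in the newly created virtualenv:

from airflow.operators.python import PythonVirtualenvOperator


def summarize_with_pandas():
    # Import inside the callable: the function executes in the virtualenv.
    import pandas as pd

    print(pd.DataFrame({"value": [1, 2, 3]}).describe())


summarize = PythonVirtualenvOperator(
    task_id="summarize_with_pandas",
    python_callable=summarize_with_pandas,
    requirements=["pandas==2.1.4"],  # hypothetical pinned version
    system_site_packages=False,
)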

How do I add custom binary or non-PyPI packages?

You can install packages hosted in private package repositories.

You can also use the KubernetesPodOperator to run a Kubernetes pod with your own image built with custom packages.

How do I uniformly pass arguments to a DAG and its tasks?

You can use Airflow's built-in support for Jinja templating to pass arguments that can be used in templated fields.
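For example, a minimal sketch that passes values to a templated field through params and a built-in macro; the table name is hypothetical:

from airflow.operators.bash import BashOperator

export_partition = BashOperator(
    task_id="export_partition",
    # bash_command is a templated field: {{ ds }} is the run's logical date
    # and {{ params.table }} comes from the params argument below.
    bash_command="echo exporting {{ params.table }} for {{ ds }}",
    params={"table": "sales"},  # hypothetical table name
)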

When does template substitution happen?

Template substitution occurs on Airflow workers just before the pre_execute function of an operator is called. In practice, this means that templates are not substituted until just before a task runs.

How do I know which operator arguments support template substitution?

Operator arguments that support Jinja2 template substitution are explicitly marked as such.

Look for the template_fields field in the operator definition, which contains a list of the argument names that undergo template substitution.

For example, see the BashOperator, which supports templating for the bash_command and env arguments.
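A quick way to check this from a Python shell; the exact output can vary by Airflow version:

from airflow.operators.bash import BashOperator

# template_fields lists the arguments that Airflow renders with Jinja before
# the task runs.
print(BashOperator.template_fields)  # for example: ('bash_command', 'env')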

