Apache Airflow


Apache Airflow is one of the most widely used workflow management tools for data pipelines: both AWS and GCP offer a managed Airflow service, in addition to other SaaS offerings (notably Astronomer).

It allows developers to define, schedule and monitor data workflows programmatically, using Python. It is based on the concept of directed acyclic graphs (DAGs), where the different steps (tasks) of the data processing (wait for a file, transform it, ingest it, join it with other datasets, process it, etc.) are represented as the nodes of the graph.

Each node can be either an “operator”, that is a task doing some actual work (e.g. transforming data, loading it, etc.), or a “sensor”, a task waiting for some event to happen (e.g. a file arrival, a REST API call, etc.).

In this article we will discuss sensors and tasks controlling external systems and, in particular, the internals of some of the most interesting (relatively) new features: reschedule-mode sensors, Smart Sensors and Deferrable Operators.

Sensors are synchronous by default

Sensors are a special type of Operator designed to wait for an event to occur and then succeed, so that their downstream tasks can run.

Sensors are a fundamental building block for creating pipelines in Airflow; however, historically, as they share the Operator’s main execution method, they were (and by default still are) synchronous.

By default, they busy-wait for an event to occur, occupying a worker slot the whole time.

If the deployment is not sized properly, too many busy-waiting sensors may use up all the worker slots and lead to starvation and even deadlocks (for example, when ExternalTaskSensor is used). Even when enough slots are available, workers may be hogged by tons of sleeping processes.
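
To make the cost concrete, here is a minimal sketch of a synchronous poke loop in plain Python. It is not Airflow's code: run_poke_sensor and its parameters are hypothetical names chosen for illustration, and for simplicity the sketch returns False on timeout instead of raising an error.

```python
import time
from typing import Callable


def run_poke_sensor(poke: Callable[[], bool], poke_interval: float, timeout: float) -> bool:
    """Block the caller until poke() returns True or the timeout expires.

    Hypothetical helper: the calling thread (think "worker slot") stays
    occupied for the whole wait, even though it is asleep most of the time.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if poke():
            return True
        # Nothing useful happens here, but the slot is still taken.
        time.sleep(poke_interval)
    return False
```

With a few hundred such loops running at once, every slot can end up held by a process that is merely sleeping.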

Working around it

The first countermeasure is to confine sensors to separate pools. This only partially limits the problem.

A more efficient workaround exploits Airflow’s ability to retry failed tasks. Basically, the idea is to make the sensor fail whenever the sensing condition is unmet, and to set the sensor’s number of retries and retry delay accordingly; in particular, number_of_retries * retry_delay should be equal to the sensor’s timeout. This frees the worker slot between attempts, making it possible to run other tasks.

This solution works like a charm and doesn’t require any change to the Airflow code.
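
As a small worked example of the rule above, the sketch below derives the number of retries from the desired overall timeout and the retry delay. The helper name retry_settings is hypothetical; the keys of the returned dictionary mirror the retries and retry_delay arguments accepted by Airflow operators. Rounding up is a choice of this sketch, so the effective timeout may slightly exceed the requested one.

```python
import math
from datetime import timedelta


def retry_settings(timeout: timedelta, retry_delay: timedelta) -> dict:
    """Return retry arguments such that retries * retry_delay covers the timeout.

    Hypothetical helper, not part of Airflow.
    """
    if retry_delay <= timedelta(0):
        raise ValueError("retry_delay must be positive")
    # Dividing two timedeltas yields a float; round up so the timeout is covered.
    retries = math.ceil(timeout / retry_delay)
    return {"retries": retries, "retry_delay": retry_delay}
```

For instance, a one-hour timeout checked every five minutes gives 12 retries.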

The main drawbacks are:

  • Bugs and errors in the sensors may be masked by timeouts; this can, however, be mitigated by properly written unit tests.
  • Some overhead is added to the scheduler, and a separate process is spawned at every retry, so polling cannot be too frequent.

Reschedule mode

The sensor’s reschedule mode is quite similar to the previous workaround.

In practice, sensors have a new “mode” attribute which may take two values: “poke”, the default, which provides the old synchronous behaviour, and “reschedule”.

When mode is set to reschedule:

  • BaseSensorOperator’s “execute” method raises an AirflowRescheduleException, containing the reschedule_date, when the sensing condition is unmet.
  • This exception is caught by the TaskInstance run method, which persists it in the TaskReschedule table along with the id of the associated task and updates the task state to “UP_FOR_RESCHEDULE”.
  • When the TaskInstance run method is called again and the task is in the “UP_FOR_RESCHEDULE” state, the task is run only if the reschedule_date has been reached.
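
The three steps above can be sketched with a toy model in plain Python. Every name here (RescheduleException, ToyTaskInstance, the reschedules list standing in for the TaskReschedule table) is a hypothetical stand-in, not Airflow's implementation; the current time is passed in explicitly to keep the example deterministic.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Callable, List, Optional


class RescheduleException(Exception):
    """Toy stand-in for AirflowRescheduleException."""

    def __init__(self, reschedule_date: datetime):
        super().__init__(f"reschedule at {reschedule_date}")
        self.reschedule_date = reschedule_date


@dataclass
class ToyTaskInstance:
    task_id: str
    poke: Callable[[], bool]
    poke_interval: timedelta
    state: Optional[str] = None
    # Toy replacement for the TaskReschedule table.
    reschedules: List[datetime] = field(default_factory=list)

    def execute(self, now: datetime) -> None:
        # Step 1: an unmet condition raises instead of sleeping.
        if not self.poke():
            raise RescheduleException(now + self.poke_interval)

    def run(self, now: datetime) -> None:
        # Step 3: a rescheduled task runs only once its date is reached.
        if self.state == "UP_FOR_RESCHEDULE" and now < self.reschedules[-1]:
            return
        try:
            self.execute(now)
        except RescheduleException as exc:
            # Step 2: persist the request and update the state.
            self.reschedules.append(exc.reschedule_date)
            self.state = "UP_FOR_RESCHEDULE"
        else:
            self.state = "SUCCESS"
```

Between two calls to run nothing is held: the wait lives in the persisted reschedule date rather than in a sleeping process.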

This approach improves on the workaround described above, as it makes it possible to distinguish between actual errors and an unmet sensor condition. Otherwise it shares the same limitations: even lightweight checks remain quite resource intensive.

Smart sensors

In parallel to the “reschedule” mode, a different approach, called Smart Sensors, was proposed in AIP-17. It was merged in release 2.0.0, but it is already deprecated and planned for removal in the upcoming Airflow 2.4.0 release (Smart Sensors are no longer in the main branch).

All smart sensor poke-contexts are serialised in the DB and picked up by a separate process, running in special built-in smart sensor DAGs.

I won’t add any additional details on them, as they’ve been replaced by Deferrable Operators.

Smart Sensors were a sensible solution; however, despite requiring considerable changes to the Airflow code, they have two main pitfalls:

  • No High Availability support
  • Sensor suspension is a subset of a more generic problem, the suspension of tasks, and this solution can’t be easily extended to it.

For reference, please see AIP-17 here and here.

Deferrable Operators

Deferrable Operators, introduced in AIP-40, are instead a more generic solution: they are a superset of Smart Sensors, supporting broader task suspension, and were designed from the start to be highly available. It is therefore no surprise that they have replaced Smart Sensors.

Albeit quite elegant, this solution is slightly more complex. To fully understand it, let’s start from a use case and then work through the details.

A typical Airflow use case is to orchestrate jobs running on external systems (for example, a Spark job running on YARN/EMR/…). More and more frequently, those systems offer an asynchronous API that returns a job id and a way to poll the job status.

Without Deferrable Operators, a common way to implement this is a custom operator that triggers the job in its execute method, gets the job id, and polls for it in a busy-wait loop until the job finishes. One may be tempted to use two separate operators, one for the “trigger” call and one for the “poll” call, but this would defeat Airflow’s retry mechanism, as a retry of the “poll” task alone would not trigger the job again.

Deferrable Operators solve this problem by giving tasks the ability to suspend themselves. If the polling condition is unmet, task execution can be suspended and resumed after a configurable delay.

Suspension is achieved by raising a TaskDeferred exception in a deferrable operator. A handy “defer” method has been added to BaseOperator to do so. This exception contains the following information:

  • The function to resume, along with the needed arguments.
  • A Trigger object, containing the details on when to trigger the next run.

The function arguments are a simple way to keep the task state, for example the job_id of the triggered Spark job to poll.
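
The following toy model sketches how an operator can submit a job once, then suspend itself and resume in a polling method with the job_id carried in the deferral. It uses only the standard library: ToyTaskDeferred, ToyTimeDeltaTrigger, ToySparkJobOperator and run_until_done are hypothetical stand-ins that loosely mirror the shape of Airflow's defer(trigger, method_name, kwargs) call, and the driver resumes immediately instead of waiting for the trigger to fire.

```python
from dataclasses import dataclass
from datetime import timedelta
from typing import Any, Callable, Dict


@dataclass
class ToyTimeDeltaTrigger:
    """Toy stand-in: 'wake me up after this delay'."""
    delta: timedelta


class ToyTaskDeferred(Exception):
    """Toy stand-in for TaskDeferred: what to wait for and where to resume."""

    def __init__(self, trigger: Any, method_name: str, kwargs: Dict[str, Any]):
        super().__init__(method_name)
        self.trigger = trigger
        self.method_name = method_name
        self.kwargs = kwargs


class ToySparkJobOperator:
    def __init__(self, submit_job: Callable[[], str],
                 get_status: Callable[[str], str], poll_interval: timedelta):
        self.submit_job = submit_job
        self.get_status = get_status
        self.poll_interval = poll_interval

    def defer(self, trigger: Any, method_name: str, kwargs: Dict[str, Any]) -> None:
        raise ToyTaskDeferred(trigger, method_name, kwargs)

    def execute(self) -> None:
        job_id = self.submit_job()  # the job is submitted exactly once
        self.defer(ToyTimeDeltaTrigger(self.poll_interval), "poll", {"job_id": job_id})

    def poll(self, job_id: str) -> str:
        status = self.get_status(job_id)
        if status == "RUNNING":
            # Still running: suspend again, keeping job_id as the task state.
            self.defer(ToyTimeDeltaTrigger(self.poll_interval), "poll", {"job_id": job_id})
        if status != "SUCCEEDED":
            raise RuntimeError(f"job {job_id} ended with status {status}")
        return job_id


def run_until_done(op: ToySparkJobOperator):
    """Toy driver: resume the method named in each deferral until one returns."""
    deferrals = 0
    method, kwargs = op.execute, {}
    while True:
        try:
            return method(**kwargs), deferrals
        except ToyTaskDeferred as deferred:
            deferrals += 1
            # A real deployment would wait here for deferred.trigger to fire.
            method, kwargs = getattr(op, deferred.method_name), deferred.kwargs
```

Because trigger and poll live in the same task, a retry of the task resubmits the job, while each suspension releases the worker.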

The most useful triggers are generally time-based, and the most common ones are already provided by Airflow: DateTimeTrigger, which fires at a specific time, and TimeDeltaTrigger, which fires after a delay. It is therefore generally not necessary to implement them.

The Trigger and Triggerer implementation leverages Python’s asyncio library and the async/await syntax introduced with Python 3.5 (Airflow 2.0.0 requires Python 3.6 or higher). A trigger extends BaseTrigger and provides an async-compatible “run” method, which yields control when idle.

Time-based triggers are implemented as a while loop that uses await asyncio.sleep rather than the blocking time.sleep.

This allows them to coexist with thousands of other Triggers within one process.
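
A minimal sketch of this idea, using only the standard library, is shown below. ToyDelayTrigger, first_event and run_all are hypothetical names, not Airflow classes; the point is only that a “run” method built on await asyncio.sleep lets a single event loop host many waiting triggers.

```python
import asyncio
import time


class ToyDelayTrigger:
    """Toy time-based trigger: fires once a delay has elapsed."""

    def __init__(self, delay_seconds: float):
        self.delay_seconds = delay_seconds
        self.fire_at = time.monotonic() + delay_seconds

    async def run(self):
        # Async generator: sleeps cooperatively, then yields a single event.
        while time.monotonic() < self.fire_at:
            await asyncio.sleep(0.02)  # hands control back to the event loop
        yield {"fired_after": self.delay_seconds}


async def first_event(trigger: ToyDelayTrigger):
    """Wait for the first event of a trigger, then close its generator."""
    events = trigger.run()
    try:
        return await events.__anext__()
    finally:
        await events.aclose()


async def run_all(n: int, delay_seconds: float):
    """Run n triggers concurrently in one event loop; return (count, elapsed)."""
    triggers = [ToyDelayTrigger(delay_seconds) for _ in range(n)]
    start = time.monotonic()
    events = await asyncio.gather(*(first_event(t) for t in triggers))
    return len(events), time.monotonic() - start
```

Calling asyncio.run(run_all(1000, 0.1)) completes in roughly the delay of a single trigger, not a thousand times that, because no trigger ever blocks the loop while it waits.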

Note that, to limit the number of triggers, there is a one-to-many relationship between Triggers and TaskInstances: the same trigger may be shared by multiple tasks.

Let’s see how everything is orchestrated.

When a TaskDeferred exception is caught in the run method of TaskInstance, these steps are followed:

  • TaskInstance state is updated to DEFERRED.
  • The method and the arguments needed to resume the execution of the task are serialised in the TaskInstance (and not in the Trigger), in the next_method and next_kwargs columns. The task instance is linked to the trigger through a trigger_id attribute added to TaskInstance.
  • The Trigger is persisted in the DB, in a separate table, Trigger.

A separate Airflow component, the Triggerer, a new continuously running process that is part of an Airflow installation, is in charge of executing the triggers.

This process contains an async event loop which drains all the triggers serialised in the DB and creates the ones not yet created, running their coroutines concurrently. Thousands of triggers may run at once efficiently.

A trigger performs some lightweight check. For example, the DateTimeTrigger verifies whether the triggering date has passed; if so, it yields a “TriggerEvent”.

All events are handled by the Triggerer: for each TriggerEvent, all the corresponding TaskInstances are picked up and their state is updated from DEFERRED to SCHEDULED.

The TaskInstance run method has been updated to check whether the task should resume (it checks if “next_method” is set); if so, it resumes it, otherwise it proceeds as usual.
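
The whole round trip (deferral, persistence, trigger event, resume) can be sketched with a toy model. Everything below is a hypothetical stand-in written with the standard library only: a dictionary plays the role of the Trigger table, and clearing trigger_id when the event arrives is a simplification of this sketch.

```python
from dataclasses import dataclass
from itertools import count
from typing import Any, Dict, List, Optional


class ToyTaskDeferred(Exception):
    def __init__(self, trigger: Any, method_name: str, kwargs: Dict[str, Any]):
        super().__init__(method_name)
        self.trigger, self.method_name, self.kwargs = trigger, method_name, kwargs


@dataclass
class ToyTaskInstance:
    task: Any
    state: str = "SCHEDULED"
    next_method: Optional[str] = None
    next_kwargs: Optional[Dict[str, Any]] = None
    trigger_id: Optional[int] = None
    result: Any = None


TRIGGER_TABLE: Dict[int, Any] = {}  # toy stand-in for the Trigger table
_trigger_ids = count(1)


def run(ti: ToyTaskInstance) -> None:
    # Resume in next_method if it is set, otherwise start from execute.
    method = getattr(ti.task, ti.next_method) if ti.next_method else ti.task.execute
    kwargs = ti.next_kwargs or {}
    ti.next_method, ti.next_kwargs = None, None
    try:
        ti.result = method(**kwargs)
    except ToyTaskDeferred as deferred:
        # Persist the trigger separately and keep the resume info on the task instance.
        ti.trigger_id = next(_trigger_ids)
        TRIGGER_TABLE[ti.trigger_id] = deferred.trigger
        ti.next_method, ti.next_kwargs = deferred.method_name, deferred.kwargs
        ti.state = "DEFERRED"
    else:
        ti.state = "SUCCESS"


def on_trigger_event(trigger_id: int, task_instances: List[ToyTaskInstance]) -> None:
    # What the triggerer does in this sketch: wake up every task waiting on the trigger.
    for ti in task_instances:
        if ti.state == "DEFERRED" and ti.trigger_id == trigger_id:
            ti.trigger_id = None
            ti.state = "SCHEDULED"


class ToyJobTask:
    def execute(self):
        raise ToyTaskDeferred("wait 60s", "resume", {"job_id": "job-1"})

    def resume(self, job_id: str) -> str:
        return f"{job_id} done"
```

Note that the resume information survives on the task instance while the task is not running anywhere, which is what lets any worker pick it up later.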

The availability of the system is increased by allowing multiple Triggerers to run in parallel. This is implemented by adding to each Trigger the id of the triggerer in charge of it, and by adding a heartbeat for each triggerer, serialised in the DB. Each triggerer will pick up only the triggers assigned to it.
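
A possible way to picture this assignment is the toy function below. It is a sketch under stated assumptions, not Airflow's implementation: claim_triggers and its parameters are hypothetical, and the rule that triggers owned by a triggerer with a stale heartbeat are taken over is an assumption of this example.

```python
from typing import Dict, List, Optional


def claim_triggers(
    trigger_owner: Dict[int, Optional[str]],
    heartbeats: Dict[str, float],
    me: str,
    now: float,
    heartbeat_timeout: float,
) -> List[int]:
    """Return the ids of the triggers this triggerer should run.

    trigger_owner maps trigger id -> owning triggerer (None if unassigned);
    heartbeats maps triggerer -> timestamp of its last heartbeat.
    """
    # A triggerer counts as alive if its heartbeat is recent enough.
    alive = {name for name, beat in heartbeats.items() if now - beat <= heartbeat_timeout}
    alive.add(me)
    for trigger_id, owner in trigger_owner.items():
        # Assumption: unassigned or orphaned triggers are claimed by the caller.
        if owner is None or owner not in alive:
            trigger_owner[trigger_id] = me
    return sorted(tid for tid, owner in trigger_owner.items() if owner == me)
```

Triggers owned by another live triggerer are left untouched, so two triggerers never run the same trigger in this model.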

Author: Antonio Barbuzzi, Head of Data Engineering @ Bitrock