Un viaggio attraverso Synchronous Sensors, Reschedule Mode, Smart Sensors e Deferrable Operators in Airflow

Apache Airflow

Introduzione

Apache Airflow è uno degli strumenti di gestione workflow più utilizzati per le pipeline di dati – sia AWS che GCP hanno una soluzione Airflow gestita, oltre ad altre offerte SaaS (in particolare Astronomer).

Consente agli sviluppatori di definire e organizzare in modo programmatico i flussi di lavoro dei dati e di monitorarli utilizzando Python. Si basa sul concetto di grafi aciclici diretti (DAG), in cui tutte le diverse fasi (task) dell’elaborazione dei dati (attesa di un file, trasformazione, ingestione, unione con altri dataset, elaborazione, ecc.) sono rappresentati da nodi del grafo.

Ogni nodo può essere un “operatore”, cioè un’attività che svolge un lavoro effettivo (ad esempio, trasformare i dati, caricarli, ecc.), oppure un “sensore”, un’attività che attende che si verifichi un evento (ad esempio, l’arrivo di un file, una chiamata all’API Rest, ecc.)

In questo articolo parleremo dei sensori e dei task che controllano sistemi esterni e, in particolare, degli aspetti interni di alcune delle (relativamente) nuove funzionalità più interessanti, i Reschedule sensor, gli SmartSensor e i Deferrable Operator.

I Sensori sono sincroni di default

I sensori sono un tipo speciale di operatori progettati per aspettare che si verifichi un evento e poi avere successo in modo che i loro compiti a valle possano essere eseguiti.

I sensori sono un elemento fondamentale per creare pipeline in Airflow; tuttavia, storicamente, poiché derivano da Operator e condividono il metodo “main”, erano (e sono tuttora) sincroni.

Per impostazione predefinita, attendono che si verifichi un evento che consumi lo slot di un worker.

Troppi “sensori” in attesa possono, se non ben dimensionati, utilizzare tutti gli slot di un worker e portare a starvation e deadlock (se si usa TaskExternalSensor, per esempio). Anche quando sono disponibili abbastanza slot, i worker possono essere monopolizzati da tonnellate di processi in attesa.

Workingaround

Il primo workaround consiste nel confinare i sensori in pool separati. Questo limita solo parzialmente i problemi.

Una soluzione più efficiente sfrutta la capacità di airflow di rieseguire i task falliti. Fondamentalmente, l’idea è di far fallire un sensore se le condizioni di rilevamento non sono soddisfatte e di impostare il numero di retries e il retry delay per tenerne conto, in particolare number_of_retries * retry_delay dovrebbe essere uguale al timeout del sensore. In questo modo si libera lo slot del worker, rendendo possibile l’esecuzione di altri task.

Questa soluzione funziona a meraviglia e non richiede alcuna modifica del codice di Airflow.

I principali svantaggi sono

  • bug ed errori nei sensori possono essere mascherati dai timeout, che tuttavia possono essere mitigati da unit test scritti correttamente.
  • Si aggiunge un po’ di overhead allo scheduler, dato che gli intervalli di polling non possono essere troppo frequenti, e viene fatto lo spawn di un processo separato per ogni check.

Reschedule mode

Il reschedule mode del sensore è abbastanza simile al precedente workaround.

In pratica, i sensori hanno un nuovo attributo “mode” che può avere due valori: “poke”, quello predefinito, che fornisce il vecchio comportamento sincrono, e “reschedule”.

Quando la modalità è impostata su reschedule:

  • Il metodo “execute” di BaseSensorOperator solleva un’eccezione AirflowRescheduleException quando la condizione di rilevamento non è soddisfatta, contenendo la reschedule_date.
  • Questa eccezione viene catturata dal metodo run di TaskInstance, che la memorizza nella tabella TaskReschedule insieme all’id del task associato e aggiorna lo stato del task a “UP_FOR_RESCHEDULE“.
  • Quando viene richiamato il metodo run della TaskInstance, se questa si trova nello stato “UP_FOR_RESCHEDULE”, l’attività viene eseguita se la reschedule date lo consente.

Questo approccio migliora rispetto al workaround di cui sopra, in quanto consente di distinguere tra errori effettivi e condizioni del sensore non soddisfatte, ma presenta le stesse limitazioni e i controlli semplici sono piuttosto impegnativi dal punto di vista delle risorse.

Sensori Smart

Parallelamente alla modalità “reschedule”, in AIP-17 è stato proposto un approccio “diverso”, chiamato Smart Sensor, confluito nella release 2.0.0 e già deprecato e destinato a essere rimosso nella prossima release di Airflow 2.4.0 (non è più presente nel ramo principale).

Tutti i poke-context dei sensori intelligenti sono serializzati nel DB e prelevati da un processo separato, in esecuzione in speciali DAG built-in per i sensori smart.

Non aggiungerò ulteriori dettagli su di essi, poiché sono stati sostituiti dai Deferrable Operators.

I sensori smart rappresentano una soluzione ragionevole; tuttavia, nonostante le notevoli modifiche apportate al codice del flusso d’aria, presentano due difetti principali:

  • Nessun supporto per l’high-availability
  • La sospensione dei sensori è un sottoinsieme di un problema più generico, la sospensione dei task, per cui questa soluzione non può essere facilmente estesa.

Per i riferimenti, si veda AIP-17 qui e qui.

Deferrable Operators

I Derferrable Operators, introdotti in AIP-40, sono invece una soluzione più generica: sono un superset di Smart Sensors, supportano una più ampia sospensione dei task e sono stati progettati per offrire high-availablity. Non sorprende quindi che abbiano sostituito gli SmartSensor.

Anche se molto elegante, questa soluzione è leggermente più complessa. Per comprenderla appieno, partiamo da un caso d’uso per cogliere i dettagli della soluzione.

Un tipico caso d’uso di Airflow è l’orchestrazione di task su sistemi esterni (ad esempio, un job Spark viene eseguito su Yarn/EMR/…). Sempre più spesso, questi framework offrono un’API asincrona che restituisce l’id del task e un modo per verificarne lo stato.

In assenza di operatori differibili, un modo comune di implementarlo è attraverso un operatore custom che lancia il job nel metodo execute, ottiene l’id del job stesso e lo interroga finché non termina, in un ciclo busy-wait. Si potrebbe essere tentati di usare due operatori separati, uno per le chiamate “trigger” e uno per le chiamate “poll”, ma questo invaliderebbe il meccanismo di retry di Airflow.

Gli operatori differibili risolvono questo problema e aggiungono ai task la capacità di autosospendersi. Se la condizione di polling non è soddisfatta, l’esecuzione del task può essere sospesa e ripresa dopo un delay configurabile.

La sospensione dei task si ottiene sollevando un’eccezione TaskDeferred in un operatore Deferrable. A tale scopo, all’Operatore Base è stato aggiunto un pratico metodo “defer”. Questa eccezione contiene le seguenti informazioni:

  • La funzione da riprendere, assieme agli argomenti necessari.
  • Un oggetto Trigger, contenente i dettagli su quando attivare la prossima esecuzione.

Gli argomenti della funzione sono un modo semplice per mantenere lo stato del task, ad esempio il job_id del job Spark innescato da interrogare.

Gli oggetti trigger più utili sono generalmente basati sul tempo e la maggior parte di essi sono già forniti da Airflow: DateTimeTrigger, che si attiva in un momento specifico, e TimeDeltaTrigger, che si attiva dopo un ritardo, quindi in genere non è necessario implementarli.

L’implementazione dei Trigger e dei Triggerer sfrutta la libreria asincrona di Python introdotta con Python 3.5 (Airflow 2.0.0 richiede Python versione 3.6 o superiore). Un trigger estende un BaseTrigger e fornisce un metodo “run” compatibile con async, che restituisce il controllo quando è inattivo.

I trigger basati sul tempo sono implementati in un ciclo while utilizzando await asyncio.sleep piuttosto che thread.sleep.

Questo permette loro di coesistere con migliaia di altri trigger all’interno di un processo.

Si noti che, per limitare il numero di trigger, esiste una relazione uno-a-molti tra Trigger e TaskInstances, in particolare lo stesso trigger può essere condiviso da più task.

Vediamo come viene orchestrato il tutto.

Quando un’eccezione TaskDeferred viene catturata nel metodo run di TaskInstance, vengono seguiti i seguenti passi:

  • Lo stato di TaskInstance viene aggiornato a DEFERRED.
  • Il metodo e gli argomenti per riprendere l’esecuzione del task sono serializzati nella TaskInstance (e non nel Trigger), nella tabella delle colonne next_method e next_kwargs. L’istanza del task è collegata al trigger attraverso un trigger_id attribuito alla TaskInstance.
  • Il Trigger è conservato nel DB, in una tabella separata, Trigger.

Un componente separato di Airflow, il Triggerer, che costituisce un nuovo processo in esecuzione continua di un’installazione di Airflow, è responsabile dell’esecuzione dei trigger.

Questo processo contiene un ciclo di eventi asincroni che svuota tutti i trigger serializzati nel DB e crea tutti i trigger non ancora creati, eseguendo le coroutine in modo simultaneo. Migliaia di trigger possono essere eseguiti contemporaneamente in modo efficiente.

Un trigger esegue un controllo semplice e leggero. Ad esempio, DateTimeTrigger verifica che la data di attivazione sia passata; in caso affermativo, produce un “TriggerEvent“.

Tutti gli eventi vengono gestiti dal Triggerer e per ogni TriggerEvent vengono prelevate tutte le TaskInstance corrispondenti da programmare, il cui stato viene aggiornato da DEFERRED a SCHEDULED.

Il metodo di esecuzione della TaskInstance è stato aggiornato per verificare se l’attività deve essere ripresa (controlla se “next_method” è impostato); in caso affermativo, la riprende, altrimenti procede come al solito.

La disponibilità del sistema è aumentata consentendo a più Trigger di funzionare in parallelo – questo viene implementato aggiungendo a ogni Trigger l’id del Trigger che se ne occupa – e aggiungendo a ogni Trigger un heartbeat, serializzato nel DB. Ogni trigger raccoglierà solo i trigger assegnati.


Autore: Antonio Barbuzzi, Head of Data Engineering @ Bitrock

Vuoi saperne di più sui nostri servizi? Compila il modulo e fissa un incontro con il nostro team!