Apache Airflow

Introduction

Apache Airflow is one of the most widely used workflow management tools for data pipelines: both AWS and GCP offer a managed Airflow solution, in addition to other SaaS offerings (notably Astronomer).

It allows developers to programmatically define, schedule and monitor data workflows using Python. It is based on the concept of directed acyclic graphs (DAGs), where the different steps (tasks) of a data pipeline (wait for a file, transform it, ingest it, join it with other datasets, process it, etc.) are represented as the graph's nodes.

Each node can be either an "operator", i.e. a task doing some actual work (transforming data, loading it, etc.), or a "sensor", i.e. a task waiting for some event to happen (the arrival of a file, a REST API call, etc.).

In this article we will discuss sensors and tasks that control external systems, and in particular the internals of some of the most interesting (and relatively new) features: reschedule mode, Smart Sensors and Deferrable Operators.

Sensors are synchronous by default

Sensors are a special type of Operator designed to wait for an event to occur and then succeed, so that their downstream tasks can run.

Sensors are a fundamental building block for creating pipelines in Airflow; however, since they share the Operator's main execution method, historically they were (and by default still are) synchronous.

By default, they busy-wait for an event to occur, consuming a worker slot for the whole duration of the wait.

Too many busy-waiting sensors may, if pools are not well dimensioned, use up all the worker slots and lead to starvation and even deadlocks (for example when ExternalTaskSensor is used). Even when enough slots are available, workers may be hogged by tons of sleeping processes.
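
As a minimal sketch of the default behaviour (the DAG and the file path below are purely illustrative), a sensor declared like this keeps its worker slot busy for up to two hours while it polls:

```python
from datetime import datetime

from airflow import DAG
from airflow.sensors.filesystem import FileSensor

with DAG(
    dag_id="poke_sensor_example",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    # Default "poke" mode: the task holds a worker slot for the whole wait,
    # re-checking the condition every poke_interval seconds until timeout.
    wait_for_file = FileSensor(
        task_id="wait_for_input_file",
        fs_conn_id="fs_default",              # connection pointing to the filesystem
        filepath="/data/incoming/input.csv",  # hypothetical path
        poke_interval=60,                     # seconds between checks
        timeout=2 * 60 * 60,                  # give up after two hours
    )
```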

Working around it

The first countermeasure is to confine sensors to separate pools. However, this only partially limits the problem.

A more efficient workaround exploits Airflow's ability to retry failed tasks. The idea is to make the sensor fail whenever the sensing condition is unmet, and to set the sensor's number of retries and retry delay to account for it; in particular, number_of_retries * retry_delay should be equal to the intended sensor timeout. This frees the worker slot between attempts, making it possible to run other tasks.
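
A minimal sketch of this pattern, using a plain PythonOperator that fails immediately when the (hypothetical) file is not there yet; with 24 retries and a 5-minute delay, the overall wait is roughly two hours, without holding a worker slot between attempts:

```python
import os
from datetime import datetime, timedelta

from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.operators.python import PythonOperator


def check_file_exists(path: str) -> None:
    # Fail fast when the condition is unmet; Airflow's retry machinery does the waiting.
    if not os.path.exists(path):
        raise AirflowException(f"{path} not found yet, will retry")


with DAG(
    dag_id="retry_as_sensor_example",
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    wait_for_file = PythonOperator(
        task_id="wait_for_input_file",
        python_callable=check_file_exists,
        op_kwargs={"path": "/data/incoming/input.csv"},  # hypothetical path
        retries=24,                        # 24 retries ...
        retry_delay=timedelta(minutes=5),  # ... x 5 minutes ≈ 2-hour "timeout"
    )
```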

This solution works like a charm and doesn't require any change to Airflow's code.

Main drawbacks are:

  • Bugs and errors in the sensors may be masked by the retry/timeout mechanism; this can however be mitigated by properly written unit tests.
  • Some overhead is added to the scheduler, and a separate process is spawned for each attempt; as such, polling intervals cannot be too frequent.

Reschedule mode

Sensor’s reschedule mode is quite similar to the previous workaround.

In practice, sensors have a "mode" attribute which may take two values: "poke", the default, providing the old synchronous behaviour, and "reschedule".
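
Switching a sensor to reschedule mode is just a matter of setting that attribute; a minimal sketch (same illustrative FileSensor as above, declared inside a DAG):

```python
from airflow.sensors.filesystem import FileSensor

# In "reschedule" mode the worker slot is released between checks: the task is put
# in the UP_FOR_RESCHEDULE state and picked up again after poke_interval seconds.
wait_for_file = FileSensor(
    task_id="wait_for_input_file",
    filepath="/data/incoming/input.csv",  # hypothetical path
    poke_interval=5 * 60,                 # re-check every 5 minutes
    timeout=2 * 60 * 60,                  # overall deadline of two hours
    mode="reschedule",
)
```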

When mode is set to reschedule:

  • BaseSensorOperator's "execute" method raises an AirflowRescheduleException, containing the reschedule_date, when the sensing condition is unmet
  • This exception is caught by the TaskInstance run method, which persists it in the TaskReschedule table along with the id of the associated task and updates the task state to "UP_FOR_RESCHEDULE"
  • When the TaskInstance run method is called again and the task is in the "UP_FOR_RESCHEDULE" state, the task is run only if the reschedule_date allows it

This approach improves on the above-mentioned workaround, as it allows Airflow to distinguish between actual errors and unmet sensing conditions; otherwise it shares the same limitations: even lightweight checks remain quite resource intensive, since each of them requires scheduling and running a full task.

Smart sensors

In parallel with the "reschedule" mode, a different approach, called Smart Sensors, was proposed in AIP-17. It was merged in release 2.0.0, but it is already deprecated and planned for removal in the Airflow 2.4.0 release (Smart Sensors are no longer in the main branch).

All Smart Sensor poke contexts are serialised in the DB and picked up by separate processes, running as special built-in Smart Sensor DAGs.

I won’t add any additional details on them, as they’ve been replaced by Deferrable Operators.

Smart Sensors were a sensible solution; however, despite requiring considerable changes to the Airflow codebase, they have two main pitfalls:

  • No High Availability support
  • Sensor suspension is a subset of a more general problem, the suspension of tasks, and this solution cannot easily be extended to cover it.

For further reference, please refer to AIP-17 here and here.

Deferrable Operators

Deferrable Operators, introduced in AIP-40, are a more general solution: they are a superset of Smart Sensors, supporting broader task suspension, and they were designed from the ground up to be highly available. It is therefore no surprise that they have replaced Smart Sensors.

Albeit quite elegant, this solution is slightly more complex. To fully understand it, let's start from a typical use case and work through the details.

A typical Airflow use case is orchestrating jobs that run on external systems (for example, a Spark job running on YARN/EMR/…). More and more frequently, these systems offer an asynchronous API that returns a job id, together with a way to poll the job's status.

Without Deferrable Operators, a common way to implement this is a custom operator that triggers the job in its execute method, gets the job id, and polls it in a busy-wait loop until it finishes. One may be tempted to use two separate operators, one for the "trigger" call and one for the "poll" call, but this would break Airflow's retry mechanism.
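
A sketch of the classic synchronous approach, assuming a hypothetical client module exposing submit_job and get_job_status calls (not a real library):

```python
import time

from airflow.exceptions import AirflowException
from airflow.models.baseoperator import BaseOperator

# Hypothetical client for an external system exposing an asynchronous API:
# submit_job() returns a job id, get_job_status() returns "RUNNING"/"SUCCESS"/"FAILED".
from my_company.spark_client import get_job_status, submit_job


class SubmitAndWaitOperator(BaseOperator):
    """Triggers the job and busy-waits for completion, holding a worker slot."""

    def __init__(self, job_conf: dict, poll_interval: int = 60, **kwargs):
        super().__init__(**kwargs)
        self.job_conf = job_conf
        self.poll_interval = poll_interval

    def execute(self, context):
        job_id = submit_job(self.job_conf)
        while True:
            status = get_job_status(job_id)
            if status == "SUCCESS":
                return job_id
            if status == "FAILED":
                raise AirflowException(f"Job {job_id} failed")
            time.sleep(self.poll_interval)  # the worker slot stays occupied here
```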

Deferrable Operators solve this problem by giving tasks the ability to suspend themselves. If the polling condition is unmet, task execution may be suspended and resumed after a configurable delay.
Suspension is achieved by raising a TaskDeferred exception inside a deferrable operator; a handy "defer" method has been added to BaseOperator to do exactly this. The exception carries the following information:

  • The function to resume, along with the needed arguments.
  • A Trigger object, containing the details on when to trigger the next run.

The method arguments are a simple way to keep the task's state, for example the job_id of the triggered Spark job to poll.

The most useful trigger objects are generally time-based, and the most common ones are already provided by Airflow: DateTimeTrigger, which fires at a specific time, and TimeDeltaTrigger, which fires after a delay; it is therefore generally not necessary to implement them yourself.
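
Putting the pieces together, here is a hedged sketch of a deferrable version of the previous operator (same hypothetical client as above), which defers itself with a TimeDeltaTrigger between status checks:

```python
from datetime import timedelta

from airflow.exceptions import AirflowException
from airflow.models.baseoperator import BaseOperator
from airflow.triggers.temporal import TimeDeltaTrigger

from my_company.spark_client import get_job_status, submit_job  # hypothetical client


class SubmitAndDeferOperator(BaseOperator):
    """Triggers the job, then releases the worker slot between status checks."""

    def __init__(self, job_conf: dict, poll_interval: int = 300, **kwargs):
        super().__init__(**kwargs)
        self.job_conf = job_conf
        self.poll_interval = poll_interval

    def execute(self, context):
        job_id = submit_job(self.job_conf)
        self._defer(job_id)

    def check_status(self, context, event=None, job_id=None):
        # Called again by a worker once the trigger has fired.
        status = get_job_status(job_id)
        if status == "SUCCESS":
            return job_id
        if status == "FAILED":
            raise AirflowException(f"Job {job_id} failed")
        self._defer(job_id)  # still running: suspend again

    def _defer(self, job_id):
        # Raises TaskDeferred under the hood; the method name and kwargs are persisted.
        self.defer(
            trigger=TimeDeltaTrigger(timedelta(seconds=self.poll_interval)),
            method_name="check_status",
            kwargs={"job_id": job_id},
        )
```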

The Trigger and Triggerer implementation leverages Python's asyncio framework and the async/await syntax introduced with Python 3.5 (Airflow 2.0.0 requires Python 3.6 or higher). A trigger extends BaseTrigger and provides an async-compatible "run" method, which yields control when idle.

Time-based triggers are implemented as a while loop that uses await asyncio.sleep rather than a blocking time.sleep.

This allows them to coexist with thousands of other Triggers within one process.
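
For illustration, a hedged sketch of a custom time-based trigger, modelled on the built-in ones (the class and module path are hypothetical):

```python
import asyncio
from datetime import datetime, timezone

from airflow.triggers.base import BaseTrigger, TriggerEvent


class WaitUntilTrigger(BaseTrigger):
    """Fires once the given (timezone-aware) moment has passed."""

    def __init__(self, moment: datetime):
        super().__init__()
        self.moment = moment

    def serialize(self):
        # Classpath + kwargs: this is what gets persisted in the Trigger table.
        return ("my_company.triggers.WaitUntilTrigger", {"moment": self.moment})

    async def run(self):
        # Non-blocking sleep: the event loop can interleave thousands of these.
        while datetime.now(tz=timezone.utc) < self.moment:
            await asyncio.sleep(1)
        yield TriggerEvent(self.moment)
```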

Note that, to limit the number of triggers, there is a one-to-many relationship between Triggers and TaskInstances; in particular, the same trigger may be shared by multiple tasks.

Let’s see how everything is orchestrated.

When a TaskDeferred exception is caught in the run method of TaskInstance, these steps are followed:

  • TaskInstance state is updated to DEFERRED.
  • The method and the arguments needed to resume the execution of the task are serialised in the TaskInstance (not in the Trigger), in the next_method and next_kwargs columns. The task instance is linked to the trigger through a trigger_id attribute on the TaskInstance.
  • The Trigger is persisted in the DB, in a separate Trigger table.

A separate Airflow component, the Triggerer, a new continuously-running process that is now part of an Airflow installation, is in charge of executing the triggers.

This process runs an asyncio event loop that loads all the triggers serialised in the DB, instantiates the not-yet-created ones, and runs their coroutines concurrently. Thousands of triggers may run at once efficiently.

A trigger performs some lightweight check. For example, DateTimeTrigger verifies that the triggering date has passed; if so, it yields a "TriggerEvent".

All events are handled by the Triggerer: for each TriggerEvent, all the corresponding TaskInstances to be scheduled are picked up and their state is updated from DEFERRED to SCHEDULED.
The TaskInstance run method has been updated to check whether the task should resume (it checks if "next_method" is set); if so, it resumes it, otherwise it proceeds as usual.

The availability of the system is increased by allowing multiple Triggerers to run in parallel. This is implemented by adding to each Trigger the id of the Triggerer in charge of it, and by adding a heartbeat to each Triggerer, serialised in the DB. Each Triggerer will only pick up the triggers assigned to it.

Author: Antonio Barbuzzi, Head of Data Engineering @ Bitrock


Today, mainframes are still widely used in data-centric industries such as Banking, Finance and Insurance. 92 of the world's top 100 banks rely on these legacy technologies, and it is believed that they account for 90% of all global credit card transactions (source: Skillsoft).

This is suboptimal, since relying on mainframes generates high operational costs, billed in MIPS (million instructions per second). A large institution can spend more than $16 million per year, based on the estimated cost of a 15,200 MIPS mainframe (source: Amazon Web Services).

In addition, mainframes come with technical complexities, like the reliance on the 60-year-old COBOL programming language. For organizations, this means not only reduced data accessibility and infrastructure scalability, but also the problem of finding skilled COBOL programmers at a reasonable cost - more info here.

Moreover, consumers are now used to sophisticated on-demand digital services - we could call it the "Netflix effect", by which everything must be available immediately and everywhere. Banking services such as trading, home banking, and financial reporting thus need to keep pace and offer reliability and high performance. To do so, large volumes of data must be quickly accessed and processed by web and mobile applications: mainframes may not be the answer.

Mainframe Offloading to the rescue

Mainframe Offloading can solve the conundrum. It entails replicating the mainframe data to a parallel database, possibly open source, that can be accessed in a more agile way, saving expensive MIPS. As a sort of "Digital Twin" of the mainframe, the replicated data store can be used for data analysis, applications, cloud services and more.

This form of database replication provides significant advantages both in flexibility and cost reduction. Whenever an application or a service needs to read customers’ data, it can access the parallel database without having to pay for expensive mainframe MIPS. Moreover, the mere offloading paves the way for a progressive migration to the cloud, e.g. entailing bidirectional replication of information between the open source cloud database and the datacenter.

Offloading data from the mainframe requires middleware tools for migration and integration. Apache Kafka can be leveraged as a reliable solution for event streaming and data storage, thanks to its distributed and replicated log capabilities. It can integrate different data sources into a scalable architecture with loosely coupled components. 

Alongside the event streaming platform, CDC (Change Data Capture) tools are also to be considered to push data modifications from the mainframe into the streaming platform. CDC is a software process that automatically identifies and tracks updates in a database, making it possible to overcome the limitations of batch data processing in favour of near-real-time transfer. While IBM and Oracle offer proprietary CDC tools, such as InfoSphere Data Replication and Oracle GoldenGate, third-party and open-source solutions are also available, like Qlik Data Integration (formerly known as Attunity) and Debezium.
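
As a hedged sketch of how such a pipeline is typically wired together (connector name, hostnames, credentials and table list below are purely illustrative placeholders), a Debezium Db2 source connector can be registered against the Kafka Connect REST API:

```python
import requests

# Illustrative configuration for a Debezium Db2 source connector;
# all values below are placeholders, not a working setup.
connector = {
    "name": "mainframe-db2-offloading",
    "config": {
        "connector.class": "io.debezium.connector.db2.Db2Connector",
        "database.hostname": "db2.mainframe.internal",
        "database.port": "50000",
        "database.user": "offload_user",
        "database.password": "********",
        "database.dbname": "CORE",
        "table.include.list": "BANK.CUSTOMERS,BANK.TRANSACTIONS",
        "database.server.name": "mainframe",  # prefix for the resulting Kafka topics
    },
}

# Register the connector on a (hypothetical) Kafka Connect instance;
# change events will then stream into Kafka, ready to feed the parallel database.
response = requests.post("http://kafka-connect:8083/connectors", json=connector)
response.raise_for_status()
```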

From Offloading to Replacement

Mainframe Offloading can also be seen as a starting point for mainframe replacement proper, with both applications and mission-critical core banking systems running in the cloud. This would mean that the expensive monolithic architecture gives way to modern, future-proof, cloud-native solutions.

Yet, replacing a mainframe is not an easy nor a quick task. In his blog article “Mainframe Integration, Offloading and Replacement with Apache Kafka”, Kai Waehner hypothesizes a gradual 5-year plan. First, Kafka is used for decoupling between the mainframe and the already-existing applications. Then, new cloud-based applications and microservices are built and integrated in the infrastructure. Finally, some or even all mainframe applications and mission-critical functionalities are replaced with modern technology.

It must be said that it is often not possible to switch off mainframes altogether. For larger institutions, such as major banks, the costs and inconveniences of a full migration may be just too high. Realistically speaking, the most effective scenario would be a hybrid infrastructure in which certain core banking functionalities remain tied to the mainframe, and others are migrated to a multi-cloud infrastructure.

How Bitrock can help

Given the complexity of the operation, it is fundamental to work with a specialized partner with thorough expertise in offloading and legacy migration. At Bitrock we have worked alongside major organizations to help them modernize their infrastructure, save costs and support their cloud-native transition. By way of example, we have carried out a mainframe offloading project for an important consumer credit company, transferring data from a legacy DB2 to a newer Elastic database. Thanks to the Confluent platform and a CDC system, data is now intercepted and pushed in real time from the core system to the front-end database, enabling advanced use cases.

If you want to know more about this success story or how we can help you with your journey from legacy to cloud, please do not hesitate to contact us!


These last couple of years have taught an important lesson to all Data & Analytics specialists: agility is the key. Being able to pivot between different design patterns and approaches is increasingly important to thrive through supply chain volatility, accelerated digitalization, and disruption of business operations.

To turn these challenges into opportunities, and stay ahead of the competition, companies must revise antiquated models based on centralized, static data. The centrifugal shift towards distributed architectures and multi-cloud infrastructures, which emerged a few years ago, has today found its cultural equivalent in new, decentralized approaches to Data & Analytics. At the same time, the ability to analyze data in motion dynamically makes it possible to integrate actionable insights into decision making and business operations.

Let's take a look at some of the most interesting Data & Analytics trends that have emerged or consolidated recently, and how they can create value for organizations in the near future.

Small & Wide Data

We have come to realize that Big Data is not always the answer. Accumulating information can lead to data sourcing and quality issues, plus requiring the implementation of deep learning analytical techniques whose cost and complexity may outweigh the results. We have also seen how quickly data can become irrelevant – companies run the risk of hoarding stale, useless information that cannot provide significant value.

Small & Wide Data have emerged as innovative approaches to enable the generation of valuable insights via less voluminous, more varied data. The former approach eschews data-hungry models in favor of tailored analytical techniques relying on limited amounts of data. The latter leverages the integration of heterogeneous sources, both structured and unstructured, instead of a larger single one.

Small & Wide Data can enable access to advanced analytics and AI for smaller players that cannot rely on enough information for conventional Big Data techniques. But bigger companies can also benefit from these approaches. As Gartner suggests, 70% of organizations will shift their focus from Big to Small and Wide Data by 2025.

Data Mesh

The current shifts towards decentralization and microservices can be said to underpin the very notion of Data Mesh. First introduced by Zhamak Dehghani in her 2019 article “How to Move Beyond a Monolithic Data Lake to a Distributed Data Mesh”, it purports to overcome the limitations of gargantuan Data Lakes and their reliance on hyper-specialized teams and financially-untenable ETL pipelines.

 By contrast, Data Mesh can be seen as an organizational and architectural model that allows for a distributed, domain-driven data ownership. This ubiquitous data platform empowers cross-functional teams to operate independently, while offering greater flexibility and interaction between distributed datasets.

It is worth noting that the distributed Data Mesh architecture stems from a paradigm shift, rather than a Copernican revolution. It does not ditch altogether data lake advantages and principles – centralization is in fact retained for governance and open standards – but evolves them to increase business agility and reduce time-to-market.

Image by Zhamak Dehghani via martinfowler.com

Continuous Intelligence

Continuous Intelligence leverages Event Stream Processing and Real-time Analytics to integrate actionable insights into decision-making and business processes. This design pattern turns analytics into a prescriptive practice: ingest large volumes of data in motion – from sources like IoT, eCommerce transactions, traffic, weather – and leverage them to augment or even automate human decisions.

CI enables companies to analyze data on the fly, identify trends and root causes, and make real-time decisions that allow strategic differentiation in competitive, saturated markets. It is a transformative model that provides a plethora of opportunities – from detecting fraud in finance and improving customer experience in retail to implementing predictive maintenance in manufacturing and more. CI can be also employed to connect different branches and departments in a company, to share & leverage data in real time, optimize decision-making and thus increase productivity.

Thanks to its partnership with Radicalbit, Bitrock can integrate its D&A consulting services with Helicon, a cutting-edge Continuous Intelligence platform. This code-free SaaS solution enhances the management of data streaming pipelines with Machine Learning, dramatically accelerating the development of real time advanced analytics (descriptive, diagnostic, predictive and prescriptive). The platform offers efficient features such as a stream processing pipelines visual editor, with debugging capabilities, data exploration, and real-time ML monitoring, enabling the adoption of the Continuous Intelligence paradigm.

Analytics at the Edge

Speaking of IoT, a recent D&A trend concerns the decentralization of the very location in which the collection and analysis of data takes place. Edge Analytics means distributing information, analytics and technology closer to – or possibly within – the physical assets, i.e. the edge. In other words, it entails the possibility of avoiding in part or altogether the transfer to data centers and cloud environments, increasing the flexibility of the whole data infrastructure.  

It is a growing trend: Gartner foresees that, by 2023, more than 50% of the primary responsibility of data and analytics leaders will comprise data created, managed and analyzed in edge environments. The reasons are multiple: for instance, provisioning analytics to the edge can have a positive impact on the speed at which data is processed, with actionable insights being generated in real time. Stability is another case in point: avoiding data transfer means less disruption from connectivity issues. Finally, we have to consider compliance: leaving data "where it is" reduces the headaches deriving from different national regulations and governance policies.

For these reasons, Analytics at the Edge can bring significant benefits to a wide array of applications. Automotive risk mitigation is, for instance, a business case in which analyzing data in real time is fundamental to avoid collisions or breakdowns. Healthcare, on the other hand, can simplify the management of personal, sensitive data if this is not moved to cloud services or data centers located under different jurisdictions.

Data Democracies

The notion of Data Democracy concerns the creation of an ethical and methodological framework that removes the technological barriers to informed data management. It revolves around the principle that people, regardless of their technical know-how, should be able to access and trust available information during their daily operations.

The democratization of data impacts any kind of business organization, and bears upon both personnel and technology. Lowering the barrier to data means first of all offering upskilling programs aimed at data literacy development, whatever the function or seniority within the company. It also means rethinking data silos in favor of more flexible and transparent architectural models, such as Data Meshes (see above). Finally, it entails implementing efficient analytics and Business Intelligence tools on a company-wide level. One example is Sense, by our partner Qlik, that enables advanced, ML-powered analytics while helping develop data literacy.

As a real cultural shift, a Data Democracy can offer significant benefits to a company's internal operations. It empowers non-technical employees to make fast, informed decisions without the support of IT or data experts - think of how this can help Product, Marketing and Sales teams generate more value and save resources. Moreover, developing a corporate data culture may have a positive impact on an organization's relationship with its stakeholders and the public at large. Data ethics informs data governance policies that promote privacy, cybersecurity, and sound management of customer data.


These are only some of the opportunities offered by the latest trends in Data & Analytics. If you want to know how Bitrock can help your company evolve its data strategy, and stay ahead of competition with cutting-edge solutions, send us a message – we’ll be happy to book a call!

Author: Daniele Croci, Digital Marketing Specialist @ Bitrock


"There is no alternative," went an old political slogan that led to the creation of the longest-lasting government of the twentieth century. Today, the same mantra applies to the many innovation and digitalization needs of the Italian productive fabric, which is (re)entering the global market at the end, hopefully, of the pandemic crisis, already burdened by decades of declining productivity and competitiveness. For those who want to thrive in the new scenario, technological change is a binding imperative.

In this sense, the Piano Nazionale di Ripresa e Resilienza (PNRR, Italy's National Recovery and Resilience Plan) is a significant opportunity. Drawn up in response to the severe economic and social crisis triggered by Covid-19, it allocates 191.5 billion euros to a series of measures designed to relaunch Italy's fragile economy and stimulate employment. The areas range from the development of sustainable mobility to the ecological transition and the inclusion of social groups further marginalized by job insecurity.

Digital Transition 4.0 for the Italian system

The first mission of the PNRR focuses on "Digitalization, Innovation, Competitiveness, Culture and Tourism", highlighting the key concepts that act as the leitmotiv of the entire Recovery Plan. It earmarks 40.32 billion euros for a digital transition programme involving both the public and the private sector.

The goal is to support the development and competitiveness of a country system that currently ranks 25th (out of 28) in the Digital Economy and Society Index (DESI). As the PNRR itself notes (p. 83), this lag goes hand in hand with the decline in productivity that has characterized the Italian economy over the last twenty years, in contrast with a positive trend in the rest of Europe. This contraction is often linked to the limited digital innovation of small and medium-sized enterprises, which account for 92% of companies and employ 82% of workers in Italy (Il Sole 24 Ore).

The mission is divided into three components:

  1. Digitalization, Innovation and Security in the Public Administration (9.75 billion euros)
  2. Digitalization, Innovation and Competitiveness of the Production System (23.89 billion euros)
  3. Tourism and Culture (6.68 billion euros)

Let's look in detail at the second component, which receives one of the largest investments of the PNRR.

Digitalization, Innovation and Competitiveness of the Production System: how it works

In the words of the document, the programme for the private sector aims to strengthen "the tax incentive policy already in place (designed to close the "digital intensity" gap of our production system with the rest of Europe – a shortfall in investment estimated at two points of GDP – especially in manufacturing and SMEs), which has had positive effects both on the digitalization of companies and on employment, especially among young people and in new professions" (p. 98).

It includes a series of investments and reforms aimed at strengthening the digitalization, technological innovation and internationalization of the production and entrepreneurial fabric, with specific attention to the SMEs that suffer the most from today's climate of volatility.

Within the PNRR, the "Transizione 4.0" investment plan is an evolution of the well-known Industria 4.0 programme of 2017, whose range of potentially eligible companies has been broadened. Among other things, it provides a tax credit for companies that decide to invest in:

  1. Capital goods, both tangible and intangible
  2. Research, development and innovation
  3. Training activities for digitalization and the development of the related skills

The first item concerns investments in tools "directly connected to the digital transformation of production processes" – the so-called Beni 4.0 already listed in annexes A and B of Law 232/2016 – and "intangible assets of a different nature, but instrumental to the company's activity" (p. 99).

While the first annex details a series of hardware components, including machinery, tools and monitoring systems, the second focuses on high-tech software solutions that can support companies on a scalable and sustainable growth path.

Possible applications

Integrated within a strategic vision, the hardware and software solutions mentioned in the PNRR can be applied in a number of areas, including:

  • The transition to the Cloud Native paradigm, an approach that leverages Cloud Computing technologies to design and implement applications based on the principles of flexibility, adaptability, efficiency and resilience. Thanks to methodological and technological tools such as DevOps, containers and microservices, Cloud Native makes it possible to reduce time to market and support the agile evolution of the entire corporate ecosystem.
  • The valorization of the company's information assets through the implementation of real-time Data Analysis, IIoT (Industrial Internet of Things) and Data Streaming systems which, combined with Machine Learning and Artificial Intelligence, can be used for predictive maintenance, with clear cost optimization. This area also includes Digital Twins, virtual copies of industrial assets or processes that make it possible to test new solutions in vitro and prevent malfunctions.
  • Cybersecurity, increasingly central in a context of growing digitalization of processes and services, and of growing interdependence between national and foreign, public and private actors within the digital value chain.

These digital maturation paths can be relevant both for large companies and for the SMEs that struggle the most to keep up with technological evolution and international competition. The effort pays off: as reported by the Osservatorio Innovazione Digitale PMI of the Politecnico di Milano, digitalized small and medium-sized companies report on average a 28% increase in net profit, with an 18% higher profit margin (La Repubblica).

Why, then, don't companies digitalize? The problem often lies in the lack of qualified personnel. The shortage of skilled staff affects 42% of Italian SMEs (La Repubblica), and the figure rises to 70% if we consider the entire European productive fabric (European Commission). Another possible blocking factor is the reluctance to abandon or evolve legacy systems that are already well established within company processes.

These are just some of the reasons why it is essential to work with a qualified partner that can guide the company in planning the technological and digital investments made possible by the PNRR (and beyond).

Bitrock has certified skills and international experience to offer tailor-made solutions that innovate the technological and digital ecosystem while preserving the customer's legacy investments. Our specialized know-how in DevOps, Software Engineering, UX & Front-End and Data & Analytics is the key to tackling the digital evolution journey, centred on the values of simplification and automation that generate lasting value.

To find out in detail how we can support your company, contact us now!
