The Kafka Summit 2023, held recently, brought together a diverse group of professionals, enthusiasts, and experts in the field of data streaming and event-driven architectures. This year’s summit was an exceptional gathering, filled with insightful discussions, cutting-edge demonstrations, and valuable networking opportunities. Of course, Bitrock’s Engineering team couldn’t miss it, and we are happy to share the key insights from the event.
During the keynote presentation, Jay Kreps, Confluent Co-founder & CEO, presented a rundown of enhancements coming to Kafka over the next year and beyond.
Following the removal of ZooKeeper in favor of KRaft (KIP-866), available from Confluent Platform 7.4.0, another big surprise was the announcement of KIP-932, Queues for Kafka, which allows many consumers to read from the same partition, enabling use cases like classic pub/sub queues. This will be made possible by the introduction of share groups and the acknowledgment of individual records in the Kafka consumer protocol.
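To make the idea of share groups more concrete, here is a toy model in Python. It is only a sketch of the queue-like semantics described above (several consumers reading one partition, each record acknowledged on its own), not the KIP-932 protocol itself; the class and method names are our own invention.

```python
from collections import deque


class SharePartition:
    """Toy model of one partition read by a share group (illustrative only)."""

    def __init__(self, records):
        # (offset, value) pairs waiting to be delivered to some consumer
        self.available = deque(enumerate(records))
        # offset -> (value, consumer) for records delivered but not yet acknowledged
        self.in_flight = {}
        # offsets that have been acknowledged individually
        self.done = []

    def poll(self, consumer):
        """Hand the next available record to any consumer of the group."""
        if not self.available:
            return None
        offset, value = self.available.popleft()
        self.in_flight[offset] = (value, consumer)
        return offset, value

    def acknowledge(self, offset):
        """Mark a single record as processed."""
        self.in_flight.pop(offset)
        self.done.append(offset)

    def release(self, offset):
        """Give a record back so that another consumer can pick it up."""
        value, _ = self.in_flight.pop(offset)
        self.available.appendleft((offset, value))


partition = SharePartition(["a", "b", "c"])
first = partition.poll("consumer-1")    # both consumers read the same partition
second = partition.poll("consumer-2")
partition.release(first[0])             # consumer-1 gives its record back
retry = partition.poll("consumer-2")    # consumer-2 receives the released record
```

Unlike a classic consumer group, no partition is owned exclusively by one member here: records are handed out one by one, which is what makes queue-style processing possible.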
Jay also unveiled Confluent’s Kora Engine, the Apache Kafka engine built for the cloud. Kora is the engine that powers Confluent Cloud as a cloud-native, 10x Kafka service, bringing GBps+ elastic scaling, guaranteed reliability, and predictable low latency to 30K+ clusters worldwide.
Another important announcement made at Kafka Summit 2023 in London is the upcoming Apache Flink-powered stream processing offering in Confluent Cloud, expected in winter 2023. The recent acquisition of Immerok by Confluent has positioned this data streaming giant to offer both streaming storage (via Apache Kafka) and streaming computation (via Apache Flink) capabilities.
The current rebalancing protocol has several issues. One is that most of the logic lives on the client side (fat client): for example, session timeouts and intervals are defined client side. Its main pain point, however, is that it stops processing new messages (stop the world) while a rebalance is being executed, and a faulty group member can cause issues for the whole consumer group. The new protocol is built around three design goals: Server side, Consumer Protocol and Incremental. The new reconciliation protocol has 3 main phases:
The group coordinator server side will receive the partition assignment from the members and compute the new assignment for the partitions due to a new member.
The group coordinator communicates which partitions should be revoked and the consumer acknowledges.
The partition can be assigned to the new member of the consumer group.
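The three phases above can be sketched in a few lines of Python. This is a simplified illustration under our own assumptions (a "sticky" fair-share assignment and the function names sticky_target and reconcile are ours), not the coordinator logic shipped with Kafka.

```python
def sticky_target(current, members):
    """Phase 1: the coordinator computes a target assignment.

    Every member keeps what it already owns up to its fair share; only the
    surplus (and the partitions of departed members) is moved.
    """
    all_partitions = sorted(p for owned in current.values() for p in owned)
    base, extra = divmod(len(all_partitions), len(members))
    target, shares, spare = {}, {}, []
    for i, member in enumerate(sorted(members)):
        owned = sorted(current.get(member, ()))
        shares[member] = base + (1 if i < extra else 0)
        target[member] = set(owned[:shares[member]])
        spare.extend(owned[shares[member]:])
    # Partitions owned by members that left the group are spare as well.
    spare.extend(p for m, owned in current.items() if m not in members for p in owned)
    spare.sort()
    for member in sorted(members):
        while len(target[member]) < shares[member]:
            target[member].add(spare.pop(0))
    return target


def reconcile(current, target):
    """Phases 2 and 3: what each member must revoke, then what it receives."""
    # Phase 2: only the partitions that change owner are revoked and acknowledged.
    revoked = {m: set(owned) - target.get(m, set()) for m, owned in current.items()}
    # Phase 3: once revoked, those partitions can be assigned to their new owner.
    assigned = {m: wanted - set(current.get(m, ())) for m, wanted in target.items()}
    return revoked, assigned


current = {"A": {0, 1, 2}, "B": {3, 4, 5}}
target = sticky_target(current, ["A", "B", "C"])   # member C joins the group
revoked, assigned = reconcile(current, target)
```

In this example only partitions 2 and 5 change owner, so members A and B keep processing the partitions they retain instead of stopping the world.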
During the evening party, our colleagues enjoyed a beer and international foods while attending the performance by Sam Aaron, Live Coding Musician and Creator of Sonic Pi with his futuristic music sets improvised through the manipulation of live code.
The Kafka Summit 2023 was an outstanding event that showcased the advancements and future directions of the Kafka ecosystem which continues to be a driving force in enabling real-time data streaming and event-driven architectures in an increasingly data-centric world.
Last month we had the chance to attend the amazing Kafka Summit 2022 event organized by Confluent, one of Bitrock’s key Technology Partners.
Over 1500 people attended the event, which took place at the O2 in east London over two days of workshops, presentations, and networking.
Lots of news was given regarding Kafka, the Confluent Platform, Confluent Cloud, and the ecosystem altogether. An incredible opportunity to meet so many enthusiasts of this technology and discuss what is currently happening and what is on the radar for the upcoming future.
Modern Data Flow: Data Pipelines Done Right
The opening keynote of the event was hosted by Jay Kreps (CEO @ Confluent). The main topic (no pun intended :D) of the talk revolved around modern data flow and the growing need to process and move data in near real time.
From healthcare to grocery delivery, many of the applications and services we use every day are based on streaming data: in this scenario, Kafka stands as one of the main and most compelling technologies. The growing interest in Kafka is confirmed by the number of organizations currently using it (more than 100,000 companies) and by the interest and support that the project is receiving. The community is growing year after year: Kafka meetups are very popular and many people show great interest in the technology, as proved by the countless questions asked every day on StackOverflow and the large number of Jira tickets opened on the Apache Kafka project.
Of course, this success is far from accidental: if it is true that Kafka is a perfect fit for the requirements of modern architectures, it is also important to remember how many improvements were introduced in the Kafka ecosystem that helped create the image of a very mature, reliable tool when it comes to building fast, scalable, and correct streaming applications and pipelines.
This can be seen, for instance, in the new features introduced in Confluent Cloud (the Confluent solution for managed Kafka) to enhance the documentation and the monitoring of the streaming pipelines running in the environment with the new Stream Catalog and Lineage system. Those two features provide an easy-to-access way to identify and search the different resources and data available in the environment, and how this data flows inside the system improving the governance and monitoring of the platform.
The near future of Kafka - Upcoming features
Among all the numerous upcoming features in the ecosystem presented during the event, there are some that we really appreciated and we had been waiting for quite some time.
One of these is KIP-516, which introduces topic IDs to uniquely identify topics. As you may know, from the very beginning, and still today, the identifier of a topic has been its name. This has some drawbacks, such as the fact that a topic cannot be renamed (for instance, when you would like to update your naming strategy): renaming requires deleting and recreating the topic, migrating its whole content, and updating all the producers and consumers that refer to it. An equally annoying issue arises when you want to delete a topic and then recreate one with the same name, with the goal of dropping its content and applying a different configuration. In this scenario too we can currently face issues, since Kafka does not delete the topic immediately: it schedules a deletion that has to propagate through the cluster, with no certainty about when the operation will actually be completed. This makes the operation, as of today, not automatable (our consultants have often faced this limitation in some of our client projects).
The second long-awaited feature is the possibility to run Kafka without ZooKeeper. At first, it was very useful and practical to take advantage of the distributed configuration management capabilities provided by ZooKeeper (this is especially important in processes like controller election or partition leader election). Over the past years, Kafka has started incorporating more and more of this functionality itself, so maintaining a ZooKeeper cluster in addition to the Kafka one feels like an unnecessary effort, risk, and cost. As of today, this feature is not yet production-ready, but we can say that it’s pretty close. Indeed, Confluent has shared the plan, and we are all waiting for this architecture simplification to arrive.
The third upcoming feature that we found extremely valuable is the introduction of modular topologies for ksqlDB. ksqlDB is relatively recent in the Kafka ecosystem, but it is gaining good momentum thanks to its ability to express stream transformations with minimal effort and just an SQL-like command, without the need to create dedicated Kafka Streams applications that require a good amount of boilerplate which later has to be maintained.
ksqlDB will not be able to cover everything that a dedicated Kafka Streams application can do but, for a good number of use cases, it will be an excellent solution. The introduction of modular topologies will simplify the management of the streams inside ksqlDB and improve its scalability (which is currently limited in some scenarios).
Our Insights from Breakout Sessions & Lightning Talks
The inner beauty of tech conferences lies in the talks, and Kafka Summit was no different!
During the event, indeed, not only the feature announcements caught our attention, but also what was presented during the various breakout sessions and talks: an amazing variety of topics gave us plenty of options to dig more into the Kafka world.
One of the sessions that we particularly enjoyed is, for sure, the one led by New Relic (“Monitoring Kafka Without Instrumentation Using eBPF”). The talk focused on an interesting way of monitoring Kafka and Kafka-based applications using eBPF, without the need for instrumentation. Antón Rodríguez, as speaker, ran a cool demo of Pixie, in which it was very easy to see what is going on with our applications. It was also easy to get a graphical representation of the actual topology of the streams, with all the links from producers to topics and from topics to consumers, making it easier to answer questions like “Who is producing to topic A?” or “Who is consuming from topic B?”.
Another session that we particularly enjoyed was the talk by LinkedIn (“Geo-replicated Kafka Streams Apps”): Ryanne Dolan outlined some strategies to deal with geo-replicated Kafka topics, in particular in the case of Kafka Streams applications. Ryanne gave some valuable tips on how to manage the replication of Kafka topics in a disaster recovery cluster to guarantee high availability in case of failure, and on how to develop our Kafka Streams applications to work almost transparently in the original cluster and in the DR one. The talk was also a great opportunity to highlight the high scalability of Kafka in a multi-datacenter scenario, where different clusters can coexist in a kind of layered architecture composed of a scalable ingestion layer that can fan out the data to different geo-replicated clusters, transparently for the Kafka Streams applications.
Undoubtedly, the event has been a huge success, bringing the Apache Kafka community together to share best practices, learn how to build next-generation systems, and discuss the future of streaming technologies.
For us, this experience has been a blend of innovation, knowledge, and networking: all the things we missed from in-person conferences were finally back. It was impressive seeing people interact with each other after two years of social distancing, and we could really feel that “sense of community” that online events can only partially deliver.
If you want to know more about the event and its main topics - from real-time analytics to machine learning and event streaming - be sure to also check the dedicated Blog post by our sister-company Radicalbit. You can read it here.
During one of our projects, we worked on a real-time data streaming application using Kafka.
After receiving the data from some external services via REST APIs, we manipulated it using Kafka Streams pipelines; then, we called an external REST API service to make the data available to the destination system.
In order to develop the input and output components of the ETL flow, we wrote some custom Source Connectors and a Sink Connector for the Kafka Connect module.
Probably, most of you have already figured out the problem: for each of our connectors, we needed to build the “fat Jar” and move it to the Kafka Connect folder, which, for the sake of making things more complicated, was running in Kubernetes.
Unfortunately for us, the connectors couldn’t be counted on the fingers of one hand: consequently, the manual procedure was long, repetitive, and very time-consuming.
Another issue we encountered was that the creation/deletion of our connectors was “made by hand”, which made it impossible to determine which configurations were being provisioned by looking at the Git repository.
In our project, each connector was versioned using a separate git repository. We decided to aim at CI (Continuous Integration) and CD (Continuous Delivery), with Continuous Deployments in the development environment.
In our scenario, we decided to use Jenkins as a CI/CD daemon and Terraform to manage all the infrastructure and configurations for the connectors, aiming at achieving a full GitOps experience.
In our daily work, whenever we merge a pull request, this event triggers a Jenkins pipeline that builds the artifact and publishes it into a private Artifactory repository.
If this phase succeeds, then the next step is to trigger the deployment job.
Given that the deployment is fully automated in the development environment, the latest versions of our connectors are always running within minutes of merging code changes. For the other environments, the Jenkins job requires manual approval, in order to avoid having unwanted changes promoted past the development environment.
The deployment of the connector JARs is performed by a custom Helm chart that fetches the desired connector artifacts before starting the Kafka Connect container.
As seen before, in our project we were using Terraform for managing infrastructure and connectors configuration. We decided to use a single repository for all the Terraform codebase, using different Terraform states to manage different environments.
For each connector, we created a Terraform module containing the connector resource definition...
...and the expected configuration variables:
In each environment configuration, we declared which connectors to configure, by instantiating the proper Terraform modules that were previously created. Terraform modules were versioned as well: this means that, in different environments, we could run different configurations of the same connector artifact. In this way, we were able to deploy the jar without doing it manually.
The last missing piece was the creation of all the required topics for our application. We decided to define them in a simple YAML file and, with the help of a simple bash script, they were created when the Jenkins job ran.
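As an illustration of this last step, here is a minimal Python sketch of what such a script does: it turns topic definitions into idempotent kafka-topics commands. We used bash in the project; the field names (name, partitions, replication_factor, config), the topic shown and the kafka-topics.sh binary name are assumptions made for this example, and the definitions are given as an already parsed structure rather than read from YAML.

```python
import shlex


def create_topic_command(topic, bootstrap_server):
    """Build the kafka-topics command line that creates one topic."""
    cmd = [
        "kafka-topics.sh",
        "--bootstrap-server", bootstrap_server,
        "--create",
        "--if-not-exists",  # makes the job safe to run again on every deployment
        "--topic", topic["name"],
        "--partitions", str(topic.get("partitions", 1)),
        "--replication-factor", str(topic.get("replication_factor", 1)),
    ]
    # Optional topic-level settings, e.g. retention or cleanup policy.
    for key, value in sorted(topic.get("config", {}).items()):
        cmd += ["--config", f"{key}={value}"]
    return cmd


# Hypothetical content of the topics file, after parsing.
topics = [
    {"name": "orders", "partitions": 3, "replication_factor": 2,
     "config": {"retention.ms": "86400000"}},
]

for topic in topics:
    # Only print the commands here; a real job would execute them.
    print(shlex.join(create_topic_command(topic, "localhost:9092")))
```

Keeping the topic list in the repository means that, like the connector configuration, the set of topics in each environment can be read directly from Git.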
In this article we have explored how to improve and engineer the deployment of Kafka connectors in various environments without the need for manual intervention.
Developers can focus on enhancing their code and almost ignore the deployment part, given that they are now able to perform one-click deployments.
Enabling connectors or changing configurations is now just a matter of changing a few lines in the Terraform repo, without the need to execute Kafka Connect API requests by hand.
From our perspective, it makes sense for a lot of companies to invest time, money and resources to automate the deployment of the connectors.
The popularity of microservice architecture has enormously increased recently; but this comes with new challenges.
One of these is monitoring. In one of our projects, we used a Kafka connector to intercept changes in our database and write data to a topic. This was a very important component of the system, so we needed to consider its health status carefully.
In our first version, we created a Kubernetes CronJob with a simple shell script that checks the status of the connector and, if necessary, deletes the failed connector and restarts it.
This worked quite well; however, it is different from how the other services are health checked by Kubernetes.
The connector was deployed with Kubernetes; the most natural thing to do is thus to use k8s to monitor the pods and restart them when needed.
The Kafka Connect framework comes with a REST API, and one of its endpoints gives you the state of the connectors:
When a connector task fails while the Kafka Connect process itself keeps running, from the Kubernetes point of view everything is OK.
The solution that worked well for us consisted in adding a sidecar container that takes responsibility for exposing the state of the connector task.
The sidecar pattern allows you to extract some functionalities of your application into a separate component. For example, we can separate the authentication layer from our “main” component that contains the business logic or, as in our case, extract the monitoring part.
Our goal is to obtain something like this:
First of all, we created a simple application that takes care of calling the connector API and exposes an API for Kubernetes (we used a simple Python application using Flask - but you can use whatever you want). Something like this:
As you can see, the code is very simple.
The application does two different things: first, it exposes an endpoint at the “/health” path that will be called periodically by Kubernetes; second, it checks the status of a task and returns an Internal Server Error if the HTTP status of the connector response is not 200 or if the task state is not “RUNNING”.
Now, this application needs to be deployed in the same pod as the connector. This can be done by adding to our deployment.yaml file the container that contains our Python application:
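For reference, the behaviour described above can be sketched with the Python standard library alone. This is not our original Flask application: the connector name, the ports and the function names are assumptions made for this example, and the status URL follows the Kafka Connect task status endpoint.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.error import HTTPError, URLError
from urllib.request import urlopen

# Hypothetical connector name and task id; Kafka Connect listens on 8083 by default.
STATUS_URL = "http://localhost:8083/connectors/my-connector/tasks/0/status"


def health_code(http_status, body):
    """Return 200 only if Connect answered 200 and the task state is RUNNING."""
    if http_status != 200:
        return 500
    try:
        state = json.loads(body).get("state")
    except (ValueError, AttributeError):
        return 500  # empty, malformed or unexpected response body
    return 200 if state == "RUNNING" else 500


def fetch_task_status(url=STATUS_URL):
    """Call the Connect REST API and return (HTTP status, response body)."""
    try:
        with urlopen(url, timeout=5) as response:
            return response.status, response.read().decode("utf-8")
    except HTTPError as error:
        return error.code, ""
    except (URLError, OSError):
        return 503, ""  # Connect is not reachable at all


class HealthHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path != "/health":
            self.send_error(404)
            return
        self.send_response(health_code(*fetch_task_status()))
        self.end_headers()


def serve(port=8080):
    """Start the sidecar; Kubernetes probes would point at /health on this port."""
    HTTPServer(("0.0.0.0", port), HealthHandler).serve_forever()
```

Keeping the decision in a small pure function (health_code) makes it easy to test without a running Kafka Connect instance.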
The logical result?
Both containers use the health check exposed by the sidecar: Kubernetes does not restart the entire pod as long as one container is still up, so by relying on the same API both containers share the same fate.
Once the connector is in FAILED state, Kubernetes will restart the pod.
Some cloud providers may provide a built-in solution for problems like this; but if you can’t use it - for whatever reason - this can be a possible solution.
You are designing a Kafka Streams application which must read commands and produce the corresponding business event. The Avro models you’re expecting to read look like this:
While the output messages you’re required to produce look like this:
You know you can leverage the sbt-avrohugger plugin to generate the corresponding Scala class for each Avro schema, so that you can focus only on designing the business logic.
Since the messages themselves are pretty straightforward, you decide to create a monomorphic function to map properties between each command and the corresponding event. The resulting topology ends up looking like this:
...But then the domain widens
Today new functional requirements have emerged: your application must now handle multiple types of assets, each with its own unique properties. You are pondering how to implement this requirement and make your application more resilient to further changes in behavior.
You could split both commands and events into multiple topics, one per asset type, so that the corresponding Avro schema stays consistent and its compatibility is ensured. This solution, however, would have you replicate pretty much the same topology multiple times, so it’s not recommended unless the business logic has to be customized for each asset type.
Avro doesn’t support inheritance between records, so any OOP strategy to have assets inherit properties from a common ancestor is unfortunately not viable. You could however create a “Frankenstein” object with all the properties of each and every asset and fill in only those required for each type of asset. This is definitely the worst solution from an evolutionary and maintainability point of view.
Luckily for you, Avro offers an interesting feature named union types: you could express the diversity in each asset’s properties via a union of multiple payloads, still relying on one single message as wrapper.
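To visualize the idea before diving into the Scala implementation, here is a small language-neutral sketch written in Python: a single wrapper message carries a union of payloads, and one function dispatches on the payload type. The asset types and field names are hypothetical and do not come from the schemas of this article.

```python
from dataclasses import dataclass
from functools import singledispatch
from typing import Union


@dataclass
class CreateCar:            # hypothetical payload for one asset type
    plate: str


@dataclass
class CreateHouse:          # hypothetical payload for another asset type
    address: str


@dataclass
class CreateAsset:          # the single wrapper message
    id: str
    payload: Union[CreateCar, CreateHouse]   # plays the role of the Avro union


@singledispatch
def to_event(payload, asset_id):
    """Fallback for payload types that have no mapping yet."""
    raise TypeError(f"unsupported payload: {type(payload).__name__}")


@to_event.register
def _(payload: CreateCar, asset_id):
    return {"type": "CarCreated", "id": asset_id, "plate": payload.plate}


@to_event.register
def _(payload: CreateHouse, asset_id):
    return {"type": "HouseCreated", "id": asset_id, "address": payload.address}


command = CreateAsset(id="42", payload=CreateCar(plate="AB123CD"))
event = to_event(command.payload, command.id)
```

Supporting a new asset type means adding one payload and one mapping, while the wrapper and the topology stay the same.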
Enter polymorphic streams
Objects with no shape
To cope with this advanced polymorphism, you leverage the shapeless library, which introduces the Coproduct type, the perfect companion for the Avro union type. First of all, you update the custom types mapping of sbt-avrohugger, so that it generates an additional sealed trait for each Avro protocol containing multiple records:
The generated command class ends up looking like this:
Updating the business logic
Thanks to shapeless’ Poly1 trait you then write the updated business logic in a single class:
Changes to the topology are minimal, as you’d expect:
A special kind of Serde
Now for the final piece of the puzzle, Serdes. Introducing the avro4s library, which takes Avro GenericRecords above and beyond. You create a type class to extend a plain old Serde providing a brand new method:
Now each generated class has its own Serde, tailored on the corresponding Avro schema.
Putting everything together
Finally, the main program where you combine all ingredients:
When multiple use cases share (almost) the same business logic, you can create a stream processing application with ad-hoc polymorphism and reduce the duplication of code to the minimum, while making your application even more future-proof.
GDPR introduced the “right to be forgotten”, which allows individuals to make verbal or written requests for personal data erasure. One of the common challenges when trying to comply with this requirement in an Apache Kafka based application infrastructure is being able to selectively delete all the Kafka records related to one of the application users.
Kafka’s data model was never supposed to support such a selective delete feature, so businesses had to find and implement workarounds. At the time of writing, the only way to delete messages in Kafka is to wait for the message retention to expire or to use compacted topics that expect tombstone messages to be published, which isn't feasible in all environments and just doesn't fit all the use cases.
HashiCorp Vault provides Encryption as a Service, and as it happens, can help us implement a solution without workarounds, either in application code or Kafka data model.
Vault Encryption as a Service
Vault Transit secrets engine handles cryptographic operations on in-transit data without persisting any information. This allows a straightforward introduction of cryptography in existing or new applications by performing a simple HTTP request.
Vault fully and transparently manages the lifecycle of encryption keys, so neither developers nor operators have to worry about key compliance and rotation, while the securely stored data can always be encrypted and decrypted as long as Vault is accessible.
What if, instead of trying to selectively eliminate the data the application is not allowed to keep, we just made sure that the application (or anyone, for that matter) cannot read the data under any circumstances? This would be equivalent to the physical removal of the data, just as requested by GDPR compliance. Such a result can be achieved by selectively encrypting the information that we might want to be able to delete and throwing away the key when the deletion is requested.
However, it is necessary to perform encryption and decryption in a transparent way for the application, to reduce refactoring and integration effort for each of the applications that are using Kafka, and unlock this functionality for the applications that cannot be adapted at all.
Kafka APIs support interceptors on message production and consumption, which is the candidate link in the chain where to leverage Vault’s encryption as a service. Inside the interceptor, we can perform the needed message transformation:
before a record is sent to Kafka, the interceptor performs encryption and adjusts the record content with the encrypted data
before a record is returned to a consumer client, the interceptor performs decryption and adjusts the record content with the decrypted data
Does this allow us to delete all the Kafka messages related to a single user? Yes, and it is really simple. If the encryption key that we use for encrypting data in Kafka messages is different for each of our application’s users, we can go ahead and delete the encryption key to guarantee that it is no longer possible to read the user data.
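The mechanism can be illustrated with a short Python sketch: one key per user, and "forgetting" a user simply means deleting that key. The cipher below is a deliberately simple toy built on the standard library and is NOT secure; it only stands in for the encryption that Vault performs, and all the names are our own.

```python
import hashlib
import secrets


def _keystream(key, length):
    """Derive `length` pseudo-random bytes from the key (toy construction)."""
    stream, counter = b"", 0
    while len(stream) < length:
        stream += hashlib.sha256(key + counter.to_bytes(8, "big")).digest()
        counter += 1
    return stream[:length]


def toy_cipher(key, data):
    """XOR the data with the keystream. Symmetric, illustrative, NOT secure."""
    return bytes(a ^ b for a, b in zip(data, _keystream(key, len(data))))


class PerUserKeys:
    """Stand-in for the key management that Vault provides."""

    def __init__(self):
        self._keys = {}

    def encrypt(self, user_id, plaintext):
        # What a producer interceptor would do before sending the record.
        key = self._keys.setdefault(user_id, secrets.token_bytes(32))
        return toy_cipher(key, plaintext)

    def decrypt(self, user_id, ciphertext):
        # What a consumer interceptor would do before returning the record.
        key = self._keys.get(user_id)
        if key is None:
            raise KeyError(f"no key for user {user_id!r}: the data is unreadable")
        return toy_cipher(key, ciphertext)

    def forget(self, user_id):
        # "Right to be forgotten": the records stay in Kafka, the key does not.
        self._keys.pop(user_id, None)


keys = PerUserKeys()
record_value = keys.encrypt("alice", b"alice@example.com")
keys.forget("alice")   # from now on, record_value can no longer be decrypted
```

The records of all the other users remain readable, since each of them is protected by a different key.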
Replication Outside EU
Given that now the sensitive data stored in our Kafka cluster is encrypted at rest, it is possible to replicate our Kafka cluster outside the EU, for example for disaster recovery purposes. The data will only be accessible by those users that have the right permissions to perform the cryptographic operations in Vault.
Part 2: Technicalities
In the previous part we drafted the general idea behind the integration of HashiCorp Vault and Apache Kafka for performing fine-grained encryption at rest of the messages, in order to address GDPR compliance requirements within Kafka. In this part, instead, we do a deep dive on how to bring this idea to life.
Vault Transit Secrets Engine
Vault Transit secrets engine is part of Vault Open Source, and it is really easy to get started with. Setting the engine up is just a matter of enabling it and creating some encryption keys:
Crypto operations can be performed just as easily; it is only a matter of providing base64-encoded plaintext data:
The resulting ciphertext will look like vault:v1:<base64-encoded ciphertext>, where v1 represents the first key generation, given that the key has not been rotated yet.
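As a small illustration of the two details above (base64-encoded input and versioned output), here is a Python sketch that builds the body of an encryption request and splits a ciphertext into its parts. It assumes the engine is mounted at the default transit/ path; the Vault address, the key name and the function names are placeholders of ours, and no request is actually sent.

```python
import base64
import json


def encrypt_request(key_name, plaintext, vault_addr="http://127.0.0.1:8200"):
    """Return the URL and JSON body for a Transit encryption call."""
    url = f"{vault_addr}/v1/transit/encrypt/{key_name}"
    # Transit expects the plaintext to be base64 encoded.
    body = {"plaintext": base64.b64encode(plaintext).decode("ascii")}
    return url, json.dumps(body)


def parse_ciphertext(ciphertext):
    """Split 'vault:v<N>:<payload>' into the key version and the payload."""
    prefix, version, payload = ciphertext.split(":", 2)
    if prefix != "vault" or not version.startswith("v"):
        raise ValueError(f"unexpected ciphertext format: {ciphertext!r}")
    return int(version[1:]), payload


url, body = encrypt_request("my-key", b"hello")   # "my-key" is a placeholder name
version, payload = parse_ciphertext("vault:v1:abc==")
```

The version number is what allows Vault to keep decrypting old records after a key rotation: each ciphertext states which key generation produced it.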
What about decryption? Well, it’s just another API call:
Integrating Vault’s Encryption as a Service within your application becomes really easy to implement and requires little to no refactoring of the existing codebase.
Kafka Producer Interceptor
The Producer Interceptor API can intercept and possibly mutate the records received by the producer before they are published to the Kafka cluster. In this scenario, the goal is to perform encryption within this interceptor, in order to avoid sending plaintext data to the Kafka cluster...
Integrating encryption in the Producer Interceptor is straightforward, given that the onSend method is invoked one message at a time.
Kafka Consumer Interceptor
The Consumer Interceptor API can intercept and possibly mutate the records received by the consumer. In this scenario, we want to perform decryption of the data received from Kafka cluster and return plaintext data to the consumer.
Integrating decryption with Consumer Interceptor is a bit trickier because we wanted to leverage the batch decryption capabilities of Vault, in order to minimize Vault API calls.
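To show why batching helps, here is a minimal Python sketch that turns a whole batch of consumed record values into a single Transit request body and reads the plaintexts back from the response. It assumes the batch_input and batch_results fields of the Transit batch API; the function names are ours, and the actual interceptor is a Java component that is not reproduced here.

```python
import base64
import json


def batch_decrypt_body(ciphertexts):
    """One request body for the whole batch instead of one call per record."""
    return json.dumps({"batch_input": [{"ciphertext": c} for c in ciphertexts]})


def plaintexts_from_response(response_json):
    """Extract the decoded plaintexts, in the same order as the input batch."""
    results = json.loads(response_json)["data"]["batch_results"]
    return [base64.b64decode(item["plaintext"]) for item in results]


# Values of the records returned by a single poll (placeholders).
polled_values = ["vault:v1:aaa", "vault:v1:bbb"]
request_body = batch_decrypt_body(polled_values)
```

With this approach the number of Vault calls grows with the number of polls rather than with the number of records.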
Once you have built your interceptors, enabling them is just a matter of configuring your Consumer or Producer client:
Notice that value and key serializer class must be set to the StringSerializer, since Vault Transit can only handle strings containing base64 data. The client invoking Kafka Producer and Consumer API, however, is able to process any supported type of data, according to the serializer or deserializer configured in the interceptor.value.serializer or interceptor.value.deserializer properties.
HashiCorp Vault Transit secrets engine is definitely the technological component you may want to leverage when addressing cryptographic requirements in your application, even when dealing with legacy components. The entire set of capabilities offered by HashiCorp Vault makes it easy to modernize applications from a security perspective, allowing developers to focus on the business logic rather than spending time finding a way to properly manage secrets.
In this three-day hands-on course you will learn how to build, manage, and monitor clusters using industry best-practices developed by the world’s foremost Apache Kafka experts.
You will learn how Kafka and the Confluent Platform work, their main subsystems, how they interact, and how to set up, manage, monitor, and tune your cluster.
Throughout the course, hands-on exercises reinforce the topics being discussed. Exercises include:
Basic cluster operations
Viewing and interpreting cluster metrics
Recovering from a Broker failure
Performance-tuning the cluster
Securing the cluster
This course is designed for engineers, system administrators, and operations staff responsible for building, managing, monitoring, and tuning Kafka clusters.
Attendees should have a strong knowledge of Linux/Unix, and understand basic TCP/IP networking concepts. Familiarity with the Java Virtual Machine (JVM) is helpful. Prior knowledge of Kafka is helpful, but is not required.
In this three-day hands-on course you will learn how to build an application that can publish data to, and subscribe to data from, an Apache Kafka cluster.
You will learn the role of Kafka in the modern data distribution pipeline, discuss core Kafka architectural concepts and components, and review the Kafka developer APIs. As well as core Kafka, Kafka Connect, and Kafka Streams, the course also covers other components in the broader Confluent Platform, such as the Schema Registry and the REST Proxy.
Throughout the course, hands-on exercises reinforce the topics being discussed. Exercises include:
Using Kafka’s command-line tools
Writing Consumers and Producers
Writing a multi-threaded Consumer
Using the REST Proxy
Storing Avro data in Kafka with the Schema Registry
Ingesting data with Kafka Connect
This course is designed for application developers, ETL (extract, transform, and load) developers, and data scientists who need to interact with Kafka clusters as a source of, or destination for, data.
Attendees should be familiar with developing in Java (preferred) or Python. No prior knowledge of Kafka is required.
The Motivation for Apache Kafka
Real-Time Processing is Becoming Prevalent
Kafka: A Stream Data Platform
An Overview of Kafka
Kafka’s Use of ZooKeeper
Kafka’s Log Files
Replicas for Reliability
Kafka’s Write Path
Kafka’s Read Path
Partitions and Consumer Groups for Scalability
Developing With Kafka
Using Maven for Project Management
Programmatically Accessing Kafka
Writing a Producer in Java