Monitoring Kafka Connector with Kubernetes


The Problem

The popularity of microservice architectures has increased enormously in recent years, but it comes with new challenges.

One of these is monitoring. In one of our projects, we used a Kafka connector to intercept changes in our database and write data to a topic. This was a very important component of the system, so we needed to consider its health status carefully.


Solution

In our first version, we created a Kubernetes CronJob with a simple shell script that checks the status of the connector and, if necessary, deletes the failed instance and restarts it.

This worked quite well; however, it is different from how our other services are health-checked by Kubernetes.

Since the connector was deployed with Kubernetes, the most natural approach is to let Kubernetes monitor the pod and restart it when needed.

The Kafka Connect framework comes with a REST API, and one of its endpoints returns the state of a connector’s tasks:

e.g.: GET /connectors/&lt;connector-name&gt;/tasks/&lt;task-id&gt;/status

This seems to resolve our problem… But is it really the case?

A Kubernetes health check looks at the HTTP status code; the problem is that the Kafka Connect API returns HTTP 200 even when a task has failed.

For instance, if the task has failed, the API returns:

HTTP/1.1 200 OK

{"state":"FAILED","id":1,"worker_id":"192.168.86.101:8083"}

In this case, from the Kubernetes point of view, everything is ok.

The solution that worked well for us was to add a sidecar container that takes responsibility for exposing the state of the connector task.

The sidecar pattern allows you to extract some functionality of your application into a separate component. For example, you can separate the authentication layer from the “main” component that contains the business logic or – as in our case – extract the monitoring part.

Our goal is to obtain a pod that runs the connector together with a monitoring sidecar, and to let Kubernetes probe the sidecar to decide the fate of the whole pod.

First of all, we created a simple application that takes care of calling the connector API and exposes an API for Kubernetes (we used a simple Python application using Flask – but you can use whatever you want). Something like this:
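A minimal sketch of such an application, assuming the Kafka Connect REST API is reachable inside the pod on localhost:8083 and that the connector name and task id are provided via environment variables (names, defaults and ports here are illustrative, not the original code):

# health_check.py – illustrative sketch of the monitoring sidecar
import os
import requests
from flask import Flask

app = Flask(__name__)

# assumptions: Connect runs in the same pod; connector name and task id are configurable
CONNECT_URL = os.getenv("CONNECT_URL", "http://localhost:8083")
CONNECTOR_NAME = os.getenv("CONNECTOR_NAME", "my-connector")
TASK_ID = os.getenv("TASK_ID", "0")

@app.route("/health")
def health():
    url = f"{CONNECT_URL}/connectors/{CONNECTOR_NAME}/tasks/{TASK_ID}/status"
    try:
        response = requests.get(url, timeout=5)
    except requests.RequestException:
        return "Connect REST API unreachable", 500
    if response.status_code != 200 or response.json().get("state") != "RUNNING":
        return "Connector task not running", 500
    return "OK", 200

if __name__ == "__main__":
    app.run(host="0.0.0.0", port=8080)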

As you can see, the code is very simple.

The application does two things: first, it exposes an endpoint at the “/health” path that will be called periodically by Kubernetes; second, it checks the status of the task and returns an Internal Server Error if the HTTP status of the connector API is not 200 or if the task state is not “RUNNING”.

Now, this application needs to be deployed in the same pod as the connector. This can be done by adding the container that runs our Python application to our deployment.yaml file:
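An illustrative excerpt of such a deployment, with placeholder image names, ports and timings; note that both containers point their liveness probe at the sidecar:

# deployment.yaml (excerpt) – illustrative sketch, all names and values are placeholders
spec:
  template:
    spec:
      containers:
        - name: kafka-connector
          image: my-registry/kafka-connect:latest
          ports:
            - containerPort: 8083
          livenessProbe:
            httpGet:
              path: /health
              port: 8080        # probe the sidecar, so this container is restarted too
            initialDelaySeconds: 30
            periodSeconds: 10
        - name: connector-monitor
          image: my-registry/connector-monitor:latest
          ports:
            - containerPort: 8080
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 30
            periodSeconds: 10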


Conclusions

The logical result?

Both containers expose the health check of the sidecar: since Kubernetes does not restart the entire pod as long as one container is still up, exposing the same probe on both containers ensures that they share the same destiny.

Once the connector is in FAILED state, Kubernetes will restart the pod.

Some cloud providers offer a built-in solution for problems like this; but if you can’t use it – for whatever reason – this approach is a viable alternative.


Author: Marco Tosini, Principal Engineer @Bitrock


Polymorphic Messages in Kafka Streams


Things usually start simple…

You are designing a Kafka Streams application which must read commands and produce the corresponding business event.
The Avro models you’re expecting to read look like this:
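For illustration, a minimal command schema could look something like this (record and field names are invented examples, not the original models):

{
  "type": "record",
  "name": "AssetCommand",
  "namespace": "example.commands",
  "fields": [
    { "name": "commandId", "type": "string" },
    { "name": "assetId", "type": "string" },
    { "name": "description", "type": "string" }
  ]
}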

While the output messages you’re required to produce look like this:
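Correspondingly, an illustrative event schema (again with invented names):

{
  "type": "record",
  "name": "AssetEvent",
  "namespace": "example.events",
  "fields": [
    { "name": "eventId", "type": "string" },
    { "name": "assetId", "type": "string" },
    { "name": "description", "type": "string" }
  ]
}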

You know you can leverage the sbt-avrohugger plugin to generate the corresponding Scala class for each Avro schema, so that you can focus only on designing the business logic.

Since the messages themselves are pretty straightforward, you decide to create a monomorphic function to map properties between each command and the corresponding event.
The resulting topology ends up looking like this:
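As a sketch, assuming the invented AssetCommand/AssetEvent classes above, the Kafka Streams Scala DSL, and implicit Serdes for both types in scope, such a topology could be as simple as:

// illustrative sketch of the initial, monomorphic topology (topic names and mapping are invented)
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.serialization.Serdes._  // String serde; older Kafka versions use ...scala.Serdes

val builder = new StreamsBuilder()

// assuming implicit Serde[AssetCommand] and Serde[AssetEvent] instances are in scope
builder
  .stream[String, AssetCommand]("asset-commands")
  .mapValues(command => AssetEvent(command.commandId, command.assetId, command.description))
  .to("asset-events")

val topology = builder.build()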


…But then the domain widens

Today new functional requirements have emerged: your application must now handle multiple types of assets, each with its own unique properties.
You are pondering how to implement this requirement and make your application more resilient to further changes in behavior.


Multiple streams

You could split both commands and events into multiple topics, one per asset type, so that the corresponding Avro schema stays consistent and its compatibility is ensured.
This solution, however, would have you replicate pretty much the same topology multiple times, so it’s not recommended unless the business logic has to be customized for each asset type.


“All-and-none” messages

Avro doesn’t support inheritance between records, so any OOP strategy to have assets inherit properties from a common ancestor is unfortunately not viable.
You could however create a “Frankenstein” object with all the properties of each and every asset and fill in only those required for each type of asset.
This is definitely the worst solution from an evolutionary and maintainability point of view.


Union types

Luckily for you, Avro offers an interesting feature named union types: you could express the diversity in each asset’s properties via a union of multiple payloads, while still relying on a single message as a wrapper.
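Sticking with the invented names above, the command could become a wrapper whose payload field is a union of asset-specific records, along these lines:

{
  "type": "record",
  "name": "AssetCommand",
  "namespace": "example.commands",
  "fields": [
    { "name": "commandId", "type": "string" },
    { "name": "payload", "type": [
        { "type": "record", "name": "CarPayload",
          "fields": [ { "name": "plate", "type": "string" } ] },
        { "type": "record", "name": "PlanePayload",
          "fields": [ { "name": "tailNumber", "type": "string" } ] },
        { "type": "record", "name": "TrainPayload",
          "fields": [ { "name": "carriages", "type": "int" } ] }
      ]
    }
  ]
}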


Enter polymorphic streams

Objects with no shape

To cope with this advanced polymorphism, you leverage the shapeless library, which introduces the Coproduct type, the perfect companion for the Avro union type.
First of all, you update the custom types mapping of sbt-avrohugger, so that it generates an additional sealed trait for each Avro protocol containing multiple records:

The generated command class ends up looking like this:
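Roughly speaking (the exact output depends on your avrohugger settings, and the names below are the invented ones from the schema sketch), the generated wrapper could look like:

// rough sketch of the generated code, not the actual avrohugger output
import shapeless.{:+:, CNil}

case class CarPayload(plate: String)
case class PlanePayload(tailNumber: String)
case class TrainPayload(carriages: Int)

case class AssetCommand(
  commandId: String,
  payload: CarPayload :+: PlanePayload :+: TrainPayload :+: CNil
)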


Updating the business logic

Thanks to shapeless’ Poly1 trait you then write the updated business logic in a single class:
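As an illustrative sketch, with the invented command payloads above and hypothetical event payloads, the Poly1 could look like this:

// illustrative sketch of the polymorphic mapping logic using shapeless Poly1
import shapeless.Poly1

// hypothetical event payloads, mirroring the command payloads
sealed trait AssetEventPayload
case class CarCreated(plate: String)        extends AssetEventPayload
case class PlaneCreated(tailNumber: String) extends AssetEventPayload
case class TrainCreated(carriages: Int)     extends AssetEventPayload

// one case per command payload: the whole business logic lives in a single object
object ToEventPayload extends Poly1 {
  implicit val carCase   = at[CarPayload](car => CarCreated(car.plate): AssetEventPayload)
  implicit val planeCase = at[PlanePayload](plane => PlaneCreated(plane.tailNumber): AssetEventPayload)
  implicit val trainCase = at[TrainPayload](train => TrainCreated(train.carriages): AssetEventPayload)
}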

Changes to the topology are minimal, as you’d expect:
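A sketch of the updated stream definition, assuming the event wrapper was reworked analogously (e.g. AssetEvent(eventId, payload)); the only real difference is the fold over the payload coproduct:

// illustrative sketch of the updated topology
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.serialization.Serdes._
import shapeless._

val builder = new StreamsBuilder()

builder
  .stream[String, AssetCommand]("asset-commands")
  .mapValues(command => AssetEvent(command.commandId, command.payload.fold(ToEventPayload)))
  .to("asset-events")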


A special kind of Serde

Now for the final piece of the puzzle, Serdes. Introducing the avro4s library, which takes Avro GenericRecords above and beyond.
You create a type class to extend a plain old Serde providing a brand new method:
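One possible shape for such a helper, assuming Confluent’s GenericAvroSerde and avro4s’ RecordFormat to bridge case classes and GenericRecords (a sketch of the idea, not the exact type class described above; it also assumes avro4s can derive the union mapping for the Coproduct field):

// illustrative sketch: build a Serde[T] for any type with an avro4s RecordFormat,
// delegating the wire format and the Schema Registry round trip to GenericAvroSerde
import com.sksamuel.avro4s.RecordFormat
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serdes, Serializer}
import scala.jdk.CollectionConverters._

def avroSerde[T](format: RecordFormat[T], schemaRegistryUrl: String, isKey: Boolean = false): Serde[T] = {
  val inner = new GenericAvroSerde()
  inner.configure(Map("schema.registry.url" -> schemaRegistryUrl).asJava, isKey)

  val serializer: Serializer[T] =
    (topic: String, data: T) => inner.serializer().serialize(topic, format.to(data))

  val deserializer: Deserializer[T] =
    (topic: String, bytes: Array[Byte]) => format.from(inner.deserializer().deserialize(topic, bytes))

  Serdes.serdeFrom(serializer, deserializer)
}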

Now each generated class has its own Serde, tailored on the corresponding Avro schema.


Putting everything together

Finally, the main program where you combine all ingredients:
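Putting the sketches above together, a hypothetical main program could look like this (topic names, application id and URLs are examples):

// illustrative sketch of the main program wiring everything together
import java.util.Properties
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.serialization.Serdes._
import com.sksamuel.avro4s.RecordFormat
import shapeless._

object AssetProcessorApp extends App {
  val schemaRegistryUrl = "http://localhost:8081"  // example value

  // one Serde per generated class, built with the helper sketched above
  implicit val commandSerde: Serde[AssetCommand] = avroSerde(RecordFormat[AssetCommand], schemaRegistryUrl)
  implicit val eventSerde: Serde[AssetEvent]     = avroSerde(RecordFormat[AssetEvent], schemaRegistryUrl)

  val builder = new StreamsBuilder()
  builder
    .stream[String, AssetCommand]("asset-commands")
    .mapValues(command => AssetEvent(command.commandId, command.payload.fold(ToEventPayload)))
    .to("asset-events")

  val props = new Properties()
  props.put(StreamsConfig.APPLICATION_ID_CONFIG, "asset-processor")
  props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

  val streams = new KafkaStreams(builder.build(), props)
  streams.start()
  sys.addShutdownHook(streams.close())
}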



Conclusions

When multiple use cases share (almost) the same business logic, you can create a stream processing application with ad-hoc polymorphism and reduce the duplication of code to the minimum, while making your application even more future-proof.


Bringing GDPR in Kafka with Vault


Part 1: Concepts

GDPR introduced the “right to be forgotten”, which allows individuals to make verbal or written requests for personal data erasure. One of the common challenges when trying to comply with this requirement in an Apache Kafka based application infrastructure is being able to selectively delete all the Kafka records related to one of the application users.

Kafka’s data model was never meant to support such a selective delete feature, so businesses had to find and implement workarounds. At the time of writing, the only ways to delete messages in Kafka are to wait for the message retention to expire or to use compacted topics and publish tombstone messages, which isn’t feasible in all environments and simply doesn’t fit all use cases.

HashiCorp Vault provides Encryption as a Service, and, as it happens, can help us implement a solution without workarounds in either the application code or the Kafka data model.


Vault Encryption as a Service

Vault Transit secrets engine handles cryptographic operations on in-transit data without persisting any information. This allows a straightforward introduction of cryptography in existing or new applications by performing a simple HTTP request.

Vault fully and transparently manages the lifecycle of encryption keys, so neither developers nor operators have to worry about key compliance and rotation, while the securely stored data can always be encrypted and decrypted as long as Vault is accessible.


Kafka Integration

What if, instead of trying to selectively eliminate the data the application is not allowed to keep, we just made sure that the application (or anyone, for that matter) could not read the data under any circumstances? This would be equivalent to physically removing the data, just as GDPR requires. Such a result can be achieved by selectively encrypting the information that we might want to delete and throwing away the key when deletion is requested.

However, it is necessary to perform encryption and decryption in a way that is transparent to the application, to reduce the refactoring and integration effort for each of the applications that use Kafka, and to unlock this functionality for the applications that cannot be adapted at all.

The Kafka APIs support interceptors on message production and consumption; these are the natural place in the chain to leverage Vault’s Encryption as a Service. Inside the interceptors, we can perform the needed message transformation:

  • before a record is sent to Kafka, the interceptor performs encryption and adjusts the record content with the encrypted data
  • before a record is returned to a consumer client, the interceptor performs decryption and adjusts the record content with the decrypted data


Logical Deletion

Does this allow us to delete all the Kafka messages related to a single user? Yes, and it is really simple. If the encryption key that we use for encrypting data in Kafka messages is different for each of our application’s users, we can go ahead and delete the encryption key to guarantee that it is no longer possible to read the user data.


Replication Outside EU

Given that the sensitive data stored in our Kafka cluster is now encrypted at rest, it is possible to replicate the cluster outside the EU, for example for disaster recovery purposes. The data will only be accessible to users that have the right permissions to perform the cryptographic operations in Vault.



Part 2: Technicalities

In the previous part we outlined the general idea behind the integration of HashiCorp Vault and Apache Kafka for performing fine-grained encryption at rest of the messages, in order to address GDPR compliance requirements within Kafka. In this part, we take a deep dive into how to bring this idea to life.


Vault Transit Secrets Engine

Vault Transit secrets engine is part of Vault Open Source, and it is really easy to get started with. Setting the engine up is just a matter of enabling it and creating some encryption keys:
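For example (the key name user-1234 stands for a per-user key and is just an example):

# enable the Transit secrets engine and create a named encryption key
vault secrets enable transit
vault write -f transit/keys/user-1234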

Cryptographic operations can be performed just as simply: it’s just a matter of providing base64-encoded plaintext data:
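For example:

# encrypt base64-encoded plaintext with the per-user key (example data)
vault write transit/encrypt/user-1234 plaintext=$(base64 <<< "my sensitive data")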

The resulting ciphertext is prefixed with vault:v1:, where v1 represents the first key generation, given that the key has not been rotated yet.

What about decryption? Well, it’s just another API call:
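For example (the ciphertext placeholder stands for a value previously returned by the encrypt call):

# decrypt the ciphertext, then base64-decode the returned plaintext
vault write transit/decrypt/user-1234 ciphertext="vault:v1:<ciphertext>"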

Integrating Vault’s Encryption as a Service within your application becomes really easy to implement and requires little to no refactoring of the existing codebase.


Kafka Producer Interceptor

The Producer Interceptor API can intercept and possibly mutate the records received by the producer before they are published to the Kafka cluster. In this scenario, the goal is to perform encryption within this interceptor, in order to avoid sending plaintext data to the Kafka cluster…

Integrating encryption in the Producer Interceptor is straightforward, given that the onSend method is invoked one message at a time.
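A sketch in Scala of what such an interceptor could look like; the Vault call itself is abstracted behind a hypothetical encrypt helper, and the class name and key-selection strategy are illustrative:

// illustrative sketch of an encrypting producer interceptor, not the actual implementation
import java.util
import org.apache.kafka.clients.producer.{ProducerInterceptor, ProducerRecord, RecordMetadata}

class VaultEncryptionProducerInterceptor extends ProducerInterceptor[String, String] {

  override def configure(configs: util.Map[String, _]): Unit = {
    // e.g. read the Vault address, token and key naming strategy from the client configuration
  }

  override def onSend(record: ProducerRecord[String, String]): ProducerRecord[String, String] = {
    // replace the plaintext value with the Vault ciphertext before the record leaves the producer
    val ciphertext = encrypt(record.key(), record.value())
    new ProducerRecord(record.topic(), record.partition(), record.timestamp(),
      record.key(), ciphertext, record.headers())
  }

  override def onAcknowledgement(metadata: RecordMetadata, exception: Exception): Unit = ()

  override def close(): Unit = ()

  // hypothetical helper: call Vault's /v1/transit/encrypt/<key> with the base64-encoded value
  private def encrypt(userKey: String, plaintext: String): String = ???
}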


Kafka Consumer Interceptor

The Consumer Interceptor API can intercept and possibly mutate the records received by the consumer. In this scenario, we want to decrypt the data received from the Kafka cluster and return plaintext data to the consumer.

Integrating decryption within the Consumer Interceptor is a bit trickier, because we want to leverage Vault’s batch decryption capabilities in order to minimize the number of Vault API calls.
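A sketch of this approach: the decryptBatch helper (backed by Transit’s batch_input parameter) and the class name are hypothetical, and the records are regrouped per partition to rebuild the ConsumerRecords object:

// illustrative sketch of a decrypting consumer interceptor using batch decryption
import java.util
import scala.jdk.CollectionConverters._
import org.apache.kafka.clients.consumer.{ConsumerInterceptor, ConsumerRecord, ConsumerRecords, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

class VaultDecryptionConsumerInterceptor extends ConsumerInterceptor[String, String] {

  override def configure(configs: util.Map[String, _]): Unit = ()

  override def onConsume(records: ConsumerRecords[String, String]): ConsumerRecords[String, String] = {
    val all = records.asScala.toList
    // one Vault call for the whole poll, instead of one call per record
    val plaintexts = decryptBatch(all.map(_.value()))
    val decrypted = all.zip(plaintexts).map { case (r, plain) =>
      new ConsumerRecord[String, String](r.topic(), r.partition(), r.offset(), r.key(), plain)
    }
    val byPartition = decrypted
      .groupBy(r => new TopicPartition(r.topic(), r.partition()))
      .view.mapValues(_.asJava).toMap.asJava
    new ConsumerRecords(byPartition)
  }

  override def onCommit(offsets: util.Map[TopicPartition, OffsetAndMetadata]): Unit = ()

  override def close(): Unit = ()

  // hypothetical helper: call Vault's /v1/transit/decrypt/<key> with batch_input (implementation omitted)
  private def decryptBatch(ciphertexts: List[String]): List[String] = ???
}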

Usage

Once you have built your interceptors, enabling them is just a matter of configuring your Consumer or Producer client:
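For the producer, for example (the interceptor class name and the custom interceptor.value.serializer value are illustrative; interceptor.classes, key.serializer and value.serializer are standard Kafka properties):

# producer configuration – class names are illustrative
interceptor.classes=com.example.kafka.VaultEncryptionProducerInterceptor
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
# custom interceptor property mentioned below, value is an example
interceptor.value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer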

or
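For the consumer, correspondingly (again, the interceptor class name and the interceptor.value.deserializer value are illustrative):

# consumer configuration – class names are illustrative
interceptor.classes=com.example.kafka.VaultDecryptionConsumerInterceptor
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# custom interceptor property mentioned below, value is an example
interceptor.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer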

Notice that the key and value serializer classes must be set to StringSerializer, since Vault Transit can only handle strings containing base64 data. The client invoking the Kafka Producer and Consumer APIs, however, is able to process any supported type of data, according to the serializer or deserializer configured in the interceptor.value.serializer or interceptor.value.deserializer properties.


Conclusions

HashiCorp Vault Transit secrets engine is definitely the technological component you may want to leverage when addressing cryptographic requirements in your application, even when dealing with legacy components. The entire set of capabilities offered by HashiCorp Vault makes it easy to modernize applications from a security perspective, allowing developers to focus on the business logic rather than spending time finding a way to properly manage secrets.



Author: Simone Ripamonti, DevOps Engineer @Bitrock


Confluent Operations Training for Apache Kafka

In this three-day hands-on course you will learn how to build, manage, and monitor clusters using industry best-practices developed by the world’s foremost Apache Kafka experts.

You will learn how Kafka and the Confluent Platform work, their main subsystems, how they interact, and how to set up, manage, monitor, and tune your cluster.

Hands-On Training

Throughout the course, hands-on exercises reinforce the topics being discussed. Exercises include:

  • Cluster installation
  • Basic cluster operations
  • Viewing and interpreting cluster metrics
  • Recovering from a Broker failure
  • Performance-tuning the cluster
  • Securing the cluster

This course is designed for engineers, system administrators, and operations staff responsible for building, managing, monitoring, and tuning Kafka clusters.

Course Prerequisites

Attendees should have a strong knowledge of Linux/Unix, and understand basic TCP/IP networking concepts. Familiarity with the Java Virtual Machine (JVM) is helpful. Prior knowledge of Kafka is helpful, but is not required.



Course Contents


The Motivation for Apache Kafka

  • Systems Complexity

  • Real-Time Processing is Becoming Prevalent

  • Kafka: A Stream Data Platform

    Kafka Fundamentals

  • An Overview of Kafka

  • Kafka Producers

  • Kafka Brokers

  • Kafka Consumers

  • Kafka’s Use of ZooKeeper

  • Comparisons with Traditional Message Queues

    Providing Durability

  • Basic Replication Concepts

  • Durability Through Intra-Cluster Replication

  • Writing Data to Kafka Reliably

  • Broker Shutdown and Failures

  • Controllers in the Cluster

  • The Kafka Log Files

  • Offset Management

    Designing for High Availability

  • Kafka Reference Architecture

  • Brokers

  • ZooKeeper

  • Connect

  • Schema Registry

  • REST Proxy

  • Multiple Data Centers

    Managing a Kafka Cluster

  • Installing and Running Kafka

  • Monitoring Kafka

  • Basic Cluster Management

  • Log Retention and Compaction

  • An Elastic Cluster

    Optimizing Kafka Performance

  • Producer Performance

  • Broker Performance

  • Broker Failures and Recovery Time

  • Load Balancing Consumption

  • Consumption Performance

  • Performance Testing

    Kafka Security

  • SSL for Encryption and Authentication

  • SASL for Authentication

  • Data at Rest Encryption

  • Securing ZooKeeper and the REST Proxy

  • Migration to a Secure Cluster

    Integrating Systems with Kafka Connect

  • The Motivation for Kafka Connect

  • Types of Connectors

  • Kafka Connect Implementation

  • Standalone and Distributed Modes

  • Configuring the Connectors

  • Deployment Considerations

  • Comparison with Other Systems


Confluent Developer Training

Building Kafka Solutions

In this three-day hands-on course you will learn how to build an application that can publish data to, and subscribe to data from, an Apache Kafka cluster.

You will learn the role of Kafka in the modern data distribution pipeline, discuss core Kafka architectural concepts and components, and review the Kafka developer APIs. As well as core Kafka, Kafka Connect, and Kafka Streams, the course also covers other components in the broader Confluent Platform, such as the Schema Registry and the REST Proxy.

Hands-On Training

Throughout the course, hands-on exercises reinforce the topics being discussed. Exercises include:

  • Using Kafka’s command-line tools
  • Writing Consumers and Producers
  • Writing a multi-threaded Consumer
  • Using the REST Proxy
  • Storing Avro data in Kafka with the Schema Registry
  • Ingesting data with Kafka Connect

This course is designed for application developers, ETL (extract, transform, and load) developers, and data scientists who need to interact with Kafka clusters as a source of, or destination for, data.

Course Prerequisites

Attendees should be familiar with developing in Java (preferred) or Python. No prior knowledge of Kafka is required.



Course Contents


The Motivation for Apache Kafka

  • Systems Complexity

  • Real-Time Processing is Becoming Prevalent

  • Kafka: A Stream Data Platform

    Kafka Fundamentals

  • An Overview of Kafka

  • Kafka Producers

  • Kafka Brokers

  • Kafka Consumers

  • Kafka’s Use of ZooKeeper

  • Kafka Efficiency

    Kafka’s Architecture

  • Kafka’s Log Files

  • Replicas for Reliability

  • Kafka’s Write Path

  • Kafka’s Read Path

  • Partitions and Consumer Groups for Scalability

    Developing With Kafka

  • Using Maven for Project Management

  • Programmatically Accessing Kafka

  • Writing a Producer in Java

  • Using the REST API to Write a Producer

  • Writing a Consumer in Java

  • Using the REST API to Write a Consumer

    More Advanced Kafka Development

  • Creating a Multi-Threaded Consumer

  • Specifying Offsets

  • Consumer Rebalancing

  • Manually Committing Offsets

  • Partitioning Data

  • Message Durability

    Schema Management in Kafka

  • An Introduction to Avro

  • Avro Schemas

  • Using the Schema Registry

    Kafka Connect for Data Movement

  • The Motivation for Kafka Connect

  • Kafka Connect Basics

  • Modes of Working: Standalone and Distributed

  • Configuring Distributed Mode

  • Tracking Offsets

  • Connector Configuration

  • Comparing Kafka Connect with Other Options

    Basic Kafka Installation and Administration

  • Kafka Installation

  • Hardware Considerations

  • Administering Kafka

    Kafka Streams

  • The Motivation for Kafka Streams

  • Kafka Streams Fundamentals

  • Investigating a Kafka Streams Application
