Polymorphic Messages in Kafka Streams
Things usually start simple...
You are designing a Kafka Streams application that must read commands and produce the corresponding business events.
The Avro models you’re expecting to read look like this:
While the output messages you’re required to produce look like this:
You know you can leverage the sbt-avrohugger plugin to generate the corresponding Scala class for each Avro schema, so that you can focus only on designing the business logic.
Since the messages themselves are pretty straightforward, you decide to create a monomorphic function to map properties between each command and the corresponding event.
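A minimal sketch of such a mapping. The `CreateAsset` and `AssetCreated` classes, their fields, and the `now` parameter are hypothetical stand-ins for the generated classes, not the original schemas:

```scala
// Hypothetical stand-ins for the classes generated from the Avro schemas.
final case class CreateAsset(id: String, name: String)
final case class AssetCreated(id: String, name: String, createdAt: Long)

object AssetMapper {
  // Monomorphic mapping: exactly one command type in, one event type out.
  // The timestamp is passed in so the function stays pure and easy to test.
  def toEvent(command: CreateAsset, now: Long): AssetCreated =
    AssetCreated(command.id, command.name, now)
}
```

Because the function knows only one input and one output type, every new kind of command would need its own copy of it.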
The resulting topology ends up looking like this:
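As a rough sketch under stated assumptions, a topology of this kind could be written with the Kafka Streams Scala DSL as follows. The `CommandTopology` object, the topic parameters, and the choice of `String` keys are illustrative assumptions; the Serdes are expected to be supplied by the caller:

```scala
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.Topology
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder

object CommandTopology {
  // Reads commands of type C from `in`, maps each value with `toEvent`,
  // and writes the resulting events of type E to `out`.
  // Keys are assumed to be Strings and pass through unchanged.
  def build[C, E](in: String, out: String)(toEvent: C => E)(
      implicit keySerde: Serde[String],
      commandSerde: Serde[C],
      eventSerde: Serde[E]
  ): Topology = {
    val builder = new StreamsBuilder
    builder.stream[String, C](in).mapValues(toEvent).to(out)
    builder.build()
  }
}
```

The implicit Serdes are turned into `Consumed` and `Produced` instances by `ImplicitConversions`, so the topology itself stays a single line.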
...But then the domain widens
Today new functional requirements have emerged: your application must now handle multiple types of assets, each with its own unique properties.
You are pondering how to implement this requirement and make your application more resilient to further changes in behavior.
You could split both commands and events into multiple topics, one per asset type, so that the corresponding Avro schema stays consistent and its compatibility is ensured.
This solution, however, would have you replicate pretty much the same topology multiple times, so it’s not recommended unless the business logic has to be customized for each asset type.
Avro doesn’t support inheritance between records, so any OOP strategy to have assets inherit properties from a common ancestor is unfortunately not viable.
You could however create a “Frankenstein” object with all the properties of each and every asset and fill in only those required for each type of asset.
This is definitely the worst solution in terms of both schema evolution and maintainability.
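To make the drawback concrete, here is a small sketch of such a catch-all class. The `AnyAsset` name and its fields are invented for illustration:

```scala
// Hypothetical catch-all class: every property of every asset type, all optional.
final case class AnyAsset(
    assetType: String,
    plate: Option[String] = None,   // only meaningful for vehicles
    seats: Option[Int] = None,      // only meaningful for vehicles
    address: Option[String] = None, // only meaningful for buildings
    floors: Option[Int] = None      // only meaningful for buildings
)

object FrankensteinSketch {
  val vehicle: AnyAsset =
    AnyAsset("vehicle", plate = Some("AB123CD"), seats = Some(5))

  // Nothing stops a nonsensical combination from compiling:
  // a "vehicle" with an address and no plate.
  val nonsense: AnyAsset =
    AnyAsset("vehicle", address = Some("1 Main St"))
}
```

The compiler cannot tell which fields belong together, so every consumer has to re-validate the combination at runtime, and each new asset type widens the class further.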
Luckily for you, Avro offers an interesting feature named union types: you can express the diversity in each asset’s properties as a union of multiple payloads, while still relying on a single message as the wrapper.
Enter polymorphic streams
Objects with no shape
To cope with this advanced polymorphism, you leverage the shapeless library, which introduces the Coproduct type, the perfect companion for the Avro union type.
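A minimal sketch of how a shapeless Coproduct can model a union of payloads. The `Vehicle` and `Building` classes are hypothetical asset types, not the ones from the original schemas:

```scala
import shapeless.{:+:, CNil, Coproduct}

// Hypothetical asset payloads; the real ones would be generated from the Avro schemas.
final case class Vehicle(plate: String, seats: Int)
final case class Building(address: String, floors: Int)

object CoproductSketch {
  // Scala counterpart of an Avro union of the two records.
  type AssetPayload = Vehicle :+: Building :+: CNil

  // Inject a concrete payload into the union.
  val payload: AssetPayload = Coproduct[AssetPayload](Vehicle("AB123CD", 5))

  // select returns Some only when the union currently holds that member.
  val asVehicle: Option[Vehicle] = payload.select[Vehicle]
  val asBuilding: Option[Building] = payload.select[Building]
}
```

Unlike a class full of optional fields, the union holds exactly one of its members at a time, and the set of members is known at compile time.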
First of all, you update the custom types mapping of sbt-avrohugger, so that it generates an additional sealed trait for each Avro protocol containing multiple records:
The generated command class ends up looking like this:
Updating the business logic
Thanks to shapeless’ Poly1 trait you then write the updated business logic in a single class:
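As an illustration of the idea, a `Poly1` with one case per union member might look like the sketch below. All class names and the `PolyLogicSketch` object are hypothetical:

```scala
import shapeless.{:+:, CNil, Coproduct, Poly1}

// Hypothetical command and event payloads, one pair per asset type.
final case class CreateVehicle(plate: String, seats: Int)
final case class CreateBuilding(address: String, floors: Int)
final case class VehicleCreated(plate: String, seats: Int)
final case class BuildingCreated(address: String, floors: Int)

object PolyLogicSketch {
  type CommandPayload = CreateVehicle :+: CreateBuilding :+: CNil
  type EventPayload = VehicleCreated :+: BuildingCreated :+: CNil

  // One polymorphic function with a dedicated case for each member of the union.
  object toEvent extends Poly1 {
    implicit val vehicle: Case.Aux[CreateVehicle, VehicleCreated] =
      at[CreateVehicle](c => VehicleCreated(c.plate, c.seats))
    implicit val building: Case.Aux[CreateBuilding, BuildingCreated] =
      at[CreateBuilding](c => BuildingCreated(c.address, c.floors))
  }

  // Mapping the Poly1 over the command union yields the matching event union.
  def handle(payload: CommandPayload): EventPayload = payload.map(toEvent)
}
```

Supporting a new asset type then means adding one member to each union and one case to the `Poly1`; if a case is missing, `map` fails to compile rather than failing at runtime.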
Changes to the topology are minimal, as you’d expect:
A special kind of Serde
Now for the final piece of the puzzle: Serdes. Enter the avro4s library, which maps Scala case classes to and from Avro GenericRecords.
You create a type class to extend a plain old Serde providing a brand new method:
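The general pattern of deriving a Serde from a type class can be sketched as follows. This is not the avro4s-based implementation: a hand-written, hypothetical `Codec` type class with a toy text encoding is swapped in for avro4s and the Avro schema, and `CodecSerde` is an invented name:

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer}

// Hypothetical type class describing how a T is turned into bytes and back.
// In an Avro setup this role would be played by the schema-aware encoder.
trait Codec[T] {
  def encode(value: T): Array[Byte]
  def decode(bytes: Array[Byte]): T
}

object CodecSerde {
  // Builds a Kafka Serde for any T that has a Codec instance in implicit scope.
  def apply[T](implicit codec: Codec[T]): Serde[T] = new Serde[T] {
    override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
    override def close(): Unit = ()

    override def serializer(): Serializer[T] = new Serializer[T] {
      override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
      override def close(): Unit = ()
      // Kafka uses null values as tombstones, so null is forwarded untouched.
      override def serialize(topic: String, data: T): Array[Byte] =
        if (data == null) null else codec.encode(data)
    }

    override def deserializer(): Deserializer[T] = new Deserializer[T] {
      override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
      override def close(): Unit = ()
      override def deserialize(topic: String, bytes: Array[Byte]): T =
        if (bytes == null) null.asInstanceOf[T] else codec.decode(bytes)
    }
  }
}

// Hypothetical payload with a toy text codec, only to exercise the type class.
final case class AssetCreated(id: String, name: String)

object AssetCreated {
  implicit val codec: Codec[AssetCreated] = new Codec[AssetCreated] {
    def encode(value: AssetCreated): Array[Byte] =
      s"${value.id}|${value.name}".getBytes(UTF_8)
    def decode(bytes: Array[Byte]): AssetCreated = {
      val Array(id, name) = new String(bytes, UTF_8).split("\\|", 2)
      AssetCreated(id, name)
    }
  }
}
```

With an instance in the companion object, `CodecSerde[AssetCreated]` resolves the right codec at compile time, so no Serde has to be wired by hand for each class.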
Now each generated class has its own Serde, tailored to the corresponding Avro schema.
Putting everything together
Finally, the main program where you combine all ingredients:
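A minimal sketch of the wiring, assuming a `Topology` has already been built elsewhere. The `StreamsApp` object and its method names are hypothetical, and the application id and broker address are left to the caller:

```scala
import java.util.Properties
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig, Topology}

object StreamsApp {
  // Minimal configuration: an application id and the brokers to connect to.
  def config(applicationId: String, bootstrapServers: String): Properties = {
    val props = new Properties()
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    props
  }

  // Starts the given topology and closes it cleanly when the JVM shuts down.
  def run(topology: Topology, props: Properties): KafkaStreams = {
    val streams = new KafkaStreams(topology, props)
    streams.start()
    sys.addShutdownHook(streams.close())
    streams
  }
}
```

A `main` method would then call `StreamsApp.run(topology, StreamsApp.config(...))` with values suited to the target environment.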
When multiple use cases share (almost) the same business logic, you can build a stream processing application with ad-hoc polymorphism and keep code duplication to a minimum, while making your application more future-proof.