Combining Kafka Streams and Akka

In our KUKA Connect robot management web app, we have begun refactoring our code to use Kafka Streams as much as possible.

We were already using Kafka, publishing data to various topics, and consuming that data, either with simple consumers or by using Akka Streams.

But now we wanted to move to Kafka Streams: could we just replace our usages of Akka Streams with it? The short answer is no. Here’s a use case from our web app that shows how combining the two frameworks still makes sense for us.

Kafka Streams and Akka Streams, Each Great at Something

Use Case: When a KUKA robot hits a critical fault, notify the owner of that robot via text message.

Microservices Involved:

Notification Service: The main player. It joins the Kafka streams published by the other microservices and calls into Messaging Service to actually send the messages to the right users.

Device Access Service: Responsible for knowing which users have access to which robots at any given time. It publishes a full-state KTable with this information.

Device Licensing Service: Responsible for knowing whether a robot has a Plus license or not. It also publishes a full-state KTable.

Device Fault Service: Publishes robot faults to a KStream as they occur.

Messaging Service: Knows how to send text messages to users.

We build our main stream by constructing a KStream off the deviceFaults topic. We join those fault events to the deviceAccess KTable to find the users who have access to the device, then join the result to the deviceLicensing KTable to filter out robots that are not Plus licensed. A flatMap rekeys the result from deviceId to userId, and finally we publish those events to a new topic we call userFaultStream.

Kafka Streams gives us the ability to easily combine these different flows of data, filter based on various criteria, then rekey it from device identifier to user identifier.
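
To make the join, filter, and rekey steps concrete, here is a minimal sketch that models them with plain Scala collections rather than the Kafka Streams API. A Seq stands in for the KStream and Maps stand in for the two KTables. The case classes, field names, and the shape of the table values are assumptions made for illustration, not our production types.

```scala
// Plain-Scala model of the topology logic. This is NOT Kafka Streams code:
// a Seq stands in for the fault KStream and Maps stand in for the KTables.
// All type and field names here are hypothetical.
final case class Fault(deviceId: String, description: String)
final case class UserFault(userId: String, deviceId: String, description: String)

object FaultTopologyModel {
  def userFaults(
      faults: Seq[Fault],
      deviceAccess: Map[String, Set[String]], // deviceId -> userIds with access
      deviceLicensing: Map[String, Boolean]   // deviceId -> has a Plus license
  ): Seq[UserFault] =
    for {
      fault  <- faults
      // Inner join on deviceId: faults for unknown devices are dropped.
      users  <- deviceAccess.get(fault.deviceId).toSeq
      // Keep only robots that are Plus licensed.
      if deviceLicensing.getOrElse(fault.deviceId, false)
      // Rekey from deviceId to userId: one output record per user.
      userId <- users.toSeq.sorted
    } yield UserFault(userId, fault.deviceId, fault.description)
}
```

In this model a fault on a robot shared by two users yields two records, one keyed by each user, which is the effect the flatMap has in the real topology.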

But there’s a problem.

Enter the Akka

People dislike getting spammed with hundreds of text messages.

Further, robots can sometimes hiccup and throw a spurious fault that is then immediately cleared by an internal system. The fault stream carries every single fault, however, so if we blindly call into Messaging Service with each one, users will sometimes get flooded with fault notifications.

What we really want is a way to batch the faults for any given user every X seconds or so, to give spurious faults time to clear themselves and also to group rapidly occurring faults into a single message.

Kafka Streams is not good at that.
But Akka Streams is.

Recall that we published our fault stream to a new topic, userFaultStream.

We then made an Akka Streams consumer of that topic, grouped it by userId, and merged it with another source that emits a “Send Message” signal every 5 seconds. We buffer up all text messages intended for a given user during those 5 seconds, until we get the Send Message signal.

This magic means we can receive faults from many robots at once, each handled by a different instance of our faults microservice, join them to filter and discover which users need to know about them, key them by userId, buffer up and combine the messages for the same user, and then send them out as a batch.
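
The buffering logic can be sketched without Akka as a fold over the merged sequence of events. This is a simplified model, not our stream graph: it keeps one buffer keyed by userId instead of one substream per user, and the names FaultNotice, SendSignal, and Batch are hypothetical.

```scala
// Plain-Scala model of "buffer per user until the Send Message signal".
// This is NOT Akka Streams code; every name below is hypothetical.
object FaultBatcher {
  sealed trait Event
  final case class FaultNotice(userId: String, text: String) extends Event
  case object SendSignal extends Event // stands in for the 5-second tick

  final case class Batch(userId: String, texts: Vector[String])

  /** Replays the merged events and returns the batches flushed at each signal, in order. */
  def batches(events: Seq[Event]): Vector[Batch] = {
    val empty = (Map.empty[String, Vector[String]], Vector.empty[Batch])
    val (_, flushed) = events.foldLeft(empty) {
      // A fault arrives: append it to that user's buffer.
      case ((buffer, out), FaultNotice(user, text)) =>
        (buffer.updated(user, buffer.getOrElse(user, Vector.empty) :+ text), out)
      // The signal arrives: emit one batch per buffered user, then clear the buffer.
      case ((buffer, out), SendSignal) =>
        val ready = buffer.toVector.sortBy(_._1).map { case (user, texts) => Batch(user, texts) }
        (Map.empty[String, Vector[String]], out ++ ready)
    }
    flushed // notices still buffered when the input ends are not emitted
  }
}
```

Each Batch would become a single text message. A signal that arrives while the buffer is empty produces nothing, so quiet periods send no messages. Akka Streams also ships a groupedWithin operator that batches by element count or elapsed time, which may be enough for simpler cases.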


Because programming with Kafka Streams is so powerful, we are adopting it whenever we can, “streamifying” existing microservices one at a time as it makes sense for us to do so.

That said, we still have many use cases for Akka Streams, as its broadcast and flow processing provide rich abilities that Kafka Streams does not (yet?) offer.

Now that we are programming using this paradigm, we don’t want to go back to the “old” way of doing things, where we made lots of calls between microservices to piece together all the data we needed at any given time.


Custom DataDog Metrics with Kubernetes DaemonSets

For our KUKA Connect web app, we wanted to monitor the backend easily and chose DataDog to do it.

Our backend runs in AWS, where we package, deploy, and manage our microservices with Docker and Kubernetes.

Without too much trouble, we set up basic metrics for Kubernetes in DataDog by making a DaemonSet as described here. Soon we were able to create a DataDog dashboard with info on CPU and memory usage of pods, disk space, network traffic, and so on.

Custom Metrics — Not So Easy

We wanted custom metrics to track our own product-specific info. Things like: how many robots are active in our system; how many are licensed; how many WebSocket messages we process from robots each minute; how many anomalies have been detected across all robots via our predictive maintenance feature.

The documentation on custom metrics says that you only need three lines of code. Those three lines should work, as long as your setup is very simple. But if you are running a DataDog agent as a DaemonSet in a Kubernetes cluster in AWS, how to set up the StatsD client and which port to send the metrics to are not well documented.

We finally got a custom metric working from our web app to DataDog, and this post is to help others needing to do the same. We write our microservices primarily in Scala, but the major steps apply to any language.

Making Custom Metrics Work

Start by adding the statsd client to your build. We use sbt so we add this to our build.sbt:

libraryDependencies += "com.datadoghq" % "java-dogstatsd-client" % "2.3"

Next we created a helper trait, StatsDBinding, so we can add a custom metric to any class by simply mixing in the trait.

Then in our Module.scala we mix in the StatsDBinding trait:

class Module(val environment: Environment, val configuration: Configuration)
    extends AbstractModule with StatsDBinding {
  // ...
}

Then in our application.conf we add our own configuration for how to hit statsd:

statsd {
  prefix = "kuka"
  host = "dd-agent-service"
  port = 8973
}

dd-agent-service is the name of the Kubernetes Service that fronts our DataDog agent DaemonSet, which we created as described here.

The port is found in the YAML for dd-agent-service, which we get by running this command:

kubectl get service dd-agent-service -o yaml

It prints out all config information, but the relevant part is here:

apiVersion: v1
kind: Service
metadata:
  creationTimestamp: 2017-02-02T16:29:37Z
  name: dd-agent-service
  namespace: default
spec:
  clusterIP: XYZ.ABC.123.456
  ports:
  - port: 8973
    protocol: UDP
    targetPort: 1234
  selector:
    app: dd-agent

Now to actually make the custom metric.

In any class we want to add a custom metric to, we report it through the StatsD client under its own name, in our case awesome_data.added.

Now when you go to DataDog after deploying all this code, you will see a custom metric in your dropdown with the prefix you chose (we use “kuka”). For us it shows up in the long list as kuka.awesome_data.added, and we can choose which type of visualization we want for the data.
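
If the metric does not show up, it can help to see what actually travels to the agent. The sketch below does not use java-dogstatsd-client. It hand-rolls a single counter datagram in the StatsD text format (name:value|c) and sends it over UDP, using only the Java standard library. The object and method names are hypothetical, and it assumes the prefix and metric name are joined with a dot, which matches the kuka.awesome_data.added name we see in DataDog.

```scala
import java.net.{DatagramPacket, DatagramSocket, InetAddress}
import java.nio.charset.StandardCharsets.UTF_8

// Hand-rolled sketch for debugging; names are hypothetical and this is not
// the java-dogstatsd-client API.
object DogStatsDSketch {
  /** Formats a counter increment as a StatsD datagram: <prefix>.<metric>:<delta>|c */
  def counterDatagram(prefix: String, metric: String, delta: Long = 1L): String =
    s"$prefix.$metric:$delta|c"

  /** Sends one datagram over UDP. UDP gives no delivery confirmation,
    * so a wrong port fails silently rather than throwing. */
  def send(host: String, port: Int, datagram: String): Unit = {
    val socket = new DatagramSocket()
    try {
      val bytes = datagram.getBytes(UTF_8)
      socket.send(new DatagramPacket(bytes, bytes.length, InetAddress.getByName(host), port))
    } finally socket.close()
  }
}
```

From a pod inside the cluster, DogStatsDSketch.send("dd-agent-service", 8973, DogStatsDSketch.counterDatagram("kuka", "awesome_data.added")) would send one increment to the service port shown in the YAML above. Because UDP fails silently, the host and port in application.conf are the first things to double-check when nothing arrives.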