The Kafka Streams Experience Pt. 1

What And Why?

Kafka Streams is a relatively new technology available as part of the standard Kafka distribution. I’ve mentioned it before in previous blog posts such as my massive wall of text on event sourcing. It’s attractive to us because it solves a lot of the difficult problems with event sourcing on Kafka, many of which I discussed in that post. We have several use cases where event sourcing makes certain features much easier to implement at scale than more traditional approaches; one such application is discussed in this post.

We have now implemented several of our microservices using Kafka Streams, and have discovered and overcome a host of challenges along the way to becoming (relatively) fluent.  In this series of posts, I’ll discuss these challenges, how we addressed them, and what the outlook is for Kafka Streams in our products going forward.

We also make extensive use of the “interactive queries” feature of Kafka Streams, which has allowed us to query data that is both big and fast in ways that other storage technologies struggle with.  This feature has its own set of implementation challenges, some of which I’ll discuss in this post as well as later posts in this series.

This post assumes basic familiarity with Kafka and Kafka Streams concepts, such as the difference between a KStream and a KTable, as well as the basics of how Kafka topics and partitions work.  If you’re not there yet, take a look at this introductory post on Kafka Streams concepts.

Challenge: Topic/Partition Explosion

The Problem

Kafka Streams creates a lot of topics for you behind the scenes under a host of circumstances. For instance, you get a topic created:

  • For any stage that has state, such as an aggregate or KTable join.
  • For any interactive query store you create with logging support.
  • Any time you join two KStreams that are not co-partitioned already.

For even a medium-complexity streaming application, this can quickly reach tens of topics, each with as many partitions as the input topics to the stream.  This has several issues, all of which we have experienced in one form or another:

  • More latency.  If you are publishing data back and forth to the brokers many times, you pay the round trip time at least, and possibly the commit interval time (a configuration parameter for the stream), for every stage that does this.
  • More disk usage.  Publishing stream state through many topics means that you’re using disk space for redundant or partially-redundant copies of data on your brokers.  In more than one case we ran our brokers out of disk space with disastrous results.  Details in a later post…
  • Drain on broker resources.  Brokers have to do work for each partition of each topic they host, even if those partitions are not particularly active.  For log-compacted topics (which many of the auto-created topics are), this amount of work can be large.  We observed several cases where our brokers would experience significant slow-down simply due to the load of hosting many partitions.
  • Drain on client resources.  A lot of the work of re-publishing and subscribing to all these intermediate topics is done client-side.  For large volumes of data, this load can be significant.  This manifests as additional latency, and in extreme cases, consumers falling behind.
  • Slow client startup. When clients with state stores start up, they have to replay all the partitions of all the KTables backing their state stores into their local storage.  They do this one partition at a time, and the time this takes scales roughly with the total number of partitions in all state stores in the application.

The Solution

We took several measures to control the topic/partition explosion problem.

Repartition source topics down.

In a few cases, we had source topics with large numbers of partitions, because those topics are very active.  Downstream processing might reduce or aggregate the data so that it is much less active or large, making the number of partitions required much smaller.  Don’t let the needs of the input data dictate the partitioning of your entire stream, especially because complicated streams may create many topics.

For example, we have an application where a stream took high-frequency robot operational data as input.  This is the biggest, fastest data in our entire application and flows through a topic with many partitions.  However, the output was quite small: we simply aggregate the input data to a single value per robot, namely the frequency of the data in Hz.  To accomplish this, we first transformed the input data into something much smaller (namely a timestamp and the number of data points) in a simple map step.  Then we repartitioned that down to 1/10th as many partitions, and used that as the input to the aggregation stage.  This in turn ensured that the topic created behind the scenes to capture the state of the aggregation had a small number of partitions.  In pseudocode, this looks like:

stream("TopicWithLotsOfPartitions")
  .mapValues(transformToTimeStampAndPoints)
  .through("TopicWithFewPartitions")
  .groupByKey()
  .aggregate(calculateRunningFrequency, "frequencyStoreName")

Now we have a queryable store with the update frequency for each robot without having to host a topic with the same (large) number of partitions as the raw robot high-frequency data.
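
For concreteness, here is a minimal sketch of that topology written in Scala against the 0.11.x Java DSL (what we currently use), assuming Scala 2.12 SAM conversion for the Java functional interfaces.  The domain types, the frequency calculation, and the serde are hypothetical stand-ins for our real protobuf messages, a default value serde capable of handling them is assumed, and “TopicWithFewPartitions” is assumed to have been created ahead of time with the smaller partition count.

import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.kstream.{Aggregator, Initializer, KStreamBuilder, ValueMapper}

object FrequencyTopology {
  // Hypothetical stand-ins for our real protobuf messages.
  case class RawRobotData(timestampMs: Long, points: Int) // plus a large payload in reality
  case class RobotSample(timestampMs: Long, points: Int)
  case class RobotFrequency(lastTimestampMs: Long, hz: Double)

  // Assumed to be provided elsewhere (e.g. a protobuf-backed serde).
  def frequencySerde: Serde[RobotFrequency] = ???

  def build(builder: KStreamBuilder): Unit = {
    // Shrink each record down to only what the aggregation needs.
    val toSample: ValueMapper[RawRobotData, RobotSample] =
      (raw: RawRobotData) => RobotSample(raw.timestampMs, raw.points)

    // Illustrative only; the real frequency calculation is more involved.
    val init: Initializer[RobotFrequency] = () => RobotFrequency(0L, 0.0)
    val calculateRunningFrequency: Aggregator[String, RobotSample, RobotFrequency] =
      (key: String, s: RobotSample, acc: RobotFrequency) => {
        val dtSeconds = math.max((s.timestampMs - acc.lastTimestampMs) / 1000.0, 0.001)
        RobotFrequency(s.timestampMs, s.points / dtSeconds)
      }

    builder
      .stream[String, RawRobotData]("TopicWithLotsOfPartitions")
      .mapValues(toSample)
      // Pre-created with ~1/10th the partitions; the aggregate's backing
      // (changelog) topic inherits this smaller partition count.
      .through("TopicWithFewPartitions")
      .groupByKey()
      .aggregate(init, calculateRunningFrequency, frequencySerde, "frequencyStoreName")
  }
}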

Reduce Number Of Stateful Streaming Stages

This seems pretty obvious in retrospect, but it’s easy to do and pays big dividends.  In short, be mindful of all the streaming stages that cause topics to be created behind the scenes, and minimize those.

As an example, we had an application where we joined a KStream of robot events with a KTable of meta-information about the robot, such as a user-assigned name.  We then aggregated the events into a notion of current state for a robot (resulting from all the events happening to it over time).  Initially, the stream looked like this:

stream("RobotEventsTopic")
  .groupByKey()
  .aggregate(calculateRobotEventState)
  .leftJoin("RobotMetadataTable", joinRobotData, "queryableStoreName")

This looks reasonable enough, but it creates two topics behind the scenes: one for the state of the aggregate stage and one for the queryable store we specify in the left-join stage.  Each will have as many partitions as “RobotEventsTopic”.  The data between these two topics is almost completely redundant. With a simple rewrite, however, we were able to eliminate one of these topics with no compromise in functionality:

stream("RobotEventsTopic")
  .leftJoin("RobotMetadataTable", joinRobotData)
  .groupByKey()
  .aggregate(calculateRobotEventState, "queryableStoreName")

The trick here was to make the aggregate stage double as the queryable store, so a single topic now does double duty: it stores the state of the aggregation and serves queries.
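
Here is a hedged Scala sketch of the rewritten topology (same DSL and assumptions as the earlier sketch; the domain types, joiner, aggregation logic, and serde are stand-ins).  The aggregate stage carries the only named queryable store, mirroring the single-topic layout described above.

import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.kstream.{Aggregator, Initializer, KStreamBuilder, ValueJoiner}

object RobotStateTopology {
  // Hypothetical stand-ins for our real protobuf messages.
  case class RobotEvent(robotId: String, kind: String)
  case class RobotMetadata(name: String)
  case class EnrichedEvent(event: RobotEvent, metadata: Option[RobotMetadata])
  case class RobotState(lastEventKind: String, name: Option[String])

  def robotStateSerde: Serde[RobotState] = ??? // assumed provided elsewhere

  def build(builder: KStreamBuilder): Unit = {
    val metadata = builder.table[String, RobotMetadata]("RobotMetadataTable")

    // For a left join, the table-side value may be null if no metadata exists yet.
    val joinRobotData: ValueJoiner[RobotEvent, RobotMetadata, EnrichedEvent] =
      (event: RobotEvent, meta: RobotMetadata) => EnrichedEvent(event, Option(meta))

    // Illustrative only; the real state calculation is more involved.
    val init: Initializer[RobotState] = () => RobotState("", None)
    val calculateRobotEventState: Aggregator[String, EnrichedEvent, RobotState] =
      (key: String, e: EnrichedEvent, prev: RobotState) =>
        RobotState(e.event.kind, e.metadata.map(_.name).orElse(prev.name))

    builder
      .stream[String, RobotEvent]("RobotEventsTopic")
      .leftJoin(metadata, joinRobotData)  // stream-to-table join: no backing topic
      .groupByKey()
      // One stage holds the aggregation state and serves interactive queries.
      .aggregate(init, calculateRobotEventState, robotStateSerde, "queryableStoreName")
  }
}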

With the above example and some others, we derived some rules of thumb that help us minimize auto-created topics when defining the stream itself:

  • Prefer to join KStreams to KTables over KTables to KTables.  Stream-to-table joins do not require backing topics.
  • Minimize the number of aggregate steps (this also applies to reduce, count, and transform), preferably to one per stream.  If you are aggregating many KStream events into a single state, first merge all the input KStreams into one, then do a single aggregate on the merged KStream; a sketch of this follows the list.  We often package disparate events into a single protobuf message with a oneof field for this purpose.
  • Design your topics ahead of time to minimize the need for repartitioning.  We are still in the process of perfecting this, and it is difficult because different topics have different partitioning needs.  Our current thinking is to define “tee shirt size” levels of topic partitioning (very busy, kinda busy, not busy, etc.).  Then repartition early, only when you are crossing from one tee shirt size to another, to the lowest tee shirt size possible.
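
Here is a hedged sketch of the merge-then-aggregate shape from the second rule above (again Scala against the 0.11.x DSL, with the same serde and SAM-conversion assumptions).  The sealed trait stands in for our protobuf message with a oneof field, both input topics are assumed to already carry that wrapper type and to be co-partitioned (same key type and partition count), and the aggregation logic is illustrative only.

import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.kstream.{Aggregator, Initializer, KStreamBuilder}

object MergedAggregateTopology {
  // Stand-in for a protobuf message with a oneof field.
  sealed trait RobotEvent
  case class Moved(position: Double) extends RobotEvent
  case class Faulted(code: Int) extends RobotEvent

  case class RobotState(lastPosition: Double, lastFaultCode: Option[Int])

  def robotStateSerde: Serde[RobotState] = ??? // assumed provided elsewhere

  def build(builder: KStreamBuilder): Unit = {
    val moves  = builder.stream[String, RobotEvent]("RobotMoveEventsTopic")
    val faults = builder.stream[String, RobotEvent]("RobotFaultEventsTopic")

    val init: Initializer[RobotState] = () => RobotState(0.0, None)
    val agg: Aggregator[String, RobotEvent, RobotState] =
      (key: String, event: RobotEvent, state: RobotState) => event match {
        case Moved(p)   => state.copy(lastPosition = p)
        case Faulted(c) => state.copy(lastFaultCode = Some(c))
      }

    // One merged stream, one aggregate stage, one backing topic.
    builder
      .merge(moves, faults)
      .groupByKey()
      .aggregate(init, agg, robotStateSerde, "robotStateStore")
  }
}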

Use Change Log Topics Directly

One of the most common auto-created topics for Kafka Streams is the “change log” topic, which backs any stateful streaming stage or queryable store.  These topics always end with the suffix “-changelog”.  Initially, if we wanted a microservice to publish a KTable for other services to consume, we’d add a “to” step at the end of the stream, as in this example:

stream("InputStreamTopic")
  .groupByKey()
  .aggregate(someAggregationFunction)
  .to("OutputTableTopic")

This ends up creating two topics with exact copies of the same information.  “OutputTableTopic” is one; the other is created behind the scenes for the aggregate stage.  Both have the exact same keys and values (i.e. the output of the aggregation stage).  We did this because we have our own naming conventions for topics, and using our custom topic in the “to” stage allowed us to adhere to those.  However, we have decided that adhering to naming conventions is not worth the cost of a redundant topic (which is disk space as well as CPU on the broker and client, plus some latency).  We now do this:

stream("InputStreamTopic")
 .groupByKey()
 .aggregate(someAggregationFunction, "queryableStoreName")

This causes the backing topic for the aggregate stage to have a deterministic name, namely “application-name-queryableStoreName-changelog” (where “application-name” is the name you give to the entire Kafka Streams application).  We then allow other microservices to subscribe directly to this topic and use it as an input KTable.
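
In the consuming service, that looks roughly like this (a hedged Scala sketch; the value type is a stand-in for the shared protobuf message, a compatible default serde is assumed, and the topic name follows the changelog naming convention described above):

import org.apache.kafka.streams.kstream.{KStreamBuilder, KTable}

object ChangelogAsInputTable {
  case class RobotState(lastEventKind: String) // stand-in for the shared message type

  // Read the producing application's changelog topic directly as a KTable,
  // materialized under a local store name for this service's own queries.
  def robotStates(builder: KStreamBuilder): KTable[String, RobotState] =
    builder.table[String, RobotState](
      "application-name-queryableStoreName-changelog",
      "localRobotStateStore")
}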

Note that this approach is not without challenges.  For instance, a common practice in Kafka Streams is to attach a version number to the name of a streaming app, which you increment any time you change the app in a way that is not compatible with the old version.  Doing this also changes the names of any change log topics, since they include the name of the streaming app.  This would in turn break any services that consume the old topic.  We don’t have a good solution for this yet.

Use Kafka 1.0 (Only Don’t Yet)

The Kafka 1.0 client libraries include many improvements that reduce the number of backing topics created for stateful stages.  For instance, in some cases, KTable-to-KTable joins can be performed without a backing topic at all. There is one problem though: Kafka 1.0 has a critical bug precisely with this optimization, which renders it unusable for most streaming applications.  We are currently still using the 0.11.0.2 client libraries with plans to switch as soon as we’re satisfied the issue is fixed.

Coming Soon…

Topic/partition explosion is but one of the many weird and wonderful issues we’ve encountered since introducing Kafka Streams into many of our microservices.  In upcoming entries, I’ll discuss other challenges, such as:

  • Revving streams.  Specific challenges arise if you ever change the topology of a streaming application, including changing the version of the Kafka client libraries or even certain configuration parameters. Some of these are covered by published best practices; some we have had to figure out ourselves.
  • Interactive Query Startup.  For large stores, the interactive query feature of Kafka Streams can take up to several minutes to start up, during which time queries do not work. We had to use Kubernetes readiness probes in combination with some specific calls into Kafka Streams to keep this from manifesting as downtime to the end user.
  • Interactive Query Downtime.  If a replica of a microservice goes offline temporarily, any queryable Kafka Streams stores associated with that service also go offline, possibly for several minutes. This was unacceptable for us, since it causes several minutes of partial downtime every time we restart a replica of a service for any reason. An open issue with Kafka covers this but has no published ETA, so we have implemented a fairly involved system to allow interactive queries to remain online during restarts or crashes of service replicas.

See you next time!

Adventures In Event Sourcing

“[Event sourcing] is an interface style that not everyone is comfortable with, and many find to be awkward.”
Martin Fowler

That Wasn’t Easy What I Just Did

When our office started nearly 2 years ago, a goal of mine was to make sure that our products made the fullest use possible of cutting edge technologies and patterns for cloud software.

Event sourcing was a…I guess a design pattern that was prevalent enough in my various readings that it seemed wise to become deeply acquainted with it.  Now, with close to a year and a half of product development and our first release behind us, we’ve already collected a rich history of successes, failures, and hard-learned lessons in the area of event sourcing.

Many of these I wished had been more plainly spelled out at the beginning of all this, so my objective with this blog is to do just that.  I’ll start with a general survey of event sourcing, then talk about some specific lessons we learned the hard way.  In a later blog post, I’ll examine a case study from a real production application that applies these principles.

Events May Have Been Sourced

So first, why event sourcing?  It’s a difficult pattern, easy to massively screw up, and hard to master.  It’s not well-supported by pre-packaged solutions or technologies (this is changing though).  So we must be getting something for our trouble, right?  A lot of the answer to this question is dealt with in the Fowler article I quoted above.  In brief, what excited us was:

  • Data scalability.  Event sourcing provides a (somewhat) well-understood framework for reasoning about handling large amounts of rapidly changing data.  Robots tend to do that, so this sounded like a good idea.
  • Distribution.  Event sourcing inherently handles the problem of applications that are widely distributed.  You trade away strong, immediate consistency for ultra-high performance at scale with eventual consistency.
  • Autonomy.  A future blog entry will address Microservices and why I think they are awesome.  In fact I think any good framework for dividing up large scale software is awesome.  Building a system of actors that emit events as opposed to calling into one another synchronously is a great way to decouple elements of your system in time and space.
  • Robustness.  Event sourcing gives you a single, durable, immutable (append-only) source of truth.  Individual parts of your system can come and go, and the state of the system in whole or part can always be inferred from this source of truth at any time.

There are many other advantages to event sourcing as well, but the above were the most compelling to us and the reasons why we felt it worthwhile to embark on the long and arduous journey of figuring out how to make it work.

Wait…Event Who-To-The-What Now?

Before I proceed, I should also probably describe briefly what event sourcing is.

Really you should read the Fowler article first, but to sum up: in an event sourcing system, components of your system express their state by emitting events that represent state transitions.  Those events are stored in a sequential log.  The state of any part of the system can be reconstructed at any time by simply reading the log.  Event sourcing also usually goes hand-in-hand with Command Query Responsibility Segregation (CQRS).

It is natural in event sourcing for separate subsystems to generate events in response to some sort of stimulus (command) and to allow clients to ask questions about the state of a system (queries).  Here’s a diagram with some terms I’ll use throughout this post and later posts as well:

I’ll call the reader’s attention to a few important details:

  • There are several independent data stores in this model.  One is the event store, which is your ultimate source of truth.  But every subsystem also has a backing data store that it uses to do its job.
  • All the subsystems are independent from one another in every way except that they all communicate to the same event store.  In general, these systems don’t call into one another and may be in separate microservices entirely.  They most definitely don’t share data stores (apart from the event store).

Of course, all this immediately raises many implementation questions, like:

  • How long does the event store store events?  Forever?
  • In order to reconstruct state, do I have to read the log from the beginning of time?  Every time?  Doesn’t that get more expensive the older your system gets?
  • Does the logic for reconstructing state from events have to be replicated everywhere?  In every microservice?

And that doesn’t even get into more basic questions like:

  • What should I use as an event store?
  • Are there canned implementations of event sourcing?

I’ll try to give practical answers to these questions.  Not authoritative one-size-fits-all answers, but hopefully with enough context that it will save the reader a lot of the heartache we had to go through to arrive at practical, successful event sourcing.

Step 1: Seriously, Use Kafka

I can answer the question of what event store to use pretty easily: Kafka.  I am sure there are plenty of other viable choices.  We chose Kafka (and haven’t regretted it for a second in the past year and a half) because:

  • It is actively developed.
  • It is used by industry-leading companies with more to gain or lose from Kafka’s success (or lack thereof) than you.
  • Its basic semantics are very amenable to event sourcing (more on this later).
  • Its APIs for Scala are excellent.
  • And of course it performs very well and has excellent stability.

Central to Kafka’s good fit for event sourcing is the way it handles partitioning.  Kafka (like many publish/subscribe brokers) has a concept of a topic you can publish or subscribe to.  However, the semantics of a Kafka topic are somewhat unusual: a topic has a certain number of partitions, and each message you publish to a topic has a key.  It helps to think of a message key similarly to how you’d think about the primary key of a row in a database.  The key determines (via a hashing function you can customize) which partition a message ends up in.

This becomes important when you consider what happens on the subscribe side.  Subscribers join a consumer group.  Within a consumer group, Kafka automatically assigns a subset of the total partitions to each subscriber in the group, such that every partition in the topic is going to exactly one subscriber.  Combined with the fact that Kafka guarantees in-order, at-least-once delivery of every message in a partition, this means:

  • Topics are automatically load-balanced among subscribers in the same consumer group.  This means you can scale the compute resources you bring to bear on any stream processing task seamlessly, dynamically.  Kafka does an excellent job of reassigning partitions as subscribers come and go.
  • For a particular key, a single (as in exactly one) consumer will see all messages for that key in order, at least once.  This means that a consumer can cache state for a particular key in memory without going to a database on every event, since you know that all the events for that key are coming to you, in order, and to no one else.

Central to event sourcing is the ability to reconstruct the state of some entity in your system by replaying its events.  Kafka makes this easy because you know a subscriber is going to see all the events for any entity represented by a key in order, so you only need to define the logic that takes in the current state and an event and spits out the new state, then run that on every event.  I’ll refer to this logic as the materialization function.  This is a term I made up, and is an example of a term I think should be more standardized and actively reasoned about in event sourcing, but that we had to discover and invent terms for ourselves.  I’ll talk more about some details of the materialization function below.
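
In code, a materialization function is just a fold over a key’s events.  A minimal Scala sketch (the robot-arm domain here is hypothetical):

object Materialization {
  // Hypothetical events and state for a robot arm.
  sealed trait RobotEvent
  case class AxisMovedTo(axis: Int, position: Double) extends RobotEvent

  case class RobotState(axisPositions: Map[Int, Double])

  // The materialization function: (current state, event) => new state.
  def materialize(state: RobotState, event: RobotEvent): RobotState = event match {
    case AxisMovedTo(axis, position) =>
      state.copy(axisPositions = state.axisPositions + (axis -> position))
  }

  // Replaying a key's events in order is a fold from some initial state.
  def replay(initial: RobotState, events: Seq[RobotEvent]): RobotState =
    events.foldLeft(initial)(materialize)
}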

Kafka does have some limitations you have to work with, however.  The biggest is the log size.  Kafka does not store events forever, nor would you want it to (the default retention is a week).  You need an upper bound on how long it will take to chew through the backlog of events.  You also need an upper bound on how much storage Kafka needs.  To work around this, you need to take periodic snapshots of your materialized state and persist them somehow, more frequently than the log retention period.  So in actuality, you are reconstructing state by replaying events on top of a recent snapshot.

The Query Side

It may seem backward to start by examining the query side of things, because ultimately the rubber meets the road when you start emitting events that represent state changes in the system.  But trust me (you do, or else why are you reading this, right?), the queries are the hard part.  Usually you define the events a system might emit when you are designing the command system, but it’s important that you keep a few basic needs of the query side in mind when doing so, or else you’ll find yourself painted into some unappealing corners.

Defining a query system involves a few tasks that have their own challenges that I will discuss:

Task 1.  Decide what you want the materialized state of your system to look like.

This is almost always dictated by the types of queries you want to perform on the system.  It may not represent the complete state of the system, and you may have several separate query systems all listening to the same event stream(s) but concerned with answering different kinds of questions.

To use an example from robotics, let’s say we have an event stream representing position changes in a robot arm.  One (easy) question we might want to answer is, “what is the current position of the arm?”  Another (less easy) one might be “what percent of a given time window has the robot arm been in motion vs. still?”  These would be served by 2 separate query systems with 2 separate data stores and schemas.  The “what is the current position” is probably simple: a database of robot ID to position.  The latter is less simple, maybe a database with time-series rows with % time in motion for a particular time window, or even multiple tables with varying time granularities.  These are both actual examples of production use cases we have.

Task 2.  Define your materialization function.

You need to write some logic that takes an event and applies it to your current materialized state in order to create a new materialized state, which can be persisted and queried.  This is a place where we learned a lesson I wish we had known from the outset: your materialization function should be intrinsically idempotent.  

In other words, you should be able to play the same event many times on the same materialized state, and it will not change from subsequent invocations.  Also when I say “intrinsically” idempotent, I mean that it (in my experience) does not work well to try to “bolt on” idempotent behavior to a function that isn’t.  Let me illustrate with an example.  Suppose you are modeling a bank account.  You have events for “account debited” and “account credited” when the account balance decreases or increases.  You could model these events in one of two ways:

  1. The account has been debited $10.
  2. The account balance has changed to $200.

For event #1, it is impossible to write an intrinsically idempotent materialization function.  If you play that event twice, you get debited $20.  You can try to “bolt on” idempotent behavior by (for instance) giving the event a unique ID, then somehow tracking IDs of recently applied events and ignoring duplicates, but in practice that is difficult.

Now you have to answer questions like, “how many recent IDs do you keep track of?”  Those take up space.  They might take up a lot of space for high-frequency events.  How long does your “recently processed” backlog have to be in order to guarantee no duplicates?  This opens up a set of engineering considerations that are nice to avoid.  Therefore, if possible, prefer an event style like #2.  As long as this event is played in order relative to the events that come before and after it (Kafka guarantees this), you can play this event as many times as you want and your state will still come out consistent.
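
To make the distinction concrete, here is a toy sketch of the two event styles and their materialization functions (the amounts and types are hypothetical):

object AccountMaterialization {
  case class Account(balance: BigDecimal)

  // Style #1: a delta event.  Replaying it double-applies the debit,
  // so the materialization function is not intrinsically idempotent.
  case class AccountDebited(amount: BigDecimal)
  def applyDelta(state: Account, event: AccountDebited): Account =
    Account(state.balance - event.amount)

  // Style #2: an absolute event.  Replaying it any number of times yields
  // the same state, so redeliveries and replays are harmless.
  case class BalanceChangedTo(newBalance: BigDecimal)
  def applyAbsolute(state: Account, event: BalanceChangedTo): Account =
    Account(event.newBalance)
}

// applyDelta(applyDelta(Account(210), AccountDebited(10)), AccountDebited(10))
//   => Account(190): the $10 debit became $20
// applyAbsolute(applyAbsolute(Account(210), BalanceChangedTo(200)), BalanceChangedTo(200))
//   => Account(200), no matter how many times the event is replayed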

Idempotent materialization functions are important for other reasons.  I mentioned above that since Kafka doesn’t store all events for all time, in practice you will be persisting periodic snapshots of your materialized state.  That means when a microservice first boots up, it will load a snapshot then start playing events on top of that.

Many of these events might have occurred before the snapshot was taken, because it’s impossible (or at least difficult and expensive) to ensure transactional consistency between your snapshot store and your event store.  If your materialization functions are idempotent, this does not matter.  As long as your events come in order (and again, you get this from Kafka for free), it is perfectly okay to replay an entire stream of them on top of a later snapshot, and that snapshot will become consistent once your events catch up to the present.  Stated more generally, idempotent materialization functions make dealing with at-least-once delivery completely free and transparent.

Considering the above, another important design principle emerges: you should design your events such that it is possible to write intrinsically idempotent materialization functions.  This is what I alluded to above: the needs of the query system can and should influence how you design your events.  You can’t (and shouldn’t) design your events “in a box” when designing your command system.  Going back to the robotics example, our events that signal a position change in a robot arm look like “Axis 2 joint changed to absolute position X” not “Axis 2 moved by N radians”.  This allows us to design events that support intrinsically idempotent materialization, which in turn vastly simplifies the design of the query systems that listen to those events.

Task 3.  Hook up your Kafka subscription.  If you did the above 2 tasks properly, this is only slightly more involved than pressing a button.  Kafka’s guarantees make your event-to-query pipe work almost by magic.  The only concern you have at this step is how to persist your periodic snapshots and how to commit your offsets back to Kafka.  Committing to Kafka is important, because if you crash or reboot, Kafka will replay all messages from the topic that have not been committed.  The process we use is quite simple:

  • Persist a snapshot of each materialized state (you will have one per key in the Kafka topic, per query system) once every so often.  We usually do it every few minutes.  This is far more often than Kafka’s log retention requires, but gives us much better performance if a microservice reboots, since we’re only chewing through a few minutes of events vs. several hours or days.
  • Commit to Kafka only after persistence of the materialized state was successful, and take care only to commit messages that have already been replayed into that materialized state.

The above means that if a microservice crashes, you might get some un-committed events replayed on top of a snapshot that was taken after those events occurred.  But since you followed the advice about idempotent materialization functions above, you don’t care.  It all just works.
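
A stripped-down sketch of that loop, using the plain Kafka consumer with auto-commit disabled (our real services use Reactive Kafka, discussed below; the event types, materialization function, and snapshot store here are hypothetical stand-ins):

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import scala.collection.JavaConverters._

object SnapshotThenCommitLoop {
  type State = Map[String, Array[Byte]] // stand-in for real per-key materialized state

  def materialize(state: State, key: String, event: Array[Byte]): State =
    state + (key -> event) // stand-in for a real (idempotent) materialization function

  def persistSnapshot(state: State): Unit = () // stand-in for the snapshot store

  def run(): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("group.id", "robot-query-service")
    props.put("enable.auto.commit", "false") // we commit ourselves, after snapshotting

    val consumer = new KafkaConsumer[String, Array[Byte]](
      props, new StringDeserializer, new ByteArrayDeserializer)
    consumer.subscribe(Collections.singletonList("RobotEventsTopic"))

    var state: State = Map.empty
    var lastSnapshotAt = System.currentTimeMillis()

    while (true) {
      // Apply everything we just polled to the in-memory state first...
      for (record <- consumer.poll(1000L).asScala)
        state = materialize(state, record.key, record.value)

      // ...then, every few minutes, persist a snapshot and only afterwards commit.
      // Everything committed is therefore already in the snapshot; anything replayed
      // after a crash is absorbed by the idempotent materialization function.
      if (System.currentTimeMillis() - lastSnapshotAt > 3 * 60 * 1000) {
        persistSnapshot(state)
        consumer.commitSync()
        lastSnapshotAt = System.currentTimeMillis()
      }
    }
  }
}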

Task 4.  Implement the actual querying.  This was a tough one to get right.  What makes it tough is a combination of these facts:

  • Materialized state exists in memory, and is only persisted every several minutes when we save a snapshot.  Therefore we can’t query the snapshot store directly or else we might get really old data.  We have to figure out how to query the in-memory state.
  • Because of how Kafka partitions topics, if you have many replicas of a microservice consuming a topic, Kafka will pick where any particular partition (and therefore any particular key) goes.  Somehow, you have to figure out which replica of your microservice Kafka is sending a given key’s data to and route the query there.

To accomplish this, we use Akka Clustering.  If you’re not familiar with the technology, Akka is an actor- and stream-based computation framework for Java and Scala that we use a lot.  Its clustering feature is effectively a lightweight, high-performance, peer-to-peer networking framework that is very convenient when you need replicas of a microservice to communicate with one another.  Akka allows us to register “actors” at a network-transparent path (similar to a file path or URL), which we name after the key of the entity we want to query.  Routing a query to an entity is then as simple as registering an actor at a well-known path for that entity, on the replica of the microservice that is receiving events for that key from Kafka.  In this way, we implement ultra-high-performance in-memory queries of objects whose state is materialized from events.
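
Stripped of the cluster-membership details, the pattern looks roughly like this (a hypothetical single-node sketch using classic Akka actors; in production the same paths are resolved across the cluster, and the query protocol is a stand-in):

import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Future
import scala.concurrent.duration._

object PerKeyQueryRouting {
  // Hypothetical query protocol.
  case object GetPosition
  case class Position(x: Double, y: Double)

  // One actor per entity key, holding that key's in-memory materialized state.
  class RobotQueryActor extends Actor {
    private var position = Position(0.0, 0.0)
    def receive: Receive = {
      case p: Position => position = p      // updated as Kafka events are materialized
      case GetPosition => sender() ! position
    }
  }

  def main(args: Array[String]): Unit = {
    implicit val timeout: Timeout = Timeout(5.seconds)
    val system = ActorSystem("robot-query")
    import system.dispatcher

    // Register the actor at a well-known path derived from the entity key, on
    // whichever replica is consuming that key's partition from Kafka.
    val robotId = "robot-42"
    system.actorOf(Props(new RobotQueryActor), name = robotId)

    // Any replica can route a query to that path (network-transparent in a cluster).
    val answer: Future[Position] =
      (system.actorSelection(s"/user/$robotId") ? GetPosition).mapTo[Position]

    answer.foreach(p => println(s"$robotId is at $p"))
  }
}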

As a brief aside, a technology we are keeping a close eye on is the Kafka Streams API.  This API has a system for dealing with this exact same issue (see section on “Discovering any instance’s stores”) when querying stream state.  We have not started using it yet mainly due to its immaturity, but it’s the closest thing I have seen to a fully-canned implementation of Kafka-based event sourcing.

The Command Side

The command side is often simple by comparison.  We have many command implementations that simply accept a command, then turn around and publish an event or two, and as long as there was no low-level failure talking to Kafka, report success.  Where possible, you should strive to design command systems this simple.  In these cases, the command system has no write model (i.e. no persistent data store apart from the event store itself) and is trivial to implement.

Command implementations can become more complicated when you need some state in order to generate idempotent events.  Note that “idempotent event” is somewhat of a nonsensical term since events can’t be idempotent; what’s idempotent (or not) is the materialization function on the query side.  But, as discussed above, you must design your events carefully in order to support idempotent materialization.  Sometimes you can’t easily do this.  Let’s go back to the bank account example.  If someone wants to withdraw $10, and you want to generate an event that says “account balance changed to $200,” then you need the current account balance.  This is where a write model comes in.  And with it, a few design challenges:

  • Obviously for something as important as a bank balance, you’ll want your data store to be transactional and strongly consistent.  That suggests a traditional RDBMS.  Which is fine, as long as your data is small and slow enough for an RDBMS.
  • If your data is too fast or big for an RDBMS, things get interesting.  We were tempted to store write model data in memory (and persist periodic snapshots) just like we do with the read model.  However this leaves you with a similar problem to the read model: in a replicated microservice, commands might go to any replica.  So somehow you must find where the one source of truth for each key is.
  • Also, persisting periodic snapshots doesn’t work like it does for the read model, since commands are not conceptually persistent (events are).  If you crash before persisting a snapshot but after emitting an “account balance changed to X” event, you now have an incorrect idea of the account balance.

The unfortunate (or maybe fortunate) news is that we have not had to solve this problem, because we haven’t developed a command system complicated enough to need it yet.  We have some upcoming use cases that may require this, so we’ve done some preliminary research.  This led us to another feature of the Akka framework, namely Akka persistence.  If you read that page, you’ll see that their example application is precisely the command side of an event sourcing system.  Even if we don’t end up using Akka persistence itself (I tend to be suspicious of systems that put too much abstraction on my persistence technologies…if you are creating Cassandra tables and Kafka topics behind my back with opaque schemas, that is close to an immediate disqualification in my book), its concepts are sound.  In particular:

  • You can use the system of storing state in memory and persisting periodic snapshots, as long as you also persist every event you post.
  • Fortunately we are already doing that, as Kafka is exactly a persistent event store.

There are some complications we still have not worked through, though.  For instance, when using Kafka strictly as an event store, there is no explicit concept of “you are done reading the backlog and are now up to date,” so you have no straightforward way of knowing when you have restored your state from Kafka and are ready to start accepting commands again.  You probably have to guess and be prepared to correct errors post-hoc (staying with the bank account example, be prepared to deal with an account becoming overdrawn and/or take safety measures to prevent excessive overdrafts).  Again, these will be challenges that get left to a future blog post, when we actually have to deploy some code that tackles them.

What Was That Republish Thingy From The Diagram?

In an event sourcing system, 2 different types of entities can generate events:

  • A command system, which might be literally a user interacting with an app, or maybe some external system feeding data into your app such as a moving robot arm streaming data about its movements.
  • A republish system, which listens for events and generates other events in response to them.

Republish systems can be powerful and dangerous.  You use one if you have an entity whose state depends partially on the state of another entity, and you want to use event sourcing for both.  I’ll give an example from our actual product.  We have a microservice that handles a concept of “device groups.”  Users can organize devices (like robots) into hierarchical groups that are like folders in a file system.  In turn, certain users will have access to certain groups of devices.  We also use the device group service to represent the physical topology of a device: when we discover a device on the network, we will create a device group that represents its position in the network topology, which is how users of our system initially discover new devices.  In addition, we have a microservice that handles “user notifications,” which are what they sound like: any notification to a user that something potentially of interest has happened.

Both the device groups entity and the notification entity are implemented using event sourcing and CQRS.  Device groups have commands like “add/remove device” and “add/remove child group” that emit events like “device added” or “child group added”.  Notifications have commands like “mark as read” or “dismiss” and corresponding events.  There is also a republish system that observes events from the device groups system.  Whenever a new device is added to one of the groups that we know to be a special “network topology” group, we post a “new device discovered” notification to every user with access to see new devices.  Then, when that device is added to a user-created group, we consider that device to be “onboarded” and clear the notification (we also get this from observing device group events).

We learned some lessons writing this republish system, namely:

  • Take care what you use as a key.  Device group events are keyed by device group ID.  Notification events are keyed by user ID.  Somehow we had to translate a device group ID to a user ID (actually several user IDs) before generating a notification event (actually several, one per user ID of interest).
  • Our first intuition was to query the user service for which users had access to see new devices.  This has issues, however, because it means your materialization function is no longer idempotent.  You should think about a republish system as a query and command system all in one.  You are materializing events into a state, that state produces certain events when it changes, and the events produced are a function of both the previous state and the event being applied.  All the same concerns about idempotent materialization functions and at-least-once delivery apply here too.  If your materialization function depends on querying an external system whose state might change in unpredictable ways, it is no longer idempotent.
  • The solution was to also use event sourcing for user access.  Fortunately we had recently converted the user access system to use event sourcing for other reasons (mostly performance).  We therefore observed multiple event streams from multiple topics: changes to device groups and changes to user access.  Both of these materialized into the state of our notification republish system, and contributed to how we generate notification events to various users.  It also let us hit corner cases we had not even considered previously, like what if a device was recently discovered, then afterward a user is given access to that device?  That “just worked” in our new purely-event-sourced system, but would have been missed in a system that relied on synchronously querying an external system to materialize state.
  • Realize that if you’re listening to multiple Kafka topics, even if they are keyed by the same thing, you may see events in a non-deterministic order relative to one another.  Kafka’s in-order guarantee only applies to a single key in a single topic.
  • Also, more importantly, Kafka may assign the same key from different topics to totally different replicas in a cluster entirely!  Therefore if you are relying on materializing state into memory from multiple topics, this will not work.
  • However if your republish stream republishes to a single topic (which it should), and all your materialization functions at all stages of your event pipeline are idempotent, this does not matter.
  • In fact, sometimes the only thing a republish system does is take events from multiple topics with the same key and publish them back to a single topic, so that downstream consumers can materialize them all from one place.

In summary: if you find that the system that materializes your state from events, whether in a query or a republish system, is querying some external system in order to do that, stop it.  Instead, get that information by observing events from that system.  Also, never listen to multiple Kafka topics and expect to materialize events from those multiple topics into memory, since Kafka can assign partitions from different topics such that different replicas will see the same key.  Stated more simply: Kafka’s strong in-order guarantees per key only apply within a single topic.  To work around this, republish these events to a single topic and materialize from there.  The Kafka Streams API I mention above has support for doing this automatically with a private topic.  We do this by creating the private topic manually ourselves.
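
A minimal sketch of that pass-through shape, with a plain consumer and producer (topic names and the grouping are hypothetical, and offset management and delivery guarantees are elided; the real thing needs the commit discipline described in the query-side section):

import java.util.{Arrays, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, StringDeserializer, StringSerializer}
import scala.collection.JavaConverters._

object MergeTopicsRepublisher {
  def run(): Unit = {
    val consumerProps = new Properties()
    consumerProps.put("bootstrap.servers", "localhost:9092")
    consumerProps.put("group.id", "merge-republisher")

    val producerProps = new Properties()
    producerProps.put("bootstrap.servers", "localhost:9092")

    val consumer = new KafkaConsumer[String, Array[Byte]](
      consumerProps, new StringDeserializer, new ByteArrayDeserializer)
    val producer = new KafkaProducer[String, Array[Byte]](
      producerProps, new StringSerializer, new ByteArraySerializer)

    // Listen to several source topics that share the same key...
    consumer.subscribe(Arrays.asList("EventsTopicA", "EventsTopicB"))

    while (true) {
      // ...and funnel every event into one private topic, so downstream consumers
      // materialize a single stream and get Kafka's per-key ordering within it.
      for (record <- consumer.poll(1000L).asScala)
        producer.send(new ProducerRecord("MergedPrivateTopic", record.key, record.value))
    }
  }
}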

Other Design Concerns

Event Format

Since the event store is a shared data store, which if you want to get super-technical is an anti-pattern in microservice design, it is important that services that publish and consume data from one another agree on some basic things.  In particular:

  • The format of the events themselves must be controlled, agreed upon, and able to support evolutionary change.  Since Kafka events are just binary blobs, we use Google protocol buffers (protobuf) to define data formats for events.  Protocol buffers have established patterns and practices for evolving data formats, and have excellent size and speed characteristics as well.
  • The same agreement applies to keys of course, which are also binary blobs in Kafka.
  • With very few exceptions, only one microservice ever publishes to a particular topic (i.e. emits events).  In other words, although you might have many query systems or even republish systems that subscribe to a topic, you should only have one publisher.  This gets around all sorts of concerns that arise if you try to share or agree on the state of a write model among many publishers.  Republish systems might subscribe to topics that other services also subscribe to, but they should always own the topic(s) they publish to.  These might be “private topics” in the sense that the publishing microservice also subscribes to the topic itself, and the topic exists only to merge events from many topics into one (the issues with that were discussed in the section above).

Back Pressure

Back pressure is incredibly important.  Kafka has breathtakingly fast performance when it comes to slamming events to consumers.  When you first start a new service, it may chew through several days of events from a topic it has never subscribed to before.  It is all but guaranteed that Kafka will send you events faster than you can process them.  If you just consume them and toss them in an unbounded buffer somewhere, you’ll run out of memory (and then crash, then restart, then immediately run out of memory again and so on…if it sounds like I am having flashbacks, that is no mistake).  If you drop messages, you’ll miss events and your materialized state will be inconsistent.  The only solution is to flow control the Kafka consumer, i.e. slow down the message stream to the speed at which you can process it.  Even using the new consumer API, this code can be complicated and error prone.  Instead, we use Reactive Kafka, a Kafka plug-in for the Akka Streams API (yes, another Akka API; we love that stuff).  This allows you to program a streaming application with Kafka at the source or sink end (or both) with full back pressure natively built into the programming model.  If any stage in the pipeline backs up, it will flow control all the way up to the broker seamlessly.
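
For reference, a skeletal Reactive Kafka consumer looks something like this (hedged to the version of the library we use today; the event handler is a stand-in, and committing per message is a simplification relative to the snapshot-then-commit scheme described earlier):

import akka.actor.ActorSystem
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Sink
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import scala.concurrent.Future

object BackpressuredConsumer {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("event-consumer")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    val settings = ConsumerSettings(system, new StringDeserializer, new ByteArrayDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("robot-query-service")

    // Hypothetical event handler.  While this Future is slow, the stages upstream
    // stop pulling from the broker instead of buffering without bound.
    def applyEvent(key: String, value: Array[Byte]): Future[Unit] = Future.successful(())

    Consumer
      .committableSource(settings, Subscriptions.topics("RobotEventsTopic"))
      .mapAsync(1) { msg =>
        applyEvent(msg.record.key, msg.record.value)
          .flatMap(_ => msg.committableOffset.commitScaladsl())
      }
      .runWith(Sink.ignore)
  }
}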

The Difference Between Commands And Events

This topic is dealt with in many existing writings on event sourcing but it bears repeating.  Commands and events are not the same thing.  A command is something you want to happen.  It may fail.  You may want to withdraw $100 from your account.  If your account only has $10 in it, I am not going to let you.  An event is something that definitely happened.  A command (“do this please”) will emit events if (and usually only if) it is successful (“this happened”).  Events are truth.  Commands are desires.  I feel like I am going to get metaphysical here about the difference between desires and the truth, so I’ll conclude with: it often helps to name events in the past tense (“account balance changed to X”) and commands in the imperative mood (“withdraw X from my account”).

Log Compaction And Event Bootstrapping

A recent feature addition to Kafka is log compaction.  In brief, instead of chopping off events that are some amount of time old from the Kafka log, Kafka instead retains at least the single most recent event for every key in the topic.  This solves a challenge we hit in event sourcing, namely bootstrapping.  Imagine you have designed an event sourcing system.  Then months later you come up with some new questions you want to answer, which means you want to develop a new query system.  Maybe even a new republish system or two.  Problem is, you only have access to the last few days of events, and since you are a new system you don’t have any snapshots to start from.  Somehow you have to do the thing we already know Kafka can’t do, i.e. play events from the beginning of time.

One way we have solved this is to build bootstrap “buttons” into the command systems.  You hit a button, and we read the current state of an entity from the write model and emit events representing the current state of everything.  This doesn’t represent all events for all time, but it does not have to.  Going back to the bank account analogy, we only need to emit a single “account balance set to X” for each account, with the latest balance.  We stand up the new query system, then we hit the bootstrap button.  Since all our materialization functions are idempotent, the redundant events are consumed by the already existing systems with no side effects.  And now the new system has full state.  This works, but it is clunky and also has potential for race conditions unless you are very careful (you don’t want to emit a bootstrap event out of order with a “real” event in response to a command that happens while bootstrapping is in progress).

Another solution is to use log compaction.  This way you know the latest state of every key is in the Kafka backlog and never gets chopped off.  However, there is one additional concern you sign up for when using log compaction, namely your events must represent the full state of your system as opposed to a delta.  If you are using idempotent materialization functions, you may be almost all the way there.  But maybe not entirely — remember our device groups service?  Its events looked like “device added/removed.”  As in, a single device per event.  These are idempotent, but they are not complete.  If the only event we have in the Kafka log is “device X added to group Y”, that does not tell us the full set of devices that group Y owns.  It also doesn’t tell us anything at all about changes to a group’s child groups, or a group’s name (which are separate events).  To support log compaction, we need the event to capture the full state of a device group.  It would look like “group Y now has children A, B, and C and devices 1, 2, and 3.  It is also named ‘foo’.”  This in turn means your events are larger and more expensive to store and transmit.  It also means you probably need more logic and/or a more involved and complicated data store on the command side to generate a full-state event from a simple command like “add device X to group Y please.”
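
As a sketch of the difference (hypothetical shapes, loosely modeled on our device group events):

object DeviceGroupEvents {
  // Delta-style event: fine for idempotent materialization, but a compacted topic
  // that keeps only the latest event per key no longer describes the whole group.
  case class DeviceAddedToGroup(groupId: String, deviceId: String)

  // Full-state event: safe under log compaction, because the latest event for a key
  // is by itself a complete snapshot of that group...
  case class GroupStateChanged(
    groupId: String,
    name: String,
    childGroupIds: Set[String],
    deviceIds: Set[String]
  )
  // ...at the cost of larger events and a command side that must know the group's
  // full state in order to emit them.
}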

Read And Write Models

At some point in your journey with event sourcing, you will be tempted to share a data store between a command system and a query system.  Resist this temptation.  For simple systems, this may mostly work, and the information a command system needs might be 90% in common with the information the query system needs.  However you will find that these will diverge over time as you get more use cases and demands on the system.  This is an area where it pays dividends to do things according to convention from the outset even if it’s not completely obvious why at the time.

And Yeah That Kafka Stream Thing

As I mentioned before, the Kafka Streams API seems to have promise.  It might be the just-opinionated-enough event sourcing API (notably only the query side, but that is the hard part) we have been waiting for.  It has abstractions that capture complicated concepts like the log compaction/full state issue above, as well as many of the issues discussed with the republish system.  Right now I am mostly scared off by the fact that many of the interfaces are still marked as “unstable” and that there is not a good Scala API for it yet.  Also, it doesn’t yet have all the features we’d need to solve some of the harder problems we have (which mostly amount to complicated relationships between event-sourced entities).

And That’s All (For Now)

In a future blog post I’ll do a case study of the (currently) most complicated event sourcing system in our application, which ironically enough has nothing to do with robotics (it’s the user access management system).  I’ll also talk more generally about microservices, which have a lot to do with event sourcing since event sourcing provides a way to keep microservices truly autonomous and independent.  See you next time!