What And Why?
Kafka Streams is a relatively new technology available as part of the standard Kafka distribution. I’ve mentioned it before in previous blog posts such as my massive wall of text on event sourcing. It’s attractive to us because it solves a lot of the difficult problems with event sourcing with Kafka, many of which I discussed in that post. We have several use cases for event sourcing, where its use makes certain features much easier to implement at scale than more traditional approaches. One such application is discussed in this post.
We have now implemented several of our microservices using Kafka Streams, and have discovered and overcome a host of challenges along the way to becoming (relatively) fluent. In this series of posts, I’ll discuss these challenges, how we addressed them, and what the outlook is for Kafka Streams in our products going forward.
We also make extensive use of the “interactive queries” feature of Kafka Streams, which has allowed us to address querying data that is both big and fast in ways that other storage technologies have challenges with. This feature has its own set of specific implementation challenges, some of which I’ll discuss in this post, as well as later posts in this series.
This post assumes basic familiarity with Kafka and Kafka Streams concepts, such as the difference between a KStream and a KTable, as well as the basics of how Kafka topics and partitions work. If you’re not there yet, take a look at this introductory post on Kafka Streams concepts.
Challenge: Topic/Partition Explosion
Kafka Streams creates a lot of topics for you behind the scenes under a host of circumstances. For instance, you get a topic created:
- For any stage that has state, such as an aggregate or KTable join.
- For any interactive query store you create with logging support.
- Any time you join two KStreams that are not co-partitioned already
For even a medium-complexity streaming application, this can quickly reach tens of topics, each with as many partitions as the input topics to the stream. This has several issues, all of which we have experienced in one form or another:
- More latency. If you are publishing data back and forth to the brokers many times, you pay the round trip time at least, and possibly the commit interval time (a configuration parameter for the stream), for every stage that does this.
- More disk usage. Publishing stream state through many topics means that you’re using disk space for redundant or partially-redundant copies of data on your brokers. In more than one case we ran our brokers out of disk space with disastrous results. Details in a later post…
- Drain on broker resources. Brokers have to do work for each partition of each topic they host, even if those partitions are not particularly active. For log-compacted topics (which many of the auto-created topics are), this amount of work can be large. We observed several cases where our brokers would experience significant slow-down simply due to the load of hosting many partitions.
- Drain on client resources. A lot of the work of re-publishing and subscribing to all these intermediate topics is done client-side. For large volumes of data, this load can be significant. This manifests as additional latency, and in extreme cases, consumers falling behind.
- Slow client startup. When clients with state stores start up, they have to replay all the partitions of all the KTables backing their state stores into their local storage. They do this one partition at a time, and the time this takes scales roughly with the total number of partitions in all state stores in the application.
We took several measures to control the topic/partition explosion problem.
Repartition source topics down.
In a few cases, we had source topics with large numbers of partitions, because those topics are very active. Downstream processing might reduce or aggregate the data so that it is much less active or large, making the number of partitions required much less. Don’t let the needs of the input data dictate the partitioning of your entire stream, especially because complicated streams may create many topics.
For example, we have an application where a stream took high-frequency robot operational data as input. This is the biggest, fastest data in our entire application and flows through a topic with many partitions. However the output was quite small: we simply aggregate the input data to a single value per robot, namely the frequency of the data in Hz. To accomplish this, we first transformed the input data into something much smaller (namely a time stamp and number of data points) in a simple map step. Then we repartitioned that down to 1/10th as many partitions, and used that as an input to the aggregation stage. This in turn ensured that the topic created behind the scenes to capture the state of the aggregation had a small number of partitions. In pseudocode, this looks like:
stream("TopicWithLotsOfPartitions") .mapValues(transformToTimeStampAndPoints) .through("TopicWithFewPartitions") .groupByKey() .aggregate(calculateRunningFrequency, "frequencyStoreName")
Now we have a queryable store with the update frequency for each robot without having to host a topic with the same (large) number of partitions as the raw robot high-frequency data.
Reduce Number Of Stateful Streaming Stages
This seems pretty obvious in retrospect, but it’s easy to do and pays big dividends. In short, be mindful of all the streaming stages that cause topics to be created behind the scenes, and minimize those.
As an example, we had an application where we were joining a KStream of robot events with a KTable of meta-information about the robot such as a user-assigned name. We then aggregate the events into a notion of current state for a robot (resulting from all the events happening to it over time). Initially, the stream looked like this:
stream("RobotEventsTopic") .groupByKey() .aggregate(calculateRobotEventState) .leftJoin("RobotMetadataTable", joinRobotData, "queryableStoreName")
This looks reasonable enough, but it creates two topics behind the scenes: one for the state of the aggregate stage and one for the queryable store we specify in the left-join stage. Each will have as many partitions as “RobotEventsTopic”. The data between these two topics is almost completely redundant. With a simple rewrite however, we were able to eliminate one of these topics with no compromise in functionality:
stream("RobotEventsTopic") .leftJoin("RobotMetadataTable", joinRobotData) .groupByKey() .aggregate(calculateRobotEventState, "queryableStoreName")
The trick here was that we made the aggregate stage the same stage as the queryable store, so we only need one topic that now does double duty: stores the state of the aggregation as well as serves queries.
With the above example and some others, we derived some rules of thumb that we follow that allow us to minimize the auto-created topics when defining the stream itself:
- Prefer to join KStreams to KTables over KTables to KTables. Stream-to-table joins do not require backing topics.
- Minimize the number of aggregate steps (also applies to reduce, count, or transform), preferably to one per stream. If you are aggregating many KStream events into a single state, first merge all the input KStreams into one, then do a single aggregate on the merged KStream. We often package disparate events into a single protobuf message with a oneof field for this purpose.
- Design your topics ahead of time to minimize the need for repartitioning. We are still in the process of perfecting this, and it is difficult because different topics have different partitioning needs. Our current thinking is to define “tee shirt size” levels of topic partitioning (very busy, kinda busy, not busy, etc.). Then repartition early, only when you are crossing from one tee shirt size to another, to the lowest tee shirt size possible.
Use Change Log Topics Directly
One of the most common auto-created topics for Kafka Streams is the “change log” topic, which backs any stateful streaming stage or queryable store. These topics always end with the string “-changelog.” Initially, if we wanted a microservice to publish a KTable for other services to consume, we’d add a “to” step at the end of the stream, as in this example:
stream("InputStreamTopic") .groupByKey() .aggregate(someAggregationFunction) .to("OutputTableTopic")
This ends up creating two topics with exact copies of the same information. “OutputTableTopic” is one; the other is created behind the scenes for the aggregate stage. Both have the exact same keys and values (i.e. the output of the aggregation stage). We did this because we have our own naming conventions for topics, and using our custom topic in the “to” stage allowed us to adhere to those. However, we have decided that adhering naming conventions is not worth the cost of a redundant topic (which is disk space as well as CPU on the broker and client, plus some latency). We now do this:
stream("InputStreamTopic") .groupByKey() .aggregate(someAggregationFunction, "queryableStoreName")
This causes the backing topic for the aggregate stage to have a deterministic name, namely “application-name-queryableStoreName-changelog” (where “application-name” is the name you give to the entire Kafka Streams application). We then allow other microservices to subscribe directly to this topic to use it as an input KTable.
Note that this approach is not without challenges. For instance, a common practice in Kafka Streams is to attach a version number to the name of a streaming app, which you increment any time you change the app in a way that is not compatible with the old version. Doing this also changes the names of any change log topics, since they include the name of the streaming app. This would in turn break any services that consume the old topic. We don’t have a good solution for this yet.
Use Kafka 1.0 (Only Don’t Yet)
The Kafka 1.0 client libraries include many improvements that reduce the number of backing topics created for stateful stages. For instance, in some cases, KTable-to-KTable joins can be performed without a backing topic at all. There is one problem though: Kafka 1.0 has a critical bug precisely with this optimization, which renders it unusable for most streaming applications. We are currently still using the 0.11.0.2 client libraries with plans to switch as soon as we’re satisfied the issue is fixed.
Topic/partition explosion is but one of the many weird and wonderful issues we’ve encountered since implementing Kafka Streams into many of our microservices. In upcoming entries, I’ll discuss other challenges, such as:
- Revving streams. Specific challenges apply if you ever change the topology of a streaming application, including changing the version of the Kafka client libraries, or even certain configuration parameters. Some of these are covered by published best practices, some we have had to figure out ourselves.
- Interactive Query Startup. For large stores, the interactive query feature of Kafka Streams can take up to several minutes to start up, during which time queries do not work. We had to use Kubernetes readiness probes in combination with some specific calls into Kafka Streams to cause this not to manifest as downtime to the end user.
- Interactive Query Downtime. If a replica of a microservice goes offline temporarily, any queryable Kafka Streams stores associated with that service also go offline, possibly for several minutes. This ended up being unacceptable for us since it causes several minutes of partial downtime every time we restart a replica of a service for any reason. An open issue with Kafka covers this but has no published ETA. We have also implemented a fairly involved system to allow interactive queries to remain online during restarts or crashes of service replicas.
See you next time!