Introduction

I almost called this post “conquering the non-idempotent event materialization function,” in reference to my event sourcing wall o’ text from last year, but I didn’t want to lose 80% of my audience before the first paragraph. It would have been an appropriate title, however. Still here? In the previous event sourcing post, I talked about the challenges of working with events whose materialization functions are non-idempotent, meaning that if you replay the same event multiple times, your system does not come out the same.

A trivial example of this would be an event that says “$100 added to bank account.” If you play this event 4 times, you add $400 to the account. By contrast, an event that says “account balance is now $100” can be played as many times as you want, and the account balance will remain consistent. In the previous post, I concluded that the latter type of event is preferable, since it allows you to materialize events into system state in an idempotent fashion, and duplicate events are a non-issue. This is convenient, because it’s common for Kafka to replay duplicate events (and in general, “at least once” is a much easier delivery guarantee to enforce than “exactly once”).
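
To make the distinction concrete, here is a minimal Scala sketch; the types and function names are mine, purely for illustration:

    // Hypothetical account state, just to illustrate the two kinds of events.
    final case class Account(balance: BigDecimal)

    // Non-idempotent materialization: replaying the same event keeps changing the state.
    final case class AmountAdded(amount: BigDecimal)
    def applyAmountAdded(state: Account, e: AmountAdded): Account =
      state.copy(balance = state.balance + e.amount)

    // Idempotent materialization: replaying the same event any number of times
    // leaves the state exactly the same.
    final case class BalanceSet(balance: BigDecimal)
    def applyBalanceSet(state: Account, e: BalanceSet): Account =
      state.copy(balance = e.balance)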

In this post, I detail a case where idempotent materialization was impossible: namely, a streaming analytics routine that counted incoming numerical data and aggregated sums of that data, very similar to aggregating “$100 added to account balance” events. We have several new cases where this is critical. For instance, KUKA’s “ready2_fasten” screw fastening package uses our KUKA Connect cloud application to report metrics on screw fastening over time, such as the number of successful or unsuccessful attempts in a particular time window. In this case the events looked like “this many new screws have been fastened,” so we had to guard against duplicate event replay if the data were to be accurate. This had to be done in a way that was resilient to transient Kafka issues, microservice crashes/reboots/re-deployments, and network retries. Here is how we did that.

…And How We Didn’t Do That

We did not use Kafka Streams to solve this problem. Soon, I will write a post detailing why we abandoned Kafka Streams for the time being due to various challenges implementing it in production. For now, suffice it to say that we had concluded it was not an option. This was a shame: when it actually works, Kafka Streams has some powerful facilities for defining non-idempotent aggregations without making you worry about many of the complicated underlying details.

We did, however, use Akka Streams and Akka’s Kafka connector to accomplish this. That meant we had to implement a few things that Kafka Streams supposedly handles for you, such as guarding against duplicate event replay even when events are consumed and republished by what I called “republish systems” in the previous event sourcing post (systems that consume Kafka messages, transform them, and publish them to new topics). This had to hold true even if the republish system and the aggregation system were in different microservices.

The Details

The breakthrough in this approach is relatively obvious in retrospect: use the Kafka partition offset. Any time you receive a Kafka message, it has an “offset”, which is a 64-bit integer that increments by one for every message published to a Kafka partition. Every message with an offset of n occurred after any message with offset <n and before any message with offset >n. Also, due to the way Kafka handles partitioning, we knew that every message for any particular “thing” (in this case a robot, represented by a Kafka partition key corresponding to the robot’s serial number) is in the same partition, and therefore ordered with respect to any other message from that robot. Therefore, the procedure became, simply (see the sketch after this list):

  • When you aggregate events into a state for a robot, that state should include the Kafka offset of the latest message that was aggregated into that state.
  • If you encounter a message with an offset earlier than that stored in the current state, discard it.
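
A minimal sketch of that procedure, with illustrative names standing in for our actual state and event types:

    // Illustrative types (not our real ones): an aggregation state that remembers the
    // offset of the last Kafka message folded into it.
    final case class ScrewEvent(successful: Long, failed: Long)
    final case class ScrewCounts(successful: Long, failed: Long, latestOffset: Long)

    // Replayed or duplicate messages carry an offset we have already seen, so they are dropped.
    def aggregate(state: ScrewCounts, event: ScrewEvent, offset: Long): ScrewCounts =
      if (offset <= state.latestOffset) state
      else ScrewCounts(
        successful   = state.successful + event.successful,
        failed       = state.failed + event.failed,
        latestOffset = offset)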

This sounds straightforward, but there was some complexity to it. We had several stages in our streaming app that were effectively “flat maps,” meaning that one incoming event could produce several outgoing events. The outgoing events all corresponded to a single incoming Kafka message with a single offset. This meant two things:

  1. We needed to keep track of the source message offset through all stages of the Akka stream so we could update any aggregation state at any stage with the latest offset of the original Kafka message that produced that state (and discard earlier ones as described above).
  2. Events produced by a “flat map” stage (i.e. multiple events produced by a single Kafka message) had to be processed as a batch! It was critical that we avoid processing part of a batch of messages and then crashing. In that case, we would not know how far we had gotten through the collection of events produced by that single Kafka offset. This could cause us to either lose data (by skipping part of a partially-processed collection) or replay duplicate events (by double-processing part of a partially-processed collection), both of which were unacceptable. Therefore, we passed “flat mapped” events through the stream as a collection (usually a Scala immutable.Queue due to its good append/prepend/dequeue performance) with a single attached Kafka offset, and the aggregate states were updated and persisted only after processing the entire collection. A sketch of this batching approach follows below.
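
Here is a rough sketch of that batching approach, reusing the illustrative ScrewEvent and ScrewCounts types from the sketch above; the real stream elements carry more information, but the shape is the same:

    import scala.collection.immutable.Queue

    // Events produced from one Kafka message travel through the stream together,
    // with that message's single offset attached.
    final case class Batch[A](offset: Long, events: Queue[A])

    // The state (and its latestOffset) is only updated once the whole batch has been
    // folded in, so a crash can never leave us halfway through a batch whose offset
    // was already recorded as processed.
    def aggregateBatch(state: ScrewCounts, batch: Batch[ScrewEvent]): ScrewCounts =
      if (batch.offset <= state.latestOffset) state // this Kafka message was already aggregated
      else {
        val folded = batch.events.foldLeft(state) { (s, e) =>
          s.copy(successful = s.successful + e.successful, failed = s.failed + e.failed)
        }
        folded.copy(latestOffset = batch.offset)
      }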

Persistence And Load Handling

The last remaining problem to solve was how to persist and load aggregated states. When a microservice boots up and starts receiving Kafka messages, we first need to load up the current state of every aggregation stage in the Akka stream (which remember, includes the latest-processed Kafka offset in each stage). Then we can start playing events on top of that state (or discarding events whose Kafka offset is too early).

For this, we used a mix of Redis and Cassandra. A full discussion of the trade-offs between the two is beyond the scope of this post, but the short version is that Redis is better for smaller, faster data while Cassandra is better for bigger, slower data. We had both flavors of data in our system.
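
As a rough sketch of that boot-time flow, assuming a hypothetical StateStore trait in front of Redis/Cassandra and the illustrative ScrewCounts state from earlier:

    import scala.concurrent.{ExecutionContext, Future}

    // Hypothetical storage abstraction; in our system Redis and Cassandra sat behind
    // something similar. A missing state defaults its offset to -1L so that the very
    // first real Kafka message is never discarded.
    trait StateStore {
      def load(robotSerial: String): Future[Option[ScrewCounts]]
      def save(robotSerial: String, state: ScrewCounts): Future[Unit]
    }

    def initialState(store: StateStore, robotSerial: String)
                    (implicit ec: ExecutionContext): Future[ScrewCounts] =
      store.load(robotSerial).map(_.getOrElse(ScrewCounts(0L, 0L, latestOffset = -1L)))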

Akka gave us a lot of flexibility in how to persist states. The basic approach we used was:

  • Pick a persistence interval. This is how often we write our aggregated states to Cassandra/Redis. It is a tunable parameter of the system: the longer it is, the less we stress out the database, but the more Kafka events we potentially have to replay on a reboot or crash. Also, we can only count on our data being as “fresh” as this interval. We used 1 minute. This was very easy to accomplish using Akka’s built-in throttle and conflate functions (see the sketch after this list). We throttled the stream elements representing updated state to 1 per minute per device, and conflated them by simply selecting the most recent state. This is what went to Cassandra/Redis. Kafka Streams has no equivalent of throttle/conflate, or really any time- or rate-aware streaming primitives, which is one of my chief gripes about it.
  • Only commit the message to Kafka after the states that include that Kafka event have been written to Cassandra/Redis. This means that if the system crashes, we will replay any lost Kafka events. We might also replay events that have already been persisted if the system crashes after persisting to Cassandra/Redis but before committing to Kafka. However, this is okay since we ignore Kafka messages with an offset earlier than the latest one that has already been aggregated into the state, as described above.
  • Persist states in the reverse order that the stages appear in the Akka stream. This required some thought. If a stream has 4 aggregation stages, what happens if you persist only 2 of them then crash? When you recover, if the ones you persisted before crashing are the earlier stages, then you might have earlier stages in the stream with later Kafka offsets. If an earlier Kafka message comes in, the earlier stream stage with the later offset will discard that message, and it will never make it to the later stages that still need that event since they were not persisted before the crash. This is trivially avoided by persisting the later (more downstream) stages before the earlier (upstream) ones.
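
To give a feel for the first two bullets, here is a sketch of the persistence flow for a single robot’s substream, again using the hypothetical StateStore and ScrewCounts from the earlier sketches; the per-device grouping and the wiring into Akka’s Kafka connector are omitted:

    import scala.concurrent.ExecutionContext
    import scala.concurrent.duration._
    import akka.NotUsed
    import akka.stream.ThrottleMode
    import akka.stream.scaladsl.Flow

    // conflate keeps only the newest state while throttle limits demand to one element
    // per minute, so at most one write per minute reaches Cassandra/Redis. The interval
    // is the tunable freshness-vs-database-load knob described above.
    def persistFlow(store: StateStore, robotSerial: String)
                   (implicit ec: ExecutionContext): Flow[ScrewCounts, ScrewCounts, NotUsed] =
      Flow[ScrewCounts]
        .conflate((_, latest) => latest)                 // keep only the most recent state
        .throttle(1, 1.minute, 1, ThrottleMode.Shaping)  // persist at most once per minute
        .mapAsync(parallelism = 1) { state =>
          // Only after this write succeeds does the element continue downstream, where
          // the corresponding Kafka offset is committed (second bullet above).
          store.save(robotSerial, state).map(_ => state)
        }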

And Then, Oops

Once we had implemented the above system, we ran it in our test environment for several days. To our shock and dismay, we still observed analytics results indicating that duplicate events had been replayed. After some frustrating investigation, we identified the cause: republish systems. I discuss these in the previous event sourcing post: they are any systems that take in Kafka messages, do calculations, and publish other Kafka messages representing the results of those calculations.

In the case of our streaming analytics, there was in fact a republish system upstream of our analytics stream. This system translated raw incoming data from robots into a standard data format that most of KUKA Connect uses. Obviously this system could crash just like our analytics system, and equally obviously, a crash could cause Kafka to replay recent messages that had not been committed. Since our analytics system paid attention only to the Kafka offset of the topic it was directly consuming, and not to the input topic of any republish system upstream of it, we could still get duplicate events.

One possible fix would have been to add the same offset-tracking and discarding behavior to the republish system as well. The problem was that (in this case at least) the republish system was stateless. It had no aggregation state at all, and we would have had to add one for no purpose other than tracking Kafka offsets. This could have worked, but it would have meant saving to (and possibly stressing out) a database where we did not previously do so. Stateful streams are also more memory intensive and do not perform as well as stateless ones. Finally, several services downstream of this republish system were already idempotent and did not care about message replay; for those, we would have been paying to persist an aggregation state we did not even need. We wanted a general solution that worked just as well for stateless streams as for stateful ones.

Fortunately, we already had a standard protobuf message that we used to wrap every message we publish to Kafka event streams, which handled things like a timestamp and a unique ID for the event. To this, we added what we termed a source topic chain: an ordered list of tuples containing the following data for each republish system that a message passes through:

  • Topic name
  • Partition number
  • Partition offset

This collection of topic-information tuples is ordered according to the order in which messages pass through republish systems, so we can always tell the “history” of a message no matter how many times it was republished.
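
Expressed as Scala case classes rather than the actual protobuf definition (the field names here are my own), the wrapper looks roughly like this:

    // One entry per republish hop: where the consumed message came from.
    final case class SourceTopicEntry(topic: String, partition: Int, offset: Long)

    final case class EventEnvelope[A](
      eventId: String,                            // unique ID for the event
      timestampMillis: Long,
      sourceTopicChain: Vector[SourceTopicEntry], // oldest first: the head is the original message
      payload: A) {

      // The very first message in the chain, i.e. the "root cause" whose offset we
      // deduplicate on (see below).
      def rootSource: Option[SourceTopicEntry] = sourceTopicChain.headOption
    }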

Using this information, we only had to make a small change to our existing logic for tracking the latest Kafka offset in each aggregation state:

  • Instead of looking at the offset of the current message, look at the offset of the very first message in the republish chain, i.e. the “root cause” of this event. Track that offset in the aggregation state and discard messages of earlier offsets as before.
  • Also track the topic and partition. If either of those changes, assume something changed about the topology of the republish chain, or that a Kafka topic was repartitioned. In that case, start over: throw out the tracked latest offset for the old topic/partition and begin again with the new topic, partition, and offset. A sketch of this adjusted check follows below.
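
A sketch of that adjusted check, building on the illustrative SourceTopicEntry from the previous sketch:

    // Each aggregation state now tracks the topic, partition, and offset of the root
    // message rather than just an offset.
    final case class TrackedOffset(topic: String, partition: Int, offset: Long)

    def shouldProcess(tracked: Option[TrackedOffset], root: SourceTopicEntry): Boolean =
      tracked match {
        case Some(t) if t.topic == root.topic && t.partition == root.partition =>
          root.offset > t.offset // same topic/partition: drop anything at or before the tracked offset
        case _ =>
          true                   // no state yet, or the topic/partition changed: start over
      }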

Conclusion

Using these approaches together, we were able to implement a streaming analytics system that was resilient to replayed events even though it included non-idempotent event materialization functions. To summarize, the approach was:

  • Track the path of a message through republish systems by appending the topic/partition/offset of every input message to a republish system in the output message of that republish system.
  • In every non-idempotent aggregation state of a stream, also keep track of the latest offset processed, as well as its topic name and partition number.
  • If an incoming message is of the same topic/partition and an earlier offset than the latest one tracked in the aggregated state, discard it. If the topic or partition changes, discard the old offset information and start over.
  • Only commit an offset to Kafka after persisting all the aggregation states corresponding to that offset.
  • For a stream with multiple aggregation stages, persist aggregation states in downstream-first order.
  • Use multi-rate Akka primitives such as conflate and throttle to control how often you persist aggregation states. Tune this value to achieve the proper trade-off between performance and data “freshness.”
  • If a streaming stage “flat maps” its inputs, i.e. produces multiple output elements for a single input, avoid Akka primitives that expand the output stream such as mapConcat. Instead, track the output elements as a collection (i.e. the output of the flow should be a collection of things, not a single thing). Process that collection as a single element and only persist the aggregated state after processing the entire collection.
  • Create a class you can use as a streaming element that provides access to the Kafka topic/partition/offset and source topic chain information at every stage in the Akka stream.

Until next time!