26 February 2020

Why is Pausing a Kafka Consumer so Complicated

If you have any Kafka consumers it is almost certainly a good idea to be able to pause them. There are a few reasons - for example a circuit break situation where downstream processing has failed and you need to stop consuming more messages, or a rolling deployment on a horizontally scaled application, if you don't want old and new code consuming messages at the same time, so a pause across deployment is a good idea.

Fortunately the Consumer interface has both pause and resume methods, and because they are idempotent naturally you might approach pausing as follows: in my poll loop check a flag (that can be set by another thread) to see if I should be paused. If so, call pause. If not, call resume.

Then we hit a problem - the pause and resume methods take an argument, that argument is a Collection of TopicPartitions. I'm using automatic partition allocation so how do I know what to pass? Ah, no problem, there's an assignment method that returns a Collection of TopicPartitions.

Okay now my code looks great:

while (true) {
  if (paused) {
  } else {

Lovely, all works perfectly - until a rebalance happens while paused. Now my consumers all get exactly one batch of records. What's going on?

It doesn't take much debugging to discover that after a rebalance my consumer is no longer paused when entering the while loop, and my call to assignment() returns an empty Collection so I pause nothing. Next time round the loop, assignment() returns a Collection of the partitions I'm subscribed to, as expected, so the code works from that point on, but too late to stop getting one batch.

So what happened? First question is why was my consumer unpaused - well, this is because there is no pause flag at the consumer level - when we call pause on the consumer, that paused state is actually held by instances of TopicPartitionState indirectly referenced by the consumer, and over the course of a rebalance the existing instances associated with my consumer are ditched and new ones created - these may or may not relate to the same partitions at the broker end, but that makes no difference - they are brand new instances that know nothing about what has happened previously. Because of this, after rebalance the Consumer and the new TopicPartitionStates have no awareness that they were meant to be paused, so it's up to application code to deal with that.

Second question is, why the empty Collection - and that is simply about when the consumer knows about its assigned partitions. The call to assignment() doesn't trigger any request to the broker or consumer co-ordinator - it just reports back the current assignment. Communication with the broker really only happens in the poll method - in fact pretty much everything happens in the poll method. Unlike producers, consumer client code is single threaded [1] - when we talk about rebalance and assignment it is easy to think of these things going on in the background - but actually for any given consumer these things all happen sequentially within the poll method - between my code calling poll, and that method returning control to my code, a lot is happening - the client needs to talk to the co-ordinator to find out if anything significant is happening in the group (eg a rebalance), it needs to make sure it has assignments, it needs to make sure it has up to date metadata (so it knows which broker is leader for each of its assigned partitions), it needs to check if it knows the offsets for each partition, and look up the last committed offset if not, it needs to see if it's time to commit previously polled records if auto-commit is on - oh, and of course it needs to get hold of some records too!

So over the course of a rebalance, one call to poll will lead to the discovery that all partitions have been revoked (removing all the associated TopicPartitionState objects), and some subsequent call to poll will discover that some partitions have been assigned. That call then goes on to fetch records from those freshly assigned partitions before returning to your application code.

Fortunately this is where the ConsumerRebalanceListener interface comes into play - if you add a ConsumerRebalanceListener when subscribing your consumer to a topic, it will get called during the execution of the poll method - when a rebalance has been initiated its onPartitionsRevoked method is called, and then after the rebalance has been completed, its onPartitionsAssigned method is called - and crucially this happens BEFORE the poll method checks if it is paused, which in turn happens before fetching records. Hence all you have to do is carry out a pause check in your RebalanceListener:

override fun onPartitionsAssigned(assignment: Collection): Unit {
  if (paused) {

Simple when you know how, but a real gotcha when you don't. No surprise really, as the behaviour is just not intuitive. So why is it done that way?

Instinctively, it feels like pausing shouldn't need to know about the allocation of partitions - either my consumer is paused or it isn't. The Javadoc on KafkaConsumer gives some insight on why this may be a simplistic view. While I tend to have a different consumer for each topic, that is not a constraint of the Kafka Consumer, and it is reasonable to have a single consumer subscribed to multiple topics - this may in fact be quite common in streaming applications where input from several topics is processed and merged into a single output. In such cases it is possible that the consumption from one or more topics may lag behind others - this could lead to messages falling in different time windows. A reasonable way of handling that is by pausing consumption on the topics that are ahead, to allow others to catch up. While I've been thinking about partitions, it is of course a Collection of TopicPartitions that is used for pausing - each one a combination of TOPIC and partition (mind you, the Javadoc and variable names in KafkaConsumer consistently emphasise the partitions, pretty much ignoring the presence of topic)

However, this still doesn't explain why I can't just pause on topics - would there ever be a need to pause some partitions and not others? Admittedly if the producers used a very unbalanced key strategy then it might be necessary to allow consumption of busier partitions to catch up by pausing the sparser ones - the degree of granularity offered allows this. Maybe this is the reason for the design that holds pause state in TopicPartitionState, and that decision then inevitably leads to the loss of this state across a rebalance. But I would expect the need for this level of granularity to be uncommon, and while partition allocation varies across rebalances, topic subscription does not. It seems conceptually feasible to implement convenience pause methods that take in a Collection of topic names, or no arguments, and internally mark the topics as paused in the consumer - this state could then easily span rebalances - just being checked during poll at the same time that ConsumerRebalanceListener callbacks are invoked, and significantly less error prone for users who just want to pause all consumption, or consumption at the topic level.

Although I'm considering a pull request, I'm slightly uncomfortable about having two different types of pause state, transient in the TopicPartitionState and more durable at the Consumer or Fetcher level. In principle that may not be so problematic as it is internal, but worse would be the consumer interface which would appear to have 3 overloaded pause methods differentiated only by granularity, when actually the partition level method would behave significantly differently in the event of rebalance. Such semantics could be made clear in the Javadoc, but it sounds like I'd be replacing an existing unintuitive interface with one that is inconsistent. Which is the lesser of two evils?

[1] actually it isn't quite single threaded - if it was it would be pretty inefficient as the consumer would hang about doing nothing while your code processes a batch of records, and then your code would hang about doing nothing while the consumer received records from the brokers. Instead the background heartbeat thread is also pre-fetching continuously to that when the poll() method asks for the latest records it is just getting the ones that have already been fetched since the last poll. However this detail doesn't have any impact on the sequence of processing within poll, and so was glossed over in this post.

23 February 2020

Kafka Acks Configuration - an Apology

I like Apache Kafka - it's a real engineer's tool. Like an Austin Maxi [1], it's built on simple, solid, reliable foundations, it will keep running smoothly as long as you're prepared to roll up your sleeves every now and then and have a tinker under the bonnet, but even then it can still surprise you at times. 

Because I like Kafka, I want other people to enjoy using it (as long as it's the right tool for them), so I try to be helpful whenever I can, especially if they have just recently picked it up and have lots of questions. Well, I thought I was being helpful, but actually it turns out that for a while I've been casting false accusations, and lying through my teeth - I'm actually not the nice helpful person I thought I was. 

First the lies - the problem is that I see lots of teams who adopt Kafka and they really, really want to override configuration - "what should I set my consumer connections.max.idle.ms to?", "What should I use for my producer request.timeout.ms?" The trouble is, NONE of the producer or consumer configuration properties should be touched without understanding what they do and what the implications are of changing them, especially as some really need to be adjusted together - like flying a helicopter, and for that reason I ALWAYS tell people to go with the defaults - they're sensible, don't change them unless you need to. And then the accusations - I hear people say "I've lost messages, what happened? Why did Kafka lose them?". To those people I ALWAYS say - "hey, Kafka doesn't lose messages, I'll bet the problem is in your consumer code - you're consuming but then silently discarding messages or not processing them for one reason or another." 

Then our team started losing messages, and I discovered that I was wrong on both counts, for the usual reasons - assumptions and a failure to properly understand the documentation. 

My first incorrect assumption is that Kafka always guarantees at least once delivery - if you publish a message it will always be consumed at least once, regardless of what happens. The second, and closely related assumption, is that default configuration will always serve you well. 

The problem is that Kafka is flexible: if you can afford to lose a few messages, then you can use Kafka for efficient and  phenomenally high throughput, if you can't afford to lose any messages, then Kafka can handle that too but at the cost of lower throughput. Because of this flexibility, default configuration CAN'T serve both purposes - and as I discovered to my cost the default producer configuration DOES NOT guarantee at least once delivery. The reason is that small innocuous configuration - 'acks'. 

There are 3 different values (strictly 4 but one is an alias), and unlike many configurations where an understanding of client code is enough, to understand the implications of your choice on acks you really have to understand what is happening at the broker end. 

Now if you are short of time or don't like reading, I'll cut to the chase - guaranteed delivery? choose acks=all, not too fussed if you lose a few? choose acks=1, really don't care too much but just want to shove through as many messages per second as your weedy little server can possibly handle? choose acks=0. 

However, feel free to read on to look at what these really mean in a little bit more detail, and to understand the implications of your choice. 

Now when I started losing messages, after a bit of due diligence on our client code, it became clear that the problem was probably at the producer end, and acks might be the culprit. We had gone with the default setting of 1, and the documentation states - "This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost." 

My initial naive interpretation concentrated on the first half of the sentence and took it to mean that as long as the leader writes to disk we are all okay, the message isn't lost, it's on disk. So, I started to look a bit at what 'written to local log' really means. Kafka just delegates to the Operating System and Operating Systems cache data rather than persisting it straight away - flushing to disk when it makes sense. So was that the issue? The leader being shut down unexpectedly with data still in memory? Then it dawned on me that I was focusing on the wrong bit of that sentence in the documentation, writes to disk are not the important point here - the key is replication - it doesn't really matter one bit if the leader has stored the messages persistently on disk. 

So how does replication work in Kafka? Unless you perversely choose no replication (which you are free to do), each partition is hosted by multiple brokers, and however many replicas you have (3 is a pretty standard number) one broker is leader and the rest are followers. When we send messages to Kafka we only ever talk to the leader of the partition - the producer client code figures out which partition a message belongs to, and uses cached metadata to find out which broker is the leader for that partition. The followers are only there as a safety net, and they are essentially just Kafka consumers - they are continuously polling the leader, and like any consumer, each time they fetch they tell the leader their current offset (or rather the next message they want to receive). In this way the leader knows where all the followers have got to, and hence which ones are keeping up (in-sync) - by default the definition of keeping up is that they are effectively less than 10 seconds behind the leader. So, the leader knows which messages have definitely been replicated, and which ones may not have been, and as the leader is the only broker that consumer clients talk to, it can ensure that they only see messages that have been replicated. This is the definition of the high watermark - the high watermark is NOT the head of the partition, as I used to think. There are likely to be other messages ahead of it, but it is the latest message that is visible to consumers. 

Now, if a follower dies at any point, then their partition will be picked up by another broker and the cluster will get back in sync - in normal times followers aren't that important. If on the other hand the leader dies then one of the in-sync followers will be elected leader, and start servicing producers and consumers. BUT hang on - that follower might be up to 10 seconds behind the old leader, and that can cover an awful lot of messages that are no longer available. What happens about that? 

This is where acks come in - the messages have gone as far as the cluster is concerned, but acks determines whether the producers get to know about it. Actually, with no acks, the producers don't even know if their messages reached the leader - the servers may have all caught fire but those producers will cheerfully keep throwing messages into the flames. With just one ack, the producers don't care about replication, they are happy when the leader has the message. In this scenario, lots of producers may be sending messages without a care in the world, not knowing that behind the scenes followers have started to lag - maybe there's a bit of temporary network latency, or maybe the cluster is always like that. If at that point a server gets restarted - maybe it's time for some patching - then one of the in-sync followers is going to take over as leader, but the leader that has now gone may have successfully stored 9 seconds worth of messages that have not yet been fetched by the new leader, so they are gone. No consumer will ever see them, and no producer knows about it. Even if the old leader comes back to life, that 9 seconds worth of messages is useless - the world has moved on, they can't just be made available as if nothing had happened. 

With all acks, this situation won't cause any loss - the messages you sent will not be acknowledged until all followers[2] have sent a fetch request with an offset later than those messages, and if followers or the leader die, the producer client will timeout before getting that acknowledgement, and retry. The downside is that even when everything is fine, if the followers are 9 seconds behind the leader then that's how long your producer will wait for an acknowledgement. So now you might be asking incredulously if your application is going to be regularly blocked for up to 10 seconds (or more if there are elections going on at the broker end). Relax, it's not your application code that is blocked - unlike consuming, producing is asynchronous for this very reason, the producer send method returns immediately, a separate thread is continuously sending batches of messages to the brokers from a buffer. A temporary issue at the broker end will just cause the buffer to fill up a bit. If everything is really grinding to a halt, then the buffer will fill up, and send method calls will fail immediately, so you're application will know about it. And what to do about that? Well now you have a problem to solve - if there's a temporary spike that has overwhelmed the server end, you probably want to open a circuit breaker at this point and fail over to some other strategy - maybe store pending messages somewhere until the situation eases. 

So how likely is any of this? Are we worrying about something that probably won't happen? Well, the question is really - do you EVER restart a server. For resilience, Kafka spreads partitions across brokers on different servers and racks, and so each server will host a mix of different partitions, acting as follower for some and leader for others - or to put it another way EVERY server is a leader for at least one partition and so restarting a server WILL almost certainly lose messages ahead of the high watermark, whether they exist on disk or not. That is why acks=1 will pretty much inevitably cause message loss. 

[1] My first car was a beautiful Austin Maxi - I remember rebuilding the starter motor from the remains of two starter motors I found at the scrap yard - the days when cars were made to be messed about with. I also remember having to push the thing to a petrol station when the fuel gauge decided to play up - they weigh a ton! 

[2] A slight detail here is that we don't always have to wait for all the followers - min.insync.replicas is a broker end configuration that defines what is good enough for all acks, in the case where one or more followers have dropped out of sync. With 3 replicas (1 leader and 2 followers) it is common to have a minimum of 2 in sync. This means that when both followers are in-sync the leader will wait for both (even if one of them is 1 second behind and the other is 9 seconds behind). But if one follower has fallen behind far enough to be considered out of sync, the leader doesn't wait for the partition to be picked up by a new follower, and so the producer doesn't wait either - it effectively trusts that the leader and still in-sync follower won't both fail before everything gets back to normal - here lies the only risk if min.insync.replicas is lower than the total replicas.