26 February 2020

Why is Pausing a Kafka Consumer so Complicated?

If you have any Kafka consumers, it is almost certainly a good idea to be able to pause them. There are a few reasons. One is a circuit-breaker situation, where downstream processing has failed and you need to stop consuming more messages. Another is a rolling deployment of a horizontally scaled application: if you don't want old and new code consuming messages at the same time, pausing across the deployment is a good idea.

Fortunately the Consumer interface has both pause and resume methods, and because they are idempotent you might naturally approach pausing as follows: in my poll loop, check a flag (which can be set by another thread) to see if I should be paused. If so, call pause; if not, call resume.

Then we hit a problem - the pause and resume methods take an argument: a Collection of TopicPartitions. I'm using automatic partition assignment, so how do I know what to pass? Ah, no problem - there's an assignment method that returns a Collection of TopicPartitions.

Okay now my code looks great:

while (true) {
  if (paused) {              // flag set from another thread
    consumer.pause(consumer.assignment())
  } else {
    consumer.resume(consumer.assignment())
  }
  handleRecords(consumer.poll(timeout))
}
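One detail worth making explicit: because the flag is written by a different thread than the poll loop, it needs safe publication - a plain Boolean field offers no visibility guarantee. A minimal sketch using AtomicBoolean (the PauseFlag class and its method names are mine, not part of any Kafka API):

```kotlin
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical holder for the cross-thread pause flag used in the loop above.
class PauseFlag {
    private val paused = AtomicBoolean(false)

    fun pause() = paused.set(true)         // called from e.g. an admin/ops thread
    fun resume() = paused.set(false)
    fun isPaused(): Boolean = paused.get() // read by the poll loop each iteration
}
```

A @Volatile var would work equally well here; the point is only that the poll-loop thread must be guaranteed to observe a write made by another thread.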

Lovely, all works perfectly - until a rebalance happens while paused. Now my consumers all get exactly one batch of records. What's going on?

It doesn't take much debugging to discover that after a rebalance my consumer is no longer paused when entering the while loop, and my call to assignment() returns an empty Collection, so I pause nothing. Next time round the loop, assignment() returns a Collection of the partitions I'm subscribed to, as expected, so the code works from that point on - but too late to stop that one batch.

So what happened? The first question is why my consumer was unpaused. This is because there is no pause flag at the consumer level. When we call pause on the consumer, that paused state is actually held by instances of TopicPartitionState indirectly referenced by the consumer, and over the course of a rebalance the existing instances associated with my consumer are discarded and new ones created. These may or may not relate to the same partitions at the broker end, but that makes no difference: they are brand new instances that know nothing about what happened previously. As a result, after a rebalance the Consumer and the new TopicPartitionStates have no awareness that they were meant to be paused, so it's up to application code to deal with that.

The second question is why the empty Collection - and that is simply about when the consumer learns about its assigned partitions. The call to assignment() doesn't trigger any request to the broker or consumer co-ordinator; it just reports the current assignment. Communication with the broker really only happens in the poll method - in fact pretty much everything happens in the poll method. Unlike producers, consumer client code is single threaded [1]. When we talk about rebalance and assignment it is easy to imagine these things going on in the background, but for any given consumer they all happen sequentially within the poll method. Between my code calling poll and that method returning control to my code, a lot is happening: the client needs to talk to the co-ordinator to find out if anything significant is happening in the group (eg a rebalance); it needs to make sure it has assignments; it needs to make sure it has up-to-date metadata (so it knows which broker is leader for each of its assigned partitions); it needs to check if it knows the offsets for each partition, and look up the last committed offset if not; it needs to see if it's time to commit previously polled records if auto-commit is on - oh, and of course it needs to get hold of some records too!
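The sequence above can be sketched as pseudocode - the names here are purely illustrative of the order of operations, not the actual KafkaConsumer internals:

```
poll(timeout):
  coordinatorPoll()                 # heartbeat state, discover rebalance in progress
  if rebalanceNeeded:
    listener.onPartitionsRevoked(oldAssignment)
    rejoinGroup()                   # old TopicPartitionStates discarded here
    listener.onPartitionsAssigned(newAssignment)  # brand new, unpaused states
  refreshMetadataIfStale()          # which broker leads each assigned partition
  fetchCommittedOffsetsIfUnknown()
  maybeAutoCommit()
  return fetchRecords()             # skips partitions currently marked paused
```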

So over the course of a rebalance, one call to poll will lead to the discovery that all partitions have been revoked (removing all the associated TopicPartitionState objects), and some subsequent call to poll will discover that some partitions have been assigned. That call then goes on to fetch records from those freshly assigned partitions before returning to your application code.

Fortunately this is where the ConsumerRebalanceListener interface comes into play. If you pass a ConsumerRebalanceListener when subscribing your consumer to a topic, it will be called during the execution of the poll method: when a rebalance has been initiated its onPartitionsRevoked method is called, and after the rebalance has completed its onPartitionsAssigned method is called. Crucially, this happens BEFORE the poll method checks which partitions are paused, which in turn happens before fetching records. Hence all you have to do is carry out a pause check in your ConsumerRebalanceListener:

override fun onPartitionsAssigned(assignment: Collection<TopicPartition>) {
  if (paused) {
    consumer.pause(assignment)
  }
}
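To see the whole flow without standing up a broker, here is a broker-free simulation in plain Kotlin. FakeConsumer and its onAssigned hook are hypothetical stand-ins for KafkaConsumer and ConsumerRebalanceListener (not the real client API), but they model the relevant state machine: pause state lives per assigned partition, a rebalance discards it, and reapplying the application-level flag in the assignment callback restores it before the next fetch:

```kotlin
// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
data class TopicPartition(val topic: String, val partition: Int)

class FakeConsumer {
    // Mirrors the real client: pause state is held per assigned partition...
    private val paused = mutableSetOf<TopicPartition>()
    var assignment = setOf<TopicPartition>()
        private set
    var onAssigned: (Collection<TopicPartition>) -> Unit = {}

    fun pause(partitions: Collection<TopicPartition>) { paused.addAll(partitions) }
    fun pausedPartitions(): Set<TopicPartition> = paused.toSet()

    // ...so a rebalance discards it along with the old TopicPartitionStates.
    fun rebalance(newAssignment: Set<TopicPartition>) {
        paused.clear()                // old per-partition state is gone
        assignment = newAssignment
        onAssigned(newAssignment)     // listener runs before the next fetch would
    }
}

fun main() {
    val consumer = FakeConsumer()
    val appPaused = true              // the application-level flag

    consumer.rebalance(setOf(TopicPartition("orders", 0)))
    consumer.pause(consumer.assignment)

    // Without a listener, the next rebalance silently unpauses everything:
    consumer.rebalance(setOf(TopicPartition("orders", 1)))
    println(consumer.pausedPartitions().isEmpty())   // true

    // Reapply the flag in the assignment callback, as above:
    consumer.onAssigned = { assigned -> if (appPaused) consumer.pause(assigned) }
    consumer.rebalance(setOf(TopicPartition("orders", 2)))
    println(consumer.pausedPartitions())   // [TopicPartition(topic=orders, partition=2)]
}
```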

Simple when you know how, but a real gotcha when you don't. No surprise really, as the behaviour is just not intuitive. So why is it done that way?

Instinctively, it feels like pausing shouldn't need to know about the allocation of partitions - either my consumer is paused or it isn't. The Javadoc on KafkaConsumer gives some insight into why this may be a simplistic view. While I tend to have a different consumer for each topic, that is not a constraint of the Kafka Consumer, and it is reasonable to have a single consumer subscribed to multiple topics - this may in fact be quite common in streaming applications where input from several topics is processed and merged into a single output. In such cases it is possible that consumption from one or more topics may lag behind the others, which could lead to messages falling into different time windows. A reasonable way of handling that is to pause consumption on the topics that are ahead, to allow the others to catch up. While I've been thinking about partitions, it is of course a Collection of TopicPartitions that is used for pausing - each one a combination of topic and partition (mind you, the Javadoc and variable names in KafkaConsumer consistently emphasise the partitions, pretty much ignoring the presence of the topic).

However, this still doesn't explain why I can't just pause on topics - would there ever be a need to pause some partitions and not others? Admittedly, if the producers used a very unbalanced key strategy then it might be necessary to let consumption of busier partitions catch up by pausing the sparser ones - the granularity offered allows this. Maybe this is the reason for the design that holds pause state in TopicPartitionState, and that decision then inevitably leads to the loss of this state across a rebalance. But I would expect the need for this level of granularity to be uncommon, and while partition allocation varies across rebalances, topic subscription does not. It seems conceptually feasible to implement convenience pause methods that take a Collection of topic names, or no arguments, and internally mark the topics as paused in the consumer. This state could then easily span rebalances - just being checked during poll at the same time that ConsumerRebalanceListener callbacks are invoked - and would be significantly less error prone for users who just want to pause all consumption, or consumption at the topic level.
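The topic-level convenience proposed here can in fact be sketched outside the client: a small wrapper that remembers paused topic names and, on each assignment, works out which of the newly assigned partitions to (re)pause. TopicPauseTracker and the TopicPartition data class below are hypothetical, stdlib-only stand-ins for illustration, not a real Kafka API:

```kotlin
// Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
data class TopicPartition(val topic: String, val partition: Int)

// Durable, topic-level pause state that survives rebalances, as proposed above.
class TopicPauseTracker {
    private val pausedTopics = mutableSetOf<String>()

    fun pauseTopics(topics: Collection<String>) { pausedTopics.addAll(topics) }
    fun resumeTopics(topics: Collection<String>) { pausedTopics.removeAll(topics.toSet()) }

    // Call from onPartitionsAssigned: returns the partitions of the new
    // assignment that should be (re)paused, whatever the partition numbers are now.
    fun partitionsToPause(assignment: Collection<TopicPartition>): Set<TopicPartition> =
        assignment.filter { it.topic in pausedTopics }.toSet()
}
```

In an onPartitionsAssigned callback you would then simply call consumer.pause(tracker.partitionsToPause(assignment)), and the topic-level state never needs to care which partition numbers the current rebalance handed out.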

Although I'm considering a pull request, I'm slightly uncomfortable about having two different kinds of pause state: transient state in the TopicPartitionState and more durable state at the Consumer or Fetcher level. In principle that may not be so problematic, as it is internal. Worse would be the consumer interface, which would appear to have three overloaded pause methods differentiated only by granularity, when actually the partition-level method would behave significantly differently in the event of a rebalance. Such semantics could be made clear in the Javadoc, but it sounds like I'd be replacing an existing unintuitive interface with an inconsistent one. Which is the lesser of two evils?

[1] actually it isn't quite single threaded - if it were, it would be pretty inefficient, as the consumer would hang about doing nothing while your code processes a batch of records, and then your code would hang about doing nothing while the consumer received records from the brokers. Instead the background heartbeat thread is also pre-fetching continuously, so that when the poll() method asks for the latest records it just gets the ones that have already been fetched since the last poll. However, this detail doesn't have any impact on the sequence of processing within poll, and so was glossed over in this post.

1 comment:

chris said...

Actually, my postscript is incorrect and based on a misunderstanding - the potential for prefetch is purely due to the fact that you can specify a maximum poll size, while fetching is not at the granularity of messages - nor is publishing. Messages are bundled up into batches that are completely opaque to the broker. That means that when a batch is fetched, it may not all be presented to the poll method, as the limit may be reached, leaving the remainder of the batch for a subsequent poll.