I like Apache Kafka - it's a real engineer's tool. Like an Austin Maxi [1], it's built on simple, solid, reliable foundations, it will keep running smoothly as long as you're prepared to roll up your sleeves every now and then and have a tinker under the bonnet, but even then it can still surprise you at times.
Because I like Kafka, I want other people to enjoy using it (as long as it's the right tool for them), so I try to be helpful whenever I can, especially if they have just recently picked it up and have lots of questions. Well, I thought I was being helpful, but actually it turns out that for a while I've been casting false accusations, and lying through my teeth - I'm actually not the nice helpful person I thought I was.
First the lies - the problem is that I see lots of teams who adopt Kafka and they really, really want to override configuration - "what should I set my consumer connections.max.idle.ms to?", "What should I use for my producer request.timeout.ms?" The trouble is, NONE of the producer or consumer configuration properties should be touched without understanding what they do and what the implications are of changing them, especially as some really need to be adjusted together - like flying a helicopter, and for that reason I ALWAYS tell people to go with the defaults - they're sensible, don't change them unless you need to. And then the accusations - I hear people say "I've lost messages, what happened? Why did Kafka lose them?". To those people I ALWAYS say - "hey, Kafka doesn't lose messages, I'll bet the problem is in your consumer code - you're consuming but then silently discarding messages or not processing them for one reason or another."
Then our team started losing messages, and I discovered that I was wrong on both counts, for the usual reasons - assumptions and a failure to properly understand the documentation.
My first incorrect assumption is that Kafka always guarantees at least once delivery - if you publish a message it will always be consumed at least once, regardless of what happens. The second, and closely related assumption, is that default configuration will always serve you well.
The problem is that Kafka is flexible: if you can afford to lose a few messages, then you can use Kafka for efficient and phenomenally high throughput, if you can't afford to lose any messages, then Kafka can handle that too but at the cost of lower throughput. Because of this flexibility, default configuration CAN'T serve both purposes - and as I discovered to my cost the default producer configuration DOES NOT guarantee at least once delivery. The reason is that small innocuous configuration - 'acks'.
There are 3 different values (strictly 4 but one is an alias), and unlike many configurations where an understanding of client code is enough, to understand the implications of your choice on acks you really have to understand what is happening at the broker end.
Now if you are short of time or don't like reading, I'll cut to the chase - guaranteed delivery? choose acks=all, not too fussed if you lose a few? choose acks=1, really don't care too much but just want to shove through as many messages per second as your weedy little server can possibly handle? choose acks=0.
However, feel free to read on to look at what these really mean in a little bit more detail, and to understand the implications of your choice.
Now when I started losing messages, after a bit of due diligence on our client code, it became clear that the problem was probably at the producer end, and acks might be the culprit. We had gone with the default setting of 1, and the documentation states - "This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost."
My initial naive interpretation concentrated on the first half of the sentence and took it to mean that as long as the leader writes to disk we are all okay, the message isn't lost, it's on disk. So, I started to look a bit at what 'written to local log' really means. Kafka just delegates to the Operating System and Operating Systems cache data rather than persisting it straight away - flushing to disk when it makes sense. So was that the issue? The leader being shut down unexpectedly with data still in memory? Then it dawned on me that I was focusing on the wrong bit of that sentence in the documentation, writes to disk are not the important point here - the key is replication - it doesn't really matter one bit if the leader has stored the messages persistently on disk.
So how does replication work in Kafka? Unless you perversely choose no replication (which you are free to do), each partition is hosted by multiple brokers, and however many replicas you have (3 is a pretty standard number) one broker is leader and the rest are followers. When we send messages to Kafka we only ever talk to the leader of the partition - the producer client code figures out which partition a message belongs to, and uses cached metadata to find out which broker is the leader for that partition. The followers are only there as a safety net, and they are essentially just Kafka consumers - they are continuously polling the leader, and like any consumer, each time they fetch they tell the leader their current offset (or rather the next message they want to receive). In this way the leader knows where all the followers have got to, and hence which ones are keeping up (in-sync) - by default the definition of keeping up is that they are effectively less than 10 seconds behind the leader. So, the leader knows which messages have definitely been replicated, and which ones may not have been, and as the leader is the only broker that consumer clients talk to, it can ensure that they only see messages that have been replicated. This is the definition of the high watermark - the high watermark is NOT the head of the partition, as I used to think. There are likely to be other messages ahead of it, but it is the latest message that is visible to consumers.
Now, if a follower dies at any point, then their partition will be picked up by another broker and the cluster will get back in sync - in normal times followers aren't that important. If on the other hand the leader dies then one of the in-sync followers will be elected leader, and start servicing producers and consumers. BUT hang on - that follower might be up to 10 seconds behind the old leader, and that can cover an awful lot of messages that are no longer available. What happens about that?
This is where acks come in - the messages have gone as far as the cluster is concerned, but acks determines whether the producers get to know about it. Actually, with no acks, the producers don't even know if their messages reached the leader - the servers may have all caught fire but those producers will cheerfully keep throwing messages into the flames. With just one ack, the producers don't care about replication, they are happy when the leader has the message. In this scenario, lots of producers may be sending messages without a care in the world, not knowing that behind the scenes followers have started to lag - maybe there's a bit of temporary network latency, or maybe the cluster is always like that. If at that point a server gets restarted - maybe it's time for some patching - then one of the in-sync followers is going to take over as leader, but the leader that has now gone may have successfully stored 9 seconds worth of messages that have not yet been fetched by the new leader, so they are gone. No consumer will ever see them, and no producer knows about it. Even if the old leader comes back to life, that 9 seconds worth of messages is useless - the world has moved on, they can't just be made available as if nothing had happened.
With all acks, this situation won't cause any loss - the messages you sent will not be acknowledged until all followers[2] have sent a fetch request with an offset later than those messages, and if followers or the leader die, the producer client will timeout before getting that acknowledgement, and retry. The downside is that even when everything is fine, if the followers are 9 seconds behind the leader then that's how long your producer will wait for an acknowledgement. So now you might be asking incredulously if your application is going to be regularly blocked for up to 10 seconds (or more if there are elections going on at the broker end). Relax, it's not your application code that is blocked - unlike consuming, producing is asynchronous for this very reason, the producer send method returns immediately, a separate thread is continuously sending batches of messages to the brokers from a buffer. A temporary issue at the broker end will just cause the buffer to fill up a bit. If everything is really grinding to a halt, then the buffer will fill up, and send method calls will fail immediately, so you're application will know about it. And what to do about that? Well now you have a problem to solve - if there's a temporary spike that has overwhelmed the server end, you probably want to open a circuit breaker at this point and fail over to some other strategy - maybe store pending messages somewhere until the situation eases.
So how likely is any of this? Are we worrying about something that probably won't happen? Well, the question is really - do you EVER restart a server. For resilience, Kafka spreads partitions across brokers on different servers and racks, and so each server will host a mix of different partitions, acting as follower for some and leader for others - or to put it another way EVERY server is a leader for at least one partition and so restarting a server WILL almost certainly lose messages ahead of the high watermark, whether they exist on disk or not. That is why acks=1 will pretty much inevitably cause message loss.
[1] My first car was a beautiful Austin Maxi - I remember rebuilding the starter motor from the remains of two starter motors I found at the scrap yard - the days when cars were made to be messed about with. I also remember having to push the thing to a petrol station when the fuel gauge decided to play up - they weigh a ton!
[2] A slight detail here is that we don't always have to wait for all the followers - min.insync.replicas is a broker end configuration that defines what is good enough for all acks, in the case where one or more followers have dropped out of sync. With 3 replicas (1 leader and 2 followers) it is common to have a minimum of 2 in sync. This means that when both followers are in-sync the leader will wait for both (even if one of them is 1 second behind and the other is 9 seconds behind). But if one follower has fallen behind far enough to be considered out of sync, the leader doesn't wait for the partition to be picked up by a new follower, and so the producer doesn't wait either - it effectively trusts that the leader and still in-sync follower won't both fail before everything gets back to normal - here lies the only risk if min.insync.replicas is lower than the total replicas.
Because I like Kafka, I want other people to enjoy using it (as long as it's the right tool for them), so I try to be helpful whenever I can, especially if they have just recently picked it up and have lots of questions. Well, I thought I was being helpful, but actually it turns out that for a while I've been casting false accusations, and lying through my teeth - I'm actually not the nice helpful person I thought I was.
First the lies - the problem is that I see lots of teams who adopt Kafka and they really, really want to override configuration - "what should I set my consumer connections.max.idle.ms to?", "What should I use for my producer request.timeout.ms?" The trouble is, NONE of the producer or consumer configuration properties should be touched without understanding what they do and what the implications are of changing them, especially as some really need to be adjusted together - like flying a helicopter, and for that reason I ALWAYS tell people to go with the defaults - they're sensible, don't change them unless you need to. And then the accusations - I hear people say "I've lost messages, what happened? Why did Kafka lose them?". To those people I ALWAYS say - "hey, Kafka doesn't lose messages, I'll bet the problem is in your consumer code - you're consuming but then silently discarding messages or not processing them for one reason or another."
Then our team started losing messages, and I discovered that I was wrong on both counts, for the usual reasons - assumptions and a failure to properly understand the documentation.
My first incorrect assumption is that Kafka always guarantees at least once delivery - if you publish a message it will always be consumed at least once, regardless of what happens. The second, and closely related assumption, is that default configuration will always serve you well.
The problem is that Kafka is flexible: if you can afford to lose a few messages, then you can use Kafka for efficient and phenomenally high throughput, if you can't afford to lose any messages, then Kafka can handle that too but at the cost of lower throughput. Because of this flexibility, default configuration CAN'T serve both purposes - and as I discovered to my cost the default producer configuration DOES NOT guarantee at least once delivery. The reason is that small innocuous configuration - 'acks'.
There are 3 different values (strictly 4 but one is an alias), and unlike many configurations where an understanding of client code is enough, to understand the implications of your choice on acks you really have to understand what is happening at the broker end.
Now if you are short of time or don't like reading, I'll cut to the chase - guaranteed delivery? choose acks=all, not too fussed if you lose a few? choose acks=1, really don't care too much but just want to shove through as many messages per second as your weedy little server can possibly handle? choose acks=0.
However, feel free to read on to look at what these really mean in a little bit more detail, and to understand the implications of your choice.
Now when I started losing messages, after a bit of due diligence on our client code, it became clear that the problem was probably at the producer end, and acks might be the culprit. We had gone with the default setting of 1, and the documentation states - "This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost."
My initial naive interpretation concentrated on the first half of the sentence and took it to mean that as long as the leader writes to disk we are all okay, the message isn't lost, it's on disk. So, I started to look a bit at what 'written to local log' really means. Kafka just delegates to the Operating System and Operating Systems cache data rather than persisting it straight away - flushing to disk when it makes sense. So was that the issue? The leader being shut down unexpectedly with data still in memory? Then it dawned on me that I was focusing on the wrong bit of that sentence in the documentation, writes to disk are not the important point here - the key is replication - it doesn't really matter one bit if the leader has stored the messages persistently on disk.
So how does replication work in Kafka? Unless you perversely choose no replication (which you are free to do), each partition is hosted by multiple brokers, and however many replicas you have (3 is a pretty standard number) one broker is leader and the rest are followers. When we send messages to Kafka we only ever talk to the leader of the partition - the producer client code figures out which partition a message belongs to, and uses cached metadata to find out which broker is the leader for that partition. The followers are only there as a safety net, and they are essentially just Kafka consumers - they are continuously polling the leader, and like any consumer, each time they fetch they tell the leader their current offset (or rather the next message they want to receive). In this way the leader knows where all the followers have got to, and hence which ones are keeping up (in-sync) - by default the definition of keeping up is that they are effectively less than 10 seconds behind the leader. So, the leader knows which messages have definitely been replicated, and which ones may not have been, and as the leader is the only broker that consumer clients talk to, it can ensure that they only see messages that have been replicated. This is the definition of the high watermark - the high watermark is NOT the head of the partition, as I used to think. There are likely to be other messages ahead of it, but it is the latest message that is visible to consumers.
Now, if a follower dies at any point, then their partition will be picked up by another broker and the cluster will get back in sync - in normal times followers aren't that important. If on the other hand the leader dies then one of the in-sync followers will be elected leader, and start servicing producers and consumers. BUT hang on - that follower might be up to 10 seconds behind the old leader, and that can cover an awful lot of messages that are no longer available. What happens about that?
This is where acks come in - the messages have gone as far as the cluster is concerned, but acks determines whether the producers get to know about it. Actually, with no acks, the producers don't even know if their messages reached the leader - the servers may have all caught fire but those producers will cheerfully keep throwing messages into the flames. With just one ack, the producers don't care about replication, they are happy when the leader has the message. In this scenario, lots of producers may be sending messages without a care in the world, not knowing that behind the scenes followers have started to lag - maybe there's a bit of temporary network latency, or maybe the cluster is always like that. If at that point a server gets restarted - maybe it's time for some patching - then one of the in-sync followers is going to take over as leader, but the leader that has now gone may have successfully stored 9 seconds worth of messages that have not yet been fetched by the new leader, so they are gone. No consumer will ever see them, and no producer knows about it. Even if the old leader comes back to life, that 9 seconds worth of messages is useless - the world has moved on, they can't just be made available as if nothing had happened.
With all acks, this situation won't cause any loss - the messages you sent will not be acknowledged until all followers[2] have sent a fetch request with an offset later than those messages, and if followers or the leader die, the producer client will timeout before getting that acknowledgement, and retry. The downside is that even when everything is fine, if the followers are 9 seconds behind the leader then that's how long your producer will wait for an acknowledgement. So now you might be asking incredulously if your application is going to be regularly blocked for up to 10 seconds (or more if there are elections going on at the broker end). Relax, it's not your application code that is blocked - unlike consuming, producing is asynchronous for this very reason, the producer send method returns immediately, a separate thread is continuously sending batches of messages to the brokers from a buffer. A temporary issue at the broker end will just cause the buffer to fill up a bit. If everything is really grinding to a halt, then the buffer will fill up, and send method calls will fail immediately, so you're application will know about it. And what to do about that? Well now you have a problem to solve - if there's a temporary spike that has overwhelmed the server end, you probably want to open a circuit breaker at this point and fail over to some other strategy - maybe store pending messages somewhere until the situation eases.
So how likely is any of this? Are we worrying about something that probably won't happen? Well, the question is really - do you EVER restart a server. For resilience, Kafka spreads partitions across brokers on different servers and racks, and so each server will host a mix of different partitions, acting as follower for some and leader for others - or to put it another way EVERY server is a leader for at least one partition and so restarting a server WILL almost certainly lose messages ahead of the high watermark, whether they exist on disk or not. That is why acks=1 will pretty much inevitably cause message loss.
[1] My first car was a beautiful Austin Maxi - I remember rebuilding the starter motor from the remains of two starter motors I found at the scrap yard - the days when cars were made to be messed about with. I also remember having to push the thing to a petrol station when the fuel gauge decided to play up - they weigh a ton!
[2] A slight detail here is that we don't always have to wait for all the followers - min.insync.replicas is a broker end configuration that defines what is good enough for all acks, in the case where one or more followers have dropped out of sync. With 3 replicas (1 leader and 2 followers) it is common to have a minimum of 2 in sync. This means that when both followers are in-sync the leader will wait for both (even if one of them is 1 second behind and the other is 9 seconds behind). But if one follower has fallen behind far enough to be considered out of sync, the leader doesn't wait for the partition to be picked up by a new follower, and so the producer doesn't wait either - it effectively trusts that the leader and still in-sync follower won't both fail before everything gets back to normal - here lies the only risk if min.insync.replicas is lower than the total replicas.
1 comment:
Thanks Chris! We came across a similar 'discovery' recently where it was plain the manual wasn't read - hard lessons but one we won't forget. Thanks for sharing.
Post a Comment