Kafka Needs No Keeper

Video Statistics and Information

Video
Captions Word Cloud
Reddit Comments
Captions
thanks guys yeah so today we're going to be talking about Kafka and how it's going to evolve in the future so Kafka has gotten a lot of mileage out of zookeeper and zookeeper is used in many ways in Kafka so we use it to store metadata we use it to receive notifications when certain things change and it's it's actually you know done pretty well at that so we have a lot of respect for zookeeper for the community behind it and for the authors but it is still a second system so it's another thing people have to learn it's another moving part all of those things so what we're talking about today is a new Kafka improvement proposal called kit 500 that's talking about how we can move beyond zookeeper and basically use Kafka itself to store metadata and manage it so I want to emphasize that this isn't going to be a 1:1 2:1 replacement of zookeeper so we're not replacing zookeeper with another system which is substantially like zookeeper so it's not about using something like one of the many projects which are similar to zookeeper but maybe have a slightly different API slightly different implementation it's actually about managing the metadata in a very different way and a way that we feel will be much more extensible and supportable in the future along the way I'm going to talk a little bit about you know how we got here and what direction we've been going and there's actually a lot of previous work that's been leading up to this so kit 500 is kind of like the culmination of a lot of work that the community has been doing for many years and so to start off I'm going to talk a little bit about the evolution of Apache kafka clients and you know you guys probably not already know already the client is the thing that talks to the server here we basically have three kinds of clients one of them is the producer that will put messages into the system the consumer that'll pull them out an Administrative Tools that will do stuff like creating topics deleting topics security related stuff setting up security settings that sort of thing so in the very beginning you know we had producers they would write to topics that was obviously going through the broker I'm using this Kafka symbol here to represent the broker processes we would have consumers that would read those topics but there was a hitch here which is that the consumers needed to store the offset that they last fetched from periodically and we use zookeeper for that in the beginning we also use zookeeper to understand what consumer group we were part of and understanding this is key to doing things like doing rebalancing when a consumer from a consumer group goes down and all that stuff and of course the Administrative Tools would also go directly to zookeepers so there would be no broker involvement in the administrative tools so if you wanted to delete a topic or create a topic you would just basically have a script that launched a scallop process and that would manipulate the zookeeper stuff directly so this was the situation back in like the point-eight Kafka point eight days the first room and I guess I should talk a little bit about why you know why there were limitations here before we go on so first of all you know zookeeper is not really a high bandwidth system so if you have a lot of consumers doing offset fetches and offset commits it can really start to bog down the system and it can interfere with stuff which is also key for zookeeper to do and secondly you know we're going to talk more about this later but this is a real obstacle to having effective security because all of the security that we implemented was actually on the broker and by going through zookeeper you're bypassing that layer of security so we started to move away from this and the first instance of this was the consumer group coordinator so as I mentioned earlier originally all of the consumer group stuff would just go through zookeeper and all of the offset fetch offset commits commit stuff would also go through zookeeper so you had these end-users who were talking directly to zookeeper in order to avoid this we were able to create these consumer api's and basically this offset fetch an offset commit became api's on the broker and so they were no longer things that you would talk directly to zookeeper to do but you could talk to the broker and then the broker could store this stuff internally in a topic so two things here we're creating a new API on the broker and we're storing the data in a different place it's no longer being stored in zookeeper but it's being stored in Kafka itself so this is kind of an example of Kafka on Kafka right we're using the system to store our own metadata here the second thing we did was basically to do the same for group partition assignment so we created a similar set of api's the join group API the sync group API heartbeat API and so we were able to take zookeeper out of the consumer read path and that's really good because then you know we can scale out the number of consumers more effectively and we can also basically secure consumers a lot easier so after this the diagram looks a little more like this right the producer and the consumer are going through the regular brokers and the admin tools are still talking directly to zoo keeper so before we talk more we should have a brief digression about security here so as Kafka was growing in popularity there was a greater demand to have security and in particular policies that we can enforce at a topic level and things like that so this is something that we implemented on the broker side through access control lists or Ackles we usually call them so you could have an Akal that would say like you can read from this thing or you can't read from this thing stuff like that but again when you go directly to zookeeper you're bypassing the Eccles so it's a bit of a hole in the security model in that sense so our next goal here was basically to rework the admin tools in terms of api's on the broker and in order to do that we created this thing called admin client admin client was similar to the producer and consumer in the sense that it's basically a class which communicates with the broker over well-defined api's and so these api's were things like create topics delete topics alter configs and so we've been reworking the admin tools so that they use the admin client rather than talking directly to zookeeper and by doing this we can do things like give people the ability to create topics let's start with a certain prefix but not the ability to create topics that start with another prefix or we can you know they can't delete certain topics but maybe they can create their own topics so rather than being an all-or-nothing security is now a lot more fine-grained and in fact we can even have programs which create Kafka topics as part of their regular operations so it's no longer just something that the admin has to from the command line so at this point you know that the diagram looks a lot more like this right you have the producer consumer and admin client all going through these api's and all talking to the broker and then the broker will manage whatever needs to be done in zookeeper so in addition to security there's actually a lot more benefits that we've only really touched on here and basically these are the benefits that you get by encapsulating the the data here so what do I mean by that what I mean is that the data in zookeeper is in a certain format right and over time that format will evolve but on the other hand people may use different versions of the client so it's actually really nice to have an API in the middle that can translate let's say an older request into the new format which we're now using and this helps with compatibility because we're no longer directly accessing basically the brain of Kafka we're actually going through these api's which give us the ability to to have like a well-defined interface and we can also validate these API so we can say stuff like well you know you can't you can't create this topic with these certain settings things like that and so it became possible to have policies which is something we really couldn't have when we're directly modifying stuff in zookeeper so those are all pretty important things there so so far we've just been talking about clients and how they interact with the system but it's actually equally important to think about how brokers interact with other brokers over the zookeeper and in the case of Kafka we actually have a lot of interacting to do so brokers need to register themselves with zookeeper so that people know where they are obviously we have to manage Eccles we have to know about what they are we have this concept of dynamic configuration which is basically configurations that you can change over time in a managed way have them changed at runtime we also have this thing called the ISR which is basically in the instinct replica set and this has to do with how we would do replication so this is all metadata that we need to worry about in order to do this Kafka elects a single broker to be the controller and this is kind of a familiar pattern if you've used a keeper before this idea of having a leader which is elected by zookeeper where you delegate certain responsibilities to that node and of course control their election is very important for us control elections management zookeeper so the model that we have here is basically that zookeeper will elect a single controller and that controller will update the rest of the system with the changes to metadata that are made and that's done through certain api's I'm not going to go into detail about what they are but basically the ones pictured in the top right here so the controller will be responsible for pushing out these updates to the metadata so even though I just got done saying this is the model there are actually some exceptions now one of them is that there are cases where brokers go directly to zookeeper and one of them is when changing the in sync replicas set so currently the leader of a partition will directly communicate with zookeeper when changing the in sync replicas set and this kind of creates a lot of the same problems that we saw with clients right which is that now it's sort of hard to change the format right because what if you have a broker from an older version that's changing this and it's overwriting the new format with an old one there's also issues about notifications we'll get into those later so one of the things that we've been working on lately is actually creating api's to avoid having this kind of direct zookeeper access from brokers so the model that we'd like to get to is a model where we centralized the zookeeper access in the controller so the controller is sort of the node that we're gonna delegate this work to and that gives us again a lot of the same benefits we had before so it's a lot easier to reason about a system where there's a single source of truth in this case the controller and well ideally zookeeper is the single source of truth but the controller should be like the the bottleneck through which changes flow to it it's a lot easier to reason about that than having dozens of brokers that are all making changes at once potentially overwriting each other's changes we have the benefits of better compatibility and of course you know we just have better encapsulation I should note in passing that there are facilities in zookeeper to manage concurrent writes there is basically like a compare and swap tech primitive but again that that sort of makes the system pretty hard to reason about if you start using it a lot so another important concept here is broker liveness and this is basically who's in the cluster right so this is actually a very important concept in any clustered system you have to know who's alive who's part of the system and what responsibilities they have at any given moment this is one of the things that zookeeper gives us currently and the way that it's done is by maintaining this zookeeper session so the zookeeper session is basically opened when the broker starts talking to the zookeeper cluster and the broker will create this node that talks about where it is how you contact it and so on so currently and that node will contain information like the IP addresses you can use the rack and so on so let's say that a broker actually goes away in that case the zookeeper session will also go away and this femoral z node will be removed now currently that will trigger a watch and that will update the controller and then the controller will push out these updates basically saying what just happened that the broker is offline so again this seems like a pretty simple system right you have zookeeper here and basically whenever the session goes away then the controller is notified and the controller pushes out the updates so let's talk about you know what can go wrong right because that's always the interesting part here well one thing that can go wrong is maybe the nodes not down at all but it's just partitioned off from the cluster in some way and you've probably heard the term network partition before basically the idea is that you know maybe some nodes can see each other but some other nodes can't the network partition that we like the most is the total partition where basically the node goes away and you could also regard this as being the case when the node crashes or the hardware goes away logically it's out of the cluster right no one can see it and Kafka handles this pretty well because the zookeeper session will go away and then the controller will notify everyone about what's going on and all the good things then there's the case of a just being partitioned from a broker or several brokers in this case you know there will actually be some potential loss of availability because you may not be able to replicate if you have a partition which relies on other on that other broker that will not be replicated but at least should keep consistency in this system and then you know the case which is most interesting for this talk is when your partition from zookeeper well I guess there's two cases that are most interesting for this talk but this is one of them and this case is interesting because your cluster you will not be regarded as part of the cluster and yet you know nodes can maybe continue talking to you which kind of creates a a bit of an uncomfortable situation but regardless of you know there's potentially some weird situations that can happen you'll still basically keep consistency here because the controller will remove you it'll remove leadership of all the partitions that this node has so again similar to case two we lose some availability but we still keep the consistency but then there's a fourth case which actually we don't handle that well and which kipp 500 will actually improve a lot as we'll see and that's the case of the controller partition so if you remember from earlier the controller is actually telling us about the changes that are happening the cluster topology so if we can't talk to the controller or if they can rather if the controller can't talk to us we miss out on that stuff but at the same time we're still regarded as part of the cluster because we keep our zookeeper session alive so this is a very specific example of how the separation between the controller and zookeeper actually creates awkward situations because they're two separate systems and so you know keep that in mind we'll come back to that later so metadata inconsistency you know logically we have this model where zookeeper is the source of truth zookeeper tells the controller about what's going on and zookeeper sorry the controller then pushes out these updates to everyone else so in a sense you know everybody is getting these updates in an asynchronous way because they happen first in zookeeper the controller becomes aware and then they're pushed down so this is kind of the ideal model of what we would like to happen but in reality though this doesn't always work because there may be cases where again if you have a partition the controller will try to send out the information but it simply won't be received and so in this case you can actually end up with divergence and so as a last resort some sometimes people have to actually force a controller election and there's you know there's a way to do that from within zookeeper so this is actually kind of frustrating right because you would really like the system to always know whether the minute edicts assistant even if there's some weirdness going on at the network level and there's a second reason why this is annoying which is that when a new controller comes up we actually have to load all of the metadata from zookeeper because remember the metadata is not stored on the controller the metadata is stored on zookeeper so when you load this metadata yeah sorry I'll go to the control the performance issue here again when you're loading this minute of the complexity is o n where n is the number of partitions here so as your number of partitions increases this loading time will increase as well and this loading time is actually critically important because while the controller is loading the metadata it can't be doing things like electing a new leader stuff like that so that again can lengthen the period when the system is unavailable I should note in passing that controller election doesn't necessarily create unavailability it only creates unavailability if you needed the controller to do something during that time so it's it's not as bad as it may seem but it still isn't good right in order to scale we actually would like to get rid of this Oh end behavior and another o n thing is pushing all the metadata right so as the number of partitions grows that metadata will increase it really kind of sucks to have to tell someone all the metadata every time you do this election and this complexity here obviously the number of brokers will factor in because we have to send this information to every broker so you know how can we get around this problem well I guess I kind of gave it away with this slide title but the solution here is to actually do something more like Kafka's users are doing inside Kafka specifically you know we want to see metadata as an event log so rather than sending out snapshots we'd actually like to treat it as a log so I'll give an example here like if you treatment it it as an event log then each change to it can become a message so for example creating a topic becomes a message deleting a topic and so on and these changes can be propagated to all the brokers in the same way that you know we propagate changes to consumers the nice thing about having this log is that there's a clear ordering of what's happening and we can also send Delta's rather than these snapshots so for example if I read up to 925 then I don't need to be sent everything I just need to be sent the updates that happened after 9:25 it's also easy to sort of quantify what the lag is how far behind are you actually well if you know the offset then you can have that information so you know traditionally Kafka has these consumers they consume in an offset and they're consuming from the brokers in this case we're actually talking about the brokers that are consuming so but what are they consuming from though in this case well it's the controller so the controller will maintain this metadata log internally but how do we you know implement this controller log so if you recall back to the earlier slides we were able to implement the the log which stored the consumer information just by using a regular kafka topic it was nothing special well it has some underscores before it but it's not that special but in this case we actually can't do that because right now the controller is involved in choosing the leader for partitions but this doesn't really work we have a circular dependency when we're dealing with metadata so what we really need is a self-managed quorum here where there's not a dependency on an external system to choose the leader the notes should be able to choose their own leader and this is actually where raft comes into the picture so the main property that we really like about raft for this application is the last one right the fact that leader election is by a majority of the nodes rather than being done through an external system aside from that though actually the replication protocols are not too different so raft has terms Kafka has epochs they're pretty similar the push/pull thing is interesting so in in the traditional you know raft context changes are pushed out and conquer their traditionally pulled but it turns out this isn't actually it's not that big of a difference you can actually adapt the system pretty easily so let's talk about the controller quorum a little bit more here so how do we choose the active controller well first of all why do we even have multiple controllers well the biggest reason is we want high availability so we want to actually have the ability to failover when the when the controller fails but secondly by having a quorum we can actually take advantage of some of the properties of Raft here so we can just say that the the leader is actually just the active sorry the active controller is just the leader of the Rev quorum and the active controller is the only one that can write to the log so the number of nodes that we'd like here is basically going to be very similar to zookeeper so we're probably gonna want three nodes maybe five we're probably not gonna want to go to too many nodes in most cases so when I said we weren't gonna build a better zookeeper I kind of lied because we sort of are but but not in the sense that we're creating a general purpose system only in the sense that we want to have a replication system which is vaguely reminiscent of zookeeper and it turns out that this gives us practically instant failover because wrapped election is pretty quick and furthermore the standbys have all the data in memory so they don't have to load it from some other place and another way that's stand by sorry another way that failover is instant isn't the fact that the brokers don't have to refresh everything and this ties in with what we talked about earlier which is that if the broker knows the offset that it read up to it just needs to read the offsets after that it doesn't need to refresh everything in the world and another advantage we can have here is we can actually cache this metadata to disk so when you're starting up your broker maybe you actually don't want to have to refresh everything now I will note in passing that there are cases where you probably do want to refresh maybe your disk was wiped out or maybe you're just so far behind that you don't want to replay all the deltas but we hope that that's the exception rather than the rule so these sort of things are going to become increasingly important as we start adding more zeros to the number of topics the number of partitions the kafka supports and as Kafka starts scaling out it's critical that we get rid of the O n behavior in the system and so let's talk a little bit about Broker registration which we discussed earlier in the context of how it works now and of course bravo registration is just the act of building a map on the cluster so in the post skip 500 world broker's just send heartbeats to the active controller and the controller can use this to build a map of who's in the cluster and the controller can also use its responses to tell brokers things about what they should do like for example maybe they should be fenced if there's already a broker with that ID and stuff like that fencing is it is an important topic in general so if broker's can't keep up with the metadata updates that are happening they need to be removed from the cluster because otherwise they're gonna be propagating really old information and brokers should also sell fence if they can't talk to the controller so this closes off sort of a lot of the problem scenarios that we've had in the past there's a lot of different types of fencing in Kafka and I couldn't possibly talk about all of them but this is a very important one so let's talk a little bit about network partitions we obviously have the same case we had before where something's totally partitioned the behavior hasn't really changed if you can't contact the controller then you will be removed from the cluster as is appropriate the broker partition hasn't really changed either same behavior there but in the case of the controller partition which was a problem before it's actually not now because if you can't talk to the controller then you can't send heartbeats and so therefore you can be removed from the cluster and not be causing problems as sort of a zombie so this is actually an example of where the post kip 500 system eliminates some inconsistent metadata inconsistency and of course we don't have the case we had before where you can't contact zookeeper because we don't have say keeper so let's switch gears a little bit and talk about deployment I think deployment is one of the things that people are most excited about when they think about getting me nervous ooh keeper and I do understand why again not to and that's a really knock zookeeper but it is a separate system it's another configuration file with a different format different stuff in it it's another set of metrics and another set of admin tools that you have to learn and finally and perhaps even worst of all it's another set of security configurations that you have to master to really secure your system and it's it's kind of alarming how many people are running without zookeeper security right now so definitely getting everything under the same security umbrella is really gonna help so when we talk about how controller nodes are going to be deployed there's actually two options that we're going to support one of them is shared controller node where the controller is actually co-located with the broker and this is kind of similar to what we have now where a node that is a controller also has broker duties associated with it the other option that we're gonna support is having separate controller nodes and this also has a precedent in the sense that typically bigger clusters will run separate zookeeper nodes to avoid having to avoid having the performance of the brokers negatively interact with so both modes I think will be useful and I think it will depend on the size of the cluster and maybe you know what kind of hardware you have stuff like that for small clusters colocation I think will be very useful so I've talked about a lot of stuff but what's the roadmap for actually doing this stuff pretty important question so there's three basic phases here I think one of them is that we remove the client-side zookeeper dependencies and this is actually almost done so we've been creating api's for the last stragglers who still access zookeeper directly from the client side we haven't yet deprecated direct zookeeper access but that's happening soon the phase after this is to remove the broker sign zookeeper dependencies and in this phase you know we actually will try to centralize the access to zookeeper in the controller and this will involve creating some broker sign api's sort of in a parallel echo of what we did on the client I think this phase should also involve fully removing the zookeeper access for the tools you may ask why we didn't do it in the previous phase and the answer is you know we try to have a pretty generous deprecation policy in Kafka and finally you know the final phase is this controller quorum and basically reworking the controller in terms of it and this includes implementing raft as well so let's say you know once we've implemented all this stuff you're gonna need a way to upgrade to it and this is a problem because right now you probably have tools running around which are directly accessing zookeeper you have brokers running around accessing zookeeper and you have a lot of state in zookeeper so if you were to try to jump directly to a Kip 500 release it wouldn't be really possible to do that through a rolling upgrade and you guys probably already know this but a rolling upgrade is one in which you upgrade to Notes one by one and there's no actual downtime and this is something Kafka's has supported for a long time and it's very important to us so in order to keep that working we have this concept of a bridge release and in the bridge release there's no zookeeper access from the tools and there's also no zookeeper access from the brokers except the controller and the rationale behind this will be a parent in a few slides so let's say you're doing this upgrade to a post zookeeper sorry to a post Kip 500 release you would start from this bridge release which remember has no zookeeper access from tools and no zookeeper access from brokers other than the controller and the first thing you would do is you would start the new controller nodes for the release you're upgrading to now the quote that quorum will elect a leader and it will claim the leadership in zookeeper so basically the new controller will take the leadership and maintain the leadership throughout the upgrade process now at this point you can start upgrading the nodes one by one just as you usually would and the reason this will work of course is because all of the zookeeper axis is going through the new controller and so basically it's handling all of that this also means that the new software will need to know how to send these old leader in is our messages to the existing nodes and finally of course you know once you've rolled all the brokers you can decommission the zookeeper nodes so in summary it is going to be possible to upgrade to oppose zookeeper release with zero downtime so wrapping up here zookeeper has served us pretty well on what we're doing here is not so much replacing zookeeper as switching to a different paradigm a different way of managing metadata we think we much more effective and these changes have been going on for a long while and there are more reasons to do it then you know just scalability or even just deploy ability there's also the improved encapsulation the improved security and the improved ability to upgrade and to freely change our internal formats so those things are all all important here I would say and I think this is a great example of how managing metadata as a log is much more effective because it sort of eliminates this need to send full snapshots and as the size of the system grows and we expect Kafka to you know really grow over time as people start using more and more data more and more topics and so on it becomes more and more important to have the good properties of a log to basically have Delta's rather than sending the full state to have caching to be able to know what the ordering is for metadata changes we also want to improve our availability you know to improve controller failover to improve fencing and basically be more scalable and more robust and along the way you know it's important that this metadata log be self managed and I sort of touched on why that is and why that needs to be the case here and you know finally it'll take a few releases to fully implement Kip 500 so there's gonna be additional Kafka improvement proposals for some of these api's for raft for how we're gonna handle metadata so KITT 500 is kind of like the overall roadmap but there are gonna be more detailed proposals to come it's very important to us and really I mean the whole community is very interested in rolling up grades and we are going to support those so we talked a little bit about how and how that's all going to work and so in conclusion we believe that once this is implemented Kafka will need no keeper so we've got about ten minutes for questions Thanks so you've mentioned it with the raft right so is it a external system or is it gonna be implementing that like we're going to implement raft in Kafka itself and the reason for that is you know Kafka is a system for managing logs and so we think raft is kind of like part of our core competency if you're assistant managing logs and you use another system manage your logs it's it's kind of bad alright see and it all the usual software engineering reasons applied [Music] so how do you make sure that raft consensus is the participation of the raft consensus is only done by the brokers which are in sync replicas and not the ones that are already taken out by a controller sorry I didn't quite guess so so you have controller and let's say you have 10 brokers and control is keeping track of the in sync replicas for all of these brokers and one of the brokers never actually received the metadata so controller will possibly remove that from the in sync replicas drop is that correct so here I was yeah in most cases the leader will remove a broker from the in sync replica set and not the controller there are cases where the controller will do it but in general it's normally the leader of the partition okay so in that case suppose the leader actually removes the in single applica a broker from the in sync replica and the leader dies there will be a consensus rough consensus that would elect a new leader I think I think we're talking about two different things here if we're using raft we're not using the ISR right so there's gonna be basically two replication systems one of them the one we have now and the other will be raft oh and Raph does not have well I mean it probably does it does have something similar to the in sync replica set but it's not going to be the in sync replica set you know through Kafka it's not going to have all the same properties exactly guard thank you [Music] way over there really whatever you have only to have cover occurs and something but happens this network and who will be controller so in general if you only have two nodes then you can't really have I mean you can have a raft quorum but it'll be up size either one or two and that's basically not going to provide any redundancy I mean it's example let's say you have for rocket oh yeah rockets and it separates it - and - who will be controller so if there's a network partition and it's 2 & 2 then you can't form a quorum basically right oh I'm sorry I'm so first of all I'm assuming that you have an odd number of controller nodes because that's how you would normally deploy it but even if you did choose to have 4 nodes the rule is gonna be basically you need a majority right so let's say your quorum is size 3 that means you need two nodes for a majority right if your quorum is size 4 then you would need 3 I think wait but but the point is at certain at a certain point you have to sacrifice availability right if you can't form a quorum then you must sacrifice availability and this is the same thing we have with a zookeeper right now right let's say you have a zookeeper quorum of size 3 and you lose two nodes they go away then you your quorum is is done and your system is down and so you can bring up at least one more node right yeah so it's not changing basically in that regard are you expecting if you have a pool of 10 20 brokers they'll all be participant potential controllers or you have to explicitly set these three these six could be controllers well that's a good question Zook it out and say hey v that sounds like a good number and choose yes so that's a good question and the answer is the second one so there's only going to be certain notes that you would nominate his potential controllers which again is it's kind of similar to how we do zookeeper now right I mean you're not going to run a zookeeper node alongside every broker in a very large cluster you wouldn't have a controller note on every broker in a post kit 500 cluster but good question anyone else all right Thank You Colin all right thanks
Info
Channel: InfoQ
Views: 4,071
Rating: undefined out of 5
Keywords: Kafka, Zookeeper, Apache Kafka, Artificial Intelligence, Machine Learning, Data Science, InfoQ, QCon, QCon San Francisco, Transcripts
Id: 3qNNinbnWmw
Channel Id: undefined
Length: 43min 52sec (2632 seconds)
Published: Wed Mar 11 2020
Related Videos
Note
Please note that this website is currently a work in progress! Lots of interesting data and statistics to come.