Aggregating Kafka Streams with NodeJS

Captions
Welcome back to Coding With Dustin. Recently we've done some learning about writing Node.js applications, specifically consuming from Kafka, producing to Kafka, and using related technologies. Today we're going to get a little more practical. We're going to look at: I've got some data, I need to take two streams, merge them together in some way, and create some kind of aggregate or summary data.

You might use something like Spark or Flink for this, and there are advantages to tools like that: they're mature and robust, and there are great things you can do with them. But there are downsides too. Learning Spark, for example, is a lot to ask of your developers. It's not just that you need to be able to operate the thing, turn it on, and run it; you need to really understand its nuances and be able to deal with issues if you hit tricky little problems in an important production situation and have to firefight them. I would propose that if the job can be done well with technologies your team currently uses, then the burden of providing better features rests on the new technology. Say your team uses Node, which my team does: if you can get the job done with Node, why would you use Spark? Spark had better be bringing something serious to the table that really makes your situation better. I don't want to rag on Spark today; that's not the point. The point is to explore doing some of these things directly with Node, see how it feels, and put ourselves in a better place to compare against the alternatives as you research them.

So today we're going to make a little demo app. I'm going to start by creating a file, which actually needs to be in a directory, that's going to contain our schema. Here we go. These two tables are the inputs; let's just note that here. Obviously the inputs in our example are actually going to be coming from Kafka, that's the whole point: we're reading a user stream and a post stream from Kafka, and we would like to ultimately produce this summary table. The approach I'm going to show you today will involve having these two tables as well, but that's just an implementation detail I've chosen; it's not important.

Let's start by bringing up a database that has this schema, and while we're at it, I know we're going to need Kafka and ZooKeeper and all of that, so I'm putting that all in here as well. I'll take a quick scan of it. Here's our database; it's configured to load our init-db directory, which will cause it to read any SQL files in that directory and execute them when it starts. Then ZooKeeper, Kafka, and Schema Registry. Let me check my notes: yes, we are going to be using Schema Registry, so that's a not-strictly-necessary but fun little bonus add-on for today. Okay, let's bring that up. Let me make sure I don't have anything else running; I do not. Just so you know, so you can run this stuff on your own: I have a little alias for docker-compose that's just dc, to make it a little faster for me, but when you're running it at home you want to run docker-compose.

Okay, the first thing I'd like to do is publish some test data. I'm going to breeze right through this real quick, because it's material we've already covered in some past videos, so I'll post links to those in the description if you're interested in more of that background, like how to publish and how to consume.
This video is not so much about the details of how to do that; it's more about trying to get something practical done, so I'm just stealing some code from those old videos. This code is going to publish some test records. It takes in a schema, a function that knows how to build an object matching that schema, and a topic name, and then this generic function just sits in a loop, calls that generate function, and publishes to the topic over and over. There's a sleep in here, essentially a sleep statement, and it will sleep from zero to five seconds, so we get some randomness in the interval between publishes. Basically we register our schema, connect to Kafka, and then in the loop we do what I was saying before: generate the entity and publish it. If we get interrupted, we disconnect.

We have the user schema here, which should line up pretty well with the SQL you just looked at, and we create an instance of this publisher to publish user records. generateUser needs to be imported; I'll show you that code in a second. Then here's the same deal for the posts, and here's the test data generation for users and posts. (Looks like I just got a call from "Spam Risk.") We're using some hard-coded IDs here, which may look a little strange. The reason I did that instead of, say, a UUID, is that I wanted to simulate not just getting new records from Kafka but also getting updates, and the way an update is represented is simply getting a record with the same ID as one we've seen in the past. So this was a quick and dirty way to say: grab one of these IDs; if it's the first time I've grabbed it, it represents the creation of that entity, and if I've already published it before, it represents an update. That's pretty much it.

Oh, and I guess we didn't talk much about the output we're trying to get. A user can write one or more posts, and posts have content, and we're going to analyze the content of all their posts to come up with, one, a simple aggregate that's just the count of posts they've written, and two, something still pretty simple but slightly more intricate: we're going to determine what we think are their favorite words, just by looking for repetition of words within the content.

Okay, I think we just need to install a couple of things, and then let's see if we can get this to run, shall we? Maybe that's it. Let's give it a try. Faker... okay, looks like it's plugging along, publishing a whole bunch of data. I'm going to stop that now while we work on our consumers.
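For reference, the generic publisher being described probably looks something like this. It's a sketch, not the exact code on screen: the package choices (kafkajs, @kafkajs/confluent-schema-registry, faker), the file name, and the specific faker calls are my assumptions.

```js
// publish.js -- sketch of the generic test publisher described above.
// Assumptions: kafkajs + @kafkajs/confluent-schema-registry + faker, Kafka on
// localhost:9092, Schema Registry on localhost:8081; exact names and calls are guesses.
const { Kafka } = require('kafkajs');
const { SchemaRegistry, SchemaType } = require('@kafkajs/confluent-schema-registry');
const faker = require('faker');

const kafka = new Kafka({ clientId: 'test-publisher', brokers: ['localhost:9092'] });
const registry = new SchemaRegistry({ host: 'http://localhost:8081' });

const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
const pick = (arr) => arr[Math.floor(Math.random() * arr.length)];

// Hard-coded ids, so republishing an id looks like an update to an existing entity.
const userIds = ['user-1', 'user-2', 'user-3'];

const userSchema = {
  type: 'record',
  name: 'User',
  namespace: 'demo',
  fields: [
    { name: 'id', type: 'string' },
    { name: 'name', type: 'string' },
  ],
};

const generateUser = () => ({ id: pick(userIds), name: faker.name.findName() });

// Register the schema, then loop forever: generate an entity, encode it with the
// registered schema, publish it, and sleep a random zero-to-five seconds.
async function publishTestRecords(schema, generate, topic) {
  const { id: schemaId } = await registry.register({
    type: SchemaType.AVRO,
    schema: JSON.stringify(schema),
  });

  const producer = kafka.producer();
  await producer.connect();

  while (true) {
    const entity = generate();
    const value = await registry.encode(schemaId, entity);
    await producer.send({ topic, messages: [{ key: entity.id, value }] });
    await sleep(Math.random() * 5000);
  }
}

publishTestRecords(userSchema, generateUser, 'users').catch(console.error);
```

The posts publisher is the same shape: a Post schema plus a generator that picks a random author ID from the same hard-coded list and some random content.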
So, to talk about the design here: one option, which I'm not going to take, is that whenever we get a user record we come and update this summary table directly. Actually, there's nothing in the summary we'd need to update from the user yet; I should have included something, so let's do that: let's include the user's name in here. Is that going to have to be nullable? No... it's kind of a choice; we'll see. The whole reason I added name is for the purposes of this demonstration: I want there to be some data from both tables, or from both streams I should say, that needs to make its way into this final table.

One way we could handle this is: on the users Kafka topic, whenever we receive one of those messages, we upsert a record in here, make sure we have a record for the user, and update the name. We can get an update later that changes the user's name, so this name can change over time. Then if we get any posts, we find this user row, calculate their favorite words and post count based on all their posts, and update those columns. And what do we do if the record doesn't exist yet, if we don't have a record for that user? Because these can come out of order, either the name needs to be nullable, or we just say we won't generate a record until the user comes in: if we didn't find a user, we won't bother to generate the user summary yet. That's what I propose, because I hate nulls; they cause pain down the road. Why add a null here just so this record can show up probably milliseconds sooner? It doesn't seem worth it. So let's do that.

So yes, one way we could do this is to have each of these consumers updating different parts of the summary table. That would totally work, and it might even be the most optimized way to handle it, but I'd argue it's harder to reason about, especially if you get into more complex aggregates; it's nice to have one path through. What I propose instead: when you get a user record, you just have a kind of raw users table and you update it, and there's a raw posts table that gets updated when you get a post. When you get a post, you know the user summary needs to be updated for the author, because the post will affect their favorite words and post count; and when you get a user update of some sort, same thing: you know that ID is what matters. That allows us to have a common path: a separate function that reads, by user ID, all the records it needs from the users table and the posts table and then regenerates the affected record in the summary table. There's only one code path through it, and that same code path gets triggered by both consumers.

And beyond it being one code path, there are a couple of other interesting benefits. One: say you get a certain message; what resulting state is that going to generate? Well, it depends on what the state was before you got the latest message. It's the same reason we stopped liking jQuery and trying to manage a whole UI's state with data attributes and such: you're updating your state incrementally, so to figure out how you got from A to Z you have to go through all the steps to reproduce some weird bug. I'm a fan of Flux, for example, where you have a canonical data source and the UI is entirely re-rendered from the data every time. That's what I'm targeting here: these raw tables are the canonical sources of data, and if anything in the summary needs to be updated, we don't incrementally update it; we always regenerate it from scratch. We go to the canonical data sources, recalculate the whole thing, and reproduce it. That way there's only one way to produce it, and if you ever want to reproduce a bug there's only one step to go through; there are no incremental updates. That's why I'm arguing for this approach.
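Sketching the single code path described here (file and helper names are my guesses; the db and transforms modules it leans on are sketched further below):

```js
// userSummaryAggregate.js -- the one code path both consumers trigger.
// File and helper names are guesses; the db and transforms modules are sketched below.
const { getUserById, getPostsByAuthorId, upsertUserSummary } = require('./db');
const { buildUserSummary } = require('./transforms');

async function updateUserSummary(userId) {
  // Read everything we need from the canonical raw tables.
  const user = await getUserById(userId);
  const posts = await getPostsByAuthorId(userId);

  // If the user record hasn't arrived yet, skip it rather than write a row with a null name.
  if (!user) return;

  // Regenerate the summary from scratch and upsert it -- no incremental patching.
  await upsertUserSummary(buildUserSummary(user, posts));
}

module.exports = { updateUserSummary };
```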
Another perk of the way I'm going to do it here is that it allows us to create a pure function. If you're into functional programming, or even lightly into it, it's nice: yes, we'll do some I/O to read whatever users and posts we need, but then we can pass all of that raw data into a pure function that maps it into the user summary that should result, and we can unit test that without doing any mocking or anything like that. It's just a nice, favorable setup.

So let's write our first consumer. We've got ourselves a main function; let's connect to Kafka. It doesn't really matter for our little test, and we'll be connecting on localhost to the instance of Kafka running in Docker. Once we're connected, let's subscribe to the users topic and run this thing for each message we get. The value we get out of each message is encoded because of what we did over in the publisher: we have a regular entity, a plain object, but we encode the value before publishing it. As you saw in the previous video, and if you want more detail go check out my Schema Registry video, Schema Registry knows the schema that was used to encode this value, and that's baked into the value itself, so all we have to do is ask Schema Registry to decode it. That should be the registry, and I'm going to have to connect to the registry. Okay, I've got a user; let's just start by logging it. We got there... oh, I just have a typo. Another typo. Come on, y'all have got to watch me and tell me when I make these typos; try to pair program with me here.

Okay, so we've got our users coming out. Let's do something more interesting with them, like writing them to a database. Let's create a file called db; keep it simple. We're going to need to install pg to talk to Postgres. I really prefer TypeScript for projects like this, because one of the things we get with Schema Registry and with Avro is that the incoming objects are reliably a certain shape, and there's a library I like that will generate TypeScript types from that schema. Then the types translate across the boundaries and I'm not working with unknown objects; the type system can tell me whether I'm mapping things correctly. But I didn't want to overcomplicate this example, so today we're keeping it simple and using plain JavaScript.

So, upsertUser. Again, since we're intentionally reusing IDs, we need to handle not only creating users but also updating them, so let's look at how to do that in PostgreSQL. We've got two fields to deal with; $1 and $2 are placeholders we'll fill in with values from this arguments array in just a moment. Then we say ON CONFLICT (id): if there's already a record with this ID, then instead we do an UPDATE, and in that update we set name equal to EXCLUDED.name. Basically, EXCLUDED.name is the name I tried to insert but couldn't because of the conflict; referencing it here lets me assign it to the name of the existing record that conflicted. We pass in user.id and the name, and that should be it. We just need to export this thing so we can call it.
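Putting the users consumer and the upsert together, here's a sketch of what these two files might look like (package names, connection settings, and column names are assumptions based on the narration, not a copy of the code on screen):

```js
// consumeUsers.js -- read the users topic, decode with Schema Registry, upsert into Postgres.
const { Kafka } = require('kafkajs');
const { SchemaRegistry } = require('@kafkajs/confluent-schema-registry');
const { upsertUser } = require('./db');

const kafka = new Kafka({ clientId: 'consume-users', brokers: ['localhost:9092'] });
const registry = new SchemaRegistry({ host: 'http://localhost:8081' });

async function main() {
  const consumer = kafka.consumer({ groupId: 'consume-users' });
  await consumer.connect();
  await consumer.subscribe({ topic: 'users', fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ message }) => {
      // The schema id is baked into the encoded value, so the registry can decode it directly.
      const user = await registry.decode(message.value);
      console.log(user);
      await upsertUser(user);
    },
  });
}

main().catch(console.error);

// db.js -- the first piece of it: upserting raw user rows.
const { Pool } = require('pg');
const pool = new Pool({ host: 'localhost', user: 'postgres', password: 'postgres', database: 'postgres' });

// $1/$2 are filled from the values array; on an id conflict we update instead,
// with EXCLUDED.name being the name we just tried (and failed) to insert.
async function upsertUser(user) {
  await pool.query(
    `INSERT INTO users (id, name) VALUES ($1, $2)
     ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name`,
    [user.id, user.name]
  );
}

module.exports = { upsertUser };
```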
Okay, let's run these in parallel now and see what happens. Actually, I want to split this terminal anyway. Let's go see what's going on here. Come on... there we go. Let's make this full screen. We've got our three tables: posts, user_summary, and users. This one should be empty, this one should be empty, but this one should have something in it, and it does. Cool. Basically we're always going to have three users, because I only hard-coded three IDs, but the names should be changing over time, so let's see if we can watch them change, and indeed we can.

Okay, now let's do the same thing for the posts. Oops, you know what I didn't do? That's fine, we don't need to worry about it: I didn't write any good code for shutting down; we can maybe look at that later. So let's just copy consume-users; we're not going to try to reuse anything in a fancy way here. What's different: the topic is going to be posts, this is going to be a post, and we need to upsert not a user but a post. Never copy and paste at home, it's dangerous. So: insert the post ID, and we've got an author ID and the content. I totally have a screen to the left here with all this stuff to reference; no, I don't memorize everything super well, I'm actually pretty bad at that. Yep, still going to handle a conflict on id, but this time we have two fields to update: author_id and content. Okay, let's upsert a post instead. And you know, we haven't consumed any of our posts yet, so I don't even need to produce anything right now; I should have plenty to consume as is, so let's give it a try. All right, it's mad; let's see what's going on. Seems like I might have just fat-fingered something again. That looks better. Something else: post is not defined, on line 28. That's what happens when you copy and paste, I told you. Looks like that did the trick; take a look in Postgres and there are some posts. Grand.

Okay, moving on to the next step. What we really care about is generating this aggregate, so let's call this the summary aggregate. The goal is to have one function that updates it, like I said, for a given user ID. First we're going to look up the user; I'm just going to write some pseudo-code here for a minute, since this stuff doesn't actually exist yet. Then we need all their posts, I guess by author ID. Then we calculate our summary; this is the part I was talking about that should be a pure function, easily tested. We're not going to test it, because I don't want to drag you through all of that right now, but that is a big point of this approach: it makes testing easier. And then we upsert the final result.

Now we just have to fill in these functions. Figure out how to get a user by ID; getting the posts is pretty much the same deal, except this time we want multiple rows; and then we need an upsert for the summary as well, which is going to be really similar once again, so I'm just going to snag one of the existing ones. Okay, user_summary, and we're going to potentially conflict on the primary key, which is user_id, and then we've got to set all of these columns. Name is actually right, that looks right; let's fill in the values. Okay, there we go. Let's go to our aggregate and import all of these. Oh, I haven't done that yet. Okay, now we need our transform; let's just call the file transforms. A lot of this is really easy: we want the user ID, which is just user.id, and the name, which is just user.name. Favorite words is supposed to be an array of something; we'll figure out how to compute that in a bit, so maybe we just hard-code it for now. Let's literally leave it as 'asdf' and show that the rest is working for starters.
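The query helpers the aggregate needs, plus the pure transform with its placeholder favorite words, might look roughly like this (table and column names, and the text[] type assumed for favorite_words, are guesses; the pool setup repeats the earlier db.js sketch):

```js
// db.js (continued) -- the post upsert, the reads, and the summary upsert the aggregate needs.
// Pool setup is the same as in the earlier db.js sketch.
const { Pool } = require('pg');
const pool = new Pool({ host: 'localhost', user: 'postgres', password: 'postgres', database: 'postgres' });

async function upsertPost(post) {
  await pool.query(
    `INSERT INTO posts (id, author_id, content) VALUES ($1, $2, $3)
     ON CONFLICT (id) DO UPDATE SET author_id = EXCLUDED.author_id, content = EXCLUDED.content`,
    [post.id, post.author_id, post.content]
  );
}

async function getUserById(id) {
  const { rows } = await pool.query('SELECT * FROM users WHERE id = $1', [id]);
  return rows[0]; // undefined if we haven't consumed this user yet
}

async function getPostsByAuthorId(authorId) {
  const { rows } = await pool.query('SELECT * FROM posts WHERE author_id = $1', [authorId]);
  return rows;
}

async function upsertUserSummary(summary) {
  await pool.query(
    `INSERT INTO user_summary (user_id, name, post_count, favorite_words)
     VALUES ($1, $2, $3, $4)
     ON CONFLICT (user_id) DO UPDATE SET
       name = EXCLUDED.name,
       post_count = EXCLUDED.post_count,
       favorite_words = EXCLUDED.favorite_words`,
    [summary.userId, summary.name, summary.postCount, summary.favoriteWords]
  );
}

module.exports = { upsertPost, getUserById, getPostsByAuthorId, upsertUserSummary };

// transforms.js -- the pure mapping from raw rows to a summary row, easy to unit test.
function buildUserSummary(user, posts) {
  return {
    userId: user.id,
    name: user.name,
    favoriteWords: ['asdf'], // hard-coded for now; a real calculation comes later
    postCount: posts.length,
  };
}

module.exports = { buildUserSummary };
```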
We also need to work through the fact that it's entirely possible, pretty unlikely in the real world but technically possible, for a user to be created and their first post to be created so close together in time that, with the eventual consistency happening here with Kafka, we end up consuming the first post record before the user record comes through. That would mean the user would be null, which is what we were talking about earlier. If we wanted to be really thorough we'd handle that; I'm just going to acknowledge that we need to deal with it, but I'm not going to handle it right this second.

So, coming back to our aggregate: that's the buildUserSummary call, and now I just need to export this. Oh, did I not export this? No, I did not. Okay, in consume-users, every time I get one of these I want to update the summary for user.id, and in consume-posts, same dealio, only it's for the post's author_id, and in both cases I'm going to have to import this. Let's publish some records and get this consumer going. We're really cutting it tight here, but I'm going to squeeze all these terminals in. Uh oh, we've got a problem; both consumers have the same problem: updateUserSummary is not a function. Right, I imported it from here... oops, that's why: it should be from user-summary-aggregate. Oh, and I used an import statement somewhere instead of require; I told you I like TypeScript, this is not natural to me.

Okay: column "name" of user_summary doesn't exist. That makes sense: we added name to this schema, but that was after we'd already done a docker-compose up, and it doesn't continuously re-read those init scripts. So I'm just going to do dc down, which is going to totally destroy the database, and actually destroy Kafka and everything, so we might as well kill all of that, then bring it back up and have a nice fresh, clean slate, which is kind of nice too. It takes a minute to come up: Docker will report that it's up pretty quickly, but Kafka is going to need another minute to actually be ready. Postgres is pretty fast. I mean, Kafka's fast too, it just doesn't come up instantly; let's see how long it takes, shall we? Not quite yet... there we go. These errors I've been ignoring, by the way, are normal. It's because I'm not explicitly creating the topics; I've configured Kafka so that if you publish to a topic that doesn't exist, it just creates it on the fly. But as part of creating a topic it has to do a whole leader election, per topic, so the error is just saying, hey, we haven't finished doing that yet. It's pretty smart: it gives you the error, but then it self-corrects as those tasks complete, and once there is a leader it moves on. That's why I'm ignoring it, in case you were curious.

So we've got a whole bunch of data being published; let's see if our consumers work this time. Let's consume some posts... oh, actually, we might have a problem if we consume posts first, because I didn't deal with that missing-user case yet. And that's exactly what it was: user is null. I was thinking of user.name but forgetting about user.id; technically we could get the ID from one of the posts, but we're not, so I'm just going to ignore that, process the users first, and now the posts should work. Yep, everything's going along swimmingly. Let's see what's going on with our user_summary: it is getting populated.
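The wiring described above amounts to each consumer's message handler doing two things: upsert the raw row, then trigger the shared regeneration path for the affected user. A sketch (handler names are just for illustration, and the decoded post is assumed to carry an author_id field):

```js
// The message handlers in the two consumers: upsert the raw row,
// then regenerate the summary for the affected user.
const { upsertUser, upsertPost } = require('./db');
const { updateUserSummary } = require('./userSummaryAggregate');

// consumeUsers.js -- inside eachMessage, after decoding the user:
async function handleUser(user) {
  await upsertUser(user);
  await updateUserSummary(user.id);
}

// consumePosts.js -- inside eachMessage, after decoding the post:
async function handlePost(post) {
  await upsertPost(post);
  await updateUserSummary(post.author_id);
}

module.exports = { handleUser, handlePost };
```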
The favorite words are always 'asdf' for now. We should see these names shifting around over time, and the post counts should generally be increasing as well. Not always increasing, though; I don't know if you paid much attention to my generator, but every time we create or edit a post it gets randomly assigned not only new content but also a new author, so things can shift around and an author can actually have fewer posts over time, but the sum of all the post counts should keep going up. That all looks good.

I think the only thing we're missing is a more proper calculation of the favorite words. Being fancy about that is not really the purpose of this video, so we're going to get it done in a simple way. In our transform we need to build the favorite words; let's start by getting all of the words. If you haven't heard of flatMap, it's pretty cool: map loops through posts and, for each item in the posts array, returns something else instead; in this case I'd return post.content.split, which gives me a list of words, so I'd end up with a list of lists. flatMap takes the list of lists and flattens it into a single list. Pretty handy.

Let's get some counts now. Basically what I'm going to do: this is our initial value, just an empty object, and I'm going to populate it with the keys being words and the values being how many times we saw each word. That initial value gets passed in as wordCounts, and we're expected to return it each time too; that's just the basics of how reduce works. word is what we're iterating through, over words. So wordCounts for word is going to equal: if it wasn't anything before, this is our first time seeing it and we just want it to be one; otherwise we increment it. Then we return wordCounts, and that should be it.

Now we've got to sort these things, so let's turn this into a list we can sort. Object.entries turns an object into an array of arrays; think of each inner array as a tuple, where the first item is the key and the second is the value. We need a list so that we can sort it, which is why we're using a basic sort function here. It looks like the classic sort comparator, but what we want to sort on is the second value in each of those arrays, which is the count we just produced with the reduce. That should sort them the way we want. And this is lame, I hate it, but JavaScript's sort function sorts in place, so this is mutating. Now we can pull out the words: let's map and just take the keys, which are the zeroth item in each of those arrays. That's all the words, sorted, but we only want the favorites, so let's take the first five; we'll slice that. Okay, let's see if that works.
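The favorite-words calculation as described, as a sketch (sorting descending by count and keeping the top five):

```js
// transforms.js -- turning all of a user's post content into a top-five word list.
function favoriteWords(posts) {
  // One flat list of every word across every post (a list of lists, flattened).
  const words = posts.flatMap((post) => post.content.split(' '));

  // Count occurrences: keys are words, values are how many times we saw each word.
  const wordCounts = words.reduce((counts, word) => {
    counts[word] = (counts[word] || 0) + 1;
    return counts;
  }, {});

  // Object.entries gives [word, count] tuples; sort on the count (this mutates in place),
  // highest first, then keep just the words and just the top five.
  return Object.entries(wordCounts)
    .sort((a, b) => b[1] - a[1])
    .map((entry) => entry[0])
    .slice(0, 5);
}

module.exports = { favoriteWords };
```

buildUserSummary then swaps its hard-coded ['asdf'] for favoriteWords(posts).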
While that starts up, if you're curious why it takes so long: there is some startup time to join a consumer group, but it normally isn't this long. What's happening is that when we exit, I haven't built any robust shutdown logic; I'm just killing the process immediately, so from Kafka's perspective the consumer simply disappeared. Kafka doesn't give up on a consumer that fast; there's a timeout period before it gives up on the consumer and starts divvying out the data it had assigned to it to some other consumer. So what we're waiting on here at boot is for that timeout to kick in so we can join the group. If I shut down more properly, the shutdown would take a bit longer, but it would make booting up much faster.

So let's see what's going on with the user_summary here. Hey, we've got some favorite words. I think that's a wrap, people. In summary: if you have experience with KSQL or Spark or something, you may be a better judge than me of whether this would have been way easier to do in a project like that, but I do think this was pretty doable. Even if we took our time to build it with higher quality and tests, we're still only talking about an afternoon of work, and it uses technology my team already knows. If we don't touch this code for six months and then have to come back to it, that's going to be a lot easier for us, and if something goes wrong and we're troubleshooting it, it's the same troubleshooting process as the rest of our Node applications. So it definitely gets the job done, and it wasn't too painful in my opinion.
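For what it's worth, the more graceful shutdown mentioned here, which the video skips, might look something like this sketch: catch the signal and let the consumer leave the group cleanly, so Kafka rebalances right away instead of waiting out the session timeout on the next start.

```js
// Sketch of a more graceful shutdown (not implemented in the video): leave the consumer
// group cleanly on SIGINT/SIGTERM so the next startup doesn't wait for the session timeout.
const { Kafka } = require('kafkajs');

const kafka = new Kafka({ clientId: 'consume-users', brokers: ['localhost:9092'] });
const consumer = kafka.consumer({ groupId: 'consume-users' });

async function main() {
  await consumer.connect();
  await consumer.subscribe({ topic: 'users', fromBeginning: true });
  await consumer.run({
    eachMessage: async () => {
      /* decode and process as before */
    },
  });
}

for (const signal of ['SIGINT', 'SIGTERM']) {
  process.once(signal, async () => {
    // disconnect() leaves the group and releases its partitions right away.
    await consumer.disconnect();
    process.exit(0);
  });
}

main().catch(console.error);
```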
Info
Channel: Coding With Dustin
Views: 769
Keywords: kafka nodejs join aggregate
Id: Mg_VD2scXvw
Length: 52min 11sec (3131 seconds)
Published: Fri Jul 02 2021