Kafka Tutorial - Node.js Producer & Consumer

Captions
Hey everyone — today we're going to be building a simple application using Kafka. So first of all, what is Kafka and why would you want to use it? Kafka is an event streaming platform which you can use to build event-driven systems: you have producers which produce events, and completely separate applications which consume those events by reading them from Kafka. Kafka has this notion of topics, which I see as different categories of events, so a producer writes to a topic and a consumer reads events from that topic.

That's what we're going to build today. We're going to run Kafka in a Docker container, which is a nice, easy way to get started with it; we're going to build a producer in Node.js which publishes events to a topic; and we're going to build a consumer in Node.js which reads events from that topic.

The first thing we have here is our docker-compose file, which gets Kafka up and running in a really easy and repeatable way. This code is all going to be on GitHub — I'll put the link in the description below. Basically, this docker-compose file runs two containers: one with ZooKeeper and one with Kafka. We need ZooKeeper here because Kafka depends on it — Kafka can't run without ZooKeeper being there. The environment variables are kept simple; if you're doing more complex things you might need to configure these a little, but this should get you started.
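For reference, a minimal compose file along these lines should work. This is a sketch, not the exact file from the repo — the image names, versions, and environment variables here are assumptions — but the shape matches what's described: one ZooKeeper container and one Kafka container with port 9092 mapped to the host.

```yaml
# docker-compose.yml — a minimal single-broker sketch (images and env vars assumed)
version: "3"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:6.2.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:6.2.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092" # expose the broker so apps outside the container can reach it
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
```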
To start this, we just run docker-compose up, and we can see that Kafka is now up and running and everything looks healthy. We'll open a new terminal window and minimize this one, because we don't really need to watch what's going on in there.

Now that we have this middle part running, we need to define the topic that our producer and consumer are going to write to and read from. To do that, we execute a command inside our Kafka container: it looks for the running container called kafka, goes inside it, and runs a shell script that lives inside the container to create a topic — we're calling ours test — passing in a few arguments. Again, if you're doing something more complicated you might need to change these around; for example, our topic has just one partition.
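The command is along these lines — a sketch, since the exact script name and flags depend on the Kafka version and image (older releases take --zookeeper zookeeper:2181 where newer ones take --bootstrap-server):

```sh
# run the topic-creation script that ships inside the kafka container
docker exec kafka kafka-topics \
  --bootstrap-server localhost:9092 \
  --create \
  --topic test \
  --partitions 1 \
  --replication-factor 1
```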
We run that and get a message saying the topic was created. So that's the initial setup done for the middle part: we have Kafka running in a Docker container and a topic created, and our producer and consumer will be able to use this topic.

As I said, we're going to use Node.js for our producer and consumer, but technically these could be in any language — they could even be in separate languages: you could have a producer in Java or Kotlin and your consumer in Golang or Node.js or whatever you'd like. For simplicity, we're using Node.js on both sides. To set that up, I'm going to initialize my Node project using npm init, going with all the defaults, and that creates our package.json file.

There are going to be two different parts to this — a producer and a consumer — so we'll create each in its own directory: a new folder called producer and another called consumer. Each of these projects, or simple microservices, needs a file to start the application, so I'm creating a file called index.js in both, and that's really where we'll do all of our work today. For now, let's just console.log "producer" in one and "consumer" in the other. To be able to start each of them separately, we add two start scripts to package.json: one for the producer runs node on the index.js inside the producer directory, and we copy that and do the same for the consumer. Then, using one terminal for the producer and one for the consumer, we run the producer's start script and see it log "producer", so we know it's running the correct one, and the same for the consumer. That's the initial setup of our project.

Now we need a couple of dependencies: one to interact with Kafka — to be able to write to the topic — and another package to serialize and deserialize data, which we'll use towards the end of the tutorial; we won't need it initially. Let's install both now. There are a couple of different Kafka clients you can use in Node.js, but I think node-rdkafka is probably one of the best, simply because it wraps the low-level librdkafka C++ library, which is very performant, and it's very easy to use as well. We install it with --save so it's added to our package.json. While we're here, we install the second one, called avsc, which is used for serializing and deserializing the data we put into the stream: you don't need it if you're just putting plain strings in, but we will once we're writing objects. We save that one as well, and that's all of our dependencies installed.

Just to go back for a second — I know I didn't really mention this — when we ran the command to create our topic, which is what the events flow through, we gave the topic a name: we called it test, and that's going to be important in a minute.

Cool, so now we're ready to start actually writing some code. We'll start in the producer by importing the library we just installed: import Kafka from 'node-rdkafka'. Then we say const stream = Kafka.Producer.createWriteStream(...), which creates a stream we can use to write our events. It takes three parameters. The first is an object where we define our broker, which is running on localhost:9092 — in our original docker-compose file you can see the port mapping from local port 9092 to port 9092 inside the container, which allows us to hit that port from outside the container, and we need that because this producer connects to the broker. The second parameter is some options, where we pass nothing. The third is where we define our topic, which, if you remember, is called test.

Next, we set an interval to write messages — events — to this stream, let's say every three seconds: setInterval with 3000 milliseconds, calling a function we'll name queueMessage. In queueMessage we say const result = stream.write(...), and what we write is a buffer created from a string — let's just pass in "hi". So every three seconds we write that to the stream, and we console.log the result to see what it looks like.

If we save all of that and run the producer — one gotcha: because I'm using this import syntax, we need to change the type in our package.json to "module", which allows us to use imports the way I'm using them. This could catch you out; you could import things a different way using require, but I like this syntax, so let's go with that. Now the producer starts successfully, we can see messages being written to the stream, and the result is just a boolean, true or false — here we have four different events written to the stream successfully. Let's stop that, reassign the result to a variable called success, and say: if success, log "message wrote successfully to stream", else log "something went wrong".
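Putting those pieces together, the producer at this stage looks roughly like this (a sketch following the steps above; 'metadata.broker.list' is node-rdkafka's config key for the broker list):

```js
// producer/index.js — the producer as described so far
import Kafka from 'node-rdkafka';

// createWriteStream(globalConfig, topicConfig, streamOptions)
const stream = Kafka.Producer.createWriteStream(
  { 'metadata.broker.list': 'localhost:9092' }, // the broker exposed by docker-compose
  {},                                           // no extra options
  { topic: 'test' }                             // the topic we created earlier
);

function queueMessage() {
  // write() returns a boolean saying whether the message was queued
  const success = stream.write(Buffer.from('hi'));
  if (success) {
    console.log('message wrote successfully to stream');
  } else {
    console.log('something went wrong');
  }
}

// write an event every three seconds
setInterval(queueMessage, 3000);
```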
If we start this again, we see that message logged every three seconds — great, we've written all of these events to the stream, or rather to the topic. So we've now done this part of the diagram: we're actually publishing events to the topic already. The next thing is to write the code for the part that consumes these events.

Our application has already run a couple of times, so we've published probably about ten different events, and just so you know, the default behaviour is that they all wait in the topic until somebody consumes them. These events are sitting there, so whenever we write our consumer and start it, it should pick up all those events from the beginning — it didn't lose any.

Let's look at the code for the consumer. First, we copy over the same import, because we'll need node-rdkafka here too. This time, instead of Kafka.Producer.createWriteStream, it's new Kafka.KafkaConsumer(...) — we're not calling a create function here. Inside, we pass in our broker list again, and we also pass a group.id, which is just "kafka". We don't actually need to pass in the topic at this stage; the second parameter is the options, and we don't have any, so that's okay. And instead of calling this a stream — it's not really a stream — we call it consumer.

Once that's initialized, we call consumer.connect(), which connects our consumer to our broker. At this point we aren't actually reading any events yet. We say consumer.on('ready', ...) to listen for the ready event, so whenever the consumer is ready this anonymous function is called, and in there we just console.log "consumer ready". If we start our consumer now and see what happens, we get as far as "consumer ready" — and the Kafka logs jumped a little, which means Kafka probably logged that a consumer connected. We'll stop it there; at this point we still haven't read any events from the topic.

To do that, we need to subscribe to our topic: consumer.subscribe() takes an array of topics, because a consumer can subscribe to more than one topic. We created one topic, called test, so we subscribe to that one. Then we call consumer.consume(), which consumes the events for that topic as they flow in. Cool — we add one more handler, consumer.on('data', ...), an anonymous function that's called any time new data comes in. On the producer side we said "message wrote successfully to stream"; here we'll console.log "received message", along with the message itself — the data event passes in the data, and we use data.value to grab the value from it.
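The consumer then looks roughly like this (again a sketch of the steps just described, using node-rdkafka's KafkaConsumer):

```js
// consumer/index.js — the consumer as described
import Kafka from 'node-rdkafka';

const consumer = new Kafka.KafkaConsumer(
  {
    'group.id': 'kafka',                      // the consumer group id
    'metadata.broker.list': 'localhost:9092', // same broker as the producer
  },
  {} // no topic options
);

consumer.connect();

consumer.on('ready', () => {
  console.log('consumer ready');
  consumer.subscribe(['test']); // an array — a consumer can subscribe to several topics
  consumer.consume();           // start consuming events as they flow in
});

consumer.on('data', (data) => {
  // data.value is a Buffer holding the message payload
  console.log(`received message: ${data.value}`);
});
```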
Let's see if all of this works by running our consumer: "consumer ready". Now if we start our producer, it writes messages to the stream and we receive them here — three messages written, three received, which is great. Just to note: if we stop our consumer now while our producer keeps producing, then when we start the consumer again it should pick up all the messages it missed straight away — and it does; here are all of those messages. To show that once more: our producer is running and writing events to the topic every three seconds, our consumer isn't picking them up, and as soon as we start a consumer it picks them all up. And if we stop our producer, our consumer stops receiving new messages too. So this is really nice.

Right now we're just writing some simple strings to the topic. In order to do something more complicated, like writing an object, we need to serialize and deserialize that object on its way into and out of the stream, because you can't just write objects or other data structures into the stream — it has to be written as a buffer. So let's stop everything and see how to do that.

In our producer, right now we're just writing this plain string. We need to define what our event type is, so that we can serialize and deserialize it as it goes in and out of the buffer. Let's create a new file called eventType.js to define our event type. If you remember, at the beginning we installed the avsc library, which is very useful and very quick at serializing objects — as you can see in its docs, it's faster than JSON and has some other advantages, which is why I like to use it. We import it from 'avsc', and then we export a type for a certain schema. There's actually a pretty good example in the docs, so we can copy that and just export it as the default — and you can see there how it works: the producer writes to the buffer, and the consumer converts it from the buffer back to the actual object. The type is "record", because that's the data structure we're creating, and then we define our fields. The example's first field is called "kind"; let's rename it to category, which I just think is a nicer name. These categories are animals — we have a cat and a dog — and the field is of type enum. The second field we'll call noise, because the events we'll create are basically these animals making noises: we'll have a producer producing those events and our consumer consuming them. So there are two fields on this type: category, an enum with the two symbols cat and dog, and noise.

Now that we've defined this event type, we want to use it inside our producer and our consumer, so we import it — call it eventType, coming from our eventType file — and copy that import over to the consumer as well. In the producer, where before we were writing a plain buffer to the stream, we now create our event with a category — let's say the first one is a dog — and a noise that we need to define — let's say bark. Then, instead of the built-in Buffer provided by Node.js, we use what the avsc library gives us: eventType.toBuffer(...), called with the event we just defined. This writes the event to the stream as a buffer, which is exactly what we want. On the consumer side we do the same thing in reverse: where we were reading data.value directly, we now use eventType.fromBuffer(data.value). And with that, this should work: we've defined a custom event type, and we're serializing these events as they go into the topic and deserializing them as they come out on the consumer side.
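As a sketch, the schema file and the two changed call sites could look like this (the record and enum names are made up for illustration, and I'm assuming eventType.js sits one level above the producer and consumer directories; toBuffer and fromBuffer are avsc's serialization APIs):

```js
// eventType.js — the Avro type used to (de)serialize our events
import avro from 'avsc';

export default avro.Type.forSchema({
  type: 'record',
  name: 'animalEvent', // record/enum names here are illustrative
  fields: [
    {
      name: 'category',
      type: { type: 'enum', name: 'animalCategory', symbols: ['cat', 'dog'] },
    },
    { name: 'noise', type: 'string' },
  ],
});
```

And the producer/consumer changes:

```js
// producer/index.js — serialize the event on the way in
import eventType from '../eventType.js';

const event = { category: 'dog', noise: 'bark' };
const success = stream.write(eventType.toBuffer(event));

// consumer/index.js — deserialize on the way out
import eventType from '../eventType.js';

console.log('received message:', eventType.fromBuffer(data.value));
```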
Let's start our producer first of all and see if it works: we successfully write to the stream, and on the consumer side we see the messages coming in — we're receiving them, which is great. Right now all the messages are the same, though, so let's do something to change that. We can stop our producer; our consumer isn't actually going to need to change at all at this stage, because we're just going to write different events into the stream, and our consumer should be able to consume those regardless — we're not changing our actual event type, only the events we're creating.

On the producer side we've basically hard-coded this event, so instead let's say const category = getRandomAnimal() and const noise = getRandomNoise(category) — we pass our category in because we want a random noise for that particular category. And since key and value now have the same names, we can use the shorthand and build the event as just { category, noise }.

Now we have to write these two functions. The first, getRandomAnimal, is quite a simple function that takes no parameters: we define our two categories, cat and dog, which are the two things we defined in our enum — the only two things we're allowed to pass in; our serialization and deserialization wouldn't work if we passed in something different. To get a random one, we return the element of categories at a random index, using Math.floor(Math.random() * categories.length); that gives us a random index into the list, and we return the value at that index, which is our category. We do something similar for getRandomNoise, which takes an animal as input: if the animal is a cat, the noises are purr and meow; else, if the animal is a dog, the noises are woof and bark; and in both cases we return a random noise the same way. Cool — so now we have a function to get a random animal and a function to get a random noise, and the same interval as before: every three seconds it gets a random category and a random noise, creates an event with them, and serializes the event as it goes into the buffer.
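Sketched out, the updated producer pieces could look like this (the helper implementations follow the description above; the string values must match the enum symbols and field names in the schema):

```js
// producer/index.js — random event generation, as described
function getRandomAnimal() {
  const categories = ['cat', 'dog']; // must match the schema's enum symbols
  return categories[Math.floor(Math.random() * categories.length)];
}

function getRandomNoise(animal) {
  if (animal === 'cat') {
    const noises = ['purr', 'meow'];
    return noises[Math.floor(Math.random() * noises.length)];
  } else if (animal === 'dog') {
    const noises = ['woof', 'bark'];
    return noises[Math.floor(Math.random() * noises.length)];
  }
}

function queueMessage() {
  const category = getRandomAnimal();
  const noise = getRandomNoise(category);
  const event = { category, noise }; // shorthand: keys match the schema's field names
  const success = stream.write(eventType.toBuffer(event));
  if (success) {
    console.log('message wrote successfully to stream');
  } else {
    console.log('something went wrong');
  }
}
```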
Our consumer hasn't stopped this whole time, so it will read these events from the buffer and print them out. If we start our producer now, hopefully this all works... something went wrong. Let's see — it looks like each place I used the random helpers I actually forgot to call the functions, so let's call them properly. This also shows that our serializer really does validate that the type is correct, which is kind of nice. Let's start again and see if it works: "message wrote successfully to stream" — it's still writing dog, we haven't had any cat events yet, but the dogs are making different noises; we had a woof and a bark... and there, it finally wrote a cat, and it made a purr noise. That's great. Let's stop our consumer for a second, clear it out, and run it again: we can see it picked up a bunch of messages right at the start, because they were waiting for us in the stream.

So that is our full end-to-end: we've created a producer, we've serialized and published events to a topic, Kafka is running in a Docker container, and we have a consumer that's able to consume and deserialize these events from the topic. I hope you enjoyed this — if you have any questions, feel free to write them in the comments, and thanks for watching.
Info
Channel: Kris Foster
Views: 16,162
Keywords: kafka node js tutorial, kafka node js example, nodejs kafka tutorial, nodejs kafka consumer, nodejs kafka producer, apache kafka tutorial, apache kafka nodejs, kafka object serializer, nodejs kafka queue, nodejs kafka topic, what is kafka, kafka streams nodejs, node-rdkafka, node-rdkafka vs kafkajs, node-rdkafka consumer example, node-rdkafka producer example, kafka nodejs demo
Id: EiDLKECLcZw
Length: 28min 50sec (1730 seconds)
Published: Sat May 01 2021