Nest.js + Kafka Tutorial With KafkaJS in 15 Minutes

Video Statistics and Information

Video
Captions Word Cloud
Reddit Comments
Captions
hey there today we're going to take a look at using kafka js in an sjs project to be able to connect to a kafka server produce messages and consume messages so i'm going to show you how to integrate this into an app and kind of my preferred approach to working with it uh in a new directory we're gonna of course start off a new project by using nest new and we'll use kafka js so in a new directory i'll use the nest cli and create a new project and i'll call this nest js kafka so go ahead and let this uh finish scaffolding and i'll use yarn for our package manager so once that finishes we can cd into the directory uh and then i'm gonna open up a code editor and then we can also start the development server by running yarn start dev so the first thing we're going to go ahead and do is add the kafka js package so in a new terminal i'm going to yarn add kafka js so now we have kafka js in our project i'm also going to use an scli to generate a kafka module so we're going to call nest generate module kafka so now we have kafka js and we have this new kafka module we're essentially going to set up two services that are applicable to work with a producer service and a consumer service so to start let's start off with a producer service so we'll create a producer service here and this will be an injectable and will export the producer service here now the first thing we want to do is actually get a handle to the kafka server and the way we do that is we can actually declare a private read-only variable here we'll call it kafka and we're gonna set this equal to new kafka and this is from kafka.js as you can see up above and now we need to specify the brokers or the servers that our kafka server is listening on so when we start our server up locally we're going to use localhost and it'll be the default port which by default is 9.092 so if you're running locally this is the broker you should use so now we have a handle to the cactus server we actually want to actually get a handle to the producer we're going to make now too so i'm going to declare another read-only private variable called producer of type producer and it's simply going to be this dot kafka.producer so now we have the producer here we need to actually connect to this producer so we want to do that when the application starts up and we can take advantage of the nestjs application lifecycle hooks so we're gonna make sure this service implements on module emit that way we can actually have a bootstrap method to hook into uh the on module knit here and this can mean asynchronous method which is very useful for our purposes because we need to call this producer.connect which is an asynchronous method so this line here will allow us to actually connect the producer to our server now we're going to add another async method to actually produce messages we'll just call it produce and it's going to take a record and this record is of type producer record from happyjs okay and then we can simply call await dot producer dot send and then pass in the record that's really all we need to do uh we're just going to add one more life cycle hookup here actually on application shutdown so that when the application shuts down we can disconnect the producer so we'll call on application shutdown and we'll call away this.producer.disconnect so that when our when our sjs application stops we disconnect the producer so make sure in our kafka module we actually instantiate this uh as a provider uh the producer service here and we'll also add it to our exports array so we can use it in our app in a bit so now we have a producer service we can use to produce messages let's add a consumer service that allows us to consume messages okay and this will be a little bit more involved but not terribly difficult so this will also be an injectable if you want to maintain state here and we'll call this the consumer service so i'm going to go into the producer service and just copy this line here where we get a handle to the kafka server and we can import kafka again now we also now want a read-only variable and we want to get an array of consumers here because in the producer's case we only really need one producer to produce messages to our server but in the consumer's case we're going to have multiple consumers potentially that can subscribe to different topics on the other hand for the producer the record itself here specifies the topic that we send the message to and the messages themselves so in the consumer service this will be an array of consumers because we want to actually track the amount of consumers we have in our application so that we can disconnect from them when the server shuts down so this consumer will come from kafka js and then we'll add an async consume method here so this consume method is going to take the topic and this is of type consumer subscribe topic okay and the config is the consumer run config so if you want to take a closer look at what these look like the topic itself is just the topic name which is a string a regular expression and the from beginning boolean which instructs kafka to read from the beginning of the topic when it starts to consume messages and then the config itself and then the config itself is where we're going to specify the implementation or the of the method we want to run when a message is received so we allow this to be defined in each of our implementations later on so now on the consume method let's go ahead and actually create a consumer we'll call this dot kafka.consumer and we need to specify the group id here so in this case this could be scoped to the application we're currently using so i'm just going to call this nest js kafka because that's our app we're currently using so once you've created the consumer we can call consumer.connect as we have done with the producer so that it actually connects the consumer to our server and now we need to call consumer.subscribe and we're going to subscribe to the topic that we've passed in here so that it listens to the correct topic and lastly we're going to call await consumer.run and this is where we pass in the config that our implementation will pass in so that it will run certain code whenever a message is received so finally we're going to actually push this consumer to our array of consumers and the reason we're going to do that is because similarly to our producer service we want to implement on application shutdown so that we can disconnect from these consumers when we shut down so simply we can iterate over the consumers or const consumer of this dot consumers we're going to await consumer.disconnect and we don't need this signal parameter here so now we have a service that we can use in our code to easily consume messages and disconnect when our application shuts down so as we've done before we'll have to open up the kafka module and let's add the consumer service here and also export it so that we can use it in our code so now let's actually use these services we just created in our app controller let's actually change the return type here and we're going to go into the app service into our get hello method and let's change this to an asynchronous method so we actually produce a message in this in this method here in our get route so we'll remove the return type as well and now in our app service we want to inject the producer service so let's do that we'll inject the producer service here so we can make use of it the producer service and now in our get hello method before we return we can call away this produceservice.produce and now we have the producer record that we can provide and as we've seen before we'll have to uh provide the topic name so in this case i'll use a test topic and call it test whatever we'd like and then we have an array of messages that we would want to produce so in this case we're just going to send one message and then we have the value here we can provide i will provide hello world so now we are correctly producing a message to the test topic so now that we're producing properly uh we need a way to consume messages so let's open up a new file here we will call this the test consumer and i like to call this or scope this by the name of the topic we have in this case so in this case is tests we'll have a test.consumer so this will be an injectable class as always so we can inject the consumer service so export the test consumer and importantly we're going to have to implement the on module and knit here as we've done before but firstly we're going to need to inject the consumer service here so let's do that first the consumer service and now we can implement an asynchronous on module init and this is where we want to actually subscribe to the topic at hand using the consumer service right when the application starts up so let's call await this consumer service dot consume and now we can pass in the topic or the consumer subscribe topic which in this case will have the topic name inside so we have the topic name and now we can provide the consumer run config and this is where we can provide the each message property which is the code we want to run when we receive a new message on this topic so in this case it's going to be an asynchronous method it doesn't have to be but i'll make it async because typically you want to do async work in here uh and then the object we get back is the topic has the topic inside it has the partition and it has the message itself so then we can go ahead and inside of this method we'll simply console.log an object and we'll include the value which is the message.value and it has a tostring method we can call here the topic itself has a topic.2string and then the partition is partition.2string so now this test consumer when our application starts up it's going to subscribe to our test topic and execute this code whenever a message is received so finally don't forget to add this as a provider in our providers right here and the app module so lastly we need to make sure we start up our kafka server locally if you don't have kafka already installed i'll include the link here in the description a pretty simple installation we just need to get the latest kafka release unzip it and then cd into the directory where we have a couple of scripts that we can run to start the server so if you already have kafka installed let's go ahead and start up the server you're just going to cd into that kafka folder that you have installed and then once you're inside of this directory we're going to run this command to start the zookeeper server using zookeeper properties make sure you start up the zookeeper server first and then you're going to open up another terminal and go to that same directory to start the actual kafka server so i'm going to cd right back into the kafka directory that i have installed uh paste this next command which starts the kafka server using server properties so let's go ahead and let the server start up now lastly let's go ahead and stop our nestgs server and let's restart it all together so i'm going to run yarnstar dev to restart the server and you should see logs here from kafka js stating that the consumer has joined the group so our group id that we specified earlier in sjs kafka is here and we can see it's using the round robin assigner so that our consumer has correctly joined the application so let's test this out if we open up postman we should be able to make a get request on low cost 3000 which is where our server is listening and remember we set up our producer to produce a message when this code is executed so if we send off a few requests here we can see the consumer service is correctly logging out the message from the producer we have the hello world value here with the topic and the partition so using these services we can very easily set up any amount of consumers and producers we want anywhere in our code by importing this kafka module here and following this simple example i hope you have learned something so thanks so much and be sure to like comment and subscribe if you enjoyed the video and i'll see you in the next one thanks
Info
Channel: Michael Guay
Views: 21,821
Rating: undefined out of 5
Keywords:
Id: jueU212S57c
Channel Id: undefined
Length: 13min 32sec (812 seconds)
Published: Tue Jan 11 2022
Related Videos
Note
Please note that this website is currently a work in progress! Lots of interesting data and statistics to come.