Kafka Streams using Spring Cloud Stream | Microservices Example | Tech Primers

Captions
It's been a while since we built an end-to-end tech stack with Spring Boot. In this video we are going to build three different microservices: the producer, the processor (using Kafka Streams), and the consumer. We're going to look at how to leverage Kafka Streams inside a Spring Boot application. Let's get started. Press the bell icon on the YouTube app and never miss any update from Tech Primers.

Let me walk you through what we're going to build in this video. First there's the domain crawler, which will be a classic Kafka producer. I'm not going to use Kafka Streams here, because this service only produces data into a Kafka topic. To produce that data, I'm going to use a public API that tells me which website domains are available on the internet for booking; for example, if I look up techprimers.com, is that website still available in the domain registry so that I can book it? The crawler will query this public API, scrape that information, and publish it onto a Kafka topic which I'll call web-domains.

From the web-domains topic we'll create a consumer — or rather, a processor. The domain processor consumes all the messages arriving on web-domains and, using Kafka Streams, filters them based on the type we want. Both inactive and active domains get pulled in, and I want only the active ones, so I'm going to create another topic called active.web-domains; the processor filters the messages and publishes only the ones we need into it. That's why the domain processor is just a processor: it consumes something, filters something, and publishes again, acting as a consumer and a publisher at the same time.

Once the messages land on the new Kafka topic, we'll write another consumer that reads from it and consumes only the active web domains. For this sample I'm not going to persist anything into a database, but if you want to extend the demo another level, you can store the processed messages in one. We could also add another layer: from the domain processor we could publish the inactive domains onto a separate topic so that something else can consume and act on them. I won't be building those last two pieces — the database persistence and the inactive branch — but I will be building everything else.

Let's get started with the coding. As usual, I'm going to leverage start.spring.io for creating the projects. I have already followed the Apache Kafka Docker setup, so my Kafka instance is up and running, and I can access it on port 9021. This is the Kafka Control Center, from where I can look at the clusters, create topics, and so on. I'll show the creation of the topics in a bit, but meanwhile let's create our
Spring Boot projects. I'm going to create a Maven project on Spring Boot 2.3.10. The group is com.techprimers and the first project is the crawler, so the artifact id is domain-crawler — I don't want a hyphen in my package name, so I'm removing it there. I'm going to use jar packaging and Java 11. For dependencies I'm adding WebFlux, because I'm going to consume the public API via WebClient, and Kafka. The moment I type "kafka" there are different ways of consuming messages in a Spring Boot application; I'm going to use only Spring for Apache Kafka, the plain spring-kafka dependency. I deliberately don't want Spring Cloud Stream here; however, for the other two microservices — the domain processor and the domain service — I will be leveraging Kafka Streams.

Next up is the processor, so I'll call that project domain-processor, again removing the hyphen from the package name. Instead of Spring for Apache Kafka, I'm going to add Kafka Streams, and also Cloud Stream, so that I can use Kafka Streams as the binder for Spring Cloud Stream — in other words, Spring Cloud messaging with Kafka as the binder. I can remove WebFlux since we don't need it here, leaving only two dependencies: Kafka Streams and Cloud Stream. With that, let me generate the project. The final project is the domain service, and I'll create it with the same two dependencies, because it will use Kafka Streams to consume data from the active.web-domains topic. If you look at the high-level diagram, these two services use Kafka Streams with Spring Cloud Stream, while the crawler uses vanilla Spring Kafka with no Cloud Stream at all — so you'll be able to see how much code I write in the crawler versus how little in the other two applications. (A sketch of the Spring Cloud Stream dependencies appears below.)

Let me open all three projects in IntelliJ. While it loads the Maven dependencies, let's look at the setup. As I mentioned earlier, I have Kafka running locally on my machine, and I'm going to use the Control Center to create the topics we require. Right now there are no new topics, only the four default ones, so let's create the two we need. The first is web-domains; since I don't have a huge machine, I'll create it with a replication factor of one. The other topic is active.web-domains, again with a replication factor of one so I don't run into any issues. Done — both topics are created: active.web-domains and web-domains, the two topics our implementation requires.
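As a rough guide, here is what the dependency section of the processor's and service's pom.xml might look like. This is a sketch based on the description above, assuming the standard Spring Cloud Stream artifacts with versions managed by the Spring Boot and Spring Cloud BOMs; the exact coordinates in the original repository may differ.

```xml
<!-- Sketch: Spring Cloud Stream with the Kafka Streams binder, as used by the
     domain-processor and domain-service projects. Versions come from the
     Spring Boot / Spring Cloud dependency management (BOMs). -->
<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
    </dependency>
</dependencies>
```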
Meanwhile the projects have loaded in IntelliJ, and I can see all three of them in one view, which lets me walk you through the whole implementation. Let me start with the crawler. As you saw in the diagram, the crawler produces data by consuming an API and then publishing the results into web-domains, using plain Spring Kafka. Looking at its pom.xml, there are only two dependencies: WebFlux and Spring Kafka — no Spring Cloud Stream. I'll show you what changes you need in order to leverage just Spring Kafka without Spring Cloud Stream.

The first and foremost thing I need is to configure the connection to the Kafka instance in this application, so I'm creating a Java class called KafkaProducerConfig: a Java-based configuration for connecting to Kafka. I preloaded this configuration, so let me load it here. It references a Domain class that doesn't exist yet, so let me create that class first so nothing fails; then we can walk through the config.

So what are we doing in this file? I'm creating a KafkaTemplate and injecting a ProducerFactory which I build manually. Why build it manually? Because I want to serialize the data I'm publishing: I'm going to use the JsonSerializer, since I'll be publishing a JSON message. The message type is the Domain model, which I'm basing on the public API. Let's look at that API: it's domainsdb.info, and if you click on the API documentation you can see a sample response. The response has a domains field, which is a list; within the list, each entry has the domain, create date, update date, country, an isDead flag indicating whether the website is dead or not, and various DNS-specific records for that domain. I'm going to leverage the same structure for the application's Domain model without changing it. Apart from that, the config uses the StringSerializer for the key and the JsonSerializer for the value — both coming from Spring Kafka's serializer support — and it points at the locally running Kafka instance, since I'm just leveraging local Kafka for now.

I also created the Domain model ahead of time, so let me show it: it has the same fields we just saw in the API sample. If you're using Lombok you can use that; I simply recreated the fields earlier and loaded them in. The next thing is the trigger point: how do I trigger this pull? I could have a scheduler that pulls automatically, or I can expose an endpoint.
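Below is a minimal sketch of the producer configuration and a trimmed-down Domain model, reconstructed from the description above. Treat the field list, the getter names, and the bootstrap address as assumptions — the real model in the video mirrors the full domainsdb.info response.

```java
// KafkaProducerConfig.java — sketch of the manual producer setup:
// String keys, JSON-serialized Domain values, local broker.
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.support.serializer.JsonSerializer;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Domain> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed local broker
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, Domain> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
```

```java
// Domain.java — trimmed-down model; the real API response has more fields
// (create/update dates, DNS records, and so on).
public class Domain {
    private String domain;
    private String country;
    private Boolean isDead;

    public String getDomain() { return domain; }
    public void setDomain(String domain) { this.domain = domain; }
    public String getCountry() { return country; }
    public void setCountry(String country) { this.country = country; }
    public Boolean getIsDead() { return isDead; }
    public void setIsDead(Boolean isDead) { this.isDead = isDead; }
}
```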
So I'm going to expose an endpoint, in a class called DomainCrawlerController, that lets the user supply a search text so we pull only matching domains. If you look at the public API, you need to provide a domain name for it to search — the sample in the docs uses facebook — and I'm going to pass that dynamically by fetching it from the user. Let's create this as a rest controller: I'll annotate it with @RestController and give the @RequestMapping a path of /domain. Then I'll add a @GetMapping so I can query it, with lookup as the URL plus a dynamic name for the website or domain. The method is public String lookup, and since the name is a path variable I use the @PathVariable annotation to capture it into a variable called name.

This variable is what we'll pass to the public API, and for that I'm creating a new service called DomainCrawlerService, as a class in the same package. I'll annotate it with @Service, declare it as a constructor parameter in the controller so it gets injected, and from the lookup method return something for the user to see — "Domain crawler has scraped your data" — while calling the service to crawl the requested name. That crawl method doesn't exist yet, so let's create it. (A sketch of the controller follows.)
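Here's a minimal sketch of what that controller might look like. The path, response message, and delegation to the service are as described above, but consider the exact names an assumption:

```java
// DomainCrawlerController.java — GET /domain/lookup/{name} triggers a crawl
// for the given search text.
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/domain")
public class DomainCrawlerController {

    private final DomainCrawlerService domainCrawlerService;

    public DomainCrawlerController(DomainCrawlerService domainCrawlerService) {
        this.domainCrawlerService = domainCrawlerService;
    }

    @GetMapping("/lookup/{name}")
    public String lookup(@PathVariable String name) {
        domainCrawlerService.crawl(name);
        return "Domain crawler has scraped your data";
    }
}
```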
Inside crawl I'm going to use the KafkaTemplate, but before that, the WebClient, for querying the public API. I'll write WebClient.create(), then a GET call via the get() method on a particular URL — nothing but the URL we can copy straight from the API documentation. In that URL, facebook is the dynamic part, so I replace it with the name variable coming in dynamically; that becomes the URI we pass. The API returns a JSON-type message, so I provide the media type as JSON, and then we retrieve the data. Once I have the response I want to convert it into a Mono, and if you notice, the response wraps a list of domains — we created Domain already, but not the surrounding list. That's why I'm creating a wrapper model called DomainList, which holds the domains field we saw in the API sample. Let me generate the necessary getters, setters, and so on, so the model is ready.

With the model ready, we convert the whole response into a DomainList object and store it in a variable — let's call it domainListMono. Now that we can get the whole list of messages, we need to convert the list into individual messages and publish them: they're not going to be published in one go; instead I'll iterate and publish them one by one. To consume from a Mono we need to subscribe to it, so I subscribe to this Mono, take the DomainList it emits, get its domains, and for each domain publish a message. To publish we leverage the KafkaTemplate, so let me declare one and inject it via constructor injection. It's typed KafkaTemplate<String, Domain>, because — remember the configuration we added — the key is a String and the value is a JSON-serialized Domain. Using the template we can now send the message to the respective topic, and we already know which topic: I'll create a constant for it, web-domains, and provide that topic name so the KafkaTemplate knows where to send the Domain message. For tracking purposes I'll also add a log statement that prints the domain name after sending it to the Kafka topic, so we know the message got published.

That's it for the crawler: we exposed a controller, the controller takes a lookup name, with that name we crawl the domainsdb.info API, and we publish the response messages onto a Kafka topic. We also added configuration for serializing the data — and since we serialized this Domain type, every consumer will use the same Domain model, so I'll need to recreate it in the processor as well. Now that the crawler's job is done, let's go to the processor. Remember, the processor uses Kafka Streams, not vanilla Kafka. With vanilla Kafka, what you just saw is how you publish a message, using the KafkaTemplate. (A sketch of the service follows.)
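The crawl method described above might look roughly like this. The endpoint URL is based on the domainsdb.info documentation page shown in the video, and DomainList is the wrapper model just mentioned; both the URL format and the getter names are assumptions, not confirmed from the original repository:

```java
// DomainCrawlerService.java — sketch of the WebClient call plus the
// per-domain publish loop described above.
import org.springframework.http.MediaType;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;

@Service
public class DomainCrawlerService {

    private static final String TOPIC = "web-domains";

    private final KafkaTemplate<String, Domain> kafkaTemplate;

    public DomainCrawlerService(KafkaTemplate<String, Domain> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void crawl(String name) {
        WebClient.create()
                .get()
                // Assumed search endpoint; "facebook" in the docs is replaced
                // by the user-supplied name.
                .uri("https://api.domainsdb.info/v1/domains/search?domain=" + name)
                .accept(MediaType.APPLICATION_JSON)
                .retrieve()
                .bodyToMono(DomainList.class)
                // Fan the list out into individual messages on web-domains.
                .subscribe(domainList -> domainList.getDomains().forEach(domain -> {
                    kafkaTemplate.send(TOPIC, domain);
                    System.out.println("Domain message: " + domain.getDomain());
                }));
    }
}

// DomainList.java — wrapper matching the top-level "domains" array.
class DomainList {
    private java.util.List<Domain> domains;

    public java.util.List<Domain> getDomains() { return domains; }
    public void setDomains(java.util.List<Domain> domains) { this.domains = domains; }
}
```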
But if you're using Kafka Streams with Spring Cloud Stream, it's much simpler — you don't have to write that much code. Let's see. In the processor project I'll create a new class called DomainKafkaProcessor. This is going to be a configuration class, because I want it picked up when the Spring Boot application starts: the processor has no trigger point of its own; its trigger is simply the arrival of a Kafka message. Into this class I'm going to add a bean that Spring Cloud Function — and therefore Spring Cloud Stream — understands, and it's going to be of the Function type. The function takes in a Kafka stream (KStream is the Kafka Streams interface), keyed by String with Domain values — the type we created earlier — and it converts that into another KStream of String and Domain. I'll call the method domainProcessor.

Meanwhile we need our Domain model here too, so I'm copying the whole thing over and keeping the same package structure in the processor. If you're using a mono-repo kind of setup you could share it, and with a multi-module project you could extract a common module; since I didn't create a multi-module project, I'm just recopying everything. So we have the same Domain here, which we can leverage in the processor, and we don't have to code it from scratch.

Now for the function body: the message comes in as a KStream — we can call the parameter anything, so let's call it kStream — and I'm going to filter the whole stream of domains based on the active/inactive scenario. Remember the isDead field, and the architecture diagram where only active web domains flow downstream: we filter out the inactive ones. So as part of the Kafka Streams pipeline I use filter, whose predicate receives the key (the String) and the value (the Domain — I'll name it domain to keep it simple). With it I can just check domain.getIsDead() and keep the record only if it is not dead. I could simplify it to just that, but I also want to log a message for tracking: if the domain is dead I log "inactive domain" and drop it; otherwise I log the active domain and keep it. So instead of a separate peek, I do the logging directly inside the filter, and both the active and the inactive messages get printed before anything is even pushed. With that, we've filtered the stream. (A sketch of this class follows.)
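Here's a sketch of that processor class, reconstructed from the description above (the logging-inside-filter shape matches the narration; the getter name reuses the Domain sketch from earlier and is an assumption):

```java
// DomainKafkaProcessor.java — consume web-domains, keep only live domains,
// emit the survivors to the output binding.
import java.util.function.Function;

import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DomainKafkaProcessor {

    @Bean
    public Function<KStream<String, Domain>, KStream<String, Domain>> domainProcessor() {
        return kStream -> kStream.filter((key, domain) -> {
            if (Boolean.TRUE.equals(domain.getIsDead())) {
                System.out.println("Inactive domain: " + domain.getDomain());
                return false; // drop dead domains
            }
            System.out.println("Active domain: " + domain.getDomain());
            return true;      // keep live domains
        });
    }
}
```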
So what the processor does is: get the message, filter it down to the active ones, and publish it back out again. That's it — if you look at it, I don't have to code anything else into this processor application, because I'm leveraging Spring Cloud Stream with Kafka Streams to consume a message, process it (filter or transform it), and publish it onto another topic.

Now, how does it know which Kafka topic to publish to? Instead of adding Java configuration, we create a YAML configuration. I already prepared one, so I'm just loading it here. In it you configure the broker information and the binding information — which topic is consumed and which is published. The in binding is nothing but the consumption topic, web-domains, and the out binding is the publishing topic, active.web-domains — exactly what the diagram says: consume from web-domains, publish into active.web-domains. Finally, we configure serialization and deserialization: when we published from the crawler we serialized the data, so the input has to be deserialized here, and when we publish out to the active topic it gets serialized again. That's the only configuration Spring Cloud Stream requires — no further Java code, and it's simple. We created a processor class, consumed the message from the KStream, filtered out some information, and returned what we want, so that only the active records are sent to the publishing topic. (A sketch of this application.yml follows.)
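For reference, a Spring Cloud Stream configuration along these lines would wire the function to the two topics. This is a sketch assuming the functional binding convention (`<bean name>-in-0` / `<bean name>-out-0`) and Spring Kafka's JsonSerde; the actual file in the video may use different property spellings or additional serde settings (for example, trusted packages for JSON deserialization):

```yaml
# application.yml (processor) — sketch, not the video's exact file
spring:
  cloud:
    function:
      definition: domainProcessor
    stream:
      bindings:
        domainProcessor-in-0:
          destination: web-domains          # consume from here
        domainProcessor-out-0:
          destination: active.web-domains   # publish survivors here
      kafka:
        streams:
          binder:
            brokers: localhost:9092
            configuration:
              # JSON in, JSON out — mirrors the crawler's serializer
              default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              default.value.serde: org.springframework.kafka.support.serializer.JsonSerde
```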
So our processor is also done. Now let me minimize those two folders and move on to the domain service. In the domain service we again have to consume the message, and we've already configured the project to use Kafka Streams, so I'll use Kafka Streams to consume it. Let's create a class called DomainKafkaConsumer — again a configuration class — and in a similar way I'm going to declare a bean. This time I'm not creating a Function but a Consumer. Why did we create a Function in the previous example? Because we consumed a message and then published one back out. Here we consume the message and don't publish anything; we just do whatever we want with it — for now, I'm just going to log it. So we use the java.util.function.Consumer interface over the KStream interface, with String as the key and Domain as the value, leveraging the same types as before. The Domain class isn't present in this project either, so I'll copy it over from the processor one more time — same model, same package. I'll call the bean method domainService, because it's the service here.

In the body we take the KStream, iterate its key-value pairs, and print them: I get the key and the domain, and I use String.format to print the domain name along with its active/inactive status — domain.getDomain() and domain.getIsDead() — so we know whether a particular domain is active or not. Similar to what we did in the processor, we consume from the KStream, but since this is a consumer we don't publish or send anything back: I use foreach to consume each message directly, returning nothing beyond the handler, and String.format substitutes the values into the square brackets in the log line. That's it — look how simple it is to consume a message with Kafka Streams: no custom consumer implementation, just the Consumer (or Function) interface from java.util, and you can still leverage Kafka Streams underneath. (A sketch of this class follows below.) The only other thing you need is the configuration, similar to what we did for the processor. I preloaded it: it has the deserialization settings with the Kafka binding for Spring Cloud Stream, plus the destination topic we consume from, which is active.web-domains.
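A sketch of the consumer class described above (class and bean names as narrated; the getter names reuse the earlier Domain sketch and are assumptions):

```java
// DomainKafkaConsumer.java — terminal consumer: log each active domain,
// publish nothing back.
import java.util.function.Consumer;

import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DomainKafkaConsumer {

    @Bean
    public Consumer<KStream<String, Domain>> domainService() {
        return kStream -> kStream.foreach((key, domain) ->
                System.out.println(String.format("Domain [%s] with status [%s]",
                        domain.getDomain(), domain.getIsDead())));
    }
}
```

Its application.yml would presumably mirror the processor's, except with a single input binding — something like `domainService-in-0` with destination `active.web-domains` — and no output binding.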
With that, I think we have all the applications done. We created three different microservices: the crawler, which pulls data from an API and publishes it into a Kafka topic; the domain processor, which consumes messages from web-domains and publishes them into active.web-domains based on our filtering logic; and finally the domain service, which consumes from active.web-domains. Let me start all three processes so we know it's all working. Hmm — looks like I added @Configurable instead of @Configuration; that's the problem in the Kafka processor, so let me fix that and start it again. I made the same mistake in the service as well. There — the processor is up; let me go to the service and start that too. All three applications are now up and running: the first one on port 8080, where the endpoint is exposed so we can trigger a pull, plus the domain processor and the domain service, both connected to the Kafka topics.

Now let's trigger the endpoint. In the browser I go to localhost:8080, to the /domain/lookup endpoint we created, and provide some data: I'll use facebook as the domain name, so it fetches all the similar domains with names containing "facebook". The moment I press Enter, you can see in the background the crawler pulling all the domain information from the domains API. The crawler will then have published the messages, so let's go to the processor — and yes, I can see the active-domain filtering printing the domains: facebookwait19, facebookcurrency, and so on; all the active ones are passing through. The processor filters out the inactive ones and sends only the active ones to the domain service, and in the domain service I can see the status is false — which basically means it's not dead, since the flag is isDead. In hindsight we should probably have mapped it to an active/inactive label; we could change that, but you get the idea. So the application is working.

Let me summarize what we just did. We created a domain crawler that crawls information about a search text from a public domain API and publishes it onto a Kafka topic; it doesn't use Kafka Streams, just Spring Kafka — vanilla Spring Kafka — publishing onto the topic called web-domains. From web-domains, we consume with a Kafka Streams application built on Spring Boot and Spring Cloud Stream: using Kafka Streams we filter out only the active messages and publish them onto the active.web-domains topic. From active.web-domains, the domain service consumes the messages and simply logs them. If you look at the code, the Spring Cloud Stream side is much simpler — one class converts one stream to another using a Function, and on the consumer side we just created a Consumer that consumes the message and logs it — plus a couple of application.yml files with the binding configuration, one here and one in the processor. In the crawler, by contrast, we had to do a lot of Java configuration, because we used Spring Kafka; if you used Kafka Streams there as well, it would shrink to a great extent. I just wanted to show you the difference between Spring Kafka and Kafka Streams — that's why I built the producer with Spring Kafka and the consumers with Kafka Streams.

That's all I had for this particular video. I hope you found it useful. Do leverage the code from the GitHub repository — I've put the diagrams and the code in the GitHub link in the description below. I hope this was helpful for creating an event-driven system using Kafka Streams. As always, if you liked the video, go ahead and like it; if you haven't subscribed to the channel, go ahead and subscribe. Meet you again in the next video. Thank you very much.
Info
Channel: Tech Primers
Views: 72,047
Keywords: techprimers, tech primers, kafka streams, why kafka streams, spring cloud kafka, spring cloud kafka streams, spring streams, spring kafka, spring cloud kafka tutorial, spring cloud kafka example, end to end microservices example, reactive microservices, event processing microservices, microservices using kafka, kafka microservices, kafka streams example, kafka streams microservices, kafka example with spring cloud, kafka streams with spring, spring with kafka streams, kafka
Id: rqjdSbIOrJ4
Length: 34min 5sec (2045 seconds)
Published: Fri May 07 2021