Spring Boot with Spring Kafka Consumer Example | Tech Primers

Captions
In the last video we saw how to publish messages onto a Kafka topic from a Spring Boot application. In this video we are going to see how to consume those messages from a Kafka topic in a Spring Boot application. Press the bell icon on the YouTube app and never miss any update from Tech Primers.

I'm at start.spring.io, so let's create a Spring Boot application from the Spring Initializr. I'll reuse the group ID com.techprimers.kafka and create a project named spring-boot-kafka-consumer-example, because this is going to consume messages from a Kafka topic. I don't need Spring MVC, because I'm just going to create a standalone Spring Boot application, so let's add the dependency for Kafka and generate the project.

Let me open the project in IntelliJ. The project is open; we have the main class here, and in application.properties I'm going to use port 8081 so that we don't have any conflict with the default port 8080. Now I need to create some Kafka configuration, then a consumer — a listener, basically, so I'll just name that package "listener" — and a model, because I might need to consume a JSON message.

First, the configuration. I'll just call it KafkaConfiguration, similar to the configuration we had in the producer, and annotate it with @Configuration so that Spring Boot loads it during startup. The bean we require here is the consumer factory; in the producer we used the ProducerFactory, so here we are going to use the ConsumerFactory. It expects type parameters, so I'll say <String, String>, because first we will consume a string message, and later on we will move to consuming a JSON message. To create this consumer factory, I'll just return a new DefaultKafkaConsumerFactory.
This factory expects some configuration, so let's create it. It is exactly the same as what we did in the producer video, except that here we use ConsumerConfig instead of ProducerConfig for the keys in the map. First we provide the bootstrap server configuration: since Kafka is running on my local machine, I'll use 127.0.0.1:9092. Next we provide a group ID; for now I'll just say group_id, and I'll show you later where it is useful. Then we provide the key deserializer class, which is StringDeserializer — in the producer we used a serializer class, here we use the deserializer — and in the same way, for ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, we also provide StringDeserializer, since I'm going to consume strings for now.

Our consumer factory is set. Now we need to inject this consumer factory into our Kafka listener factory, and for that we are going to use the ConcurrentKafkaListenerContainerFactory. We cannot pass the consumer factory through a constructor, so I create the object and then call factory.setConsumerFactory(consumerFactory()). This also expects type parameters, so I'll say <String, String> here as well, so that our container factory works with String objects, and then I return the factory. We are now set with the configuration: we have created the consumer factory configuration.
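The configuration class described above can be sketched roughly as follows. This is a reconstruction, not the video's exact source: the bean names, the group ID group_id, and the broker address 127.0.0.1:9092 are taken from the narration, and @EnableKafka (which the video only adds later, just before running) is included here so the class is complete.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@EnableKafka   // added later in the video so Spring scans for @KafkaListener methods
@Configuration
public class KafkaConfiguration {

    // Plain-string consumer: both key and value are deserialized as Strings.
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> config = new HashMap<>();
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_id");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(config);
    }

    // Container factory that @KafkaListener methods use by default.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
```

This is wiring/configuration only; it needs the spring-kafka dependency on the classpath and a running broker to do anything.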
In that configuration we provide some defaults so that Spring Boot can identify where Kafka is running and which deserialization classes it needs to use while converting the messages from the Kafka topic, and we set that on the ConcurrentKafkaListenerContainerFactory. This listener container factory is required so that Spring Boot can inject the messages into the required class.

Now, where do we write the listener? That is why we created a package called listener. Let's go to that package and write a KafkaConsumerListener class, which I'll annotate with @Service for now. We need to register a listener method, and the @KafkaListener annotation helps us do that: whenever there is a message on the Kafka topic, this particular method will be called, so we can access the message directly. For example, I'll just write a consume method, and the message which arrives will come straight into it, so we don't have to worry about anything else. Inside it I'll just print the consumed message.

But how does it know which topic to read from? This is where we provide the topic on the annotation: under topics we can provide multiple topics if we have them, or just the single one. We have the topic created in the last video, Kafka_Example, so I'm going to use that. Additionally, we have to provide something called a group, and this is where the group configuration was helpful: we said that this particular consumer factory comes under the group group_id, so this whole configuration is grouped under one group ID, and we can mix and match this group ID with different topics. Let's say we have a different topic that also carries string messages.
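A minimal sketch of the listener class built above — the class name, topic name Kafka_Example, and printed text are reconstructions from the narration, so adjust them to your own setup:

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaConsumerListener {

    // Called by Spring for every record that arrives on the topic.
    // Uses the default kafkaListenerContainerFactory (String/String).
    @KafkaListener(topics = "Kafka_Example", groupId = "group_id")
    public void consume(String message) {
        System.out.println("Consumed message: " + message);
    }
}
```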
Then we can reuse this particular group ID; that is why the group ID is helpful here. We can have multiple listeners, and we can have multiple topics going to the same listener.

Spring Boot also needs to identify that this Kafka listener exists. For that we need to add the @EnableKafka annotation, so that Spring keeps an eye on all the Kafka listeners. This is something new in the consumer part; in the producer we didn't require it because we did not have any listener, but on the consumption side Spring Boot needs a handle to the listener and needs to scan the code to know that it should go and check for one. That is why we add @EnableKafka.

I think we're set, so let's start the project. Meanwhile, let's go to the console. I already have Zookeeper running, and the Kafka cluster is running as well; it's a single node, basically one Zookeeper and one Kafka node. Now I need a producer, and Kafka ships with one by default, the Kafka console producer, so I'm going to use that. It's up now: I have a Kafka console producer pointed at the same topic we used here, Kafka_Example, and I can publish whatever message I need. So the Kafka console producer is ready; I'll just type "hello YouTube" and see if the message gets consumed. Right now there is nothing in the application console. Let me publish the message... and yes, we got it. So this is how we publish a message from the console and directly consume it in the application.
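The console-producer step above would look something like this; the script path, broker address, and topic name are assumptions based on a typical 2018-era Kafka install and the video's narration:

```shell
# Zookeeper and a single Kafka broker are assumed to be running locally.
# Each line typed at the prompt ("hello YouTube", ...) is sent as one message
# and should be printed by the Spring Boot listener.
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Kafka_Example
```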
Let me show it in real time: I'll type "hi tech primers", and it comes through; "hello viewers", same thing. All these messages are pushed to Kafka from the console, automatically consumed by the listener, and printed here. So this is how you can consume string messages from a Kafka topic using Spring Kafka in a Spring Boot application.

Now let's try a JSON message. I'll create a new model called User, exactly the same as in the previous example; I'll keep it simple with just two fields, name and department. With the model ready, we need to wire it into the configuration. So far we have only one listener container factory, with string messages flowing through a default consumer factory typed <String, String>. Let's create a new consumer factory: I'll publish to a new topic and show you how we can listen to both topics from the same consumer application. This new consumer factory is going to expect a message of type User, so I'll call it userConsumerFactory. It needs to return a new DefaultKafkaConsumerFactory with some configuration, which I'll just copy from the string version with slight modifications: the group ID will be group_json, and I'll use the JSON deserializer. In addition to the configuration map, we need to pass a StringDeserializer for the key and a JsonDeserializer for the value; the JsonDeserializer expects the target class, so I pass User.class, and that's it.
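The User model might look like the sketch below. The field and method names are assumptions from the narration; note the no-args constructor and toString override, both of which the video only adds later (the constructor after hitting a deserialization error, toString so the printed output is readable):

```java
public class User {

    private String name;
    private String department;

    // Required by Jackson/JsonDeserializer; the video adds this later
    // after a "no suitable constructor found" error.
    public User() {
    }

    public User(String name, String department) {
        this.name = name;
        this.department = department;
    }

    public String getName() {
        return name;
    }

    public String getDepartment() {
        return department;
    }

    @Override
    public String toString() {
        return "User{name='" + name + "', department='" + department + "'}";
    }
}
```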
What I have done here is this: since we are using custom deserialization — JSON deserialization into this particular model — we need to pass the deserializers to the DefaultKafkaConsumerFactory as constructor arguments. In the old string version we did not pass anything extra, but here we have to pass these two, because we need to get a custom message type out of Kafka.

Now I need to wire this consumer factory into a Kafka listener container factory. I'll create a new one called userKafkaListenerFactory: create the ConcurrentKafkaListenerContainerFactory object, call factory.setConsumerFactory(userConsumerFactory()), and return the factory. It needs to be of type <String, User>. That's it; no errors, no warnings. We have now created two new beans for our JSON consumer — the userConsumerFactory and its container factory — so let's use them in the consumer class.

We already have one @KafkaListener; I'll create one more, and in this case I'll use a different topic, Kafka_Example_json, with the group group_json. There is also an option to provide the container factory on the annotation, so I'll set containerFactory to userKafkaListenerFactory to make this listener use that specific container. The method will be public void consumeJson(User user), so the message gets deserialized directly into a User and lands here, and I'll print "Consumed JSON message" along with the user. For that, let me go to the User class and override toString.
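Putting the JSON pieces together, the additions might look like the fragments below — the first goes into the existing KafkaConfiguration class, the second into the listener class. Bean names, group group_json, and topic Kafka_Example_json are reconstructions from the narration:

```java
// --- Inside KafkaConfiguration ---------------------------------------
@Bean
public ConsumerFactory<String, User> userConsumerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    config.put(ConsumerConfig.GROUP_ID_CONFIG, "group_json");
    // Deserializers are passed as constructor arguments so the
    // JsonDeserializer knows which model class to produce.
    return new DefaultKafkaConsumerFactory<>(config,
            new StringDeserializer(), new JsonDeserializer<>(User.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, User> userKafkaListenerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, User> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(userConsumerFactory());
    return factory;
}

// --- Inside the listener class ----------------------------------------
// The containerFactory attribute points this listener at the custom
// container instead of the default String one.
@KafkaListener(topics = "Kafka_Example_json", groupId = "group_json",
        containerFactory = "userKafkaListenerFactory")
public void consumeJson(User user) {
    System.out.println("Consumed JSON message: " + user);
}
```

JsonDeserializer lives in org.springframework.kafka.support.serializer and requires jackson-databind on the classpath, which is exactly the dependency issue the video runs into next.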
I've overridden toString so that we know what we are printing; that's done. One more thing: we need to publish onto the new topic, which means I have to create that topic first. I'll open a new tab and create the new topic, Kafka_Example_json. Once it is created, I'll consume messages from it, so I also need to run a console producer against this topic and publish JSON messages to it. Meanwhile, let me restart the Spring Boot application so that it is ready and we can publish the JSON message onto the new topic from the console. Notice that we now have multiple deserializers and multiple listeners.

And here is an error: "class cannot be found". This is because we used the JsonDeserializer but forgot to add the Jackson data-binding dependency, so let's add jackson-databind to the project. I'll use version 2.6.7, because that is what is compatible here. Let me run the project again. The compilation is successful and the server is coming up; if you look at the logs, two containers got registered, one for the Kafka_Example topic with partition 0 and the other for Kafka_Example_json with partition 0 (we created only one partition, so it is partition 0). Now let me publish a JSON message: I'll say the name is "Sam" and the department is "Technology". So this is the JSON message I'm going
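The topic-creation and producer commands for the JSON topic would look roughly like this; script paths, ports, and the topic name are assumptions consistent with the single-node setup described above:

```shell
# Create the JSON topic (pre-KIP-377 Kafka versions register topics via Zookeeper).
bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 1 --topic Kafka_Example_json

# Console producer for the JSON topic; each line is one JSON message,
# e.g. {"name":"Sam","department":"Technology"}
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Kafka_Example_json
```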
to publish. Now that JSON message should be pushed to Kafka and arrive here directly. But see, there is a consumer exception: "Kafka error deserializing key/value ... at offset 0". It's not able to deserialize, and because it keeps retrying, it's continuously consuming. Let me stop the consumer and see why this is happening: "no suitable constructor found — cannot deserialize, missing default constructor". So this is because I missed the default constructor: the model I had did not have one, so deserialization was failing. Let me add a no-args constructor and rerun.

Notice that the earlier message now came in immediately, because it was never successfully consumed before. Let me republish another message, side by side this time: I'll use a different name, "Peter", from "Operations". This message should arrive directly... and yes, see that — we got "Consumed JSON message". Let's also try publishing a string message on the original topic, which should go to the other listener... and that also came through ("publishing string message" — I made a typo there, but the consumers are all working).

So we have two different listeners in the consumer, with two different topics, two different groups, and a different container factory. If you don't provide a container factory on the annotation, it uses the default one, but if you have a custom container you can provide it directly. What we did here is create a Kafka configuration for JSON: we had to provide the JsonDeserializer, constructed with the model class we need to convert to; that is what we need to provide
by default. This consumer factory then has to be provided to the ConcurrentKafkaListenerContainerFactory, which is what is used when we consume from the listener: Spring Boot injects the data into the listener method once it consumes and deserializes it with whatever configuration we provided, and if we need a custom factory we can provide it on the annotation. Since we had two different listeners with two different message types, we used a custom factory, provided it there, and Spring was able to consume those messages. In addition, we had to add the @EnableKafka annotation to make Spring Boot understand that there is a Kafka listener running in this project, and we had to add the Jackson data-bind dependency — I added version 2.6.7 of jackson-databind — which the application requires while decoding the message from a string into JSON, i.e. when converting that message into the model.

That's how we can consume messages from a Kafka topic in a Spring Boot application. I hope you guys understood it. As usual, this particular project will be available on GitHub; you can take it from there, fork it, modify it, and try it out. If you liked the video, go ahead and like it; if you haven't subscribed to the channel, go ahead and subscribe. Meet you again in the next video. Thank you very much.
Info
Channel: Tech Primers
Views: 100,186
Rating: 4.9432287 out of 5
Keywords: techprimers, spring boot kafka, spring boot with kafka example, spring boot kafka tutorial, spring kafka tutorial, spring kafka example, spring boot with spring kafka, spring boot kafka integration, kafka example java, kafka topic creation, kafka spring boot, kafka spring tutorial, distributed messaging system, distributed messaging, distributed message queues, distributed topics, spring boot kafka explained, kafka consumer example, spring boot kafka consumer example
Id: IncG0_XSSBg
Length: 23min 2sec (1382 seconds)
Published: Sat May 19 2018