How to Write Integration Tests for Kafka-Based Applications

Captions
Hi everyone, in this video we are going to see how to write integration tests for Kafka-based applications. While running the integration tests we also need a Kafka server running, so that we can verify that messages are being published to the Kafka topic, or, if we are testing a consumer application, that the consumer receives the messages published to the topic and performs its own logic.

But relying on an external Kafka server can cause problems. For example, if the Kafka server happens to be down while the integration tests are running, our tests will fail. Similarly, if we are testing the consumer application and expect a specific message on the topic, some external process might publish a different message to the same topic while the test runs, and the test will fail because it sees a message it does not expect. In short, external factors can make our integration tests fail.

In this scenario, what we have to do is run a server specifically for the integration tests. It could be an in-memory Kafka server, which is an embedded Kafka server, or it could be a Docker container. The server is started only when the integration tests run, with whatever initial state the tests require, and as soon as the tests are done it is destroyed again. In this video we are going to see both ways: first embedded Kafka, and then the Docker container. So let's get started.

I have a consumer service. Here we have the Kafka listener, and it is listening to a specific topic which we have defined in the application properties, which is message-topic.
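As a self-contained sketch of that setup (all names here — Message, MessageRepo, MessageConsumer — are assumptions mirroring the video's description; in the real service the listener method carries the Spring @KafkaListener annotation, here it is called directly so the sketch stays runnable):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical message payload, mirroring the message with an id used in the video.
class Message {
    private final String id;
    private final String text;
    Message(String id, String text) { this.id = id; this.text = text; }
    String id() { return id; }
    String text() { return text; }
}

// The "consumer repo" from the video: for simplicity just an in-memory map;
// a real application might write to a database instead.
class MessageRepo {
    private final Map<String, Message> store = new ConcurrentHashMap<>();
    void save(Message message) { store.put(message.id(), message); }
    Message findById(String id) { return store.get(id); }
    boolean contains(String id) { return store.containsKey(id); }
}

class MessageConsumer {
    private final MessageRepo repo;
    MessageConsumer(MessageRepo repo) { this.repo = repo; }

    // In the real service this method would be annotated with
    // @KafkaListener(topics = "${message.topic}") so Spring invokes it per record.
    void onMessage(Message message) { repo.save(message); }
}
```

The integration test then publishes a message to the topic and asserts that the repo ends up containing it.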
Whenever any message is published to this message-topic, our consumer receives it, and the message is stored in the consumer repo. For simplicity I have just added one HashMap: whenever a message is received by the listener, it is added to this map. You can have a different scenario where you use a database or any other data source instead.

Now in our test we have the message that we want to publish, and after publishing it we expect that the repo has received it; that is what we assert. We have annotated the test with @SpringBootTest, which means it will load the whole Spring context required by the consumer application. If I just run this particular test, let's see what happens: it is failing. We have defined the assertion that the repo should contain the message with id 1, but of course we cannot expect that right now, because we haven't sent the message to the Kafka topic yet. And that is not the only problem: the logs show the application trying to connect to a Kafka server on port 9094, but it says the broker may not be available. It could not establish the connection, because we don't have a Kafka server running at the moment.

We could run a Kafka server on our local system or on the Jenkins machine, but for the same reasons we mentioned earlier, that is a bad idea. What we need is a Kafka server running in memory, and for that purpose we can add embedded Kafka with one annotation: @EmbeddedKafka. If you look at the EmbeddedKafka interface, it comes from the spring-kafka-test library that we have added. Within @EmbeddedKafka we can define various broker properties.
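Putting the annotation setup together, a sketch of the test class header might look like this (a configuration sketch, not runnable on its own — it requires spring-kafka-test on the classpath; the port and partition values are the ones used in the video):

```java
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.test.context.EmbeddedKafka;

// brokerProperties pins the embedded broker to the port that the application
// properties point at; partitions overrides the default of two.
@SpringBootTest
@EmbeddedKafka(
        partitions = 1,
        brokerProperties = { "listeners=PLAINTEXT://localhost:9094", "port=9094" }
)
class ConsumerServiceTest {
    // test methods as described in the video
}
```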
For example, if we need to run the Kafka server on a different port, or provide the number of partitions, the Zookeeper port, or other Kafka-related properties, we can define all of those within @EmbeddedKafka. Here we can set the broker properties: if we need to define the host and the port, we can define the listeners property with localhost and the port. Whatever port we define there, the embedded Kafka server will run on that port, and that is also what you have to define in your application properties file, so that your application connects to that particular port. With this configuration I can also set the partitions: by default there are two partitions, and I just need one at the moment, so I can define that. You can define the Zookeeper port and other properties as well, but for a basic setup we just need these.

Now if I run the consumer service test again, let's see what happens: the test is failing, for the same reason — we haven't sent any message to the Kafka topic. But let's look at the logs. The Spring application started, Zookeeper started, and the broker is up with all its setup done. That means the embedded Kafka server we configured has been started, and the consumer is able to connect to it. The only thing left is to send a message to the topic. For sending a message to the Kafka topic you can either use a plain Kafka producer or, more simply, the KafkaTemplate. We can autowire the KafkaTemplate, and with its help we can send the message to the topic with template.send, passing the test topic name and the data — the message object that the consumer is going to read.
For the topic name we can either reference the application property or just use the literal name message-topic directly. Now if we send this message to the Kafka topic, let's see what happens: it is still failing. The reason is that after we send a message, the consumer has to poll it from the topic, and when we run this test, the poll has possibly not happened yet by the time we reach the assertion. To work around that for now, we can just sleep for a while: Thread.sleep, say 5000 milliseconds.

It is still failing, and this time the reason is on the producer side: it cannot convert a value of class Message using the StringSerializer. When we send a message to a Kafka topic, the record has a key and a value; right now we are sending a Message object as the value, but the default serializer is the StringSerializer. In the consumer's Kafka properties we already have the JsonDeserializer for the value and the StringDeserializer for the key; similarly, we need to add Kafka properties for the producer: the key serializer is the StringSerializer, and the value serializer is the JsonSerializer. That way, whenever a message is sent to the topic, it is converted to JSON first. These producer properties usually live in the producer application, but while running the integration test we need those properties in the test configuration as well, so that we are able to send the message to the Kafka topic. If I run the test now, it is successful: the application connects to the embedded Kafka, and we have verified that the message sent to the Kafka topic has been received by the consumer.
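The serializer and deserializer wiring described above can be sketched as Spring Boot test properties (a configuration sketch: the video sets these in its own test config, and the property names below are the standard spring-kafka ones; the port matches the embedded broker configured earlier):

```properties
# Producer side: String keys, JSON values (JsonSerializer from spring-kafka)
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

# Consumer side: String keys, JSON values deserialized back into the Message type
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.consumer.properties.spring.json.trusted.packages=*

# Must match the port given to the embedded broker
spring.kafka.bootstrap-servers=localhost:9094
```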
Because of that, the message is available in the repo. But there is a possibility that our consumer received the message within one thousand milliseconds, while we are still waiting for 5000 milliseconds: we are wasting four thousand milliseconds. Instead of waiting a fixed time, we can keep asserting: if the assertion succeeds, our test is done; otherwise we wait a little and assert again, and as long as the assertion is failing we keep retrying, with one upper limit on the total time. For this we have the Awaitility dependency, which provides a couple of methods we can use in our tests. So let's go back to the test. Here we can apply await(), which comes from Awaitility, to wait for some amount of time. There are different methods here: for example atMost, which is the maximum amount of time to wait, and here I'll just use a Duration with the same 5000 milliseconds we added before. Until then it will keep trying, and for that we have the untilAsserted method, where we can pass a lambda expression containing our assertions. Now when the test runs, it starts with the assertion; if it is failing, it waits for some time and asserts again, until it either succeeds or the time limit has passed. I'll remove the Thread.sleep, and with this assertion let's see what happens when we run the test: it is successful. Previously our test was blocked for 5000 milliseconds; now it keeps asserting, and as soon as the assertion succeeds it finishes the test. Only if the assertion never succeeds within the time limit does the test fail.
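Under the hood, Awaitility's await().atMost(...).untilAsserted(...) is essentially a retry loop around an assertion. A minimal plain-Java sketch of that idea (class and method names here are my own; in the real test you would use Awaitility itself):

```java
import java.time.Duration;

// Retry an assertion until it passes, or rethrow its last failure once the
// time budget is exhausted - roughly what Awaitility's untilAsserted does.
class AwaitSketch {

    @FunctionalInterface
    interface Assertion {
        void check();
    }

    static void untilAsserted(Duration atMost, Assertion assertion) {
        long deadline = System.nanoTime() + atMost.toNanos();
        while (true) {
            try {
                assertion.check();
                return;                          // assertion passed: stop waiting
            } catch (AssertionError failure) {
                if (System.nanoTime() > deadline) {
                    throw failure;               // time budget used up: fail the test
                }
                try {
                    Thread.sleep(100);           // poll interval before retrying
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw failure;
                }
            }
        }
    }
}
```

With Awaitility itself, the equivalent call in the test would look like `await().atMost(Duration.ofMillis(5000)).untilAsserted(() -> ...)`.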
In the same way, we can add a test for the producer as well. For the producer we have defined one producer application: here we have the producer service, and whatever message we pass to sendMessage, it sends to the Kafka topic. If the message is sent successfully, we have the success scenario, and if for some reason the message could not be sent, we have the failure scenario, where it throws a RuntimeException; in case of success it returns "successfully sent message". Let's see the integration test: in the producer service test we have shouldPublishMessage, where we have a message, and whenever producerService.sendMessage is called we expect the success response. This verifies our successful scenario; similarly we can verify the error scenario as well. For the producer, too, we need the embedded Kafka server running, so we can reuse the configuration from the consumer test and add @EmbeddedKafka to the producer service test as well. Now whenever a message is sent to the Kafka topic, the producer returns "successfully sent message", and if I run this test, it is successful: the message we are sending from here has been published to the Kafka topic, and after receiving the successful response we got "successfully sent message".
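The producer service's success/failure contract can be sketched in plain Java (the KafkaSender interface is a stand-in of my own for KafkaTemplate so the sketch stays self-contained; the sendMessage name and the response string follow the video's description):

```java
// Stand-in for KafkaTemplate: something that can send a payload to a topic.
interface KafkaSender {
    void send(String topic, String payload);
}

// Sketch of the producer service: return a success message when the send
// succeeds, wrap any failure in a RuntimeException.
class ProducerService {
    private final KafkaSender sender;
    private final String topic;

    ProducerService(KafkaSender sender, String topic) {
        this.sender = sender;
        this.topic = topic;
    }

    String sendMessage(String payload) {
        try {
            sender.send(topic, payload);
            return "successfully sent message";
        } catch (Exception e) {
            throw new RuntimeException("failed to send message", e);
        }
    }
}
```

In the integration test the real KafkaTemplate plays the KafkaSender role, backed by the embedded broker.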
So far we have used embedded Kafka in our test cases. With embedded Kafka we can provide certain configuration for the Kafka server, but the configuration options are limited; if we need to mimic the exact Kafka server, embedded Kafka will not be able to help us. For example, if we need different partition counts for different topics, we cannot do that, because the partitions we provide apply to all the Kafka topics. In this scenario we can use a Docker container for Kafka: instead of the embedded broker, we can use Testcontainers.

For using Testcontainers we need a couple of libraries: the Testcontainers JUnit Jupiter module, and the Testcontainers Kafka module, which is specifically for Kafka. If you have a requirement to use MongoDB or Postgres or any other data source, you can use the specific module for that. With these libraries, let's add the configuration for our test container. In the producer service test we remove @EmbeddedKafka, and instead use the @Testcontainers annotation, which comes from the JUnit Jupiter module we added. Whenever our test runs, it will start whatever type of container we have initialized. To instantiate a container we add the @Container annotation and define the container: since we are working with Kafka, we can use the KafkaContainer class from the Kafka module, initializing it with new KafkaContainer and passing the DockerImageName of the Kafka image we want to run. Once we do that, our Kafka container is initialized. If we need to provide more configuration, instead of adding it in the test class we can use a separate configuration class: I have already defined one KafkaTestConfig configuration class where the container is defined, and this configuration can be reused in all the tests we have. It uses the Docker image for Kafka, and we just need to start the Kafka container by calling its start method.
Whenever the Docker container starts, it starts a Kafka server, and that server might run on a different port every time the application starts, so in the application properties we cannot define a fixed value for the bootstrap servers. Earlier we were using port 9094; now we cannot use that, because the port might be different every time. For that purpose we need to define a dynamic property: instead of hardcoding the Kafka bootstrap servers, we set the property at runtime. Within KafkaTestConfig we set the same spring.kafka.bootstrap-servers property, and we get its value from the container via getBootstrapServers. Now that we have defined this KafkaTestConfig, we also need to use it in our service test, with @ContextConfiguration(classes = KafkaTestConfig.class). With this configuration, if I run the test for the first time it might take a while, because it has to download the Docker image before starting the container. Our test is successful, and you can see that the Docker container was started; in the logs you can see the bootstrap server and the particular port it is currently listening on. This port will change every time we run the tests, and that is why we added the test config and set the property to the current value of the bootstrap server, which is what our Kafka application then connects to. For running tests with Testcontainers we also need Docker installed on the system where the tests run; that is the primary prerequisite for the Docker-based test containers.
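The wiring described above can be sketched like this (a configuration sketch, not runnable on its own — it needs the Testcontainers kafka and junit-jupiter modules plus a local Docker daemon; the image tag is an example. The video uses a separate KafkaTestConfig class wired in via @ContextConfiguration; the @DynamicPropertySource shown here is Spring's built-in way to achieve the same runtime property):

```java
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@Testcontainers
@SpringBootTest
class ProducerServiceTest {

    // Started before the tests run, destroyed afterwards.
    @Container
    static KafkaContainer kafka =
            new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"));

    // The broker is mapped to a random host port on every run, so the
    // bootstrap-servers property must be resolved at runtime rather than
    // hardcoded in application.properties.
    @DynamicPropertySource
    static void kafkaProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.kafka.bootstrap-servers", kafka::getBootstrapServers);
    }
}
```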
That is it from this video: we have seen how we can use embedded Kafka, and how we can use Testcontainers, to run our integration tests. I hope you enjoyed this video. Thanks for watching, stay tuned, and happy coding!
Info
Channel: Code with B
Views: 5,665
Keywords: Testing Kafka and Spring Boot, Integration Tests for Kafka Consumer, Step by Step Integration Tests on Spring Kafka, Integration Test Spring Kafka with Embedded, Advanced Testing Techniques for Spring for Apache Kafka, Brijesh pant code with B
Id: XaEdtErIgjQ
Length: 16min 35sec (995 seconds)
Published: Wed Jul 12 2023