Apache Kafka Tutorial with Spring Boot Reactive & WebFlux | Kafka Tutorial

Captions
Today we're immersing ourselves in the dynamic realm of real-time data processing, guided by a technology that has truly transformed the landscape of distributed systems. Apache Kafka isn't just a tool, it's a powerhouse reshaping how we navigate and manage data streams. Whether you're a seasoned developer, a data engineer, or someone simply captivated by cutting-edge tech, this tutorial is your golden ticket to unlocking the core concepts and capabilities that Kafka brings to the table. In this video we're embarking on a journey to unravel the intricacies of the Kafka ecosystem: we'll explore the nuances of Kafka clusters, producers, consumers and topics, venture into the dynamic universe of partitions, look at how consumer groups collaborate, reveal the magic of offsets, and demystify serialization, a pivotal element in how Kafka handles data.

Here is our agenda for today: understanding the fundamentals of a message broker and its advantages; immersing ourselves in the Kafka ecosystem by breaking it down into components and building blocks; exploring the intricacies of a Kafka cluster, a Kafka broker, a Kafka producer and a Kafka consumer; demystifying the concepts of Kafka topics, partitions and offsets; and investigating how multiple consumers collaborate for better performance. But we're not stopping at theory. We'll put these concepts into practice by first showing you, step by step, how to install Kafka and navigate its fundamental elements, then by harnessing the power of Kafka within the Spring Boot framework to send and consume data: how to configure topics, how to configure Kafka for the producer and the consumer, how to send data and how to consume it. Now envision this scenario: a continuous stream of data, orchestrated by a reactive Spring Boot producer, seamlessly delivering messages to a Kafka broker, while a diligent consumer transcribes these messages into DynamoDB. Exciting, isn't it? So stick around, because this journey is about to become even more captivating. Whether you're a seasoned developer or a newcomer to the realm of real-time data, this tutorial is crafted to keep you engaged from beginning to end. I'll guide you through each step, demystifying Kafka's concepts, and by the conclusion of this video you'll wield Kafka's powers with confidence.

Before we dive in, do me a quick favor: hit that subscribe button and ring the notification bell to help this channel grow. You don't want to miss the tech content I've got lined up for you. Also be sure to check out my website for additional courses and resources, and let's stay connected on social media; all the links are conveniently placed in the description of this video down below. So grab your tech gear, brew a cup of coffee, and let's embark on this exciting Kafka adventure together. Our code is ready, the data stream is flowing, and we're on the verge of unlocking the secrets of Apache Kafka. Let the journey begin.

First, let's understand what a message broker is. Apache Kafka, Amazon SQS and RabbitMQ are all classified as message brokers. A message broker is an intermediary software component responsible for facilitating communication and data exchange between different applications or systems. Its primary function is to decouple producers, the applications that send data to the broker, from consumers.
Consumers are the applications that receive the data. The message broker acts as a mediator, ensuring that messages are delivered efficiently and reliably from the producer to the consumer. The key characteristics of message brokers are the following. First, decoupling: the broker enables loose coupling between applications by allowing them to communicate without needing to be aware of each other's existence; here the left-hand side sends a message to the broker and the right-hand side consumes it, and neither needs to know who is on the other end. Second, asynchronous communication: message brokers often support async communication, allowing producers and consumers to operate independently of each other's timing and availability. Third, scalability: message brokers can be designed to handle large volumes of messages and scale horizontally to accommodate growing workloads. And finally, reliability: they usually provide mechanisms for ensuring message delivery even in the face of system failures. That is a global overview of a message broker.

Now let's see what Apache Kafka, the topic of today's video, actually is, by having a look at this diagram of the Kafka ecosystem. First of all, Apache Kafka is a distributed, fault-tolerant and highly scalable message broker and stream-processing platform. It was originally developed at LinkedIn and later open sourced as an Apache Software Foundation project. Kafka is designed to handle large volumes of data streams in a real-time and fault-tolerant manner. Let's look at Kafka's key components. First we have producers: applications that publish messages to a Kafka topic. On the other side we have consumers: applications that subscribe to a topic and process the published messages. Then we have brokers: Kafka brokers form the core of the Kafka cluster, storing and managing the streams of records. Within a broker we have topics: a topic is a category or feed name to which records are published; topics in Kafka are used to categorize messages. Topics are in turn divided into partitions, which we will cover later; partitions allow Kafka to parallelize processing and scale horizontally. Finally, there is something called ZooKeeper: Kafka relies on Apache ZooKeeper for distributed coordination and management of the Kafka cluster. I will explain it in more detail in a few moments, but its main role is to manage the brokers.

Kafka comes with many advantages. First, Kafka can scale horizontally by adding more brokers to the cluster, providing high throughput and low-latency data processing. Then durability: messages in Kafka are persisted to disk, providing durability even in the event of node failures. It is fault tolerant: Kafka is designed to keep operating seamlessly in the face of hardware or software failures. It supports real-time processing: Kafka allows for real-time stream processing, making it suitable for applications that require low-latency data delivery. And, as we said before, decoupling: Kafka's topic-based architecture decouples producers from consumers, allowing flexibility and independence in application development.
There is also data retention: Kafka provides configurable data retention policies, allowing organizations to retain messages for a specified period. And finally, ecosystem integration: Kafka has a rich ecosystem with connectors for integrating with various data storage systems, stream-processing frameworks and analytics tools.

Now let's break down the Kafka components, starting with the Kafka cluster. Within the context of Kafka, a cluster is a group of brokers (also called servers) working together for three reasons: speed (low latency), durability and scalability. Several data streams can be processed by separate servers, which decreases the latency of data delivery. Data is replicated across multiple servers or brokers, so if one fails, another server has the data backed up, ensuring durability and availability. Kafka also balances the load across multiple servers to provide scalability. To summarize, a Kafka cluster is a group of one or more Kafka brokers working together to manage the storage and exchange of data in the form of messages. Kafka is designed to operate as a distributed system, and a cluster allows it to scale horizontally, provide fault tolerance and handle large volumes of data across multiple nodes.

Kafka brokers are servers with special jobs to do: managing load balancing, replication and stream decoupling within the Kafka cluster. So how do they get this job done? First of all, in order to connect to a Kafka cluster, the client connects to one or a few bootstrap servers (I will show you that in the code later on); these are the first servers contacted in the cluster. The brokers then balance the load and handle replication, and those two features are key to Kafka's speed, scalability and stability. A Kafka broker is an individual Kafka server that stores data and serves client requests. Brokers within a cluster communicate with each other to ensure data replication and maintain cluster metadata, and each broker in a Kafka cluster is assigned a unique identifier.

The next building block is the producer, one of the simplest building blocks of the Kafka ecosystem. In Apache Kafka, a producer is a client application that publishes, or writes, events to a Kafka cluster. The producer can be a Java application, a PHP or .NET application, or any other type of application, even a command line; it simply writes or sends an event to the Kafka cluster. That is the definition of a producer.

Now let's move on to the other building blocks. We said that the producer is the one that sends the message or writes the event; the consumer, on the other side, is the application or system that subscribes to a topic in order to consume the events. Consumers are applications or systems that subscribe to Kafka topics and process the messages, and consumers can be part of a consumer group, allowing them to parallelize the processing of messages. That is the definition of a consumer.

We have mentioned the keyword topic many times, so let's see what a Kafka topic is. A Kafka topic is a logical channel or feed category to which records (messages) are published by producers and from which records are consumed by consumers. Topics serve as a way to organize and categorize the stream of messages within the Kafka messaging system. In Apache Kafka, a topic is a fundamental abstraction that represents a category or feed name to which records are published by producers and from which records are consumed by consumers.
Topics play a crucial role in organizing and categorizing the flow of data within a Kafka cluster; they provide a way to structure and manage the data streams, allowing for separation of concerns in a distributed and scalable manner.

When we talk about topics, of course, we cannot skip partitions. In Apache Kafka, a partition is the basic unit of parallelism and scalability. It is a way of horizontally dividing a topic into multiple independently managed units. Each partition is a strictly ordered, immutable sequence of records, and it plays a crucial role in the distribution, parallel processing and fault tolerance of data within a Kafka cluster. Partitions are the concept that allows horizontal scaling and parallel processing of data within a topic: a topic can be divided into multiple partitions, and each partition is a linear, ordered sequence of records. Understanding partitions is key to leveraging the scalability and performance benefits of Kafka.

Let me give you some key aspects of partitions in a Kafka topic. First of all, partitions enable parallel processing of data. Each partition can be thought of as an independent stream of messages, so producers can write to and consumers can read from different partitions concurrently, allowing Kafka to handle a higher volume of data by distributing the workload across multiple partitions. Kafka achieves scalability by distributing partitions across multiple brokers and letting each broker handle a subset of the partitions; as the data load increases, you can add more brokers to the cluster and partitions are reassigned to maintain load balance and improve throughput. Messages within a partition are strictly ordered based on their offset, a unique identifier assigned to each message in the partition; this ordering is guaranteed per partition but not across partitions. Once a message is written to a partition it becomes part of an immutable log, which simplifies data processing and ensures consistency within a partition.

Next, let's see how offsets work, because when we talk about partitions we also talk about offsets. An offset is a unique identifier assigned to each message within a partition of a Kafka topic; it represents the position of a message in the partition's log. Offsets are used to track the progress of consumers and enable them to resume consumption from a specific point, even in the event of a failure or restart. Here are some key points to understand offsets better. First, the identifier: each message within a partition is assigned a monotonically increasing offset. Note that the offset is not unique across partitions or topics; the same offset value can appear in many partitions, so it is unique only within a single partition. Offsets also have a sequential order: they are assigned in sequence as messages are produced to a partition, so the offset of a message is determined by its position in the partition's log. Once assigned, the offset of a message is immutable; it does not change over time, even as other messages are produced or consumed in the partition. And finally, consumer progress: consumers in Kafka keep track of their progress by maintaining the offset of the last processed message in each partition.
This allows consumers to resume consumption from the point where they left off, ensuring that no messages are missed.

Now let's picture together how offsets work. First, from the producer's perspective: when a producer sends a message to a Kafka topic, the message is appended to the log of the appropriate partition. The producer receives an acknowledgement once the message is successfully written to the partition, and at that point the message is considered to have an offset. Then, from the consumer's perspective: consumers track the offset of the last processed message in each partition they consume from. As a consumer processes messages, it updates its offset to reflect the position of the last successfully processed message. The offset is committed to a special Kafka topic, __consumer_offsets, which stores the mapping between consumer groups and their committed offsets. In the event of a consumer restart or failure, the consumer reads the committed offset from that special topic to determine the last processed message for each partition, then resumes consumption from the stored offset, so it continues from the point of the last successfully processed message. Finally, offset committing: consumers can commit offsets periodically or after processing a batch of messages, and this commit operation updates the stored offset in the __consumer_offsets topic. Kafka provides different mechanisms for offset committing, such as automatic or manual committing, depending on the consumer's configuration.

We have mentioned consumers many times, and at the beginning we also spoke about consumer groups, so let me explain in detail what a consumer group is. In Apache Kafka, consumer groups are a mechanism designed to enable parallel and scalable processing of messages across multiple instances of a consumer application. Consumer groups allow a set of consumers to work together to consume and process messages from one or more partitions of a topic; this is particularly useful for achieving high-throughput data processing and load balancing in distributed systems. A consumer group is a logical grouping of Kafka consumers that work together to consume and process messages from one or more partitions of a topic. Each partition in a topic can be assigned to at most one consumer within a consumer group, as you can see here; this ensures that each message within a partition is processed by only one consumer of the group at a time. Consumer groups are especially beneficial when you need parallel and scalable processing of large volumes of data, when high-throughput data streams need to be distributed and processed concurrently, and when load balancing across multiple consumers is essential for efficient resource utilization.
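This part is not shown in the video, but if you want to see committed offsets, consumer groups and lag in practice, the Kafka distribution ships a small CLI for inspecting consumer groups. A minimal example, assuming a broker running on localhost:9092 and a group named myGroup:

```sh
# list the consumer groups known to the broker
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

# show, per partition, the committed offset, the log-end offset and the lag for one group
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group myGroup
```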
Now let's get started with Apache Kafka and have a first hands-on. The first thing to do is open the browser and navigate to kafka.apache.org (or just type Apache Kafka into Google) and open the link. We land on the home page, which gives a brief definition of Apache Kafka: Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration and mission-critical applications. You can explore the rest of the website, but for us the most important part is to download and start Kafka. To do that, go to Get Started and then Quickstart. You can also watch the video there; then scroll down, and the first step is to get Kafka. Click the download link, which takes you to a different page where you can download the most recent version; at the time of recording this video, the archive version is 2.13-3.6.0 (Scala 2.13, Kafka 3.6.0).

Assuming you have downloaded the archive, let's go back and follow the rest of the steps. The first thing to do is extract it. I have already downloaded Kafka, so I just double-click to unzip it, and in my case I rename the folder kafka_server so it is easier to navigate to from the terminal or command line.

Next we need to start the Kafka environment, which means starting ZooKeeper first. Open a terminal, navigate to the path where you extracted Apache Kafka, and run the first command from the quickstart. On the terminal, make sure you are in the root folder where you extracted the Kafka server; run ls to check, and you will see the zip file and the extracted folder. Navigate into the Kafka server folder. Just so you know what is inside: there is a bin folder with all the shell scripts, and inside it a windows folder with the equivalent .bat scripts. The scripts directly in bin are for macOS and Linux; Windows users need to use the ones in bin/windows and adjust their paths accordingly, depending on the operating system. Now run the ZooKeeper start command and make sure everything starts without any exceptions or errors, which is the case here. With ZooKeeper up and running, go back to the documentation and check the next step: open another terminal session (a new tab or a new terminal instance), again in the Kafka server folder, and run the command that starts the Kafka broker. Hit enter and make sure everything is up and running; as you can see from the logs, there are no exceptions. If, for example, the port is already in use or another instance is running, you will get an exception; depending on the exception, try to solve it, and if you cannot, post it in the comments down below or on our Discord server and you will absolutely get help.
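For reference, these are roughly the two quickstart commands being run here, assuming the Kafka archive was extracted and you are in its root folder (Windows users would use the .bat equivalents under bin\windows):

```sh
# terminal 1: start ZooKeeper
bin/zookeeper-server-start.sh config/zookeeper.properties

# terminal 2: start the Kafka broker (listens on localhost:9092 by default)
bin/kafka-server-start.sh config/server.properties
```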
As you can see here, the Kafka server has started on port 9092, which is the default port for the Apache Kafka broker.

Now let's explore Kafka a little more and go back to the documentation. The next step is to create a topic to store events. All we need to do is copy the command from the quickstart, open a new tab in the terminal and run it: it calls the kafka-topics script with the --create argument, the topic name (quickstart-events) and the bootstrap server, which is our localhost:9092. Pay attention to these arguments, because we will need them later on when we start our Spring application. Hit enter, and it reports: created topic quickstart-events. Now let's go back and see how to publish some events and consume them later. As a next step we can also describe the topics we have, so let's copy that command too. For example, I will create another topic called test-topic and then describe the topics in our broker: running the describe command with quickstart-events gives us the details of that topic, and running it again with test-topic gives us the information about the second topic we created.

After that, let's try to write some events, that is, push some messages to our broker through the quickstart-events topic. Back in the documentation: to write events into a topic, Kafka already provides a script that we can use. Let's copy the command and explore it together in the command line. It uses the kafka-console-producer.sh script, the topic we want to publish to (quickstart-events), and our bootstrap server, the one we just started. I hit enter, we get a prompt, and now we can publish any event: for example, "hello this is my first message", then hit enter; that message is now queued on the Kafka topic. Next, let's see how we can consume from a topic. To read the events, I copy the consumer command and open a new tab in my terminal, because I don't want to exit the producer; I will publish more messages in a few seconds. In this new tab we use another script, kafka-console-consumer.sh; we need to specify the topic (quickstart-events, the one we just published to), --from-beginning because we want to start consuming from the beginning, and of course the bootstrap server. If I hit enter, you see "hello this is my first message", and this consumer keeps listening.
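Again for reference, these are approximately the quickstart commands used in this part, assuming the broker from the previous step is running on localhost:9092:

```sh
# create a topic and then describe it
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092

# publish events: every line you type is sent as one message
bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092

# in another terminal, consume the topic from the beginning
bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
```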
So if I type a new message in the producer, for example "this is a new message", hit enter and go back to the consumer tab, you will see it displayed there. You can place the two terminals side by side: once you push an event on one side, for example "hello again", it pops up on the other side. This is how we publish and how we consume an event. All of this is in the quickstart, so you can follow along there and look at the other options available in the documentation.

Now let's move on and see how to use Kafka within a Spring Boot application. And don't miss the rest of the video, because we will be implementing a really nice application using Spring Boot reactive: we will consume a reactive REST API, publish everything to our Apache Kafka server and consume the messages from there.

Let's go ahead and create a new Spring Boot project. To do that, go to the Spring Initializr, or use your IDE (IntelliJ or Eclipse) to create a new Spring Boot project. I choose a Maven project, Java as the language, and the latest version, 3.1.5. For the group ID I use com.alibou, and for the artifact kafka-demo; the name is kafka-demo, the description "Demo project for Spring Boot and Kafka", and I call the base package kafka instead of kafka-demo. Now we add a few dependencies: first Lombok, then Spring Web in order to expose our REST API and be able to send and consume messages, and finally search for Kafka and choose the first result, Spring for Apache Kafka. Click Generate and open the project in your IDE.

I now have the project open and ready. I opted for the mono-repo approach, so you see the kafka-demo project here, and we will add other projects when we move to the final application demo that we will implement together. For the kafka-demo project, let's first check the resources folder. Here we have the properties file, and as you know I prefer the YAML representation, so I switch it to application.yml. Now let's add a few configuration entries. First we need to configure the Kafka connection.
Under spring.kafka we start with a consumer section, because we want to configure our consumer first. We need the bootstrap-servers property; as you can see, we can provide multiple servers, not only one, and as you remember from the command line ours is localhost on port 9092. Next, as we explained on the diagram, a consumer is part of a consumer group, so we need to say which consumer group we want to use. We could create multiple consumer groups, but for now let's just set a group-id, for example myGroup, to get started. We also spoke about offsets and partitions, so we need another property to tell Spring what to do when, for example, we lose the offset or we want to reset it: the property is auto-offset-reset. There are several options: earliest automatically resets the offset to the earliest offset, latest automatically resets it to the latest offset, exception throws an exception to the consumer, and none means no reset at all. Let's choose earliest here; this governs how our offset gets reset. Now, in order to consume a message: the message is serialized by the producer, and when it arrives at the consumer it must be deserialized, so we have serialization on the producer side and deserialization on the consumer side, and this is what we provide as properties next. Since a Kafka message is a key/value pair, we need a key-deserializer; the class comes from the org.apache.kafka.common.serialization package.
Since in this first example we will send and consume plain strings, we will use the StringDeserializer class. Let's duplicate this line: we have the key-deserializer, and now we provide the value-deserializer in the same way, using the same StringDeserializer, because in this first example we are only sending strings (later we will see another example where we send and receive JSON objects). That is the configuration for our consumer; now let's configure the producer. For the producer we also need to specify the bootstrap servers. Don't worry: in this demo project the same application will be the producer and the consumer at the same time; later on we will build a full application where one service is the producer and another service is the consumer. For now it is just a demo to get you started with Apache Kafka. So imagine that we are configuring the producer project: the consumer part would go only in the consumer project, and this producer part only in the producer project. Let's finish the producer configuration: in the same way, we provide the bootstrap-servers, the same value as for the consumer since we are talking to the same broker, and then we need two more properties. I copy the two deserializer lines and paste them, but careful, for the producer it is the other way around: it is not a deserializer but a serializer. As I mentioned before, when the producer sends an event or pushes a message it needs to be serialized, and the consumer then deserializes it. So here we need key-serializer and value-serializer, using the StringSerializer class. Be careful about this point: if the producer sends a string, the consumer also needs a string deserializer; you cannot serialize one type of data and then try to deserialize it with a different deserializer.

Now that we have the configuration for our consumer and our producer, let's try to start the application (enabling annotation processing for Lombok) and make sure it is up and running, which is the case here. One thing to double-check: the producer properties must really be key-serializer and value-serializer with a StringSerializer, not a StringDeserializer. I stop the application and move on to the next part.
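Putting the properties described so far together, the application.yml looks roughly like this (a sketch; the group id is whatever name you chose, and the serializer/deserializer classes are the standard ones from the Kafka clients library):

```yaml
spring:
  kafka:
    consumer:
      bootstrap-servers: localhost:9092
      group-id: myGroup
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      bootstrap-servers: localhost:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
```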
After providing the Kafka properties, the first thing we need to do is create our topic. Right-click, create a new Java class in a config package and call it KafkaTopicConfig. To make this a configuration class we mark it with @Configuration, as usual in Spring Boot, and then we provide a bean of type NewTopic in order to create a topic. So let's use the @Bean annotation and create our bean of type NewTopic; this means we want to create a new topic in our Apache Kafka message broker. Let's call the bean method alibouTopic, or myTopic, or whatever you want. We return an object built with the TopicBuilder class: we give it the name, which will be the name of the topic, let's say alibou for example. Within this builder we can also provide other settings; among them, since a topic is, as explained before, composed of several partitions, we could ask for five partitions, configure replicas, and so on. That would be our own configuration; we can also leave everything to Kafka and get the default configuration, in which case we only provide the name and call the build() method. This way we have created a new topic called alibou (you could also call it alibou-topic or anything you want).

The next step is to create a Kafka producer: the class that will send, or queue, a message to our broker. I right-click, create a new package called producer and inside it a class called KafkaProducer. First we mark this class as a Spring component; we can use the @Service annotation, the @Component annotation, or any custom annotation you want to create. I also add @RequiredArgsConstructor to get a constructor with the required parameters. The first thing we need is a private final field of type KafkaTemplate. KafkaTemplate is a generic type; if you download the sources you can see it takes a key type and a value type, and since the configuration we created for our producer uses String for both the key serializer and the value serializer, we declare KafkaTemplate<String, String> and call the field kafkaTemplate (I will show you later how this changes when we use JSON). We don't need to write the constructor manually since it is generated by the annotation. Now I create a public void method, let's call it sendMessage (or publishMessage, anything you want), taking a String parameter called message, and all it does is call kafkaTemplate.send(...). The send method has several overloads: we can send a message to a topic, send a producer record, and so on. For our case we pass the topic we want to send to and the message we want to publish; the topic is alibou, as defined in the topic configuration, so be careful not to make typos, otherwise you might get exceptions. Just before the send call I also add the @Slf4j annotation to get a logger, and I log with log.info and String.format something like "Sending message to alibou topic: %s" with the message; you can statically import format from the String class if you like. Now we have our producer, which is able to send or queue a message to our Apache Kafka server.
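A minimal sketch of those two classes as described (the topic name alibou and the package layout follow the video; Lombok provides the constructor and logger):

```java
// config/KafkaTopicConfig.java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig {

    @Bean
    public NewTopic alibouTopic() {
        // only the name is provided, so partitions and replicas use Kafka's defaults
        return TopicBuilder.name("alibou").build();
    }
}

// producer/KafkaProducer.java
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
@Slf4j
public class KafkaProducer {

    // key and value types match the String (de)serializers configured in application.yml
    private final KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        log.info(String.format("Sending message to alibou topic: %s", message));
        kafkaTemplate.send("alibou", message);
    }
}
```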
Now let's create a REST controller in order to call this Kafka producer. I create a new class in a package called rest (you can call it controller; name your packages and classes the way you want) and call it MessageController. It is annotated with @RestController, I add @RequiredArgsConstructor, and I add a @RequestMapping of /api/v1/messages for the controller. Inside, I need a private final KafkaProducer field (let's call it kafkaProducer or just producer), and then I create a @PostMapping, since we want to post something: a public ResponseEntity<String> method, let's call it sendMessage, taking a @RequestBody which is just a String called message. All it does is call kafkaProducer.sendMessage(message) and return ResponseEntity.ok() with, for example, "Message queued successfully".

Now we have our producer and our REST controller. Before we move on to testing, let's start the application and check that everything is fine. We see a bunch of extra logs: these come from the Kafka configuration we provided and from creating the new topic. We can see the bootstrap servers, the configuration we provided, and the default configuration from Kafka. If we scroll to the bottom, we see that the application has started.
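The controller described here, as a rough sketch (path and response message as in the video):

```java
import lombok.RequiredArgsConstructor;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/api/v1/messages")
@RequiredArgsConstructor
public class MessageController {

    private final KafkaProducer kafkaProducer;

    @PostMapping
    public ResponseEntity<String> sendMessage(@RequestBody String message) {
        // delegate to the producer, which queues the message on the alibou topic
        kafkaProducer.sendMessage(message);
        return ResponseEntity.ok("Message queued successfully");
    }
}
```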
Now let's test this small API. First, open the terminal and navigate to the Kafka server folder, then go back to the documentation: to read events we need the console consumer command again. I copy it, paste it, change the topic name to alibou, the one we just created within our application, and hit enter. So far no messages have been consumed or queued. Now let's open Postman and send the first message. I have already prepared the request: the application is running on port 8080, the endpoint is /api/v1/messages, and we send a POST request with a raw body whose content is a plain string. Let's say "this is my first message to Apache Kafka", click Send, and we get a 200 response with "Message queued successfully". If I go back to the terminal, we see that our console consumer has already consumed this message. If we send another message, for example "here is a new message", and go back to the terminal, we see it there as well. So we are able to send a message to our topic from our producer. Now let's implement a real consumer for our topic.

First I stop the console consumer and I also stop the application. Then I create a new class in a new package called consumer and name it, for example, KafkaConsumer. Of course we need to mark it as a Spring component, so let's give it the @Service annotation. Then I create a public void method, let's call it consumeMsg, and since we are publishing a string message, we consume a String parameter. Before that, let's add the @Slf4j annotation to get a logger and log with log.info and String.format something like "Consuming the message from alibou topic: %s" with the message (we can statically import format again). Now all I need to do is add an annotation to tell Spring that this is a Kafka listener; the annotation is, as the name suggests, @KafkaListener. Within this annotation we need to provide at least one piece of information: the topics we want to consume, in our case the topic we named alibou when we created the topic configuration. And since in the configuration file we set a group-id called myGroup, we also pass that to the @KafkaListener annotation via its groupId attribute, so this listener is part of that group. Now, making sure the console consumer provided by Apache Kafka is stopped, let's run the application and watch this consumer log the consumed messages. The application starts and we already see logging from our Kafka consumer: "consuming the message from alibou topic", first for "this is my first message" and then for "here is a new message". Now let's send another message, "this is a message to the consumer", click Send, and back in the logs we see the new message; if we send another one, it will also be logged here.
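And the listener class, again as a minimal sketch; the topic and group id must match the ones configured earlier:

```java
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class KafkaConsumer {

    @KafkaListener(topics = "alibou", groupId = "myGroup")
    public void consumeMsg(String msg) {
        log.info(String.format("Consuming the message from alibou topic: %s", msg));
    }
}
```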
So this is how we create a consumer for our application. Now, what if we want to send data in JSON format? First of all, Apache Kafka stores and transports data as bytes. There are a number of built-in serializers and deserializers, but none for JSON; the reason Kafka does not provide a JSON serializer is to avoid imposing a specific serialization format on users. Kafka is designed to be agnostic to the data formats used by producers and consumers, allowing flexibility in accommodating various data structures and serialization formats. Spring Kafka, however, provides a JSON serializer and a JSON deserializer that we can use to convert Java objects to and from JSON. So we will send a Java object as JSON from the producer, and it will be consumed by the consumer as JSON as well; let's see how to configure the producer and the consumer for that.

The first thing to do is update the configuration, so let's open application.yml. Where we have the deserializer for the consumer and the serializer for the producer, we introduce a small change to tell the application which serializer and deserializer we want. I duplicate the line and comment out the old one; instead of the org.apache.kafka class we use another implementation, org.springframework.kafka.support.serializer.JsonDeserializer, which is the class for our deserializer. We do the same for the producer: duplicate the line, comment out the old one, and change the class to JsonSerializer, making sure that on the producer side the property remains value-serializer (a serializer, not a deserializer).

Next, we create the object that we will send from the producer and consume from the consumer. I create a new class in a package called payload (because what we send is a payload) and name it Student: it has a private int id, a private String firstname and a private String lastname. We use the Lombok annotations to generate getters and setters, plus the all-args and no-args constructors. The next question is whether we need any extra configuration to send and receive JSON; the answer is coming in just a few seconds.
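The change in application.yml looks roughly like this (only the value serializer and deserializer switch to the Spring Kafka JSON classes; the keys stay strings):

```yaml
spring:
  kafka:
    consumer:
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
    producer:
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
```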
After changing the configuration, the next step is to change our producer. We keep the string producer example as it is, and I create a new class called KafkaJsonProducer (you will get the full code for everything). This class is of course a @Service, and I add @RequiredArgsConstructor to inject my dependencies. Again we need a KafkaTemplate, so a private final KafkaTemplate, but this time with String for the key and our object, Student, as the value: KafkaTemplate<String, Student>, again called kafkaTemplate. Then I create a public void sendMessage method; to send a message we need our Student object, so the method takes a Student parameter (call it data, or simply student).

This time we don't just call kafkaTemplate.send as we did with the string producer. Instead we build an object of type Message: Message<Student> message = MessageBuilder.withPayload(student)... Be careful here: the Message type should come from org.springframework.messaging, not from the Kafka package (I initially imported it from the wrong place, so remove that import and add the correct one; it is a generic type). Then we set a header: our header is the Kafka topic, via the KafkaHeaders.TOPIC constant, to specify which topic we want to send this message to, and our topic, as mentioned before, is called alibou. Then we call build(), which returns the message. At this point we have only built the message; we still need to send it, so we call kafkaTemplate.send(message). As you can see, there is a send overload that takes a Message object and returns a CompletableFuture.

Now let's adjust our controller. We now have two services, the KafkaProducer and the new KafkaJsonProducer, so if we only relied on the type we could run into an ambiguity about which one to inject. I simply duplicate the existing method, call it sendJsonMessage, take a Student object as the @RequestBody, inject the KafkaJsonProducer, and call its sendMessage. We can return "Message queued successfully as JSON" just to differentiate between the two methods, and on the @PostMapping I add /json so we know we are calling the JSON method. This is just for the sake of the demo; injecting two services that do almost the same thing is not the best design, but it is fine for the purpose of learning.
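A sketch of the payload class and the JSON producer as described (two separate files; the Message and MessageBuilder types come from org.springframework.messaging, and the field names follow the video):

```java
// payload/Student.java
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@Getter
@Setter
@AllArgsConstructor
@NoArgsConstructor
public class Student {
    private int id;
    private String firstname;
    private String lastname;
}

// producer/KafkaJsonProducer.java
import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
public class KafkaJsonProducer {

    private final KafkaTemplate<String, Student> kafkaTemplate;

    public void sendMessage(Student student) {
        // wrap the payload in a Message and route it via the topic header
        Message<Student> message = MessageBuilder
                .withPayload(student)
                .setHeader(KafkaHeaders.TOPIC, "alibou")
                .build();
        kafkaTemplate.send(message);
    }
}
```

In the controller, the second POST endpoint (for example /api/v1/messages/json) simply takes a Student body, injects this KafkaJsonProducer and delegates to its sendMessage method.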
We still have our consumer listening to this topic, so now we can start the application, test it, and see how such a message is handled: will it be displayed as a string, or will it cause an issue since we now have a JSON deserializer? Let's find out, and otherwise we can adjust our Kafka listener accordingly. So let's start the application and test again. First, we see some exceptions while starting the application, so let's investigate. It starts here: an error handler cannot process the serialization exception; checking the "caused by", it mentions the serialized key/value for partition alibou-0, and scrolling further down we find a different cause: the class com.alibou.kafka.payload.Student is not in the trusted packages. The message also says that if you believe this class is safe to deserialize you should provide its name, and if deserialization is only done from a trusted source you can enable trusting everything with *. So we need to add one property to our application to tell Kafka that we trust this package and want to deserialize from it. Specifying trusted packages is a security measure to prevent certain types of attacks related to deserialization: deserialization is the process of converting data in a serialized format, such as JSON, back into its original object form, and that process can be exploited if not properly secured; that is why we got this error. To fix it we add one property under the consumer, inside a properties block: spring.json.trusted.packages, where we provide the package mentioned in the error, either as a single value, as a list of packages, or as * to trust everything.
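In application.yml this looks roughly like the following (the package name matches the one from the error message in the video; use "*" only if you really trust every source):

```yaml
spring:
  kafka:
    consumer:
      properties:
        spring.json.trusted.packages: com.alibou.kafka.payload
```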
Now let's start the application again and see whether this property works. This time something is different: scrolling down the logs, we see our JSON deserializer in use, but we get a different exception. Going down to the "caused by", it says: cannot convert from Student to String. This means our consumer is not able to convert the deserialized JSON object into a String parameter, so we need to adjust our Kafka listener. Within the listener class we can either adjust the existing method or create a new one; I create another method and comment out the annotation on the old one, since I don't want to use both. I call the new one consumeJsonMessage, and all we need to do is change the parameter type to Student (renaming the parameter to student). Spring will then automatically deserialize the received object into a Student, and we print it, calling toString() on it. That is how we adjust our Kafka consumer. I could also have created a separate KafkaJsonConsumer class listening to the same topic, or even gone to the topic configuration and created a new topic, for example alibou-json, and adjusted the JSON producer to send to it; I just wanted to keep it simple and show you that you have multiple options.

Coming back to this small change: if I restart the application, go back to Postman, send a message again and open the console, we see "consuming the message from alibou topic" followed by the object. At first it prints only the default object reference, because I have not implemented a toString method in my Student class, so let's quickly add the Lombok @ToString annotation. Now if I restart the application and send another message, and then another one, say John Doe, we see the objects printed nicely in the logs. So now our consumer works.
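The adjusted listener, sketched as its own class (it replaces the String listener; Student carries Lombok's @ToString so the log line is readable):

```java
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class KafkaConsumer {

    @KafkaListener(topics = "alibou", groupId = "myGroup")
    public void consumeJsonMessage(Student student) {
        // Spring deserializes the JSON payload into a Student before calling this method
        log.info(String.format("Consuming the message from alibou topic: %s", student));
    }
}
```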
Now the best part begins: we move on to the implementation of our real-world example, a really nice application using Spring reactive. This is a global overview of the project we will build together. Starting from the left-hand side: first we will create a Spring Boot project, which will be our producer, and this producer will have an API we can invoke to start consuming a stream of data coming from a reactive REST API. This reactive REST API is the Wikimedia stream; I will give you an overview of Wikimedia in just a few moments, and you can see the link here (I will also put it in the description of this video). Our reactive consumer will consume this data and call the producer, which will publish all these events to our Kafka cluster, the cluster with its brokers, topics, partitions and everything else I explained before. On the other side, the consumer side, we will have another Spring Boot project, which will be our consumer: it will subscribe and listen to all the messages arriving on a specific topic. I will not implement the final step myself, I will just log the data, but feel free to do whatever you want with it: for example, persist it to MongoDB, PostgreSQL, an S3 bucket or any other storage system. I would love to see what you do and how you implement it, so leave a comment on the video, or publish your code on our Discord server and I will be happy to review it and give you feedback. You can also picture these two services as two microservices communicating through a Kafka broker.

Before that, let me give you a quick overview of what Wikimedia is. I think Wikipedia is already familiar to you; Wikimedia is the group of all the services you see here. It is a global movement whose mission is to bring free educational content to the world: Wikipedia, Wiktionary, Wikibooks and so on. The stream we will use represents all the changes made by the community on the Wikimedia projects. To see it, we have this link: if I open it and refresh the page, you see that it is a stream of data; every time I scroll, more data appears. If you want to understand reactive REST APIs better, I invite you to check the Spring Reactive course I published earlier on my channel; I will leave the link in the description of this video. Looking at the data, we can zoom in a bit: we have the topic, the schema and a lot of other information, and we can look at it in detail later on. To give you an idea of the size of the stream coming from this endpoint, there is an open dashboard (I will also leave the link in the description) where you can see an overview: the stream counter shows around 68,000 and it keeps growing, which is a huge stream, and to process it we need a strong, scalable and powerful message broker like Apache Kafka. On this dashboard you can have a look at all the data and the streams.
That same dashboard also lets you explore the data and the individual streams: for example, we can see all the events, the size of each edit, and how many records have been displayed — just since switching to this tab we have received more than 1,000 records. All right, these will be the building blocks of our application, so let's move on to the next part and start building the project. Let's create and configure our two projects. The first one is a Maven project; for the group ID let's use com.alibou.wikimedia, for example, and to differentiate the two projects I will add a producer suffix to the artifact and keep the rest as is. Now let's add our dependencies: first Spring for Apache Kafka, then Spring Reactive Web — we will not use the classic Spring Web annotations, because we need the reactive stack to consume a reactive API — and of course Lombok. That is the first project: click on generate, download it, and import it into your IDE. Now the second project: change the artifact to consumer. For this one, let's remove Spring Reactive Web; we can keep plain Spring Web in case we want to add an API later, although it is not the plan for now. Click on generate, and let's open both projects in IntelliJ and start coding. As we did before, the first thing we do in the producer is rename the properties file to application.yml, and now we can go faster by copying some properties from the earlier project. Let's copy the producer properties, under spring.kafka, and paste them here — since we are working on the producer, that is the part we need. As for what we produce: we will produce a String instead of JSON, because I don't want to waste your time creating an object and deserializing the stream; I will simply send the events as strings. So that is the configuration of our producer. The next step is to create our topic configuration: right-click, create a config package, and inside it a class called, for example, WikimediaTopicConfig. In the same way, we can copy the NewTopic bean from the earlier project — it is simple enough to rewrite, but copying is faster — paste it here, and don't forget the @Configuration annotation. Let's name the topic wikimedia-stream, and rename the bean method accordingly, for example wikimediaStreamTopic. Now our topic is ready, and the next step is consuming the stream API. Within the producer project, still under configuration, we need a bit of setup to be able to consume a reactive API: I will create a configuration class and call it WebClientConfig (a more consistent name than just WebConfig), annotate it with @Configuration, and create a bean of type WebClient.Builder; inside it we simply return WebClient.builder(), and that is all we need.
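As a rough sketch, the two configuration classes described above can look like this — shown in one snippet here, but each class would live in its own file under the config package, and the bean and topic names are my reconstruction from the transcript:

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.web.reactive.function.client.WebClient;

// File: config/WikimediaTopicConfig.java
@Configuration
class WikimediaTopicConfig {

    @Bean
    public NewTopic wikimediaStreamTopic() {
        // The topic name must match what the producer and the consumer use.
        return TopicBuilder.name("wikimedia-stream").build();
    }
}

// File: config/WebClientConfig.java
@Configuration
class WebClientConfig {

    @Bean
    public WebClient.Builder webClientBuilder() {
        // Exposed as a bean so services can build a WebClient with their own base URL.
        return WebClient.builder();
    }
}
```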
Now let's move on; before the stream consumer itself, let's create our producer. Right-click, new class in a producer package, and call it, for example, WikimediaProducer. As before, this producer class is simply the class that sends messages to a topic in our Apache Kafka cluster. We can go back to our kafka-demo project, open the producer package under the Java sources, and take the code from the KafkaProducer class: copy the annotations and the body, go back to WikimediaProducer, close the demo class, and paste them in. We can either change the log message or remove it for now — we don't need it — and we need to rename the topic to wikimedia-stream. To be safe and avoid typos, I recommend closing the demo and copying the topic name directly from the topic config, or, even better, defining a constant for it and using it everywhere. It is not critical, but you need to make sure you don't introduce typos. Now our producer is ready.
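A minimal sketch of what this producer can look like, assuming the wikimedia-stream topic name and a plain String payload as described above:

```java
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
@RequiredArgsConstructor
@Slf4j
public class WikimediaProducer {

    private final KafkaTemplate<String, String> kafkaTemplate;

    public void sendMessage(String message) {
        log.info("Sending message to wikimedia-stream topic");
        // Publish the raw event string to the topic created in WikimediaTopicConfig.
        kafkaTemplate.send("wikimedia-stream", message);
    }
}
```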
Next, let's create the stream consumer service — the one that consumes the stream API and publishes each message to the Kafka topic. I will right-click, create a new class — you can name the package whatever you want — and call it WikimediaStreamConsumer, since it subscribes to the stream coming from Wikimedia. Let's make it full screen. First, we make it a @Service and add @Slf4j so we can log what we receive. Then we inject our dependencies: a private final WebClient field called webClient, and a private final WikimediaProducer called producer, with a constructor taking both. We need one small change here: instead of injecting the WebClient directly, we inject a WebClient.Builder — that is the bean we created in our configuration. In the constructor we build the web client from the builder by setting the base URL, which for us is https://stream.wikimedia.org/v2, and then calling the build method. Let me break this down over several lines so you can see everything at a glance — this is how we configure our WebClient object. The next thing we need is a method: public void consumeStreamAndPublish(). It consumes the stream coming from this API and publishes it to our producer, in other words to our Kafka broker. Inside it, we take our webClient, call the get method, pass the URI — the rest of the URL we want to consume, which is /stream/recentchange — then retrieve, and then bodyToFlux with the String type, because we decided to consume and produce plain strings. Now, how do we consume what comes out of this Flux? It is the publish-subscribe pattern, so we subscribe to it. For now, instead of System.out, let's just log each element with log.info, since we injected a logger; later we will change this to send the data to the producer. What comes next is a REST API that allows us to trigger this consumption and start sending the data — you could also do it by injecting a CommandLineRunner bean and triggering the consumer there, but a REST endpoint is nicer. Right-click, create a new class in a rest package, and call it WikimediaController. I want it to be a @RestController with @RequiredArgsConstructor and a @RequestMapping of /api/v1/wikimedia, just to keep it simple. We inject our private final WikimediaStreamConsumer, called streamConsumer to stay consistent, and then all we need is a @GetMapping with a public void startPublishing() method — the main goal is to publish this stream to Kafka, so it requires nothing; you could also return a ResponseEntity of type Void, or any type, with a confirmation message. The most important part is that it calls streamConsumer.consumeStreamAndPublish().
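Putting those pieces together, here is a hedged sketch of the stream consumer and the controller described above — class, package, and endpoint names follow the transcript, but this is a reconstruction rather than the exact code from the video:

```java
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.reactive.function.client.WebClient;

// File: stream/WikimediaStreamConsumer.java
@Service
@Slf4j
class WikimediaStreamConsumer {

    private final WebClient webClient;
    private final WikimediaProducer producer;

    public WikimediaStreamConsumer(WebClient.Builder webClientBuilder, WikimediaProducer producer) {
        // Build the WebClient from the builder bean, pointing at the Wikimedia stream API.
        this.webClient = webClientBuilder
                .baseUrl("https://stream.wikimedia.org/v2")
                .build();
        this.producer = producer;
    }

    public void consumeStreamAndPublish() {
        webClient.get()
                .uri("/stream/recentchange")
                .retrieve()
                .bodyToFlux(String.class)
                // For now just log every event; later this becomes producer::sendMessage.
                .subscribe(log::info);
    }
}

// File: rest/WikimediaController.java
@RestController
@RequiredArgsConstructor
@RequestMapping("/api/v1/wikimedia")
class WikimediaController {

    private final WikimediaStreamConsumer streamConsumer;

    @GetMapping
    public void startPublishing() {
        // Triggers the subscription to the Wikimedia stream and starts publishing to Kafka.
        streamConsumer.consumeStreamAndPublish();
    }
}
```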
So this is our API. What we want to do next is start the producer application and check that we are consuming the stream from the Wikimedia API and that it gets logged in our output console. To run it, either click on the producer application and run it from there, or use the Services tab in IntelliJ, which lists all the Spring Boot applications available in your workspace. In our case we start the producer application, and I will start it in debug mode, so that if anything goes wrong I can go back and debug. You can see that the Kafka configuration is picked up — the Kafka output appears in the logs. I will clear the console, open Postman, and send a GET request to this API. The first attempt returns a 404 because of a typo in the URL; after fixing it and clicking send again, it is a 200. Back in the console, I expect to see a stream of data — and that is exactly what we see: the Wikimedia stream consumer is consuming the API, and as I scroll down the data keeps printing. We are subscribed to the stream and reading it. Now let's enable the producer so that these events are published to our Kafka broker. Back in the consumer method, instead of logging, we call producer::sendMessage — we can use a method reference here because sendMessage also takes a String parameter. You can re-enable the logging as well if you like. Let's give it a try: I open the terminal and start a console consumer on the topic we created, wikimedia-stream, and hit enter so the stream is ready; then I go back to Postman and send the request, expecting to see data popping up. My bad — I restarted the wrong application, so let me stop it, go back to Services, and restart the producer. The producer is up and running, and when I open the terminal and click send again, the data shows up, and as I scroll, the consumer keeps consuming. The good thing is that we are successfully publishing data — so far we have published 401 messages.
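For reference, the only change in the stream consumer at this step is the subscriber — a small sketch of the revised method, building on the earlier WikimediaStreamConsumer sketch:

```java
public void consumeStreamAndPublish() {
    webClient.get()
            .uri("/stream/recentchange")
            .retrieve()
            .bodyToFlux(String.class)
            // sendMessage(String) matches the Flux<String> element type,
            // so the method reference replaces the logging subscriber.
            .subscribe(producer::sendMessage);
}
```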
So the producer is up and ready; let's move on, create and implement our consumer, and get the application fully up and running. Now let's do the same thing for the consumer project. The first thing I will do is rename the configuration file — change the extension to yaml instead of properties (you can keep properties if you are more comfortable with that) — and again copy the consumer configuration from the earlier project and paste it here. One thing to adjust: I want to consume strings, so I use the String deserializer instead of the JSON deserializer. We can also remove the trusted-packages property, since we are only consuming string messages — I will just leave it commented out so you can reuse it later. This is our group ID — change the name if you want to use a new one — and the rest of the configuration, including the bootstrap server, stays the same. The next step is to copy the topic configuration from the producer, the WikimediaTopicConfig, and bring it here as well. You might say this is duplication; the answer is no: duplication is when you duplicate code within the same codebase, whereas these are two services that are supposed to be separate. I will also create a consumer package with a WikimediaConsumer class — I will come back to it in a few seconds — and move the WikimediaTopicConfig into a config package. There is a neat way to do this in IntelliJ: in the package declaration, append .config at the end, use the auto-completion, and IntelliJ will offer to move the class to this new package. With that done, let's close everything and focus on the consumer, which is the easiest part: all we need to do is create our listener, and we can copy the code from the kafka-demo project since we already wrote it together and I don't want to waste time on something we have seen before. Make the class a @Service, add @Slf4j, paste the code, and re-enable the @KafkaListener annotation. We need to use exactly the same topic name — wikimedia-stream — so let's close the demo and copy it from the topic config; and if you changed the group ID in the configuration, make sure you change it here as well. The log message becomes "Consuming the message from wikimedia-stream topic", followed by the message itself. Normally that is all we need for the consumer, because we only want to consume. I will also leave a comment here: please feel free to do anything you want with the data. As I mentioned before, I would love for you to implement something on top of it, share the GitHub links with me, and I will be happy to do a code review for you.
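A minimal sketch of the consumer side, assuming the wikimedia-stream topic and a myGroup group ID — adjust the group ID so it matches the one in your application.yml:

```java
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
@Slf4j
public class WikimediaConsumer {

    @KafkaListener(topics = "wikimedia-stream", groupId = "myGroup")
    public void consumeMessage(String message) {
        log.info("Consuming the message from wikimedia-stream topic :: {}", message);
        // Feel free to persist the event here instead: MongoDB, PostgreSQL, S3, etc.
    }
}
```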
Now we need to start the two applications and verify that once we start producing and pushing data to the broker, we are also consuming it on the other side. The first thing I will do is start my consumer — the consumer application — in debug mode and check that everything is fine. You can see we are already consuming some data; that is because there are already some queued messages on the topic. Now if I start my producer application and invoke the endpoint... okay, we cannot start the producer, because both applications are running on the same port. Let's go to the producer's application.yml and add server.port: 8081 at the end, for example. I will restart the producer, and now it should come up normally without any issues. The producer is running; I will clear the console, open Postman again, change the port in the URL, and resend the request. We can see that we start producing data, and if I open the consumer, we see it consuming the messages coming from our producer. And if I stop the producer from sending data, the consumer no longer receives any messages and stops printing. That was it for today's video — I hope you enjoyed it, liked it, and learned from it. If you are satisfied with the video, I would like to invite you to do a few things: first, if you are not subscribed, go ahead and hit the subscribe button and enable the bell so you don't miss any of my upcoming videos — I am preparing really nice and exciting content for you. Then, if you don't know my website yet, check out aliboucoding.com, have a look at the courses there, and enjoy learning. Finally, if you are not connected with me on social media, go ahead and do it — you will find all the links in the description below this video. Thank you so much for watching, and see you next time.
Info
Channel: Bouali Ali
Views: 28,332
Keywords: kafka, apache kafka, spring boot, spring reactive, webflux, kafka producer, kafka topic
Id: KQDTtvZMS9c
Length: 97min 43sec (5863 seconds)
Published: Mon Nov 27 2023