Java Concurrency Interview Question: Multi-threaded Message Queue like Kafka, SQS, RabbitMQ

Captions
Hello everyone, this is Adit. Welcome back to my channel. In this video we are going to talk about a very popular question which is asked in most machine coding rounds and also in face-to-face interviews. It is about designing a system like a queue: you have to design a pub-sub model and write the code for it. This problem mostly revolves around threading: how do you design the code so that you handle the threading model correctly and can run multiple subscribers at a time? Whenever a message is published, you are not running just one subscriber, you are running multiple subscribers. That kind of threading is what is really required here, and that is what this video is about.
We are going to use Java as the language; this question is mostly asked in terms of Java, because that is how most interviewers ask it. It mostly revolves around thread synchronization and inter-thread communication, so we are going to use these two techniques to get to the solution. Let's dive into the problem and see how to approach it and what a correct solution looks like.
Before going into the actual thing, I would request you to watch the complete video, otherwise you will not get the complete concept; there are many things I will talk about, so bear with me. So let's start with the problem statement.
As I told you, we are going to design a message queue. At a very high level, a message queue is a system where publishers publish messages and subscribers subscribe to the queue; whenever a publisher publishes a message, the subscribers get that message from the queue. The thing here is that all the subscribers should be able to run in parallel; I will go into that later on.
Now, the operations that we should support. First, we should be able to create multiple topics. Whenever a message is published into the queue, it is published on a topic, so you should be able to create a topic in the queue first, and then you can publish messages on it. That is the first requirement.
The second requirement is that publishers should be able to publish a message to a particular topic. In the beginning I told you that publishers publish messages to the queue, but that is not quite correct: a queue has multiple topics, and a publisher publishes messages to a particular topic.
Since publishers publish to a topic, subscribers also subscribe to a topic. So the third requirement is that subscribers should be able to subscribe to a topic, and the fourth is that whenever a message is published to a topic, all the subscribers who are subscribed to that topic should receive the message.
The fifth requirement is a little bit special, though I should not really say special.
It is there in most of the queues that we have out there, but it is somewhat different, and you should take a minute to understand it. In this requirement you have to allow resetting the offset of a particular subscriber.
What is happening is this. Let's say publishers are publishing messages to a particular topic, say topic number one: message one, message two, message three, message four, message five. So five messages are published on the topic. As and when the messages are published, the subscribers get them. Message one gets published to the topic, so both subscriber one and subscriber two get it; then message two gets published, and so on. As each message gets published, all the subscribers get it. So when message five is published, both subscribers are at message number five; they have just read message number five.
But in this requirement we have to allow a subscriber to reset its offset. Let's say the user wants a subscriber to reprocess all the messages. This functionality is mostly useful when you want to support replaying of messages for a particular subscriber. In that case you reset the offset for that particular subscriber, say back to two. For that subscriber, all the messages from two till the end will be sent again, and when new messages come, they will also be sent.
For example, messages one to five are published, and now you reset the offset back to two, so you have to publish from two onwards.
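To make the replay concrete, here is a minimal single-threaded sketch. It is not the code from the video: ReplaySketch and its methods are hypothetical names, messages are plain strings, and the offset is a zero-based index, so "message two" is index 1.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-threaded model of one topic and one subscriber.
final class ReplaySketch {
    private final List<String> log = new ArrayList<>();       // every published message is kept
    private final List<String> delivered = new ArrayList<>(); // what the subscriber received, in order
    private int offset = 0;                                   // index of the next message to deliver

    void publish(String message) { log.add(message); }

    void resetOffset(int newOffset) { offset = newOffset; }

    // Delivers the message at the current offset, if there is one.
    boolean deliverNext() {
        if (offset >= log.size()) {
            return false;
        }
        delivered.add(log.get(offset));
        offset++;
        return true;
    }

    List<String> getDelivered() { return new ArrayList<>(delivered); }

    static List<String> demo() {
        ReplaySketch topic = new ReplaySketch();
        for (int i = 1; i <= 5; i++) {
            topic.publish("m" + i);
        }
        while (topic.deliverNext()) { }   // subscriber reads m1..m5
        topic.resetOffset(1);             // zero-based index 1 is message two
        topic.deliverNext();              // m2 again
        topic.deliverNext();              // m3 again
        topic.publish("m6");              // arrives while the replay is in progress
        while (topic.deliverNext()) { }   // m4, m5, and only then m6
        return topic.getDelivered();
    }
}
```

Because every message stays in the log, a reset is nothing more than moving the index back; the new message is reached only when the subscriber's index gets there.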
So messages two, three, four and five will be sent to the subscriber again. In the meantime, new messages may get published to the topic. Let's say that while the subscriber was re-consuming message number three, a new message, message number six, gets published. The subscriber will not get message number six until it reaches that particular index. So this is the offset reset: you reset the offset of a particular subscriber to a particular index in the past, and this allows you to replay all the messages from there. Take your time to understand this requirement, because it is a very interesting one; maybe replay this section of the video.
The sixth requirement: as I told you in the beginning, this problem is about how you solve it using multiple threads. You have to make use of multiple threads and make your system performant, such that multiple subscribers are able to run at the same time. If message one gets published, all the subscribers should get it simultaneously. It should not be that subscriber one gets it and processes it, and only then subscriber two gets it and processes it. All the subscribers should get the message in parallel.
That is the problem statement. It is very popular and is asked in many companies, so do pay attention to understanding the solution. Let's go to the solution. The very first thing is that in the model you will have a queue object which supports the operations on the queue.
So we have a Queue class, and it has a private map called topicProcessors. We will come to why this is required later on; first let us see what APIs our Queue exposes.
The first API is createTopic. The first requirement was that we have to support creating a topic, so we have a createTopic API which takes a topic name and returns the topic.
The second API is subscribe: a subscriber can subscribe to a particular topic. It takes a subscriber and a topic as input and returns nothing; it just subscribes that subscriber to the topic.
Then a publisher can publish a message to a topic, so publish takes a topic and a message. We have not taken a publisher as an entity here because we thought it was not required, but you could also create an IPublisher and take that as an input when publishing a message.
Then resetOffset is our final API, which lets a subscriber reset its offset in a particular topic. That was the offset reset requirement. These are the APIs that we have built into the Queue.
The other thing we have here is ISubscriber. ISubscriber is an interface that says which public methods a subscriber should support. A subscriber is a consumer of the queue, a client of the queue, so we have to provide an API for it as well. A subscriber has to support two things. First, it has to give its id. Second, it has to give us an API to consume a message: whenever a new message comes into the queue, the queue has to pass it on to the subscriber so that the subscriber can process it.
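As a rough sketch of this public surface: the method names follow what is said in the video, but the parameter lists, the Message class with a single text field, the QueueOperations interface (the video uses a concrete Queue class) and RecordingSubscriber are assumptions made here for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Assumption: a message is just a piece of text; the video does not list its fields.
final class Message {
    private final String text;
    Message(String text) { this.text = text; }
    String getText() { return text; }
}

// The two things a subscriber must expose: its id and a way to consume a message.
interface ISubscriber {
    String getId();
    void consume(Message message) throws InterruptedException;
}

// Hypothetical view of the four queue operations, written as an interface so the
// signatures stand alone. T stands in for the Topic class.
interface QueueOperations<T> {
    T createTopic(String topicName);
    void subscribe(ISubscriber subscriber, T topic);
    void publish(T topic, Message message);
    void resetOffset(T topic, ISubscriber subscriber, int newOffset);
}

// Illustrative subscriber that only records the text of what it was given.
final class RecordingSubscriber implements ISubscriber {
    private final String id;
    private final List<String> seen = new ArrayList<>();

    RecordingSubscriber(String id) { this.id = id; }

    @Override
    public String getId() { return id; }

    @Override
    public void consume(Message message) { seen.add(message.getText()); }

    List<String> getSeen() { return new ArrayList<>(seen); }
}
```

Keeping the subscriber behind a two-method interface means the queue never needs to know what a client does with a message, only how to hand it over.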
That is what this consume method is all about: it takes a message and then processes or consumes it. So that is the public API of a subscriber. ISubscriber and Queue are our public-facing classes, which interact with the users or clients directly; whoever uses our queue has to deal with these two.
So let's see how it works. Let's say we are in the Queue and createTopic is called. We get a topic name as input, and we have to create a topic. So let's see what a topic is. A topic has a topic name. A topic has an id, to uniquely identify each topic; if two topics have the same name, they will definitely have different ids.
Each topic has a list of messages: whatever messages are published to the topic are stored in the topic. We store all the messages because of the fifth requirement, where we have to allow replaying of messages. A subscriber may be at the last index, but it can reset its offset to some point in the past and it should get all the messages from there again. To support that we have to store all the messages, and that is why we keep a list of messages and not just the last message.
A topic also has a list of subscribers; these are the ones who should get the message whenever a new message gets published to the topic.
These are the properties of the Topic class, and we take them in the constructor.
We have two methods here, addMessage and addSubscriber. We have created these methods separately so that the lists are not changed directly from outside. We only allow adding subscribers and adding messages; removing messages and removing subscribers is not allowed right now. Removal of messages should not be allowed anyway, because you should not let a user remove a particular message from the topic. If there is something like archival, or you need to clean up data, that should be handled separately and not exposed to the user. Subscriber removal could be allowed, depending on the use case, but in this design we have not allowed it either. That is why we have specific methods for adding a message and adding a subscriber to a topic.
As you can see, we have a getter annotation put over here, so every property will have a getter exposed. I have also left a TODO here: with the getter annotation, the list is exposed directly through a getter on the object. So whenever we have a topic object, you can get the list of messages, and that list can be changed, because it is not an immutable list. We should actually write a custom getter which returns an immutable list rather than the mutable one, because we do not want the messages to be changed from outside. I have put that TODO for both lists.
Now let's go back to the Queue. Once we get a topic name, we create a topic, and then we also create a topic handler.
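One way the Topic class could look with that TODO addressed is sketched below. This is an assumption, not the code from the video: messages and subscribers are reduced to plain strings to keep it short, and the getters are written by hand so they can return read-only snapshots.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class Topic {
    private final String topicName;
    private final String topicId;
    // Assumption: messages and subscriber ids are plain strings in this sketch.
    private final List<String> messages = new ArrayList<>();
    private final List<String> subscriberIds = new ArrayList<>();

    Topic(String topicName, String topicId) {
        this.topicName = topicName;
        this.topicId = topicId;
    }

    String getTopicName() { return topicName; }

    String getTopicId() { return topicId; }

    // The only two ways to change the lists: add a message, add a subscriber.
    synchronized void addMessage(String message) { messages.add(message); }

    synchronized void addSubscriber(String subscriberId) { subscriberIds.add(subscriberId); }

    // Custom getters: callers receive a read-only snapshot, never the internal list.
    synchronized List<String> getMessages() {
        return Collections.unmodifiableList(new ArrayList<>(messages));
    }

    synchronized List<String> getSubscriberIds() {
        return Collections.unmodifiableList(new ArrayList<>(subscriberIds));
    }
}
```

Calling add on the returned list throws UnsupportedOperationException, so the only way to change a topic is through addMessage and addSubscriber.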
A topic handler is something which handles a particular topic. I have created it for separation of concerns: everything around a topic is handled by its topic handler. If we have to publish for a particular topic, this class handles it; if we have to reset the offset of a subscriber for a particular topic, that is also handled by this handler. The topic handler just manages its topic; that is all it is doing.
Going back: topic processor is just another word for topic handler; I should rename the map to topicHandlers. We create a new topic handler for this topic and put it into the map so that later on we can fetch it again for each topic. That is all createTopic is doing.
Now let's see what subscribe is doing. subscribe takes a subscriber and a topic as input, meaning this subscriber wants to subscribe to this particular topic. We just add the subscriber to the topic, and that is all we need to do; we also print a helpful message for the user.
When publish happens, that is where the interesting story starts, because that is where you have to fan out the message to all the subscribers. Pay some more attention here, because this is really important. publish takes a topic and a message as input. The first task is to add the message to the topic. The second task, once the message is added, is to fan out the message to all the subscribers.
These are exactly the two things we are doing here: whenever a message is published to a topic, we first add the message to the topic, and then we ask the topic handler to start publishing. I have put that call into a separate thread because I do not want to block the caller while the publish happens. This thread is not a persistent thread; it just publishes and then goes away. All it does is fetch the topic handler for the topic and ask it to start publishing.
So let us see what publish does in the handler. It has to fetch all the subscribers for the topic, and then it has to start sending messages to all of them. That is what we are doing here. Each subscriber is wrapped into a TopicSubscriber. If you look at TopicSubscriber, it has two things: an offset and a subscriber. Each subscriber has an offset in a particular topic, and that is what is stored here. So the combination of a subscriber with its offset is a TopicSubscriber; it is again a POJO, a model kind of class.
Initially, the offset for every subscriber is zero, so we have put zero here, and as we start publishing messages it is incremented from there. We have kept an AtomicInteger here because AtomicInteger handles the multi-threading part.
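A minimal sketch of such a TopicSubscriber follows, assuming the subscriber is represented only by its id (the video wraps the ISubscriber itself). TopicSubscriberDemo is a hypothetical helper added here to show that concurrent increments on an AtomicInteger are not lost.

```java
import java.util.concurrent.atomic.AtomicInteger;

// A subscriber together with its offset in one particular topic.
final class TopicSubscriber {
    private final AtomicInteger offset = new AtomicInteger(0); // every subscriber starts at zero
    private final String subscriberId;                         // assumption: id stands in for ISubscriber

    TopicSubscriber(String subscriberId) { this.subscriberId = subscriberId; }

    AtomicInteger getOffset() { return offset; }

    String getSubscriberId() { return subscriberId; }
}

// Hypothetical demo: two threads bump the same offset at the same time.
final class TopicSubscriberDemo {
    static int incrementFromTwoThreads(int perThread) {
        TopicSubscriber ts = new TopicSubscriber("s1");
        Runnable bump = () -> {
            for (int i = 0; i < perThread; i++) {
                ts.getOffset().incrementAndGet();   // atomic read-modify-write
            }
        };
        Thread a = new Thread(bump);
        Thread b = new Thread(bump);
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ts.getOffset().get();
    }
}
```

With a plain int field and offset++ instead, the two threads could overwrite each other's updates and the final count could come out lower than expected.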
If multiple threads are trying to edit the same object, it prevents unwanted changes. I will show you how when we look at the subscriber worker code; the AtomicInteger is here so that we can handle the multi-threading cases.
So when we start publishing, we go over each TopicSubscriber and start the worker for that subscriber. As I told you in the beginning, each subscriber should be able to run in parallel; we should not block one subscriber on another. That is why we have created a worker: one worker per subscriber per topic, and that worker runs for every message. Whenever a message comes into a topic, all the workers for the subscribers of that topic run, and they run in parallel, so they all process the message in parallel.
startSubscriberWorker takes the TopicSubscriber for whom the worker has to be started and fetches the subscriber id. It keeps the subscriber workers in a local map so that it does not need to create them again and again. If a worker is not found, it creates a new SubscriberWorker, puts it into a thread and starts it. So the first part is lazy creation: we do not create the worker until it is needed. Then, whenever we have to start the worker, we just wake it up if needed. What I am actually doing here is this: there is a worker which keeps running as long as it has messages to process.
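The lazy-creation part can be sketched on its own as below. This is an illustration under assumptions: StubWorker is a hypothetical stand-in that only counts wake-ups instead of running a thread, and the map lookup uses ConcurrentHashMap.computeIfAbsent, which may differ from the check-then-put the video's code performs.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the real worker: it only counts how often it was woken.
final class StubWorker {
    private final AtomicInteger wakeUps = new AtomicInteger();

    void wakeUpIfNeeded() { wakeUps.incrementAndGet(); }

    int getWakeUps() { return wakeUps.get(); }
}

final class TopicHandlerSketch {
    // One worker per subscriber id, created on first use.
    private final Map<String, StubWorker> subscriberWorkers = new ConcurrentHashMap<>();

    StubWorker startSubscriberWorker(String subscriberId) {
        // computeIfAbsent runs the factory only when no worker exists for this id yet.
        StubWorker worker = subscriberWorkers.computeIfAbsent(subscriberId, id -> new StubWorker());
        worker.wakeUpIfNeeded();   // existing or new, the worker is nudged on every publish
        return worker;
    }

    int workerCount() { return subscriberWorkers.size(); }
}
```

In the real handler the factory would also wrap the new worker in a thread and start it; that step is left out here so the sketch stays about the map.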
Let's say the topic has five messages and the subscriber is at zero right now, so the subscriber has to process those five messages: zero, one, two, three, four. As long as there are messages for the subscriber to process, the subscriber worker stays active and keeps working. Once there are no more messages in the topic for the subscriber, it stops and goes to sleep. So if the worker is sleeping right now, we need to wake it up, and that is what the wakeUpIfNeeded method does. When we go into the details of the subscriber worker, you will understand how it does this.
SubscriberWorker takes a topic and a TopicSubscriber; we just initialize these two properties in the class. The interesting part is the TopicSubscriber, which has the current offset, that is, up to what offset the subscriber has read. We get the current offset, and as I told you, if there are no messages to read for this subscriber, we go to sleep. So if the current offset is greater than or equal to the number of messages in the topic, we just wait. wait in Java means the thread stops working and goes to sleep, so the thread keeps waiting as long as there are no messages to be processed.
If that is not the case, we have a message to be processed by the subscriber. In that case we get the message from the topic, ask the subscriber to consume it, and then increment the offset for the subscriber.
Now, this is the thing I was telling you about: compareAndSet. If multiple threads are trying to change the same value, compareAndSet lets us handle that safely. Let's say we first check the offset and it is less than the message count, and then we try to increment it, but at the same time some other thread changes the value. Then we should not change it, and that is what compareAndSet does. compareAndSet says: increment the value, because we have processed the previous message and have to move the offset to the next value, but only if the offset has not changed in the meantime. If the offset has already been changed by some other thread, we should not change it, because that means it is no longer in the state we read. So we first check that the current offset is the same, and only then increment it.
And when would the current offset not be the same? It is just running in a loop, so the current offset should remain the same. But that is not actually the case, because a reset can also happen: we can reset some subscriber to a previous position. Let's say the current offset is five and I reset it to two. If I do not do compareAndSet, the worker will do five plus one and set it to six. Even if I re-read the current value, it will do two plus one, which is three. Both are incorrect. If the value has been reset to an earlier position, it should remain there: if we say we have to re-read from two, it should stay at two. That is why we are keeping this compareAndSet here.
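The five-and-two example can be replayed in a few lines. OffsetAdvanceDemo and its methods are hypothetical names for this sketch, and the reset is simulated on the same thread so the result is deterministic; AtomicInteger.compareAndSet is the real JDK call.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class OffsetAdvanceDemo {
    // Advance only if nobody moved the offset while the message was being consumed.
    static boolean advance(AtomicInteger offset, int offsetWhenRead) {
        return offset.compareAndSet(offsetWhenRead, offsetWhenRead + 1);
    }

    // Returns the offset after one consume step, with or without a reset in between.
    static int simulate(boolean resetDuringConsume) {
        AtomicInteger offset = new AtomicInteger(5);
        int current = offset.get();     // worker reads offset 5 and starts consuming
        if (resetDuringConsume) {
            offset.set(2);              // meanwhile the subscriber is reset to 2
        }
        advance(offset, current);       // succeeds only if the offset is still 5
        return offset.get();
    }
}
```

Without a reset the offset moves from 5 to 6; with a reset it stays at 2, which is neither the stale 6 nor the off-by-one 3 described above.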
It only increments if the current value is still the same. Then we run this loop forever: whenever we get messages it starts processing them, and whenever it does not have any message to process it goes into the wait state.
I was showing you wakeUpIfNeeded. wakeUpIfNeeded just notifies the subscriber worker. These are Java constructs: wait makes the thread sleep, so it does not work anymore, and notify wakes up one thread that is waiting on the object on which it is called. So the subscriber worker of this particular topic subscriber will be notified. I could also use notifyAll; since we have only one worker for each topic subscriber, notify and notifyAll make no difference here.
What you need to understand is the overall flow. There are multiple workers, one for each subscriber, running in parallel. They run as long as there are messages in the topic which they need to process. Once there are no more messages, they go to sleep, and when we get more messages they are woken up again, start processing messages again, and then go to sleep again. That is all the subscriber worker is doing: whenever it runs, it checks whether there are messages to be processed; if there are, it processes them; if there are none, it goes to sleep; and whenever it is woken up, it does the same thing again.
So that is how we are implementing a queue, or a publisher-subscriber model, using a threading-based approach.
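The wait and notify loop described here can be sketched as follows. This is an assumption-laden illustration rather than the video's code: messages are strings in a shared thread-safe list, consuming just records the message, the monitor is held only while checking and waiting, and the CountDownLatch, daemon thread and interrupt exist only so that the hypothetical WorkerDemo can finish.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class SubscriberWorker implements Runnable {
    private final List<String> topicMessages;                 // assumption: messages are strings
    private final AtomicInteger offset = new AtomicInteger(0);
    private final List<String> consumed = new CopyOnWriteArrayList<>();
    private final CountDownLatch remaining;                   // demo-only: lets a caller wait for N messages

    SubscriberWorker(List<String> topicMessages, int expectedMessages) {
        this.topicMessages = topicMessages;
        this.remaining = new CountDownLatch(expectedMessages);
    }

    @Override
    public void run() {
        try {
            while (true) {
                int current = awaitPendingOffset();
                consumed.add(topicMessages.get(current));     // stands in for subscriber.consume(message)
                offset.compareAndSet(current, current + 1);   // advance unless the offset was reset
                remaining.countDown();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();               // demo-only way to stop the loop
        }
    }

    // Sleeps on this worker's monitor while the subscriber has read everything.
    private synchronized int awaitPendingOffset() throws InterruptedException {
        while (offset.get() >= topicMessages.size()) {
            wait();
        }
        return offset.get();
    }

    synchronized void wakeUpIfNeeded() {
        notify();                                             // one worker per monitor, so notify is enough
    }

    boolean awaitConsumed(long timeout, TimeUnit unit) {
        try {
            return remaining.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    List<String> getConsumed() { return new ArrayList<>(consumed); }
}

final class WorkerDemo {
    static List<String> run() {
        List<String> messages = new CopyOnWriteArrayList<>();
        SubscriberWorker worker = new SubscriberWorker(messages, 3);
        Thread thread = new Thread(worker);
        thread.setDaemon(true);
        thread.start();
        for (String m : new String[] {"m1", "m2", "m3"}) {
            messages.add(m);           // add the message first...
            worker.wakeUpIfNeeded();   // ...then wake the worker in case it is waiting
        }
        boolean done = worker.awaitConsumed(5, TimeUnit.SECONDS);
        thread.interrupt();
        return done ? worker.getConsumed() : null;
    }
}
```

The condition is re-checked in a while loop around wait, which guards against spurious wake-ups, and the message is added before notify is called, so a wake-up cannot be missed.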
Here each subscriber runs in its own subscriber worker, so each subscriber is able to process and consume messages in parallel. It does not have to wait for the other subscribers to finish before it can start. All the subscribers run in separate threads, so they can process and consume messages in parallel.
Let us see how it is used. As I told you, there are two public interfaces which we expose to the user: Queue and ISubscriber. I have created a sample subscriber called SleepingSubscriber. Whenever it gets a message, it just goes to sleep; it does not do anything else. It takes a sleep time in milliseconds, and whenever it gets a message to consume, it calls sleep, so it just blocks and sleeps.
Now let us see how the queue and the subscriber are used together. We create one queue first, and then we create two topics, t1 and t2. We create two subscribers, subscriber one and subscriber two, and subscribe both of them to topic number one. Then we have another subscriber, subscriber number three, which subscribes to topic number two. So we have two topics and three subscribers: topic number one has two subscribers and topic number two has one subscriber. So far so good.
Now we publish one message to the queue: m1 gets published for topic number one, so s1 and s2 should get it.
Here also, the next message gets published to topic number one, so s1 and s2 should get it again. What should happen is that s1 and s2 get the message in parallel. That is why I have put in a sleep: if we want to demonstrate that both subscribers get the message in parallel, then both should start at the same time and both should finish at a similar time, because they are just sleeping subscribers which sleep for 10 seconds. So s1 and s2 will both get the same message m1 at the same time and start processing it, and then they will get message number two.
So the parallelism is between the subscribers. s1 and s2 will run in parallel, but the two consumes of s1 will not run in parallel. You need to understand this: the two consumes are running on the same subscriber, so they run sequentially. m1 will be processed first and then m2, because we have designed it this way. We are not running each message in a separate thread; we are running each subscriber in a separate thread. So each subscriber processes messages one by one, but multiple subscribers process the same message in parallel. Here s1 and s2 will process m1 in parallel, but s1 will process m1 first and then m2.
This is really important; these are all threading things. If they are not clear in your mind, you will face difficulty in your interviews, or whenever you try to implement this or go deep into it. So if these things are not clear yet, take a pause, maybe pause the video, and understand them better.
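The two properties, parallel across subscribers and sequential within one subscriber, can be checked without relying on timing. The sketch below is not from the video: ParallelismDemo is a hypothetical name, each subscriber is just a thread looping over the messages, and a CyclicBarrier is used so that the first message can only be passed if both subscribers are inside it at the same moment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class ParallelismDemo {
    // One thread per subscriber: the loop keeps a single subscriber's messages in order.
    private static Runnable subscriberLoop(List<String> messages, List<String> log,
                                           CyclicBarrier bothInFirstMessage, AtomicBoolean overlapped) {
        return () -> {
            for (int i = 0; i < messages.size(); i++) {
                if (i == 0) {
                    try {
                        // Passes only if the other subscriber is consuming the first message too.
                        bothInFirstMessage.await(5, TimeUnit.SECONDS);
                    } catch (Exception e) {
                        overlapped.set(false);
                    }
                }
                log.add(messages.get(i));
            }
        };
    }

    // Returns each subscriber's consumption order, or null if the two never overlapped.
    static List<List<String>> run() {
        List<String> messages = Arrays.asList("m1", "m2");
        List<String> s1Log = new CopyOnWriteArrayList<>();
        List<String> s2Log = new CopyOnWriteArrayList<>();
        CyclicBarrier barrier = new CyclicBarrier(2);
        AtomicBoolean overlapped = new AtomicBoolean(true);
        Thread s1 = new Thread(subscriberLoop(messages, s1Log, barrier, overlapped));
        Thread s2 = new Thread(subscriberLoop(messages, s2Log, barrier, overlapped));
        s1.start();
        s2.start();
        try {
            s1.join();
            s2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
        if (!overlapped.get()) {
            return null;
        }
        List<List<String>> result = new ArrayList<>();
        result.add(new ArrayList<>(s1Log));
        result.add(new ArrayList<>(s2Log));
        return result;
    }
}
```

If the subscribers were run one after the other on a single thread, the first await would time out; with one thread per subscriber both pass the barrier, and each log still reads m1 and then m2.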
So m1 and m2 get published, and then topic number two gets message number three, so subscriber three should get it. Subscriber three is just sleeping for 5 seconds. For the timing, let me just remove this reset for now. Then the next message goes to topic number two, so subscriber three should get it again, and the one after that goes to topic number one, so s1 and s2 should get it. Between the first three publishes and the last two there is a sleep of 15 seconds; I just put it there to demonstrate this better.
When I run it, it builds and then starts running. It has created the topics, created the subscribers, subscribed the subscribers to the topics, and then published these three messages: m1 and m2 get published to t1, and m3 gets published to t2. m1 has to be processed by s1 and s2, so m1 is consumed by s2 and m1 is consumed by s1. At the same time, since m3 is published to t2 and subscriber three is subscribed to it, m3 gets consumed by s3.
s3 runs for only five seconds, and s1 and s2 run for 10 seconds, so s3 will finish first. Let me run it again to show you. All three start consuming, and then s3 finishes. Then s1 and s2 finish m1 and start consuming m2. So first they process m1, and then they process m2.
Now 15 seconds have passed, and m4 and m5 get published. m4 is published to t2, so s3 starts consuming it.
S1 and S2 were still consuming the previous message, M2, they did not start M5 at that time; they finished M2 first and then started M5. So this is how it is working overall. I think you should be able to understand it. It's a bit complex, because there are so many messages and topics, M1, M2 and so on, and so many things happening in parallel, but try to run it, try to analyze how these things are printed and how these things are running, and you will understand it even more. I'll put the link to the GitHub repo with this code in the description below, and you will be able to find it. Now let's try the reset option. What we are doing when we reset is resetting subscriber number one to offset zero. So let us run it again and see what happens. M1, M2 and M3 get published; M1 is processed by S1 and S2, and M3 is processed by S3. Now M2 is being processed by S1 and S2. Now, you see, the offset has been reset: while S1 was consuming M2, the offset was reset to 0, so that's why S1 will now start processing again from M1. Earlier, if you remember, after M2 it processed M5, because that's what we have for T1: M1, M2 and then M5. But here, after it processed M1, while it was processing M2, the offset changed, and that is actually where our compare-and-set is helping. If we did not have that compare-and-set, it would not have worked correctly. So M1 gets processed by S1, S1 processes M2, but while it was processing M2 the reset was done and the subscriber was reset to index number zero, so it again starts consuming from M1, then M2, and then M5, and when M5 is done, we are all done. So this is how this code is running. So again, try to
run it both ways, once with this reset and once without the reset. Now, this is all actually in the code that I have. So this question was asked to me in the LLD, or in the... so it was not actually LLD design; the focus was not on how you make the classes, use some design patterns and all. But that said, the code has to be structured, the code has to be clean, and somehow you have to use some scalable design patterns. So for example, I have used the ISubscriber interface instead of directly using the subscriber over here, because now you can have different types of subscribers. So this is sort of a strategy pattern, where you can have different types of subscribers. The publisher-subscriber model, or observer pattern, is definitely applied over here; the question itself is about the observer pattern, so that pattern is also being used here. I think these are some of the patterns which are being used here. The focus was not actually on using many design patterns; the focus was on how you handle this multi-threading thing, like using wait/notify properly, using the proper threading constructs. So the focus was on creating correct code which actually runs and behaves correctly. And that is all I had for this video. I hope you learned something from this video, like how you can solve a threading-based question in your interviews. So that is all about it. If you have any feedback or any comments, do feel free to put them in the comments. If you like the video, do share it with your friends; it will really help me grow my channel. And yeah, see you next time. Bye.
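The two ideas the walkthrough leans on, wait/notify for inter-thread communication and the offset comparison that makes a reset take effect, can be illustrated with a minimal Java sketch. This is not the code from the linked repository: every name here (`OffsetResetSketch`, `SubscriberWorker`, `publish`, `resetOffset`, `runDemo`) is a hypothetical choice for illustration, only the `ISubscriber` interface name comes from the video, and the sketch assumes a single topic with a single subscriber and no simulated processing delay.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: names and structure are assumptions, not the video's code.
class OffsetResetSketch {

    /** Subscribers implement this, so different consumer types can be plugged in. */
    interface ISubscriber {
        void consume(String message) throws InterruptedException;
    }

    /** One worker for one subscriber of one topic; it tracks that subscriber's offset. */
    static final class SubscriberWorker implements Runnable {
        private final List<String> messages = new ArrayList<>(); // guarded by 'this'
        private final AtomicInteger offset = new AtomicInteger(0); // next index to consume
        private final ISubscriber subscriber;

        SubscriberWorker(ISubscriber subscriber) {
            this.subscriber = subscriber;
        }

        synchronized void publish(String message) {
            messages.add(message);
            notifyAll(); // wake the worker if it is waiting for new messages
        }

        synchronized void resetOffset(int newOffset) {
            offset.set(newOffset);
            notifyAll(); // wake the worker so it re-reads the offset
        }

        @Override
        public void run() {
            try {
                while (true) {
                    int current;
                    String message;
                    synchronized (this) {
                        // Loop (not 'if') so spurious wakeups are handled.
                        while ((current = offset.get()) >= messages.size()) {
                            wait();
                        }
                        message = messages.get(current);
                    }
                    subscriber.consume(message); // slow work runs outside the lock
                    // Advance only if the offset is still the one we read;
                    // if a reset changed it meanwhile, the reset value wins.
                    offset.compareAndSet(current, current + 1);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop the worker
            }
        }
    }

    /** Publishes m1, m2, m5 and resets the offset to 0 while m2 is first being consumed. */
    static List<String> runDemo() {
        List<String> consumed = new ArrayList<>(); // written only by the worker thread
        CountDownLatch done = new CountDownLatch(5);
        SubscriberWorker[] ref = new SubscriberWorker[1];
        boolean[] resetDone = {false};
        ISubscriber s1 = message -> {
            consumed.add(message);
            if (message.equals("m2") && !resetDone[0]) {
                resetDone[0] = true;
                ref[0].resetOffset(0); // simulates a reset arriving mid-processing
            }
            done.countDown();
        };
        ref[0] = new SubscriberWorker(s1);
        Thread worker = new Thread(ref[0]);
        worker.start();
        ref[0].publish("m1");
        ref[0].publish("m2");
        ref[0].publish("m5");
        try {
            done.await(5, TimeUnit.SECONDS);
            worker.interrupt();
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints [m1, m2, m1, m2, m5]
    }
}
```

Because the reset lands while m2 is being consumed, the `compareAndSet` from offset 1 to 2 fails and the subscriber replays from m1, which mirrors the m1, m2, m1, m2, m5 order seen in the demo run. With a plain unconditional increment instead, the reset would be overwritten and the replay would not start from m1.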
Info
Channel: Udit Agarwal
Views: 30,073
Keywords: System Design, Low Level Design, LLD, Design, Interview, FAANG, Udit Agarwal, Amazon, Microsoft, Google, Facebook, Interview Question, Placements, Object Oriented Design, Design principles, Design Patterns, object oriented principles, oops in java, low level system design interview questions, low level system design, machine coding, machine coding interview, multi threading, message queue, publisher subscriber, concurrency, handling concurrency, multi-threaded system, java concurrency
Id: 4BEzgPlLKTo
Length: 32min 59sec (1979 seconds)
Published: Sun Feb 21 2021