RabbitMQ - RPC with NodeJs (request-reply pattern)

Welcome to this video, where we're going to talk about RPC and the request-reply pattern in RabbitMQ. RPC, or remote procedure call, is a way for two applications to communicate: a client sends a request, and the server, the second application, executes a function and then returns the result to the client. Throughout this video we'll go over the concept of RPC and the request-reply pattern, and we'll see how we can implement them using RabbitMQ. So if you're interested in learning more, make sure to stick around and watch the whole video.

If you're familiar with RabbitMQ, you already know that we have a producer and a consumer, we have something called an exchange, and we have queues. If you're not familiar with those terms, I highly recommend you watch my first video, which explains everything a beginner should know about RabbitMQ.

As you can see here in our diagram, we already have two queues: the RPC queue and the reply queue. Let's take a look at the first half. We have the client, which is going to send something to the RPC queue, and then the RPC server is going to get that and consume it. So we can say that the client produces to the queue and the RPC server, the second application, consumes the message: the client here is the producer and the RPC server is the consumer.

Now let's take a look at the second half. In this case the RPC server, after having consumed the message, maybe processed it, and got a result for it, is going to play the role of a producer and produce the result to a reply queue. This time the client, which produced at first, is going to play the role of a consumer and consume the reply, the response generated by the RPC server, from the reply queue. So if you take a look now, the client produces a message to an RPC queue, which will be consumed by the RPC server, which is going to process the message
and then generate a result, and then it's going to produce that result to a reply queue, which will be consumed by the client. So we have a request-reply pattern here.

Now, two problems arise. First of all, let's say we had multiple clients, client A and client B, and each of those two clients is consuming from a different reply queue: maybe client A is consuming from reply queue A and client B from reply queue B. Then whenever we're producing to the RPC queue, the RPC server, after consuming that message, should know which queue to produce to, because in that case we would have multiple reply queues. This is the first problem.

The second problem is that we need to be able to match each response to its request. Whenever the client consumes from the reply queue, it needs to know which request the result it just consumed belongs to, because it might be sending multiple requests.

Thankfully, in RabbitMQ, whenever we produce a message we can send some properties with it, such as the replyTo property, which specifies which queue the RPC server should produce to. We can also pass something called a correlation ID, a value that is unique for every request. That way, whenever the RPC server produces a response back to the reply queue, the client receives the correlation ID with it and matches the correlation ID of the response with the correlation ID of the request, so that it knows which result belongs to which request.

I want to bring up one more point before we jump into the code. You know that in RabbitMQ we have the exchange: we usually produce a message to an exchange, and then the exchange, based on its type, routes the message to a certain queue. Now, in this case we still have an
exchange, but the work is happening behind the scenes. We will be using the default exchange from RabbitMQ, which is of type direct. Whenever we send a message straight to a queue, without naming an exchange in the code, the default exchange takes the queue name and uses it as the routing key to deliver the message to the queue itself. This might be confusing, but it will be much clearer once we jump into the code, so let's do this right now.

I have here a Node.js application, and I'm using TypeScript as the programming language. I have a server running on port 3001, and I have one endpoint, which is doing nothing so far. We have a config file in which I have some RabbitMQ configuration, such as the URL of the local server and the name of the queue I want to produce to, which is this queue right here, the RPC queue.

This is the RPC client. We are working right now on the RPC client microservice. If you take a look here, you can see that it's producing and it's consuming, so it's playing the role of both a consumer and a producer.

Let's go ahead and create a rabbitmq folder, and inside of this folder I'm going to create three files: the first one is client, and then I'm going to create producer.ts and consumer.ts. Let's export default and create the class; we're going to call it RabbitMQClient. Let's take a moment and think about what our RabbitMQ client should have as properties. We already said it's a producer and a consumer at the same time, so we can say that we have a producer and a consumer. Of course, we need a connection to our RabbitMQ server, and on that connection we need to create channels so that we can assert queues. Here we can go with two channels: one for producing messages, so for sending messages, and another channel dedicated to consuming messages. So,
based on that, let's say private producer. We're also going to add the types, but first I'm just going to add the names: we have the consumer, we have a connection, and we also have the producer channel and a consumer channel.

Now, of course, we need to install the amqplib package, and I'm also going to install the types package for TypeScript. Once that's done, for the connection we can use the Connection type, which, as you can see here, comes from the amqplib package that we just installed. We can also use the Channel type, which comes from this package as well, and let's do the same here.

Now let's go ahead and add an initialize method, and let's think about what we should do in that method. First of all, we need to connect to the RabbitMQ server, and then we need to create the two channels on that connection. A channel is just a virtual connection inside of an actual connection, and, just like we said, we're going to have two different channels, one for the producer and one for the consumer. On those channels we can assert queues.

Inside the initialize method we should assert the reply queue so that we can consume from it. Here, be careful: you might think that we should also assert the RPC queue inside the initialize method of the client. This is possible, but it would be pointless, because if you assert the RPC queue on the client side and the RPC server does not exist, then whenever we produce messages to this queue it's just going to store those messages, and they won't be consumed or processed. So we don't really have to assert the RPC queue on the client side. We're going to do it from the RPC server, which is going to consume from it, and from the client we can just send messages to this name.

Just like we said, let's go ahead and connect to our server. I'm going to add a try/catch block just in case an error comes from
RabbitMQ. In the catch we can console.log the error, for example, so that we can print it just in case something goes wrong. Here we already have a connection property, so let's use it. We can make use of the connect function from amqplib, which lets us connect to a RabbitMQ server. We have a URL in our config file, which is accessible as rabbitMQ.url, so we need to import the config; let's go ahead and do that. Now we can say config.rabbitMQ.url.

Now we're going to have the connection, which is this object. If we take a look at the interface, we can see that we have a createChannel function, which is used to create channels. So let's do that here: after having created the connection, let's say this.producerChannel, which is the property that we have here, and initialize it with this.connection.createChannel, just like we said. Let's do the same for the consumer.

Now let's go back to our diagram. As we said, we need to assert the reply queue, which is the queue that the client is going to listen to. So let's go ahead and create that queue so that we can start consuming from it. If you take a look at a channel, we can see that we have many methods here: we can close the channel, we can assert a queue, which is what we need, we can delete a queue, and we can do a lot of other things, like assertExchange, which creates an exchange, and so on. What we need is the assertQueue method, so let's use it: consumerChannel.assertQueue, and here we need to pass in the queue name. We don't really care about the queue name, so we can keep it empty; this way RabbitMQ is going to generate a random name. Then we can add some options, as you can see here in the second parameter. We can say exclusive: true, meaning that whenever this connection to RabbitMQ goes away, the created queue is going to be deleted.

Now, we only care about the queue name, but if
we take a look at the queue object that the assertQueue method returned, we can see that we have consumerCount, messageCount, and queue. The queue property holds the name of the queue, which is the name that RabbitMQ generated for us. So instead of getting the whole object, we can just destructure queue; that way we're getting only that property from it. Of course, let's not forget to say await. I also want to change the name: I want to call it replyQueueName, so this is the name of the reply queue that we have just created.

Now let's go to the consumer file and start with our consumer. We're going to create a class as well and export it; I'm going to call it Consumer. What do we need on the consuming side? Of course we need the channel, which is the consumer channel that we created in the client, so we need to pass it here. We also need the name of the reply queue that we're going to be listening to and consuming messages from. So let's go ahead and do that.

Here I'm going to inject those properties right away in the constructor: private channel, which is of type Channel, from the library that we imported, and replyQueueName, of type string, which is the queue that we're going to listen to. This is the same as declaring the property and then saying this.channel = channel inside of the constructor. We can just say private and the name, and this is going to create those properties for us, so that's a good, quick way of doing it.

Now let's create a method called consumeMessages. We can add a log here, just so we know: ready to consume messages. We already saw what's available on the channel: we used assertQueue already, but we can also use something else, like sendToQueue whenever we're producing, publish whenever we're sending to an
exchange directly, and we can use consume, to which we pass the queue name and then a function that handles each message that we receive.

So let's go back to our consumer and make use of the consume method: this.channel.consume, and then the name of the queue that we're going to consume from, which is the reply queue. We already created it in our initialize method, with await this.consumerChannel.assertQueue, and we are taking the queue name from the object it returned. We're going to pass it from the client to the consumer whenever we create it, and here we can just pass this variable so that we can listen to the queue. Then let's add the function; the message is going to be of type ConsumeMessage. This is wrong because we forgot to say this. For now we can just log it: console.log 'the reply is' and then show the message. Actually, not the message itself: as you can see, the ConsumeMessage interface has three properties, content, fields, and properties. The content is what we actually need, and we need to parse it: JSON.parse of message.content, and you need to make sure to call toString.

Back in our client, we can finally initialize the consumer property: this.consumer = new Consumer, which is the class that we just created. We need to pass in the channel, which is this.consumerChannel, and we also need to pass in the reply queue name, which is this right here. Perfect.

Now let's go ahead and create our producer class. For the producer, if we take a look at the produce side, we need to pass a few things. For example, just like we said before, we need the replyTo property so that the RPC server knows which queue it should produce the reply to. So replyTo here is going to hold the name of the reply queue, which is this
property here, because after asserting the reply queue we fetched its name, and this is the same name that should be passed in the replyTo property whenever we're producing messages. We also need a channel because, just like we said before, every operation on RabbitMQ should be done on a channel. As we saw, a channel has multiple methods, such as consume, sendToQueue, publish to an exchange, and so on. So we also need the producer channel.

Let's go ahead and export default class Producer so that we can use it, and let's do the same as before: a constructor with private channel, which is of type Channel from this library, and the replyQueueName, which is of type string. That's what we need so far. We also need a method to produce messages, which we can call produceMessages, for example, and here we're going to take some data of type any. Then we say this.channel dot, and instead of using consume, this time we're going to use sendToQueue. So what we're doing now is sending to a queue, and we need to specify the queue name, which is the RPC queue.

If you recall, we already added this queue name in our config file. But in this code, just like we said previously, we're not actually asserting the RPC queue. We're going to do the assertion, the creation of that queue, on the RPC server side, because it would make no sense to produce to a queue that no one is listening to; it would just be storing messages that won't be consumed if the server does not exist. So the creation of the RPC queue is going to happen on the RPC server side.

Let's go ahead and import our config file, and then we can say config.rabbitMQ.queues.rpcQueue, because in sendToQueue we need to specify first the queue name and then the content. I don't know why this is happening, but this is what we need to specify. So here we can say Buffer.from, because we need to send the content as a buffer. I'm assuming that what we have is JSON, so in
that case we need to send it as a string: JSON.stringify and then the data that we got in our method. Other than that, we can pass in a third argument, as you can see here, which is options. What do we need to send? If you recall, we said we need to send replyTo, so that the RPC server knows which queue to reply to, and the correlation ID, so that we can match every reply to its request, because we don't want to be consuming messages in the consumer without knowing which request a reply belongs to.

We can check what we have here. There are many properties, but what we need is replyTo, and replyTo is going to be the reply queue name that we passed in. We should say this, because it's a property of the Producer class. We also need a correlationId. We can make use of a UUID and use randomUUID; make sure that you're importing it from crypto. I'm going to store it in a variable here, say uuid, and then pass that variable in as the correlationId. We can also log it, for example: the correlation ID is, and then the uuid.

So this is what's going to be used to produce messages to the RPC queue, and the consumer is this part here, where we're consuming from the reply queue. Just so that you remember, all of this is happening in the client service. We are still in the client service: we're consuming from the reply queue and producing to the RPC queue. Of course, you don't have to have multiple files, but I think this architecture is pretty nice, and I like to keep it clean.

Now, in our initialize method in the RabbitMQClient class, just like we initialized the consumer, we're going to do the same for the producer. So let's say new Producer with this.producerChannel, and then we need to pass in the second argument to the constructor, which is replyQueueName. Now, what we also need to do in our initialize method
is start the consumer: on this.consumer we need to call this method here, consumeMessages. So let's do that. If you noticed, whenever I typed consumer dot we didn't get the method name, and that's because we didn't specify the types here. So let's go ahead and do that: this is the Consumer that we just created, and for the producer let's give it the Producer type that we also just created. Now if I type the dot, as you can see, we get the methods that we have. This way we are actually running the consumer from the RabbitMQ client, in our initialize method.

We can also add one more thing, which is a method called produce that takes data. That way, whenever we need to produce messages to the queue in our application, instead of having to access the producer itself, we can just do it through the client. So the client can expose the produce function, and all it does is return this.producer.produceMessages with the data itself. And just like we've seen before, inside of produceMessages, on top of sending the data content that we get from the client's produce method, we're also adding the replyTo property and the correlation ID.

A small note: in this application I'm generating the random correlation ID inside produceMessages. But let's say that your requests already carry a correlation ID in a header, so you could access it from req.headers.correlationId, for example. Then, instead of creating it inside produceMessages, you would take it as a parameter of type string and pass it from your controller. I'm just going to keep it this way in this project.

Now, if we open the RabbitMQ management UI, we can see that so far we have zero connections and zero channels, those are the default exchanges, and
we don't have any queues. If you take a look at our initialize method, we are creating a connection, two channels, and one queue, which is the reply queue to consume from. Of course, none of this is created yet, because we have not called our initialize method, so let's do that in our server. We need to import that class in server.ts and create an instance of it, so const rabbitClient. Now, inside of server.listen, for example, as soon as the server is running we can say rabbitClient.initialize.

I'm going to run the server, and let's see what happens: server running, and ready to consume messages. If you go back to our initialize method, we created a connection, the two channels, and the queue, and remember that after creating the consumer and the producer we're running the consumeMessages method of the consumer, in which we have the log ready to consume messages.

If you go back to the management UI, we can see that we have a queue. This is the reply queue, like we said, and its name was generated automatically because we didn't pass in a specific name. If you take a look at our channels, we have two channels, just like we said: a producer channel and a consumer channel. If you take a look at connections, we also have one connection, from our client. So this is perfect.

All right, now let's test this API. What I added is that I'm logging whatever body we get from the request, and then I'm calling the method named produce that we exposed on the RabbitMQ client, so that we can produce messages to the RPC queue. That queue does not exist yet; we're just using its name. So let's see what happens. In Postman I'm calling this route. The idea of the application is that you send a specific operation, multiply, divide, sum, whatever, and then you send two numbers, and the RPC server is going to
calculate the result and send it to you. Obviously, so far we don't have an RPC server or an RPC queue. We can add them in a moment, but first, if we hit send, let's see what happens. If we take a look at our logs, we can see that we are printing the message from this server, which is the body, and we're also printing a correlation ID, which is the unique value that we're sending with our message as a property and which we are creating inside of our producer. So now we know that we're reaching this code and sending a message. To make sure that the message is sent, I'm going to send again and refresh. As you can see, there's a message here, so we can see that message inside of RabbitMQ.

Now, there are a few problems in our code, and to show them I created two different services, service1.ts and service2. Both have the same code: a function called sendMessage, which uses the RabbitMQClient class that we have to produce messages. This makes sense: in an application, you're not going to have all of your code or all of your APIs inside of your server.ts. You're going to have controllers, maybe multiple modules, and every controller is going to call its own service, for example service 1 and service 2. If you need to use the RabbitMQ client to send messages from multiple places inside of your application, it would make sense to use the same instance of that client. The way it's done now, we're creating new instances every time. And there's a problem even before that, which shows up if I try to produce a message with one of these new RabbitMQClient instances inside of service 1 or service 2.
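
To make the situation just described concrete, here is a minimal, dependency-free sketch. FakeProducer and FakeRabbitMQClient are hypothetical stand-ins, not the classes from the video: no broker and no amqplib are involved, initialize is synchronous here for brevity, and the request body shape ({ operation: "sum" }) is an assumption. The sketch only illustrates that every new call yields a separate object, and that only the object on which initialize was called has its producer set.

```typescript
// Dependency-free stand-ins: no broker and no amqplib are involved here.
class FakeProducer {
  produceMessages(data: unknown): string {
    return `produced ${JSON.stringify(data)}`;
  }
}

class FakeRabbitMQClient {
  // Stays undefined until initialize() runs, like the fields in the walkthrough.
  private producer: FakeProducer | undefined;

  initialize(): void {
    this.producer = new FakeProducer();
  }

  produce(data: unknown): string {
    // Unguarded access: throws a TypeError if initialize() was never called.
    return (this.producer as FakeProducer).produceMessages(data);
  }
}

// The server creates one instance and initializes it...
const serverClient = new FakeRabbitMQClient();
serverClient.initialize();

// ...while a service creates its own instance and never initializes it.
const service1Client = new FakeRabbitMQClient();

// Helper (hypothetical) that reports the outcome instead of crashing.
function tryProduce(client: FakeRabbitMQClient, data: unknown): string {
  try {
    return client.produce(data);
  } catch (err) {
    return err instanceof TypeError ? "TypeError" : "other error";
  }
}

const fromServer = tryProduce(serverClient, { operation: "sum" });
const fromService1 = tryProduce(service1Client, { operation: "sum" });
console.log(fromServer, "|", fromService1);
```

The initialized instance produces normally, while the service's own instance fails because its producer was never created. Nothing is shared between the two objects.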
If I go to Postman and hit send, as you can see, we receive an error saying cannot read properties of undefined, reading produceMessages. If you go to produce here, it's telling us that it cannot read produceMessages of undefined, meaning the producer is undefined. This makes sense, because inside of service 1 and service 2 we created a new instance of RabbitMQClient, but its fields, producer, consumer, and connection, are not initialized. They are only initialized inside of initialize, which is called in the server, but on the server's instance only. So we're dealing with three different instances here, one for every controller or service that we have in our application.

To fix the undefined error in produce, inside of our client we can check if not this.connection, meaning that if the connection was not initialized, then we know we didn't call the initialize method. In that case we can say await this.initialize, and after this is done we can produce the message. This is going to help us get rid of the undefined error. Now if we send, as you can see, the message is sent. It was sent three times, because in our server we're sending it three times: once from the client instance here, once from service 1, and once from service 2. This is just to simulate that you might be using the RabbitMQ client from different places throughout your app.

You might think that there are no errors here, but if we take a look at our client, every single time we initialize we are creating a new connection, two new channels, and a new queue. To confirm that, if you go back to the RabbitMQ management UI, we now have three different connections, six channels, and three different queues, and that's not what we want. This is happening because we're using three different instances of the RabbitMQ client. Instead, we should make it a singleton. Now,
to turn the RabbitMQ client into a singleton, we need a few things. First of all, we need to make the constructor private so that we cannot create instances of that class from outside anymore. Secondly, we need to create a private static property, which we can call instance, of type RabbitMQClient; this is the only instance that we're going to be using. Of course, we need to be able to expose this instance to other files in our project, so we can create a public function. By default it is public, so you don't have to say public, but I'm going to say it so that you know that it should be public. It should also be static, and we don't have to make it async. We can call it getInstance, and what it does is check if not this.instance, meaning we don't already have an instance of the RabbitMQ client created, in which case we need to create it: this.instance = new RabbitMQClient. Then we just need to return it. So now any service in our application that needs the RabbitMQ client to produce a message is going to get that instance from the public static function getInstance, which checks whether we already have an instance and returns it; if not, it creates it and then returns it.

Now, to know whether we already have a connection initialized, or whether we already called the initialize method before producing, we can do it in a clean way and create a private property, call it isInitialized, and set it to false by default. Then, inside of our initialize method, just to make sure we can't initialize more than once, we can say if this.isInitialized, so if we already initialized RabbitMQ, we just return, so that we don't create a new connection and new channels and waste resources. Inside of this function, once we're done, we can say this.isInitialized = true, so now we know that RabbitMQ has been initialized.

Now this class is a singleton. We can't create instances anymore, because the constructor is private. To get the single instance, we call the static getInstance function, which makes sure we only have one instance. Then, inside of produce, we can just say if not this.isInitialized, so we know that we are trying to call the produce function on a client that has not been initialized yet; maybe we made a mistake somewhere in our code. So, just to be safe, we can say await this.initialize: in case we try to produce and the RabbitMQ client is not initialized, we initialize it and then produce.

What we can also do, instead of exporting the class the way it is, is say export default RabbitMQClient.getInstance. This way, whenever we import this module, we're also calling this function and getting the instance right away: if it does not exist, it's going to create it and return it; otherwise it's going to return it.

Let's make sure that it is working. Let's take a look here: we don't have connections, we don't have channels, and we don't have queues. That's because the queues are exclusive; we set exclusive to true, so they got deleted when we stopped the server. Let's go back to our server now, and instead of doing this we can call initialize right away, because this is the instance; remember, we are importing the single instance now. So whenever we need to produce, we can just say rabbitClient.produce, and here rabbitClient.initialize. Inside of our services as well, instead of creating a new instance, we're going to use the singleton, and here as well.

All right, let's rerun the server. We're still sending from three different places, but this time let's open Postman and hit send. As you can see, we don't get an error. Perfect. And if you open the management UI again, we only have one connection.
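
As a rough sketch of the singleton shape described here, under stated assumptions: SingletonClientSketch is a hypothetical stand-in rather than the video's class, it opens no real connection, its methods are synchronous for brevity (the real initialize and produce are async), and connectionsOpened is an illustrative counter added only so the effect can be observed.

```typescript
// Stand-in singleton: it counts "connections" instead of opening real ones.
class SingletonClientSketch {
  private static instance: SingletonClientSketch | undefined;
  private isInitialized = false;
  // Hypothetical counter, present only so the behaviour can be observed.
  public connectionsOpened = 0;

  // A private constructor prevents `new SingletonClientSketch()` elsewhere.
  private constructor() {}

  public static getInstance(): SingletonClientSketch {
    if (!SingletonClientSketch.instance) {
      SingletonClientSketch.instance = new SingletonClientSketch();
    }
    return SingletonClientSketch.instance;
  }

  public initialize(): void {
    if (this.isInitialized) return; // already set up: do nothing
    // The real client would connect, create two channels,
    // and assert the reply queue at this point.
    this.connectionsOpened += 1;
    this.isInitialized = true;
  }

  public produce(data: unknown): string {
    if (!this.isInitialized) this.initialize(); // safety net before producing
    return JSON.stringify(data);
  }
}

// Three call sites, as in the walkthrough: the server and two services.
const sharedFromServer = SingletonClientSketch.getInstance();
const sharedFromService1 = SingletonClientSketch.getInstance();
const sharedFromService2 = SingletonClientSketch.getInstance();

sharedFromServer.initialize();
sharedFromService1.produce({ operation: "sum" });
sharedFromService2.produce({ operation: "sum" });

console.log(sharedFromServer === sharedFromService1); // true
console.log(sharedFromServer.connectionsOpened); // 1
```

All three call sites receive the same object, and the guard in initialize means the setup work runs once no matter how many places call initialize or produce.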
Let's see: one connection, two channels only, and only one queue has been created, unlike previously, when we used different instances from different places and so created different connections, different queues, and so on. Now that we made it a singleton, we only use one single instance throughout our application.

Inside the consumer, I forgot to add one more argument, which is the options here. We can pass noAck, and if we set it to true, every message we receive is treated as acknowledged automatically, meaning that it has been consumed and can be safely removed from the queue.

So far, this is what we worked on: we made the client, we made a produce function, we are sending the replyTo and the correlation ID, we created the consumer, and we are consuming from the reply queue. Now we need to work on the RPC server. We need to create, or assert, the RPC queue, we need to make sure that the RPC server is consuming from it, and we need to create a produce function that sends a message with a correlation ID, which is going to carry the result of whatever the client is trying to get.

Here I went ahead and created a duplicate of our previous RPC client project, but this time I called it RPC server, and we're going to make the changes to our RabbitMQ client. So now we're going to be working on this part right here.

The first changes that we need to make are inside of our consumer. This time we're going to be consuming from the RPC queue and not the reply queue, so we can call this rpcQueue, or just queue, whatever. Then, whenever we receive a message, we need to make sure that the message contains some fields, such as the correlationId and replyTo. Why? Because if we don't have the replyTo field, then whenever we process that message we're not going to know which queue to send the result to. So let's go ahead and get those:
destructure the object and take correlationId and replyTo from message.properties. This is short for saying message.properties.correlationId and putting it in a variable of its own, so it's just a quicker way to do it in JavaScript. Then we can say if not correlationId or not replyTo, in which case we can throw an error or do anything; I'm just going to console.log that something is missing. Otherwise, we need to await something: we need to call a specific function on the RPC server so that we can process the message that we received and then send the result back to a queue.

Before handling that, let's go to our producer and make the changes there as well. Here we don't need to create a correlation ID, because we're going to be sending the correlation ID that we already got, which is the one that we sent from the client. So we're no longer creating it here; instead we're receiving it as a parameter, correlationId, of type string, and we can replace this. We also no longer need replyTo here. Before, like we said, the client needed to specify the replyTo queue name so that the server knows where to send the reply. Now, when the server sends the reply and the client consumes it, the client does not care about the replyTo property, because it's not going to reply back to the server. So we don't really need it, and I removed it.

We also need to make one more change. Instead of sending the message to the RPC queue, which is this one, and sending it back there would make no sense, we need to send it to the replyTo queue, which is the reply queue here. So if you go back to our producer, we no longer need this, and it should be dynamic: whenever we create a new instance of the producer, we don't know which reply queue we are going to send to, because, like we said previously, we might have 20 clients, each one of them listening to a different reply queue. So the RPC server
would need to produce to different queues, not only one queue. So we shouldn't put it in the constructor; instead, we should pass it dynamically to this function as replyToQueue, which is of type string, and this is the queue that we're going to send the message, or the response, to. So now this is our updated producer, and we can remove the unused packages. Again, this is in our RPC server; we're working inside the red marker now.
Okay, now let's make the changes needed in our client. First of all, our producer no longer takes a reply queue; if you take a look, it only takes a channel. Our consumer still takes a channel, but this time we're not consuming from the reply queue, we're consuming from the RPC queue. So here we need to assert that queue first. We don't need RabbitMQ to create the name for us; we can set our own custom name, which we already have in the config file, so we can say config.rabbitMQ.queues.rpcQueue. Again, we're just fetching the queue property from the result, which is the name, and renaming it to a custom variable. It was replyQueueName; here we don't need that, so we can call it rpcQueue and then pass it down.
We also need to make one more change. If you recall, the producer now receives the correlation ID and the replyTo queue dynamically in its produce message function, because we already got those attributes from the client when we consumed the message. So in our client we need to pass the data, the correlation ID, and the replyTo queue, and of course add those as parameters to our produce method; both are of type string.
Now that we have made the changes, let's go back to the cycle and see what's happening. Our client is already done: we are producing a message to the RPC queue that we just asserted in the RPC server. We updated our consumer so that it consumes messages from the RPC queue. We also added the produce method, which sends to a specific queue and with a
specific correlation ID. But we still need to add the function that handles the consumed messages. We need a message handler: whenever we consume, we call the message handler and pass it the message, it creates a result, and then it calls the producer to send that result back to our client, so that the cycle is fully working.
Let's go ahead and create a new file in our source folder. I'm going to call it messageHandler, of type .ts. Here I'm going to create a class, export it as well, and call it MessageHandler. Inside this class I only have one method, which is static and will handle our messages. First we need the operation, of type string, because, if you remember, we are sending an operation in our body. This is project specific; it isn't something related to RPC in general. I just need to know which operation I'm trying to do. We can also have another property, the data, which is the content of the message. We are also going to have the correlation ID, which is of type string, and the replyTo queue, which is also of type string. Let's see why we need them.
First, let's create an empty response object, because, again, we are in the RPC server, so we need to create a result or a response that we can return to the client. We already know that our req.body, the data being sent, is num1 and num2, so that is what we are expecting. Of course you can make it cleaner by creating DTOs so that you know what to expect, but I'm going to do it this way so that the video is shorter. Then, just to make sure everything is fine, we can log "the operation is" and print the operation. Then we can create a switch case on the operation field and check which operation we have. Let's assume we have two cases. The first is multiply, and
in that case the response would be num1 times num2; we're just multiplying. Then we add a break. The second case is, for example, sum. We can also add a default case, where you can say, for example, response equals zero, or whatever your application needs.
After doing that, we need to produce the response back to the client, which is the RPC client in our case. Let's await rabbitClient.produce. Of course rabbitClient should be imported; this is the client which is a singleton, just like we've seen previously, so we are exporting the single instance that we have. So we say rabbitClient.produce, and we need to pass in three arguments: the data, which is the response, the correlation ID, and the replyTo queue. As we said previously, the correlation ID is the one we consumed here, which was sent from the client, so that the response we're sending can be matched to the request sent by the client. The replyTo is needed so that the server knows where to reply. I know I'm repeating myself multiple times, but that's just to help you understand better in case you're confused. And we need to send the correlation ID first. That way we're handling the messages that we receive from the client, and then we are producing a response back using our single RabbitMQ client instance.
Now we still need to do some work in our consumer in the RPC server. As we said, we need to handle the messages in the consumer. Whenever we consume a message, we first make sure it includes a correlation ID and a replyTo; here we said we could throw an error or do something else, and I'm just going to log it. Then, in case everything is fine and there are no errors, we can call the message handler, the class that we just created. Since we have a static method, we can call it directly as .handle, and we need to pass in those values. Now, the operation:
how do we get that? We could say message.content.operation, or we could send it in a header, which I think is a good way. We could add a header, for example under a function property; I think that would be a better way of doing it, but of course you can put it in the content. So here I'm going to say const operation = message.properties.headers.function, and this function would be a custom field that we add to the headers in the client itself. That way the operation can be sent separately. Then the data: we need to parse it first, since it is stringified JSON, so we say JSON.parse and pass in the content that we get. Here we also need to send the correlation ID and the replyTo. Of course it's complaining here because we need to make this async.
All right, so now what we're doing is this: whenever we consume a message, we process the result by calling the message handler and passing the operation and the data, and then, based on the switch case, we know which response is the correct one and send it. Also keep in mind that we could have added a try/catch block, or, inside the default case, if we don't match any valid case, we could create another method called handleErrors, for example, which would also produce a message, but this time the message would be of a form such as error: true, and so on. That way, whenever the client consumes a response, it can check whether the error flag is set to true, and then throw or return an error response. But we're not going to go into that in this video.
By the way, inside the server we don't really need Express, because we're not getting HTTP requests or anything, so we can remove that. Then here we can say rabbitClient.initialize, and that's everything that's happening. Now if we hit save and rerun the server, we
should get a new connection as well. If we refresh... actually, it is correct, because if we check the queues we have the RPC queue. But I'm not running my client, so let's go ahead and run the client as well.
All right, now that the client is running too, if you go back to the management UI we can see that we have two connections, one from the client and one from the RPC server. We have four channels because, as we said, there is one for producing and one for consuming, and we have a client and a server, so two plus two is four channels. And we have two different queues: the RPC queue, and the queue with the default name auto-generated by RabbitMQ for us, which is the reply queue. This is perfect; this is just what we needed.
Back in our RPC client, inside our producer, in sendToQueue, we're going to add some options besides replyTo and correlationId. We're going to add the headers, as we said, with a function property, and for its value we take data.operation. The data here is what we receive in the produce message function, which comes from produce, which is the request body, so it's the operation from our request. Of course we didn't have to do it this way; as I said previously, it could have been taken from the content of the message itself. But if you have different functions on your RPC server, it is better practice to separate it and put it in the headers, as an extra piece of data that we need. I'm just doing it for better practice.
Now, back in the RPC server, I added a log inside our consume message function just to check what message we're receiving. In Postman I'm going to hit send, and if we go back here we can see that we have successfully consumed a message: operation multiply, num1, which is 2, and num2, which is 3. However, the operation is undefined, so
let's check why it's undefined. There's a chance that I forgot to save the RPC client, because now, when I resend the request, we can see in our RPC server that we're getting the actual operation name.
Now let's go to the produce message function and see what the result, or the reply, is going to be. We can log "responding with" and then the data. So in the message handler, once we have the response after doing the calculation based on the operation, we call produce, we end up here, and then we log the response we're going to send. Let's open Postman and hit send. If we check here: responding with 6. This makes sense, because the operation is multiply and 2 times 3 is 6, so we got 6 from our RPC server. And if you go to the client, you can see that the reply is 6, and this is logged inside our consumer.
Now there is one crucial thing missing. Inside our RPC client, whenever we produce a message and send it to a queue, we need to wait for the response so that we can send it back to the caller. How do we do that? We need to find a way for the consume message function to notify the produce message function that a response has arrived. Also keep in mind that we should keep track of the correlation ID, so that we know which response consumed from the reply queue belongs to which request that we initially sent to the RPC server.
To do that, we can use event emitters. In our client, for example, we can add a new field; let's call it eventEmitter, which is of type EventEmitter, and you can get that from events. Then inside our consumer we need to add this new property, eventEmitter, of type EventEmitter from events, and here we need to emit an event as soon as we consume a message. So instead of just logging it, we're going to say this.eventEmitter, and we can see that we have emit. Now, what is the event
name? If we take a look here, the only event name that makes sense is the correlation ID, because it is unique. So how do we get that correlation ID? Whenever we consume the message here, we said that the RPC server is going to respond, and it is also going to send a correlation ID. As we've seen before, we can say message.properties.correlationId. This correlation ID that we are consuming comes from the response that the RPC server produced, but it is originally the one we created in the client when we sent the request. So now you can see how it's all coming together. Let's make sure it's a string. We also need to send any arguments that we have, so we're going to send the message, which is the response.
Now, inside our producer, we also need to add the event emitter, so let's do that. All right. Then here, where we wait for the response, we need to listen, or subscribe, to a certain event name. You can say this.eventEmitter.once to listen only once, because there is going to be only one response for a given correlation ID; if for any reason more than one has been sent, we should only listen to one. The event name is the correlation ID, which is the UUID, so that is what we listen to. Whenever we get that event, we receive the message or the data. For now we just log it, to make sure that it's working: JSON.parse of message.content.toString(), and we call the argument message. Let's do that and save it for now.
But before we look at what's happening, we need to fix this, because the producer and the consumer now take one more parameter in their constructor, which is the event emitter. You also need to make sure that you are initializing this event emitter, because here we're just declaring it; we haven't initialized it with a value or anything.
So let's come here and say this.eventEmitter = new EventEmitter(). Okay. Now if we save, what we're doing is consuming the replies, the responses from the RPC server, and then emitting an event whose name is the correlation ID, the initial correlation ID that we created when we produced the request. The producer subscribes and listens to that UUID, that correlation ID, and once an event is emitted under that event name, we get the data and show it on the screen. Let's check if it's working. I'm going to open Postman and hit send, and as you can see, we got "the reply is 6". This is in our RPC client, so we consumed it here, and then inside the producer we logged 6, as you can see.
There's only one more thing that we should do. If we take a look at where we're calling the produce message function from, it's the produce method inside our client, and we're returning its value. Inside the event callback, you might think that returning the data is the correct way of doing it, but if we try this and go back to Postman and hit send, there is still nothing. First we need to make sure that we're saving this in a variable, so we say const response equals this, and then res.send(response). If we save and hit send, as you can see, we got nothing. And if we also try to return the emitter call itself, which you might think would solve it, and hit send, now our response is just the event emitter. So we need to find a way to return the actual value, which is 6.
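The routing step described above, where the consumer emits each reply under its correlation ID and each request listens once on its own ID, can be sketched on its own with Node's built-in events module. This is a minimal illustration rather than the code from the video: there is no RabbitMQ connection, `onReplyConsumed` is a hypothetical stand-in for the reply-queue consumer callback, and the values 5 and 6 simply mirror the sum and multiply examples.

```typescript
import { EventEmitter } from "events";
import { randomUUID } from "crypto";

// Hypothetical stand-in for the reply-queue consumer callback: it re-emits the
// reply under its correlation ID so that only the matching request is notified.
function onReplyConsumed(emitter: EventEmitter, correlationId: string, content: Buffer): void {
  emitter.emit(correlationId, content);
}

const emitter = new EventEmitter();
const received = new Map<string, unknown>();

// Two in-flight requests, each identified by its own correlation ID.
const multiplyId = randomUUID();
const sumId = randomUUID();

for (const id of [multiplyId, sumId]) {
  // once(): each request expects exactly one reply, then the listener is removed.
  emitter.once(id, (content: Buffer) => {
    received.set(id, JSON.parse(content.toString()));
  });
}

// Replies may arrive in any order; the correlation ID routes each one.
onReplyConsumed(emitter, sumId, Buffer.from(JSON.stringify(5)));
onReplyConsumed(emitter, multiplyId, Buffer.from(JSON.stringify(6)));

console.log(received.get(multiplyId), received.get(sumId));
```

Note that the listener callback only stores the value; anything returned from inside it goes back to the emitter, not to the function that registered the listener, which is why a plain return does not hand the reply to the caller.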
Well, to solve this in an easy way, we can wrap this event emitter inside a promise. So we say return new Promise, with resolve and reject, and inside it we put this code, this.eventEmitter.once. Whenever we get a reply, I'm going to store it in a variable called reply, and then we say resolve(reply). This is the reply that you're getting from the reply queue, from the RPC server. Just making sure that you're sticking with me. Now if we save, open Postman, and hit send, as you can see, we got the response 6.
So we sent the request, multiply with 2 and 3, to the RPC queue; we produced to it. The RPC server consumes from it and then produces the reply, with the correlation ID, to our reply queue. We consumed it, emitted an event with the correlation ID as the event name, and then we got our response back. As you can see, this is the request-reply pattern in action. We can try this with sum, for example; we should get 5 if we hit send, and indeed we got 5. Our RPC server is running and working.
As I said previously, inside your RPC server you can also produce an error, an object with an error flag saying error: true, in case some error happened and you need to be notified, so that you can return an error to the client making the API call. For example, you could check this after the reply: if reply.error is true, you could throw an error, or handle it, or whatever. I'm not going to do it in this video because this video is already huge, but you get the main idea of the request-reply pattern.
One more useful thing you can do: inside the produce message function, in the options, whenever you're producing a message you can also add an expiration for your message. For example, you could say 10.
And this is a number. I'm treating it here as 10 seconds, but note that RabbitMQ interprets the expiration in milliseconds, so a 10-second expiry would be 10000. The message would then expire after 10 seconds and no longer stay on the queue. Let's say you sent a message and the RPC server was down, so the client on Postman, or any other client, is waiting for a response. Let's say we have a timeout mechanism so that after 15 seconds the response would return a timeout. We need to make sure that the message is deleted from the queue, because we don't really need it anymore. So here, for example, we could use 10 seconds, and this would be our timeout. Then we would need one more thing: a timeout function, so that if after 10 seconds we didn't get an event with the response, we could throw a timeout error for the client.
Now, I'm not going to add timeout functionality here because this video is already too long. I have a separate video on how to use Promise.race to create timeouts when you have multiple promises. You would basically take this promise, put it in a variable, and then race it against another promise, which is the timeout. Just watch my video if you want to implement timeouts; I'm going to leave links in the description. Also, if you want to see how you can use an exchange of type direct in RabbitMQ with microservices and Node.js, I have a video on that as well, so I'm going to put it in the description. I hope it has become clear how this flow works. If you have any questions, please leave them down in the comment section. If this video was helpful, leave a like and subscribe. Thank you for watching, and I'll see you next time.
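The two ideas from this last part, wrapping the one-time listener in a promise and racing it against a timeout with Promise.race, can be sketched together as follows. This is a rough illustration under stated assumptions, not the implementation from the video or from the separate timeout video: `waitForReply` and `withTimeout` are hypothetical helper names, the short string IDs stand in for UUID correlation IDs, the emitter is driven by timers instead of a RabbitMQ consumer, and the message expiration on the broker side is not shown.

```typescript
import { EventEmitter } from "events";

// Wrap the one-time listener in a Promise so the caller can await the reply.
function waitForReply(emitter: EventEmitter, correlationId: string): Promise<unknown> {
  return new Promise((resolve) => {
    emitter.once(correlationId, (reply: unknown) => resolve(reply));
  });
}

// Race the work against a timer; whichever settles first decides the outcome.
function withTimeout<T>(work: Promise<T>, ms: number, onTimeout: () => void): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_resolve, reject) => {
    timer = setTimeout(() => {
      onTimeout(); // e.g. drop the listener that will never fire
      reject(new Error(`No reply within ${ms} ms`));
    }, ms);
  });
  // Always clear the timer so a fast reply does not leave it pending.
  return Promise.race([work, timeout]).finally(() => clearTimeout(timer));
}

async function demo(): Promise<[unknown, string]> {
  const emitter = new EventEmitter();

  // Request "a" is answered after 10 ms, well before its 200 ms timeout.
  setTimeout(() => emitter.emit("a", 6), 10);
  const answered = await withTimeout(waitForReply(emitter, "a"), 200, () =>
    emitter.removeAllListeners("a"),
  );

  // Request "b" is never answered, so the timeout promise rejects first.
  const timedOut = await withTimeout(waitForReply(emitter, "b"), 50, () =>
    emitter.removeAllListeners("b"),
  ).then(
    () => "replied",
    (err: Error) => err.message,
  );

  return [answered, timedOut];
}

demo().then((result) => console.log(result));
```

In a real client the HTTP handler would await something like `withTimeout(waitForReply(...), ...)` and map the rejection to a timeout response, while the message expiration keeps stale requests from piling up on the RPC queue.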
Info
Channel: Computerix
Views: 7,589
Keywords: rabbitmq, RPC, request-reply, nodeJs, typescript
Id: APfWkfkjRj8
Length: 62min 47sec (3767 seconds)
Published: Tue Apr 11 2023