Asynchronous Tasks with FastApi, Celery and SQS

Video Statistics and Information

Captions
Hello guys, welcome to Describly. In this video tutorial I am going to walk you through how we can integrate Celery into our FastAPI application and use SQS to store the messages in the queue. So let's get started.
Here you can see I have created an empty project that has only one file, requirements.txt, and I have added the basic requirements we need in this project. One is FastAPI, which is the core of our application. Then we have uvicorn, which is used to run the FastAPI application. Then Celery with the SQS dependency, which means we are going to use SQS as I said previously. And then boto3, which is required to communicate with AWS services from our application.
To install these dependencies I'm going to create a virtual environment. To create one we can say python3 -m venv venv. You can see the venv folder is there now, which means the virtual environment was created successfully. I am going to activate it with source venv/bin/activate. Now it is active: you can see the name of the virtual environment, venv, just before the prompt.
Let's install our dependencies with pip install -r requirements.txt. It says no matching distribution found for boto3, so it seems I have typed something wrong. Let me check the latest version of boto3 on PyPI. Here you can see 1.28; I had not typed it correctly. Let's install it again. Okay, it worked.
Now that the dependencies are installed, let's create the Python file that will contain our FastAPI application instance. Let's name it main.py. Here I am going to import the FastAPI class: from fastapi import FastAPI.
I'm going to create an instance by saying app = FastAPI(). Now that we have our app instance, I'm just going to create a basic root route. For that, let's say app.get with a slash only as the path, and below it def root. As you can see, my code editor is already suggesting the body, so that's good.
Now we have our basic route, so let's check that the application is working. To run it we can say uvicorn main:app --reload. As I said earlier, uvicorn, which we installed from the requirements file, is used to run the FastAPI application. Here main means the main.py file, and app means the application instance we created inside main.py. The --reload flag means that whenever we make any change in this directory the application is reloaded automatically, so it is basically a watcher. Let's press enter. It has started our application, so let's copy this URL and open it in the browser. Perfect, this is working fine.
Now let's configure Celery in this application. For this I am going to create a folder called core, and in the core folder a file called config.py. I am going to create another file, celery_util.py, and one more, celery_config.py. The next step is configuring Celery in FastAPI.
Before that, since we are going to use SQS, we need to provide the AWS keys, that is the AWS access key and the secret access key. For that we need a way to read environment variables, so I am going to install another package called python-dotenv.
I'm just going to put that into requirements.txt; the latest version is 1.0.0. Let me run the install. Okay, it's installed, so let's continue with the configuration.
In core I am going to create a file called config.py. This one is not for Celery; it is for the application-related configuration, which I prefer to keep in a separate file. We are using FastAPI 0.103; let's check the latest version of FastAPI on PyPI. Okay, this is the latest version, and I guess it uses Pydantic 2.0. With Pydantic 2 we need to install another package, pydantic-settings, if we want to override the configuration. So I'm going to put that in requirements.txt as well and install it. Okay, it's installed, so now we can continue with our configuration.
I will create a class and call it Settings. It will extend a class imported from pydantic-settings: from pydantic_settings import BaseSettings, so Settings extends BaseSettings. I am going to do a couple more things here. I import load_dotenv from the dotenv library and call load_dotenv() to load the environment variables from the .env file, so the .env file is loaded whenever we load the configuration.
You can do a couple more things here. You can set the app name to whatever you want, you can set the version, and you can set debug to False. Maybe you want to get the debug variable from the environment; for that you need to import another Python library called os.
You can see it here: read it from os.environ, and by default it will be False. In the .env file we need to create another entry for it. You can set it to true if you are working locally, but if you are deploying this project to your live environment you might want to make it false. So that's the basic setting.
I'm just going to create another function, def get_settings (not get_db, sorry about this), and get_settings will return Settings. You can override these settings by creating another class and reading an environment variable, let's say ENV: if you set it to development, you can create another class and return the appropriate version of the settings for each environment. That's up to you. I'm not going to do that, and I'm going to remove this variable as well. So our settings are ready.
For the Celery integration I am going to create another folder, called celery. Not inside core but outside it, in the main folder. Here I'm going to create two files: config.py and utils.py. Let's start configuring these files.
In the config one I need a couple of imports. One is from kombu. kombu is a library that is installed with your Celery package, and we will use it for configuring the queues, so we are going to import Queue. We are also going to import the application settings: from core.config I import the settings as app_settings, and I initialize settings = app_settings(). This means that when we call app_settings the Settings class is instantiated, so our environment variables are loaded. Oh, I am doing this in the wrong file; this should be in the config one.
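The application settings described above come down to three pieces: a settings class, a debug flag read from the environment with False as the default, and a get_settings function that returns an instance. A minimal stdlib-only sketch follows. It is a stand-in under stated assumptions, not the code from the video: a dataclass replaces the pydantic-settings BaseSettings class, os.environ is read directly instead of calling load_dotenv, and the variable names APP_NAME, APP_VERSION and DEBUG (and their default values) are hypothetical.

```python
import os
from dataclasses import dataclass, field


def _env_flag(name: str, default: bool = False) -> bool:
    """Read a boolean flag from the environment; unset means `default`."""
    raw = os.environ.get(name)
    if raw is None:
        return default
    return raw.strip().lower() in {"1", "true", "yes", "on"}


@dataclass(frozen=True)
class Settings:
    # Hypothetical fields: the video sets an app name, a version and a debug flag.
    app_name: str = field(
        default_factory=lambda: os.environ.get("APP_NAME", "fastapi-celery-sqs")
    )
    version: str = field(
        default_factory=lambda: os.environ.get("APP_VERSION", "0.1.0")
    )
    debug: bool = field(default_factory=lambda: _env_flag("DEBUG"))


def get_settings() -> Settings:
    """Build a fresh Settings object from the current environment."""
    return Settings()
```

Because every field is resolved when Settings() is created, calling get_settings() again after the environment changes picks up the new values.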
We are creating the configuration, so this belongs in the config file. Here I'm again going to create a class, and I will call it BaseSettings. This class is for the Celery configuration, so let me note that here: Celery configuration.
The first thing I am going to configure is the broker URL, broker_url. You could set it in your settings file, but since I am creating a separate configuration file I'm just going to say sqs://. There are multiple ways to set the broker URL, but I prefer this way: I don't want to provide the access key and secret access key in the URL itself, I want to provide them via environment variables.
Another one is the result backend. I'm not going to use any result backend as of now, which is also the default behavior. If you want one, just comment below and I will create a separate video on how to configure the result backend. I am also going to set broker_transport (sorry about my typing) to sqs, since that is what I'm going to use.
We can also configure a default queue: task_default_queue, which I'm just going to set to default. So this will be our default queue.
Now, which queues do we want to use in this application? Maybe you have multiple tasks: one sends an email with the welcome message, another sends the email for account verification, and you might want to keep a separate queue for each. You can configure that here with task_queues, which accepts the Queue class we imported. If you don't provide a routing key, it will by default be the default one, so I'm just going to provide the queue name. Let's say describly_q1. I'm going to define two queues here.
The second one is describly_q2. You can define as many queues as you want, it doesn't matter.
Then I am going to define the broker transport options, broker_transport_options. Here I configure the region where I want to use SQS. For now I had left the region as us-east-1, but if I check here, I am actually going to use ap-south-1, so ap-south-1 it is. You can get it from an environment variable if you want, but I'm just going to leave it hard-coded. It's up to you.
You can also define the worker concurrency here, worker_concurrency. I'm just going to set it to one; you can set it as high as you want. If you don't define it, it is picked automatically based on the processor in your machine. Let's say you have a quad core: that means it will automatically run four tasks concurrently. But you can define it here also, so I'm just going to leave it at one.
There is another variable called include, for the tasks you want to include. It accepts the paths where your tasks are defined, so it should be a list. I'm going to create a folder called tasks, and in this folder a file called dummy_task.py and another file, dummy_job.py. In include you can give the path of the folder, or you can give a file as well. If I put tasks only, that means whatever tasks these two files contain will be included in the Celery process. If you want a particular file only, you can put a dot and then the file, like dummy_job or dummy_task, whatever you want. I'm just going to leave it like this.
So this is basically the configuration we need for Celery to work. Now let's go to utils.
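Collected in one place, the Celery settings described above might look like the sketch below. Celery's config_from_object reads attributes from an object, so a plain class is enough. The class name CeleryConfig and the QUEUE_NAMES tuple are assumptions made for this sketch; in the video the two names are wrapped in kombu Queue objects and assigned to task_queues, which is left here as a comment so that the block runs without Celery installed. Naming the two task modules explicitly in include is also a choice of this sketch, where the video lists the tasks folder.

```python
# Queue names used throughout the tutorial.
QUEUE_NAMES = ("describly_q1", "describly_q2")


class CeleryConfig:
    """Plain attribute container; names follow Celery's lowercase settings."""

    broker_url = "sqs://"            # credentials come from the environment, not the URL
    task_default_queue = "default"   # used when a task names no queue
    broker_transport_options = {"region": "ap-south-1"}
    worker_concurrency = 1           # one worker process instead of one per CPU core
    include = ["tasks.dummy_job", "tasks.dummy_task"]
    # In the video, with `from kombu import Queue`:
    # task_queues = (Queue("describly_q1"), Queue("describly_q2"))


def get_settings() -> type:
    """Return the configuration object that is handed to config_from_object."""
    return CeleryConfig
```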
Here we are going to create the Celery instance, and then we will be using this instance. Here also I'm going to import a couple of things from celery: one is Celery, and the other is current_app, which I will import as current_celery_app. In the configuration file I am also going to define a function, def get_settings, because we need the settings when we initialize our Celery application.
Back in utils, I will define a function, create_celery. In this function I will create the Celery app and give it the settings. On the Celery app you can call config_from_object. We can import the settings as well: from celery.config import get_settings as celery_settings. I think I need to rename this folder, otherwise it is going to conflict with the official package, so I will rename it to celery_app. Perfect, now it's fine. So I pass the settings here, which will be the Celery settings, and if you want you can provide a namespace as well.
Now this is basically configured, but I'm also going to set what kind of content we are going to use in this queue. That will be an update on the Celery app, and in this update I'm going to provide accept_content, which will be json, and I'm also going to allow pickle. There are a couple more settings you can configure, like the serializers. You can update task_serializer, which here is pickle, and you can also configure result_serializer, which is also pickle, if you want to use results. For more options, have a look at the official Celery documentation and you will find them there.
One more important thing: I'm going to define worker_prefetch_multiplier in another update, so that Celery picks one message at a time.
One more thing. Let's say that when your application starts, something needs some time to get started, for example you are connecting to a database. Then you can tell Celery to retry whenever a connection fails. To do that, call update on the Celery app with broker_connection_retry (sorry, broker_connection_retry_on_startup) and set it to True. You can copy any of these, or you can search for the Celery configuration options and find lots of options you can configure. Based on your requirements you can copy one from there and put it here, and that will work. You can also put some of this configuration in the config class if you want. Both ways work: either you update it here or you configure it there.
So this is fine now. I think we have written our Celery application, so let's call it in main.py. To do that we need to import the create_celery function we created: from celery_app.utils import create_celery. I think the function name is create_celery itself; let's confirm. Yes, create_celery. Then we can say app.celery_app = create_celery(). This means that whenever our application starts, the Celery app is integrated and working with our FastAPI application.
We can also create a variable here, celery = app.celery_app. Why do we need this variable? We need it whenever we want to run Celery in a separate thread or a separate process.
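The Celery setup steps described above (load the config object, then layer a few overrides on top) can be sketched as one function. To keep the block runnable without Celery installed, the app is passed in as a parameter; in the video it is celery.current_app, so the parameter and the function name configure_celery are assumptions of this sketch. The overrides are written through the app's conf object, which is where Celery keeps its settings.

```python
from typing import Any

# Overrides applied on top of the config object, as listed in the video.
CELERY_OVERRIDES: dict[str, Any] = {
    "accept_content": ["json", "pickle"],
    "task_serializer": "pickle",
    "result_serializer": "pickle",
    "worker_prefetch_multiplier": 1,           # fetch one message at a time
    "broker_connection_retry_on_startup": True,
}


def configure_celery(celery_app: Any, settings: Any) -> Any:
    """Apply `settings`, then the overrides, to a Celery-like app object."""
    celery_app.config_from_object(settings)
    celery_app.conf.update(**CELERY_OVERRIDES)
    return celery_app
```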
Let's say you are running your application in Docker. You need one container to run FastAPI and serve the API, but at the same time you need another container to run your background tasks. That is why we need this celery variable.
We cannot run Celery at this moment; we need a couple more things. Since we have configured it to use SQS, we need the AWS credentials to connect to AWS SQS. So let's copy the AWS credentials into our environment variables. If you search for Celery and SQS, you get a link to the documentation, and there you can see the keys it expects: AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY. We can put those into environment variables and then use SQS like this. So we need these two keys. Let me put them into the .env file under an AWS config section and copy the values in.
Now let's try to run Celery and see if it's working. Open the terminal and type celery -A main.celery worker. If you remember, when we were running the FastAPI application we said main:app, but here we say main.celery, which refers to this celery variable. If you want to set the log level you can add --loglevel with whatever level you want. You can also define --queues here, and the concurrency as well. If you define the concurrency or the queues here, they override the defaults we set in config.py. I'm not going to do that; it is just for your information.
Let's run this. Something is wrong; did I miss anything?
It says 'function' object has no attribute 'config_from_object'. Okay, we must have made a mistake, so let me go to utils. We wrote create_celery there; it should be celery_app, and then config_from_object. Let me check that this is actually the method name. You can search for Celery config_from_object, and yes, this is the method: you can see app.config_from_object, so it is called on your Celery application. This should be fine now. Save it and let's go to the terminal again.
Let's run it now. An attribute error on update; why is it saying that? Oh sorry, it should be .conf.update. We are updating the config, so it should be conf.update. Sorry about that. Let's try again; hopefully this time it will work.
Yes, Celery has started, but it says the curl client requires the pycurl library. I solved this issue off camera on my local machine and figured out why it was not working. This issue comes up sometimes on my OS. pycurl was installed, but when you install it you need to tell it to use the SSL library. To fix it, uninstall pycurl and install it again with this command: set PYCURL_SSL_LIBRARY to openssl, then set the LDFLAGS and CPPFLAGS flags to the paths of the OpenSSL library, and then say pip install --no-cache-dir pycurl. Once you install pycurl again this will work.
So you can run the command again: celery -A main.celery worker --loglevel=info. Now we don't see that error, which means it is working correctly and is connected to SQS.
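The first error above is what Python reports when an attribute is looked up on a function object instead of on the value the function returns. A small, self-contained reproduction follows; FakeCeleryApp and this create_celery are hypothetical stand-ins written for the sketch, not the project's code.

```python
class FakeCeleryApp:
    """Hypothetical stand-in exposing the one method involved in the error."""

    def config_from_object(self, obj):
        return obj


def create_celery():
    return FakeCeleryApp()


try:
    # Bug: the method is looked up on the function itself.
    create_celery.config_from_object("settings")
except AttributeError as exc:
    error_message = str(exc)

# Fix: call the method on an app instance instead.
result = create_celery().config_from_object("settings")
```

The same kind of message appears whenever a plain function is used where a Celery app or task object is expected.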
If it's working correctly, we should see that the queue has already been created in SQS. If I go to the queues, you can see this one was created at 11:23, and it is 11:24 right now in my local time zone, so it was created just now. That means SQS and Celery are correctly integrated with our FastAPI application.
So let's create the tasks. I'm going to create one task in this file and one task in the other, and each will just print whatever message we receive from the queue. It's not going to do anything else; it's a dummy task.
I'm going to import a couple of things here. One is the create_celery function we updated: from celery_app.utils import create_celery. I'm also going to import our settings, because we need the environment variables here too. This will be running in a separate process, so we need to initialize the settings so that our environment variables are loaded. For that we can say from core.config import get_settings, and then initialize the settings with get_settings().
Now, to create the task I'm going to define a function, def dummy_job. It will receive some arguments, meaning whatever message is coming in on the queue, as kwargs, and I'm just going to print them: it prints dummy job and then whatever we are getting.
How will Celery recognize that this is a Celery task? We need to tell Celery. For that we initialize the Celery application with create_celery() and then use its task decorator. There we can define the name and so on: the name is dummy_job, and bind should be True. We can also set late acknowledgement, acks_late, to True.
If you set acks_late to True, Celery acknowledges the message, and so deletes it from SQS, only once your job has been processed successfully. If you don't define it, it is False by default, which means the message is deleted from SQS as soon as it is read. So it depends on your requirements. I'm just going to leave it as is. You can also define max_retries as 3, and for default_retry_delay you can define 10 seconds or leave it out.
There are a couple more things you can define here, like reject_on_worker_lost. Worker lost is another kind of error we sometimes see when Celery is running. It can be due to lack of memory: let's say your application requires lots of memory, but your container, or wherever you are running, doesn't have that much. In that case the task will fail. If you want the message to be put back on the queue when that happens, set it to True; otherwise leave it False, which is the default. I'm just going to leave it.
One more is autoretry_for. You can put Exception there, which means that if any exception occurs the task is retried automatically. If you want to retry only for specific kinds of exceptions, you can list those exception types and raise them in the logic of this job. I'm just going to leave it.
You can also define the queue this particular task will use, with queue equal to the queue name I defined in the config. Like I said, you could create an environment variable for the queue name in the settings and use it here; I'm just going to hard-code it. So this task is delivered to this queue. I think that's it. For more information I suggest you go through the official Celery documentation; you will find it there.
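For reference, the decorator options mentioned above can be gathered in a dictionary next to the task body. This is a sketch under assumptions: the keys are Celery task options, the values simply mirror what is said in the video, and the decorator itself appears only as a comment so the block runs without a broker. The self parameter is there because bind=True passes the task instance as the first argument; returning the payload is an addition of this sketch.

```python
# Options for the first task; in the project they would be passed as
# @celery.task(**DUMMY_JOB_OPTIONS) with celery = create_celery().
DUMMY_JOB_OPTIONS = {
    "name": "dummy_job",
    "bind": True,                    # task instance arrives as `self`
    "acks_late": True,               # acknowledge after the task has run
    "max_retries": 3,
    "default_retry_delay": 10,       # seconds between retries
    "autoretry_for": (Exception,),   # retry on any exception
    "queue": "describly_q1",
}


def dummy_job(self, **kwargs):
    """Print whatever payload arrived on the queue and hand it back."""
    print("dummy job", kwargs)
    return kwargs
```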
For the other task I'm just going to copy this one, name it dummy_task, and use the second queue. So whenever we post a message into the first queue we should see a log line in Celery that says dummy job, and whenever we post a message into describly_q2 we will see dummy task, not the job. We can also print the queue name, so we know where the message is coming from; we can just copy and paste it here for identification. So this is done.
Now we need a way to push messages into SQS. I am going to define two routes, and using those routes I am going to push the messages into the queues. Let's do that. I am going to define a queue job route. That was the auto suggestion; I'm not going to use it. Here I'll get a job name in this route, and I will call that parameter query. Whatever I get in this query, I am going to push.
For that I need to import the task we created. The tasks live in the tasks folder, so we can say from tasks.dummy_job import dummy_job. Let's define the route as well with app.get, and the dummy job route will take this query. For the payload I'm going to create a dictionary, kwargs, and in this dictionary I define query, whose value is the query we received.
This will be the payload of our message, and I will be pushing it into the queue. For that we use the task we imported, dummy_job: we say dummy_job.apply_async and pass kwargs=kwargs. We can also define the queue here, queue equal to the queue we are going to push to, so this is the queue name. Since we have created two separate queues, it's better to define it. If there were a single queue and you didn't define it, the message would go into the default one.
So this is ready. Now let's create another route for our task. This one will be dummy_task, again with query equal to query, and again we need to import it: from tasks.dummy_task import dummy_task. This one goes into the second queue. So this is ready now.
Let's start FastAPI with uvicorn main:app --reload. It's starting now, so let's go to the browser. You can see there is currently nothing in the queues. Let me arrange these tabs; so this is done now. If you go to /docs you will see all the APIs there. You can type the URLs directly in the browser, but I'm going to do it from the docs page, because whatever I type there I can just execute and see the result.
For the dummy job I'm going to put job one and execute. This has thrown an error; let's see. 'function' object has no attribute 'apply_async'. What did we miss? Oh, it is because of this: the decorator should be @celery. Sorry about this; these are silly mistakes that I make a lot.
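The body of such a route reduces to building the keyword-argument payload and calling apply_async with a queue name. The helper below is a hypothetical sketch of that step: enqueue and RecordingTask are names invented here, and RecordingTask only imitates the apply_async(kwargs=..., queue=...) call of a Celery task so the example runs without a broker.

```python
from typing import Any


class RecordingTask:
    """Stand-in for a Celery task: remembers what apply_async was given."""

    def __init__(self) -> None:
        self.calls: list[dict[str, Any]] = []

    def apply_async(self, kwargs: dict[str, Any], queue: str) -> None:
        self.calls.append({"kwargs": kwargs, "queue": queue})


def enqueue(task: Any, query: str, queue: str) -> dict[str, Any]:
    """Wrap the query in a payload and push it to the named queue."""
    payload = {"query": query}
    task.apply_async(kwargs=payload, queue=queue)
    return payload


dummy_job = RecordingTask()
enqueue(dummy_job, "job 1", queue="describly_q1")
```

Returning the payload from the route, as enqueue does here, would also give the caller something to see in the response.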
I guess it should be fine now. Let's see if our application is running. I think it's running, but I don't see the logs; I have closed that terminal. Let me look. I have created so many terminals, so let me close them. This one is not running, this one is not running; Celery is running here. I'm going to stop it, because if it's running, any message we push will be read instantly and we won't be able to see the message in the queue.
So let's retry with job 001. That's good. We don't see anything in the response because we haven't returned anything from the route, and hopefully this has pushed the message into the queue. If I go to SQS, we see one message here. Let's push a couple more messages: we have done one, so two, three, four and five. Okay, this is done.
Now let's push messages into the second queue for our task, starting with task one. Internal server error; did we not change this? Oh, the tasks file was not saved. Okay, so one, two, three, four and five. We should see five messages in both queues, and here they are.
Now let's start Celery and watch the messages being consumed. You can see task three, task two, then some of the jobs. This is running asynchronously, which means there is no fixed sequence.
Let's open another terminal and run the uvicorn application, that is our FastAPI application. So now our application is running; I'm just going to put some space here. If you run these two processes in separate containers or on separate EC2 instances, the message is picked up as soon as you push it. Let me refresh the page.
I'm going to push to the job route, and I'm going to say hello world. As soon as I post the message I should see the output here. Execute, and you can see that it has been received and the message is gone from the queue. I think this took a little time because it needs to make the connection to SQS and after that delete the message from there. But you can see the output almost instantly, which means our Celery and SQS integration with FastAPI is working perfectly.
Thank you so much. I'm going to put this code on GitHub and provide the link in the description, so if you need it you can have a look at the code. Thank you so much for watching, and if you haven't subscribed to this channel yet, please do subscribe; it helps me a lot. Thank you so much.
Info
Channel: Describly
Views: 2,558
Keywords: FastApi, Celery, FastApi with Celery and SQS, Celery with SQS
Id: CjkC34fYuLw
Length: 48min 6sec (2886 seconds)
Published: Sat Sep 02 2023