How to Build and Schedule a Machine Learning Pipeline Using Airflow | #mlops #airflow #ashutosh_mlops

Captions
Hello everyone. Today we are going to discuss a very important topic: how to create an end-to-end machine learning pipeline using Airflow. If you are an MLOps engineer, this is one of the most important tasks in the job profile, and even if you are a data scientist aspiring to become an MLOps engineer, you should know how to create ML pipelines using the different tools available. In my previous videos I used Kubeflow; today I'm going to use another widely used tool, Apache Airflow.

In my last video I already explained how to set up and install Apache Airflow locally on a Windows machine using Docker Compose, so today I'm continuing from there. I'm not going to cover the installation part again; I assume you have already installed Airflow and are ready to start building the ML pipeline. You should already have the important folders inside the airflow directory — config, dags and plugins (the logs folder is created automatically) — and you should have downloaded the docker-compose.yaml file, because that is part of the installation. Dockerfile and requirements.txt are two new files; don't get confused by them for now. I want to first show you the problem you will run into and then explain why you need these files, so I'll come back to them at that point.

Let's start by creating a file named ml_pipeline.py. Where do you create it? This file will act as our DAG file — a directed acyclic graph, which is nothing but a set of tasks that execute sequentially or according to the dependencies you define — so it has to live inside the dags directory. In my dags folder you will also see a couple of other pipeline files and an sql directory; maybe I'll explain those in other videos, but ignore them today and focus only on ml_pipeline.py.

So create a blank file and follow along with me. First we import a few libraries. From airflow we import DAG, which is used to combine all the tasks into one directed acyclic graph, and datetime for the scheduling parameters. Airflow provides different operators: for Python functions there is the PythonOperator; if you want to interact with a database there are dedicated operators, for example a Postgres operator if you are working with Postgres; and if you want to run shell commands there is the Bash operator. I'm going to use the PythonOperator. Next, since I'll be doing some pandas operations, I import pandas and os, and then the machine-learning-specific pieces such as train_test_split from sklearn.model_selection, because we are going to train a model. Whatever steps are required to train the model and generate performance metrics, we will combine them all into one pipeline — that's the overall idea of this session.
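As a reference, a minimal sketch of the import section of dags/ml_pipeline.py could look like this (exact import paths vary a little between Airflow versions; in Airflow 2.x the PythonOperator lives under airflow.operators.python):

```python
# dags/ml_pipeline.py -- import section (sketch)
import os
from datetime import datetime

import pandas as pd
from airflow import DAG                                # groups tasks into a directed acyclic graph
from airflow.operators.python import PythonOperator   # wraps plain Python functions as Airflow tasks
from sklearn.model_selection import train_test_split  # used in the split step below
```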
Now let's start writing the functions. The first one is a basic prepare_data function in Python. I'm assuming you are already well versed with Python, so we don't need to invest much time here: it simply reads a CSV file located in one of my GitHub repositories into a DataFrame and then does a dropna and similar basic operations. The point is that you can define as many operations as you want here — all your data cleaning and preprocessing can go into this function. My objective today is not to explain how data processing works but to show you how to build the pipeline, so I'm keeping that part minimal. At the end I save the output of this function as final_df.csv, and I'm assuming this final DataFrame is what we will use for model training.

The next step in model training is to divide this dataset into train and test splits, so I create another function that splits the DataFrame. I also add a few print statements, so that later, when we are inside the task logs, we can see exactly where we are. The function reads the final DataFrame created by the previous function, defines the target column — the class variable — and then builds X and y: X holds the independent variables and y is the dependent variable, our target. Then X_train, X_test, y_train and y_test are produced with train_test_split — normal machine learning work. I convert these into NumPy arrays and save them with np.save.

The third function trains a basic classifier; I'm keeping it very simple just to give you an idea. It imports scikit-learn again, loads X_train and y_train, instantiates a LogisticRegression classifier, and calls fit to train the model. Then I dump the trained model into a model.pkl file, so the model is saved as well. The fourth function predicts on the test data: it loads the model, loads the X_test NumPy array, calls the logistic regression predict function on X_test, and saves the output as y_pred.
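A minimal sketch of these first two data steps could look like the following — the CSV URL is just a placeholder for the file in my GitHub repository, the target column is assumed to be named "class", and the split parameters are illustrative:

```python
# Sketch of the data-preparation and split tasks (illustrative paths and parameters).
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split


def prepare_data():
    print("---- inside prepare_data component ----")
    # Placeholder URL -- in the video the CSV lives in the author's GitHub repository.
    df = pd.read_csv("https://raw.githubusercontent.com/<user>/<repo>/main/data.csv")
    df = df.dropna()                                      # basic cleaning; add your own steps here
    df.to_csv("final_df.csv", index=False)


def train_test_split_task():
    print("---- inside train_test_split component ----")
    final_df = pd.read_csv("final_df.csv")
    target_column = "class"                               # assumed name of the target column
    X = final_df.loc[:, final_df.columns != target_column]
    y = final_df[target_column]
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.3, random_state=47              # illustrative split parameters
    )
    np.save("X_train.npy", X_train.to_numpy())
    np.save("X_test.npy", X_test.to_numpy())
    np.save("y_train.npy", y_train.to_numpy())
    np.save("y_test.npy", y_test.to_numpy())
```

Plain relative paths work here because every task runs inside the worker container with /opt/airflow as its working directory, which is also where we will find these files later when we inspect the container.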
The fifth function, predict_prob_on_test_data, produces the probability values on the test data: again it loads the model and X_test, but instead of the predicted class it generates the class probabilities and saves them — it's just another small function, nothing to worry about. And the last function is get_metrics: here I load y_test.npy and y_pred.npy along with the probability values, and generate the metrics — accuracy with accuracy_score from sklearn.metrics (passing y_test and y_pred), plus precision, recall, and the entropy (log loss). Up to here everything is plain machine learning code that I'm assuming you already know how to write; a sketch of these four model-related functions follows below.
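Here is roughly what those training, prediction and metrics functions look like — a sketch rather than the exact code from the video; I use pickle to dump model.pkl, and I read the "entropy" metric as log loss from sklearn.metrics:

```python
# Sketch of the model training, prediction and metrics tasks.
import pickle

import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, log_loss, precision_score, recall_score


def training_basic_classifier():
    X_train = np.load("X_train.npy", allow_pickle=True)
    y_train = np.load("y_train.npy", allow_pickle=True)
    clf = LogisticRegression(max_iter=500)                  # a very basic classifier
    clf.fit(X_train, y_train)
    with open("model.pkl", "wb") as f:                      # persist the trained model
        pickle.dump(clf, f)


def predict_on_test_data():
    with open("model.pkl", "rb") as f:
        clf = pickle.load(f)
    X_test = np.load("X_test.npy", allow_pickle=True)
    np.save("y_pred.npy", clf.predict(X_test))              # hard class predictions


def predict_prob_on_test_data():
    with open("model.pkl", "rb") as f:
        clf = pickle.load(f)
    X_test = np.load("X_test.npy", allow_pickle=True)
    np.save("y_pred_prob.npy", clf.predict_proba(X_test))   # class probabilities


def get_metrics():
    y_test = np.load("y_test.npy", allow_pickle=True)
    y_pred = np.load("y_pred.npy", allow_pickle=True)
    y_pred_prob = np.load("y_pred_prob.npy", allow_pickle=True)
    print("accuracy :", accuracy_score(y_test, y_pred))
    print("precision:", precision_score(y_test, y_pred, average="macro"))
    print("recall   :", recall_score(y_test, y_pred, average="macro"))
    print("log loss :", log_loss(y_test, y_pred_prob))
```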
From here the important part starts: how to create the DAG, i.e. how to build the machine learning pipeline in Airflow, so I'll go a little slower and explain everything. The very first step is to define the DAG — the directed acyclic graph. Using the with keyword you can initialize the DAG, and inside it you pass certain parameters. The first one is dag_id: the name of the pipeline, which will be visible in the Airflow UI (you'll see it in a moment). Then you define the interval on which you want the pipeline to be scheduled — I want to schedule it on a daily basis — and the start date; I want to start it from today, so I give today's date. catchup is set to False, and that's fine. There is also another way to do this: instead of the with block you can define the DAG object on its own, drop the indentation, and pass dag=dag (whatever you named it) to every operator. Both ways work, but let's continue with the with style.

Now, inside the DAG, we create the individual tasks. We wrote five or six functions above, and we need to wrap each one as a task so that they can execute in a sequential or dependency-driven manner, whatever we define. The first task wraps prepare_data: inside the PythonOperator you give a task_id — you can choose any name, and that is the name that will be visible for this step in the Airflow UI — and python_callable, which must be the exact function name you defined above. In the same way you define all the other tasks: train_test_split, training_basic_classifier, predict_on_test_data, the predicted-probability task, and get_metrics.

Once all the tasks are defined inside the DAG, we define the order in which they should execute. Here I say: start with prepare_data, then run the split, then the training, then the prediction, then the probabilities, then the metrics — everything runs in sequence, because each task depends on the output of the previous one. But suppose two tasks only depend on the same upstream task and not on each other — say both prediction tasks only need the trained classifier. Then you can list them together, and after that upstream task finishes, those two will run in parallel. In our case there is no such need, everything runs sequentially, but you should know that option exists. So now your pipeline is defined, and that is the big advantage of Airflow: workflows are code. Everything is customizable — all the workflows and all the tasks are defined in a Python (.py) file — and then you run them from the UI.
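Putting that together, the DAG section of the file looks roughly like this — a sketch assuming the function and task names used above; the start date is illustrative, and in newer Airflow versions you can use the schedule parameter instead of schedule_interval:

```python
# Sketch of the DAG definition and task wiring, using the functions sketched above.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="ml_pipeline_demo",            # the name that shows up in the Airflow UI
    schedule_interval="@daily",           # run once a day
    start_date=datetime(2024, 2, 18),     # illustrative start date
    catchup=False,
) as dag:

    task_prepare_data = PythonOperator(
        task_id="prepare_data",
        python_callable=prepare_data,
    )
    task_train_test_split = PythonOperator(
        task_id="train_test_split",
        python_callable=train_test_split_task,
    )
    task_training_basic_classifier = PythonOperator(
        task_id="training_basic_classifier",
        python_callable=training_basic_classifier,
    )
    task_predict_on_test_data = PythonOperator(
        task_id="predict_on_test_data",
        python_callable=predict_on_test_data,
    )
    task_predict_prob_on_test_data = PythonOperator(
        task_id="predict_prob_on_test_data",
        python_callable=predict_prob_on_test_data,
    )
    task_get_metrics = PythonOperator(
        task_id="get_metrics",
        python_callable=get_metrics,
    )

    # Fully sequential chain, as in the video:
    (
        task_prepare_data
        >> task_train_test_split
        >> task_training_basic_classifier
        >> task_predict_on_test_data
        >> task_predict_prob_on_test_data
        >> task_get_metrics
    )

    # If two tasks only depended on the trained classifier and not on each other,
    # they could run in parallel instead:
    # task_training_basic_classifier >> [task_predict_on_test_data, task_predict_prob_on_test_data]
```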
Now your pipeline is defined, so the next important step is to start Airflow. The docker-compose.yaml file is already available here — you downloaded it from the Airflow website (latest release) during installation; I explained all of those steps in my last video. If you run docker compose up, it initializes Airflow and its respective services. Which services? If you open docker-compose.yaml you will see them defined: it brings up a Postgres database, Redis, the webserver, and all the other services required to run Airflow. In this pipeline we are not using the Postgres database ourselves, but if you were, you could use it as well.

Now coming to the Dockerfile and requirements.txt part. In our ml_pipeline we use scikit-learn — or it could be any other specific library — and that package may not be available in the default image. If you look at the stock docker-compose.yaml, it pulls the apache/airflow:2.8.1 image, and that image was built without your specific packages. So if you just run docker compose up, you may see an error on the DAG itself saying that scikit-learn / sklearn is not available. To install specific packages inside that cluster, you comment out the image: line, uncomment the build: line, and then you need a Dockerfile — that's why I created one in the parent folder. In the Dockerfile you define which Airflow version you want, and you copy the requirements.txt file; in requirements.txt you put all the libraries you want installed alongside it — here I have listed scikit-learn, matplotlib and pandas, but whatever you need, you can list it there and it will all be installed into the same image. That's the advantage of using a Dockerfile and requirements.txt. Just make sure you really comment the image line and uncomment build, otherwise Compose will keep pulling the stock image instead of building from your Dockerfile.
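The two new files are tiny. Roughly, assuming Airflow 2.8.1 (pin whichever version you are actually running), they look like this:

```dockerfile
# Dockerfile (sits next to docker-compose.yaml)
FROM apache/airflow:2.8.1
COPY requirements.txt /requirements.txt
RUN pip install --no-cache-dir -r /requirements.txt
```

```text
# requirements.txt
scikit-learn
matplotlib
pandas
```

And in docker-compose.yaml the change is just swapping which of the two lines is commented: comment image: apache/airflow:2.8.1 and uncomment build: . so that Compose builds this Dockerfile.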
Now, the moment you run docker compose up — and make sure Docker Desktop is up and running before you run this command — it starts initializing everything. If you are doing this for the first time, Docker Desktop will show no containers yet; in my case an airflow container is already there because I have run Airflow before. If you have used Airflow previously, stop that container (mine is already stopped, which is why it offers to start it) so that it starts fresh and builds from the Dockerfile. So run docker compose up, hit enter, and watch the terminal output: on a first run it builds the image and installs all the libraries from requirements.txt one by one, and you will see that in the log; in my case it comes from the cache, because the container is already available, so it simply starts it. One caution: older Airflow versions like 2.0.1 had issues around SQLite, so use the latest release — I'm using 2.8.1, and with 2.8.1 everything works as expected, exactly as I'm showing you.

If you go back to Docker Desktop, you will see the container that was previously stopped has now started and all the services are up and running: the scheduler, triggerer, webserver, worker, Postgres, Redis — I explained all of these services in my installation video, you can refer to that one. Click the webserver link and it opens the Airflow UI on localhost. If you are running it for the very first time, a login window appears: the username and password are both airflow (my first sign-in in the video failed only because of a typo). Once you sign in, you will see that many DAGs — many pipelines — are available by default, but forget about all of those; you need to focus on the one you built. Go back to your code, open ml_pipeline.py, look at the DAG definition, and check the name you gave it — here it is ml_pipeline_demo. Search with that name in "filter DAGs by tag or search DAGs": ml_pipeline_demo shows up, and selecting it takes you to the pipeline. Click on the Graph view and it shows all the steps we created: prepare_data, then train_test_split, then training_basic_classifier, then predict_on_test_data, then predict_prob_on_test_data, then get_metrics. The two runs you can already see there are ones I executed earlier, but let's run it fresh: click Trigger DAG and it starts running — a third run appears, and if everything is fine each step succeeds one after another: prepare_data, then train_test_split, then training_basic_classifier, then predict_on_test_data, and so on, until every box in the graph is green and the whole run is successful.

If you want to see the output of a particular step, simply click on that step and open its log; whatever you printed inside the function appears there. For example, in the train_test_split task we saved the arrays and printed X_train and X_test, and in that task's log you can see exactly those arrays. We are also storing files — final_df.csv, the .npy arrays, model.pkl — so where are they stored? Inside the cluster: Compose creates a volume by default (you can see it listed under the airflow project), and the files end up on the worker. If you open the airflow-worker container in Docker Desktop and open a terminal, running ls shows final_df.csv, y_test and all the other files, and pwd shows you are inside /opt/airflow.
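If you prefer the command line to the Docker Desktop terminal, the same check can be done with docker exec — the container name below is only a typical Compose-generated name, so check docker ps for the real one on your machine:

```bash
docker ps                                      # find the worker container's name
docker exec -it airflow-airflow-worker-1 bash  # open a shell inside it (name may differ)
pwd                                            # /opt/airflow
ls                                             # final_df.csv  model.pkl  X_train.npy  y_pred.npy  ...
```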
So the pipeline ran successfully. Now if you go back to the DAGs page and look under Active, you will see it is scheduled at a daily frequency, starting from the start date we set; based on the server time it will run every day at midnight. That's how the scheduling works, but if you want to run it manually, you can always trigger the DAG from the UI, then open the particular run you are interested in — the latest one or an earlier one — click on any step, look at its logs, and download the log as well if you want it for some analysis.

So that is how you create and schedule an ML pipeline — it's very simple. First you write all the Python functions, then you create a DAG, and inside the DAG you bind all the tasks; once they are bound you define the dependencies, and the tasks run in that dependency order. That's all for today's video. Most importantly, remember the issue I was struggling with in the beginning: the scikit-learn package was not available because I was using the default image provided by the stock docker-compose file. There are two options: create a Dockerfile and requirements.txt and build from there, or install the packages inside the running container from its terminal — but that is only a temporary fix; the better solution is to build from the Dockerfile. So this is all about creating a machine learning pipeline using Airflow. I hope you learned something today. If you have any doubts, please feel free to ask in the comment section, and share your thoughts as well. Thank you.
Info
Channel: Ashutosh Tripathi
Views: 605
Keywords: how to create machine learning pipeline using airflow, build ml pipeline, apache airflow tutorial, apache airflow tutorial for beginners, apache airflow tutorial python, mlops, ashutosh mlops, mlops tutorial, data pipeline, model deployment, ml model pipeline deployment, airflow tutorial for beginners, airflow tutorial, apache airflow
Id: Hlvy4fPW6nE
Length: 22min 18sec (1338 seconds)
Published: Mon Feb 19 2024