Airflow DAG: Coding your first DAG for Beginners

Video Statistics and Information

Video
Captions Word Cloud
Reddit Comments

Hey thanks for posting this. i was looking for something similar. keep posting.

Regards

👍︎︎ 2 👤︎︎ u/restlessgeek 📅︎︎ Mar 04 2021 🗫︎ replies

Is this you? I took your course couple weeks ago haha

👍︎︎ 1 👤︎︎ u/bohoho 📅︎︎ Mar 04 2021 🗫︎ replies
Captions
i'm neutral airflow airflow is hard i don't know where to start what is the tag how can i create data pipelining how can i code my first task i mean all of those questions i don't know where should i start well i've asked myself all of those questions when i started to use airflow but guess what you've come to the right place in this video you are going to discover how to code your first data pipeline my name is mark lomachi i'm the head of customer training at astronomer and a bestselling instructor on udemy and my goal is to make sure that you are going to discover the most important concepts of airflow how to code your first lag and how to run it in airflow the only thing i ask you in favor is to subscribe to my channel this will help me a lot and don't forget to smash the like button it is always good for the youtube algorithm now without further ado let's discover what you are going to code in that video this is the data plan you are going to code first you have three tasks training model a b and c as you are not going to train real machine learning models you are going to return fake accuracies then in choosing best model you will select the best accuracy among all of those accuracies as soon as you have the best accuracy if that accuracy is above a threshold then you will execute accurate or if not you want to execute inaccurate this data pipeline may look simple but it is not indeed you are going to discover how to create tasks dynamically how to choose one task or another according to a connection how to share data between your tasks and more importantly how to schedule your data pipeline in air flow but before jumping into the code you have to understand some very important concepts of airflow let's begin with the first one what is a dag dag stands for directed acyclic graph which means it is a graph with nodes directed edges and no cycles a dag is a data pipeline in airflow for example this either data pipeline or a dag you have four tasks t1 t2 t3 and t4 those are the nodes and the directed edges are the arrows as you can see here corresponding to the dependencies between your tasks basically you know that t1 must be executed first and then t2 t3 and t4 in this deck there is no cycle or no loop in that one there is a loop as you can see t2 depends on t1 but also t1 depends on t2 as shown by the red arrow this is not a dag this is not a valid dag and you won't be able to do that in airflow you will receive an error so basically keep in mind that your dag is a data pipeline in airflow where the nodes are the tasks direct edges are the dependencies between your tasks the second important concept that you have to know is the concept of operators what is an operator an operator is nothing more than a task in your dag let's take the following data pipeline here t2 t3 t4 and t1 actually are tasks but let's say t2 executes a python function then t3 executes a bash command and last but not least t4 inserts data into a database well t2 t3 and t4 are tasks but behind the scene each task is actually an operator for example you want to execute a python function you will use the python operator you want to execute a bash command you will use the bash operator you want to insert data into a database you might use the postgres operator so depending on the action you want to trigger from your dag you will use the corresponding operator and airflow has a ton of different operators keep in mind that an operator is nothing more than an object encapsulating the job that you want to run in your dag and each time you add an operator in your dag you are in fact adding a new task that being said you know everything you need in order to jump into the code this is it you are going to create your first detail pipeline in four steps but at this point if you want to follow what i'm going to show you on your computer you have to run airflow on your machine how to do it i made a video about this you will find the link in the description below now once you have airflow up and running the first step is to create the folder dags where all of your dags will be once you have that folder create a new file and let's name it myoscontag.py each time you create a dag you need to create a python file okay the first step to create a data pipeline is to make the right imports and there is one import that you will always make which is the dag class so let's type from airflow import dag you will always have to import that class in order to say that this file is actually a die a day type line once you have that class there is a second import that you will usually make which is from that time import date time why because as you are going to see a data pipeline expects a start date a date at which we will say that the data pipeline should start being scheduled so that's why you need to import date time once you have this you are ready to move to the second step in the second step you are going to create an instance of that class so let's do this first in order to create that instance we can use the context manager wiz so let's type wiz then dag as dag and colon like that so at this point we have successfully created a dive object with the context manager wiz and if you don't know what a context manager is you will find some additional information in the description below once you have the dac object right there you have to put some parameters and the first one is the dag id each tag must have a unique identifier otherwise you might end up with weird behaviors so let's put a unique identifier which is my underscore dag then the second parameter that you have to define is the start date what is the start date the start date defines the date at which your dag will start being scheduled for example let's say the start date equals to dead time you have to specify a daytime object then the 1st of january 2021 so let's put 2021 then one and one by defining the start date to the 1st of january 2021 you are seeing that this dag will start being scheduled the 1st of january 2021. now there are two additional parameters that might be super useful for you and the first one is the schedule interval so let's specify a schedule interval here put schedule underscore interval equals to something the scheduling interval defines the frequency the interval of time at which your dag is triggered for example you want to trigger your dag every 10 minutes you will specify this in the schedule interval how by defining a crown expression a crown expression is nothing more than a way to define interval of times in unix also you have to know that f loop brings some preset chrome expressions that you can use in your case let's say that this deck should be triggered every day you can use the preset at daily and if you are wondering what are the other presets you will find this in the description below alright there is one very important thing that you have to understand in airflow this dag with the start date of the 1st of january 2021 and a scheduling interval to daily will be effectively triggered after the start date plus the schedule interval which means this dag is going to be effectively triggered the 2nd of january 2021 at midnight after the start date plus the schedule interval and since the schedule interval is set to daily it is after the start date plus one day that's how dags are scheduled now what about the last parameter well you have to know that each time a dag is triggered a diagonal object is created what is a diagram it is nothing more than an instance of your dag running at a given date the thing is by default airflow will try to run all the non-triggered diagrams between the start date and the current date so if the start date is set to one year ago and you have a schedule interval to daily you will end up with many many diagrams and so many many instances of your dag how can you prevent this by setting the parameter catch up to false and by setting that parameter only the latest non-triggered diagram will be automatically triggered by airflow that's the only thing you have to remember with the catch up parameter it allows you to avoid ending up with many many diagrams corresponding to the non-triggered diagrams between the start date and the current date all right you have successfully created the dag object now it's time to move to the search step which is implementing the tasks if you remember the dag you want to create there are three tasks to add first training on oscar model a b and c let's do this whenever you want to add a task in your dag the first step is to import the operator you are going to use and in that case you are going to use the python operator to execute a python function so let's do this type from airflow.operators dot python import python operator once you have the python operator imported just under the dag object you can create your task here let's put training underscore model underscore a equals to the python operator and you have to specify two arguments at least the first one is the task id and the task id is the unique identifier of your task you have to make sure that each task in a given dag has a unique identifier this is super important so let's say training underscore model underscore a and then a python callable function so the python function that you want to call from that task so here we put python underscore collaborate equals to training on the score model then we have to create the python function so let's put def training on the score model then let's return a fake accuracy to do that we are going to use rend so let's put return rend int and 1 to 10. so a value between 1 and 10 and as we use rendent we have to import it right there so let's put from random import rendit and that's it at this point you have successfully created the first task training on oscar model a calling the python function training underscore model returning a fake accuracy as we have three tasks we can copy that one then paste another task here and the third task right there then put b c and same for the task ids b and c like that and you are done you have successfully implemented the first three tasks okay so what next well you want to choose the best accuracy and then if that best accuracy is above a threshold you want to execute one task or another how can you do that by using the branch python operator so let's import the branch python operator right there and let's create a new task under training model c choose underscore best underscore model equals to the branch python operator a task id let's say choose best model as well and a python callable function like with the python operator let's say underscore choose underscore best underscore model then once the branch python operator is implemented you can add the python function let's type def choose best model and here there is something specific that you have to do when you use the branch python operator indeed you have to return the task id of the next task that you want to execute let me show you this let's say the next two tasks to execute are accurate or inaccurate so let's create those tasks as well let's type from airflow dot operators dot bash import bash operator then under choose best model let's create two tasks accurate equals to the batch operator with a task id equals to accurate and a bash command let's say equal accurate like that you can do the same for inaccurate so copy the task paste it there and type in accrete then inaccrete and this time you want to print inaccurate on the standard output so with the bash operator you can execute bash commands and that's what you are doing here so the goal is to execute either accurate or inaccurate if the best accuracy is above or below a threshold so how can you do that well in the python function truth-best model let's say we get the accuracy and the condition is if the best accuracy is above eight then you want to execute accrete so in that case you return the task id accurate or if it's not you want to return inaccurate and so execute the next task having the task id inaccurate that's what you are doing here if the best accuracy is above 8 then the task i create will be executed next if not inaccrete will be executed next that's what you have to remember with the branch python operator but there is something missing here where do you get the value of best accuracy indeed training model tasks return accuracies so the question is how can you get the best accuracy among all of those accuracies well that's where you need to share some data between your tasks and more specifically you want to share the accuracies generated by training model tasks into the task choose best model how can you do that by using xcoms x-com stands for cross-communication message and i won't go into the details here but it is a mechanism to share data between tasks in your dag so here as we return the accuracy the fake accuracy from each training model task that value the accuracy is automatically pushed into the database of airflow once the value is stored in the database of airflow we can fetch it back from the other task of our dag and that's what we are going to do right now to fetch the accuracies from the database we have to use a task instance object ti then we put ti.xcom underscore pool in order to pull the accuracies from the database next you specify the task ids of the tasks having pushed the accuracies in that case a list as we have three different tasks training on oscar model a training underscore model b and training underscore model c like that then once you have the accuracies you can create a variable here accuracies equals and then select the best accuracy create the variable best underscore accuracy equals to max accuracies and that's it at this point you have successfully fetched all the accuracies and choose the best accuracy then if the best accuracy is above 8 you want to execute accurate otherwise you will execute inaccurate all right you have implemented all the tasks it's time to see what you get from the user interface of airflow so on airflow let's type airflow airflow to login and then as you can see you have the dag my underscore dac that you have created if you click on it and click on grab view you can see all of your tasks but something is missing here indeed you don't have the order in which your tasks are going to be executed you are not able to say that training model a b and c are going to be executed first then choose best model and last accurate or inaccurate so how can you fix this well that's what you are going to discover in the last step to define the order in which your tasks are going to be executed you have to specify the dependencies to specify the dependencies go at the bottom of your file and right there you can use the bit shift operators with the right bit shift operator you are defining the downstream task whereas with the left bit shift operator you are defining the upstream task in your case you want to execute training model tasks first as you have multiple tasks you can group them together with a list so let's put training model a then training model b and training model c once you have this list of tasks the next task that you want to execute is choose best model so the downstream task of training model a b and c is choose best model in that case you use the right bit shift operator and choose best model next you want to execute either a create or inaccurate so again you can group those tasks together with a list let's create a list accurate and inaccurate at there on the same level then as accurate in accrete are both don't stream tasks of truth-based model again you put the right bit shift operator and that's it that's how you can define dependencies in airflow you can group tasks that are on the same level together using lists so you put your tasks into a list and then you can define the dependencies between your tasks using the left bit shift operator or the right bitshift operator don't hesitate to play with those operators to better understand the dependencies let's go back to the user interface refresh the page and as you can see you obtain your dag with the dependencies defined congratulations you have successfully created your first data pipeline and not only you have created your first data pipeline but you also know how to share data between your tasks how to choose one task or another how tags are scheduled how can you add tasks in your dags and you know the most important concepts of airflow let's schedule the dag to see if it works so turn on the toggle in order to start scheduling your dag refresh the page and as you can see your tasks are getting triggered and now the diagram is completed everything works accrete has been executed oh and by the way you have created the training model tasks right there but you can do much better than that can you tell in the comment section below how can you avoid those tasks again and again that's it for today if you enjoyed the video please smash the like button that will help me a lot i wish you a wonderful day and see you for our next video
Info
Channel: Marc Lamberti
Views: 42,268
Rating: undefined out of 5
Keywords: airflow dag, airflow, apache airflow, dag, beginner, airflow for beginners, airflow beginner
Id: IH1-0hwFZRQ
Channel Id: undefined
Length: 20min 31sec (1231 seconds)
Published: Thu Mar 04 2021
Related Videos
Note
Please note that this website is currently a work in progress! Lots of interesting data and statistics to come.