The Airflow BranchPythonOperator for Beginners in 10 mins

Video Statistics and Information

Reddit Comments

I took your Airflow masterclass, absolutely incredible! Made setting up Airflow a breeze (which imo is the hardest part haha)

Score: 4 · u/Nervous-Chain-5301 · Jan 13 2021

Any recommendation for a good airflow tutorial?

Score: 1 · u/mrcool444 · Jan 14 2021
Captions
What's up, you guys, it's Marc here. If you want to master Airflow, you've come to the right place. My name is Marc Lamberti, I'm the head of customer training at Astronomer and I'm the best-selling author on Udemy about Airflow. I've trained over 20,000 students across the world on getting started with Airflow, creating complex data pipelines and, more importantly, making their lives easier by automating manual tasks. During the next 10 minutes, we are going to discover a very important feature of Airflow that you will definitely use in your data pipelines: the BranchPythonOperator. If you are a total beginner, that's perfectly fine; you are going to learn everything you need about the BranchPythonOperator. And if you already know it, do you think you know everything about it? Let's answer that question during the video. But before getting started, there are two steps you absolutely have to follow in order to master the BranchPythonOperator: smashing the like button of this video and subscribing to the channel, so that you become the best Airflow master in the universe. Now you are ready, so let's dive into the world of the BranchPythonOperator.

To better explain why you might need the BranchPythonOperator, let me give you a very simple use case. Imagine that you have a data pipeline with three tasks. If the API is available, you want to execute the task download. If the API is not available, you would like to execute the task is API 2 available instead, and if API 2 is available, then you execute download. So how can you do that in Airflow? How can you choose one specific path or another? That's where the BranchPythonOperator will help you a lot.

At this point, if you want to follow exactly what I'm going to show you on your computer, click on the link in the description below. You will land on a page where you first have to install Docker, and then click on the repository to obtain the Docker file that we are going to use to run Airflow. Once you have cloned the repository, copy the code shown on that page, open your code editor inside the cloned repository (airflow-docker), and in the dags folder create a new file; let's call it branching.py. Put the code inside this file, save it, and we are ready to move on.

What does this DAG do? Well, pretty much nothing right now. We import the DAG object, then the DummyOperator. We define a start date of January 1st, 2020, then we instantiate a DAG object with branching as the DAG ID; the schedule interval is set to daily, we pass the default arguments, and the catchup parameter is set to False. Finally, we have two tasks, accurate and inaccurate, both using the DummyOperator. So, as you can see, we are really doing nothing.

Let's verify what we obtain from the UI. In your terminal, type docker-compose -f docker-compose.yml up, hit Enter, and wait for Airflow to start. After a few seconds, you can verify that Airflow is running by opening a new terminal and typing docker ps. If you see three containers with the status Up, your Airflow instance is running. Then go to your web browser, open a new tab, go to localhost:8080 and log in with airflow as both the username and the password; you should land on the Airflow UI. From there, click on branching, then Graph View, and we obtain the data pipeline. As you can see, we only have two tasks here, accurate and inaccurate. What we want to achieve is to choose either accurate or inaccurate based on a condition, based on a value. How can we execute accurate or inaccurate according to a value? For that, we need to implement a new task using the BranchPythonOperator.

Back in your code editor, the first step is to import the BranchPythonOperator. To do this, type from airflow.operators.python import BranchPythonOperator. Now that the BranchPythonOperator is imported, we can use it in our data pipeline. Let's create a new task, choose_best_model, equal to the BranchPythonOperator, with the task ID choose_best_model as well. Next, we have to specify a Python callable, exactly as with the PythonOperator, so we type python_callable and pass the function _choose_best_model. At this point, we have added the BranchPythonOperator, but we still have to implement the function _choose_best_model. Let's do this just before the instantiation of the DAG object: type def _choose_best_model().

Now, what should you put inside that function? You have to know that the BranchPythonOperator chooses one task or another according to the task ID returned by that function. For example, let's say the accuracy is equal to 6, and if the accuracy is greater than 5, we want to execute the task accurate. How can we do that? By returning the task ID accurate, so here we type return 'accurate'. Then, if the accuracy is lower than or equal to 5, we want to execute inaccurate, so here we type return 'inaccurate'. To choose which task executes next after the BranchPythonOperator, you return the corresponding task ID from the Python callable.

Now that the Python callable returns either accurate or inaccurate, we have to define the dependencies between the tasks of our data pipeline. Here, type choose_best_model >> [accurate, inaccurate], since after it we want to execute either accurate or inaccurate. Save the file, go to the UI and refresh the page. As you can see, we obtain the following data pipeline: choose_best_model, corresponding to the BranchPythonOperator, followed by either accurate or inaccurate, depending on the task ID returned. Let's trigger the data pipeline to see what happens. Turn on the toggle and refresh the page: the task accurate has been successfully executed, whereas inaccurate has been skipped. Remember that every task directly downstream of the BranchPythonOperator whose task ID is not returned will be skipped, as you can see with inaccurate. And if you are wondering why accurate has been triggered, go back to the code: the accuracy is equal to 6, and since the accuracy is greater than 5, we return accurate, so the next task executed is the one with the task ID accurate. That's how the BranchPythonOperator works; it's as simple as that.

But there are additional things that you absolutely have to be aware of. Each time a BranchPythonOperator runs, two XComs are created. If you go to Admin, then XComs, you see two XComs: the first one corresponds to the next task that the BranchPythonOperator will execute, and the second one corresponds to the task ID returned from the Python callable. The problem is that XComs are not automatically removed; you have to remove them manually. So if you have hundreds of BranchPythonOperators running, you will end up with many useless XComs in your database. How can you change this behavior? With one parameter of the BranchPythonOperator. Go back to the code and, right after python_callable, add a third parameter: do_xcom_push=False. With that parameter, the value returned by the Python callable is no longer pushed as an XCom. Save the file: from now on, that XCom won't be created. The other XCom is still created, but one XCom is still better than two each time a BranchPythonOperator is triggered. By the way, if you don't know what XComs are, check the video linked on the screen and you will learn everything you need about them.

Now there is something else I would like to show you with the BranchPythonOperator, and this is super important. Go back to your code and add another task: storing, equal to the DummyOperator, with the task ID storing as well. Then make this task run right after accurate or inaccurate, since in both cases we want to execute the task storing. Save the file, go back to the UI, click on DAGs, then branching, then Graph View, and as you can see we obtain the new data pipeline. What do you think will happen with the task storing? Let's trigger the data pipeline: click Trigger, then Graph View, and as you can see, storing has been skipped. Why? Because by default, a task is triggered only if all of its parents have succeeded, which is not the case here: inaccurate has been skipped, so its direct downstream task, storing, is skipped as well. How can you fix this? This is where you need to change the trigger rule of the task storing. Back in your code, storing accepts another parameter, trigger_rule, which is set to all_success by default, meaning the task is triggered only if all of its parents succeeded. Here we want to change that behavior with none_failed_or_skipped, which means that storing can be executed as long as no parent has failed and at least one parent has succeeded. Save the file, refresh the page, trigger the DAG again, click on Graph View, and as you can see, this time storing has been successfully triggered.

Finally, one very common question: can we execute multiple tasks with the BranchPythonOperator? Yes, you can. Go back to the code and, in the Python callable, instead of returning only one task ID, return a list containing the task IDs that you want to trigger.

That's it for the BranchPythonOperator. Now you are able to choose one task or another in your data pipelines and, more importantly, you know how to use the BranchPythonOperator properly, with the additional subtleties we have seen related to XComs and skipped tasks. If you enjoyed this video, please smash the like button; it's super helpful for me. I wish you a wonderful day, and see you in the next video.
Info
Channel: Marc Lamberti
Views: 4,991
Keywords: airflow, branch, branching, apache airflow, tutorial, airflow 2.0, branchpythonoperator, pythonoperator, operator, marc, marc lamberti, beginner
Id: aRfuZ4DVqUU
Length: 12min 27sec (747 seconds)
Published: Wed Jan 13 2021