Airflow XCOMs and BranchPythonOperators explained

Captions
Hey there, everybody! My name is Sourav, and I create free content on data engineering. In this video, we will learn about the BranchPythonOperator in Airflow. There may be scenarios in which you want to run only some of the tasks in a DAG, not all of them. To determine which tasks to skip during a DAG run, we will use XComs to let the tasks communicate with each other. A DAG like that can easily be built with a combination of the BranchPythonOperator and XComs: XComs let tasks communicate by sending messages to each other, and the BranchPythonOperator determines which task to run and which ones to skip. Now let's dive in and figure out exactly how to implement that. [Music]

Let me set the scene by sharing my screen. Imagine a DAG that automates a social media workflow. What do I mean by that? Say there's a social media influencer or company who wants to figure out whether they should run an ad campaign, based on the number of weekly views on their YouTube channel or Instagram page. The first task is a get YouTube data task, followed by a preprocess task: get YouTube data fetches the data, and then preprocessing is done. The third task is make weekly views prediction; that task could be machine learning code that predicts the number of views the channel or page will get next week. As you can see, the task after that is a BranchPythonOperator, and it is connected to two downstream tasks. Depending on the logic in the BranchPythonOperator, only one of those two downstream tasks runs. For example, say the BranchPythonOperator task uses a views threshold of 3,500: if the predicted views are less than 3,500, the run ad campaign task runs; otherwise, the end task runs. Now let's check out how to implement that in Airflow code.

I'll go back to PyCharm, and here is the DAG. Like I was saying, the first two tasks are dummy operators. The next task is the make weekly views prediction task, which calls a Python function called predict views. Let's look at the definition of predict views: as you can see, it returns a dictionary whose keys are video IDs and whose values are the weekly views. This is where XCom comes into the picture: the return value gets stored, or pushed, to XCom. We will see very soon how to pull that XCom data when we look at the next task.

First, a bit about XCom. I want to briefly touch on what exactly happens when something is pushed to XCom. XCom is literally a table in Airflow's metadata database, the database where Airflow stores all the metadata it needs; in this setup, that database is Postgres. Let me show you the ER diagram of the metadata database from the official Airflow documentation. If I keep scrolling down, you can see there are many tables here, but there should be an XCom table, and there it is. When our function returns that dictionary, the dictionary gets stored in this particular table, the XCom table.

Moving on, back to the code. The next task is the BranchPythonOperator, which calls another Python function, called branch function. If we go to its definition, you can see that XCom is accessed through the task context, which is passed into the function as kwargs, and we pull the XCom data on line 23. Which XCom data are we pulling? The XCom data of the make weekly views prediction task. We get back the dictionary and calculate the average views from it, and that's where the threshold I was talking about comes in: if the average is less than 3,500, the function returns the run ad campaign string. It's a string, but it's actually the ID of the next task that we want to run. Otherwise, it returns the ID of the end task, so that one runs instead.
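The two Python callables described above could look roughly like the sketch below. The video doesn't show the full source, so the task IDs (`make_weekly_views_prediction`, `run_ad_campaign`, `end`), the exact function names, and the view counts are hypothetical placeholders. It assumes Airflow 2.x, where the task context reaches the callable as keyword arguments and `kwargs["ti"].xcom_pull(task_ids=...)` returns whatever the upstream callable returned (stored under the XCom key `return_value`). `FakeTaskInstance` is a made-up stand-in so the branching logic can be tried without Airflow.

```python
# Minimal sketch of the two callables, assuming Airflow 2.x.
# Task IDs and view counts are hypothetical placeholders, not the video's code.

VIEWS_THRESHOLD = 3500  # threshold used in the video


def predict_views():
    """Stand-in for the ML prediction: maps video ID -> predicted weekly views.

    When this runs as a PythonOperator callable, its return value is pushed
    to XCom automatically under the key "return_value".
    """
    return {"video_a": 3000, "video_b": 3400, "video_c": 3800}  # made-up numbers


def branch_function(**kwargs):
    """BranchPythonOperator callable: returns the task ID that should run next."""
    ti = kwargs["ti"]  # TaskInstance from the Airflow task context
    predictions = ti.xcom_pull(task_ids="make_weekly_views_prediction")
    average_views = sum(predictions.values()) / len(predictions)
    if average_views < VIEWS_THRESHOLD:
        return "run_ad_campaign"
    return "end"


# Wiring inside a DAG (both operators live in airflow.operators.python):
#   PythonOperator(task_id="make_weekly_views_prediction", python_callable=predict_views)
#   BranchPythonOperator(task_id="branch", python_callable=branch_function)


class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's TaskInstance, for local testing only."""

    def __init__(self, xcoms):
        self._xcoms = xcoms

    def xcom_pull(self, task_ids):
        return self._xcoms[task_ids]


ti = FakeTaskInstance({"make_weekly_views_prediction": predict_views()})
print(branch_function(ti=ti))  # average is 3400 < 3500, so prints run_ad_campaign
```

Keeping the threshold check in a plain function like this means the branching decision can be unit-tested without spinning up a scheduler or a metadata database.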
Okay, if that's a bit overwhelming, let me summarize. Any task in a DAG can pull the XCom data pushed by any other task in the same DAG run. In our case, here's what the BranchPythonOperator is doing: it pulls the data from the XCom of the make weekly views prediction task, does some processing on it (as we saw, it calculates the average), and then returns the name, or ID, of the next task that it wants to run. Now let's run the DAG. Okay, it's running. Cool, so what happened here? The end task was skipped, and the run ad campaign task ran. That was the data-driven decision made by the BranchPythonOperator. Now let's look at why it did that. Yep, the average was 3,327, which is less than the threshold of 3,500, so that's why the run ad campaign task ran. Simple! And that is it for this video, everybody. If you have any comments or questions, please feel free to let me know in the comment section, or you can also message me on Instagram. Otherwise, click here to watch the next video, where I talk about how to handle errors and set up failure alerts in Airflow. Until next time, peace! [Music]
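What the BranchPythonOperator does with the returned ID can be modeled in plain Python. This is a simplified sketch, not Airflow's implementation: the task IDs are hypothetical, and real Airflow also applies trigger rules to tasks further downstream. The idea it shows is that, among the branch task's direct downstream tasks, the ones whose IDs were returned run and the rest are marked skipped; the callable may return a single task ID or a list of them.

```python
from typing import Dict, Iterable, List, Union


def resolve_branch(
    chosen: Union[str, Iterable[str]],
    downstream_task_ids: List[str],
) -> Dict[str, str]:
    """Simplified model: map each direct downstream task ID to "run" or "skipped"."""
    # Accept either one task ID or a collection of task IDs.
    chosen_ids = {chosen} if isinstance(chosen, str) else set(chosen)
    unknown = chosen_ids - set(downstream_task_ids)
    if unknown:
        # This sketch treats an ID that isn't directly downstream as an error.
        raise ValueError(f"not downstream of the branch task: {sorted(unknown)}")
    return {
        task_id: "run" if task_id in chosen_ids else "skipped"
        for task_id in downstream_task_ids
    }


# Mirrors the run in the video: the branch chose the ad campaign, so "end" is skipped.
print(resolve_branch("run_ad_campaign", ["run_ad_campaign", "end"]))
# prints {'run_ad_campaign': 'run', 'end': 'skipped'}
```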
Info
Channel: TechTalkSourav
Views: 188
Id: Sm7P7QOeO24
Length: 5min 51sec (351 seconds)
Published: Mon May 20 2024