Getting started with XComs in Apache Airflow

Reddit Comments

Thank you for sharing, Marc!

👍︎ 1 👤︎ u/usculler 📅︎ Oct 10 2020 🗫︎ replies

(not going to lie - I had to do a double take as I expected UFO :) )

That's a very interesting thought - thank you.

👍︎ 1 👤︎ u/HashiLebwohl 📅︎ Oct 11 2020 🗫︎ replies

Happy to know you enjoy it :)

👍︎ 1 👤︎ u/marclamberti 📅︎ Oct 16 2020 🗫︎ replies
Captions
[Music] Hi, my name is Marc Lamberti, I'm the Head of Customer Training at Astronomer, and I'm super excited to be with you today. In this video we are going to discover what XComs are and why you might need to use them in your data pipelines. Indeed, one of the first questions you will have is: how can I share data between tasks in a data pipeline? Well, that's exactly what XComs allow you to do. We are going to discover what an XCom is, specifically how we can create an XCom and how we can fetch that XCom from another task. We are going to discover different ways of pushing and pulling XComs in Apache Airflow, and more importantly, we are going to see the limitations of XComs and why you can end up in trouble if you don't know what you are doing with them. So without further waiting, let's get started.

In order to explain XComs, we are going to start with a very simple use case. Imagine you have the following data pipeline with only two tasks. In the first task, training machine learning models, you train machine learning models, and each machine learning model has an identifier. This identifier allows us to know which machine learning model produced which output. Then, in storing machine learning model results, we want to store the results of the machine learning models along with their identifiers. The problem is that the identifiers are automatically generated in the first task, so how can we share those identifiers with the second task?

There are two ways to do that. The first way is to use an external tool; think of a database or AWS S3, for example. In the first task you store the identifiers in that external tool, a database for example, and in the second task you fetch the values from it. This way is truly useful if you have gigabytes or terabytes of data. For example, you might have a task fetching data from an API, and you want to process that data with Spark. What you do is put the data in the external tool, and then, in the second task, you launch a Spark job that fetches the data from the database. By doing this you avoid memory overflow errors caused by trying to share too much data within your Airflow instance.

Now, what if you have a small amount of data? In that case you can use XComs. XComs allow you to share data between your tasks, but only small amounts of data, and we are going to see what that means. Basically, you create an XCom from the task training machine learning models by pushing it: you push the XCom into the metastore of Airflow. Then, from the second task, storing machine learning model results, you pull that XCom using a special function that we are going to see in a minute. That's how it works in Airflow. One thing to remember here is that we are using the metastore of Airflow, whereas in the first way we were using an external tool like a database. To sum up, XCom stands for cross-communication, and XComs allow you to share data between the tasks of your data pipelines.

Now that we have seen what XComs are, let's discover how to use them in practice. From this point on the video is really hands-on, and I strongly advise you to follow exactly what I'm going to do on your computer. All the materials that I'm going to use are available in the description below.
In order to set up and run Airflow on my computer, I'm going to use the Astronomer CLI. If you don't know what the Astronomer CLI is, it is an open-source command-line interface developed by Astronomer to set up and run Airflow in the easiest possible way on your computer. Feel free to check the documentation to see how to install it on your operating system; it is pretty easy to do.

Once you have installed the Astronomer CLI, we can move on. In your terminal, create a folder, astro for example, and go into that folder. This folder will contain all the files and folders automatically generated by the Astronomer CLI in order to set up and run Airflow. To do this, type astro dev init. With that command we initialize the development environment, and if you now type ls, you can see some files and folders corresponding to Airflow. Next, open the folder with your code editor. From there, open the folder dags and create a new file, xcom_dag.py. Then, from the materials, copy the code corresponding to the data pipeline and paste it into the file. This data pipeline is pretty simple: we only have two tasks, training_models and storing_models, both using the PythonOperator to execute a Python function. training_models executes the function _training_models, as you can see here, and storing_models executes the Python function _storing_models, as shown right there.

Before diving into the XComs, let's update the Airflow version that we use with the Astronomer CLI. Go back to the materials and copy the following line. Then, back in your code editor, open the Dockerfile corresponding to the Docker image of Airflow that we are going to use, paste the line, and save the file. In the terminal, type astro dev start to set up and run Airflow on your computer using Docker. As you can see, all the components of Airflow (the metastore, the scheduler, and the web server) are running. Now, in your web browser, open a new tab and go to localhost:8080. You should land on the user interface, as shown right there. If you click on xcom_dag, then Graph View, you can see the data pipeline.

Now, what we want to do is share the machine learning model identifiers generated by training_models with the task storing_models. Let's discover how to do that with XComs. Back in your code editor: each time you want to interact with XComs in Airflow, you need to access a task instance object, and a task instance object corresponds to a task, for example training_models, executed in a DAG Run. In order to access the task instance object with the PythonOperator, you need to provide the argument provide_context=True. With that argument you are able to access the context of the current DAG Run in the Python function _training_models. So in the function, add **context to the signature, and you are able to access the context of the DAG Run. I strongly advise you to print that context in order to see all the attributes you can access from it. Next, we define some machine learning model identifiers: ml_ids, equal to the list ['ml_1', 'ml_2', 'ml_3'].
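At this stage, xcom_dag.py looks roughly like this. This is a sketch rather than the exact file from the materials, and the scheduling arguments (start_date, schedule_interval, catchup) are my assumptions:

```python
from datetime import datetime

from airflow import DAG
# Airflow 1.10-era import path, matching the version used in the video
from airflow.operators.python_operator import PythonOperator


def _training_models(**context):
    # provide_context=True hands the DAG Run context to the callable
    print(context)  # inspect all the attributes the context exposes
    ml_ids = ['ml_1', 'ml_2', 'ml_3']  # identifiers of the trained models


def _storing_models(**context):
    pass  # will pull the identifiers in a later step


with DAG('xcom_dag',
         start_date=datetime(2020, 1, 1),  # assumed
         schedule_interval='@daily',       # assumed
         catchup=False) as dag:            # assumed

    training_models = PythonOperator(
        task_id='training_models',
        python_callable=_training_models,
        provide_context=True,
    )

    storing_models = PythonOperator(
        task_id='storing_models',
        python_callable=_storing_models,
    )

    training_models >> storing_models
```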
We want to share this list of identifiers with the task storing_models. How do we do this? We create an XCom. Type context['ti'] to access the task instance object corresponding to the running task training_models, then .xcom_push to push an XCom. xcom_push expects two arguments, key and value: for the key we specify 'ml_ids', and for the value we specify ml_ids. The key here is extremely important, because it will be used to pull that XCom from the other task. Save the file, and let's verify that it works. In your terminal, type astro dev ps, then docker exec -it with the scheduler container and /bin/bash in order to open a bash session in the scheduler container. From there we can execute Airflow commands, and the command we are going to use is airflow test, which tests a single task. Type airflow test xcom_dag, the task that you want to test, which is training_models, and an execution date in the past; let's put 2020-01-01. Press Enter, wait a little bit, and as you can see, the task succeeded.

Next, if you open the Airflow UI and go to Admin > XComs, you can see that an XCom has been created as expected, with the key ml_ids, the key that we specified, and the value corresponding to the list of machine learning model identifiers. In addition to the key and the value, we have the timestamp, so we know when the XCom was created; the execution date, corresponding to the execution date of the DAG Run in which the XCom was generated; and the dag id and task id, so we know that this XCom was pushed from the DAG xcom_dag by the task training_models.

One thing you have to remember is that, by default, Airflow looks for XComs created within the current execution date and DAG Run. That means that if you have multiple DAGs pushing and pulling XComs, you will never end up with an XCom pushed from one DAG and pulled from another: by default, all XComs are isolated within their own DAG Run. By the way, if you create a new XCom with the same execution date and key, that XCom replaces the previous one. If you go back to your terminal and execute the task training_models again, then go back to the user interface and refresh the page, pay attention to the timestamp: you end up with a new XCom, as we kept the same key and execution date.
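To recap the push side, the function _training_models now looks roughly like this (a sketch following the steps above):

```python
def _training_models(**context):
    ml_ids = ['ml_1', 'ml_2', 'ml_3']
    # 'ti' is the TaskInstance of the currently running task; the key is
    # what the downstream task will use to pull this XCom back out of the
    # metastore.
    context['ti'].xcom_push(key='ml_ids', value=ml_ids)

# Tested from inside the scheduler container with:
#   airflow test xcom_dag training_models 2020-01-01
```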
All right, now that we have pushed an XCom from the task training_models, let's discover how to pull that XCom from the task storing_models. Back in your code editor, in the task storing_models we need to provide the context as well, so provide_context=True, and in the Python function _storing_models we add **context, as we need to access the task instance object to execute the function xcom_pull. Type context['ti'].xcom_pull to fetch the XCom that we created in the function _training_models. In that function we have to pass the task id where the XCom was generated, so task_ids='training_models' (that value corresponds to the task id we defined for the task training_models), and the key of the XCom, which is 'ml_ids'. Let's store the result in a new variable, ml_ids, and print it. Save the file, and let's verify that it works. In your terminal, execute the same command that we used for training_models, but this time with storing_models, and make sure you use the same execution date, as Airflow will use that execution date to know the DAG Run from which to fetch the XCom. Press Enter, and as you can see right there, we obtain the list of machine learning model identifiers as expected. So at this point we are able to share data between two tasks using XComs: in the first task we use xcom_push to push the XCom into the metastore of Airflow, and in the next task, storing_models, we pull that XCom from the metastore using the key ml_ids and the task id training_models.

One thing you have to know is that you can add two more arguments to xcom_pull. The first one is dag_id, which allows you to pull an XCom coming from another DAG. I strongly recommend not doing that, as you will create deeply hidden dependencies between different DAGs: you won't be able to see those dependencies in the user interface. The other argument is include_prior_dates, which allows you to pull XComs coming from past DAG Runs. Again, you have to be careful with it, because you will create dependencies between DAG Runs. But let's discover how that argument works. If we set include_prior_dates to True, we should be able to pull the XComs that have been generated for a given DAG across past DAG Runs. Let's verify this.

Save the file, then in the terminal create a new XCom, this time with another execution date: put 2020-01-02. Press Enter, go back to the Airflow user interface, and refresh the page: as you can see, we have a new XCom with the new execution date. Since we use include_prior_dates in the task storing_models, we should be able to pull both XComs from it. In the terminal, execute the task storing_models, this time with the execution date 2020-01-02, as we want to get the XCom corresponding to that execution date but also the previous one, the one we created for the 1st of January 2020. Execute the command, and as you can see, we get only one XCom. Why? Because by default xcom_pull returns the latest XCom for a given key, task id, and dag id, so here we are pulling the XCom corresponding to the execution date of the 2nd of January 2020.
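For reference, here is roughly what the pull side looks like at this point, with the two optional arguments just discussed left commented out (a sketch; enabling them creates the hidden dependencies mentioned above):

```python
def _storing_models(**context):
    ml_ids = context['ti'].xcom_pull(
        task_ids='training_models',
        key='ml_ids',
        # dag_id='some_other_dag',   # pull from another DAG (not recommended)
        # include_prior_dates=True,  # look at past DAG Runs too, but still
        #                            # returns only the latest matching XCom
    )
    print(ml_ids)
```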
So the question is: how can we pull multiple XComs at the same time? Let's discover this right now. In order to pull multiple XComs at the same time, we are going to use the function get_many. Since get_many is not available on the task instance object, we need to import the class XCom. At the top of the file, type from airflow.models import XCom. Then, in the function _storing_models, create the same variable: ml_ids = XCom.get_many(...), where we need to specify some arguments. The first argument is execution_date, corresponding to the execution date of the current DAG Run; to get it, use context with the key 'execution_date'. Next, we need to give the dag id from which the XComs will be pulled, so type dag_ids=context['dag'].dag_id. And finally, we pass the argument include_prior_dates=True as well. Let's print ml_ids, save the file, and in the terminal execute the command airflow test xcom_dag storing_models with the execution date set to the 2nd of January 2020. Press Enter, wait a little bit, and as you can see right there, we obtain a list of XComs as expected. If you take a look at the execution dates, we have the 2nd of January 2020 and the 1st of January 2020. Notice that this wasn't the case with xcom_pull and include_prior_dates, where we only got the list corresponding to the latest XCom, the one with the execution date of the 2nd of January 2020.
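Putting those steps together, the get_many version of the function might look like this (a sketch; XCom.get_many is a classmethod on airflow.models.XCom in the Airflow 1.10 series used here):

```python
from airflow.models import XCom


def _storing_models(**context):
    # Unlike ti.xcom_pull, XCom.get_many returns every matching XCom
    # instead of only the latest one.
    ml_ids = XCom.get_many(
        execution_date=context['execution_date'],
        dag_ids=context['dag'].dag_id,
        include_prior_dates=True,
    )
    print(ml_ids)  # a list of XCom objects, one per execution date
```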
All right, now that we have discovered how to pull multiple XComs at the same time, let's discover how to pull XComs from different tasks. You might have noticed that xcom_pull has an argument task_ids, with that little s at the end. This indicates that you can specify multiple task ids, and so you can pull multiple XComs coming from different task instances, as long as those XComs have the same key. For example, imagine you have a data pipeline with three tasks that are executed in parallel, each pushing an XCom with the key ml. One thing you might want to do in the next task is to execute the function xcom_pull with the key ml in order to get all the XComs generated by the previous tasks. Let's discover how to do this.

Back in the data pipeline, remove the get_many part and include_prior_dates from xcom_pull. Then, from the materials, copy the task processing_models, which uses a PythonOperator, and paste it there. We need to add the Python function as well, so, back in the materials, copy the function _processing_models and paste it in. This function will be executed by the task processing_models, just as we do with the other tasks training_models and storing_models. If you look at that Python function, you can see that we create a new XCom with xcom_push, with the same key ml_ids, but this time with the values ml_1_processed and ml_2_processed. Now, in order to fetch both XComs from processing_models and training_models in storing_models, and more specifically in xcom_pull, for task_ids we put a list of task ids: training_models and processing_models. Save the file, then in the terminal execute the command for processing_models in order to create the new XCom, with the same execution date, the 2nd of January 2020. Wait a little bit... perfect. If you go back to the user interface of Airflow and refresh the XComs view, you can see a new XCom corresponding to the task processing_models. Back in the code, execute airflow test with storing_models and wait a little bit: as you can see, this time we pulled the two XComs generated by training_models and processing_models. That's how you can pull multiple XComs from different tasks.

All right, so far we have learned how to push an XCom using the function xcom_push. One thing I would like to show you is how to do it with the return keyword. Instead of using xcom_push, you can create an XCom with the return keyword. It's pretty simple: remove that line, and type return followed by the value you want to push as an XCom, ml_processed_ids in our case. Then, in storing_models, in order to fetch that XCom, you just need to pass the task id where the XCom is created, and you can remove the key. Why? Because the XCom created automatically with return has the key return_value, and in xcom_pull, if you don't specify a key, Airflow looks by default for an XCom with the key return_value. Let's verify this right now. Save the file, then execute the task processing_models. Wait a little bit... perfect. Now, if you refresh the XComs view, you can see that the XCom has the key return_value. Back in the terminal, if we execute the command with storing_models, we obtain the XCom that was automatically pushed for us with the return keyword, as expected. Keep in mind that, in general, anything returned by an operator is pushed as an XCom into the metastore of Airflow.

All right, we have seen another way to push an XCom; now let's discover another way to pull an XCom, using templating. I won't explain what templating is, but you have to know that you can pull XComs in templates. To show you this, we are going to modify the task storing_models. In the PythonOperator, remove provide_context and specify a new argument, op_kwargs, equal to a dictionary. If you don't know what op_kwargs is, this argument allows you to pass parameters that you will be able to use in the Python function; in our case, anything we put in op_kwargs will be available in the Python function _storing_models. Also keep in mind that this argument is templated, which means we are able to inject data at runtime through it. Let's create a key, ml_ids, equal to two pairs of curly braces to indicate that we want to inject data at runtime at this specific location; inside them, type ti, for task instance, then .xcom_pull, where we specify the task id: task_ids='processing_models'. Next, in the function _storing_models, we can remove the xcom_pull call, as we now do it in the template, and replace context with op_kwargs. Then we print op_kwargs, and more specifically the key ml_ids.
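The templated version of the task could look roughly like this (a sketch; note that the rendered template reaches the function as a string):

```python
def _storing_models(**op_kwargs):
    # The value injected through op_kwargs arrives already rendered.
    print(op_kwargs['ml_ids'])


# Inside the existing `with DAG(...)` block:
storing_models = PythonOperator(
    task_id='storing_models',
    python_callable=_storing_models,
    # op_kwargs is a templated field: the xcom_pull call between the
    # double curly braces is rendered at runtime.
    op_kwargs={
        'ml_ids': "{{ ti.xcom_pull(task_ids='processing_models') }}",
    },
)
```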
Save the file, and we are ready to check if everything works. Actually, before doing this: I just made a typo right there, so make sure the task id reads processing_models, with an s. Save the file, and in the terminal execute the command airflow test with storing_models. Press Enter, wait a little bit, and as you can see right there, we obtain the XCom that was pushed by the task processing_models, pulled with xcom_pull from the task storing_models in a template, as expected.

All right, now that you have discovered how to pull an XCom in a template, I would like to show you how to push an XCom with another operator: the BashOperator. Let's discover this right now. Some operators either push XComs automatically by returning a value, like the S3 list operator, or require you to enable an argument, xcom_push, in order to push an XCom, and that's exactly the case with the BashOperator. Since this operator is widely used, I would like to show you how to do it. First, we need to import the BashOperator, so type from airflow.operators.bash_operator import BashOperator. Then, after storing_models, create a new task, deploying_models, equal to the BashOperator. We specify a task_id equal to deploying_models, then we define the bash command that we want to execute: bash_command equal to echo "deployment done for {{ ds }}", where ds corresponds to the execution date. One common thing we want to do with the BashOperator is to get the output of the executed bash command, and to do this you pass the argument xcom_push=True, so that the output of the command is pushed as an XCom, and you can pull that XCom from another task if you want. Save the file, then in the terminal execute the command airflow test with deploying_models. Press Enter and wait a little bit... perfect. If you go back to the user interface of Airflow and refresh the view, you can see right there a new XCom with the key return_value and the value corresponding to the output of the command that we executed in the BashOperator. Remember that with the BashOperator you won't be able to specify the key, which is why each XCom it pushes has the key return_value.
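Putting the pieces together, the deploying_models task might look like this (a sketch; xcom_push=True is the Airflow 1.10-era flag, later replaced by do_xcom_push):

```python
from airflow.operators.bash_operator import BashOperator

# Inside the existing `with DAG(...)` block:
deploying_models = BashOperator(
    task_id='deploying_models',
    # {{ ds }} is templated and rendered as the execution date
    bash_command='echo "deployment done for {{ ds }}"',
    xcom_push=True,  # push the last line of the command's stdout as an XCom
)
```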
Finally, some operators do not return any XCom at all, and if you want to modify that behavior, you will have to override the given operator. All right, that's it for the hands-on part; I hope you enjoyed it. Now let's discover the limitations of XComs.

So far in the video we have discovered how powerful XComs are, as we are able to share data between tasks, but you absolutely need to know the limitations of XComs, otherwise you will end up with many troubles. The first limitation that you absolutely need to be aware of is that XComs are limited in size. Indeed, depending on the database you use, either SQLite, Postgres, or MySQL, you won't be able to store the same amount of data in your XCom: with SQLite the value is limited to 2 gigabytes, with Postgres 1 gigabyte, and with MySQL 64 kilobytes. So you absolutely need to be careful with the value you put in your XCom, otherwise you will end up with a memory overflow error.

Next, the problem with XComs is that it's easy to fall into the trap of treating Airflow as a data processing framework, which it is not. Again, Airflow is an orchestrator. If you want to process gigabytes or terabytes of data, you should use Spark, with all the optimizations it brings. Don't try to use Airflow to process gigabytes or terabytes of data: you will end up with memory overflow errors, and that's not the job of Airflow.

Finally, and this one is truly important, XComs create implicit dependencies between your tasks that you are not able to see from the user interface of Airflow. Indeed, as soon as a task needs an XCom coming from another task, you create a dependency between those two tasks, and if for any reason the first task is not able to push the XCom, the second task will end up in failure. So you absolutely need to be careful with that: you create hidden dependencies as soon as you start using XComs in your tasks.

So that's it about XComs. I really hope you enjoyed the video and learned a lot about XComs. Obviously, there are other XCom features that we have not seen in this video, but if you want to learn more about XComs, feel free to reach out to me; I will be glad to help. Have a good day, take care, and see you in the next video. [Music]
Info
Channel: Astronomer
Views: 2,275
Id: zw9Vy42IwG0
Length: 28min 53sec (1733 seconds)
Published: Fri Oct 09 2020