Focus On: The Apache Airflow PythonOperator, all you need in 20 mins!

Video Statistics and Information

Reddit Comments

Very charismatic presenter, and the tutorial overall was well timed and to the point! I enjoyed the gradual-improvement approach; it feels like a great way to also show how software becomes simpler to use (but more abstract) over time. Thus it's always good to start with a bottom-up approach. Thanks for posting!

EDIT: Happy cakeday! 🍰

👍 1 · u/faaaaaart · Apr 17 2021
Captions
How do you pass arguments to your Python function with the PythonOperator? How do you get the execution date from your PythonOperator? How can you share data with the PythonOperator? Well, guess what: you are going to find the answers to all of those questions in this video. Welcome! My name is Marc Lamberti, and I'm the Head of Customer Training at Astronomer. My goal is to make sure that you truly understand Airflow and get the most out of it, so in the next 20 minutes you are going to discover everything you need to know about the most widely used operator: the PythonOperator. Without further ado, let's get started. As I strongly believe in learning by doing, you can follow along on your own computer through the link in the description below, and I strongly advise you to do that, because it is the best way to learn.

That being said, what exactly is the PythonOperator? The PythonOperator is the operator that allows you to execute a Python callable. And what is a Python callable? Anything that implements the __call__ method, for example a Python function. You want to execute a Python function? You can do it with the PythonOperator. You want to execute a Python script? You can do it with the PythonOperator. That's it: keep in mind that with the PythonOperator, you execute a Python callable.

Now let's see it in action. First you have to create your DAG: in the folder dags, create a new file and call it my_dag.py. Then type from airflow import DAG and instantiate a DAG object with the dag_id my_dag, a start date start_date equal to a datetime object (let's put 2021-01-01), a schedule_interval equal to @daily, and the catchup parameter equal to False, using with ... as dag. You need to import the datetime object as well, so type from datetime import datetime. Perfect: at this point you have your data pipeline ready.

Now let's say you want to execute a Python function. How can you do that? By using the PythonOperator. First you need to import it, and as it is built into Airflow 2.0, you don't have to install any provider for it: type from airflow.operators.python import PythonOperator. Now that you have the PythonOperator, you can use it right below the DAG object: type task_a = PythonOperator with the task_id, let's say task_a as well, and the python_callable. As you have to pass a Python callable there, let's create a function just above the DAG object: def _process, which simply prints "process done". Copy _process, paste it as the python_callable, save the file, and you are done: with that PythonOperator you are going to execute the Python function _process, and so print "process done" on the standard output.

Let's verify this. In your terminal, make sure that you are in the folder demo and type the command docker-compose up -d in order to start your Airflow instance. Hit Enter and wait a little bit. Perfect: Airflow is up and running. If you go to your web browser, type localhost:8080, and log in with airflow / airflow, you land on the beautiful user interface of Airflow. From there, click on my_dag, then Graph View, turn on the toggle to schedule your DAG, and refresh the page. As you can see, the task task_a has been completed. Click on it, then click on Log, and if you carefully take a look at the logs, you can see the message "process done".
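For reference, here is a minimal sketch of what my_dag.py looks like at this point, following the steps just described:

```python
# dags/my_dag.py
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def _process():
    # The Python callable executed by the PythonOperator
    print("process done")


with DAG("my_dag",
         start_date=datetime(2021, 1, 1),
         schedule_interval="@daily",
         catchup=False) as dag:

    task_a = PythonOperator(
        task_id="task_a",
        python_callable=_process,
    )
```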
That's how you can execute a Python function; it is as simple as that. Now, another thing you might want is to simulate a failure in your task, on purpose. Why? Let's say you would like to test how the next task behaves depending on whether task_a fails. You can do that by adding one very simple instruction to your Python function: go back to your code and type exit(1) (or actually any number other than 0), then save the file and go back to the user interface. Click on Graph View, click on the task, and click on Clear in order to restart it, then click OK and refresh the page. As you can see, this time the task ends up in failure. That's how you can modify the status of your PythonOperator if you want to; again, this is really useful for testing and debugging purposes.

Next, what if you want to pass some arguments to your Python function, for example a file name or a path? How can you do that? Let's find out. Go back to your code editor: you have two ways of passing arguments to your Python function _process. The first way is by using op_args. op_args expects a list of arguments, and that list will be passed as positional arguments. Let me show you: right there, let's put a file name, data.txt, and then a path, /usr/local/airflow. To get those arguments in your Python function, you just need to declare them as parameters, so type filename and path, then remove exit(1) and print, with an f-string, "process" followed by the path and the file name (actually the other way around: path first, file name second).

Let's see if it works. Save the file, and in your terminal type docker ps, then copy the container ID of the Airflow scheduler. Type docker exec -it, the container ID, then /bin/bash, and hit Enter: now you are inside the Docker container of Airflow. There you can execute the command airflow tasks test with my_dag (the dag_id), task_a (the task_id of the task you want to check), and an execution date in the past, let's say the 1st of January 2021: 2021-01-01. That command is a best practice: I strongly advise you to always test your tasks like this before triggering your DAG. Hit Enter, and as you can see from the output, you obtain "process", the path, and the file name, then "done". That's how you pass positional arguments.

You have to be careful with op_args, though: if you swap the parameters, let's say path here and filename there, and test again, this time the path is equal to the file name and the file name is equal to the path. That's why there is another way to pass arguments: op_kwargs. With op_kwargs you pass keyword arguments, so you no longer care about the order in which you pass your arguments, and that's why it is really useful. Let me show you: instead of op_args, put op_kwargs and create a dictionary this time. For data.txt the key is filename, and for the path, obviously, the key is path. Now that you have this dictionary, try the task again: as you can see, this time you obtain the path first and the file name second. The keys you specified in the dictionary are automatically matched with the names of your function's parameters, and that's why it works whatever order you give to your op_kwargs arguments.
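As a sketch, the two options described above look like this, using the example values from the video (pick one; both call the same function):

```python
def _process(filename, path):
    print(f"process {path} {filename} done")

# Option 1: op_args passes a list of positional arguments,
# so the order must match the function signature.
task_a = PythonOperator(
    task_id="task_a",
    python_callable=_process,
    op_args=["data.txt", "/usr/local/airflow"],
)

# Option 2: op_kwargs passes keyword arguments,
# so the order no longer matters.
task_a = PythonOperator(
    task_id="task_a",
    python_callable=_process,
    op_kwargs={"filename": "data.txt", "path": "/usr/local/airflow"},
)
```

And to test the task from inside the scheduler container: airflow tasks test my_dag task_a 2021-01-01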
That wasn't the case with op_args. So, in my opinion, you should stick with op_kwargs: it is cleaner, and you don't have to look at your Python function to understand what's going on; you can just put a meaningful key, and then just by looking at the PythonOperator you know what each argument is. That's why at Astronomer we advise you to use op_kwargs.

Now, there is something else that you have to know about op_args and op_kwargs: those two arguments are templated, which means you can pass dynamic values into them. Let's say the file name or the path can change over time. Instead of pushing a new data pipeline, or changing the code of your data pipeline, each time the file name or the path changes, you can leverage Variables, and that's what you are going to do right now. Your goal is to pass Variables instead of hardcoding the values for the file name and the path.

Let's do this. Go to the user interface of Airflow and click on Admin, then Variables. Add a new record and create a new key, filename, with the value airflow.txt, then click on Save. Create another Variable called path, with the value /opt/airflow this time, and click on Save. Now you have your two Variables, filename and path, and your goal is to fetch them and pass them as arguments to your Python function. Go to your code, and instead of hardcoding data.txt, you can use the templating engine of Airflow to pass the Variable dynamically: in a pair of double curly brackets, put var.value.filename, and then the same for the path: a pair of double curly brackets with var.value.path. Save the file and run the task again: as you can see, you obtain /opt/airflow for the path and airflow.txt for the file name. At this point you are able to pass Variables dynamically into your Python function, and the good thing is that if the path or the file name changes, you just have to change the Variable values, not your code.

That being said, you can do better. Instead of having two Variables, filename and path, you could create only one Variable and pass it directly to op_kwargs. Let me show you: go back to the user interface of Airflow and remove those Variables (click on Actions, then Delete, then OK). Add a new record and call it my_dag_settings, with a JSON value containing the key filename with the value airflow.txt and the key path with the value /opt/airflow, then click on Save. So now you have your Variable, this time with a JSON value, and notice that it is prefixed with my_dag. This is a best practice: I strongly advise you to always prefix your Variables with the dag_id of the DAG where they are used; it will save you a lot of time once you have hundreds of different Variables. Go back to your code, remove the dictionary, and type Variable.get with the Variable key my_dag_settings and the parameter deserialize_json=True. Obviously, as you use the Variable object, you need to import it, so type from airflow.models import Variable. By doing this, you pass the arguments path and filename from only one Variable, which is in fact a JSON value. That means you make only one request to the database, so it is better optimized.
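Here is a sketch of the operator after each of these two changes, assuming the _process function from before:

```python
from airflow.models import Variable

# Two separate Variables, injected through Jinja templating
# (op_kwargs is a templated field):
task_a = PythonOperator(
    task_id="task_a",
    python_callable=_process,
    op_kwargs={
        "filename": "{{ var.value.filename }}",
        "path": "{{ var.value.path }}",
    },
)

# One JSON Variable, fetched in a single database request:
task_a = PythonOperator(
    task_id="task_a",
    python_callable=_process,
    op_kwargs=Variable.get("my_dag_settings", deserialize_json=True),
)
```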
Also, all the values that share the same logic, the same meaning, are put into a single Variable, so this is much cleaner as well. Let's verify that it works: save the file and run the task again in your terminal. Hit Enter, and as you can see, you obtain /opt/airflow and the file name airflow.txt again, but this time it is much better.

Okay, here is something that will save you a lot of time and make your DAG much cleaner: instead of putting your Python functions into the DAG file, you should always try to keep them out of it. Why? Because you will end up with many, many Python functions, and the clearer your DAG file is, the better. How do you do that? It's really simple. Go into the folder includes and create another folder, let's say my_dag, then a file called functions.py (so this file lives in the folder my_dag). Go back to your DAG, copy the Python function, paste it into functions.py, and remove the leading underscore. Save the file, go back to your DAG, add the import from includes.my_dag.functions import process, remove the underscore from the python_callable, save the file, and run the task again. As you can see, you obtain the exact same output, but this time your DAG file is much cleaner. So, as a best practice, always keep your Python functions out of your DAG file.

Now you are at the last step of the video, and this step is actually one of the most important ones. Indeed, what if you want to share data with the PythonOperator? In that case you need to call the method xcom_push, but how can you do that in Airflow 2.0? Or how can you get the execution date of the current DAG run, if you want to fetch some data corresponding to it? Back in the day, with Airflow 1.10.x, you had the parameter provide_context that you set to True in order to get the context, and so access all the information you need to either share data between your tasks or get the current execution date. With Airflow 2.0 you don't have this parameter anymore, so how can you do it? It's pretty simple, actually: instead of setting that parameter, you go to your function and just add, as a parameter, the key of the context dictionary corresponding to the information you want. For example, you want to access the task instance object to share data between your tasks? Add the parameter ti, and then in your Python function you can call ti.xcom_push directly; you don't have to write context["ti"] like before.

Obviously, you have to know the keys of the context dictionary. How can you know those keys? It's pretty simple again: instead of ti, you can just unpack all the parameters that are passed by default to your Python function, corresponding to the context dictionary. Type two stars and context, then print the context, save the file, and execute the task again in your terminal. As you can see from the output, you obtain all the parameters that are given by default to your Python function: the key conf, the key dag if you want to access the DAG object, the dag_run, ds corresponding to the execution date as a string, and execution_date, this time as a datetime object. I strongly advise you to take a look at this context dictionary; it is really important to know what is in it and what you can do with all of those values.
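A sketch of those two patterns; the process signature follows the video, while the name process_debug and the XCom key/value are illustrative assumptions:

```python
# includes/my_dag/functions.py

def process(filename, path, ti):
    # In Airflow 2.0, naming a context key ('ti', the TaskInstance)
    # as a parameter is enough to have it injected.
    ti.xcom_push(key="processed_file", value=f"{path}/{filename}")


def process_debug(filename, path, **context):
    # Unpacking the whole context dictionary lets you inspect its keys:
    # conf, dag, dag_run, ds, execution_date, ti, ...
    print(context)
```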
Another example: you want to access your data according to the current execution date. Well, remove **context here and put ds instead, then print ds as the execution date, and remove the previous instruction. Save the file, rerun the task, and as you can see from the output, you get the current execution date: the 1st of January 2021. It is as simple as that, and that's how you access the context values in Airflow 2.0 with the PythonOperator.

All right, I would like to give you a quick bonus. Do you know the TaskFlow API, the new way of creating your DAGs in Airflow 2.0? If you don't, I strongly advise you to take a look at the link in the description below: you will see what the TaskFlow API is and why it is so important for you to know it. It is just mind-blowing. But let me give you a quick taste of it. Import a special decorator by typing from airflow.decorators import task, then put the @task decorator just above process, remove those parameters, type params for example, and print params; save the file. That's your Python function. Now, if you go back to your DAG, instead of calling the PythonOperator with the task_id and so on, copy the Variable, remove the PythonOperator, call process, and pass the Variable as a parameter. Save the file, and that's it: you are doing the exact same thing as before, but with less code, which is much cleaner. That's why the TaskFlow API is so powerful, and that's why I truly believe it will be the new way of creating your DAGs pretty soon. You can actually remove the PythonOperator import as well; you don't have to import it anymore, as it is automatically done for you by Airflow.

Now, if you save the file and execute the task again in your terminal, you end up with an error. Why? Because task_a is not found: indeed, you don't specify the task_id anymore, so it defaults to the function name. How can you set it with the TaskFlow API? It is pretty simple: just add a parameter to the task decorator, task_id="task_a". Save the file, try again, and as you can see, it works: you get the file name and the path. That was the TaskFlow API. I strongly advise you to take a look at it; it is just amazing, and you will create your DAGs much faster than before.

That's it for the PythonOperator: now you know everything you need in order to fully use it in your DAGs. You have discovered how to execute a Python function, which was pretty obvious; then you learned how to pass positional and keyword arguments to your Python function. You have discovered how to inject data at runtime, that is, how to fetch a Variable from the metadata database of Airflow and pass it as an argument to your Python function. You have discovered how to call a Python function kept out of your DAG file, which is much cleaner. And last but not least, you discovered how to get the context dictionary in order to share data between your tasks or get the current execution date, and how to use the TaskFlow API to make the code of your DAG much cleaner. I hope you enjoyed this video. If you did, please subscribe to the channel, smash that like button, and I'll see you in another video. Take care!
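A minimal sketch of the TaskFlow version of the DAG described above (params is just the illustrative parameter name used in the video):

```python
from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.models import Variable


@task(task_id="task_a")
def process(params):
    # params receives the deserialized JSON Variable
    print(params)


with DAG("my_dag",
         start_date=datetime(2021, 1, 1),
         schedule_interval="@daily",
         catchup=False) as dag:

    # Calling the decorated function creates the task
    process(Variable.get("my_dag_settings", deserialize_json=True))
```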
Info
Channel: Astronomer
Views: 5,465
Id: FtBpD-FX1HM
Length: 23min 43sec (1423 seconds)
Published: Fri Apr 16 2021