How to easily test your Airflow DAGs with the new dag.test() function

Captions
All right, thanks everybody for joining us today. If you're new to this event series, this is our Live with Astronomer that we do every other week. These are our shorter-form live events, so we typically keep the content to 15 to 20 minutes, very much developer focused, so less of the kind of background information that we would go through in a full-length webinar and much more practical, which today's topic will certainly fit. I'm super excited about it. Before we get into that, I have a couple of community updates I'd like to go through. If there are any folks in New York City out there, tonight there is an MLOps with Airflow meetup happening. There will be lots of pizza and beer and some great speakers, so if you're in the data science world and in New York City, definitely consider going to that. Also happening next week, most of the week I think, is the Python Web Conference, which is fully virtual. It's not Airflow specific, but there will be a talk on writing Airflow DAGs and introducing the Astro SDK, which we have talked about in previous events here. Tamara is actually co-presenting that with Daniel Imberman, who's part of the Airflow PMC and helped develop the Astro SDK, so that's going to be a super exciting event. I'll throw a link to the registration in the chat; again, that's fully virtual, so you can join from anywhere. I was there last year and there were lots of other great talks as well, so definitely consider that. The last thing is that Airflow Summit has just been announced. It's happening in September of this year in Toronto, and the CFP (call for proposals) is open, so if you are interested in speaking and would like to chat with us about submitting a proposal (Astronomer is one of the sponsors of the event), we would definitely love to hear from you. Or if you have any questions about Airflow Summit, you can go ahead and
reach out directly to me; I will put my email in the chat as well. And with that, I think that's all I have for community updates. A few logistics for today: this event is going to be recorded, and you will receive the recording, probably tomorrow, in your email, so no worries if you want to re-watch. Feel free to ask any questions in the chat or in the Q&A feature in Zoom; I will answer them as we go through if I'm able, and if not, we'll save those for the Q&A with Tamara at the end. All right, I think that's everything I had to go through, so I'm super excited to hand it over to Tamara. She's a developer advocate here at Astronomer, and she's going to be covering the new dag.test() function and how you can use it for easier development of your DAGs, a topic that we'd all love to learn more about. So with that, Tamara, I'll hand it over to you.

Thanks so much, Kenton, and welcome again to everyone who joined in the meantime. I'm going to talk about one specific function that is actually very powerful, the dag.test() function, and at least for me, the more I learned about this function, the more excited I got about the feature, and I hope it will be the same for you. Okay, before we jump into the code, I want to quickly show you some prerequisites. This slide looks like there are many prerequisites, but it's actually just three main ones, and it's more about the things that you don't need. It's important that if you're using dag.test() in your DAG code and you run the DAG file, the environment from which you are running the file needs to have Airflow installed, the Airflow Python package, and it needs to be version 2.5.0 or later, because the dag.test() function did not exist in previous versions. As I said, you can either have the Airflow Python package installed, or you can also run these commands from within a Docker container; then you would have to bash
into the Docker container. You can verify that you have Airflow installed, and which version, by typing airflow version. The second thing that is important: if your DAG is using features that need the Airflow metastore, the Airflow metadata database, then you need to have a database that exists. One big example is if your DAG is using XComs, like my DAG in the demo will, and this means the Airflow database has to have been initialized. If you have run Airflow in your environment at least once, this is already the case, but you can also verify that by running airflow db check, and you can initialize your database by running airflow db init. The last thing you need to keep in mind: if your DAG is using provider packages, then the environment from which you are running your DAG file needs to have those packages installed, so either the Docker container you're in, or a virtual environment, or your local environment. Now, what you don't need, and this is part of the many exciting things about this function: you don't need a scheduler or an executor that is running. Airflow doesn't have to be running for you to debug your DAGs anymore, and that is the power. The tasks are just run in a Python process, and this also means that the tasks are running really fast and failing fast. All right, why and when would you want to use dag.test()? dag.test() allows you to debug and test your DAGs directly in your IDE, so you can do all of the things that you usually do when you debug Python code: you can step through the code line by line, you can use breakpoints, you can use the Python debugger. As I said, you do not need to start up the scheduler, so you don't need to bother with, for example, which scheduler is running. At least I am often working in different Airflow projects, and sometimes I want to just quickly debug something in one project while I have another Airflow instance running, and I don't want to spin that down and spin the
other one up; you don't need to do that anymore. And you can use your IDE's debugging tools. I will show you how to use dag.test() from the command line to make it IDE agnostic for this live, but at the very end I'm going to show you how to use it with the VS Code debugging tool. When do you want to use dag.test()? You can use this with any DAG, but it's most useful if you have a lot of custom Python code, so either you are using the @task decorator or the PythonOperator and you want to make sure that the functions you put in are correct, or you need to debug those, or also if you're using custom operators and you want to debug your custom operator code. And the last thing: it's also very useful if you're using the Astro SDK. I will not have time to go into that in this live, but if that's something you're interested in, keep an eye out for our upcoming webinars about the Astro SDK, because we will cover dag.test() with the Astro SDK in those too. Okay, that was all for slides, let's jump right over.

Okay, we are in my VS Code environment, and the first thing you're going to notice, especially if you've seen several of our webinars, is that I'm not in an Astro project, so you don't see the dags folder or the include folder or a Dockerfile as usual. I did this to show you that you don't even need to be in an Astro project or in an Airflow project in order to use dag.test(), but of course it doesn't hurt if you are; you can use dag.test() on the DAG files in your dags folder, and you don't need to pull your files out at all. What I have in my folder here is one DAG file, dag_test_example, and one other Python file that defines a custom operator that I'm using in this DAG; the rest of the files are just some documentation and some configurations. All right, first let's make sure we have our prerequisites. Make sure I have Airflow installed: 2.5.1, very good. Next, make sure I have an Airflow database: successful connection, perfect. I also know that I have the
provider installed that I'm using; you can see it's green here, but this is another thing to make sure of, and of course you can also use a virtual environment if you don't want to install everything in your local environment. The next thing I want to show you is that I don't have Airflow running. I'm looking at my other screen because I'm going to copy-paste this command, so you don't need to see me type it out. We have no live scheduler, and there are also no Docker containers running, so on this computer Airflow is currently not running. All right, before we start to debug the DAG, I want to quickly explain at a high level what this DAG is doing. We use the @dag decorator to define the DAG function, dag_test_example, and then we just have three tasks, so it's a very simple DAG structure with three sequential tasks. The first task is called extract_orders. It uses the S3Hook to connect to an S3 bucket in my AWS account, pulls a JSON file which contains orders from a customer of my imaginary chocolate shop, reads the contents of the file, and returns the contents, so this should be a couple of orders a customer placed. Next, this output is passed into a second task, which sums up all the amounts that my customer owes me for their individual orders and adds a shipping cost. The last task is mainly here because I wanted to show you how to debug a custom operator too, but it's doing something very simple; that's why I called it my basic math operator. It takes in two numbers and an operation, and then it does the math on those two numbers with the supplied operation. What you can see here is a Jinja template; in case you are newer to Airflow, this syntax might look a little weird or unfamiliar, but it just means we are pulling from XCom the return value of this task, so this pulls in whatever the second task returns. The second number is drawn from a DAG-level parameter which is called discount, and then we
multiply those two, so we are applying a discount for our customer, because they are a loyal customer buying a lot of chocolate from us. Then we set the dependencies of the tasks and call the DAG function, and this is what you would usually have in your DAG; so far it's just a normal DAG. Next, I want to show you what you can add in order to use the dag.test() method, and it's all just a couple of lines of code. You have if __name__ == "__main__", which just means: only run this if I run the file directly. We set the paths to some configuration files, we can set the discount dynamically (let's say they are a very loyal customer, so we give them a 25% discount), and then we use the DAG object here and call .test() on it. You can call .test() without putting any parameters in, but you have four optional parameters. You have the execution date, so we can run the DAG for any date that is past its start date; for example, I could even run this for a date almost a year in the future, and this is mainly important if you're using the execution date somewhere in your code. We can also run it with different connections; passing in the configuration to one of my connections is how I'm able to make a connection to the S3 bucket even though Airflow isn't running. We can pass in our variables, and then we can run our DAG with a config, too.
And here is where I pass in my discount, which is later pulled from the DAG parameters. All right, that was a lot of talking, let's actually run this. I told you I'm going to show it from the command line so you can use it in any IDE: you simply say python and then the file name, so you're running the Python file as you would run any script. We hit that, and it's already done; I warned you it was going to be fast. It ran all of the tasks already, and if you're familiar with Airflow you will recognize these: these are the logs from each of the tasks that were run, and they ran in their order. The XComs worked, the connections worked, it worked like a real DAG, and I didn't have to start up Airflow. This already is pretty cool and pretty useful if you're developing a DAG, but now let's enter the debugging part. It says all our tasks are successful, but a few of you will have already spotted this: in the end, our customer owes us a negative amount. They bought chocolate for minus 570 dollars; that can't really be, so there is something wrong in our DAG and we need to debug. Previously, before dag.test() existed, what I would do is go to the Airflow UI, rerun my task, start adding print or log statements to my different Python functions or even to my custom operator, and try to figure out what happened. But now, with the dag.test() function, I can use breakpoints. Let's add a breakpoint in every single task, so in the first two tasks, and let's also add a breakpoint in our custom operator here. Now we have three breakpoints, and let's run this again. What happens now is that as soon as the code hits its first breakpoint, which is here on line 38, it enters the Python debugger, and now you can do anything that you can do in your normal Python development with the Python debugger, in an Airflow DAG. What I can do, for example, is print out an expression. I want to know if the issue is in the file
that I'm pulling from my S3 bucket; maybe the input data was wrong and I should have done some data quality checks. So let's print out output. Okay, now we can see what was in the file that was pulled, and we can see someone ordered a lot, but it's all positive, so the negative value is not coming from here. What I can do next is, with s, step through the code line by line, so I can step to the next line and the next line, and you can see we are stepping through Airflow code now, through the code of the PythonOperator. But this would take a long time, so let's use c, which stands for continue, and run to the next breakpoint, which is here on line 45, and we end up at line 45. Again, I could print out any expression, but a shorthand that I really like is a: this will print out all the arguments, so we can see all the values. This again is not the issue that we're having. The other thing I want to print out is the shipping cost: minus two thousand. I think we might have found our issue here. The shipping cost that is added here to the sum of the orders is a big negative number, and looking at these numbers, yeah, this results in a negative number. This means, if we step through, we will see that the output of this sum_orders_plus_shipping function is negative, and this is why, in the end, our customer gets a negative value for all their orders. But let's do due diligence and jump to the last breakpoint. This breakpoint, if you remember, was actually set in our custom operator code, and what I wanted to show you here is that I can do the same tests of expressions, so I can do self.first_number and we can see the value of self.first_number at this point. We can also ask what type self.first_number is: it's a float, okay, good. This is super useful if you're doing conversions between strings and JSONs and bytes and run into confusion there and want to quickly figure out how a value changes through your functions. And one of my personal
favorites is that I can pull the source code of any object that I have access to. The custom operator here is inheriting from the BaseOperator, and normally, when I use operators that already exist, or any code that already exists and that I pull in from a package, I will often go to that GitHub repository to look up exactly what the parameter names are, and this always takes time. But now I can just say: give me the source code of the BaseOperator, and then I have the full source code, so I can find out exactly which parameters the BaseOperator uses and how to interact with it. All right, and the last c should take us to the finish. Okay, let's fix the issue. Back in our DAG, you can see the variable that I'm passing in; I'm passing it in through a variables path, so it's defined in a variables yaml file. Let's go over to that, and yep, the debugging was correct: we have minus two thousand, which is not a reasonable shipping cost. Let's say the shipping cost, because someone orders in Switzerland and shipping is expensive here, is 50.
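The arithmetic the debugger session exposed can be reproduced in plain Python. A small sketch (the function name and numbers echo the demo rather than its exact code), with the pdb commands from the session noted in comments:

```python
def sum_orders_plus_shipping(order_amounts, shipping_cost):
    # Sums a customer's order amounts, then adds a flat shipping cost.
    return sum(order_amounts) + shipping_cost

# All order values pulled from the file were positive, as printing `output`
# at the first breakpoint showed:
orders = [100.0, 250.0, 220.0, 456.67]

# With the bad value from the variables file (pdb's `a` command printed the
# arguments, revealing shipping_cost = -2000) the total goes negative:
print(sum_orders_plus_shipping(orders, -2000))  # negative total, the bug

# With a plausible Swiss shipping cost the total is positive again:
print(sum_orders_plus_shipping(orders, 50))

# Recap of the pdb commands used in the session: `p <expr>` prints an
# expression, `a` prints the current function's arguments, `s` steps one
# line, and `c` continues to the next breakpoint.
```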
And let's rerun our DAG with this new value. You can see I can quickly change any variable value; I could even change which AWS account I'm connecting to. Or let's also say this delayed the customer's order, so we give them a bigger discount now, and let's rerun. This time I will just hit c on all of the breakpoints to quickly get through, and now the end result is around 770, so this is a much more reasonable result for the order for our customer. All right, that was the main thing I wanted to show you for debugging from the command line. The other thing that you might be interested in now is how exactly I define the connections. I will not show you my connections file, because it has the secret to connect to my AWS account, but I made a template file, and you can see you simply name your connection and then you put in all the parameters that you would normally put into your connection, and you can do this with all types of connections to all external tools. All right, the last thing: if any Python developers are listening, you've probably been saying, why are you in the terminal, there's a perfectly good debugging tool in VS Code? And it does work with the debugging tool. If we click on debug and then run and debug, we enter the VS Code debugging interface. It already started running automatically, and it will also stop at all of our breakpoints. What we also can do is add breakpoints with the little red button here, so then we don't even need to define our breakpoints in our code. We could add a breakpoint here and then use the usual tools, like continue, to jump from breakpoint to breakpoint. This just has a lot of other sugar that you can use when debugging. One of my favorite examples: you can see all the variables that you have access to here, and you can actually watch a specific expression, so for example I can say: watch what output gives me. It currently shows me that output would
give me this JSON, but if I jump to the next breakpoint, output is not defined there, so I can jump through and we'll get all sorts of information here at the side. Of course, you will already have that configured the way you like it, but you can use all of these debugging tools now with your Airflow DAGs. All right, that was it from me, let's jump back. I don't have a last slide, just thank you, and I'm ready for questions.

Awesome, thanks for going through that, Tamara. This is obviously such a helpful function; we were very excited about its release in Airflow 2.5. I have answered a couple of questions already, and we've also had some great discussion about chocolate, so a great example there. There are a couple of questions I haven't answered yet that I think would be good to address live. The first, and I'm going to broaden this a little bit: somebody is asking, is this going to work with Docker operators? So let's broaden that to a question of which operators in general this is useful for, and which maybe not.

Yeah, that's actually a really good question, and I wish I had tried out how deep you can get into it, but my suspicion is that you wouldn't be able to add a breakpoint to the code within the DockerOperator, because you're just importing it, like I'm importing the BaseOperator here; I couldn't add an additional breakpoint in the BaseOperator code. So it's going to be mostly useful in that you can, for example, pull the source code of the DockerOperator if you are at a point in your code where you have access to that object, but I don't think you can enter the Docker image that you're supplying, which is probably what the question is getting at. As far as I know, you can't do that; you would have to debug the scripts and the code that you have within the image separately. So this is really mostly useful if you're using a lot of Python inside of your DAG that you have written yourself. In my case, I always think
that the more code I write, the more chances there are for bugs, so if you're using the @task decorator or the PythonOperator or the SDK and writing Python code, that's probably when the dag.test() function is going to be the most useful to you.

Great, that's a great answer. I'm going to address a couple of questions just to go back to the beginning, on how to set up use of this function. In general, you don't have to import anything to use it, since it's part of Airflow; as long as you have Airflow installed in your environment, you add it to a DAG file and you're good to go. You just add it at the end, like Tamara showed. In terms of where you can run this: Tamara is running locally in a virtual environment right now. Again, you do have to have Airflow installed in whatever environment you run the dag.test command in; you don't have to have Airflow running. But if you only ever run Airflow in, say, a Docker container, and you don't want to install Airflow on your local computer or in a virtual environment or something like that, then yes, you would want to run it from the container, and you can totally do that: you would just exec into the container and then run from there. That's part of why running from the terminal is helpful, although you may even have your VS Code connected to Docker and fancy things like that.

A small addition to that: if you're using the Astro CLI, there's astro dev bash with the -s flag, and that way you can exec into the running scheduler container without needing to copy the container ID.

Right. And then a couple of questions about connections. If somebody's using a custom XCom backend, so they're sending their XComs to, say, an S3 bucket, how is that going to work with this function?

It should work if you're providing the connection in your connections yaml file, but I'm actually not entirely sure if it's going to use that or if by default it's going to use the local Airflow database,
because, as you can see, we don't have any .env file; we're not defining the environment variables here that you would normally define in your .env or in your Docker image. So that's something that would be interesting to test out: if you have a reason to use your custom XCom backend, like you have a custom serialization function, whether that would still work. That's a good question for an edge case that I haven't tested yet, but I will test whether it works. For example, if you're returning a pandas DataFrame that you couldn't JSON-serialize, one way that you could get around this, if you're just testing locally, might be by turning pickling on. But very good question; I think you would have to turn that on in your local Airflow configuration file, the airflow.cfg. That was a very long-winded answer for saying I haven't tested it personally, but it's a really cool question.

Yeah, I think that's a good answer; there's definitely more to explore there. And then our last question is on passing in connections. This specific one was: does that template accept service accounts? I think the answer is that you're passing connections in in a way analogous to how you would in Airflow. Tamara, you can show the example template again, but anything that you would typically pass to a connection that you're defining in Airflow, you would be able to pass in here as well.

Yes, this should take any connection that you would normally define either in the Airflow UI or with an environment variable; it's very similar to defining connections as JSON, so it should take it. If it doesn't work, then please tell us in the Airflow Slack. It should work.

Okay, great, I think that's all our questions. Thanks so much to everybody for joining us today. Again, we are definitely going to do more on the topics of Airflow local development and debugging and things like
that; that's something that everybody who works with Airflow deals with every day. So again, definitely reach out if you have any other comments or questions, or anything about the community events we mentioned earlier, and if not, like Tamara said, stay tuned for more on dag.test() in future webinars. All right, thanks everybody, and we hope you have a great rest of your day. Have a good day, bye.
Info
Channel: Astronomer
Views: 6,568
Id: eDHQ2d1wqD4
Length: 27min 55sec (1675 seconds)
Published: Tue Mar 14 2023