Matt Davis: A Practical Introduction to Airflow | PyData SF 2016

So I'm going to tell you a bit about Airflow: an introduction, what it does, why you might use it, and some things that are good to know before jumping in — and a little bit about Clover Health. We're a health insurance company, and we use Python for literally everything, from our data operations to our web applications to our data science stack — nothing but Python. We're here in San Francisco, one more BART stop down by Powell Street, and we're hiring engineers and data scientists; we have a booth in the other room where you can come talk to me or a couple of our data scientists hanging out there. I'm on the data platform engineering team at Clover, and I feel a little bit like a plumber, because I mostly move data from one place to another. I've been using scientific Python since around 2008, when I got a job at NASA Goddard Space Flight Center using Python to make images and plots from satellite data. I go by jiffyclub on the internet because "Matt Davis" was taken, so if you're looking for me, don't try "matt davis" — it won't work.

Here's the outline: a very brief history of Airflow, what it's useful for, a brief tour of its user interface, what it's like to put it in production and deploy it, how you go about putting together a pipeline in Airflow, and some useful things to know.

So where did it all come from? It came out of Airbnb last year, built by someone who worked on similar infrastructure at Facebook, on their Dataswarm system — his name is Max. This year Airflow joined the Apache Incubator, so if that goes well and it officially joins Apache, I think we can expect to see Airflow around for a long time.

What do you use Airflow for? A pretty common need is to have something happen on a schedule — often taking some data, working on it, and putting it back out there in some new and polished form, but it could be anything. Airflow is the alarm clock in this diagram: it's the thing that says "it is now time to run a task" and sends those tasks off to do their work. There are other tools that do this — another Python-based one called Luigi, and some JVM-based ones — but Airflow is the only one I have experience with, which is why I'm talking about it.

If Airflow is the alarm clock, the rest of this is you: you're building a pipeline of things you want done. Pipelines in Airflow are a series of tasks connected by dependencies — you want one task to run before others. I like to call them pipelines because I think that's a more sensible and intuitive name, but in the Airflow world they're called DAGs, or directed acyclic graphs, because you're not allowed to have cyclic dependencies between tasks — your pipeline would never finish — and Airflow is really up front about that. One of the nice things about this is that once you've laid out your pipeline, Airflow can say, "I need to run these things before those other things, but these first two tasks are independent — they can run at the same time." So you get some parallelization if you can lay out your tasks that way.

What can those tasks do? Anything, really — and I'm only being a little facetious, because you can run arbitrary Python as tasks, and we all know you can use Python for anything. "Operators" is the Airflow term for these individual tasks: when you want to create a task, you create an instance of something called an operator, which has a certain interface. You can make your own, but Airflow comes with a bunch of built-in ones: one for calling Python, one for shelling out to bash, ones for running SQL, Hive, and more. Especially with the easy Python integration, the sky's the limit. We use it for running a lot of SQL queries and for shifting data from SFTP to S3, but you could use it for things like sending your periodic email — whatever you need.

I'll give you a couple of examples of real pipelines we have at Clover. This first one is an entire pipeline — they don't have to be fancy; it can be one task you want to run on a schedule, no problem. You can get a little fancier: here there are a few things we want done, and then it branches out, and the things at the end can run in parallel. A failure on the left side would prevent everything downstream of it from running, but a failure on the upper branch doesn't affect the lower branch — that stuff will still finish if it can. That's another nice thing about Airflow: having laid out your tasks in independent forks of a pipeline, or even in different pipelines, you get some independence — those things run separately, and you can get all the way through one part of your pipeline, getting as much done as you can, even if another part failed. And then there's our biggest pipeline — you can't even see it there on the right because there are so many tasks; we're up to around five hundred, and you can tell the interdependencies are pretty complicated. There's another view of it on the left; you can't tell, but those are arrows showing the dependencies. [Audience question] Yes — the first thing we do with new people is usually make them debug things. It's a little rough... no, it's not bad, we support them.

OK, so Airflow comes with a web interface — that's how I got those screenshots — and this is actually one of the really nice things about it: in addition to running the things you tell it to run, it gives you a nice interface for figuring out what's going on at any given time. The default view is your pipeline status: a list of (some of) our pipelines, with the name on the left, toggles to turn them on and off, the schedule, and red blocks meaning a pipeline is running right now. The status circles show how many tasks completed (dark green), how many are running right now (light green), and whether tasks failed (red). The links on the right go out to different views of that pipeline.

One of those views is the Gantt chart. I love this because it lets you see very quickly what order tasks ran in and how long they took, so it's easy to tell which task is taking a long time. Across the top are links to the other views: the graph view (those were the screenshots I showed you earlier), and the task duration view, a historical graph of how long each task took in the past. That's useful for seeing how your tasks are scaling as you've added data over time, and it's really useful when your pipeline took twice as long last night and you want to know which task was responsible — sometimes it's just one task you need to debug, and thanks to these graphs it's pretty quick to find.

You can also drill into task details. The buttons at the bottom are ways to manually tell Airflow to do things: you can clear out the history of a run, which can be useful if you want to trigger a rerun — you make Airflow forget that it ran a task (and everything downstream of it, by clearing downstream tasks), and then it'll think it hasn't run yet and rerun it for you, which is useful sometimes. Under task details you can see all the configuration for that task — you defined it in code, but this is usually the quickest way to go see it. "Rendered" lets you see the actual code; this is especially nice for parameterized SQL or bash, because it renders the actual SQL query or bash command that ran. The log button is also very useful: we store all of our task logs on S3, so when you click it, Airflow pulls them from S3 and shows them to you. When a task fails and you want to know why, you click that button and look at the logs — hopefully you've written good logs. Even if you're sending logs out to a traditional logging stack like Elasticsearch and Kibana, this is still a much faster way to find the logs specific to a single task — really handy. That's it for the tour of the UI; it was very quick, and the Airflow documentation has a more complete tour you can check out.

So what does Airflow actually look like when you put it in production — what components do you need running? The first thing is a database: a SQL database, so MySQL or Postgres are good choices. This is where Airflow stores all the information about what has run in the past, how long it took, what's running right now, and so on — it's what fills in all those views I was just showing you. That plot of historical task durations is built by querying the database: "give me the most recent 50 runs of this pipeline, all the tasks in them, and how long they took," and turning that into a plot. So this needs to be a permanent, stable database that you don't want data disappearing from.

The next thing is a web server — that's obviously what shows you everything I just showed you. It's a Flask application, pretty straightforward, but you need to run it somewhere it can talk to the metadata DB. The scheduler is a Python process that decides what to run when — pretty much nothing runs unless you've started the scheduler. It crawls the filesystem, importing Python files out of a directory you configure, looking for DAGs, loading them, looking at their configuration, figuring out what needs to run, and actually putting tasks out on the queue. If you have a worker queue, the scheduler is the one putting things on it (I'll talk in a second about the other ways the scheduler can run things). It's what actually decides what runs when — a pretty important piece.

The final component is the computers running the things you want run: the workers. In the full-fledged distributed-computing version of this, the recommended engine is Celery, which has been around in Python for a long time for distributed task queues — so that's yet another component. You have to have a Celery task queue, which is usually RabbitMQ or Redis: if it's Redis, that's another database you need; if it's RabbitMQ, you have to run RabbitMQ servers. And on top of that you have to start Celery workers on the individual machines — there's an Airflow shortcut for this in the Airflow CLI. They don't necessarily have to be on different computers — these could all be on the same machine — but keep in mind that the workers are doing the actual work, and depending on what that work is they may have different requirements than the scheduler and web server, which are pretty lightweight processes that don't do a lot of actual computing. Your workers may need to be beefy — or, if all they do is send commands out to databases, they can be very lightweight too. It just depends.

An alternative to Celery: there are other executors — the components of Airflow that actually execute the tasks. There are executors that run tasks locally, on the same machine as the scheduler, which can save you the headache of dealing with Celery and distributed computing. It's a bit simpler operationally, but it also means your scheduler and workers are more tightly coupled — for example, if you need to restart the scheduler, you're also going to kill all the tasks running on that machine. There are trade-offs; I'll talk more about that later. But those are the basics of Airflow: you need the metadata DB, the web server, the scheduler, some workers, and, if your workers are distributed, some way for jobs to get from the scheduler out to those workers, like Celery.

So, time for some code: what is it like to actually build a pipeline? It starts with the DAG. You instantiate a DAG and give it a name — the ID needs to be unique among all the DAGs Airflow is going to import — and a start date. Most of the time when we put something into production we give it a start date of today or tomorrow, so it starts running as soon as it's deployed. But if you put the start date in the past, Airflow will backfill all the way back to it: it looks at the schedule interval, figures out how many times the pipeline should have run going back to that start date, and actually runs all of those. So if there's something you want run on a cadence, working on a time interval, and you want it going back to some date — say, when you started importing a certain data source — Airflow will start from the beginning and run all the way up through the present. The schedule interval here is a cron expression; there are a couple of simple shortcuts like @daily and @hourly, but you can also give it a complicated cron expression and Airflow will figure it out — that's up to you.

Airflow makes the execution date available to your tasks — I'll explain the execution date in a little bit, but it's the date your task is, in a sense, pretending to run at. It's up to your tasks to take that execution date, figure out what time period they're supposed to be operating on, and make use of that. For example, if you're selecting a time series out of a database, you wouldn't use datetime.now() — you'd look at the date Airflow tells you it's running for, and figure out what data to select based on that. The schedule interval can be anything cron can run, but you don't want to go too fine-grained: Airflow only kicks off tasks every minute or thirty seconds or so, so it's not really meant for second- or sub-second-level scheduling — large fractions of an hour is probably the most fine-grained level I'd recommend. You can also put the start date in the future if you don't want it running yet; it will sit and wait until that time comes and then start.

A couple of useful DAG arguments I didn't mention: default_args are not arguments to the DAG itself — they're arguments for the tasks you're going to put in the DAG. If you have settings in common across all your tasks, it's useful to collect them into this default_args dictionary and feed them to the pipeline.
The pipeline will then make sure those settings are applied to every task you add to it. max_active_runs tells Airflow how many runs of the entire pipeline can be going concurrently. Imagine a daily pipeline with a start date 30 days in the past: that's 30 days of pipeline runs Airflow could start immediately, as soon as you put it in production, and you may not want 30 of them hitting your database or taking over your workers all at the same time. There's a global Airflow setting for how many DAG runs can go at once, but you can also set it independently for individual pipelines. A lot of Clover's pipelines have this set to one, because a lot of our pipelines do destructive things to the databases and we don't want more than one doing that at a time. It also means that if there are errors in the pipeline and it takes longer than expected to resolve them, the next run doesn't kick off before you've got fixes in place. Finally, concurrency limits how many tasks within the pipeline can run at the same time. This is especially useful for something like that really complicated pipeline I showed you, where potentially dozens of tasks could be eligible to run at once — that could take over our entire worker pool and prevent anything else from running — so we limit it to a few at a time. (We actually aren't using this concurrency parameter, because I don't think it was an option when we started using Airflow, but it would be a good way to go now.)

[Audience question] The question was: if there are errors in a task, can you retry it? The answer is yes. It's a little bit complicated — I'll talk more about code deployment later — but you can ship new code to the workers and then say "please run this one again," and if everything is hunky-dory it'll pick up the new code, run the task with it, and the pipeline can progress.

[Audience question] — say it's been two hours and the task hasn't finished by then? Right, the question was whether you can put timeouts on things. You can, both at the pipeline level and at the individual task level.

So, coding up tasks: you import the operators module and make a PythonOperator. The task_id has to be unique within the pipeline, and you have to pass the DAG in to the operator so the task gets associated with the right pipeline. Then, depending on the operator, you tell it what to run — here we point it at a function. If this were a SQL task we could give it a string of SQL commands, or point it at a file containing SQL; both work, and the same goes for bash. The magic happens when you make more tasks. Here we make a second one, also Python, pointed at a different function, and then we say second_task.set_upstream(first_task) — and now we have a dependency: the second task cannot run until the first task has completed. You can put together as many of these as you want, with set_upstream and set_downstream. This is how you build up a pipeline with tasks that can run concurrently or that have to run in some dependency order. You have to do it programmatically, but I'll circle back to that.

Some useful task arguments: retries — if it's safe to retry a task, you can tell Airflow to do that. We have certain tasks that get blocked by database access issues, and we want to wait five seconds and try again, so you can set the number of retries and the wait before each retry. pool is a way of limiting how many instances of a task can run at the same time: a pool is an Airflow concept where you say "I'm making a pool with ten slots," you assign tasks to that pool, and every running task takes a slot from the pool.
So only as many of them can run at once as the pool has slots — up to ten, in this example. I was talking about that concurrency parameter earlier; pools are actually how we limit concurrency right now, by assigning tasks to specific pools that only have so many slots, so they don't take over the whole fleet of workers. queue is a Celery option: you can assign tasks to specific Celery queues and then start workers listening on those queues, which is a way to route tasks to specific workers. That can be useful when you have workers co-located with data, or workers that are computationally beefy for heavy tasks, and other workers that aren't as beefy because all they do is send commands to a database — when you want to control where tasks run, use that. There's also execution_timeout, to put a time limit on a task. trigger_rule is the rule by which a task gets executed: the default is "all upstream tasks succeeded," but you can change it to things like "all upstream tasks failed" or "all upstream tasks are done" — meaning they've run and ended in any state — so you can get creative.

As written, the functions in my examples have to take no arguments, because they're not going to get called with any — but you can set up a dictionary or list of arguments to pass to your functions. There's one I forgot here, called the context: a dictionary Airflow provides with a bunch of information about the task and the running environment. That's how you get hold of something like the execution date — the date Airflow thinks it's running for. You can also set environment variables for bash tasks. And Airflow has a system, which I'm not going to describe in detail, for doing Jinja templating on your tasks: if you have SQL or bash, it's a way of inserting parameters into them.
With the Jinja templating you can provide variables that will be available inside your SQL or bash. So this gets to the earlier question about building more complicated pipelines: you have to use code to write pipelines, but you can get creative about it, because it's Python. If you have a series of files you want to parse, for example, you can write a loop and create a task for every one of those files — and you may or may not tie them to some upstream task you want run first — and now you've got as many tasks as you have files, and they can all run in parallel; you can let Airflow schedule them. For building that really giant pipeline I showed you earlier, with 500 things in it, nobody wanted to code up 500 tasks, least of all me. Most of those tasks — in fact, all of them — are either bash shell files or SQL files, so we developed a system where people write a YAML file containing the task dependencies and the name of the file that needs to be run as part of each task. When that DAG gets constructed, it starts from some root directory, crawls all the subdirectories, finds those YAML files, turns them into Airflow tasks, and connects them up in dependency order. You can do things like that to get around coding up however many tasks you need individually. We do the same for parsing files — we crawl SFTP directories and pick out files — and generally we do a lot of YAML configuration: we put configurations in a directory and those get turned into tasks.

OK, so now for some things that are nice to know about Airflow. One is the executor types — I already talked about this a little. Executors are the things in charge of actually running your tasks and keeping track of them. We use the Celery executor, so we run Celery in addition to Airflow: there's a task queue, and Airflow puts tasks on those queues.
The workers pick tasks off the queues and send signals back about completion, heartbeats, and so on — so Airflow is really relying on Celery for that part of it. The sequential executor has tasks run in the same process as the scheduler: the scheduler will actually stop scheduling things because it's running a task and waiting for it to finish. That's not very useful in production, but it's useful for debugging, as I'll show you. The local executor also runs on the same machine as the scheduler, but it uses Python's multiprocessing module to run tasks in a different process, so the scheduler can go on doing its thing. According to blog posts I've read about Airflow at other companies, people are using the local executor in production — they want the simplicity of not having a distributed worker queue, and they've made that work for them. There would be some challenges around that; it doesn't work for us, because we have a lot of long-running tasks and we want to be able to do maintenance on individual workers or the scheduler without necessarily shutting down running tasks — it would be expensive to have to start them over. So the distributed model works better for us. But if you don't mind restarting tasks, or you're only running Airflow for part of the day — we run Airflow 24/7, which also makes it challenging to deploy — if there's part of the day when you're running and part when you can put in new code, the local executor can be an option; people are using it in production. And I don't know much about the Mesos executor — we don't use Mesos — but if you already have a Mesos cluster set up, there's a community-contributed option for running your tasks on it.

Here's a little tip for local debugging. With options like Celery or multiprocessing, setting a breakpoint in your code isn't going to work, because the process you used to start Python is not the same one in which the debugger gets invoked. The sequential executor is a way around that: use it, and you can import pdb and put a breakpoint in your code, then use the Airflow CLI. The command-line interface can do a lot — most of what the scheduler does — and I haven't gone into any detail about it because you can read about it on the website; it has web server, scheduler, and worker commands and so on. But my favorite utility is airflow test. You give it a DAG name, a task name, and an execution date, and it runs that one task for you: it doesn't care about upstream tasks, it doesn't care about downstream tasks, and it doesn't put any information in the database, but it will still print out the logs if that's what you want. And if you use the sequential executor, the task runs in the same process as the one started when you ran airflow test, so pdb.set_trace() will work. This is really useful for debugging when you're setting pipelines up locally.
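In shell form, that debugging workflow might look like the following (the DAG name, task name, and date are placeholders):

```shell
# Run one task in isolation: no dependency checks, nothing recorded
# in the metadata database, logs printed straight to the console.
airflow test example_pipeline extract 2016-08-01
```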
Another tip, for pipeline testing when you're developing a pipeline locally. At Clover, everyone who works on the pipelines gets Airflow running locally, with a local Postgres for the metadata DB, so they can actually develop and test pipelines by running them locally. When I'm working on a pipeline locally and want to run the entire thing, I'll set the start date to some date in the past so that Airflow wants to run it, give it a schedule interval of @once so that it only tries to run once, and then run it as far as it gets — checking the output and the logs for exceptions and so on, all through the interface. When I make changes and want to rerun it, I can shut things down, but just restarting the scheduler won't do it: the record of the DAG having run is still in the database, and Airflow will see that and not try to rerun anything. You can go through the UI and delete those records from the database, and then Airflow won't know it has run before and will try again. The @once interval is useful because then it doesn't matter how far in the past the start date is — Airflow won't try to do any backfilling; @once means once ever.

And this is one thing we have a lot of fun with: Airflow time. This is the execution date I was talking about. The execution date is provided to tasks through Airflow's context, because you want to know the planned date of a run. But Airflow doesn't actually tell you the date the task started — it tells you the start of the previous schedule interval. So a daily task starting at three o'clock this afternoon will have an execution date of 3 a.m. this morning — it's covering that interval.
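The rule in its simplest form can be sketched with plain datetimes — this is just an illustration of the convention, not Airflow's implementation:

```python
from datetime import datetime, timedelta

# Airflow's convention: a run is stamped with the *start* of the
# interval it covers, one schedule interval before it kicks off.
def execution_date_for(run_starts_at, schedule_interval):
    return run_starts_at - schedule_interval

# A daily run kicking off at 2016-08-02 03:00 is "for" 2016-08-01 03:00.
ed = execution_date_for(datetime(2016, 8, 2, 3, 0), timedelta(days=1))
```

So a task that selects "the day of data for this run" should select the interval starting at the execution date, not the day the process happens to be running.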
This convention is really confusing — it bugs us all the time — and it's something you have to keep in mind if you're making use of the execution date in your pipeline.

Some general advice: make sure your business logic is separate from building Airflow pipelines. I'm working on a project right now that's going to be an Airflow pipeline, and I haven't even imported Airflow into the project yet — I built a CLI into it so I can run it locally and test it, because adding Airflow on top of that is a whole other layer of things you'll want to debug. You want to know your business logic is solid before you start trying to integrate it with Airflow, so I recommend being able to run and test your code without involving Airflow at all.

Deploying new code to workers is challenging, because both the Celery workers and the scheduler are long-running Python processes. We have cron jobs running on those same machines doing git pulls every few minutes to retrieve the freshest version of the code, but that code won't get imported, because Python doesn't import things twice: it imports a module once, caches it globally, and the next time some code needs it, it comes out of that cache. So you need to restart the processes every so often so they pick up the new code you're shipping. We've landed on two things for that. For the scheduler, there's a num_runs flag that makes the scheduler exit after having done so many cycles of crawling and importing DAGs — I think we set it to 1, because we want it to restart every cycle so that it's very fast at picking up code changes. That means you have to have something restarting it: we have a little script running an infinite bash loop around it, but you could also use something like supervisord to do the same thing.
bash loop around this but you could also use something like a supervisor to do the same thing and then on the worker side if you're using celery there's this option celery D masks tasks per child that causes celery to restart workers after having run so many tasks and again we set this to one so that every time a worker starts up it's in a new process and is going to pick up the latest code okay so and wrapping up here so having learned all that why did we actually choose airflow it's written in Python and that's a big deal for us because we use Python for everything internally and it came down to so your only options if you're using Python are pretty much air flow or Luigi air flows UI is really useful um so we like that the programmatic pipeline construction is the big one Luigi's pipeline construction involves sub classing to create tasks and that's much harder to do kind of in a loop for example so airflow one there and just the fact that it Luigi would be fine here also like complex pipelines are no problem for Luigi either but we knew we were going to have complex pipelines and we had confidence that our flow could handle them ok so these are some links airflow documentation clovers websites and my info here and that's it any questions it's one that the tops that anything from yeah yeah so the question is about passing things from one task to another you can there's something called XCOM or trot like cross communication so you can have things passed from one task to another but they have to be essentially strings like very simple things tokens so you can pass back a file name or something like that you cannot pass something like a data frame yep so so you have to put it you have to put it somewhere you have to move it to s3 you have to put it in a new table in the database something like that yeah and yeah so the question was how do we deal with dependencies I know some Python dependencies mm-hmm yeah um you have to have deployment for that um we use docker 
images because we have to: our hosting provider uses Docker. So we have Docker images that install whatever we need, but it's very classic Python, pip install -r requirements.txt in a Docker image, and then the processes start up in those Docker images on our hosting provider.

You could use the Celery queues to do that, but our worker pool is homogeneous, so for us it doesn't matter.

Potentially, yeah: if the Redis instance holding the Celery queue failed, you couldn't talk to the workers anymore, and our hosting provider can be a little interesting; if they go down, everything's over. But Airflow is pretty fault tolerant, and the scheduler is pretty good; we haven't had a lot of problems. It has some interesting quirks, but it doesn't totally fail. We have seen things like two tasks getting picked up on different workers and competing with each other, and we don't know whether that's Airflow's fault or Celery's, or both, or some interplay.

No, it's not the tasks, it's the scheduling interval: I would say anything less than five minutes is probably not a good idea, and I'd recommend even more, like fifteen or thirty. The scheduling interval, yes; an individual task can be instant and that's fine. The amount of time a task takes to execute doesn't really affect Airflow at all. There is absolutely a delay between when a task finishes and when Airflow picks up that it has finished, so if you're trying to do sub-second coordination between tasks, Airflow is not the right tool.

So the question was about whether we ever see tasks getting lost or getting into a bad state. Rarely, but we do see things where tasks will be marked failed but, when we check on the box, are actually still running, or the opposite, where Airflow thinks a task is running but there's nothing going on on the box. So there's some Airflow/Celery miscommunication there. We also sometimes see logs disappear, especially with retries: the first try's logs will disappear when the second try of the task happens, which is something I really want to fix.

And the question was whether Airflow helps you at all with the lineage of your data. The answer is no; we're very interested in that as well, but it's something we're going to have to tackle ourselves. The task dependency graph is there, but beyond that, no help from Airflow.

OK, I think lunch is outside.
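To make the execution-date gotcha from the talk concrete, here is a sketch in plain datetime arithmetic (no Airflow required, dates invented for the example): for a daily DAG, the run stamped with a given execution_date only fires after the interval it covers has ended.

```python
from datetime import datetime, timedelta

# For a daily DAG, execution_date marks the *start* of the data interval;
# Airflow launches the run only once that interval has ended.
schedule_interval = timedelta(days=1)
execution_date = datetime(2016, 8, 23)         # the interval this run covers
fires_at = execution_date + schedule_interval  # when the run actually fires
print(fires_at)  # 2016-08-24 00:00:00
```

So the run labeled August 23 actually executes on the morning of August 24, covering the previous day's data.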
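The "keep business logic separate from Airflow" advice might look like the following sketch: a plain module (all names here, like transform_records, are invented for illustration) with its own argparse CLI, so it can be run and unit-tested with no Airflow import at all. An Airflow PythonOperator wrapper would then just import and call the function.

```python
"""Business logic with a CLI, testable without Airflow."""
import argparse
import json


def transform_records(records):
    # The actual business logic: pure Python, no Airflow imports.
    return [{**r, "name": r["name"].strip().title()} for r in records]


def main(argv=None):
    # A small CLI so the logic can be exercised locally before any
    # Airflow integration is attempted.
    parser = argparse.ArgumentParser(description="Run the transform locally.")
    parser.add_argument("infile", help="path to a JSON list of records")
    args = parser.parse_args(argv)
    with open(args.infile) as f:
        records = json.load(f)
    print(json.dumps(transform_records(records)))


if __name__ == "__main__":
    main()
```

Once this works standalone, debugging the Airflow layer on top involves only scheduling and orchestration issues, not business-logic bugs.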
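The restart setup described in the talk might look roughly like this sketch (flag and setting names are from the Airflow 1.x / Celery 3.x era; check your versions):

```shell
#!/usr/bin/env bash
# Scheduler side: exit after one scheduling cycle, then immediately
# restart, so freshly git-pulled code gets re-imported each time.
while true; do
    airflow scheduler --num_runs 1
done

# Worker side, in the Celery configuration: recycle each worker process
# after every task so it also re-imports the latest code.
# CELERYD_MAX_TASKS_PER_CHILD = 1   # worker_max_tasks_per_child in Celery 4+
```

In production you would typically let supervisord or systemd own the restart loop rather than a bare while loop.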
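The XCom answer above can be sketched without a running Airflow installation. Here object_store and xcom are plain dicts invented as stand-ins for S3 and Airflow's XCom table (the real API is ti.xcom_push / ti.xcom_pull); the point is the pattern, namely that only a small string token crosses the task boundary while the heavy data lives in external storage.

```python
object_store = {}  # stand-in for S3 or a database table
xcom = {}          # stand-in for Airflow's XCom table

def extract_task():
    rows = [1, 2, 3]                                 # the "big" data
    object_store["s3://bucket/rows.json"] = rows     # park it externally
    xcom["extract"] = "s3://bucket/rows.json"        # pass only the token

def load_task():
    key = xcom["extract"]          # pull the token, not the data itself
    rows = object_store[key]
    return sum(rows)

extract_task()
print(load_task())  # → 6
```

Trying to push the rows themselves (or a DataFrame) through XCom is exactly what the speaker warns against.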
Info
Channel: PyData
Views: 75,431
Id: cHATHSB_450
Length: 45min 52sec (2752 seconds)
Published: Wed Aug 24 2016