Michał Karzyński - Developing elegant workflows in Python code with Apache Airflow

Video Statistics and Information

Captions
Thank you for coming to my talk; it's going to be on Airflow and workflows. But first, just a few words about me. I'm Michał Karzyński. I work with Python, but also with JavaScript and with Linux. I also write a blog, which some of you may know, and I'm currently a tech lead at Intel and also a consultant.

All right, so let's talk about workflows. How many of you know what I mean when I say "workflow"? Raise your hands. OK, that's like four people in the room. It's a very vague term, but it's also a popular word, so it can be confusing when I just say I'm going to talk about workflows. I'm going to define them a little bit to narrow down what I mean. When I'm talking about workflows, I mean that we have a sequence of tasks that are started on some schedule, or maybe triggered by an event happening somewhere, and these will carry out some work for us. This is frequently used with data processing pipelines and other jobs in the big data field.

A typical workflow could look something like this: some data from a data source came in and I'm downloading it for processing; then I'm sending it off for processing on a system somewhere else; then I have to monitor whether the processing was completed successfully; and when it is done, I get the results of the processing back, I generate a report, and I send this report out by email to some people. That's a very typical workflow example, but this methodology is so generic that there are examples almost everywhere: many types of ETL workflows could be defined this way; data warehousing is also a place where you would use a workflow; you could use a workflow when you're doing A/B testing, to handle some of the automatic steps for you; anomaly detection is another area where workflows are used; training recommender systems, which were just presented in the previous talk, probably uses some workflow system to get the job done; and orchestrating automated testing, which is actually what we are using Airflow for at Intel, so that's why I marked it there. Another example, from the bioinformatics field: you could be processing a genome every time a new genome file is published somewhere. All of these jobs could be handled by a workflow.

Because of this, a whole slew of new workflow managers has started to come up in recent years. The five listed on the screen are just a small subset of all the ones that are available; these are the more well-known ones. But I will be speaking today about Airflow. Apache Airflow is an open source project. It's written entirely in Python, using some well-known Python open source technologies itself: it's based on Flask and it uses Celery. It was originally developed by Airbnb, but it's grown very quickly and extensively over the last couple of years, so it currently has almost 300 contributors, 4,000 commits, and many, many stars on GitHub, and it's used by hundreds of companies. We're using it at Intel; I guess Airbnb uses it; and Yahoo, PayPal, and many others are using it also.

Apache Airflow provides you with three things: a framework for writing your own workflows; a scheduler and quite a scalable executor for running the workflow tasks; and a web UI for monitoring the workflows as they are running and for viewing logs. In this talk I will focus primarily on the first point, the framework which you can use in Airflow to define your own workflows and tasks. I won't be speaking very much about the executor or the scheduler, but there already was a talk on that this morning, a good talk by Federico Mariani, who was speaking about Airflow also, so if you missed that one, I'm sure you can find it on YouTube later.

All right, so before I begin showing you code examples and other things, I will take a minute to show you Airflow itself and give you a demo. When you set up and run Airflow on your computer, you get this web interface. It lists all the workflows you have defined in that table, and if you click on this little play symbol right there, you'll be able to start the workflow manually, just like that. Then you can go in and take a look at your workflow. This one is called "hello world", and you will see that it's currently being executed: this task is running, this task already managed to complete, this task is going to be scheduled in a second. You can see the whole history of your workflow's runs, and for each of them, for each task, you get an entry in this table; you can click on it and view the log of a particular task. If any errors occurred, the logs would be here. You can also click on the graph view, and then you will see another view of the same workflow, from which you can also view its logs. My hello world example returns "Hello world", so it works. That's the UI, and it's actually very easy to get to this point; installing Airflow and setting it up is quite easy. I will be talking more about the code needed to write workflows in a second.

But before I do that, I want to talk about what actually flows in a workflow; why is a workflow actually called a workflow? Every task that we have in our workflow makes decisions, and those decisions are based on the input to the workflow run that was started and also on the output of upstream tasks. So all information flows from upstream to downstream. It's kind of like a river, and I want you to think of a river for a minute. Like a river, a workflow begins somewhere, so it has a source; it may have many tributaries, which join together to form the river; and it also ends up somewhere, like a river flowing down into the sea, or it can form many final branches, like a river delta. What a workflow also does is it can have branches. OK, this is where the analogy breaks down a little bit, because rivers don't usually do that, but workflows do: they can have very many branches which split off from the main branch of the logic of the workflow and then join back together to form the final workflow result. So this isn't really a river; it's a graph, a directed acyclic graph, where the information always flows from upstream to downstream. You can use that very creatively when you're designing your workflow, because putting some information into your workflow at any point is like putting a message in a bottle into the river: it will flow down and pass every point. I guess you would have to put many bottles into the river if you wanted it to reach every point, but the point is that you can put information upstream and it will flow downstream. If I put some information in at point B, it will be available to all points in the graph downstream of that; if I put some information in at point D, the same thing happens; and then finally at this end point, where all the branches combine, I get all the information and I can generate my report or do whatever I need to do with it.
So when you're writing your workflows with this in mind, you can make them quite modular and make them use information from sources upstream in the tasks that run after that. OK, that's enough about rivers and the magic of the graph; let's get to Airflow and how Airflow works with this.

Airflow uses the concept of the directed acyclic graph, or DAG, for all workflow definitions, and that allows you to define the logic of your workflow as the shape of the graph. This is very easily done. This is actually a complete code example of the hello world workflow that I was showing at the beginning, aside from missing some import statements, and I'm going to walk you through these couple of lines, but it's very simple. First of all, there's some Python function that I want the workflow to execute; this one just returns "Hello world". Then I define the DAG just by specifying a couple of parameters, and using that DAG as a context manager I define a couple of tasks by instantiating operators. The first one is called DummyOperator, the second one is called PythonOperator, but both create tasks. In order to combine these tasks into a graph, I can use the bit shift operator, which was overridden to allow joining tasks together. This method of defining graphs is very quick and easy, and when you get used to it, it allows you to create a graph as complex as you need. Moreover, since this is being defined in Python code, you can use any looping logic you want to define more complex graphs.

The next Airflow concept I want to talk about is the operator. This is the way you define the actions of a single task. An operator is essentially a Python class with an execute method, and that's all you have to create to have a very robust entry in your graph and in your workflow, because this will automatically be retried if it fails; it can be repeated until it succeeds. Therefore each of these functions should be idempotent, so that if it runs multiple times it won't have unintended consequences. An example is just this simple. In fact, I made this slightly more complex than it needs to be, because all it needs to be is a class with the execute method, but I added this one parameter up there to show you that you can also parameterize your tasks in the definition of the DAG, when you're putting them into the final DAG, by passing parameters through an init function.

Another concept Airflow uses is called a sensor. Sensors are long-running tasks, and this is very useful for monitoring purposes. If you have some data processing job running somewhere, you may want to check on it periodically to see if it's finished, and Airflow gives you the ability to do this very simply. If you define a sensor class with a poke method, the poke method will be called repeatedly until it returns true. A very simple example is this one: I have a sensor with a poke method, and this example is slightly silly, but it just checks whether the current minute is divisible by 3. If it's not, it returns false, which means the method will be called again after a certain amount of time (I think it's one minute by default), and until it returns true it will be called again and again. Then finally, when we reach the point where the current minute is divisible by 3, we return true and the sensor will exit.
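[Editor's note: the slide code isn't captured in the captions. Below is a minimal sketch of the three concepts just described, the hello world DAG, a custom operator with an execute method, and a sensor with a poke method, written against the Airflow 1.x APIs current at the time of the talk; the task IDs, parameter names, and the way the tasks are chained are illustrative, not copied from the slides.]

    from datetime import datetime
    import logging

    from airflow import DAG
    from airflow.models import BaseOperator
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.python_operator import PythonOperator
    from airflow.operators.sensors import BaseSensorOperator
    from airflow.utils.decorators import apply_defaults

    log = logging.getLogger(__name__)


    def print_hello():
        return 'Hello world!'


    class MyFirstOperator(BaseOperator):
        # An operator is just a class with an execute() method; keep it idempotent,
        # because a failed task will be retried and may run more than once.
        @apply_defaults
        def __init__(self, my_param=None, *args, **kwargs):
            super(MyFirstOperator, self).__init__(*args, **kwargs)
            self.my_param = my_param  # parameterized through __init__ when building the DAG

        def execute(self, context):
            log.info('Running with my_param=%s', self.my_param)


    class MyFirstSensor(BaseSensorOperator):
        # poke() is called every poke_interval seconds until it returns True.
        def poke(self, context):
            minute = datetime.now().minute
            log.info('Current minute: %s', minute)
            return minute % 3 == 0  # the silly "divisible by 3" condition from the talk


    with DAG('hello_world',
             schedule_interval='@daily',
             start_date=datetime(2017, 7, 1),
             catchup=False) as dag:
        dummy_task = DummyOperator(task_id='dummy_task')
        wait_task = MyFirstSensor(task_id='my_sensor_task', poke_interval=60)
        custom_task = MyFirstOperator(task_id='my_custom_task', my_param='some value')
        hello_task = PythonOperator(task_id='hello_task', python_callable=print_hello)

        # The overridden bit shift operator joins tasks into the graph.
        dummy_task >> wait_task >> custom_task >> hello_task

Dropping a file like this into the dags folder is enough for the scheduler to pick it up and for the 'hello_world' DAG to show up in the web UI described earlier.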
Another very important concept in Airflow is XCom, or "cross-communication". It's a means of communicating between tasks, and it's really just a simple way to save things in a database and retrieve them later. Because the messages you pass are saved in the database as pickled objects, it's best suited for small pieces of data, like object IDs, rather than whole objects, but it works very well when you use it this way. It's very easy to use: in your operator's execute function you have a parameter called context, and if you retrieve the running task instance from this execution context, you can call the xcom_push function to pass some information into an XCom, and then in another task downstream of that one you can call xcom_pull to retrieve the information and use it. You can also do a trick for scanning all upstream tasks, using something like the code example with these three lines in the middle, where I'm getting all the upstream tasks from the graph and then calling xcom_pull with the IDs of those upstream tasks, querying all of them for a specific piece of information; I get back an array of all the values they defined, database IDs in this case, for example.

What you can do when you're defining your workflows with Airflow is create reusable operators. This is what makes Airflow workflows very modular, because if you use loosely coupled functions as your operator functions, meaning that you have only very few necessary parameters passed in by XCom and most other parameters are optional and have sane defaults, then you can put an operator like that, a task like that, into very many different types of workflows. In this example I have a pink operator called Nexus, which can be something that collects information from a lot of upstream tasks and combines it somehow, but it can also be used in a different place in a different graph, where it doesn't have all the same information coming from upstream, but it knows how to behave well in that context as well, and it plays a slightly different role in another workflow. This proved to be a very powerful technique for us when we're doing our test orchestration using Airflow, because we're defining blocks of code which fit in many different places, and we are able to combine them into very many different workflows by reusing the same components in different contexts. So if you pay attention to these details, you will be able to do the same thing.

Let's look back at that typical workflow we started with. The tasks that are part of the workflow are defined in Airflow through operators; the one used for monitoring the processing is a long-running sensor; and all information that passes from upstream to downstream tasks goes through this XCom functionality.
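[Editor's note: the XCom code isn't visible in the captions either; a rough sketch of the push/pull pattern described above, including the trick of querying all upstream tasks, might look like the following. The task IDs, keys, and values are invented for illustration, and this assumes the Airflow 1.x TaskInstance and BaseOperator APIs.]

    import logging

    from airflow.models import BaseOperator

    log = logging.getLogger(__name__)


    class SaveRecordOperator(BaseOperator):
        # Upstream task: push a small value (a database ID, say) into an XCom.
        def execute(self, context):
            record_id = 42  # pretend this came from inserting a record somewhere
            task_instance = context['task_instance']
            task_instance.xcom_push('record_id', record_id)


    class ReportOperator(BaseOperator):
        # Downstream task: pull values pushed by upstream tasks.
        def execute(self, context):
            task_instance = context['task_instance']

            # Pull from one specific upstream task by its task_id.
            record_id = task_instance.xcom_pull(task_ids='save_record_task', key='record_id')
            log.info('Got record_id=%s', record_id)

            # Or query every direct upstream task for the same key; the result
            # is a list with one entry per upstream task.
            upstream_ids = [task.task_id for task in self.upstream_list]
            all_ids = task_instance.xcom_pull(task_ids=upstream_ids, key='record_id')
            log.info('IDs from all upstream tasks: %s', all_ids)

Keeping the pushed values small, IDs rather than whole objects, is what makes this pattern workable, since every XCom ends up as a pickled row in Airflow's metadata database.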
OK, there are some more interesting things that you can do as well. For example, if you want to follow a certain branch of the graph and skip others, you can use the branch operator, the BranchPythonOperator. There's a very simple example up there: you have a graph with three tasks, one upstream and two downstream, and the upstream branching task decides which path to follow simply by returning the ID of the downstream task that needs to be executed; all others will be skipped.

Another way to skip tasks, and therefore maybe skip entire branches of your workflow that you don't want to execute, is by using a special kind of Airflow exception called the AirflowSkipException. This exception will force the particular task to be skipped, whereas all other types of exceptions, if they're not caught, will cause the task to be retried, and if the retries don't ultimately work out, it will fail. So this skip exception is like putting a dam in the river: you're stopping the flow of the workflow downstream. But you can actually control whether you're really stopping the execution by deciding what trigger rule your particular tasks have. By default, all tasks require all upstream tasks to be successful, but you can change that to one of the other options listed up there. The one I find particularly useful is "all done", which means that whether an upstream task succeeded or failed, your downstream task will execute, and if you write your operators in such a way that they know how to behave even when the upstream failed or was skipped, then you can continue the execution through your workflow, going downstream. So "all done" is like opening the dam from the downstream side of a task.

You can do a lot of other very useful things with Airflow that I'm not going to get into the details of. For example, you can run bash commands as your tasks: to execute bash commands on a worker you can use the BashOperator, which allows you to pass in a bash script that is actually wrapped in a Jinja template, so you're really running a bash script generated by a Jinja template. I guess an example is worth more than a lot of words: you can put a template into the BashOperator, this will first be rendered, and then the bash command will be executed on the worker.

Airflow also allows you to write plugins to extend it, and writing plugins is also very simple: you just create a subclass of AirflowPlugin and put it in the plugins directory, and then you can define a whole list of things that Airflow uses and make them available to your instance of Airflow. Operators we already talked about, but you can also define menu links for the web interface that I was showing at the beginning; you can create whole admin views, because it's based on Flask-Admin, so you can add additional views to the administrative interface; or even entire Flask blueprints that you can plug in. It's actually very extensible, so I'm sure it will be useful for many cases you may have.
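[Editor's note: again as a hedged illustration rather than the speaker's slide code, the flow-control pieces just mentioned, branching, the skip exception, the "all done" trigger rule, a templated BashOperator, and a minimal plugin, could be combined roughly like this, assuming Airflow 1.x; the DAG name, task IDs, conditions, and template parameters are all made up.]

    from datetime import datetime

    from airflow import DAG
    from airflow.exceptions import AirflowSkipException
    from airflow.models import BaseOperator
    from airflow.operators.bash_operator import BashOperator
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.python_operator import BranchPythonOperator
    from airflow.plugins_manager import AirflowPlugin


    def choose_branch():
        # Return the task_id of the branch to follow; the other branch is skipped.
        return 'branch_a' if datetime.now().minute % 2 == 0 else 'branch_b'


    class MaybeSkipOperator(BaseOperator):
        # Raising AirflowSkipException skips this task instead of retrying it.
        def execute(self, context):
            if datetime.now().hour < 6:
                raise AirflowSkipException('Nothing to do yet')


    templated_command = 'echo "Run date: {{ ds }}, parameter: {{ params.my_param }}"'

    with DAG('flow_control_example',
             schedule_interval='@daily',
             start_date=datetime(2017, 7, 1),
             catchup=False) as dag:
        branching = BranchPythonOperator(task_id='branching', python_callable=choose_branch)
        branch_a = MaybeSkipOperator(task_id='branch_a')
        branch_b = DummyOperator(task_id='branch_b')
        report = BashOperator(task_id='report',
                              bash_command=templated_command,  # rendered as a Jinja template
                              params={'my_param': 'hello'},
                              trigger_rule='all_done')         # run even if upstream failed or skipped

        branching >> branch_a >> report
        branching >> branch_b >> report


    class FlowControlPlugin(AirflowPlugin):
        # A plugin is just a subclass placed in the plugins directory; besides operators
        # it can register menu links, admin views, or whole Flask blueprints.
        name = 'flow_control_plugin'
        operators = [MaybeSkipOperator]

The trigger_rule='all_done' on the final task is what lets it run even though one of the two branches is always skipped.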
All right, that's it for me. There's a tutorial available on my blog, so if you want to get started quickly with trying Airflow, go there. Thank you very much.

Thank you, Michał. If you have any questions, please raise your hand.

Q: I want to ask how Airflow integrates with distributed systems. If you want to distribute your processing, for example you run some method and you actually want to distribute it over multiple servers, how can you do that? You trigger an execution of that process from Airflow, so how can you then decide that it doesn't run on your local machine, for example, but on a different server somewhere else?

A: The underlying technology that we're using for distributing work among different workers is Celery, and Celery gives you a lot of control over where things get executed, so that would be one way. Other than that, I guess it would be more manual than automatic, but we didn't actually have to make these decisions, because all our workers are capable of running the same set of tasks. So I'm not going to give you a definitive answer, but Celery is under there, so going that way would work.

Q: I have two questions, actually. The first one is: is it resilient? Do you use it in production at the moment?

A: Yes.

Q: OK, and do you have any use case for sensors?

A: Yes, we do, because, like I mentioned, we're using it for orchestrating automated testing, and these tests can run for a long time. We're actually checking on the executors, which are not the same as our Airflow workers; our Airflow workers are just the ones that trigger the tests, but we're checking on the executors to see if they've finished running all the tests, and when they're done, we make some decisions about what to do next. Sensors work quite well in that context.

Q: Great talk, thanks for that. Two questions: do you run Airflow on more than one node, meaning can it handle the workflow running on more than one component, more than one server? And the other question is: have you seen, or do you have experience with, running Airflow for tasks that require manual input, some user input?

A: For the first question I actually have a good answer, because we are running a triplicate of servers: we have three web interface hosts, three worker hosts, and three schedulers, and without having to do very much, Airflow was able to behave very well in this context. It was running in parallel on three different servers, all three services were running together, and they are able to exchange information, so when I click to view logs in the web interface, it's able to pull the logs from the correct worker; it knows where the logs are. So it's able to work on multiple nodes like that very well. In terms of manual input, to do that we had to create an API that has a user interface, and through that API we're actually calling Airflow's methods for starting some workflows with additional input from users. That's not something that comes out of the box, but we were able to extend it by adding some API methods on the administrative side.

Q: Thank you for the talk, it was very interesting. One question that would be very useful for a project of mine: can you actually group the operators and reuse the groups?

A: When you say you want to group them, do you mean group them logically, like a class hierarchy, or group them into smaller workflows?

Q: For example, if I always use the same five operators in the same configuration, can I put them into one overarching operator somehow?

A: There's an operator type that I haven't experimented with, and I'm not sure how well it works, but it's there for this purpose: it's called the SubDAG operator. You create a DAG with a bunch of operators, a graph of the five operators like you were talking about, and then you use that as an operator itself, so you kind of put the whole graph into another graph. But I haven't used it, so I'm not sure how well it works.

Q: OK, thank you so much.

Thank you, Michał. Please give another round of applause.
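[Editor's note: for that last question, a rough sketch of the SubDAG operator grouping the speaker mentions (which he says he hasn't used himself) might look like the following; the factory function and all names here are illustrative assumptions, not taken from the talk.]

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.subdag_operator import SubDagOperator


    def build_common_steps(parent_dag_id, child_dag_id, start_date, schedule_interval):
        # By convention the sub-DAG's id must be '<parent_dag_id>.<task_id>'.
        subdag = DAG('%s.%s' % (parent_dag_id, child_dag_id),
                     start_date=start_date,
                     schedule_interval=schedule_interval)
        with subdag:
            # The operators you always reuse together go here.
            DummyOperator(task_id='step_1') >> DummyOperator(task_id='step_2')
        return subdag


    with DAG('parent_workflow',
             schedule_interval='@daily',
             start_date=datetime(2017, 7, 1),
             catchup=False) as dag:
        start = DummyOperator(task_id='start')
        grouped = SubDagOperator(task_id='common_steps',
                                 subdag=build_common_steps('parent_workflow', 'common_steps',
                                                           dag.start_date,
                                                           dag.schedule_interval))
        start >> grouped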
Info
Channel: EuroPython Conference
Views: 31,323
Id: XJf-f56JbFM
Length: 29min 26sec (1766 seconds)
Published: Sat Oct 21 2017