Elegant data pipelining with Apache Airflow - Bolke de Bruin

Captions
[Music]

Thank you very much, Vincent. I didn't know it was a technical talk. No, just kidding: it's going to be a technical talk, and there is actually going to be some code on the screen, I think. We are going to be talking about elegant data pipelining with Apache Airflow, and we'll dive into some paradigms of functional programming. Actually, Fokko will do that, because he just loves functional programming in Scala; he's a Scala lover. But we'll talk about Python, don't worry. This is PyData, right? So that's what we do. To introduce ourselves briefly, you might want to... sorry.

Yeah, so my name is Fokko Driesprong. I'm a data engineer at GoDataDriven. What we do is help companies become more data-driven. As a data engineer, I help companies set up their data infrastructure. We also have a lot of smart data scientists, like Vincent, who help companies build nice predictive models.

I'm [unclear] of Wholesale Banking Advanced Analytics, growing from 55 people now to 320 people in the next three years. So if you would like to join, talk to me after this talk; if you don't want to join, I understand that as well, that's all right. I work at a bank. For the ones that do not know what ING is: that's financial services, and we'll actually touch on some of those subjects. Maybe you've heard of the GDPR; this is where our talks align a little bit, and where it maybe becomes a bit interesting.

So, Apache Airflow. What is Apache Airflow? Who knows Apache Airflow, by the way? Oh, that's more than the last time. Who doesn't know it? OK, that's still about 30% that doesn't know it. Well, Apache Airflow, and I have to say "incubating" in brackets, because we are still incubating at Apache (we're trying to get out, but we sometimes have a bit of difficulty doing that), is a task-based workflow scheduler. We do this programmatically. You might know Oozie, for example; I think Booking.com actually uses
Oozie still quite extensively; it is an XML-based scheduler, and Apache Airflow is a Python-based scheduler. So you create your workflows in Python. We do not really care what you run as your jobs, but the workflow itself is created in Python. It was developed in 2015 by Airbnb, it was brought to Apache in 2016, and you can use it for ETL, machine learning, predictive and generic pipelines. It's used by 120-plus companies, including ING, which I work for; GoDataDriven spreads it around; but also Airbnb, Lyft, Reddit, PayPal and a lot of other companies are using it.

We have a large number of contributors, I would say 460-plus, and I think the largest part of our job for Apache Airflow is actually merging a lot of pull requests instead of writing a lot of our own code, because we can't keep up otherwise. The speed at which it's being developed has been incredible. That's the downside and the upside of having Python, by the way: it's easily accessible, but it's also easily accessible, so people try to do a lot of things.

I don't know if you know, but we are fully open source. There are some companies that have built a platform on top of Apache Airflow. Google has released Google Cloud Composer as a beta, which is Apache Airflow underneath, and you can use it as a beta now. Astronomer.io offers it as a platform; you can use that on AWS, for example, or Azure if you need to.

Because not everybody knew what Apache Airflow actually does, maybe you also don't know how workflows are visualized. We talk about directed acyclic graphs, and you see an example of one behind me, or on the screen. Don't worry, you don't have to make out what all the parts are; what you need to grasp is that they can become pretty complex. This one is not even too complex. I've seen spider webs that went everywhere, and
ones that are actually too difficult for the visualizer to render, and then it basically kind of crashes. The scheduler still works, fortunately, but the visualizer can't handle it anymore. That's with thousands of tasks.

What you should keep in mind here is this: some of the schedulers that you'll see around are about data scheduling, so every bit you see there is basically waiting for data; in this example, those are shown in white. This is where Apache Airflow comes in: what it is strong at is also scheduling tasks. If you look at the light green one underneath the white one, it says "start cluster". Start cluster has nothing to do with data. It starts up a cluster, maybe a nice cluster; you schedule something on AWS or you do something on-prem, that's all fine, but it has nothing to do with moving data around. It makes sure that you can start processing data, and that's something different from what you'll see if you just move data around and do your scheduling based on data only.

So why is this important? Well, ETL consists of a complex network of dependencies; you've just seen one. It's not just about data: the things that you need to do around the data also come into this as dependencies. And analytics and batch processing are still mission critical, although we love streaming nowadays. For a Booking.com, for example, if the results of their A/B testing arrived late, people would probably get a bit nervous. For a bank, if we do not do our end-of-day processing in time, you probably won't receive some of your payments, or some of your payments won't have been made. Real-time payments are still a bit of a thing for the future. These things happen at certain points in the day, or even in the week, and because of that a lot of time, or too much time, is being spent on
monitoring and troubleshooting jobs. We live in the real world, and we know that some servers might not be available for some time. If your system requires you to jump on that right away, because it fails the first time it can't reach that server, then you're going to be doing a lot of operational work and you're going to have a lot of people on standby. This is also where Apache Airflow helps you out, because it understands things like doing retries and waiting until later. It understands backfill, so you can change history. These kinds of things were created specifically for the data era, instead of being some enterprise-y feature for one particular company.

When we were discussing this talk, we asked: what does "elegant" actually mean, and why is it important when you build data pipelines? Firstly, it is about creating reproducible outcomes and pipelines. The outcomes that you create through a pipeline need to be deterministic, and Fokko will tell you a little more about why functional programming helps out in this area. They also need to be idempotent: when you rerun either part of the DAG or the full DAG, the outcomes need to be the same. You can't have changing outcomes; that's important.

They also need to be future-proof. This means that you can do backfilling, which we'll go into a little later and which allows you to actually change history, and that you do versioning: versioning of your data and versioning of your code. People very often forget that you need to version your data as well.

You need to be robust against changes, with easy changes to DAGs. The world around us changes every day, so updating your DAGs accordingly will definitely happen. We assume that your DAGs will be slowly changing, so not changing on an event basis, but you still
need to be able to add or remove a task, and maybe even change history accordingly, or just continue with it in the future.

And then finally, and this is maybe what has become more interesting since the introduction of GDPR, but also as the understanding of what data engineering is about has matured: clarity about where data resides, what it means and where it flows is actually quite important. It's not just important for the GDPR, for example, but also for your data science. Where did that one transformation happen? Why did it happen? What did it change? Can I do this again in the future? Can I do it again in the past, actually? This is probably something where we're still looking for best practices. As an example, I take a lot of what Michelle Ufford from Netflix is talking about; she's director of data engineering there, and she does really interesting talks around these kinds of subjects.

Is everything OK? [unclear] Fokko.

Yeah, thank you, Bolke. Are there any functional programming enthusiasts in the room? Like some Haskell people? No? I don't do it either; it's too academic for me. Scala? Yeah. Scala is always a bit dangerous, because if you see Scala people with a strong Java background, it's just like Java with the Scala syntax. But I think that from the functional programming world we can learn some things that will help us write better, more robust and easier pipelines, or "elegant" ones, as we call it.

When we do a transformation, we should look at it as a function with a defined input and a defined output. Once we have established this input and output, we can determine some kind of contract for what the operation should do. When we have this, it's much easier to test the operation in isolation, and we can test its behavior with, for example, expected input but also unexpected input.
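
A minimal sketch of a transformation treated as a function with a defined input and output, tested in isolation. The function name `to_euros`, its parameters and the sample numbers are assumptions made up for illustration; they do not come from the talk's slides.

```python
from typing import List


def to_euros(amounts_usd: List[float], usd_per_eur: float) -> List[float]:
    """Convert USD amounts to EUR; the output depends only on the arguments."""
    if usd_per_eur <= 0:
        # Part of the contract: reject input that cannot be converted.
        raise ValueError("usd_per_eur must be positive")
    return [round(amount / usd_per_eur, 2) for amount in amounts_usd]


# Expected input: same input, same output, no environment needed.
assert to_euros([12.0, 6.0], usd_per_eur=1.2) == [10.0, 5.0]

# Unexpected input: the function fails loudly instead of returning garbage.
try:
    to_euros([12.0], usd_per_eur=0.0)
except ValueError as err:
    print("rejected:", err)
```

Because the function touches nothing outside its arguments, the two checks above need no scheduler, database or network to run.
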
When we look at a transformation, what do we need to do to make these transformations pure, and also to isolate them from their environment, for example to make sure that there are no side effects? In the top example we see a global variable, some external state that we pull into the function, increment and then return. Ideally you would have something like the bottom one, where all the state is internal and you just make a transformation as is, so we apply this transformation to the data. This is a really simple example, of course, but in the real world you sometimes see that stuff is being written to files that are read by other processes, and things like that. You don't want to change data together with other processes; you want to keep everything in one single operation, one unit of work.

What's also important is to be idempotent in your actions. When you develop something, you have to make sure that the actions you write are idempotent. Take the top one, for example: it doesn't matter how many times you execute it, the result will be exactly the same. But the bottom one: if you invoke it twice, you undo the previous operation again, and this will result in unpredictable behavior. For example, if your task fails in the middle, it's hard to determine where it stopped, so when you rerun it, it will be in an inconsistent state. It will most probably do some updates, and you'll end up with a really weird end result. This is also a really simple example, but when you do something like counting, you have to make sure that the counts you do are idempotent.

It's also really important, when writing DAGs and Airflow operators, that you take the date into account. Say something ran a week ago, something changed, and you want to rerun it.
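
The slides themselves are not part of the captions, so the following is only a sketch of the two contrasts described above: a function that mutates global state versus a pure one, and an idempotent action versus one that undoes itself when invoked twice. All names (`counter`, `increment_global`, `increment`, `activate`, `toggle`) are hypothetical.

```python
counter = 0  # external state shared with everything else in the process


def increment_global() -> int:
    """Impure: reads and mutates state that lives outside the function."""
    global counter
    counter += 1
    return counter


def increment(value: int) -> int:
    """Pure: all state comes in as an argument and leaves as the result."""
    return value + 1


def activate(record: dict) -> dict:
    """Idempotent: applying it once or many times gives the same record."""
    return {**record, "active": True}


def toggle(record: dict) -> dict:
    """Not idempotent: a second call undoes the first."""
    return {**record, "active": not record["active"]}


row = {"id": 1, "active": False}

assert increment(1) == increment(1)              # same input, same output
assert increment_global() != increment_global()  # result depends on history
assert activate(activate(row)) == activate(row)  # safe to rerun
assert toggle(toggle(row)) == row                # a rerun silently reverts
```

If a task built on `toggle` fails halfway and is retried, some records end up flipped twice and others once, which is the inconsistent state the talk warns about.
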
You want to make sure that you get the same result. Using Airflow, you can do templating. For example, this is invoking an HTTP endpoint: you get some currency data from CoinDesk with a set start date and a set end date, and this guarantees that, unless CoinDesk revises history, it will always give the same result. But in the bad example, where you just get the current price, if you rerun a task that was supposed to run a week ago, you'll just get the current price.

What's also important is being immutable. You should not see any delete, insert or upsert operations. When you process data for a specific period of time, you just overwrite the whole partition. So instead of appending additional rows, you should fix the logic and rerun the whole task to overwrite the whole partition. What you end up with is that your sources have a specific date in them, and then the whole transformation, the whole pipeline, takes this date into account. This is more of a data warehousing example, where you have some dimension tables and some fact tables that you want to combine: you just process all the dates that are the same. You can also see that something like this is easy to parallelize. For example, we process in chunks of one day, but we can also easily run multiple days in parallel, because they don't interfere with each other: there's no external state and they're not colliding with each other.

This is something we're heading towards with all the GDPR stuff: we want to become more aware of where the data is and what data is being stored. So instead of having all the metadata inside your SQL scripts, you want to have one central store, for example something like Hive, where your metadata lives, and you can template this inside your Airflow
operations. It makes it easier to keep track of where the data is being processed and what kind of transformations are being applied to the data.

When we take this into account, it allows us to rerun the pipelines for previous days. For example, at the NPO, where I am currently, we have a kind of parsing system for user agents. We want to keep track of what kind of people are using the platform and what kind of devices they are using, and we actually use the user agent parser of Booking.com to do that. Recently, I think, it was updated because of a new user agent string for the iPhone X. So we update the code and then we say: OK, I want to reprocess one month of data, because we didn't know of the existence of the iPhone X back then, but it could be that these strings are in the data. So we create a new version of the code, rerun this code over the past month of data, and this allows us to just reprocess all the data with the iPhone X in there.

Having this in place is really important, first of all to keep you sane in the end. As Bolke explained, if you have this big dish of spaghetti on your screen, something breaks somewhere, and your task is not idempotent, so you don't have predictable behavior, then it's hard to stay sane. And also from a legal standpoint: if there is some result in the data, we need to be able to trace back where it comes from. So it is also very important that you keep predictable behavior.

Is it your slide? [unclear] OK, here we go.

So we've been talking a bit about making sure that what's in your code is actually clear: idempotency, reproducibility, all these kinds of things that Fokko talked about, and for which we use the paradigms of functional programming. However, you need to apply this to your data as well. We talked about partitioning: you
need to have idempotency available within your data as well. But with GDPR involved, we also like to know where the transformations happened. If we talk about clarity, we talk about clarity by lineage, and this answers the following questions for developers but also for business users.

What is the latest version of the data that I need? Imagine that you are rerunning parts of your scripts. A model that was running on a previous set will then generate a new kind of output, because you reran history and created new data from that. If that is the case, you need to know about it. For example, if we as a bank provide you with mortgage advice, you can actually ask us: how did you come to that decision? And we need to be able to explain it. If we changed history, or reran the data, and came up with a different output for whatever reason, we still need to be able to go back into the past and explain to you why we made the decision at the time. Obviously, new things might happen in the future, and you can act on them, but you need to be able to go into the past and explain why this happened. And for auditing reasons, if it goes very far, you even need to be able to rerun that model on that old set of data and see if you indeed come to that decision.

So do you need to save versions of your data? Yes, you absolutely need to. This can be done by snapshotting, or with features that we're introducing in Airflow 1.10 and later. It won't all be there immediately, but some things are popping up, and we actually need your help, with code please, or with best practices, to understand how this really works, because I don't think the industry has come to a consensus yet on how to do this properly. You see things happening at the big companies, where they're trying things out, but doing these things properly is still a pretty hard thing to
do.

It also answers the question: where did I get the data from? This is not only important for audit reasons; it's actually interesting for your data scientists as well. If you have a certain data source: where did all the transformations happen, why did I get the outcome that I got, and what does it mean? Do I need to change something at the beginning to get a different outcome? Those are things that your data scientists will tell you eventually, and this is where they like the clarity to go back into your lineage and understand how that came to be.

Eventually we need to store this somewhere, and for now I will explain this in the context of Apache Atlas, which you might not be familiar with, but this is where you can store lineage information. From 1.10, so in the master branch of Airflow, there is some basic integration with Apache Atlas.

But what is lineage? I'll put it on the screen here and explain how it works. We have an original data source called "temp whole directory", and from this place we run two Bash operators, each of which corresponds to an Airflow operator. These Bash operators generate new outputs, which we call "contact organization" and "cross-references", and there's a date attached to them. Then another Bash operator (it can be anything, right?) generates a final output.

I don't know if you spot the problem here, because there is a problem in this diagram: the things we talked about might not fully apply to the diagram I'm showing you here. Does anyone spot it? OK, I'll explain it. The screens might be too small here; good point, we should put bigger screens over there in the future. You see that the original file does not have a date attached to it, and the final file doesn't have one either, so
imagine this was running for two days, because that is what you see here. If I do that for a year, it suddenly explodes and we get 365 of these blobs, all attached to that one original and that one final. How do I get to an idempotent outcome from the original source? I can't, because I don't know what's in there and how to split it up according to those partitions. So to make it elegant, as we talked about, there need to be some changes to make this work.

So let's build a proper pipeline. Imagine that you are using machine learning models that need conversion rates for currencies, as in the example that Fokko just gave you, picking up something from CoinDesk. Because we love cryptocurrencies, we'd like to invest in them, but we'd also like to have predictions there too: when should I invest, when should I step out? It's just a short example, and you can partially rerun it. You then use these outcomes for giving advice to your customers. You should never give fast investment advice, but for the sake of the example we'll do it here. For your business it is important, though, that you're able to explain to your customers how you got to a certain decision, so versioning is important, and so is going back into history.

To bring this together: our DAG consists of three tasks. It's a pretty simple one: pick up the data, run the model, and put the outcomes in a place where it makes sense. We chose Apache Druid for this because, yeah, it was there, for fun. The DAG definition is quite simple. It has a start date of two days ago (we just ran it for two days), we call it "build currency predictions", and it runs every day. This is a crontab expression, for the ones that might recognize it; it is a daily cron schedule that runs at twelve o'clock every day.

Then we'd like to download the currency data. For the ones that are familiar with Airflow: this is a bit of an updated kind of syntax that you can use somewhere in the near future. I'm not saying that you can do it right away, so don't try this at home now, because it probably won't work, but we're starting to have this in. You define inlets, which allow you to set the execution date, as Fokko explained earlier, and outlets, which in this case is an S3 bucket. This also allows you to specify the number of versions that you'd like to keep. Airflow should automatically determine what version you are at, apply the new version and move the old ones out of the way. And you specify that the HTTP operator, "get currency", takes the inlets and the outlets. Because of the idempotency part we talked about, we can run this independently of anything else: it picks the data up and puts it in the right place right away, even with a version.

Secondly, we run the machine learning model. I assume it's just running Spark; it's pretty simple. We have one outlet, and the interesting thing is that Airflow is aware of the previous output, so you don't need to specify the inputs anymore: if you say the inlets are "auto: true", Airflow will pick those up and use them as input for your script. The outlet in this case is determined manually. We were looking at how to do this automatically and obtain it from Spark, but that's a bit hard because the infrastructure is not really there for it. So we define the outlet manually, we call the task "create currency predictions", and of course we set the upstream of this task to the previous task.

Then finally we load the table into Druid, and we do that by table. You can actually think of the files as tables already, so in the future you could say: OK, we'll just have tables, with files as a placeholder for where the location is. But we create a
Druid operator. Again, it has a maximum of five versions, for the "invest predictions" table, and we use the auto inlets again, so it's determined automatically. That makes it easier for you as a developer: there's not a lot of metadata programming, and it actually helps you to do it properly. We write a new outlet, which is the predictions here, and we set the upstream.

So what does that get you? The one on the top is what you'll see for now in Airflow. It is pretty simple: this is just the DAG, and it has "get currency", "create predictions" and "load into Druid", because that's what we just talked about, and this is how it connects: "create predictions" depends on "get currency", and "load into Druid" depends on "create predictions".

If we go to Atlas, however, you'll see: hey, I'm picking up something from CoinDesk. The name is not really clear in this case, but this one corresponds to "get currency", and it outputs something to an S3 bucket; you can see what it's doing. "Create predictions" corresponds to the next operator here, it creates a new S3 bucket, and then we end up in Druid. If you reran this the next day, you would get the same line. So instead of exploding into a graph eventually, you get the same line for every partition, every time. That makes it an elegant pipeline, because it makes it very clear where your data resides, and it won't get so complex that you have difficulty finding where your duplicate data might be, or those kinds of things.

So, concluding: please build pipelines that are idempotent and deterministic, that have no side effects, and that use immutable data sources and destinations. Don't do updates, appends, upserts or anything else. Make sure that you version your data, and make sure that you version your code accordingly. Thank you very much.

Yes, we're hiring; you can talk to
Fokko or me. Yes, we're competing, that is also true, but we like each other too, don't worry. All right, cheers guys.

Yeah, I think I know where this question is going, so I'll repeat it. You're basically asking how we handle package dependencies if we need idempotency. This was, and still is, a bit of a challenge: in the Python world these kinds of dependencies are more difficult to handle. However, with the new Kubernetes executor, which is also in 1.10, you are actually able to specify the dependencies for a task, and the containers created from that will be idempotent. So yes, I understood the question, and we're getting to a solution.

Any other questions? In the back first. Sorry, I'm looking at Fokko for this question. [unclear] Sorry, I'll repeat the question; sorry, guys. The question was: will Atlas actually be able to handle lineage across different DAGs? Yes, obviously it will, because Atlas itself determines its lineage according to the inputs and outputs that are being created and attached to a certain data source. I'm not an Atlas expert, by the way, but you can integrate it with more than Airflow alone: there's a lot of integration work being done on Hive, there's a connector for Spark, and there are connectors for a lot of other systems nowadays as well. Eventually it will go towards what they call open metadata exchange, which will allow you to go across different lineage systems, so they'll do something with IGC, which is an IBM component that is used more in an enterprise context. So as long as you reference the right locations from Airflow, it will be able to determine what the lineage was across DAGs.

Any other questions? Oh, there was one here in front. Yes, you did.
So, maximum versions of output files: you saw that there's a new parameter, yep. It's a good question. You're asking whether, instead of versions, you can use time frames. Well, not yet; actually, max versions doesn't work yet right now. It's still something that needs to be implemented, so that's actually a good suggestion.

You say that those files might be overwritten. That could be the case. It is actually up to the developer or the company to determine how they want to implement this, so it probably depends on the company what policies to put around it. Airflow is not going to be strongly opinionated about it.

You were first; here we go. Say again? So the question is how we handle different executor backends and then the data in the inlets and outlets. We don't support different executors at the same time, so Airflow doesn't do that; I don't fully understand your question.

I think in the end Airflow is only a workflow orchestrator. Airflow will orchestrate your workflow, keep track of everything and get the logs, but in the end, if you do a transformation, for example, it will be done in Spark. So you read from the bucket and process it with Spark. And maybe to clarify: it's only metadata that's inside the inlets and the outlets, and there's no data at all, because that won't work; that's not what Airflow is for.

So we're concluding, right?

[Applause]
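
The date-aware pattern from the talk (derive the request window from the run's date, then overwrite the whole partition for that date) can be sketched in plain Python without Airflow. Everything here is an assumption for illustration: the `example.com` endpoint, the helper names `build_url` and `write_partition`, the `date=YYYY-MM-DD` directory layout and the sample row are all hypothetical. In Airflow the date would typically come from the templated execution date rather than a hard-coded value.

```python
import json
import tempfile
from datetime import date, timedelta
from pathlib import Path


def build_url(execution_date: date) -> str:
    """Derive the request window from the run's date, never from 'now'."""
    start = execution_date.isoformat()
    end = (execution_date + timedelta(days=1)).isoformat()
    # Hypothetical endpoint, for illustration only.
    return f"https://example.com/prices?start={start}&end={end}"


def write_partition(root: Path, execution_date: date, rows: list) -> Path:
    """Overwrite the whole partition for one day instead of appending rows."""
    partition = root / f"date={execution_date.isoformat()}"
    partition.mkdir(parents=True, exist_ok=True)
    target = partition / "part-0.json"
    target.write_text(json.dumps(rows, sort_keys=True))  # replaces old content
    return target


root = Path(tempfile.mkdtemp())
day = date(2018, 5, 26)
rows = [{"currency": "BTC", "close": 1.0}]  # made-up values

first = write_partition(root, day, rows).read_text()
second = write_partition(root, day, rows).read_text()  # rerun of the same day

assert first == second                    # rerunning does not change the result
assert build_url(day) == build_url(day)   # same date, same request
```

Because each day writes only to its own partition, several days can be processed in parallel or backfilled later without colliding with each other.
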
Info
Channel: PyData
Views: 17,591
Id: neuh_2_zrt8
Length: 35min 37sec (2137 seconds)
Published: Tue Jun 26 2018