AWS re:Invent 2019: Build reliable data lakes with Delta Lake & Databricks (ANT347-S)

Captions
I am super excited to be here today to talk about Delta Lake, and specifically how Delta Lake can help bring reliability to your data lake. But before I get into that, let's see if I can get rid of the tech alert... there we go, perfect. So before we get into that, I want to talk a little bit about why people are so excited about data lakes in general: what is this technology, why are people spending millions of dollars on it, and also why do a lot of these initiatives fail?

So to get started, what is the promise of the data lake? As far as I understand it, the idea is this: you have a lot of different types of data. It could be customer data, it could be telemetry, it could be metrics, it could be console logs, it could be unstructured things like video and images. And the nice thing about the data lake is you can just store it all. It's just a file system, it's not a database; you don't have to spend a bunch of time ahead of time coming up with a schema, doing ETL, doing all these complicated processes to ingest the data. You just dump it there — it's just a directory in a file system — and you collect everything. That's actually really valuable, because sometimes you don't know what's going to be valuable until later, so making it really cheap to collect things is useful. And then once you've got it all stored, the idea is you're going to do data science and machine learning. You can do really cool things like build recommendation engines, you can detect fraud, you can do predictive maintenance, you can even cure cancer using advanced genomics.

However, I have some bad news for you, and that is: your data is almost certainly garbage. Somebody upstream from you changed the date format without telling you, some records are missing because the machine was down, something's missing because you haven't joined it yet. That means the data stored in your data lake is also garbage, and now you have this garbage-in, garbage-out problem where your analyses are also garbage. I have seen this pattern over and over again, where people try to work around the quality issues in their data lake, and I want to talk a little bit about the architectures that I've seen develop.

So the problem is this — it's pretty typical, and I bet many of you have actually done this as part of your job. My boss came to me and said: you have a bunch of events coming in from Kafka, or it could be Kinesis or S3 or anything like that, and you have two jobs. You want to do streaming analytics, so you can understand what's happening with your business right now at this very moment, and you also want to do AI and reporting, where you take a more longitudinal view and look at a long history of time so you can make predictions about the future. I might be a little biased, but when I'm given a mission like this, I'm going to start by using Apache Spark. Spark has streaming APIs, it supports event-time aggregation, it has native connections to both Kafka and Kinesis, and so you can produce your streaming analytics pretty trivially.

But that brings us to challenge number one, which is historical queries. Kafka and Kinesis are great for storing a day of data, maybe a week, maybe a month at most, but you're not going to store years of data in these systems. And so people often do this thing — I read a lot of blog posts, I tried to figure out what to do here — I've heard about this thing called the lambda architecture.
As far as I can tell, the idea of the lambda architecture is that we're just going to do everything twice: we'll have a separate system for batch, we'll do streaming one way, and then we'll save everything on HDFS or S3, and we're good to go. So we add that to our architecture. It's a little bit more complicated, but that's okay — we're engineers, we can handle this. And once we've got the data lake, now we can do AI and reporting, right?

Well, of course it's not that easy. This brings us to challenge number two, which is messy data. As I mentioned, your data is almost certainly garbage when you first get it; without some cleaning, you're going to have incorrect analyses. So what I've seen a lot of people do is start doing validations. You write a whole other set of Spark jobs whose only job is to look at your data and tell you what's missing: count how many nulls there are, count how many distinct values there are, make sure this thing is actually correct, send me an email when it's wrong and I'll go in and fix it. It's another complication in our architecture, and because we did the lambda architecture we have to do it for both batch and streaming, but that's okay — we have unified APIs, it's not too difficult — we add it to our architecture.

That brings us to challenge number three, which is mistakes and failures. Validations are great, but something bad already made it into your data lake, and now we need to clean that up. Like I mentioned before, this is not a database, it's a file system. So how do you do updates safely inside of a file system? You want to be very careful not to crash in the middle, because you might actually corrupt the data that's there. So again, a pretty common pattern — I'm sure many of you have implemented this in your companies — we're going to partition it. We won't have just one giant directory; we'll break it up into partitions by hour, day, month, week, whatever makes sense for your scale of data, and we'll build an engine to do reprocessing. When something goes wrong, rather than recompute the whole thing, we'll just delete that directory and recompute it from scratch. So with partitions plus reprocessing, now we can handle these mistakes and failures.

That unfortunately brings us to challenge number four, which is updates. GDPR happens, or something like that, where you actually need to modify your entire data lake — you need to do change data capture — and now your partitions aren't going to save you, because you actually need to modify things across many, many different partitions. Again, we're good engineers, we can solve this problem: we'll write a whole other set of Spark jobs to do updates and merges, and we'll be very careful about failing in the middle. I've even seen people copy their entire data lake when they want to do GDPR once a month — just copy everything, because that way if it crashes in the middle it's very easy to recover from. So we can add this too, but again, you've got to be very careful: don't let these jobs run at the same time as your other analytics. You don't have isolation, so if you're modifying the data while people are querying it, they'll get the wrong answer. So we'll make sure to run our GDPR jobs at 1:00 in the morning, and then everything else can start at 2:00, and that GDPR job had better not be late, otherwise we'll get the wrong answers. But that's okay, we can do this. It's pretty complicated, but the real problem is that you're wasting a lot of time and money solving these well-known systems problems rather than doing what you really want to do, which is extracting value from your data.
The problem with the data lake, to me, is that it has a lot of distractions — these well-known systems problems that prevent you from doing the work you really want to do. At the top, we don't have atomicity. Atomicity is this really nice property that when something happens, it either happens completely or not at all, and if something goes wrong in the middle we can roll back, so it's as though it never happened. It makes it much easier to reason about partial failures in a distributed system, and you don't get that when you're just putting things in a file system. Another problem is that there's no quality enforcement: this is just a directory, anybody can dump any data into it, independent of whether or not it matches the schema of that table, and then when you go to read it, it's going to make your program crash. And finally, there's no consistency or isolation. Readers see modifications as soon as they happen, whether they're partially done or not done at all; you can't put something there, roll it back, and have readers never see it. This means it's impossible to mix streaming and batch on the same data set, it's impossible to do modifications while you're reading from it, and all of these make it much more complicated to do your job.

So let's try it instead with Delta Lake. The idea of Delta Lake is that we're going to take this very complicated architecture and switch it to something like this, where you're thinking mostly about data flow rather than about these systems problems. And why are we able to do this dramatic simplification? The primary trick is this thing called ACID transactions — a trick that databases have had for many years. ACID stands for atomic, consistent, isolated, and durable. So we have that nice property of atomicity: when something happens in Delta, it either happens completely or not at all; if something goes wrong, Delta automatically rolls back that transaction as though nothing had happened. Consistent and isolated mean that everybody will see a consistent snapshot of the table, even if other people are modifying it at the same time. You will always see the correct answer, and it will be as though everyone is taking their turn one at a time, even if they're operating concurrently. And durable means, of course, that we're not going to lose data.

Another key thing here: if I were going to collect petabytes of data that was incredibly valuable to my organization, I would not want to put it in some black-box, vendor-specific format where, if I want to read it with other engines, I need to do a very expensive ETL process. I would want to keep it in an open-source system with open standards. Delta is a fully Apache-licensed project; we actually just created a sub-foundation under the Linux Foundation to be a permanent, vendor-neutral home for the codebase. The data itself is stored in Parquet, which is another Apache project, so it can be read by other engines, and there's a growing community around it. It works today with Spark, but we're also working with Presto, Hive, NiFi, and a bunch of other engines on support as well. And of course, today it is powered by Spark and it deeply integrates with the Spark APIs, so whether you're doing streaming or batch, it's very easy to take your existing Spark programs and translate them to Delta — I'll actually show you some examples in a couple of slides.
But now I want to simplify this picture a little bit and talk about something I've seen emerge once people stop thinking about the systems problems and start thinking about their data quality: they start to develop a new vocabulary, and you come up with these data quality levels. The important thing here is that your data is going to start out as garbage, but Delta is going to give you the tools to incrementally improve the quality of that data until it's actually ready for consumption. These are not specific features of Delta — you do not have to have exactly one bronze, silver, and gold table; you can have ten bronze tables and one gold table and skip the silver step — but I think this vocabulary is very useful for communicating to others in your organization the quality of the data they're working with.

Starting at the beginning, we have bronze. Bronze is still on fire: it's a dumping ground for raw data. You might say, wait a second, why am I even storing this raw data, why don't I just start by ETL-ing it? Well, there's an interesting reason for this, and it's that there are no bugs in a parser that you don't write. If you just keep the raw data, there can be no mistakes in your ETL code — you can always go back to that original record and start from scratch. And Delta supports keeping large amounts of data, so many of our customers actually use it to keep years and years worth of data. We've had people switch from other systems where they were able to store maybe a week or a month, and now they're keeping three to four years worth of data, and that's very powerful because you can go back and realize that things you didn't even know were interesting actually are.

After bronze we get to silver. This is the first step of cleaning: you've maybe parsed out some JSON and made it into top-level columns, you may have joined with other interesting datasets to augment it, you may have filtered out bad records, things like that. Again, you might be asking: wait a second, if this isn't my final answer, why is it useful to create these silver tables? There are actually a couple of interesting reasons. One is that you might not be the only person who can benefit from this partially cleaned data — a silver table is a very useful forking point for everybody in your organization to get access to it and do interesting things with it. But another, more subtle one that actually surprised me, and that I've seen a lot of people get a lot of value from, is that silver tables are great for debugging. When something goes wrong at the end of your pipeline, it's very difficult to figure out where the bad records came from, but when you have a silver table that is this intermediate result, and you have the full power of SQL to ask questions about it, it becomes much easier to understand where things went wrong. You can ask questions like: in how many rows is this column null, or how many distinct values are there, and you'll see things that surprise you. I'll talk in a couple more slides about some really cool things we have coming for these kinds of data quality concerns.

And then finally we get to gold. These are clean, high-level business aggregates that mean something to somebody important in your business; they're ready for consumption. You can connect your favorite tool — Tableau, Power BI, any of those — directly to your Delta Lake. You can query it with Spark, you can query it with Presto, we are working on support for Athena, and there's also a pull request to add support for Redshift Spectrum as well.
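For readers following along, here is a minimal sketch of the bronze-to-silver hop described above, assuming a bronze Delta table of raw JSON strings; the paths, the raw_json column, and the schema fields are illustrative placeholders, not from the talk.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType, StructField, StructType, TimestampType

spark = SparkSession.builder.getOrCreate()

# Hypothetical schema for the JSON payload sitting in the bronze table.
event_schema = StructType([
    StructField("user_id", StringType()),
    StructField("event_type", StringType()),
    StructField("event_time", TimestampType()),
])

# Read the bronze Delta table as a stream of raw records.
bronze = spark.readStream.format("delta").load("/data/bronze/events")

# Parse the JSON into top-level columns and drop records that failed to parse.
silver = (bronze
          .withColumn("parsed", from_json(col("raw_json"), event_schema))
          .select("parsed.*")
          .where(col("event_time").isNotNull()))

# Continuously write the partially cleaned data to the silver Delta table.
(silver.writeStream
       .format("delta")
       .option("checkpointLocation", "/data/silver/events/_checkpoint")
       .start("/data/silver/events"))
```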
So now that we've talked about the different quality levels, I want to talk about how people move data through their Delta Lake, because this process of ETL is something that needs to happen continuously, and something I've noticed is that a lot of people start using streaming here. I can hear a lot of you thinking: hmm, I don't need streaming, that's too complex, I don't have low-latency requirements, I don't want to deal with that. I actually think that's the wrong way to think about streaming. To me, low-latency processing is one cool thing that streaming can do, but what streaming is really about is incremental computation. It is this problem: I have a data set that is always changing, I have some interesting transformations that I want to happen on this data set, and I want that to just happen continuously without me worrying about the control-flow details. When you think about ETL, think about the things you have to do. Part of it is data transformation, but the other part is figuring out what's new and what's old, what's already been processed, taking that new data and moving it downstream transactionally, guaranteeing exactly-once semantics — don't introduce any duplicates and don't drop any data — and making sure you checkpoint your state so that if things crash you can pick up where you left off. Those are all things that Structured Streaming can do for you automatically.

And there is a really nice cost/latency trade-off you can make with Structured Streaming. If you have super low latency requirements and you're willing to pay for it, you can run Structured Streaming in what's called continuous mode, where we actually grab a core, we are continually pulling from the source, and you can get millisecond latency, which is pretty fast. However, you're going to pay for that, because that core is reserved for that one query, and even if no data is coming in, it's going to be sitting there eagerly waiting for new stuff. So what many people choose to do instead is run in micro-batch mode. In micro-batch mode, the stream is processed in tiny little increments, multiplexed across the resources of your cluster, so you can get seconds-to-minutes latency but have many queries sharing the same set of cluster resources, which can dramatically reduce your cost. And then finally, at the far end of the spectrum, there's this thing called trigger-once mode. Trigger-once mode is great when you have data that arrives once a day, once a week, or once a month. In trigger-once mode, instead of running continuously, you start up your cluster — this is great because you can take advantage of the elasticity of the cloud — it processes everything that is available right then, it shuts down, and you stop paying for it. So you get the benefits of streaming, where you don't have to worry about the control flow and only worry about the data flow, but you also take advantage of the cost benefits and the elasticity of the cloud.
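A hedged sketch of the cost/latency knobs just described, using Structured Streaming triggers; the paths and the one-minute interval are illustrative assumptions.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
events = spark.readStream.format("delta").load("/data/bronze/events")

# Micro-batch mode: process whatever is new every minute, sharing cluster resources.
(events.writeStream
       .format("delta")
       .option("checkpointLocation", "/chk/events_minutely")
       .trigger(processingTime="1 minute")
       .start("/data/silver/events"))

# Trigger-once mode: spin up, process everything available, then stop paying.
(events.writeStream
       .format("delta")
       .option("checkpointLocation", "/chk/events_once")
       .trigger(once=True)
       .start("/data/silver/events_daily"))
```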
Now, I've talked a lot about streaming, but of course Delta is not a streaming-only system. Batch jobs do happen, and so Delta has full support for the batch APIs of Spark and also standard SQL DML, so you can run UPDATE, DELETE, and even things like change data capture with MERGE INTO. This is great when you want to do things like GDPR, or when you have regulatory requirements around retention — you need to say, I can only keep data for two years, so delete everything where the date is more than two years old. This is very powerful, and all of these operations are completely transactional and can be mixed and matched on the same data sets that you're streaming on.

One final pattern I want to talk about is the ability to do recomputation. The really nice thing about using streaming and keeping all of that data in your bronze table is that when something goes wrong, you can actually recompute from scratch, and it's very simple. When you start a stream from scratch, it begins by taking a snapshot of the Delta table, breaks that snapshot into a bunch of little pieces, and processes them incrementally; when it's done processing that initial snapshot — that backfill — it switches to tailing the transaction log and computing the answer incrementally. What you get is the same answer as though you had run a batch job on the same data set. This means that when you find a bug in your code, or when there's some new interesting analysis you want to do for the first time, all you have to do is start a stream with a fresh checkpoint, and it will eventually compute that answer for you and then continually keep it up to date. This is another place where you can take advantage of the elasticity of the cloud: when running that first initial backfill, I have customers who scale up to a thousand-node cluster so they can get it done in half an hour, and when they're done with the backfill part, they scale it back down — and in Databricks we have auto-scaling that will do that work for you automatically.
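A rough sketch of the retention delete and change-data-capture merge mentioned above, using the Delta Lake Python API; the table paths, column names, and two-year window are placeholders.

```python
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
customers = DeltaTable.forPath(spark, "/data/gold/customers")

# Retention-style delete: drop anything older than roughly two years.
customers.delete("event_date < add_months(current_date(), -24)")

# Change data capture: merge incoming updates into the same table, transactionally.
updates = spark.read.format("delta").load("/data/silver/customer_updates")
(customers.alias("t")
          .merge(updates.alias("s"), "t.customer_id = s.customer_id")
          .whenMatchedUpdateAll()
          .whenNotMatchedInsertAll()
          .execute())
```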
So that's a lot about what Delta Lake is. I want to talk a little bit about who's using it, and then, as we mentioned before, we'll have Kyle come up in a bit to talk about his use case as well. While Delta Lake was just open-sourced in April — it's a relatively young open-source project — it's actually been a product inside of Databricks for the last two years. It's used by (this slide is a little out of date) almost 4,000 organizations worldwide, and we processed over two exabytes of data last month alone, and that's just on Databricks; I don't know what's happening in open source.

I want to talk about one particular use case that I thought was really cool: Comcast. I'm sure you're all familiar with them. They have data from all of their customers who are sitting on the couch clicking their remotes, and they want to figure out what the content journey is like — you're watching the Home Shopping Network, then you switch to ESPN, then you go back to the Home Shopping Network — they want to understand what that's like. So they want to do this sessionization job, and they've been using Spark to do it for years, but they had a problem: they have too many customers, and the job was so big that it would not fit into the Spark scheduler; it would just tip over the scheduler with too many tasks. So they did what any good engineer would do: you take this job and you hash-partition it. You take those user IDs, mod them by ten, and spread the work out across ten different clusters, and now the job runs — that's good engineering. But there are a couple of problems with this. You now have ten clusters to manage, ten clusters to pay for, ten sets of logs to deal with, ten sets of errors, ten sets of schedules; it massively increases the cost and complexity of your system when you have to do this kind of scale-out. By switching to Delta, and using some of the cool tricks we have under the covers for scalable metadata management, they were able to reduce this to one Spark job running on one cluster, and also cut their cost by 10x. That is a dramatic savings, not only in engineering time but also in your cloud costs, and I think it's a pretty good example of what people are able to do once you have transactional, scalable metadata.

So if you're interested, you might be asking: how am I going to use this thing? It turns out it's very simple to get started with Delta. It's published on Maven, so if you have an existing Spark cluster, using it is as easy as adding the --packages argument — you can see it here over in the corner — and that will automatically download Delta, install it on your cluster, and pull in its dependencies. I would also recommend (I see a lot of pictures being taken) the great getting-started guide on our website, which I'll show you at the end, that is even better than this. You can also include it in your own projects if you're using Maven. And finally, changing your code is trivial: if you're using the DataFrame reader and writer, all you need to do is change json, csv, orc, or whatever you're using today to delta, and you'll automatically get all of the benefits of transactions without changing anything else.
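A minimal getting-started sketch along those lines, assuming Spark 2.4 and the Delta package on Maven (the version shown is an assumption); note that only the format string changes.

```python
# Launch with the package pulled from Maven, e.g.:
#   pyspark --packages io.delta:delta-core_2.11:0.4.0
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.json("/data/raw/events")

# Before: df.write.format("parquet").save("/data/events")
# After: only the format string changes, and writes become transactional.
df.write.format("delta").save("/data/events")

# Reading back is the same one-word change.
spark.read.format("delta").load("/data/events").show()
```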
One more thing I want to talk about before I get into the nitty-gritty details of how Delta works under the covers is something that's in progress, just to give you a forward-looking sense of where the Delta project is headed. Everything I've talked about — transactions and scalable metadata — is great for making sure your code runs correctly, but it doesn't help you if there's a bug in your code. So we have some cool things coming called declarative pipelines. The idea is that once people start using Delta, they often have one table and one stream, then ten tables and ten streams, then a hundred, then a thousand, and this becomes a nightmare to manage and deploy, but also to test and reason about. So rather than thinking of all these tables as independent jobs with independent streams running on independent clusters, Delta pipelines gives you a thin DSL on top of Apache Spark. You're still using the APIs you know and love — DataFrames in Scala, Java, or Python, or even just pure SQL, so you don't even have to do any programming here — and you can declaratively specify your entire data flow graph, from those bronze tables to those gold tables, and the dependencies in between them.

As you can see here, I'm defining a data set called warehouse. The actual data set is defined using DataFrame code — Scala, Java, Python, or even just pure SQL can go there — and we can specify details about how I want these tables to be materialized: what bucket they should go in, whether or not I want strict schema checking. Delta actually has two modes for schema management. We can do automatic schema management, where as new columns appear we just automatically add them to the table — this is great if you're ingesting JSON data into a bronze table and you just want everything to be there. However, if you are at your gold tables and you want a little more quality enforcement, you can also do the standard DDL thing, where you need to call ALTER TABLE ADD COLUMN if you want to modify the table, and we support both of those. Then you can register Delta tables in a metastore, so they're discoverable by other people in your organization, and you can give them a human-readable description so people can understand what this data is, who owns it, and where it came from — it really helps with data discovery.

And then finally, my favorite feature is this really cool thing called expectations. Expectations allow you to take your notion of what quality means and put it into the system — you take that extra domain knowledge you have and tell the system about it. They are very similar to invariants in a traditional relational database. In this example, I'm saying I expect that this table will always have a valid timestamp, and a valid timestamp doesn't just mean that it's present and has a year, month, and day. I know that Databricks started in 2012, so if data from, say, 1970 shows up, that's almost certainly a parsing error — somebody cast a zero into a date — and we can put those in as constraints. You have this tunable severity, where you can decide what happens when an expectation is violated. In the earlier bronze tables you typically just want to be alerted: if more than 10% of the data doesn't parse, let me know about it. In a gold table that you're reporting to some regulatory agency from, you might want strict enforcement: any transaction that adds bad data should be failed and alerts should be raised — never let bad data into this table. And my favorite version of this is something called data quarantine. With data quarantine, when an unexpected record is seen, rather than stopping processing, we let processing continue and just divert that record into a quarantine table, where you can come look at it later and figure out how to fix it. Like I said, this is forward-looking; it's something we're hoping to release in the next three to six months, so stay tuned to GitHub for more.

Now, to close out my part of the talk, I want to get into the nitty-gritty details of how Delta actually works, because scalable metadata and full ACID transactions in a distributed system sound a little too good to be true. It turns out there are a couple of simple tricks we use to get them. Starting off, Delta on disk looks exactly like your data lake today: it's just a directory that contains a bunch of files, with one important difference — we have a transaction log. Inside of the transaction log there are a bunch of different table versions. As you can see here, the directory _delta_log is our transaction log, and each entry is a separate file: we create version zero of the table, and then version one, by adding files into this directory. Alongside this we have optional partition directories. I say they're optional because they're just there for your own debugging; the actual partition information is stored in the transaction log, and that lets us do really cool things there.
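As an illustration of the on-disk layout just described, here is a small sketch that peeks inside the transaction log. It assumes a Delta table at a locally readable (hypothetical) path and relies on the Delta protocol convention that each commit is a newline-delimited JSON file of actions.

```python
import json
import os

log_dir = "/data/events/_delta_log"   # hypothetical table path

for name in sorted(os.listdir(log_dir)):
    if not name.endswith(".json"):
        continue                        # skip Parquet checkpoints and CRC files
    print(f"-- commit {name} --")
    with open(os.path.join(log_dir, name)) as f:
        for line in f:                  # one JSON action per line
            action = json.loads(line)
            # Typical keys: metaData, add (with per-file stats), remove, commitInfo
            print(list(action.keys()))
```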
You can even take Delta and tune it to work especially well on S3. For those of you who don't know, S3 has two different tiers: a range-partitioned metadata tier and a hash-partitioned data tier, and a best practice, if you are going to put a lot of load on S3, is to make sure that the key ranges you're producing are random. An anti-pattern is to create a new directory every day at 12 a.m. and flood it with new files — S3 does not like that; you can overload it. So Delta has a mode where we create randomized partition directories to avoid that anti-pattern — and we actually have overloaded S3 before, which I think is kind of crazy. And finally, the data files, which are stored in standard Apache Parquet, can be read by any of those existing tools.

So what actually goes into the transaction log? The transaction log is a set of versions, and each version contains a set of changes from the previous version; by playing the whole transaction log forward, you can come up with the current state of the table, which is the current metadata and the list of files. The types of actions that can go into a transaction are: you can change the metadata — change the name, add a column, things like that — and you can add a file, and along with that file you can also add optional statistics, like min and max values for each of the columns. Those min/max values let us do really cool things like data skipping. In one particular use case, we had a customer who was storing trillions of TCP connections into a table every day, and he wanted to be able to very quickly find connections between two computers — where source IP equals this and destination IP equals that. Using these statistics, we were able to skip 97% of the data in the table, which took a query that would have taken hours and made it run in seconds — super cool when you're doing ad hoc analytics on your data lake. And finally there's remove file, which takes data out of the table.

Okay, so that's the high-level structure. Now, how do we get those ACID properties? Starting with atomicity, we play a cool trick. We already talked about how changes are stored in these log files, and each of these files is an atomic unit — each file is a transaction — so if files are atomic, the remaining problem is: how do we create files atomically? Fortunately, we have a really nice primitive on S3: when you do a PUT to S3, you start by saying how many bytes to expect, and if it doesn't receive that many bytes, it doesn't accept the write, so you get atomicity out of the box. On other file systems like HDFS, we use a transactional rename: we create a temporary file and rename it to its final destination. So now we have this really nice property. Say we have a table with two small Parquet files and I want to do compaction — collapse them down into one bigger, more efficient Parquet file. It would be a disaster if I removed the two files but didn't add the new one, or if I added it but didn't do the removes; but because we create the log entry atomically, we are guaranteed that the compaction either happened completely or not at all.
Okay, so now we get to consistency and isolation. In order to have consistency, we need a serializable schedule: we need to agree on what order changes happened to the table. Just to give you an example, user one can create version 0, user two can create version 1, but they cannot both create version 2 — if they both try, one will win, but the other has to get a "file already exists" exception. This is something that S3 cannot give you; the S3 documentation pretty clearly says it is not a lock service and does not make this guarantee — multiple people can PUT and they will both get a success. So we need some other system on the side — DynamoDB, MySQL, something — to mediate between the writers. On systems like HDFS, it just works out of the box: the transactional rename can be made to fail if the destination file exists.

And then you might be saying: wait a second, if every time something goes wrong it just fails, then I'm not going to get a lot of work done — it sounds like I have to deal with a bunch of systems problems again. Fortunately, we have this really cool trick called optimistic concurrency control, and the idea is this. Say we have two users streaming into the same table. How are they going to mediate with each other? They follow this algorithm. When they both start streaming, they begin by recording the start version — okay, I'm streaming in, starting at version 0 of the table. They record what reads and writes they do; in this case, they read the schema of the table to ensure that the data they're writing matches that schema. Then they speculatively write out Parquet files: they create a bunch of Parquet files that will be the data of the transaction. These are not part of the table yet, even though they are written into the directory where the data is stored. Then they go to commit — this is what actually makes them visible to other users of the table, by creating the transaction log entry that adds them. In this case, user 1 won and user 2 lost, but what user 2 does is simply check: okay, wait a second, what happened in version 1? Does it change anything about the state of the world that I care about? In this case it doesn't: I only read the schema of the table, the schema has not changed, so I can't tell whether I happened before or after transaction number 1. The system will automatically reorder these operations and try again, and everything will work — you won't even know this happened.

And then one final cool trick is massive metadata. In your data lakes you often have tens of thousands — I've even seen hundreds of millions — of files, and that becomes a scaling problem in its own right: you're starting to have a big data problem even with your metadata. The trick here is to treat metadata like data. We already have a big data system, so the Delta transaction log is constructed in such a way that you can actually use Spark to process it. We take all of the actions from the transaction log — the adds and the removes — load them into Spark, do a big shuffle to reconcile them, and write the result out as Parquet in what's called a checkpoint. A checkpoint is the snapshot of the table at a specific version; it basically shortcuts having to read the entire transaction log up to that point. And the really cool thing is that since it's written in this nice, efficiently encoded binary format, Parquet, that Spark can query, you can now do filtering on the metadata very quickly. So in the case I mentioned before, where we have trillions of TCP connections and we want to very quickly sort through them, we can just scan the statistics in the checkpoint to decide which files are relevant without ever actually reading the data — and like I said, that can reduce the time to run a query from hours to minutes in some cases.
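A hedged sketch of the kind of highly selective query that benefits from those per-file statistics; the path, column names, and IP values are made up, and whether skipping actually kicks in depends on the engine (Databricks at the time of the talk).

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
connections = spark.read.format("delta").load("/data/gold/tcp_connections")

# With min/max statistics per file, the engine can skip files whose value
# ranges cannot contain these IPs, so only a sliver of the table is read.
(connections
    .where("src_ip = '10.0.0.12' AND dst_ip = '10.0.3.7'")
    .show())
```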
Very quickly, I want to talk a little bit about the roadmap, since this is a relatively young open-source project. We just open-sourced it in April, but my goal is to have full API parity between everything you can do in Databricks and everything that's available in open source. We started by adding support for Amazon S3, we added support for update, delete, and merge, and with Spark 3.0 we're going to be adding full support for CREATE TABLE, ALTER TABLE, all of that DDL. There are also really cool pull requests open to add support for Redshift Spectrum and Athena, so you can use those tools to read from your Delta Lake as well. I just want to put this up here — this is the Delta website. I encourage you to check it out and come join the Slack channel; I have a team of ten engineers who spend all day sitting in the Slack channel answering questions about Delta, and there's also a great getting-started guide that shows you how to start writing Delta programs today. With that, I'd like to invite Kyle from Kabbage up to talk a little bit about how he's been using Delta in production.

Awesome, thank you, Michael. While we get Kyle set up, two things I wanted to share with everybody. You can find out more at the Databricks booth, which is number 416 over in the expo, and you can also sign up for our party tomorrow night. The other thing is that we have six hours of free training on Databricks, Delta, and Spark, all available on our website: if you go to databricks.com/aws, you'll find links that will take you to that, and also to other customer sessions and webinars. I think you're all set.

All right, great — can everyone hear me? Yeah, I think you can; I can hear myself. So my name is Kyle Burke, I'm the head of data platform at Kabbage, and today I'm going to talk a little bit about our architecture. We have a data lake, and we've been using Delta for about a year now. This is the second data lake I've built; the last time, without Delta, it took about a year and a half, and with Delta I think we started rolling out our top-priority things within about three months, so it absolutely makes things easier. We'll dive in a little bit — and I apologize, my slides aren't available, so we'll do the best we can. I'm going to talk about our previous platform and our new platform so it makes sense when we start talking about the data lake and why we built it. Usually when you see these presentations it's all about how everything is great; I'm going to share all the challenges we had and why we built it — I think that'll help, and I'm curious to see if people are in the same boat. Then I'm going to talk quickly about some Delta optimizations we use that are very helpful, because our goal was not only to make data available but to make it more accessible, quicker, for our users — there's this OPTIMIZE function that's very helpful that we'll talk about a little bit. And lastly, I'll talk a little bit about batch versus streaming, because we use both. Our stance is that we don't want to incur the cost of streaming unless there's a real reason, but we do have a few real reasons — we're up to five or six streams running now — and I'll go over one example of how we decided that, yes, it actually is valuable to have this data in real time.
All right, so first — I'm getting ahead of myself here — who is Kabbage? I want to cover that because it'll help you understand why our data is exploding so much. Basically, Kabbage is an automated platform for small business loans. We make a decision about whether to extend a loan to somebody within a few seconds — we actually have a spinner on our website that we spin for ten seconds, because we want people to think we're taking all this time to make the decision — but what happens is we connect to their data sources and import them. It might be Wells Fargo, it might be Amazon, it might be PayPal, all different sources; we import them, we run some machine learning models to determine their revenue and expenses, and then we make a decision and fund them within ten minutes. The nice thing for small businesses — and it's in line with what Databricks is saying — is that we feel we do what's right for the business. If you look at the small business market right now, these smaller businesses, twenty employees or fewer, find it incredibly hard to get loans. Typically they can't get them, and if they do, the lender wants them to take much more money than they need. What we're trying to do is give them the amount of money they need, and no more, right when they need it. We actually have a model right now that predicts when a particular customer's balance is going to go too low, so we can fund them money right away, but only the amount they need. So it's very helpful to small businesses; it helps them manage cash flow, and that's one of the number one reasons small businesses go out of business — they don't manage cash flow well — so we see ourselves as trying to help them with that.

The problem with taking in all these transactions is that it's a ton of data. We have about 200,000 returning customers who are active all the time, and we're pulling all of their transactions every night, going back two years. We have about a million people who have signed up, so we have all of their transactions too. So we had this explosive growth in the data, and we had to figure out how to use it effectively. In essence, we were doing a lot of the things Michael was describing: we were cutting it off at a month, or cutting it off at three months, but in our Delta Lake — our data lake — we now have two, three, four years of history for a lot of our clients. A few other things about Kabbage, just so you understand the scale: this year we'll do three billion dollars in loans, and to date we've done about eight billion. We've been in business since 2011, we're based out of Atlanta, and we have offices in San Francisco, New York, Denver, and Bangalore. Oops — I'm going to keep going, thank you.

All right, so to compare our old platform and our new platform: we were very SQL Server heavy and MapR heavy. We break the world up into a real-time system, meaning people are on our website, and a batch system, which is people doing analytics. I got to Kabbage about a year ago — less than a year ago — and at that point the real-time system was trying to implement MapR Streams, and the batch system was trying to do everything in SQL Server. We had other minor pieces, but in essence we were a hybrid, with some things on-prem and some in the cloud. The new system is completely in the cloud.
One of the biggest problems we were having between MapR and SQL Server is that it's very labor-intensive to keep all of this going. The biggest problem I saw in our environment was that, at the time I started, my team was about twelve people, all data engineers, and they were spending all their time trying to keep servers running — and that was not the point of their jobs. The point was to deliver data that was valuable. So we had to come up with something that let them focus on what they were good at. Some of the side benefits: the cost. Just counting software and hardware, we were spending about two million dollars a year; the new platform is going to be somewhere between one million and 1.4 million. So we're going to save money and actually deliver more value — the cost is nice, but delivering the data our business needs was the real challenge.

I can't show the slide here, but I think it would be helpful: we actually had four SQL Server environments. We had our runtime system, which is everyone on our website — we were a monolithic company where everything was in this one big database, hundreds of tables, a huge environment, which we've since been moving away from toward microservices, and that has been going very well. We had to take our EDW and put it on a separate server, and we got to the point where only our jobs were running there — people weren't actually hitting our EDW on that cluster — because we were replicating all the data out. We had an offshore server that everyone offshore used, and then we had a US analyst server. So we had these four instances running, which sounds great when you first start, but then you hit problems: because we were using transactional replication and SSIS to move the data around, one server would say one thing and another server would say another. We'd constantly be getting Slack messages saying, hey, the data on this server doesn't match that server — we'd go check — oh yeah, oops, that data didn't replicate. So we were dealing with those issues all the time.

And then we got into situations where — we have years of transaction data, nine years basically, that we can use — we started deleting data, or splitting tables, which was our latest tack: we'd keep just the last 30 days in one table and keep all the historical data on another server. People didn't like that because it got complicated, and we'd start seeing these crazy queries running against our SQL Server that we had to eliminate. In one case, I was in Bangalore in May, and one guy said to me, "I come in every morning an hour before everyone else." I asked, why do you do that? He said, "Because if I run this one query before everyone gets here, it takes an hour; if I run it when everyone's here, it takes between five and eight hours." I was like, wow, that sounds terrible. So we sat down with him — this is a true case — we took his queries and ran them on the data lake, where we already had a lot of the data, and the queries ran in ten minutes; he was ecstatic. That was the nice thing: we had this group that wanted a new solution, and that's what made it easy to migrate to the data lake.
The second piece I'll go through quickly: our real-time system is really cool — it's all Akka streaming writes into an Aurora database — and one of the neat things we've done with Databricks is we've actually released our first deep learning model, which predicts when a customer's balance is going to hit a certain threshold. But today I'm mostly going to talk about our data lake, because I think that's what everyone is here to hear about.

So what we're doing now is unloading some data from SQL Server — we have a lot of data that just gets written to S3. Like Michael was saying with bronze tables: any decision we make, any transaction, automatically goes to S3. The problem is, it's millions and millions of files, and it was almost unusable unless you knew exactly what you were looking for; a general person couldn't get to it. So we started taking those files directly off S3, concatenating and combining them, and releasing them, so that we can basically get rid of the transaction data sitting in SQL Server. Another thing we're doing is a feature store — we call it our metric store because it has both metrics and data science features. One of the issues our data scientists were having is that when they're training a model, they calculate their features one way, but when the model goes into production, the features may not be calculated the same way, because they would throw it all over the wall and let the engineers worry about how to calculate each feature. What we're doing now is calculating features in real time, and we just unload our raw database onto our data lake, so the same data they're training against — they know those features are going to be calculated the same way they are in production. It eliminates the drift where they really think it's going to go one way and it actually goes another way with the models. Other things we do — let's see, what else am I forgetting — we have a bunch of first-party and third-party data in there, probably all the same kinds of things you do.

The way we make it available to people is through Databricks. I've said: if Databricks is for you, use Databricks; if you're just going to do SQL queries, you should go to SQL Server. We also make it available through Athena — Databricks released a feature that basically lets your Delta tables be exposed as manifest files to Athena, so you can easily query Delta files in Athena as well as Databricks, and I think Athena is actually a little bit cheaper, so I'd love it if people used it. But we're seeing a huge spike in our Databricks usage, because a lot of people want to do Python, they want to do SQL — it's not just "let me run this query," it's "I have to string all this stuff together now" — so that's why Databricks has taken off for us. We also use Tableau a lot. One of the things we started doing, because we were trying to simplify our stack, is that almost all of our Tableau reports run directly off Databricks. There are certain cases where it's not fast enough, because we're not on the most recent version, but in most cases it's fine, and the way we get through it is that Tableau allows you to do extracts — so if it's a really high-throughput case, we just do extracts, and we've been really successful with that. Other cool things we do: model monitoring and things like that.
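For reference, here is a hedged sketch of generating the manifest files mentioned above so Presto or Athena can read a Delta table; this relies on the generate API that shipped around Delta Lake 0.5 and on Databricks, and the table path is a placeholder.

```python
from delta.tables import DeltaTable
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

table = DeltaTable.forPath(spark, "/data/gold/transactions")
table.generate("symlink_format_manifest")   # writes a _symlink_format_manifest/ directory

# Equivalent SQL form:
# spark.sql("GENERATE symlink_format_manifest FOR TABLE delta.`/data/gold/transactions`")
```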
When we're building our feature store, we actually write metrics about the model to the metric store, we unload them, and Tableau can read those, so we've got a bunch of cool graphs about how our models are performing. And lastly, to tie it all together, we use Airflow. We like Airflow a lot because our current SQL Server environment has about 300 jobs running, and as soon as you have that many dependencies it's really hard to manage, but with Airflow you basically get this DAG that shows you everything — all the dependencies — so if you're changing something upstream, you know all the potential downstream impacts, and it gives us a way to test when we make changes. So it helps with that a lot.

All right, so really quickly, some of the Delta table operations. The one thing I'll say is that before Delta, we basically used partitions — partitioning was our way to make things fast. I had a case where we had a very large data set partitioned by day, hour, and geography, because we had a lot of time-based, geography-type queries going on, and I thought that would be great and fast. The problem we ran into is that we ended up with too many files and too many partitions, and Spark spent all its time looking for files. So now we keep our partitions very simple — almost everything is by day — and then we use the indexing. There's a function, OPTIMIZE, and there are a couple of different ways to use it. You can say OPTIMIZE table, and what that does is basically take all the small files and turn them into big files. We have an example: one of my engineers had some old code we hadn't looked at in a long time, and people were telling us it was running much slower, so he just ran OPTIMIZE on the table and sent me the results. Before the OPTIMIZE, Spark reported that it was reading a hundred and fifty thousand files to return the query; after he ran OPTIMIZE, it read five hundred files. It was something like a forty or fifty percent improvement in runtime, just from that one command. One of the nice features is that, because we're loading things generally by day, you can put a WHERE clause on it — basically a WHERE on the partition column — so every time we load, we just run an OPTIMIZE on that partition and it stays compacted. That's something we just started using, and it's super easy.

We also had an interesting use case I wasn't expecting. When we were trying to move everyone from SQL Server onto the data lake, I thought of the data lake as, hey, everyone will aggregate some data, run some queries, things like that — I didn't expect single-record lookup workloads. But in actuality we had that case: our customer support team needed to look at a particular business and see all of their transactions, and our transaction data is our biggest data set. So to move customer support off SQL Server onto our data lake, we had to optimize it so they could quickly look up transactions. We ran a test on a table with 750 million records in it, where we wanted to very quickly look up all of a single user's transactions. Before we ran OPTIMIZE on it, that lookup generally took between one minute and one minute thirty seconds.
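A hedged sketch of the OPTIMIZE commands described here; OPTIMIZE and ZORDER were Databricks features at the time, and the table name, partition column, and lookup key are illustrative.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Compact small files across the whole table.
spark.sql("OPTIMIZE transactions")

# Compact only the partition that was just loaded.
spark.sql("OPTIMIZE transactions WHERE load_date = '2019-11-30'")

# Co-locate rows by a lookup key so single-customer lookups touch few files.
spark.sql("OPTIMIZE transactions ZORDER BY (customer_id)")
```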
But we felt people would get impatient — people say that if something takes more than ten seconds, they're bored and they move on. So we ran OPTIMIZE on it, and the same query that had been taking that long took only three seconds. That gave us the confidence that, for all these cases we don't anticipate, we have these very simple tools we can use to meet users' demands.

All right, the last thing I'll talk about, really quickly, is batch versus streaming. As I said, most of our stuff is batch; we're getting into hourly — hourly is about the most frequent I generally like to go until we can get more of our data out of SQL Server, because a lot of our data comes from SQL Server, and if we're hitting it every hour, we're putting a tax on it. But there are cases where the data is not coming from SQL Server, and at that point it doesn't bother me as much. One example: we have a streaming application where, every time somebody is on our site and they, say, add a new bank or do anything, there's an event stream that comes in that we write to a Delta table. Our application developers write to about ten different topics — it could be a channel add, it could be wizard completed — all the events have a website_event_ prefix with a different name. We take all those events, ten or fifteen different topics, and stream them all into one Delta table. What's nice is that if somebody is going through our wizard and falls out, we can actually follow up with them. The way we do that is we read this event table, stream the events, and enrich them with things like what their revenue is and what their expenses are, and that helps us do a couple of things. One: we have three different sales teams, so we can route the event to the right sales team in Salesforce. Two: at any one time there are thousands of people we could call back, but we want to call back the people we think are most likely to qualify. With these additional metrics on the event stream going into Salesforce — things like revenue, or some data science scores that we calculate — we can rank them, and now our sales team is focused on calling the people most likely to qualify. That has helped drive why we're growing at something like 40 percent: we're very focused on calling the right people. So yes, it is more expensive to do real time, but it's absolutely worth it in that case, and we have four or five of these cases where it's worthwhile. That's it.
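A rough sketch of streaming several event topics into a single Delta table as described; the talk doesn't name the message bus, so a Kafka source (with the spark-sql-kafka connector), the broker address, and the topic names are all assumptions.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

# Subscribe to several hypothetical website-event topics at once.
events = (spark.readStream
               .format("kafka")
               .option("kafka.bootstrap.servers", "broker:9092")
               .option("subscribe", "website_event_channel_add,website_event_wizard_completed")
               .load())

# Land every topic's events in one bronze Delta table, keeping the topic name.
(events.select(col("topic"),
               col("key").cast("string").alias("key"),
               col("value").cast("string").alias("payload"),
               col("timestamp"))
       .writeStream
       .format("delta")
       .option("checkpointLocation", "/data/bronze/site_events/_checkpoint")
       .start("/data/bronze/site_events"))
```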
they're gonna hire 50 people next year so go to the cabbage website there's all different areas of IT that we're looking for so yeah we'll definitely put a link to you blog post as I mentioned we're gonna do a blog post here at the end of the week recapping everything one of the things you mentioned was the tableau connector and so in 20 19.3 the latest version of tableau that released there is a new optimized connector it's 30% faster and it now connects to specifically to Delta Lake so that's just one thing if you're using tableau what's what's kind of your timeline for upgrade when are you guys thinking you're gonna move to 2019 no three so the issue that we have is tableau online or that yet okay let's go dance that are coming very soon okay we looked at you know the reason we did data Britax and tableau and things like that is we're trying to get out of like everything roll around and so as much as we want that new tableau thing is we don't want to bring that in-house and have to manage it you know we had too much issues trying to keep servers running that I'm very much like hey let's if they give me an online version I'm gonna use that because help let's focus make sense have you used the time travel feature at all is that something you've come up for any New York yeah maybe you can kind of explain what that is and then talk about how you've used it if that's yeah that's time travel is basically you can look in like a time stamp on any of the files and say what did this data look like at this time stamp and that's basically for data science we have a lot of cases where as soon as they the models aren't performing the way that they think they're performing they want to go back and look at the data as it was when they actually train the model to see if something else is going on so that's generally what it could use is like that yeah awesome I mean and we've had a couple of customers who've had data you know sets but that have been deleted by mistake and stuff and they've been able to bring that back obviously if you're looking at auditing you know there's another ability of Delta as that ability to time travel and look at you know data sets that were used in any of your algorithms or any any work that you did so so that's another great piece Michael did you have anything that you wanted to kind of kick in and ask Kyle as well no okay okay we've got about eight minutes left so what I'll do is I'll walk around the microphone if anybody wants to ask Kyle some questions you may want to raise their hand to support the Impala the Impala support the elderly yeah that's a great question so the transaction protocol is open-source I published it a couple months ago pretty easy to implement I've not heard any interest from the Impala community but if anybody wants to send him to me I'm on Delta slack I've been helping the starburst people and presto quite a bit so I'm like happy to answer questions if if anybody wants to build that hi so we have we have a data Lake and what we're doing is we are using DM the using data migration service to get the data into s3 which gets it into CSV format and then we use EMR spark to to generate the park' files is that something that I can use use Delta Lake for yeah that's a great question so you can use structured streaming to read from your CSV files and you know put it into in the Delta Lake that all works with just apache spark and open source Delta Lake the other thing that we have in data bricks is we have a system called auto ingest that's significantly 
The other thing that we have in Databricks is a system called auto ingest that's significantly more efficient, because what it does is actually subscribe to notifications for that bucket, rather than listing the entire bucket to discover what is new. As we all know, S3 metadata operations are quite slow; you can get maybe a thousand files a second out of them, and it's significantly faster to do this with notifications. So either of those approaches works, but it will be much cheaper to do it with notifications.

Yeah, so the question was: do you support change data capture, including deletes? And the answer is yes. We support MERGE INTO, which basically lets you take any stream of data from anywhere and say what to do when certain conditions are met. You basically give it a key, so you say merge on this equals this; when it's already there, update it; when it's not there, insert it; and when this flag is set to true, delete it. Delta takes care of all of that, and it's available in both the open-source and the Databricks version of Delta Lake. (A small sketch of that merge pattern appears a bit further down.)

Is the Databricks Tableau connector able to work with the open-source version, or is it something that needs Databricks?

Yeah, so the Databricks Tableau connector, which now supports Delta, supports Delta on the Databricks implementation; that's really what you're looking at with that 2019.3 release. Being able to hook Tableau up to open-source Delta Lake is something we'd have to look into; if you come by the booth we can talk a little bit further about that.

You can connect it; we're using the Spark JDBC server, but we made a lot of modifications to the JDBC server to make it better.

Hello. How do I actually run queries on the Delta table? I mean, how do I get at all these different versions of the table and all the metadata?

Yeah, so it's just normal Spark. You can use Spark SQL and write SQL queries against it, and you can use the Spark DataFrame APIs to read from it. By default you always get the current version of the table, but in the DataFrame reader API you can also specify which version you want to read when you load the table: you say load this table version as of, and you can give either a timestamp or a version number. In Databricks we also have a feature in SQL where you can use that AS OF syntax, and we are working with the Spark community to push those changes into the Spark SQL parser so you can do them there too.
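A minimal sketch of those reader options, with a hypothetical table path: `versionAsOf` takes a commit version number and `timestampAsOf` takes a timestamp, both as Delta Lake DataFrame reader options.

```python
# Hypothetical table path; read the current snapshot, then older snapshots of the same table.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("delta-time-travel").getOrCreate()

path = "s3://bucket/delta/transactions"

# Default: the current version of the table.
current = spark.read.format("delta").load(path)

# A specific commit version number...
v42 = spark.read.format("delta").option("versionAsOf", 42).load(path)

# ...or the table as of a timestamp, e.g. the data as it looked when a model was trained.
training_snapshot = (
    spark.read.format("delta")
    .option("timestampAsOf", "2019-11-01 00:00:00")
    .load(path)
)
```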
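And going back to the change data capture question above, here is a minimal sketch of that merge pattern using the Delta Lake Python API. The paths, the join key, and the `is_deleted` flag are hypothetical; the source of changes would be whatever your CDC feed produces.

```python
# Hypothetical CDC upsert: apply a batch of change records to a Delta table.
# Paths, the join key, and the is_deleted flag are placeholders.
# Requires the delta-core package on the Spark session.
from pyspark.sql import SparkSession
from delta.tables import DeltaTable

spark = SparkSession.builder.appName("cdc-merge").getOrCreate()

# A batch of change records from upstream (inserts, updates, and deletes).
changes = spark.read.format("parquet").load("s3://bucket/changes/customers/")

customers = DeltaTable.forPath(spark, "s3://bucket/delta/customers")

(
    customers.alias("t")
    .merge(changes.alias("s"), "t.customer_id = s.customer_id")
    .whenMatchedDelete(condition="s.is_deleted = true")  # the "when this flag is true, delete it" case
    .whenMatchedUpdateAll()                              # otherwise update the existing row
    .whenNotMatchedInsertAll()                           # and insert rows that aren't there yet
    .execute()
)
```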
For the stuff that we have out today for Presto and Athena, the way it works for those two is that we create manifest files, and I do not think it lets you specify which version to create the manifest files for, but I could be wrong. One of the reasons that Starburst is trying to build their own native connector is so they can unlock all of these features, so go talk to them, pressure them, tell them to hurry, tell them you want it. I am here to answer any questions about the transaction protocol to help them build that.

And definitely, if you come by the booth you can see a bunch of the notebooks we have that demonstrate a lot of these capabilities, so that's a great way to get a good look at it. It's booth number 416.

Okay, so I have a couple of questions. The first is on Power BI: is there a connector for Delta Lake? And then the second question is more of an enterprise question: if I have a big enterprise platform with 10,000 jobs running a day, can Delta Lake replace all of that tomorrow?

Yeah, so now you're venturing into Databricks territory. Delta is a transactional storage format for a table; it gives you all of these cool properties, but it is not a workflow orchestrator. That said, tens of thousands of jobs running on lots of Spark clusters is what Databricks does.

Just to reiterate for anyone who doesn't know: the expo area is over at the Sands Expo, where the Venetian and Palazzo and all those buildings are; that's where the main expo is, so that's where to find us if anybody's looking for us.

For the updates and deletes, I think the slide kind of implied that's on the Scala API only. Is it, or is it in PySpark too?

No, we have Scala and Python, and in Spark 3.0 we'll hopefully support SQL as well, but it is in Python already. Yeah, the slides are out of date.

Hi, how does compaction work? Is there a process that you support within Delta? Because you have all those versions, and you probably have a lot of files since you're only appending.

Okay, so there are a bunch of different questions in there. The transaction log automatically takes checkpoints every ten commits; that just happens automatically. We automatically clean up the transaction log after 30 days, but that's tunable. In terms of cleaning up old versions that no longer exist, there's a command called VACUUM. When you vacuum the table you give it a retention period, which by default is seven days, and we will delete any version of the table that is more than seven days old. Now, that prevents you from time traveling to, or streaming from, more than seven days back, so your streams need to stay up to date within the window that you're retaining.

Okay, at this point we'll go ahead and close out. A big hand for Kyle and Michael. Thank you so much, great presentation.
Info
Channel: AWS Events
Views: 4,729
Rating: 4.9322033 out of 5
Keywords: re:Invent 2019, Amazon, AWS re:Invent, ANT347-S, Databricks
Id: txNGR4ZiuhU
Length: 58min 20sec (3500 seconds)
Published: Wed Dec 04 2019