Spark + Parquet In Depth: Spark Summit East talk by Emily Curtin and Robbie Strickland

Captions
Thank you. Guys, are we on? Good. So I appreciate all of you coming out — unfortunately I think most of you are here because the weather has kept you here, but I'm also disappointed that only like 0.0001 percent of my colleagues showed up. That was supposed to be funny — 400,000 people. Anyway, as he said, I'm Robbie Strickland. I lead the Engines and Pipelines team on the Watson Data Platform and also the Spark Technology Center. This is my co-presenter Emily Curtin, who also works at the Spark Technology Center; she's a software engineer there. We are both from the beautiful, warm, sun-filled city-in-a-forest of Atlanta, Georgia. Just to clear things up: if your impression of Atlanta is the Hartsfield-Jackson north terminal, or the 75/85 connector at rush hour, or maybe you've never been and you're kind of surprised we drive cars and not cows — to all of you I say, gently, please come back and come see us. We have great music, great Southern and international culture, a national park with whitewater kayaking within city limits, amazing food and art, great sports teams — although we won't talk about Sunday. Actually, we were going to come wearing Falcons jerseys, and I sent her an email at like 9:30 to that effect, and then that got canned. But we won't talk about any of that. We hope you'll come back — it's a lot more than Concourse B.

Anyway, you guys don't care about that. Today you care about Parquet; that's why you're here. So today we're going to deep-dive into Parquet. It's a quiet but powerful tool that will change your workflow if it hasn't already. I'm going to talk about our reasoning for using Parquet, and then I'll hand it over to Emily and she'll walk you through the technical details — because of course I'm a VP and she's an engineer.

So without further ado, let's talk about storage. Storage sounds exciting, right? We've been talking for almost two days about all kinds of high-minded things like distributed computing and whiz-bang technology and machine learning, so let's take it down a notch and talk about storage. Why would we do that? Because storage matters. Yes, streaming is cool and important, and yes, we want to think about real-time applications in machine learning, but often we just want to churn and burn through absurd amounts of data to train our models or query for patterns or whatever. So we need to make sure that our data storage is performant, efficient, and reliable — like this Bagger 288 here.

So let's define our goals for efficiency a little bit more. First, what we need is a data lake, right? So let's make a data lake. What are some good goals for our data lake? Well, first we want to integrate it into our stack. We want our users to be able to figure it out without bothering us too much. We want it to be resource efficient — and we want that because resources cost real dollars, real man-hours, real compute cycles, and bad resource utilization really impacts our ability to provide fast queries. So really, we want fast queries and we don't want to spend an arm and a leg to get them; everything else can be considered a bonus. How do we get this kind of resource efficiency? We get it by sweating the small stuff. The small stuff will blow up when your data gets big — when your data can no longer fit on Emily's favorite flash drive here, and when you're talking more like the tens of petabytes of data we've dealt with in our past experience rather than hundreds of megabytes — those little costs add up, both in terms of money and compute time.
So let's look at I/O time as an example. I/O costs really hurt, and we're not just talking about disk I/O — we're querying in a distributed environment, so we have to get our data off disk and then across the network to our distributed computer. So we have disk I/O and network I/O to consider. It's nanoseconds to memory, but it's microseconds to a solid-state drive, milliseconds to spinning disk, and then tens of milliseconds for network. This is definitely a detail worth sweating over when it comes to choosing a storage layer for our data lake.

So let's take a slightly opinionated look at some options — this is an opinionated table, so don't blame me for it. I'm comparing three options for storage: files, compressed files, and databases. Let's first look at files. I'm talking TSVs, CSVs, txt, JSON, whatever, as long as it's a file. Files are really easy to use: there's no learning curve, there's no administration — you don't have to set up PagerDuty on your files — and they integrate really well with your stack, whatever your stack might be, because everything handles files. Here I'm considering my stack to be just Spark. So where do files fall down? Well, they're big — they're actually enormous — and slow. They hurt badly on those disk and network I/O costs, and when you're querying them the performance is terrible, because Spark has to pull in the whole file to get the columns and filters you're looking for. So can we do better with a database? Well, databases are a great tool, and sometimes they can be the right tool for the right job, but data lake storage is the wrong job. It's impractical and crazy expensive to scale a database to handle multiple petabytes. So yeah, we can get better queries, but now we're paying for servers, per cycle, for man-hours to administer it, for your time on PagerDuty — it's a mess. So can we do better than these two options? Can we combine the ease of use of files with the query time of a database? I say yes we can: enter our hero, Parquet.

This is a benchmark done on a wimpy Spark 2.1 cluster with one master and three worker nodes, all data in HDFS; the CSV weighed in at about 420 gigs. You can see we've got a 100x speedup between the raw CSV and Parquet compressed with Snappy. And you might be thinking that's unfair, that's a column-selection query — well, the table-scan query looks just about the same. The Parquet formats shuffled around in terms of which was fastest, but we're still close to a 100x speedup between CSV and the fastest Parquet. So hopefully I've convinced you that Parquet is your best option for data lake storage. Now I'm going to turn it over to Emily, who will take you into the details.
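To make that comparison concrete, here is a minimal sketch (not the speakers' benchmark code) of the same column-selection query run against a raw CSV source and a Parquet copy of it. The paths and column names are illustrative assumptions; the 100x figure comes from the benchmark described above, not from this snippet.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("csv-vs-parquet-sketch").getOrCreate()

// The same column-selection query against two copies of the same data.
// With the CSV source, Spark has to read and parse every row in full;
// with the Parquet source it reads only the column chunks it needs.
val csvDf = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("hdfs:///data/albums.csv")           // hypothetical path

val parquetDf = spark.read
  .parquet("hdfs:///data/albums.parquet")   // hypothetical path

csvDf.select("artist", "year").where("year > 1970").count()
parquetDf.select("artist", "year").where("year > 1970").count()
```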
Thank you, Robbie. So what is this mystical Parquet format? Well, the project will tell you that Parquet is a columnar storage format available to any project, and I will tell you that it's a binary format — so if you just cat it in your terminal you're going to get mush. There's an API for the JVM and the Hadoop stack, or, if you're at the wrong conference today, there's C++ and native Python. It's columnar; it's both encoded and compressed, which are two different processes; and it's machine friendly. What do I mean by machine friendly? Let's hang on to that.

The best way to learn about Parquet is by example, so I'm not going to take you through the dataset we used for the benchmarks. Instead I'm going to use a very important dataset, and if you've never encountered this dataset before, I highly suggest you get familiar with it — particularly, like, the first column, kind of in the middle. What do we have here? We have some strings, and different strings, and some data types. I'm going to write this out in two different ways: a flat schema and a nested schema. I'm going to hammer on this nested-schema point, because one cool thing about Parquet is that it can handle arbitrarily nested data. Here I'm just doing some really simple string-to-int and string-to-string maps, but it can handle arbitrary nesting as far as you want to take it. So for illustration, here's a really ugly TSV — this is my flat data — and this is my JSON data, and it looks even worse in slide format.

How do we write this out? Well, you guys are Spark experts, so I'm not going to belabor this too much. We're going to do spark.read on the TSV, say it's got a header, and then we're going to take it to an RDD and map over it. We're going to do that because if we don't infer a schema for this it's going to work, but it'll write everything as strings. So this is kind of an old-school way of doing it — there are lots of ways to do it — but I'm going to make a case class with, like, everything in my schema, and I'm going to make this awful, brittle method that will break the instant the guys upstream change their schema, and then I'm going to use that to map over my RDD, take it back to a DataFrame, and then I just write with the compression as Snappy — that's the default, I'm just doing that here for illustration — and out to Parquet. So not too bad, but that schema inference situation makes me really sad, it's a lot of typing, and the methods are really brittle. But at least I'm not doing this in Java, and at least I'm not doing this in MapReduce, so it could be a lot worse.

Now that we've seen how to write the flat schema, let's skip to the nested schema — because everybody loves JSON parsing, right? Yeah, well, I do in this case, because all I have to do is spark.read.json and it'll pick up on all of my data types and all of the nesting automatically. If your data is a little messy this may not work as beautifully as on this slide, but for my data it works just fine. So again, I just take that, write with the option compression snappy, and out to Parquet. I feel pretty great about one-liners — I don't know about you guys.
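Roughly what those two write paths look like in Spark Scala — a sketch, not the exact slide code. The Album case class, its fields, and the paths are hypothetical stand-ins for the dataset on the slides.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("parquet-write-sketch").getOrCreate()
import spark.implicits._

// Flat schema: read the TSV, map it onto a case class so the columns get
// real types instead of all-strings, then write out as Snappy Parquet.
case class Album(artist: String, title: String, year: Int, label: String)

val flat = spark.read
  .option("sep", "\t")
  .option("header", "true")
  .csv("hdfs:///data/albums.tsv")                  // hypothetical path
  .rdd
  .map(r => Album(r.getString(0), r.getString(1), r.getString(2).toInt, r.getString(3)))
  .toDF()

flat.write
  .option("compression", "snappy")                 // snappy is the default; shown for illustration
  .parquet("hdfs:///data/albums_flat.parquet")

// Nested schema: spark.read.json infers the types and the nesting for us,
// so the whole thing is effectively a one-liner.
spark.read
  .json("hdfs:///data/albums.json")                // hypothetical path
  .write
  .option("compression", "snappy")
  .parquet("hdfs:///data/albums_nested.parquet")
```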
So now that we've seen how to write this out, let's go to the terminal and see an actual example. Let me clear my screen. In this folder I have two Parquet files — if you can't see it... is that better? All right. I have two files here, but I'm kind of lying to you when I say that, because these are actually folders. So I have a little success message from Hadoop, and then what I'll call a compressed file — it's compressed with gzip in this case. If I just want to crack this open and look at it, it's going to be a bunch of compressed garbage, because it's a binary format and it's not made to be used like that. So instead I can pull out some special tools — and there we go, this is parquet-tools. It's available for free as part of the Apache Parquet project; you download it, compile it, run it through Hadoop, and you can see I've got all my columns catted out really nicely. And if I come here and hit the back button for five minutes and then do meta, I'm going to see all of the metadata for this Parquet file as well. So that's all I wanted to show in the terminal; let's go back to the presentation.

Okay, so we just saw that in the terminal and we've got a little bit of an intuition for it. Let's dig into what that schema actually is. It's pretty obvious that the first column will be our column names. Oh, and I didn't show you the nested schema in the terminal, but there it is for reference. So I've got my column name; whether it's optional, required, or repeated — optional versus required is whether or not it's nullable, that's what that means, and repeated is what it sounds like; the data type — in this case it was binary, and if it's binary we have some encoding info; and then these repetition values and definition values. This is where Parquet is borrowing really heavily from Google Dremel. Dremel was a system designed to do arbitrarily nested, fast-query storage — sound familiar? I'm not going to dig into these R values and D values too much. You can think of the R values as whether or not this column is repeated and at what level of my nesting it's repeated — so we can see in this case "link" is a repeated column and we're going to have to dig down a little further to get that repetition. Definition is how far we need to dig into the nesting to see whether or not that individual piece of data is going to be null. If you want to talk about this later I'm happy to, but I'm going to leave it there for now — you can see down at the bottom there's a link to read all about this. Just for emphasis, here is one Parquet row, two different ways. Again, I'm hammering on this nesting because Parquet can handle nested data.

So now that we have this all written out and we've seen it in the terminal, how do we query it? Well, here's Spark. First — you've probably seen this a million times — I read it in, from S3 in this case; I create or replace a temp view (there are lots of ways to do this, this is just one); and then at the bottom I run my SQL queries. Two slightly different queries, same result, in Zeppelin — about Zeppelin — which makes me happy.
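A minimal sketch of the query pattern just described — read the Parquet back (from S3 here), register a temp view, and run SQL against it. The bucket, view name, and columns are illustrative assumptions.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("parquet-query-sketch").getOrCreate()

// Read the Parquet data back and expose it to Spark SQL through a temp view.
val albums = spark.read.parquet("s3a://my-bucket/albums.parquet")   // hypothetical bucket
albums.createOrReplaceTempView("albums")

// Two slightly different queries, same result.
spark.sql("SELECT title, year FROM albums WHERE artist = 'Led Zeppelin' ORDER BY year").show()
spark.sql("SELECT title, year FROM albums WHERE artist LIKE 'Led%' ORDER BY year").show()
```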
So hopefully you've got an intuition for how this looks and how we work with it; let's talk about how it works. I'm going to take us to the 30,000-foot view, then we'll dig all the way down into the details, and then we'll fill in the in-between — so stay with me. Here's our 30,000-foot view in the file system. I've got my folder, as we just saw; a little success message from Hadoop; and some common-metadata and metadata files — I'm putting those here for illustration; these are schema summary files written out by Spark, and they're actually turned off by default in more recent versions. I'm faking a partition folder here — we'll talk about that later. The meat and potatoes is these compressed files, once again. So that's the high-level view; all the way down in the details, let's talk about what we're doing on disk.

With this really simplified table, if I were writing it in the row-oriented way I would have on disk: string, different string, and string, different string, all kind of mixed together. This is tough on our cache — when we read stuff into our L1/L2 cache it's not going to be very aligned — and if values are null we may get some branch mispredictions, which is going to be unfriendly to our processor. A much better way is to write this out in a column-oriented way, so I have everything from column 1, everything from column 2, everything from column 3. I'm not storing nulls; it's all right there, all roughly the same size, and it's much more friendly to our bits-and-bytes, hardware-level stuff. Another thing I get from this columnar storage is data of similar properties grouped together, and that means I can pull out every single encoding and compression trick in the book to squeeze this down as much as possible.

So let's take an example: incremental encoding. I can store album titles — Led Zeppelin, Led Zeppelin II, on and on — in 458 bytes, if we don't get technical about delimiters. Or I can store just what we need for the first string, just what we need to derive the second string from it, and so on, and that's a 58 percent reduction. Another example is dictionary encoding. These record labels — I've only got two of them — so why don't I make a tiny, lightweight dictionary? Then I can take 84 bytes down to ten bits, squeeze that into two bytes, and I've just got this little dictionary that gets amortized as my data gets bigger. There are lots more of these encoding schemes; I'm not going to walk through them all — that would be boring, and we haven't had our coffee break yet — but again, there's a link at the bottom if you want to see more.

So we saw the file system and we saw the details; let's fill in the in-between. Within each compressed file, Parquet has a tree of structures — so this diagram is a tree, and we're going to start at the root node, which is the file metadata. This is where we store our schema, our Thrift headers, offsets, the number of rows, and some other stuff. From the file metadata we get to the row group. Let's think of ourselves as Spark nodes: if I give you a million rows to write, you're not going to write all million rows at once — let's just consider the first hundred thousand. So I've got this group of rows: the row group, exactly what it sounds like. From that row group I'm going to consider each column individually, and I'm going to cut those columns up into chunks. Two important details here: if an individual value is null, I'm not going to store it, and if the whole column is null, I'm also not going to store that — why would I bother? So you need at least one column chunk per column that has a valid value; you can see up here in the diagram column 2 has two chunks and the others have one. Within that column chunk we'll have a shared page header, which brings us to the main event: pages. Pages are the indivisible, atomic unit of Parquet. We've got a little bit of metadata, some R values and some D values — another detail: if we have a flat schema we don't store those, because they're all the same — and then, where we came from just a minute ago, our encoded data. So: encoded data, all the way up the tree, and that all makes a compressed file. If my diagram confused you, or if you want to read more about it, there's the non-slide-friendly version of the actual format spec, and you can see more of that on the web.

So now that we've seen how Parquet works, let's talk about how Spark works with it to get some efficiency — how do we make those queries so fast? Well, the first thing is that, like any column store, we're going to pull in just the columns that we need; that's common across any column store you're going to use. Another thing is that we can write using partitioning. This is not entirely unique to Parquet, but the API provides a really nice way to do it: I can write partitioned by whatever columns I want, and a really, really common case is partitioning by year, month, and day for time series.
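The partitioned write described above looks roughly like this; the dataset, columns, and paths are hypothetical, but partitionBy is the standard DataFrameWriter call for it.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("partitioned-write-sketch").getOrCreate()

// Hypothetical time-series table that already has year/month/day columns.
val events = spark.read.parquet("hdfs:///data/events.parquet")

// Each distinct (year, month, day) combination becomes its own folder,
// e.g. .../year=2017/month=2/day=13/, so queries that filter on those
// columns only touch the matching folders.
events.write
  .partitionBy("year", "month", "day")
  .parquet("hdfs:///data/events_partitioned.parquet")
```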
Another thing we can do: Spark will take our queries, and if they have filters on them, it's going to take those filters and push them down into the data-scanning process. I'm going to borrow an example from some of my colleagues at IBM. I have this query where I want values between 1200 and 1200 plus 11, so I'm going to push that down to the column chunks, because the column chunks have statistics like mins and maxes where I can evaluate this filter condition and see: do I even need to pull in this column chunk? If the max is 800 and I'm looking for 1200, I don't need that column chunk — maybe I don't even need that whole row group. So this is a huge level of I/O savings, and all kinds of other savings we get there.

Just to put some intuition on this, here is the physical plan for the query I showed you a second ago, reading it from a CSV source. It's a big string, it's really ugly — you're not supposed to get anything from this slide except that it's big. Here's the plan for reading Parquet. Again, there's a bunch of stuff, but what I want to emphasize is PushedFilters: this is where we're pushing those filters down, and you can see they match exactly what we were looking for in that SQL query.
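If you want to see those pushed filters yourself, explain() on a filtered read prints the physical plan, including the PushedFilters section for a Parquet scan. This is a sketch with a hypothetical path and column name, mirroring the 1200-to-1211 example above.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("pushdown-sketch").getOrCreate()

// Hypothetical numeric column named `value`.
val df = spark.read.parquet("hdfs:///data/events_partitioned.parquet")

// explain() prints the physical plan; for a Parquet source the FileScan line
// includes PushedFilters showing which predicates were handed to the reader.
df.filter("value >= 1200 AND value <= 1211")
  .select("value")
  .explain()
```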
So to summarize that little section, we're going to get just the data you need: just the partitions you need, just the columns you need, just the chunks of those columns. And you're thinking that's the best thing since sliced bread, this is wonderful — what's the catch? Because there's always a catch. So there is a catch on this pushdown filtering: it's not going to work exactly correctly when you're using a store like S3, because there's no random access and we have to do that initial network I/O to pull the file over before we can dig into it — then it'll work. Pushdown filtering doesn't work on nested columns yet, and there's a whole binary-versus-string deal, which means it doesn't work on strings for the moment, but it'll work on numeric fields, booleans, that kind of thing.

Some people would say that the next limitation of Parquet is write speed, and to that I say: who cares? In most cases this is going to be write once, read forever, and writing Parquet is fast enough. We do have to do some encoding and compression, and that takes a while, but it's worth it because you get such fast reads. The other thing I want to talk about is immutability. These compressed files — you can't just crack them open and manipulate them on disk, because of all the headers, all the offsets, all that tree of metadata that would be invalid if you tried to mess with it. So effectively these are immutable. How do we get around this? There are a couple of different ways. We can, say, write using partitioning — we can reimagine our data as a time series, for example, and append more partition folders. We can do something that we do at my former home of The Weather Company, an IBM business, and front this with Cassandra: data comes into Cassandra, and when it's sufficiently historical we spool it out to Parquet. Or we can do write mode append and add additional row groups, with their schema embedded as file metadata. So how does this work in a streaming context? This is an ongoing thing on the Watson Data Platform; it's pretty similar to what I was talking about with Cassandra. Data comes in until some watermark condition is met — time, size, number of rows — we groom it, and then we write those groomed rows out to Parquet and just append them as additional compressed files.

I'm running a little shy on time, so I'm not going to talk about tuning tips, but these slides will be posted and you can see those later. So I'll skip straight to the summary. Back to this chart: hopefully you'll agree with me at this point that Parquet is cheap, fast, wonderful, amazing, etc. In summary, Parquet is a binary data storage format that, in combination with Spark, enables fast queries by getting you just the data you need, getting it efficiently through encoding and compression, and keeping much of the work out of Spark. And with that, I want to thank you all for coming, and I want to open it up for questions.

Thank you, Emily and Robbie — give them a big hand for braving coming from Atlanta and facing a hostile crowd in Boston... I'm just kidding. Yes, please go ahead; we've got five minutes.

What's the encoding strategy? So there has to be a default encoding — and then how would you do a manual encoding override? Is that possible?

That's an interesting question. I'm not sure there is a way within Spark to do manual encoding overrides. Most of the time it's definitely done automatically, and we just let Spark and the API pick for us.

Is it sampled — is that how it determines the encoding?

That is a great question. Okay, we're just going to alternate back and forth — go ahead.

How does Parquet compare to ORC?

Ha, I get to open up my extra slides. ORC is also a columnar format; it's indexed, and it doesn't handle nested data. So those are the differences — not better or worse, just different. In some of the benchmarks I've seen, the table scans are a little bit better with ORC, and on selected columns I've seen Parquet win. But really, what I've seen is that these benchmarks are really, really old — they're using old versions of Spark and old versions of the specs — so I don't really trust them. So there you go, there's some information to work with. Thank you for the question. Get the slides. Brilliant — go ahead.

Okay, what if your data is already a time series? You mentioned that one way of dealing with immutability is by using a time-series layout. What if it's already a time series — how do you apply changes to a point in time?

So are you talking about, once you've written the data, how do you go back and fix it? Yep. So I'll answer that. The reality is this is the hardest problem, and we could probably spend a significant amount of time just talking about that. We've had no choice in that case but to have — we actually, literally, have a process that goes back and rewrites the Parquet using a sequence of updates that we have fed in, and it automatically goes and regenerates. So there's not really a great way around that. The only thing you can do is write your data granularly enough that you can isolate where you have to do that. If, for example, you are writing very large files with large amounts of time in them, you're obviously going to have to rewrite more when you go back to regen — we call it Parquet regen, we have a word for it — and it's something that we have to do quite a bit. Thank you.
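A very rough sketch of the "Parquet regen" idea from that answer, under assumed paths and a simple id/value schema: re-read one partition's worth of data, apply a batch of corrections, and rewrite that slice. This is not the speakers' actual pipeline, just one way the pattern can look.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.coalesce

val spark = SparkSession.builder.appName("parquet-regen-sketch").getOrCreate()
import spark.implicits._

// Hypothetical layout: one folder per day, corrections keyed by `id`.
val dayPath  = "s3a://my-lake/events/year=2017/month=2/day=13"
val existing = spark.read.parquet(dayPath)
val updates  = spark.read.parquet("s3a://my-lake/corrections/day=13")

// Prefer the corrected value where one exists, otherwise keep the original.
val corrected = existing.alias("e")
  .join(updates.alias("u"), Seq("id"), "left_outer")
  .select($"id", coalesce($"u.value", $"e.value").as("value"))

// Write the regenerated partition to a new location and swap it in afterwards;
// overwriting a path you are still reading from is not safe.
corrected.write.mode("overwrite").parquet(dayPath + "_regen")
```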
Thank you — go ahead. Hi, nice talk. If your data is inherently kind of binary, byte-array type things, and maybe not as amenable to the encoding and compression techniques, is there any real gain or win for using Parquet over a straight HDFS file or some other binary format that doesn't do fancy data storage?

Well, if your binary data is oriented into columns, that's where you'll get your gain, because the predicate pushdown capability will allow you to extract only what you need. So you don't necessarily get the encoding or compression benefit that you might with strings or ints or whatever, but you will still get the benefit of extracting just the data you need, as opposed to getting everything.

Okay. So currently I'm using something like Accumulo to do a similar thing on its tablet servers — only send me the stuff that matches — but that obviously doesn't get me into the SQL space. That's right. Okay, thank you.

Okay, next — thank you. I have a question: since Parquet is a columnar format, if you wanted to append an extra column, does it support that?

Yes — that is something I forgot to mention, so thank you for the question. Another benefit of the columnar store is that you can evolve your schema on the fly. These compressed files all have a schema contained in that file metadata, so your next compressed file will just have a slightly different schema. There is a big asterisk about schema merging when you're reading — that's usually something you can turn off — but the short answer to your question is yes.

I would also add to that: column additions are usually something that you can deal with, but keep in mind that just because the format can handle a change in columns, or deletions, or whatever, you want to be careful that your application reading it can also handle it. So that's a big caveat too.

All right — any more questions? Again, give a big hand to Emily and Robbie. Thank you, guys. [Applause]
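For reference, the schema-merging read mentioned in that last answer looks roughly like this in Spark; the path is a hypothetical placeholder.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("schema-merge-sketch").getOrCreate()

// Each Parquet file carries its own schema in its footer, so files written
// before and after a new column was added can coexist in one folder.
// mergeSchema asks Spark to reconcile them at read time; it is off by
// default because reconciling footers makes the read more expensive.
val merged = spark.read
  .option("mergeSchema", "true")
  .parquet("hdfs:///data/albums_evolving.parquet")   // hypothetical path

merged.printSchema()
```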
Info
Channel: Spark Summit
Views: 58,302
Keywords: apache spark, spark summit east
Id: _0Wpwj_gvzg
Length: 29min 50sec (1790 seconds)
Published: Mon Feb 13 2017