Top 5 Mistakes When Writing Spark Applications

Captions
Hello everybody, welcome. My name is Mark Grover, and with me is Ted Malaska; those are our Twitter handles, and we would love it if you followed us on Twitter. Ted is a Principal Solutions Architect at Cloudera, and I am a software engineer. Guess what the most common question we get in our presentations is? It's "where are the slides?" The answer to that question is the first link on the deck: tiny.cloudera.com/spark-mistakes. We encourage you to follow along, but of course you have all the slides visible here as well. Another quick thing: we had to take a selfie.

Okay, we are co-authors of this book called Hadoop Application Architectures, which we wrote with two other great people. The book talks about how you architect applications in the Hadoop space. Spark is of course an important part of that architecture, but there are other pieces of the ecosystem around Hadoop as well, and so on and so forth. We have a book signing where you can get free books at 3:45 at the O'Reilly booth, if you want to come.

All right, moving on. The reason we are here today is to talk about mistakes people make when they write Spark applications. But really, what we want to talk about is mistakes we've made; we just wanted to look smarter, so we called them mistakes people make.

First mistake. This is you: you walk into your office and you have six machines, each with 16 cores and 64 gigs of RAM. I want you to remember these numbers, because they will come in handy as we move along through the slides. When you start using Spark, Spark says: would you tell me how many executors you need, how many cores you need, and how much memory I should give to each executor? So you're trying to bridge the gap between the physical world you live in (how many nodes you have, how much memory, how many cores) and the world Spark wants you to live in, which is the number of executors, the cores for each executor, and the memory for each executor. By the way, I still haven't figured out if it's pronounced "executor" or "executer", so I'm going to use them interchangeably in this presentation.

Before we answer that question, let's do a quick architecture recap of Spark. We'll start from the left side: you have a driver program, which runs the SparkContext. The driver program talks to the cluster manager, which sits in the middle right of the screen. The cluster manager could be YARN or Mesos, or it could be standalone; it acts as a liaison between the driver and the resources on your worker nodes. You have one or more worker nodes, each worker node can run one or more executors, and each executor runs the tasks that get scheduled on it. A point to note is that there is a cache that is shared within the executor between all the tasks that run on it. Okay, keep that in mind and we'll move on.

The first answer you may come up with is: why don't I make my executors the most granular possible? What I mean by that is you would have the smallest-sized executors; in particular, you would give them one core each. Remember your machines had 16 cores and 64 gigs of RAM each, so that means you would give 4 gigabytes of RAM to each executor, and you would have a total of 16 times 6, or 96, single-core executors in your cluster. Turns out that's the wrong answer. Why? Because you're not using the benefits of running multiple tasks in the same executor; remember that shared cache? You're not really sharing it, so you lose those benefits.
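To make that concrete, here is a minimal sketch in Scala of what that "most granular" configuration would look like as Spark properties. The spark.executor.instances, spark.executor.cores, and spark.executor.memory settings are the three knobs being discussed; the values simply reproduce the (deliberately wrong) arithmetic above, and the app name is made up.

```scala
import org.apache.spark.SparkConf

// The "most granular" anti-pattern discussed above: one core and 4 GB per executor,
// 96 executors across the 6-node / 16-core / 64 GB cluster. Values are illustrative.
val mostGranular = new SparkConf()
  .setAppName("most-granular-executors")   // hypothetical app name
  .set("spark.executor.instances", "96")   // 6 nodes x 16 cores, one executor per core
  .set("spark.executor.cores", "1")        // smallest possible executor
  .set("spark.executor.memory", "4g")      // 64 GB / 16 executors per node
```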
So let's go to the other extreme: why don't we have the least granular executors? That means we put the biggest executor we can on each node, and that would be just one executor per node; we give it all the memory and all the cores on the node. Turns out that's the wrong answer too. Why? Because we need to leave some overhead for the OS and the Hadoop daemons to run. So what we do is leave out one core per node, and we also leave out one gig of RAM per node, and give the rest to the executor. Now we have 15 cores given to each executor, 63 gigs of memory given to each executor, and each node has only one big executor. Turns out that's still the wrong answer.

So you ask: why are you fooling around with us, why don't you give us the right answer? Before we get there, let's assume from now on that we're talking about Spark on YARN only. The discussion actually applies fairly well elsewhere, but I think it keeps things more concrete. There are three things we need to talk about in order to get to the right answer.

The first one is memory overhead. When you request from Spark "I want this much executor memory" using the executor memory parameter, YARN adds an overhead before requesting that from the resource manager, and that overhead is the bottom line there: the maximum of 384 MB or 7% of the parameter that you specified. So if your node had 64 gigs of RAM and you asked for 64 gigs, YARN is going to add 7% more and ask the resource manager for 1.07 times 64 gigs, and the resource manager is going to say: well, you're oversubscribing, that's crazy. So what you have to do is, when you pass that parameter, reduce it by 7% or so, so that when YARN adds its overhead it's not asking for too much memory.

The second thing you want to keep in mind is the concept of a YARN application master. Spark on YARN can run in two different modes. The first one is client mode, which is shown in this diagram: in client mode you have the client application, which runs the Spark driver, on the left, and in the middle you have a YARN container that runs the Spark application master. The other diagram shows cluster mode, in which you have the YARN application master running in a container and the driver runs within it. The point to note is that there is a YARN application master running in both scenarios (this is client, this is cluster), and it runs within a YARN container. The core that's used by the YARN application master cannot be used for your regular Spark processes, so you have one less core in your cluster to use than you had before.

The third thing you want to keep in mind is HDFS throughput. If you had bulky executors and you were giving all 15 cores to just one executor, you would get really bad HDFS throughput. We've found, canonically, that it's best to keep the cores somewhere between 4 and 6 per executor, so 5 cores per executor is what we came up with.

Okay, based on all that, let's do some calculations. We'll start bottom-up this time. We'll say at most 5 cores per executor, for maximum HDFS throughput. Because we left out one core for overhead, that leaves us with 15 cores on each node; we give five cores to each executor, which means we'll have three executors on each node, and in total we will have 18 executors spread across all six nodes of the cluster. Remember we're going to leave one executor's worth of resources for the YARN application master, so we're left with 17 executors that we're going to use in the cluster. Then you divide 63 gigs by 3, which is the number of executors on the node, and you get 21 gigs; then you subtract about 7% from that, because of the container memory overhead, and that's roughly 19 gigs. So the answer we come up with is 17 executors in total, 19 gigs of memory per executor, and 5 cores per executor.
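As a rough sketch, here is that final configuration expressed with the same Spark properties in Scala. The cluster numbers are the ones assumed in the talk (6 nodes, 16 cores, 64 GB each), so adjust them for your own hardware; the app name is made up.

```scala
import org.apache.spark.SparkConf

// Worked example for 6 nodes x 16 cores x 64 GB, as described above:
//   1 core + 1 GB per node reserved for OS / Hadoop daemons      -> 15 cores, 63 GB usable
//   5 cores per executor for HDFS throughput                     -> 3 executors per node, 18 total
//   1 executor's worth reserved for the YARN application master  -> 17 executors
//   63 GB / 3 executors = 21 GB, minus ~7% YARN memory overhead  -> ~19 GB per executor
val sizedConf = new SparkConf()
  .setAppName("sized-executors")           // hypothetical app name
  .set("spark.executor.instances", "17")
  .set("spark.executor.cores", "5")
  .set("spark.executor.memory", "19g")
```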
By the way, this is not set in stone; these were somewhat hand-wavy calculations. Your workload will be different. Different companies may want to oversubscribe, for example, and different organizations may have a workload that doesn't require a lot of sharing, in which case you're okay with very small, granular executors. This is just the one we come up with. (Yes, it seems that timer's not moving, so we don't actually know how we're doing on time.) You might have jobs that do lots of I/O, and if you have lots of I/O your CPU is not the bottleneck, so you can bump up your cores. Also, a common technique, though it's cheating a little bit: if you're going to use five cores per executor, maybe you set the number of cores in YARN to 16, and the YARN application master will fit in that extra core, but nothing else will use it because it doesn't divide in evenly. So there are little tricks like that, but those numbers are what I've found to be a good starting place. Okay, the clock is moving now, great.

One thing we want to do is not just emphasize the problems but also talk about what the Spark community is doing to solve them. One of the things the Spark community is doing to fix this magical problem, where you have to pay a lot of money to someone like Ted to figure out the answer, is to introduce a concept called dynamic allocation. Dynamic allocation is where Spark can scale the number of executors up and down depending on your workload. It currently only works with Spark on YARN, and it helps with one of the three things we discussed as problems earlier: it can dynamically figure out the number of executors that you need for your job, but it can't yet help you with the cores you need for each executor or the memory you need to give them. If you aren't familiar with it, what it essentially does is allow your application to grow when you're using it and shrink when you're not. This solves the typical problem of someone going into spark-shell and then going out to lunch: while he's eating a ham sandwich, he's consuming half of your cluster. So it solves that problem and prevents your AWS bill from turning into a mortgage. If you want to read more on this topic, there's a fantastic blog post.
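A minimal sketch of turning dynamic allocation on for Spark on YARN, in Scala. The property names are Spark's real dynamic-allocation settings, but the min/max/timeout values here are illustrative assumptions, not recommendations from the talk.

```scala
import org.apache.spark.SparkConf

// Let Spark grow and shrink the executor count with the workload instead of
// hard-coding spark.executor.instances. On YARN this also needs the external
// shuffle service so shuffle files survive executors being released.
val dynamicConf = new SparkConf()
  .setAppName("dynamic-allocation-demo")                     // hypothetical app name
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "2")          // illustrative floor
  .set("spark.dynamicAllocation.maxExecutors", "17")         // illustrative ceiling
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s") // release idle executors
```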
And that takes us to mistake number two. You see this application error, and it looks like an integer overflow. You look at the stack trace and it's not from your application; it's in some internal Spark code. Why is this happening? The reason is that no Spark shuffle block can be larger than 2 GB. This is crazy: we're in the big data world and we're talking about 2 GB, but no Spark shuffle block can be larger than 2 GB.

Before we go any further, let's talk about what a shuffle block is. I think it's easiest to go back to MapReduce terminology. I understand Spark is not the same, but it's easier to explain the concept of a shuffle block with MapReduce terminology. In MapReduce you have a mapper that composes the output that's going to be sent to one of the reducers, and the reducers then make a reducer-local copy from a bunch of mappers. Each of those reducer-local copies is equivalent to a shuffle block. Diagrammatically, if you had these two stages and there was a shuffle in between them, where you see the yellow arrows crossing, each of the blue blocks is what's called a partition and each yellow arrow represents a shuffle block: it's the stuff that gets sent from one mapper's output to the reducer that's going to do the computation on it. So, one more time: you will get an overflow exception if your shuffle block, this yellow arrow, is larger than 2 GB.

And why is that? It's because Spark uses a ByteBuffer for representing blocks, and a ByteBuffer is limited by the maximum value of an integer, which for a signed integer is about 2 GB. That limitation applies to shuffle blocks as well. This is especially problematic for Spark SQL, because the default number of shuffle partitions is set to 200. That's a particularly low number of partitions, which leads to a higher average block size.

So how can you fix it? In general, you reduce the average block or partition size, which you do by increasing the number of partitions. The second thing you can do is reduce the skew that you may have in your data, and Ted's going to talk more about that in a minute or two. How exactly do you change it? If you're using Spark SQL, you manage it with the spark.sql.shuffle.partitions property; again, that's set to 200 by default, and you probably want to bump it up. In regular Spark applications you would do something like repartition or coalesce. Then you may ask: how many partitions should I have, and how big should a partition be? The rule of thumb is about 128 MB per partition. That will again change based on your workload, but it's a pretty standard starting point.

However, there's one more thing. The Spark driver does bookkeeping around the data that's sent around in shuffles, and it uses different data structures for this bookkeeping depending on whether the number of partitions is less than 2,000 or more than 2,000. Guess what: this 2,000 number is actually hard-coded in Spark. If you don't believe me, this is the code from MapStatus.scala, and it says: if the number of uncompressed sizes is larger than 2,000, use HighlyCompressedMapStatus; if it's less than 2,000, use the regular CompressedMapStatus, which of course has a larger memory footprint than the highly compressed one. So what am I trying to say? If your number of partitions is less than 2,000, but not by a whole lot, it makes sense for you to bump it up just a little bit to be over 2,000.

Let me summarize. Don't have partitions that are too big, because your job will fail due to the 2 GB shuffle block limit. Don't have too few partitions, because you'll be slow and may not make use of the parallelism. The rule of thumb is around 128 MB per partition. If your number of partitions is less than 2,000 but really close, bump it up to be more than 2,000. And you can track this particular JIRA for the various improvements that are coming around the 2 GB limit.
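Here is a hedged Scala sketch of both knobs mentioned above. The input path, data size, and job itself are made up for illustration; only the 200-default, the 2,000 threshold, and the ~128 MB rule of thumb come from the talk.

```scala
import org.apache.spark.sql.SparkSession

object PartitionSizing {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("partition-sizing").getOrCreate()

    // Spark SQL shuffles: default is 200 partitions; raise it so no block nears 2 GB.
    // 2001 also clears the hard-coded 2000 threshold for HighlyCompressedMapStatus.
    spark.conf.set("spark.sql.shuffle.partitions", "2001")

    // RDD API: aim for roughly 128 MB per partition (the rule of thumb from the talk).
    val estimatedShuffleBytes = 1024L * 1024 * 1024 * 1024   // assume ~1 TB, illustrative
    val targetPartitionBytes  = 128L * 1024 * 1024
    val numPartitions = (estimatedShuffleBytes / targetPartitionBytes).toInt  // 8192

    val counts = spark.sparkContext
      .textFile("hdfs:///data/events")                       // hypothetical input path
      .map(line => (line.split(",")(0), 1L))                 // key on the first field
      .reduceByKey(_ + _, numPartitions)                     // explicit shuffle partition count
    counts.saveAsTextFile("hdfs:///data/event_counts")       // hypothetical output path

    spark.stop()
  }
}
```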
All right, I'm going to pass it on to Ted, who's going to talk about mistake three. All right, I only got four and a half minutes per mistake, so we've got to fly through this. Now: you have a data set that takes 20 seconds to read, but four hours to do a join. Who's experienced that joy? All right.

They'll tell you here that distributed computing is wonderful, because it can take that big long line and make it into the short lines, and that's wonderful and fine and dandy. But this is what happens in the majority of your worlds, and it's because there's skew. Skew happens because you have a null key, or you have a popular value; maybe you have stocks as your data set and Apple is in your data set. What will happen a lot of the time is that all your work goes to a single core. I'll run into a frustrated customer who says, "I doubled my node count, I'm spending all this money on AWS, or I'm buying all this hardware from HP, and my job isn't getting faster." And I'm like: well, that's great, you have a hundred nodes and you're using a single core out of those hundred nodes. Why don't we fix that?

First thing to know: if you have skew, you shouldn't have skew. There's no mathematical or computer-science reason why skew has to exist in your world. A simple way of solving it (we'll walk through three variations, but they're basically all the same) is this idea of salting. What is salting? If your normal key is "foo", a salted key would be "foo" plus some random stuff, with the salt assigned randomly. Just think of putting a little bit of salt on it. What this does is spread the distribution out: you'll have more keys, and especially if you had a null key, you'll have even more spread.

Let's put this in a pretty little graph, because pictures are good. In this data set, half of my data is going to go to a single core, a quarter of it is going to go to another core, and then I get some distribution on the rest of the data. But if I put a random value of just 0 or 1 next to each key, that's the top circle, and you can see the top circle looks a lot better than the circle on the other page. Now, if I put salts of 0 to 7 after each key, I get the bottom one, and as you can tell from the bottom one, skew is now ancient history and I don't have to worry about it anymore. And there's no reason why you have to stop at 8.

Okay, sorry about that. So how do you solve this? You do the salted reduce-by-key. Let's say you're doing the stock example and you have Apple, and you used eight salts; that means you're going to have eight totals for Apple. Then you just follow that with another reduce over the eight totals for Apple. Say you've gone from a billion records down to the number of stocks, which is about twenty thousand, so twenty thousand times eight, something like that, and then you're just reducing that aggregate. You'll find this is a lot better than having one core do all your work.

There are other patterns for doing it. You can do isolated salting: let's say most of your data set is fine and you can do a regular reduce-by-key, but you have one or two keys that suck; you could salt just the ones that suck and treat everything else the same. And there's another approach where maybe you're doing a join, and again you have some keys with really high cardinality and some that don't: you could put those into memory and do a map join for the high-cardinality keys, and for the ones that don't you can use the regular group-by. I'm going fast, and I apologize for that.
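A hedged sketch of that two-stage salted aggregation in Scala. The function and parameter names are illustrative, not code from the talk, and it assumes a simple sum; the same shape works for any aggregation that can be combined in two stages.

```scala
import org.apache.spark.rdd.RDD
import scala.util.Random

// Stage 1: aggregate on (key, salt) so a hot key like "AAPL" or null spreads across
// up to `salts` reducers. Stage 2: drop the salt and combine the partial totals,
// which by then is tiny (roughly number-of-keys * salts records).
def saltedSum(pairs: RDD[(String, Long)], salts: Int = 8): RDD[(String, Long)] = {
  pairs
    .map { case (k, v) => ((k, Random.nextInt(salts)), v) } // salt the key
    .reduceByKey(_ + _)                                     // partial totals per (key, salt)
    .map { case ((k, _), v) => (k, v) }                     // strip the salt
    .reduceByKey(_ + _)                                     // final total per key
}
```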
The other one that sucks a lot, and it's not necessarily skew but it's just as evil, is the Cartesian join. Has anyone in this room ever had Cartesian joins? Yeah, they suck. You know what? They don't have to suck. There's this beautiful thing... oh, you forgot the other slide? It's okay. I changed this slide earlier, but there's another slide: has anybody used nested types? All right. With nested types (we had the wrong slide up here, but it's all right, and you can come find me later if you need more explanation) you can do those Cartesian-style joins without actually doing the Cartesian join. If you had a thousand times a thousand and you did a Cartesian join, that would come out to a million records. What you can do instead is have one key where the first cell is a nested collection of a thousand and the next cell is a nested collection of a thousand; they all come into memory at the same time, you do your Cartesian in memory, you don't have to write it out, it doesn't have to explode into an RDD, and everyone's happy. This is going to be multiple orders of magnitude faster than doing the Cartesian.

Then, last: just be very cognizant of your partitions. Again, Mark talked before about partitions during your shuffle, but let's say you have a heavy computational operation you have to do on a very small data set. It might make sense to expand your partitioning just to do that heavy computation, and then shrink it back down to do the writing. Always be cognizant of that; and when you're using dynamic allocation, you're only using what you're using, so it will give back those resources. And I'm 11 seconds ahead.

All right, next problem. Let's see: do you ever run out of memory? Do you ever have 20 stages? Is your driver ever doing a lot of work? This is kind of a catch-all for bad behavior. First off, at this conference they're going to tell you that Spark is amazing and that it allows you to do anything. What happens is that a whole bunch of people did not go through the MapReduce era. Those of us who went through the MapReduce era, okay: shuffles suck, right? Am I wrong? Now, the people who didn't go through that will look at the Spark code and say, "oh, it's so pretty, and look, I can do SQL now, and I don't have to think about anything, I'm just going to write my business rules in here." And what's going to happen is they're going to have a whole crapload of shuffles. Well, shuffles still suck. If you really want to do a good job, you're going to remove as many shuffles as you can: look for when you can reuse the same keys, look for storage patterns that help you, like having things already pre-sorted, or broad nested tables. There are tons of little tricks, but in the end all the memory and RDD magic in the world is not going to save you from a shuffle. Shuffles suck, and they don't scale linearly, so they're working against you.

Okay, next one: groupByKey. groupByKey is like free money to me, because I'm a consultant; I come in and they're like, "oh, it blows up, I ran out of memory." groupByKey is a wonderful interface: if you have skew, everything goes into the memory of a single executor, and then boom, you die. There's nothing you can do with groupByKey that you can't do with reduceByKey. So unless you really, really, really know your data a hundred percent, and you can tell your boss you will never have more than a million records or so for a single key, and you're fine with that, and your paycheck's fine with that, and your bonus is fine with that, and you don't want to get a phone call in the middle of the night: just use reduceByKey.
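An illustrative Scala contrast of the two (the function names are made up). Both compute per-key sums, but only the second pre-aggregates before the shuffle.

```scala
import org.apache.spark.rdd.RDD

// groupByKey ships every value for a key to one task and holds them all in memory
// there before you sum them; a hot key can blow up a single executor.
def sumsWithGroupByKey(pairs: RDD[(String, Long)]): RDD[(String, Long)] =
  pairs.groupByKey().mapValues(_.sum)

// reduceByKey combines map-side first, so only one partial sum per key and per
// partition crosses the shuffle.
def sumsWithReduceByKey(pairs: RDD[(String, Long)]): RDD[(String, Long)] =
  pairs.reduceByKey(_ + _)
```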
All right. Then there's reduce and treeReduce. Do we have a pretty graphic for that? I have a pretty graphic for that later on. And I already talked about nested types and complex types: if you have not done nested structures, or ordered nested structures, just do it. It's so awesome; I've been doing it for the last six months. As an example, I was at a client where they said, "oh, Spark will solve everything, I'll just put everything in memory," and the query ran for three days. Then I said, "well, can I just use nested types and order it like this?" and they said, "yeah, yeah, but it won't do better," and I got it down to three minutes. So it's really important how you order your data, especially if your data doesn't change that often, like with HDFS. But even if it changes a lot, you can store it in something like Kudu, which keeps the sorted order and partition order for you even if the data is highly mutable. Am I running low? I'm running late, okay.

So I talked about a lot of this stuff: DAG management, you don't want to do shuffles, reduce your shuffles as much as you can. I talked about all of these, yes. Okay, this is the one I didn't talk about, because we have a graphic: there's reduce and there's treeReduce. Reduce is great; this is not reduceByKey, this is reduce, and it brings everything back to the driver. What you want to do is a treeReduce, because then it does a reduce on every partition and only sends the results back to the driver. You don't want to send everything back to the driver. You only have one driver; be gentle with it. If you're not, it will die with an exception and it won't come back.

Now, complex types: we already talked about this. Nested types, aggregated types, top-N lists, stuff like that. Be creative; you can put anything in your RDDs. Okay, so this is one example, and I won't go into it, but you can go Google this blog post. I was at a customer that was using an ETL tool that I'm not going to name, because it would be embarrassing for that ETL tool, and they wrote a SQL statement that would do table stats on a table, and it produced 1,500 MapReduce jobs to do table stats on a single table. In their defense, it had a lot of columns, 300 columns, but anyway, this is an example of how to do that in one shuffle. So again, it doesn't have to suck if you just think about what you're doing.

Okay, problem number five. Almost there. Now, this one doesn't happen as much anymore, but it used to happen a lot, and it can still happen a little bit; it happened to me a couple of months ago. Has anybody ever had a method-not-found exception? All right, good, those are your advanced users. That means they were bringing in a library that was unfriendly to Spark. Now, Spark: have they fixed the Guava thing yet? Guava's fixed? Yeah, Guava's fixed, so Guava now works. This one is a Guava exception, but that one's fixed. But what happens if you're using protobufs, like I was: Spark was using protobuf 2.5 and I wanted protobuf 3. At first it doesn't work, but there's a nifty little trick. There's this thing called shading; if you look down at the second item, you see the shade pattern. I can bring in my own dependency and say I want it to live under my own package, which essentially means that when my code is compiled it looks like a totally different set of classes than the classes in Spark. I hope that made sense; if it doesn't, come get me later on.
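The talk describes doing this relocation with the Maven shade plugin; as a hedged sketch of the same idea in a Scala/sbt build (assuming the sbt-assembly plugin is on the build, and with a made-up target package), the shade rule below renames your copy of the library at assembly time.

```scala
// build.sbt fragment (requires the sbt-assembly plugin in project/plugins.sbt).
// Relocate our protobuf 3 classes so they cannot collide with the protobuf 2.5
// classes that Spark brings onto the classpath.
assembly / assemblyShadeRules := Seq(
  ShadeRule.rename("com.google.protobuf.**" -> "com.example.shaded.protobuf.@1") // hypothetical target package
    .inAll
)
```

Either way, Maven shade plugin or sbt-assembly shade rules, the point is the same: your copy of the library gets renamed at build time so it no longer clashes with the version on Spark's classpath.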
So don't ever accept classpath or class-loading issues as a deterrent with Spark. What we're trying to say is that if you're using any common library, the version of the library you use in your application has to match up exactly with the version that Spark is using, and this is very common for libraries like Guava and protobuf. The fix, if you want to use a different version, is to shade your library and relocate the package names to something else. However, if you're using Guava: starting with Spark 2.0, as the slide says, it's fully shaded, so you don't have to worry about it for Guava, but other libraries are still a problem. There are going to be some people wondering, "shade, what the heck is shade?" So, I use Maven; I'm not an sbt guy, sorry about that. But in Maven there's this thing called the shade plugin, which builds an uber JAR for you so you don't have to think, and I like not thinking. And that's pretty much it. Are you doing the summary?

All right, so we're going to do the summary, gangster style, and hopefully that's not offensive. You've got to size up your executors, so make sure you are specifying the properties that you need to specify. You've got a 2 GB limit on your Spark shuffle blocks. Skew and Cartesians are evil; you've got to avoid them. Learn to manage your DAG. And do shady stuff: don't let classpath leaks mess you up.

Before we go, I want to say: writing applications in Spark is like finding your soulmate on Tinder. You know the chances are really low, but you do it anyway, and every once in a while you get it right, but you still wonder how long it's going to last. I found mine in art class. That's all from us, thank you very much. This is Ted Malaska, I'm Mark Grover, these are the slides, we'll see you soon. Thank you.

These guys are great, give them another hand. Okay, we've got about three minutes, so make sure you keep your questions short and brief; don't do a Cartesian join when you're asking a question. Line up over here, and they're going to be at the Cloudera booth if you want to talk to them. Here we go. Yeah, we'll be by the booth; I don't fly home until tomorrow.

Hi, I have a fast question. I'm actually interested in Spark because I automatically generate the program logic, and I could still see a lot of these techniques being used even in an automatically generated context, which means the same sophistication could be built into the framework. Any projects? So I think the question was, and correct us if we got it wrong: even with all the auto-generation that's happening, you were saying that some of this sophistication could be built into the product, and how is that coming along? Is that your question? Okay. I would say that some of the sophistication can be built in and some can't. The dynamic allocation part, for example, is outside the realm of any auto-generation; that's up to you, how you want to size your clusters and how you want to send your queries, so that's outside. Yeah, and some of the DAG management will get better as the query optimizer gets better, but there's a joke among consultants that you can't fix stupid, and there are certain things, like skew, that are very hard to handle with zero cost, because if you don't know the data the program would have to collect something first; if you did some type of table stats, then maybe you could avoid
something like that, but it's a hard problem to solve, and it's solved by just thinking a little bit, and there are a lot of other, shinier features that people normally focus on before they'll focus on that. But it could all be done. Plus, that's job security for us; yeah, I've got a job forever.

Great talk, thanks. So, do you think groupByKey is overly demonized? Do you really think there is no use case where groupByKey will actually be better? I mean, you can't do anything with groupByKey that you can't do with reduceByKey; that's true. The reason I demonize it is because I don't want a phone call at night: unless I have super strong guarantees on the data that's coming in, it could cause trouble. Okay, so at the risk of extending the question: I have actually seen use cases where people end up using combineByKey and aggregateByKey, and that sort of hogs up more memory than it would have with groupByKey. For example, if you have an RDD of (String, String) pairs and you just want to collect all the values for a key, then groupByKey might actually be better, because you don't have to use hash maps on each of the executors to keep track of things. It might be, in some situations; yeah, that's more work. It's one of these things: if I could kill a fly with a shotgun, and all I had was a shotgun, I would kill it. It's a very American thing to say, but I mean, I'd either let the fly bother me or take it out.

I think we've got one more question, and that's it; keep it short, please. Hi, how do you set your Python worker memory in your Spark config? So, when the presenters stare at each other with blank looks, that means they don't know the answer. Yeah, we're both Scala people, we don't really use Python, sorry. In Python's defense, I don't think there's a significant performance degradation, but I don't know the answer for you. Okay guys, give another hand. Thank you, and great talk. Thank you.
Info
Channel: Spark Summit
Views: 25,157
Rating: 4.913846 out of 5
Id: vfiJQ7wg81Y
Length: 29min 38sec (1778 seconds)
Published: Thu Jun 16 2016