Top 5 Mistakes When Writing Spark Applications

Video Statistics and Information

Captions
We can get started. Welcome, everybody. We were in the other room and someone told us that was not the right room for us to present, so I hope this is the right room. If you came to see some other talk, stay seated, because we're going to tell you about our talk anyway. We're going to talk about the top 5 mistakes when writing Spark applications. A lot of people ask for slides, so the link to the slides is on the first slide, at the very bottom right: it's tiny.cloudera.com/spark-mistakes. It's easy to remember, so I'm going to move on after saying it one more time: tiny.cloudera.com/spark-mistakes. I am Mark Grover, and with me is Ted Malaska. Ted is a Principal Solutions Architect at Cloudera and I'm a software engineer on Spark at Cloudera.

Okay, moving on. Something really quick: we also co-authored a book on Hadoop application architectures with some other wonderful co-authors, so there are four of us in total. That's the title of the book and a GitHub link, and Ted is taking a selfie, so everyone say hello. Okay. We want to talk about mistakes people make, but really these are mistakes we made, and we wanted to look smarter, so we talk about them as mistakes people make.

So, first mistake. You've got six nodes, you've got 16 cores and 64 gigs of RAM each, but you don't really know how to size your executors. You ask yourself: how many executors should I have? How many cores should I have? How much memory do I give to these executors, and how do I make sure I don't fail? All these properties exist, so the first problem people have when they write Spark applications is: how do I set all these properties right?

A quick recap of the architecture: you've got a driver program that has a SparkContext, you may be using a cluster manager — this could be YARN or Mesos, or it could be the standalone cluster manager — and then you have these worker nodes that house one or more executors, and the executors run tasks.

So the first answer you may come up with is: why don't I make all the executors very granular? Have the smallest-sized executor, with one core in each executor. Remember, you had 16 cores in each node and you had six nodes, which makes 96 cores in total; that means you would have 96 executors. Because you had 64 gigs of RAM in each node, divided by 16, which was the number of executors on that node, that's four gigs of RAM per executor. This kind of, sort of makes sense so far, yeah? Okay, so that's the answer — but it's the wrong answer. Why? Because you aren't able to make use of the benefits of running multiple tasks in the same JVM, reusing the same JVM for multiple things.

So the second answer is: why don't I make them the least granular? I'll make the executors as big as possible, one per node. On a six-node cluster, that would mean six executors in total; each executor gets all the memory on that node, which is 64 gigs, and it gets all the cores. I wouldn't be giving this answer if it was right, so this is wrong as well. Why? Because you need to leave some overhead for Hadoop and other OS daemons that are running on the node. So answer three, with overhead, becomes six executors, 63 gigs of memory, and 15 cores — I've left one core for some daemons and one gig of memory for some other stuff. Turns out this is still wrong.

So you ask: okay Mark, stop mucking around with us, what is the right answer? Before we get to that answer, here's a graph of Spark-on-YARN memory usage. What I talk about from here on applies to YARN; something similar will apply to
Mesos; I don't really use standalone, so I don't really know what applies there. For YARN, the executor memory setting controls the heap size, but you need to leave some overhead for off-heap memory, things like direct buffers, and that overhead by default — that's the bottom figure on the slide — is the maximum of 384 megs or 0.07 times spark.executor.memory. So that's the first thing to keep in mind: we have to leave some off-heap memory overhead when we size our executors.

The other thing we have to keep in mind is that the YARN ApplicationMaster also needs one core. YARN applications for Spark can run in two different modes. The first one is client mode, which is what this diagram is showing: in client mode the Spark driver runs in the client app, which is the top-left corner. In cluster mode the Spark driver runs inside the ApplicationMaster, which is the top middle part. In either case you run an ApplicationMaster — the blue thing in the top middle — which runs inside a YARN container, so you need an extra core for that ApplicationMaster. Mind you, that extra core is required for the ApplicationMaster as a whole in the cluster, not on every single node.

Also, when we were sizing those big executors we were putting 15 cores on each executor, and 15 cores per executor leads to really bad HDFS throughput. It's best to keep it to something smaller, like 4 to 6 cores per executor.

So with all this new information we've gathered — we've talked about direct memory buffers, we've talked about how the ApplicationMaster needs a core, and we've talked about how HDFS throughput is maximized when you have 4 to 6 cores per executor — here's the calculation we come to, starting from the bottom up. We need only 5 cores per executor for maximum throughput. Our cluster in total has 6 × 15 cores — why 15? because one core per node was set aside — so there are 90 cores in total. 90 cores divided by 5 cores per executor leads us to 18 executors in total, spread across 6 nodes. We set one executor aside for the YARN ApplicationMaster, which leaves 17 executors that will run the real Spark application. Each node gets 3 executors, and one of the three executors on one of the nodes will run the ApplicationMaster. Once each node has three executors: you had 63 gigs of memory, you divide that by 3, you get 21 gigs; but remember there's some YARN overhead for off-heap memory, so you subtract 7 percent from that and you get about 19 gigs.

Okay, so the correct answer is 17 executors, 19 gigs of memory per executor, and 5 cores each. This is not etched in stone: you may wish to oversubscribe your cluster if your processes are not CPU-bound, and you may increase or decrease the number of cores per executor for different HDFS throughput based on what kind of HDFS interface you're using, but this is more or less the ballpark. If you want to read more on the topic, there's a really nice blog post on the Cloudera blog that you can read.
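For reference, here is a minimal sketch of how that final answer could be expressed as configuration. The cluster numbers are the ones from the example above; the property names are the standard Spark-on-YARN settings, and the equivalent spark-submit flags would be --num-executors, --executor-cores and --executor-memory.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// 6 nodes x 16 cores x 64 GB: leave 1 core and 1 GB per node for OS/Hadoop
// daemons, 1 executor slot for the YARN ApplicationMaster, and roughly 7%
// of executor memory (max(384 MB, 0.07 * spark.executor.memory)) off heap.
val conf = new SparkConf()
  .setAppName("well-sized-app")
  .set("spark.executor.instances", "17") // 90 usable cores / 5 = 18, minus 1 for the AM
  .set("spark.executor.cores", "5")      // 4-6 cores per executor keeps HDFS throughput healthy
  .set("spark.executor.memory", "19g")   // 63 GB per node / 3 executors, minus ~7% overhead

val sc = new SparkContext(conf)
```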
Okay, so I'm going to move on to the next tip. Mistake number two: you run your Spark application and you see this magical piece of text, "Size exceeds Integer.MAX_VALUE", and you're like, hmm, I don't know where that comes from. It turns out, if you look at the stack trace, it's not from your application; it's some code internal to Spark, and you try to guess where it's coming from. Why it's happening is that in Spark you can't have a shuffle block that's larger than two gigs.

You ask, what's a shuffle block? In MapReduce terminology — I know Spark and MapReduce are not the same, but it's easy to explain in MapReduce terminology, so I'm going to use that — when you have a mapper that does something, it writes data to local disk, the data gets shuffled around, and a copy gets written to a reducer. This is the reducer's local copy, which the reducer then reads and runs the reduce process on. The file on local disk that the reducer reads is essentially the equivalent of a shuffle block. In other words — this is a diagram — if you had some mapping happening and some reducing happening in stage 2, there's a shuffle between stage 1 and stage 2, and these yellow arrows are essentially the transmission of a shuffle block from one executor to another. By the way, the terminology "map" and "reduce" doesn't really apply to Spark — these are all executors — but I use it for the sake of argument and explanation.

Okay, so to remind you again: no shuffle block can be greater than two gigs. And you say, okay, well, that's fine, this is big data stuff — yeah... no. Spark uses this thing called a ByteBuffer as an abstraction for storing blocks, and a ByteBuffer is limited by Integer.MAX_VALUE, which maps to two gigs. So I will remind you one more time: no Spark shuffle block can be greater than two gigs.

What does that mean? To put it in perspective: if you are doing an aggregation like a group by, or if you're doing a join in which there's a shuffle involved, then yes, this limit applies to you. If you're doing caching, this limit applies to you — any time you're leveraging memory. But if you're doing a simple count, that's okay; if you're reading a simple file from HDFS and printing out the contents on screen, that's totally fine, this limit does not apply. It's only when you're doing a shuffle that this limit applies.

This is especially problematic for Spark SQL, because the default number of partitions during a shuffle is set to 200. Having this low number of partitions leads to a high shuffle block size, which sometimes exceeds the 2-gig shuffle block limit. So you ask, what can I do? The first thing you can do is increase the number of partitions, thereby reducing the average partition size. But sometimes your data has skew, and Ted's going to talk about — well, I'm not going to steal his thunder, so we will talk about skew later.

So how exactly do you increase the Spark SQL number of partitions? There's this property, spark.sql.shuffle.partitions, which like I said is set to 200 by default. And in regular applications you can repartition or coalesce, and you can supply the number of partitions you want right there. You may also ask, what's a good partition size? The average we have found, canonically, is around 128 megs.

There's one more thing to consider: Spark uses a different data structure for the scheduler's bookkeeping when the number of partitions is less than 2000 versus when it's more than 2000, which means your memory footprint is going to be different based on this magic number of 2000. You don't believe me, do you? Here's the code, from MapStatus.scala — there's a number 2000 hard-coded here: if the uncompressed sizes length is greater than 2000, you use this HighlyCompressedMapStatus, otherwise you use this CompressedMapStatus, and they have different memory footprints — one of them is, of course, highly compressed and the other one is not. So what am I trying to say? I'm trying to say that if your number of partitions is less than 2000 but very close to it, bump it up to more than 2000, because that way you will have a slightly lower footprint than you would if you were below 2000.

Confused much? Here's a summary. Don't have too-big partitions when you're shuffling, because your job may fail due to the 2-gig limit. If you have too few partitions, you're not making enough use of your parallelism, so you want to increase the number of your partitions. The rule of thumb is about 128 megs per partition, and if the number of partitions is less than 2000 but close to it, just bump it up to more than 2000.
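As a rough sketch of what those knobs look like in code — assuming a Spark 1.x-style SQLContext named `sqlContext` and an RDD named `rdd`; the partition counts are purely illustrative:

```scala
// Spark SQL shuffles default to 200 partitions; raising the number well past
// 2000 also gets you the more compact HighlyCompressedMapStatus bookkeeping.
sqlContext.setConf("spark.sql.shuffle.partitions", "2048")

// Core RDD API: pick the partition count yourself, aiming for ~128 MB each.
val repartitioned = rdd.repartition(2048) // full shuffle into 2048 partitions
val fewer         = rdd.coalesce(200)     // shrinks partition count without a full shuffle
```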
And now I'm going to pass it on to Ted.

All right, perfect. Okay, mistake number 3. This happens all the time — at my client sites, if I can't make your job run ten times faster, I really feel defeated — so the next couple of mistakes are going to be related to that. You have a data set where a map job takes 20 seconds, but any type of join or shuffle operation takes like 4 hours. So what's wrong? You're probably doing a skew or a Cartesian.

Okay, a brief overview for the two of you who don't know what skew is. Distributed computing is essentially magical — I hope this works — it offers this magical solution where if you have 6 threads, your job will run 6 times faster. Skew is the enemy of distributed computing: it says you may have 6 threads, you may have a thousand-node cluster, but all your work is actually being done on one core. And you would not believe how often this happens — probably 80% of the clients I go to.

So how do you address this? I only have 30 minutes, so I can't tell you all the solutions, but essentially the easiest ones are this idea of salting, isolated salting, or a map join. Now, you can't always do a map join, but let's walk through each one of these.

Salting is essentially just adding some randomization to your key. So here we have a key, "foo", and we're adding a salt to it, and this would be an example of our skewed data set. In the real world, think about some skewed data sets: if you're looking at stock volumes, I believe the ticker Apple makes up something like twenty percent of the total stock volume traded during the day, but there are over 20,000 stocks. That's skewed: if you just distribute by ticker symbol, one core is now doing twenty percent of the work of the entire cluster.

This is an example of the breakdown of the data set we're going to play with here, and what you can see is that one ticker takes up fifty percent of our data set, the next one is twenty-five, and then we start getting to smaller portions of the overall thing. Now, if we salted — if we just added two salts per ticker, Apple-0 and Apple-1 — we get that first pie up there, which looks a little bit better. If we salt eight times, we get the bottom pie, and now we're starting to get more distributed. And you can use any level — there was supposed to be an example there. You can see that with multiple salts we can diffuse any skew problem, and we can actually go up to 100 or so salts.

The DAG for this would look like so: we would have a map where we're converting the data to keys and values; before we take it into our reduce cycle we add the salts; then after the reduce cycle we're going to have an aggregation for Apple-0 and an aggregation for Apple-1, and then we have to reduce those answers. Does that make sense? The difference is that now we're only reducing two partial answers, as opposed to reducing twenty percent of the entire data set on one core.
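Here is a minimal sketch of the salting pattern just described, assuming an RDD of (ticker, volume) pairs named `trades`; the salt count of 8 matches the example above.

```scala
import scala.util.Random

// Spread a hot key ("AAPL") across 8 sub-keys so no single core
// does 20% of the work, then combine the partial results.
val saltFactor = 8

val salted = trades.map { case (ticker, volume) =>
  ((ticker, Random.nextInt(saltFactor)), volume) // e.g. ("AAPL", 3)
}

// First reduce works on salted keys, so the hot ticker is split 8 ways.
val partials = salted.reduceByKey(_ + _)

// Strip the salt and do a second, much smaller reduce over the partials.
val totals = partials
  .map { case ((ticker, _), volume) => (ticker, volume) }
  .reduceByKey(_ + _)
```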
Isolated salting is a little bit different. Let's say you have a key that makes up 80% of your data. An example — and this happens to so many people — is that your join key is null. Has anybody experienced that? You're joining on a key, one of the values can be null, and that null makes up 80% of your data set. The rest is perfectly non-skewed, but that one null key, God, is screwing up the whole thing. So what you do is filter out the nulls and process them differently: you apply a very high level of salting to the null key, but you don't apply the salting to everything else.

And then map joins — you should know map joins. A map join is essentially where you hold the smaller table in memory and then you just use hash maps to look it up. But I wanted to do the salting first, because that's normally the bigger problem — because if you could do a hash join, you would have already done a hash join. Okay, that's skew. Oh man, I'm going too slow.

Okay, so the next thing is Cartesians. If you haven't done a Cartesian, you may not know what a Cartesian is. A Cartesian is when, let's say, you have a million records and another million records, and together — what does that even come out to, like a trillion or something? It's a lot. Your data basically explodes, and three days after you ran the job nothing is getting done, and your boss is telling you that your cluster is filling up, or your S3 bill is now enough to mortgage your entire house. So yes, this is a bad situation.

People will say, "well, I have to do Cartesian joins", but of all the Cartesian joins I have ever interacted with, I could get rid of most of them. You can get rid of them with nested structures, with windowing, or by skipping steps. Sometimes people use Cartesians as an intermediate step to get to an eventual answer; instead of going through the Cartesian set, there might be another way to get to your answer. Just for some background, the reason you may have gone to that Cartesian is because you implemented it in SQL in your old database life, and let's be honest, SQL is not the best ETL language — but we can talk about that another day. Okay, how are we doing? We're doing okay. Whoo, all right.
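Before moving on, here is a rough sketch of the two remedies described above — isolating the skewed null keys, and broadcasting the small table for a map-side join. The RDD names (`big`, `small`) and the SparkContext `sc` are assumed for illustration.

```scala
// Isolate the skewed null keys and handle them separately
// (heavy salting, or drop them if the join semantics allow it).
val nullKeys    = big.filter { case (k, _) => k == null }
val nonNullKeys = big.filter { case (k, _) => k != null }

// Broadcast the small table once to every executor...
val lookup = sc.broadcast(small.collectAsMap())

// ...and join inside a map, so the big table never has to be shuffled.
val joined = nonNullKeys.mapPartitions { iter =>
  val table = lookup.value
  iter.flatMap { case (k, v) =>
    table.get(k).map(other => (k, (v, other)))
  }
}
```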
Okay, mistake number four. Have you ever run out of memory? Have you ever had DAGs that were more than 20 stages? Does it seem like your driver is doing all the work? If you haven't, you haven't used Spark, or you haven't used it aggressively. So what's going on here? These are mainly DAG management mistakes. Now, how many of you have done MapReduce in the past? Okay, you guys will probably not have experienced as much of these. It's the same as people who learn Java after they learn C++ versus the people who learn Java before C++ and do all these stupid things. So for the MapReduce people, what I'm going to tell you is probably common sense, but for the Spark people, the easiness of Spark is hiding the fact that you may be doing dumb things.

Okay, the first thing — did I miss a slide here? — the first thing is shuffles. Does everybody know what a shuffle is? You do a shuffle on a join, you do a shuffle on a reduce, you do a shuffle on a repartition. Shuffles are freaking evil. Don't do shuffles; if you can avoid a shuffle, avoid the shuffle. Shuffles require lots of CPU, lots of serialization, lots of network — they're inherently evil. And there are lots of ways to remove shuffles: you can use complex data types, and you can reuse key structures so that you shuffle on similar keys.

One of the main things I see when I go in to somebody who's been using Spark: Spark is so easy to write, so easy to express your problem in the Spark language or in SQL — SQL is even worse — that the logic is perfectly correct but it creates a DAG that is unnecessarily shuffle-heavy. What you have to do is first go through the iteration of writing the code so that it's correct, then re-examine your DAG and ask: can I remove shuffles? Do I ever join on the same key twice? Do I ever aggregate on the same keys, or partial keys, twice? The biggest example I had of this was someone who wrote a query to get the table statistics — I'm going to give you a link to that in a minute — for a table with 300 columns. When they wrote it in SQL it turned out to be 1500 shuffles; I rewrote it using a complex object and we were able to do it in a single shuffle. Two more minutes for this mistake? Okay.

The same thing goes for — oh, this is groupByKey versus reduceByKey. Anybody use groupByKey? Okay, good, no one — okay, two people: don't do it anymore. Almost anything you can do with groupByKey you can do with reduceByKey; there are a few things you can't, but you can do almost everything. The difference is that groupByKey puts everything into memory. So your algorithm may work perfectly great today on your sample set, but then one day someone sends you a skewed data set where on that key you now have a couple of million records, and all of a sudden your Spark job blows up in production. You get a call at two o'clock in the morning, and then you call me, because I'm the consultant, and you make me come and fix it at three o'clock in the morning, and nobody's happy. So don't use groupByKey; use reduceByKey, because reduceByKey only has to keep two elements in memory at one time. Now, you can store more in memory — you could use collection objects and advanced objects in your reduce — but then you have more control. You could say, well, I'm going to have a collection object and keep the last thousand or even the last million records, but you can also put a control in there that says: if I get more than a million records for this key, throw a log message or spill it to disk, instead of just blowing up so the entire job fails. I have one minute — ooh.

I can't really talk too much about this, but there's treeReduce and reduce: don't use reduce, use treeReduce. The difference is that treeReduce distributes the reduce across all the partitions and only sends the answers to the driver. If you do a plain reduce, that sends all the data back to the driver, and that makes for an unhappy driver.
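A minimal sketch of both points, assuming a pair RDD named `pairs` and a numeric RDD named `nums`:

```scala
// groupByKey pulls every value for a key into memory on one executor --
// fine on a sample, risky the day a skewed key shows up in production.
val countsRisky = pairs.groupByKey().mapValues(_.sum)

// reduceByKey combines map-side and only ever holds two values per key at a time.
val countsSafe = pairs.reduceByKey(_ + _)

// reduce ships every partition's result straight to the driver;
// treeReduce combines them in rounds on the executors first.
val total = nums.treeReduce(_ + _, depth = 2)
```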
I don't have much time to go into complex types; you can go look at my blog post, which is on how to do data quality checks using Apache Spark and DataFrames. The idea is that you're not limited to a tuple: go use a hash map, go use a set, go use an ArrayList. It's okay, you can use complex types, and you can do a lot in a single reduce structure. Are we doing okay? We've got a minute and a half — I've got a minute, okay.

Have you ever seen this? If you haven't, you're not an advanced Spark user. NoSuchMethodError — anybody? Nobody? Come on. This means that you tried to use a library outside of Spark with Spark, and it sucks: you'll be spending the next two days in Maven, pulling out your hair, like, I just want to send a record to HBase, or I want to use a different version of Guava — or I was at a customer site and I wanted to use protobuf 3 instead of protobuf 2.5 — and you're like, why is life so difficult?

Well, then you'll learn that you can use these six lines and all your pain goes away. If you use Maven Shade, and you're using a library that has a conflict with Spark, you can use this relocation pattern here. You can see here I was using protobufs: I wanted to use protobuf 3 and Spark uses protobuf 2.5. What it does is: anywhere com.google.protobuf appeared in my code, when it compiled, it actually turned it into com.company.my.protobuf, so that there were no classloader issues. Does that make sense? Come on, that's pretty badass.
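The "six lines" he is describing are the maven-shade-plugin's relocation feature. A sketch of what such a snippet might look like (abbreviated, and the shaded package prefix is purely illustrative):

```xml
<!-- Rewrite the protobuf package in your own classes so they no longer
     collide with the protobuf version that ships with Spark. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <relocations>
      <relocation>
        <pattern>com.google.protobuf</pattern>
        <shadedPattern>com.company.my.protobuf</shadedPattern>
      </relocation>
    </relocations>
  </configuration>
</plugin>
```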
All right, so that was the summary, and we actually made it on time. If it looked like we were talking really fast, that's probably right, because they only gave us 30 minutes, and at the end they came and said, leave five minutes for questions, and we were like, oh, great. So, the five mistakes in gangsta style: number one, size up your executors right. Number two, the 2-gig limit on your Spark shuffle blocks. Number three, skew and Cartesians are evil — avoid those. Number four, learn to manage your DAG. Number five, do shady stuff — Maven Shade — and don't let classpath leaks mess you up. All right, thank you. The slides are up there. I'm Mark Grover, with me is Ted Malaska. Thank you, everybody. Now let's open it up for questions. Yes, sir.

Question from the audience: is there anything equivalent to Maven Shade for SBT that you know of?

What was it? Okay — all right, so when your presenters have these blank faces, it means they don't know the answer to that question. Is there somebody else in the audience? I didn't even have a chance to look — yeah? The answer is yes: the sbt-assembly plugin. Another principle is, when it works, don't change it; for me, Maven works just fine. I know some people love SBT, I've got nothing against SBT, I just don't use it that often.

Question from the audience: great talk. We also use a lot of complex data types, and I've been pretty hesitant to migrate to DataFrames because I don't want to lose my hash maps. Have you guys had success doing that?

So, I work with very large companies that have really high data management requirements, and I use DataFrames whenever I need to read or write data — everything is in a Hive table for me. Now, if I can do the operation in SQL, I'm going to do SQL; it may not be the fastest, but it's the easiest. Later on I might want to do something custom: if I'm doing ETL or something advanced, that's where I'm going to use my special collections. But when I'm just reading from disk or writing to disk, I'm always doing it with DataFrames, because I want to maintain schemas and I want to maintain all that goodness. Does that make sense? But that's a preference, not a dictation; that's just how I like to do things.

Question from the audience: what was the mistake number six that didn't make the presentation?

Okay, that's a good question. We actually had a lot — I don't know if we put slides in for those. I think we had like 15 to start with, and you'll see that we had to talk so fast because essentially every mistake ended up being like six mistakes in one. Memory management was huge, and we really didn't talk much about memory management. One of the really nice things about Spark, as opposed to MapReduce — and for those who didn't do MapReduce, you're lucky — is that where you used to have maybe a gig or two of RAM, you now have 16 or 20 gigs of RAM, and I think people get this false sense that they can put anything into an executor. That's true, up to a point. What I generally tell people is, if you're doing something like a hash join and you start putting more than a million records into that hash map, you should start to feel nervous. You can totally fit a million objects in a hash map in 20 gigs of memory — you can probably even store 30 million — but the point is, if it works today and it got all the way up to a million, is it going to get up to two million tomorrow? Three million? There's this concept of bounded and unbounded collections: with memory management, I like to only put things in my executor memory that are bounded, or that I have a rough sense of control over how big they're going to get. The danger is if you start doing that with unbounded collections, where one day the provider sends you data and, oops, they sent 50 million records instead of what you normally get.

I'm displaying here the outline that Ted and I started working on when we started thinking about this presentation — I promise this was not last night. At the bottom of the list are the two things we wanted to talk about that didn't make the cut as mistakes: debugging your Spark apps and the problems you get in your Spark apps, and caching versus persistence. Really quick: if you don't debug your Spark apps, something's wrong — you should debug like crazy. So if you want to hear us again, ask the organizers to invite us back, and these would be mistakes six and seven.

And the unit tests — I only have like ten seconds — the unit tests: you don't have to test your Spark code on a cluster. You don't have to use up resources on your cluster, you don't have to leave your Spark shell open on your cluster. There should be no reason you blow up on a cluster — maybe a few reasons, but there shouldn't be many reasons why you should ever blow up or experiment on your cluster. You can do all of that in unit tests, and when your stuff works and is correct in unit tests, then you can move it to your cluster. And if you're on the cloud and you're experimenting on the cloud, that's money; you should be doing a lot of that work in unit tests. You can do that in your IDEs, and you can debug through it and step through your code. It's one of the beautiful things about Spark.
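As a sketch of what such a local-mode unit test might look like — ScalaTest and all the names here are assumed choices for illustration, not something prescribed in the talk:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.FunSuite

class WordCountSuite extends FunSuite {
  test("reduceByKey counts words without touching a cluster") {
    // local[2] runs Spark inside the test JVM, so no cluster resources are used
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("unit-test"))
    try {
      val counts = sc.parallelize(Seq("spark", "shade", "spark"))
        .map(word => (word, 1))
        .reduceByKey(_ + _)
        .collectAsMap()
      assert(counts("spark") == 2)
    } finally {
      sc.stop() // release the local context so the next test can create one
    }
  }
}
```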
All right, that's all the time we have. Thank you so much for coming. I hope you guys have an amazing afternoon.
Info
Channel: Spark Summit
Views: 84,980
Rating: 4.8938589 out of 5
Id: WyfHUNnMutg
Length: 30min 37sec (1837 seconds)
Published: Mon Feb 22 2016