Tuning and Debugging Apache Spark

Video Statistics and Information

Video
Captions Word Cloud
Reddit Comments
Captions
everyone this is Patrick Wendell here and this is a talk about tuning and debugging in an Apache spark this is a conference talk I gave a few days ago and I'm recording it in this sort of screencast format I'm interested in feedback on whether people think this is a useful format and please leave comments in the video one way or the other whether you think it's useful just a couple words about myself I'm an Apache spark committer I'm a PMC member and I frequently do the release management for spark I worked on spark in the UC Berkeley days that was a many years ago but when the project was kind of starting and and today I am managing most of the spark work that we do at data bricks a little bit about data bricks just one slide this company was created by the sort of founders of the spark project in 2013 we donated spark to the ASF as one of the first things we did after we started the company so now it's sort of governed under the Apache foundation but we remain the largest contributor and sort of most active company in the spark ecosystem today and our product is an end-to-end cloud service called data breaks cloud that's not the focus of this talk but you'll get to see a little bit as part of a demo and the goal with data bricks cloud is really to kind of build the easiest place to get up and running with spark with kind of little or no upfront investment since it's a cloud service so the goal of this presentation is really to help you as a spark user understand and bit and better kind of debug your programs and and also to offer a few tips on tuning and performance that are not maybe totally obvious for beginner users and I've structured this talk in a way where I assume that you know basic concepts from spark that you understand the RDD api and that we can then go just a little bit deeper than that kind of first that first level so if you're a newer user you may want to kind of read up on spark or write a few example programs before reading or listening to this talk because I'm going to move pretty quickly on the beginner material so I'm going to start by talking for a very good amount of time maybe half of the whole talk about the execution model in spark and and what's really happening under the hood when you run spark programs so you might ask why you know why is this the main focus when it's really supposed to be a debugging and tuning talk and the main reason and you may not get this joke I delivered this talk at a conference where President Obama gave a quick kind of keynote so I was joking that he also said this but it was cut ended up on the cutting room floor the key to really being confident and able to tune spark applications is really understanding what's going on under the hood this is true about anything I mean you want to be really good at fixing cars you know you need to understand how engines work and so forth so so a lot of this talk will focus on just going a little bit deeper on what's going on inside of spark for that reason and I think the real question we want to go over is kind of and something users ask about a lot is if I have a spark program you know I wrote I wrote my code the API I can understand it compiles down into some some execution terminology like stages and tasks but you know how is this what is the actual correspondence between things that I get instrumentation about in the spark UI and what I've written it's not always obvious to people kind of how that compilation happens so that's going to be a big focus so let's start with the user facing API and I'm assuming people are comfortable with this but the main abstraction in spark is this thing called an RDD it's essentially a big distributed collection of records and you can create them from sort of on disk storage systems from in many different ways in this example we're creating an RDD by just paralyzing a local collection using the parallelized operator and spark and once we have an RDD we can transform it into existing from existing rdd's into new ones so there's this thing called transformations and that gives me back just another data set that's derived in some way logically from the one I had before so a transformation is filter okay I've taken the RDD I filtered it using this this sort of function that I passed you and I'm filtering just lines that have error in them when I get back is what I get back is just this thing called errors which is also an RDD and then the other main sort of API operator type in spark is called an action and that's something that actually materializes a value or causes some side effect to happen it's not just giving me back a new RTD so in this example we have the count action so I'll go walk through kind of an example user program that uses both transformations and actions and then this program will become the basis of or we'll deep dive a little deeper into this program in in the next slides so in this program we say that we have an input file so that's the red box at the top right this is something called input text and it has some it's some server logs so you know it has a bunch of log messages in each line and and the log message format is a level of severity so like info warning etc and then a message that follows and let's look at what the program does so first we read the the file using text file that's basically a way to read from Hadoop style input so this could be a big file in HDFS or s3 or wherever and then we tokenize because the what I'm actually going to try to answer with this very short program is actually wanted to account for each of the types of of severity levels of how many I saw across my entire maybe potentially very large data set so I want to know how many info warning and error messages were there so I'm going to do it in a few stages first I'm going to tokenized have this already be tokenized that splits each entry into individual words and then it also does a filter where it says look if there was no words you can see the third line in this input just as empty I want to just get rid of those because those are going to screw up my downstream processing and then I create another RTD counts where I start with this tokenized I run the map function again and an extract sort of emit a single counter just a one for each you know first element and then I reduced by key because now I have a key value RTD and I add up all of the sort of corresponding components I add up all of the counts so then it globally this will do a reduce which will give me back like exactly how many total records there are for each of this type so this program is not understandable to you I would just go ahead and look at the spark programming guide or something but I'm going to assume that you can kind of get this program and the question is you know how to spark interpret this what is the underlying execution engine to do with this and and how is this viewed in terms of the kind of internal details of spark so to show that what I'm going to do is a few things so first I'm just going to take out all of the specific arguments like the actual functions that were passed that don't have to do with the real logical structure of the program because from sparks perspective those are sort of handled and they're not fundamental to the way that the program is compiled okay so all I did was just erase a bunch of function arguments and then as a next step I'm just going to write this as a single change set of calls what we did in the program is we created these intermediate variables and then we we call functions on them but but in reality to spark you know every single one of these function calls transformation calls has a pointer to the RTD which was transformed and so it's kind of logically a single set of sort of relationships being created and then I'm going to I still haven't really done anything too huge to our original program I'm going to just draw out the individual rdd's that are being created for each of these operations so you can see at the beginning there's like this thing called Hadoop RDD that's created and and there's also then we map and filter and eventually we do our reduce by key which creates something called a shuffle Rd now a couple of things to note here so one is that whereas before I said rdd's are a collection of objects in this picture of the world there's kind of another level of hierarchy which is that rdd's have something called partitions and a partition in spark is something that's sort of exposed in the user API but it's really more of an internal concept and it's just a subset of the elements in the RTD and it's used as kind of a unit of parallelism when we're doing computation so as a good example when you have a Hadoop RTD that you load a piece of data from Hadoop using text file or some other similar operator we're going to create a new R DT and that Rd is going to have one partition per input block which is already a way that in the Hadoop world they chunk files up into smaller units so so that's kind of the built-in notion of partitioning in Hadoop RDD and then other types of rdd's have their own way of defining partitioning which I'll go into a little bit in a minute one other thing to notice is that if you look at the bottom here I showed that like we didn't create references to every RDD in our program we we created a variable that referred to the Hadoop RDD it was called input and then we also created a variable tokenized and another one counts but those other artis were sort of implicitly created when we did these chaining operations so just to point out that you know you may not have a reference to every single RTD that exists in your program but they do exist there under the hood and SPARC is aware of them by virtue of these pointers that we track internally when you create new rdd's so sparks internal scheduler can see this this is the the view of the world now I talked about transformations and what they do is they kind of chain together rdd's to build up in this case it's just a single sort of line of rdd's and their parents but but it can be even a more general dag but they don't actually do anything they just it's only a sort of building up of a logical execution plan and if you ran the code that I've given you so far in the spark shell it would just like sit there and return to you immediately so the way it works in SPARC is that evaluation of this dag of of sort of logical data sets you've defined is is lazy and the thing that sort of causes them to come into action is is something called what we usually call actions but I actually want to forget about the term actions and actually talk about a slightly lower level concept which is very closely related which is a method in spark called run job what run job is is the single sort of very narrow interface between a bunch of the logical parts of spark that deal with defining what an RDD is what its partitions are how you compute it and the lower level physical execution so there are thousands of functions in spark this is probably one of the most important ones and the interface is fairly straightforward so the interface to run job is the following I give you an RDD so this RTD itself maybe has ancestors or something as we saw I give you a set of partitions that I want to compute and then I give you a function that use you know if spark is going to call on every single partition the full materialized final output of every partition and then your what run job gives you back is it gives the color a array with the kind of result of that function for every single for every single partition so just as a really simple example think about I can explain there's a function called save as Hadoop data set in spark and it just takes whatever RDD is at this point in the dag and it will write it all out to HDFS and the way that that works internally is a calls run job it calls it on whatever RTD you've invoked save as data set on and the function that is passed is one that reads through all the data in the final sort of materialized partition of that RDD and writes it out to HDFS so it only really exists for a side-effect and then it may return some sort of status value or something like that back to the caller so I'll give more examples but I just want to say kind of point out that run job is really this important narrow interface and it corresponds to what you see in the spark UI is what's called a job so that's why it's in it's kind of useful to know even as just a user of spark so I like to think of it as like when Picard says make it so you know you've kind of come up with this whole plan and now you're gonna act it now how does this work logically so what SPARC needs to do when you call run job you said I have one of these rdd's in my graph and now I want to finally compute it well logically what it needs to do is it needs to compute that are these parents its parents parents etc all the way back to an RDD that had no dependencies that were within you know our TV land so for instance the input our TV was from Hadoop I know how to get that from nothing I can just it can compute itself just from a Hadoop data source so in this example let's just assume we're calling run job on counts that's the final RDD in our little program that hopefully will give us the counts for each log level and don't even worry about what user function I'm passing to run job this is going to be true of any type of action or invocation of run job regardless of what it's doing so let's say we call that okay so logically what needs to happen well spark needs to somehow compute you know these chain of rdd's each of which has some partitions all the way back to the input now the very naive way to do this would be to you know literally separately compute and store every single one of these rdd's so for instance you'd first read all the data from Hadoop you do one map function on it and then you write that data out then you'd read it all back into a filter on it write all that out etc so that would be a correct way to implement what the user is asking for which is I just want to sort of run a job on the very final RDD but because SPARC uses lazy execution we can do something a lot smarter than that so what we can do is we can exploit things we know about this graph to execute it in a more efficient way and those are called physical optimizations there's sort of two big ones that you'll run into a lot and these are things that make your job run much faster there are also things that can be a little bit confusing for users because it means the physical execution is kind of different than the thing that you built up logically so there's two kind of main things one is that you can pipeline certain types of Trance Nations which I'll explain on the next slide and the other one is that if you've already cached or persisted rdd's at some point or even if they were just involved in a shuffle where we end up persisting some of them then spark can kind of smartly truncate the graph we don't need to keep going back and back and back and computing earlier ones we can just start there and move forward towards what you're trying to get so once these things occur you end up with the kind of more familiar spark UI version of work which is called stages and tasks and I'll go into that in more detail in a second so in this example four run job you know just the same example I had before I kind of lied to you about the way that one thing works or I oversimplified it when an RDD keeps track of its parents it doesn't just have a sort of simple pointer to the parent it actually like stores a little bit of other metadata in there that explains something about the relationship between it and its parents and specifically there are two types of dependencies one of them is called an arrow dependency and one of them is called a wide or a shuffle dependency so in this example narrow dependencies are where there's just a simple arrow between partitions of this RDD and of its parent and a shuffle or Wye dependency is when it's more complex so logically what this this means is that in order to compute an RDD for a narrow dependency I only I can do it sort of in place there needs there does not need to be any data movement I can safely kind of pipeline these things for instance if I'm doing a map and a filter I can kind of put those into one operation and I can just run it individually on each partition without having to worry about its correctness in contrast so a wide dependency is one where I am I may need to move data around in order to compute an RDD from its parent so for instance when I do this final sort of counting by key well it could be that there there may be you know I'm doing some aggregation basically and there may be lines with the word error in them in multiple different partitions that I'm trying to do a final global count so hopefully it's intuitive that at some point I'll need to shuffle and move data around in or to do that so the main reason why it's helpful to store this metadata is that spark can exploit this to make your job run faster so what it will do is that all things that have narrow dependencies between them can get sort of glommed together into one physical unit of execution in a way that significantly improves performance over that really naive version that I said before while we just kind of compute each one together so in in this example that type of pipelining that's kind of the first optimization will result in this whole thing taking two stages to execute so it actually will end up looking like this because all of those narrow things we could just collapse all those maps and filters we just put them logically into one sort of function that's going to do the map and the filter etc they're all those chained calls so what it's going to look like is it'll have two physical stages okay so one stage each task will read the Hadoop input perform that sequence of maps and filters and write out some partial sums for each partition that's kind of embedded in the reduce by key operator what it's doing and in for the other stage it's going to read the partial sums these are like the counts per log level and it will then invoke whatever you know that will allow it to kind of compute the final partition and then it will invoke whatever user function was passed to run job so so as I if you remember I didn't really tell you what what we're doing with run jobs so it doesn't matter this is just kind of for any action we run on this final RTD this is how it will compute that RTD and then it will invoke whatever the user function is so now we can start to talk about terminology you'll see in the spark UI and with a little bit of a better understanding of what does it mean to be a stage and what does it mean to be a task and so I can kind of see now what these things come from one thing I actually realize a I didn't explicitly clout is that we've now introduced this thing this notion of tasks and tasks will correspond to individual partitions of potentially multiple rdd's so you know a single task will operate on unhook input split but then it may do also these kind of subsequent operations that were not defined in Hadoop are 2d it was defined over a series of Artemis so a couple of things that you'll see in the UI one is input read that's like if I'm reading from an external data source for each partition that we have in this pipeline task how much like data did I read in another familiar metric is shuffle right and shuffle read and that has to do with when data movement occurs because there's an RT DS that relate to each other in a way that requires that movement we store like how much data is output and how much is then fetched on the second physical stage so to kind of recap a little bit the terminology you see in the SPARC UI is tied to physical execution not only the logical structure of your program so job as displayed in the UI is the work required to compute a single RTD when you invoke one instance of run job stages are these kind of waves of work with many parallel tasks that correspond to some set of rdd's that were pipelined maybe it's only one maybe it's more if we chained together some filters and maps and other things like that and tasks are this lowest level unit of work within a stage that corresponds to one partition one sort of logical unit of parallelism and then the shuffle is the transfer of data between stages for certain types of RDD relationships where we need a shuffle like things that do global grouping and ordering and stuff like that so you might ask like ok great you know I watch this talk and now I saw for that example how the physical graph comes out of the logical one but what about and I'm doing my own prototyping in the shell or I'm doing some my own spark program so for that we do have an operator called to debug string which will print out a fairly nice list of all the dependencies of an RV so these are things that will need to be computed if you invoke an an action on this RDD and it even shows you by using indentation where there's going to be a boundary between things that can be pipelined and things that can be and the numbers in parentheses indicate the sort of parallelism level at each step in this kind of physical plan the for each stage so I'm going to describe now let me just yeah yeah I'm going to describe now how certain actions as you've sort of learned about when wearing the spark API are defined in terms of run job and the first one I'll do is the count to action it's a very sort of common action you just want to count how many things are in the RTD well in order to count it I really do need to materialise because I need to learn you know how many records it has and that may be dependent on different operations and filters that happen beforehand so the way that count works and this is like really just how it works there's very little extra stuff that I'm not showing you is that it calls run job it gives to run job just itself because count is just an operator on an RDD run job is a little more general so you could invoke it even on you know a different artery that's not you but in count is just an RDD operator so it invokes run job it just gives it itself that you know I'm the thing you need to materialize it says every partition because I want to count the entire thing just just go in materialize all of my partitions and what it does with the result is that it computes its size that will just kind of do it go in a streaming fashion through an iterator and add up the size and then as I said earlier a run job gives me kind of the results for every single one of the rdd's that I computed and and what I served for every single one of the partitions I computed and so what I need to do at the end is I need to take all of those and sum them so this will do kind of per partition counts and then sum them and you can return that to the caller and that is actually how count is implemented there's another action which I'm not going to go into detail and explain every bit of this code but I just wanted to explain that one thing some users find confusing is that certain actions may trigger multiple jobs it's almost never the case it's usually one-to-one but there are cases where a single action will trigger multiple jobs and one of them is the take action so what take does is it says look I want to sort of I want you to give me a few elements of this RDD maybe the RTD has billions of records or or it's a some huge data set but I only want to take maybe ten of those because I'm you know let's say I'm prototyping in the shell and I just want to play around with a small sample so a very naive way to implement take would be to let's say you know compute every single partition of the RTD right all of it out somewhere and then just grab a few of the outputs but that requires materializing a lot of state and it's something that we can do a little bit better on so what SPARC actually does is that it creates a little temporary buffer for results and it actually steps through individual partitions one at a time and first just computes that partition then it sees if it got enough data to meet the user you know the user asks for some specific number so it sees if it got enough of that number and then it will keep doing this for subsequent partitions until it kind of gets enough and this can help a lot if you have an RDD that only has kind of narrow dependencies all the way back to some storage system like you're reading from a dupe and then you do some filters and you do some small transformations and then you just take ten it will only have to go and ever even read one piece of input from my dupe it won't have to go and scan potentially very large data set so don't worry too much about kind of the specifics of this code here but I just wanted to give you a sense of like sometimes you can have multiple jobs that show up in the UI but they're related only to one action doesn't it's not a normal case but it does happen ok so now I'm going to do a quick demo and I'm actually gonna I had something on the slide here but I'm going to do it inside of the data bricks cloud and I've just created a notebook here let me zoom in a little bit so first I generated some example data and I'm not going to run through this because I already did it but I just created some random log messages in a Hadoop file and saved it so now I'll get to the user code so this is literally just the code I wrote on that slide verbatim I'm just going to step through so first I'm going to create this input RDD then I'm going to go through the step that token eise's and extracts filters out sort of empty records then I'm going to go to create the final counts RDD which hopefully will give us some give us the sort of log messages levels with their accounts and then I'm going to call collect on it so collect is an action it's fact it's an action that just goes and computes every part of the RTD and gives it all back back to you at the sort of driver program where you're calling it and then for each of those I'm just going to invoke this is just a skala for each that prints prints out the output you could do this in Java or Python or are api's as well so I'm just going to run that okay so that actually was this thing had maybe ten let's see maybe hundred thousand record so was very very small but it still ran quite fast and we have now we can see like okay we had you know this many errors and this much info and warning and debug so that's kind of what I wanted and let's look at in the spark UI how this manifested itself do I have my spark UI and oh I realized I ran another job earlier that is making it more confusing but it's saying that the most recent job I ran is this thing where I you know it just actually tells me this is a little bit of data bricks magic but I think it's very similar in the sort of vanilla spark shell it's telling me that yeah this is from a collect and let's go look at the job so if I go and look at the job this is a little cramped because I've zoomed in here for clarity let me just zoom out a little bit more it will show me exactly what I said before which is a look I have two stages this stage says map this stage just collect I'll go over that in a second the first stage had some input from Hadoop it had some shuffle right and then it had the second stage had shuffle read which then read those kind of partial sums and the parallelism the first one was three because that was sort of the number of splits in the Hadoop the second one is to actually explicitly gave this level of parallelism when I did the reduce by key and it's just giving me two stages that I can then go drill down and see for each of the tasks in this stage what was going on I'm not going to walk through everything that's displayed in miss UI there's a lot of cool instrumentation but hopefully being able to understand where that is coming from is helpful to you so one thing I did want to point out is how are these stages named so that the name is the operator here right at the blue link so for stages that are an RDD being materialized as part of run job they're named for the action that I invoked that called run job so run run job so in this case that's the collect action for stages that were a dependency of another RDD through transformations at one of these kind of boundaries that we create when we pipeline that is named after the RDD that is kind of the last one before the shuffle boundary occurs so right now this says console : 26 because it's in a shell if you're in your user program it will give you the exact line number where that RTD was created so then you can go back and look at your code and kind of understand okay yeah this is where this thing was coming from so I think we'll probably end there for this little look in the UI but basically the the long and short of it is you have a jobs page which tells you kind of what are the jobs that have run and how are they doing or ones that are in progress and you can drill down for each of the individual jobs to see stages and you can drill down into those to see tasks so let's get back to the to the talk so that was really the first big goal of this whole talk was to give you a way to understand sort of the physical execution and the terminology used there and how it relates to the logical thing that you wrote in your program which was more about kind of declaratively saying what you want it to happen so for the second half of the talk what one good news is even if you didn't follow every single aspect of that the second half of the talk is sort of largely independent that you should be able to still get a lot of what's coming so the second thing is just a kind of survey of things that I have seen impact performance in production spark workloads and not all these will impact every workload but I think a lot of them will be broadly relevant so this is kind of a performance checklist maybe you would say or a list of things that can influence performance so the number one determinant of performance is often the amount of data that is shuffled inside of a spark job so as I kind of mentioned before shuffle is the main data movement that occurs at the physical level and spark and in general if you avoid shuffle your program will typically run faster a very common way to do that is to make sure you're using built-in aggregation support and spark instead of for instance doing a very large shuffle with a group by and then doing your own aggregation there's some other talks that cover this so I'm not going to go into tons of detail this was covered in another talk the same day as I delivered this talk which is why that's mentioned on this slide but there's there sort of various ways of just making sure you've prune and decrease the amount of data that moves over the the shuffle another key issue can be the degree of parallelism so as I said before there's kind of this notion of partitioning that's very innate in rdd's and in SPARC we've more and more tried to infer the level of parallelism to use for you and that's something that will continue in future versions of SPARC but nonetheless there are still pathological cases where you can have a huge performance increase by tweaking this on your own so I'm going to give an example let's say we read in some data from an FC text file that's again the way of reading from Hadoop and I'm actually giving a path with a glob a star in it and so let's say that this is going to read from thousands of random gzip blog files I have sitting in s3 in fact if I get the number of partitions it's huge it's like 35,000 this is just a Ton Ton Ton of random files and then we'll assume that now I'm filtering this down so I'm I'm calling filter and I'm filtering down actually only this is a folder of logs for an entire year I'm filtering only one hour worth of logs and then so you may ask like well why didn't I just store the logs in the format where like I had a folder for each hour and that is a very prudent way to do it but sometimes you're given datasets that are ugly and you don't have control and they're not organized nicely and you actually need to brute force scan everything and just extract the records you want and one nice thing about SPARC is you can usually do that pretty quickly so let's assume that the did this sort of there's content from this our potentially to any of those log files so I run this filter and if once I filtered you know SPARC sees a filter as an arrow transformation it's just going to apply that filter independently to each partition and then it's going to give me back a list of partitions that have you know potentially way fewer log messages so in a case like this where I know as a user that this filter is going to be extremely selective maybe there's going to be thousands of partitions that end up just empty after the filter and other ones that have very small number of data well it can move me to explicitly tell SPARC to scale this down into fewer partitions and we can do that actually fairly efficiently using the coalesce operator which sort of gloms together partitions so if I want to do some subsequent cache analysis on this data I can actually call s it down to maybe five partitions then I can cache that and then when I do subsequent analysis I'm operating on a sort of more compact RDD now why does that matter for performance you might ask well it turns out SPARC is very efficient at launching tasks but nonetheless if you're launching thousands of tasks to compute and operate on partitions that are totally empty the overhead of just launching and scheduling those tests is going to be significant it's going to be maybe you know several seconds or tens of seconds when if that had been coalesced down to five partitions you could be talking the millisecond range so so this can definitely help improve performance and a little bit of a rule of thumb because this goes in both directions if you end up seeing that you have like tens of thousands of totally idle or mostly idle tasks it can be good to coalesce you can probably avoid all that overhead if you have fewer partitions by the way if it turns out you're running tens of thousands of tasks and your job is still really fast anyway then don't sweat it I mean performance optimization should occur when it's causing you a problem and not just for the for the sake of it so SPARC can handle huge volumes of tasks in a very short amount of time it's designed to be optimized for this but nonetheless if you're seeing this as a pain point then yeah go for it and do some tuning on your own also on the other degree if you find that your job is taking a long time and computing these partitions is taking a long time and you're not even using as many cores as you have in your cluster for instance maybe you're doing a very expensive operation and you have only five partitions in this RDD but you have a thousand cores in your cluster well sometimes calling repartition which is this other operator and spark which will sort of spread out the data into more partitions can be a benefit to you it is a little bit of a cost because you have to maybe shuffle and move some data around but if the operation you're doing is expensive and you have extra capacity to paralyze it it can be beneficial to call repartition and and spread it out more within your cluster so again these are like if you want to really go in the weeds and tune your thing to be as fast as possible but in some cases they can make a large difference another issue is the choice of serializer so so what is the 0s or a serializer is a way to take in memory objects that are in this case JVM objects and write them out to a byte stream or encode them in a binary format that is recognizable to be can be read and sort of return back into objects that's what serialization is in spark we use it in a bunch of places we use it in some cases when caching data we also use it when shuffling data which as I mentioned is like the potentially most expensive thing inside of a spark program so so this is an area where users can often get a lot of performance improvement by just using an alternative to the default serializer by default that we use Java serializer which is very robust it can handle many many types of inputs but it is slow there's a whole other serialization framework called creo which bark has built-in support for in fact to use it all you need to do is just set the setting spark serializer to this other value to tell us to use creo and optionally you can do another thing which makes cryo even faster which is you can add a little bit of boilerplate to your application to register explicitly every class you're going to use this lets cryo avoid having to write the name of the class along with every object which is like one of the largest overheads in serialization so if you and if you turn on the sending I just gave you then cryo will actually force you to do that so if it encounters a class you didn't register it will throw an exception which can be helpful if you're trying to step through make sure you find every class you're using in a shuffle and register them so almost all users will benefit from this change it's only not the default because there are some corner cases where it doesn't work well on where cryo we'll have trouble sterilizing objects I would absolutely recommend turn this on register your classes and you'll likely see a major improvement over what comes out of the box another thing that can be important for performances is caching formats so if your caching intermediate datasets which is common in SPARC to avoid sort of having to recompute things many times the default way of caching is that we use a storage level called memory only so this means we're going to cache them as deserialized Java objects sitting around in the JVM and that is is very fast because sort of scanning that is basically no cost they're just sitting there already in your programming language runtime but but it can have some side effects so one is that it can be worse for GC pressure because you have all these random objects sitting around inside of cache and so storing them in a serialized format will actually take all of those maybe thousands or millions of objects crashing serialize them into one large buffer which is only one object from the perspective of the java virtual machine memory manager which is a huge performance implement for GC the only alternate that the only trade-off of this is that it takes a little bit longer to access these because you need to deserialize serialize them on the fly as you go so but if you are seeing GC issues and you're caching a lot of data this can really help another thing you can do is you can cache using memory and disk storage level and what that does is that it avoids the default policy is if the cache basis getting contended and you need to add new things it will just drop the oldest one and we can always recompute it because of the data model and SPARC but it will not it will not keep it around well if you're reading data from let's say JDBC input or something where recomputing it could be really slow it can be beneficial to add memory and disk which will actually just persist the thing to disk if there's contention the only trade-off here is that in current version of SPARC writing out to disk can be itself a little bit of performance overhead so you need to decide kind of which is worth it for you but in some cases like tweaking these slightly can help you there's also another option that both serializes and right to disk so these aren't mutually exclusive so a few other things here so one is hardware people often ask me like what's the best hardware to run spark on and the good news is like spark is very resilient to many different types of hardware architectures and and configurations because of the way it's architected it does scale horizontally and so in general more is better if you can add more cores more memory more more disks than yes spark like will run faster and your jobs will complete more quickly that's a nice property that not every system has so so that's definitely true you know whether how you exactly size disks to to memory or CPU it depends though if you want to really go crazy it kind of depends what you're doing if you're doing ml workloads that are just spinning tons of CPU and doing you know linear solvers and stuff like that that are expensive then you might want to really over provision CPU if you're doing ETL workloads where the entire spark cluster is devoted to reading huge data sets and and transforming them and writing them back out you may want to optimize for having very large number of parallel disks so it's really dependent on what you're doing but SPARC is very happy to just use some kind of standard balanced configuration that you may get as recommended by you know your server provider so it's not something you need to stress about too much one thing I'll add that's maybe not obvious is that it can often be good to limit the size of individual executor --zz even if your machine's physical machines have tons of memory just watch multiple executor z-- the reason why is that the data model and SPARC is already such that it's everything's embarrassingly parallel you don't get any benefit from having one huge heap compared to a few smaller heaps and the cost is that the JVM does not like to deal with hundreds of gigabytes sizeof heaps it makes garbage collection a much bigger issue so so in general it can be beneficial to have sort of smaller a larger number of smaller executor x' maybe around 64 gigabytes is one reasonable size and bin pack those into nodes you pay a little bit of extra overhead for having additional JVMs running but that overhead is very very modest a couple of other just grab bag of things that I've seen influence people's performance one is that you may be able to get better performance by switching the default compression codec used during shuffles so when we do this massive shuffle not only do we serialize the data but we also compress it to improve performance and our default compression codec is one it's this library called snappy Java and we like it and it's the default because if you're doing a massive shuffle where you're maybe have you know hundred thousand partitions and you're potentially having to move data between many many machines snappy has a little bit of better performance around memory usage there it's more conservative and how much memory it allocates per kind of partition that is really only relevant for a small class of users I'm guessing that most users if they switch to lzf would actually see better performance in their compression used during shuffle so something you know you literally just change this value and see if your job runs faster and if not just forget about it but it can help in some cases another thing that you can look at is what's called speculative execution so this is when you spark if it sees a task running slowly it will launch another copy of the task and just let them race to see which one's finished everything in spark is sort of idempotent and immutable so you can do this safely this is very useful if you're for instance in a large shared yarn cluster and some nodes have some crazy other workload from some other team that you don't even know and it's causing that node to thrash and be really out of control well if you end up having some of your tasks scheduled there you can just speculate launch them on another node and you can forget about the really slow ones because these really slow tasks can can often hurt the performance of a single job a lot because it's kind of you have to wait till the very last one finish on the other hand if you're in a very tightly controlled cluster where it's kind of your dedicated thing and you you understand all the hardware and it's maybe only ordered 20 to 100 nodes then this can have some extra cost because you're kind of wasting utilization just launching extra copies of random tasks so it may not be beneficial for you but it's worth turning it on and seeing if you get any performance benefit the last of these smaller things is that you want to make sure spark for shuffling has as many disks as of all because it will if memory runs out stripes shuffle output across disks when it's moving all this data you set this in standalone and mesos mode by sending this environment variable spar cloak orders and you just give a comma-separated list you want to ideally give directories that are in different on different physical disks and give you know give us as many of those as you have discs in yarn mode we learn about directories from yarns built-in infrastructure for providing these locations so that just goes to how you configure your own cluster but more is always better I also recommend maybe not using the same disks that you're using for HDFS to minimize contention at the end of the day if you don't have that many disks you may have to do that but if you can have dedicated shuffle directories or even better SSDs that spark can use them you're going to get much better shuffle performance and again shuffle is like a big important part of the overall performance of a job so with all that said there's really one trick that can give you really good performance for very very little cost and that is to use higher-level api's that is we are investing in in SPARC and many of which are already available so a big one is this new data frames API we're we've just added in SPARC 1.3 but will become a major focus in future versions this is going to work in all languages it's a slightly higher level API and allows us to do lots of optimizations under the hood both in the logical structure of the graph like can we do John reordering and stuff and also at the physical layer because we have a much better idea of the schema of the data that you're using it's not just that you have random objects and you have your own serialize Urso so the more you can kind of move towards this higher-level API the better your performance will be another major issue for data frames is that this Python performance is identical to other languages which is not true for the general PI SPARC API other things are used the machine learning library instead of ruin your own machine learning implementation that will you'll get all the benefit of the tons of optimization the spark core engineer has already did so a major reason why we've added this and the the sequel library is another example for kind of structured query processing is that if you use higher-level API you can punch all of this problem to us as the spark engineers and we can make sure we tune it in just the perfect way but at the end of the day sometimes you do need to kind of use the lowest level of core api's they are very flexible and they're yeah you can you can often get wins by tweaking some of the things I've talked about in this talk also when I mentioned this talk was based in a good large part on a chapter that I actually wrote in the recently released learning spark book so check it out it's chapter eight tuning and debugging it will go into a little bit more depth on the concepts that are in this talk but it's very similar in spirit and you'll get it in the context of a broader education around spark so I definitely recommend that and and last thing we do have our own spark conferences and I don't know when you're looking at this video this conference may have passed already but spark summit the next one at the time of recording is spark summit 2015 it's in June in San Francisco there will be a lot of talks about internals and performance and all this kinds of stuff so we'd love you to comment and if that one's pass we'd love you to join us at future spark summit conferences in on the East Coast in in the Bay Area and I think eventually likely in Europe okay so I can't take questions here because it's not interactive but I hope that was useful for you please leave comments on this video do you like this format could you look could you hear it well and would you like to see more of these in the future those are all things I'd love to get feedback on thanks a lot and I'll see you later and maybe at the next bar conference Cheers
Info
Channel: Databricks
Views: 55,003
Rating: undefined out of 5
Keywords: Apache Spark
Id: kkOG_aJ9KjQ
Channel Id: undefined
Length: 47min 13sec (2833 seconds)
Published: Mon Feb 23 2015
Related Videos
Note
Please note that this website is currently a work in progress! Lots of interesting data and statistics to come.