A Deeper Understanding of Spark Internals - Aaron Davidson (Databricks)

Video Statistics and Information

Captions
Okay, let's get started. Aaron is a Spark committer who works at Databricks. He's done a ton of cool work on the internals of Spark and is very knowledgeable about what's going on deep in the core, so he's well qualified to give this talk. He's also super active on the user and developer lists, so if you've ever sent an email asking a nitty-gritty question, it's quite possible he's the one who answered it. Please join me in welcoming Aaron.

Thanks. Hello, everyone. As introduced, I'll be giving a talk on understanding the internals of Spark. The goal of this talk is to understand how Spark runs a program that you've written, but the focus is really on using that understanding to look at performance: how your program runs, and how to make it run well.

I like to think there are three major components at the core of Spark. The first is the execution model, which is how we actually execute your RDDs, your program. The second is the shuffle, which is really part of the execution model, but the shuffle specifically is how we move your data across the network in an all-to-all communication; it's the same idea as the shuffle in MapReduce. We try to avoid it whenever possible, but it's important to understand what it means to do a shuffle, why it's expensive, and how to reduce that expense. And finally, we all know Spark is great for explicitly caching data in memory; that's a separate topic by itself. I'd like to spend about fifteen minutes on each of these, but unfortunately this is a thirty-minute talk, so I'm cutting caching. Why? Well, it's my talk, I said so.

So let's talk about the execution model. First, why understand the internals at all? As I said, we're going to approach it from the point of view of performance: understanding your program in order to understand its performance. To take an example outside Spark entirely, say I have a single-threaded program and an array of integers. I know that if I iterate sequentially through that array, it'll be much more performant than if I access it randomly. That's not inherently obvious, but because we understand how CPU caches work, we know sequential access is better. In the same way, you might write a Spark program and think you know how it works, but because of how Spark executes it underneath, you might not get the performance you expect. That's why it's worth understanding how Spark works.

Here's the example we'll use throughout the talk as a vehicle for understanding Spark. We have a bunch of names in HDFS, and we want to find the number of distinct names grouped by first letter: how many distinct names in my data start with the letter A, and so on. Say we have three names: Ahir, Pat, and Andy. The first thing we do is map each name to a pair of its first letter and the name itself; Spark interprets this as a key-value pair, so we have the key 'A' with "Ahir", 'P' with "Pat", and so on. By the way, I tried using Patrick, who gave the first talk, but I stuck with Pat instead because it's shorter. Then we do a groupByKey, which rearranges the data so that all of the values for a single key end up together: 'A' with ["Ahir", "Andy"] and 'P' with ["Pat"], each as a list. Then we mapValues over those lists: first we convert each list to a set, and this is important because I said distinct names, so we actually build a set in memory for each letter, and any duplicates would be removed (in this case there are no duplicates, so it makes no real difference); then we take the size of that set. Now we know there were two distinct names starting with A and one starting with P, and we just collect that back to the driver. That's the example program we'll work with.
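For reference, a rough sketch of that program as Spark code. The operations are the ones just described in the talk; the input path and variable names are placeholders of mine, not from the talk:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._   // pair-RDD operations (needed on older Spark versions)

    val sc = new SparkContext(new SparkConf().setAppName("DistinctNamesByLetter"))

    val names  = sc.textFile("hdfs://...")                  // one name per line: "Ahir", "Pat", "Andy", ...
    val pairs  = names.map(name => (name.charAt(0), name))  // ('A', "Ahir"), ('P', "Pat"), ('A', "Andy")
    val groups = pairs.groupByKey()                         // ('A', ["Ahir", "Andy"]), ('P', ["Pat"])
    val counts = groups.mapValues(ns => ns.toSet.size)      // distinct names per first letter
    val result = counts.collect()                           // back to the driver: e.g. Array(('A', 2), ('P', 1))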
Now, it turns out this isn't a very well written program, and it may not be obvious why. Some of its problems aren't even in the code on the slide; they're about the data we're working with and the cluster we're running on. So let's figure out what's going to go wrong when we execute it.

First, how do we execute it at all? I'll do this at a high level first and then go through it in more detail. First, we create a directed acyclic graph of RDDs, where an RDD is a partitioned data set, to represent the computation. Then we create a logical execution plan: how can we execute this DAG, and how can we execute it efficiently? Finally, we take that logical plan, split it into individual tasks, and send those to the cluster.

What does all that mean? Well, the first step is to create the RDDs, and fortunately you've already done that for us, which is very nice of you. All this really is is each line of the code, except I put it in a box and drew an arrow to the next one. This is what Spark does internally: it has a box generator. The second step doesn't add that much either; I'm just going to condense it. It's the same thing: we read from HDFS (a HadoopRDD), we map over it to create pairs, we groupByKey, we mapValues on each key to find the number of distinct elements, and finally we collect. It's just a condensed version of the program.

Now that we have this representation of the program, how do we execute it? The hard part isn't how to execute it at all; we did that on the first slide, where you just walk through it step by step. The question is how to execute it efficiently, and the goal is to pipeline as much as possible. By pipelining I mean fusing operations together, so we don't pay the cost of going over the data multiple times or have a lot of overhead for each operation. To go through the same example: if we can pipeline, we read "Ahir" and immediately apply the map, generating ('A', "Ahir"), then we take the next item and apply the map again, and so on. The point is that we've fused the read and the map into one operation, so that it's effectively a chain of function calls: we take "Ahir", take the result of one function, and pass it straight into the next.
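As an aside not from the talk: one way to see the lineage Spark builds for a program like this is RDD.toDebugString, which prints the chain of RDDs behind an RDD. A minimal sketch using the counts RDD from the earlier example; the exact output format varies by Spark version:

    // Print the RDD lineage that Spark will turn into stages.
    // `counts` is the mapValues RDD from the earlier sketch.
    println(counts.toDebugString)
    // Roughly (names and layout depend on the version):
    //   MappedValuesRDD   <- mapValues
    //   ShuffledRDD       <- the groupByKey: this is the stage boundary
    //   MappedRDD         <- the map to (firstLetter, name) pairs
    //   HadoopRDD         <- the HDFS input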
The overhead of multiple operations that are pipelineable is therefore extremely small; it's basically one function call following another.

So the question is, when can we pipeline? It's possible as long as we don't need to reorganize the data. Let me phrase that another way: another way to think about reorganizing the data is to ask how each result is generated. When a result can be generated independently of any other data, we can pipeline. That's a little abstract, but it's approximately right. If we look at ('A', "Ahir"), it doesn't care about Pat; it doesn't need to know anything about Pat in order to be generated, so it can be computed in its own little world. That's the ideal situation. But in the next step, to do the groupByKey, we have to reorganize all of the data so that "Ahir" and "Andy" end up in the same place and can be grouped under the same key. We can't pipeline past that.

So what we do is make the first two operations into something called a stage. A stage is basically a super-operation, a merged operation where we fuse the steps together, welding one to the next. After that we do the same thing again and pipeline as much as possible, and it turns out everything after the shuffle can be pipelined too: we take the result of the groupByKey, mapValues it, and so on. The collect is a little bit weird; it's not really an RDD, it's really an action, so let's just say we can pipeline into it as well. Then we take the ('P', ...) pair and do the same thing. So the second part is another stage, another fused super-operation.

Now that we have our beautiful stages laid out, we have to execute them, and that means scheduling tasks. We split each stage into a set of tasks, where a task is simply data plus computation that we can ship to a node and have it run; I'll give an example in a second. The rule is that we have to execute all of the tasks of one stage before we can start the next stage. So here's our example: our computation, the fused super-operation, is stage 1 on the left (we draw stage 1 because we have to execute it first), and our data is on the right, which is just four HDFS blocks. A task is simply a pairing of these: we take the first block and couple it with the computation, and that's task one, and so on for the other three, so now we have four tasks. To look at it a slightly different way, a task is a little bundle of work that says: load HDFS block 0 and run this computation over it. We can ship that little closure off to a machine and have it executed; it's totally independent of everything else. So we have four of these tasks.
Now that we have our nice little tasks, we have to actually schedule them. This picture shows a Spark cluster of three nodes; for the simplicity of this example, each node has only one core, but it's easy to see how this works with more. We also have HDFS on the left, with some set of blocks distributed among the nodes, and then we have time: the rest of the picture is time, showing how we execute the actual query.

We start at time zero, and the first task comes in. We see that it's about block 0, so we look at where block 0 lives, because we want to schedule this task locally, and we see it's on the first node, so we schedule it there. A very small amount of time later the next task comes in; it's a very small amount of time because these tasks are being scheduled from a single thread in the driver program. We schedule task 1 and put it on the second node, because it's also local there. Task 2 comes in, and its block could be on either the second or the third node; in this case we put it on the third node because that's the only one that's free. Time progresses, this is all pretty obvious so far, and then task 0 finishes, so we have a free machine again and room to execute the fourth and final task. Fortunately this task happens to be local to the first node; if it weren't, we might actually wait a bit before scheduling it, because we do try to schedule locally through a process called delay scheduling, but that's not relevant here, which is fortunate because I don't have time to explain it. So we put it on the first node, it executes nicely because it's running locally and didn't have to transfer any data, and soon after it finishes.

Note that one thing sort of went wrong here: during roughly the second half of this computation, only one node was doing anything. It was as if we didn't have a cluster at all, just a single machine. That happened because we had four tasks of approximately equal length and three machines, so the tasks didn't line up well with the cluster. Keep that in the back of your mind; we'll come back to it. After executing these four tasks, we've completed the first stage.

Now let's talk about getting to the second stage. For the second stage, remember, we have this groupByKey, and in order to do the group by we need to shuffle. This is where the shuffle comes in: we need to rearrange all the data so that everything with the first letter A goes to the same place. You've probably seen a picture like the one on the right, which shows redistributing the data among partitions. In this picture there are four partitions: the bottom of the picture is the first stage, the map, and the top is the second stage, the group by. We repartition from our four partitions, our four tasks, to another set of four tasks; by default it's the same number, which is why it's four to four.
The repartitioning is simply a matter of hashing keys into buckets: we hash all the A's into the same bucket, the B's into their bucket, and so on, so that when we get to the group by, we can look in one place and know everything for that key will be there.

Some optimizations to be aware of for the shuffle: in general, we try to avoid the shuffle whenever possible. If your data is already partitioned by the right hash key, we will not do the shuffle at all, which is great. Second, we can use partial aggregation, what MapReduce calls a map-side combine, where we reduce the amount of data we send by pre-aggregating it. For example, in word count, instead of sending the word "foo" a hundred times, you can just send ("foo", 100), and you've reduced the amount of data actually sent. In our particular case we can't use either of these optimizations; in particular we can't use partial aggregation, because a group by doesn't get reduced.

Now, that's a nice picture, but I want to go a little deeper into the shuffle, because that's not exactly how it works. Looking at the diagram, it looks like I'm pushing data to tasks I haven't even scheduled yet; I'm still waiting to start stage 2. What actually happens is that the shuffle is not push-based, as the diagram suggests, but pull-based. To make a very clean boundary between the first and second stages, we write the intermediate files to disk, and only once the entire first stage is done do we start the second stage. So the first partition of stage 1 will, at the end, partition its own output into four files, doing its own little hash partitioning, and the other partitions do the same. Now we're left with a set of files, and when we start the second stage, the first reduce partition reads from the first file written by each of the map partitions, and that's how it gets all of its data, by pulling. I'll note that as of 1.1, which is not yet released, this becomes a pluggable API, and what I've described will be the default shuffle implementation; hopefully we can add more specialized ones that allow pushing instead of pulling. But if you're on 1.0 today, this is how it works. And that's how we move data all-to-all across the network whenever we have to do a shuffle.

Now that we've gotten the data to the other side, the final thing to talk about is how we actually execute the group by, and this goes beyond group by to any sort of reduction. If you're doing a group by, what we do is build a hash map in memory for each reduce partition, where the key is the grouping key and the values get appended to it as they arrive. In this case we have 'A' with all the names that start with A, and 'P' with all the names that start with P. In the case of word count, you'd have a word and a running count, and you'd just increment the count. So this is roughly how most reductions work. One thing to know is that we can spill, and by spill I mean write to disk, across keys: I can spill the 'A' entry to disk and the 'P' entry to disk separately. But a limitation of today's shuffle implementation, one that might bite you, is that a single key-value pair must fit in memory. This is really only relevant for large group bys where a single key is very large. In this case, note that my 'A' entry holds all of the names in my data set that start with A, not just the distinct ones, so it can actually get very expensive.
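To make those two shuffle optimizations concrete, here's a hedged sketch that is not from the talk: pre-partitioning a pair RDD so a later groupByKey on the same partitioner needs no additional shuffle, and a word count whose reduceByKey gets the map-side combine described above. The input path is a placeholder.

    import org.apache.spark.HashPartitioner
    import org.apache.spark.SparkContext._

    // 1) Avoiding a shuffle: if the pair RDD is already hash-partitioned by key,
    //    a groupByKey using the same partitioner finds each key's data in place.
    val prePartitioned = pairs.partitionBy(new HashPartitioner(4)).cache()
    val grouped        = prePartitioned.groupByKey()   // reuses the existing partitioning, no second shuffle

    // 2) Partial aggregation (map-side combine): each map task ships ("foo", 100)
    //    once instead of shipping the word "foo" a hundred times.
    val words      = sc.textFile("hdfs://...")          // placeholder input
    val wordCounts = words.map(w => (w, 1)).reduceByKey(_ + _)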
And with that, note that we're actually done: we've executed stage 1, we've shuffled our data around, we know how to execute the group by, and now it's just a matter of scheduling the stage 2 tasks onto nodes.

So first, let's ask what went wrong in this specific example, and then generalize. What went wrong: we had too few partitions, four partitions on three machines, so we didn't get good concurrency; we had a large per-key group by, which can add extra memory pressure; and we shipped all of the data across the cluster, because we did a group by on everything. To generalize a bit, because I don't really care about this particular problem so much as the general lessons, here are some common issues I see people run into and that are worth keeping an eye on. First, ensure you have enough partitions; this is pretty obvious: if you have 20 cores, you need at least 20 partitions, and hopefully more than that (I'll talk about how many later). Second, minimize memory consumption, especially for sorting and for large keys in group bys; as we saw, these can use a lot of memory, which can cause pressure or, in the worst case, an out-of-memory error. Third, minimize the amount of data you actually shuffle; in this case we did a groupByKey over everything, which is pretty expensive. And fourth, know the standard library. It's kind of funny, but a lot of problems can be avoided if you just learn the functions in the library; it's not that big, and it helps to get a sense of what's available and to use the right function in the right place.

Let me dig a little deeper into the first two, tuning the number of partitions, because this comes up a lot. With Spark you do need to think, not too much, it's a pretty easy analysis, about the number of partitions you're working over. The first problem you might run into is having too few partitions, like we did in this example. You get less concurrency than you'd want; you're more susceptible to stragglers, because if one task is taking all of your time, you're not getting much concurrency out of the cluster; and you actually get increased memory pressure for operations like groupByKey, reduceByKey, and sortByKey, even when they don't run out of memory. As I said, groupByKey and reduceByKey can spill to disk, and as of version 1.1 sortByKey will also be able to spill; it is currently our last operation that is incapable of spilling. So as of 1.1 these things won't crash, but they will still increase memory pressure, meaning garbage collection problems become more of a big deal, and if your application is also using a lot of memory, it can be a bad time. The secondary issue is having too many partitions. This isn't too interesting; you'll see it very quickly when you notice it took way longer to schedule the tasks than to actually execute them, and each one finished in a millisecond or so.
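A few hedged one-liners, not from the talk, for checking and adjusting the partition count; the specific numbers are placeholders:

    // How many partitions (and therefore tasks per stage) does this RDD have?
    println(names.partitions.size)

    // Too few partitions: repartition spreads the data over more partitions (adds a shuffle).
    val wider = names.repartition(200)

    // Too many tiny partitions: coalesce merges them without a full shuffle.
    val narrower = wider.coalesce(50)

    // Shuffle operations also accept an explicit partition count for their output side:
    val groupedWide = pairs.groupByKey(200)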
So what we really need is just a reasonable number of partitions. To give you a very hand-wavy sense of what we commonly see, so you don't think some number is insane: very commonly we'll see between one hundred and ten thousand partitions. So if you think you need ten thousand partitions, that's a perfectly reasonable number. You can go higher than ten thousand if you have a very large cluster or a lot of data, and you can go below one hundred on a very small cluster; everything is multiplicative. As a lower bound, for a particular job you tend to want at least about twice as many partitions as there are cores in your cluster, so you get at least two waves of tasks; that gives you a little more wiggle room. As an upper bound, really the biggest constraint is task duration: you want each task to take at least, say, 100 milliseconds. If your tasks are in the range of seconds, you're experiencing almost no per-partition or per-task scheduling overhead, although that is a little job dependent.

The last thing I want to address is memory problems, because I want to be straight here: I'm not trying to sell Spark to you, I'm trying to tell you how to get Spark to run well. When people come back and say they can't get their job to run, most often they've run into memory problems. Let me talk about where they come from before getting to the symptoms. Memory problems these days don't stem so much from the Spark core, since as I said almost all of our operators are able to spill to disk. The problem is that Spark assumes it owns most of the memory; by default that number is 90%. But we allow you to run arbitrary user code inside your job, and if your code assumes it has memory to use, for instance in our example we built a set in memory, which can use a non-trivial amount of memory, then as that grows it can end up causing serious problems: not just garbage collection pauses, but actual out-of-memory errors.

So how do I know I have a memory problem, how do I diagnose it? First, if you're seeing inexplicably bad performance: your tasks are running just fine, and then suddenly performance hits a wall and stops moving. Or if you're seeing inexplicable executor or machine failures: machines just stop responding, and not just a single machine that might have gone down, but many machines at once. In fact, I've only seen inexplicable machine failures in two situations. One is heavy memory problems, where the kernel has stepped in and started killing things; the other is very large shuffles, where you can sometimes exhaust the number of inodes. That's just a little aside in case you run into it. Focusing on memory again, there are two executor Java options worth setting. One is to have it print GC details; with a memory problem you'll see GC times climbing into the seconds and eventually even minutes, getting exponentially worse as the job progresses.
Also make sure you set heap-dump-on-out-of-memory-error, so that if it does die, you're guaranteed to see it and get a heap dump. Spark is actually pretty good about telling you that you're out of memory; the error should be propagated from the executor back to the driver. And finally, if you're really at a loss, you're looking at the executor log and it just stops, one thing you may have to check is dmesg, the kernel-level log, where you might see that the kernel's OOM killer has stepped in and killed your process. These things are pretty good at letting you know you have a memory problem.

So how do I resolve a memory problem? The first thing is obvious: increase spark.executor.memory, the heap size of your executors. The second is to increase the number of partitions, because as we said, larger partitions tend to mean more memory usage; if I'm building a set in memory from one partition, a smaller partition is strictly better. There's no reason Spark needs a whole partition to fit in memory; with the exception of sortByKey (again, until 1.1), no operation requires the whole partition, or even a large part of it, to be resident, and you can decrease the memory pressure from your own code by increasing the number of partitions. At some point, though, this may become a hard problem: since we let you execute arbitrary code, you can allocate arbitrary data structures, and you may have to re-evaluate your program's structure to make sure each node can handle the amount of memory it's being asked to use. That's a hard problem and I don't have a general solution for it, but at least hopefully you can figure out when it's necessary.
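A hedged sketch of those knobs set programmatically; the values are placeholders of mine, and the same settings can equally be passed via spark-submit or spark-defaults:

    import org.apache.spark.SparkConf

    // Set before creating the SparkContext.
    val tuningConf = new SparkConf()
      .setAppName("MemoryTuning")
      .set("spark.executor.memory", "8g")                  // bigger executor heaps
      .set("spark.executor.extraJavaOptions",
           "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps " +
           "-XX:+HeapDumpOnOutOfMemoryError")              // verbose GC logging plus a heap dump on OOM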
Let's go back to the example real quick and try to fix our mistakes. I don't care too much about this example in itself; what I want to show is that we can apply our four common issues to it and see how it changes. Am I doing okay on time? Yeah, I'm going to go through this quickly, because I want to show you a brief demo of the difference.

First, I need to ensure I have enough partitions for good concurrency, so I can throw in a repartition; that takes my data, which was in four partitions, and moves it into six. Next, I can reduce memory consumption and reduce the shuffle by pushing the distinct up, before the grouping. What I'm not claiming here is that it's obvious you should push the distinct up; this is something you have to think about for your own application. I'm just showing how each change addresses one of the issues we listed. Once I do the distinct early, I no longer need to build that set, so I further reduce memory usage. And I can reduce the data shuffled even more by combining the repartition, which does a shuffle by itself, and the distinct, which also does a shuffle by itself, into a single operation: a distinct that shuffles into six partitions. Finally, I can look at this groupByKey followed by mapValues with a size, recognize that it's a very common pattern in Spark, and note that there's a far more optimal way to write it: reduceByKey, which simply applies a function across all the values for each key. Comparing this to the original, it's about the same number of lines of code, and maybe it even looks a little simpler, but certainly if you or a coworker approached this problem, you might very well have come up with the first solution rather than the second. The goal here isn't to learn how to go from one to the other, which really requires thinking about your application; the goal is to be able to understand why the second one has better properties in terms of execution, in terms of the shuffle, and in terms of memory usage than the first.
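For reference, the improved version being described looks roughly like this; the input path is a placeholder, and the six-partition count matches the example but is otherwise arbitrary:

    import org.apache.spark.SparkContext._

    // Improved version: one shuffle (the distinct, into six partitions),
    // then reduceByKey so the counting is partially aggregated on the map side.
    val betterResult = sc.textFile("hdfs://...")     // placeholder path
      .distinct(numPartitions = 6)                   // dedupes AND repartitions in one shuffle
      .map(name => (name.charAt(0), 1))              // one count per distinct name
      .reduceByKey(_ + _)                            // distinct names per first letter
      .collect()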
I actually want to show you this in action, demo gods willing. This is running in our product, because it's very nice for doing demos, but the first program here is the exact example we were looking at, except I replaced HDFS with S3, and there are 40 million names in this data set, stored as four gzipped files. To show you my cluster: I have three machines with one core each; I really stripped these machines down. Now I'll execute the first version. It starts running, and let me show you a little detail by looking at the application UI to see what's going on. You can see it has started executing the first stage; remember we have two stages, and the first stage is labeled here by the last operation in it, the map. We've finished the first three tasks and just started the last one, so already you can see the scheduling problem: there are two waves and it's not very even. Refreshing, the first stage has completed and the second stage has started, again with this uneven wave pattern, and looking at the right you can see the shuffle write: we wrote about 300 megabytes, which turns out to be all of our data in this format, and on the other side we read it all back. The whole thing took about 41 seconds.

Now let's try the better version, the transformed one, and run it. The bottom two runs are the previous ones and the top is the new one. You'll see it starts out with only three tasks executing; to give you a little clarity on what this is showing, in the tasks-succeeded bar the light blue section is what's currently running and the dark blue is what's finished, so three quarters of the bar being light blue means three tasks are running, and you can dig in deeper with this UI; I'm just staying at the highest level. Okay, it's already finished: this one took nine seconds, and the other one took approximately 41, so that's only about a 4x or 5x speedup. But note that this is very small data; I trimmed it down to run in a few seconds for a quick example. If this were 40 minutes versus 10 minutes you might start caring, and in fact the first version scales much worse than the second, so at scale it could be even worse than that. Looking at the second run, it was actually divided into three stages, more than the other one, but we wrote a lot less data: if you look at the far right column, the shuffle write column, we only wrote about 155 kilobytes versus 300 megabytes. So we were able to reduce the amount of shuffling, and after the first stage you'll notice we have six tasks in each stage, so the data was executed in two waves. So we've seen some of the benefits we can get from optimizing, and we've seen how to use the UI to see what's going wrong. That's all I have, thank you for your time.

[Applause]

Host: We'll have plenty of time for questions. I just want to make a quick announcement: it's 2:45 now, and we're going to delay the start of the next session by 15 minutes, as we delayed this one, so we'll start at 3:15, and we'll let Aaron go over his time slot a little, because I'm sure there will be questions.

Q: What is the shuffle memory fraction, and does Spark always write shuffle data to disk?
A: I mentioned earlier that Spark assumes it has 90% of the memory. The shuffle memory fraction is by default 20%, and it's actually used not for the shuffle itself but on the reduce side, for example for a group by, the second stage of a shuffle, to build that in-memory map. It means Spark is willing to use up to 20% of the total memory to build the map, and if it's going to go over that, it starts spilling to disk. (Host: If you're going to leave, please leave quietly; many people want to hear the answers to these questions.) So it spills to disk rather than using extra memory. The shuffle itself does always write to disk, although it will often just hit the OS buffer cache if you're not shuffling that much; it really only hits disk once you start running a lot of data.

Q: How can we optimize the memory footprint of our RDDs?
A: That's a pretty general question, so let me answer it in terms of caching. When you load data and cache it, one thing that's sometimes unexpected is that a deserialized object in memory is much, much larger than you might expect; a String is roughly twice as large as the raw data. So if you're trying to keep a lot of data in memory, you can cache it in a serialized fashion, or cache it off-heap in something like Tachyon. The serialized form tends to be much more compact, and you can additionally turn on compression for serialized RDDs, with a setting something like spark.rdd.compress, and then you can keep a very large amount of data in memory compared to keeping it deserialized.
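A hedged sketch of the caching options mentioned in that answer; the storage level is the standard Spark one, and the compression flag shown is my assumption about the setting being referred to:

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel

    // Deserialized Java objects are much bigger than the raw data (a String roughly 2x),
    // so cache in serialized form to fit more data in memory.
    names.persist(StorageLevel.MEMORY_ONLY_SER)
    // (StorageLevel.OFF_HEAP is the off-heap option, Tachyon-backed in that era.)

    // Serialized cached partitions can additionally be compressed; set this on the
    // SparkConf before the SparkContext is created.
    val cachingConf = new SparkConf().set("spark.rdd.compress", "true")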
Q: How should one come up with a good number of reducers, and how are they assigned?
A: The number of reducers is by default the number of mappers. By the way, when I say reducers and mappers, I mean the number of partitions of the first stage versus the second stage of any shuffle. Defaulting to the number of mappers tends to be a pretty good default. You start running into problems when you have, say, a million partitions and try to reduce into a million partitions, because that creates a lot of intermediate data, but in general you follow the same rules as for the number of mappers: you want enough partitions for concurrency, and if you're doing a very large group by you may also want more reducers so the data is more spread out among the partitions. As for the second part, the reducers are assigned essentially randomly, because we read the data in an all-to-all fashion anyway, so it doesn't really matter where we put the reducers; we just distribute them.

Q: Why did the version with the distinct spill so much less data to disk on the map side?
A: That's a very good question, and the answer is partial aggregation. A distinct is really just a reduceByKey where the value is null: if you find two identical keys, you collapse them into one, and that's what keeps things distinct. It turned out there were a lot of duplicate names in the data set, so on the map side we only keep around the distinct names and write only those to disk, which is why it was so much smaller. And yes, we combine the values for the same key on the map side, which is the partial aggregation, and then we do it again on the reduce side across all the map outputs. If that didn't answer the question, please come see me afterwards.

Q: How does Spark handle a heterogeneous cluster, say a terabyte of memory on some machines and 128 gigabytes on others?
A: Spark will handle this correctly insofar as you tell each worker how much memory it has, which is specified in the worker's own configuration, so Spark will not try to cache a terabyte on the 128-gigabyte nodes. One interesting thing is that the cache also acts as a source of data locality: as more data gets cached on the one-terabyte node, more tasks will execute on that node, so things tend to work out well.

Q: Is a hard-coded number of partitions a problem if you want to run at arbitrary scale?
A: If your idea is to write a job and then run it on a whole bunch of different clusters, you would not want a fixed number of partitions. You can get the total number of cores in your system from Spark, and by default the number of partitions is the number of cores in the system, so you can do a little bit of arithmetic on that, say two times it or so, and that way the job reads the number for the current cluster and runs with that, which makes it a little more versatile.
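A minimal sketch of that suggestion, not from the talk, assuming the same kind of input as before:

    // Derive the partition count from the cluster instead of hard-coding it.
    // sc.defaultParallelism is roughly the total number of cores available.
    val numPartitions = 2 * sc.defaultParallelism
    val deduped = sc.textFile("hdfs://...").distinct(numPartitions)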
Q: If you have a Spark application (driver) failure, how do you find out what went wrong?
A: There are two things to know. First, typically the driver runs on a machine that you control, so while the application is running you just look at the logs there. Second, in, I believe, 1.0 we introduced a history server, which, in the same way the Spark UI lets you see current executions, lets you go back to prior ones and click into their previous logs. It doesn't include the application logs themselves, though, because again those are on a machine you control, so if you've lost those logs there's not a lot we can do about that.

Q: Suppose you deploy on a YARN cluster, where the driver runs inside the cluster?
A: I don't have a lot of experience running on YARN clusters, but the logs go somewhere, and the driver is a fixed point, so you'll have an IP address you can go to to look at the logs. The driver logs also contain all of the executor errors: if an executor fails, it gets printed in the driver logs, so you should only have to find where the driver is.

Q: But when the job finishes, the containers are cleaned up and the logs are gone.
A: I see, so the point is that the containers are cleaned up after the job finishes and you lose the logs. Right now that would probably require a manual solution where you upload the logs to S3 before finishing... actually, I apologize, it's even easier than that: there's already a configuration to save the logs somewhere else in persistent storage.

Q: How do you force the shuffle output onto HDFS, for fault tolerance?
A: I'd like to say there's a way, but there isn't currently. The problem is that the current shuffle implementation writes through the POSIX file system, so it can write to local disk, to NFS, to a ramdisk, but it can't write to an HDFS file system. The shuffle becomes pluggable in 1.1, so after that there will be a way to build it; it's something we'd love to do, but it's not possible today.

Q: Doesn't sortByKey require loading everything into memory?
A: To be clear, sortByKey does not group by the key, it only sorts by the key column, and it does do a global sort. But yes, right now it does require loading each partition into memory, because we do an in-memory sort. There's a patch out right now which solves this, so sort will no longer require the whole partition, or even a large part of it, to fit in memory.

Q: What if you only want to sort within a partition?
A: The standard library gives you the global sort whether you care about the global part or not. We don't have anything in the standard library for a per-partition sort, although one nice thing about Spark is that you can extract out the pieces you want: you could reuse the same logic we use for the global sort to do the sort within a partition. It would require some manual work, but I don't think it would be difficult.
Host: Maybe two or three more questions; let's take some from this half of the room.

Q: Does Spark have speculative execution?
A: Yes, it does. It's not turned on by default, but take a look at the configuration; you can just turn it on, and Spark will speculatively re-execute tasks when a stage is getting close to completion or when some tasks are taking too long.

Q: If you use speculative execution, does it make sense to have slightly less than a multiple of the number of cores?
A: Yes, it does. So typically I'd say not exactly two times the number of cores, but maybe a little bit less than two times. I hesitate to just throw out something that arbitrary, but having a little less than 2x the number of cores actually works out pretty well.

Q: Can you control the number of streams that are merged?
A: No, that's not exactly how it works; you don't have control over the number of streams that get merged. And to clarify, in general the number of partitions tends to be much greater than the number of cores; you might have ten thousand partitions for a hundred cores or something, so you're not usually reasoning about exact multiples. It's really the lower bound you have to watch.

Q (Patrick): Why don't the shuffle write and shuffle read metrics match?
A: That's a great question, and I'll answer it as the wrap-up. We report the bytes written before the shuffle and the bytes read on the other side of the shuffle, and it's almost always the case that the bytes read is strictly lower. That's because we only count remote reads, reads from other nodes, which are the expensive ones. If a map output is on the same node where the reduce executes, the reduce reads that fraction of the data from the same node, and we don't report it, because there's essentially no cost to doing that. It's a common source of confusion, and there's been debate about whether we should just report it as a read anyway so the numbers match up and people don't get concerned.

Host: We should really give Aaron a huge round of applause. Thanks very much; we'll start again at 3:15 for the next session.
Info
Channel: Spark Summit
Views: 125,131
Rating: 4.9311743 out of 5
Id: dmL0N3qfSc8
Length: 44min 2sec (2642 seconds)
Published: Thu Jul 17 2014