Deep Dive: Apache Spark Memory Management

Captions
Hi everyone, my name is Andrew. I'm a Spark committer at Databricks, and today I'll be talking about memory management in Spark. Before I begin, I'd like to tell you up front that I do not have a demo prepared for this talk, unfortunately, but I promise I'll still make it as exciting as if it had one.

In this talk I'll be covering a lot of internals in Spark. In particular, given a simple Spark program that looks something like this, we'll take a deep dive into all the cool things that go on behind the scenes: what happens when the user does caching, how Spark manages that memory, the memory contention issues that might arise as a result of that, and, once we take care of the memory contention issues, potential optimizations that we could do to make more efficient use of memory, the option to use off-heap caching and off-heap execution memory as well and the performance benefits that entails, and really other potentially scary things to average Spark users.

So you might be thinking to yourself: if I'm just an application developer, why do I care about all these internal details of how Spark manages its memory? It turns out that not just in Spark but in any data-intensive processing system out there, memory is an extremely indispensable resource. I would even go further to say that efficient memory use is critical to good performance, and that techniques for managing memory efficiently are commonly transferable across systems.

To illustrate what I mean by this, I'd like to borrow an analogy from Jim Gray. To summarize all the distracting graphics on the screen, essentially what he's trying to say is: if reading something from memory is like going all the way to Sacramento, which takes about 90 minutes, then reading the same thing from disk is like going all the way to Pluto, which is many orders of magnitude slower. The way this applies to Spark is that where possible we try to keep as many things resident in memory as possible, and otherwise we have to
incur a heavy penalty of performing disk I/O. So in an environment like Spark, for this reason, memory is always a constrained resource, because if you had more then you would avoid spilling to disk as much.

Unfortunately, because everyone wants memory, there's naturally going to be some memory contention issues that we need to address, and these issues have posed challenges for designing a sensible memory management system for a system like Spark. For example, there's memory contention between execution and storage, which are the two main usages of memory in Spark; in the next slide I'll explain what these mean. There's also memory contention across tasks running in parallel; in Spark, a task is essentially the smallest unit of execution, and it represents a partition in your data set. Further, there's also memory contention even within a task, across operators. I'll dive into more detail about what this means exactly later, but for now you can think of an operator as a step in transforming your data, for example a sort or a group by.

Now, I mentioned earlier that there are two main usages of memory in Spark, and these are execution and storage. Execution memory is kind of like the memory that you use to buffer intermediate results as you perform your computation. For example, if you're doing a group by, you're going to need to put your values in some sort of hash map, and you need memory for building that hash map. That's execution memory. Storage memory is perhaps more familiar to the user: when the user caches their data set, we're going to put the values of that data set in an array somewhere and keep it in memory. That's storage memory.

The main difference between the two is that execution memory tends to be quite short-lived: as soon as you finish the computation and the results of that computation have all been consumed, you can immediately go ahead and release your execution memory.
However, for storage memory, the purpose is for future computations to be able to reuse our data, so we can't just remove it whenever we want; we have to explicitly remove it or let LRU take care of it.

To illustrate this with an example, suppose I have an iterator of numbers and I want to sort them. The sensible thing to do here would be to read all those numbers into an array of some sort, run quicksort or some other sorting algorithm, and output an iterator of sorted values. Then I might go ahead and run some action on it; for example, I might take the first three elements, and so on. The memory used for buffering the input data here is known as execution memory.

Now what happens if I want to get the sorted values again? Well, the contract of the iterator is that you can only traverse it once, so what this means is we're going to have to perform the sort again to get those values, and this could be very expensive. So perhaps a better thing to do here would be to cache the sorted values if we know that we're going to reuse them over and over again. Now we can go ahead and run whatever actions we want without having to trigger the sort again. The memory used for caching here is what I call storage memory, while the memory used for buffering the input data only to sort them is called execution memory. Here you'll notice that execution memory is rather short-lived, because as soon as we consume the sorted values we can go ahead and release it; we no longer need that memory anymore.

So now that we understand what execution and storage are, let's address our first challenge, which is how to arbitrate memory between the two use cases. The simple solution here is what I call static assignment. Suppose you have your total available memory here; for our purposes you can think of this as on-heap memory and the size of your JVM. What this says is essentially: assign a static chunk for execution, and then assign a separate chunk for
storage. These are two completely distinct regions; they don't interact with each other in any way, and this is what I mean by static assignment. This has been the solution since Spark 1.0, which was released more than two years ago, and we recently changed this a little bit due to some problems that it has, which I'll talk about shortly.

Let's run through an example of what this looks like in practice. I'm going to acquire some execution memory (that's what the little green bars at the bottom mean), and then I'm also going to acquire some storage memory for caching some data set. Eventually I'm going to run out of execution memory, so what happens now is that I'm just going to spill whatever I have in my execution memory pool to disk, and as I mentioned before, this could be very expensive. All right, so let me go ahead and do that. Similarly for storage, if I keep acquiring storage memory, I'm going to run out of storage memory and I'm going to have to do the same. More specifically, for storage we're going to evict the least recently used block to disk. In Spark, caching is expressed in terms of blocks, and you can think of each block as an RDD partition, for example. So let me go ahead and do that.

I mentioned before that efficient memory use is critical to good performance. However, the inverse is also true, which is that inefficient memory use leads to bad performance, and in this case the inefficiency is that execution has to spill to disk even though there's all this available space on the storage side that we're not taking advantage of. So almost by definition, execution can only use a fraction of the memory no matter what's going on on the other side.

You might argue that one way to solve this is to simply have the user tell us how large each region should be. If the user knows that they're not going to use any storage memory, then we'll just go ahead and set that to
zero and give all the available memory to execution. However, this poses another problem, which is that efficient memory use now requires user tuning. Essentially it bubbles up the complexity that we should handle internally in Spark to the user, and this is just not a friendly API, especially to newcomers to Spark.

So let's fast forward to 2016. What are some things that we could have done a little better? Well, what about, instead of expressing execution and storage in terms of two separate chunks, we express it in terms of one unified region that they both share? As an example, if I try to acquire more execution memory, I might borrow from the storage space if there's not any storage memory being used. Previously, in the other approach, I would have already spilled by now, but now I'm just happily acquiring execution memory without spilling. This is what I mean by unified memory management. It has been in Spark since Spark 1.6, which was released earlier this year.

This is kind of a silly example, right? There's nothing on the storage side. So what if there's already some storage? OK, so I'm going to acquire some execution memory, I'm going to keep sorting and sorting, and eventually I'm going to run out of execution memory. What happens now is I'm going to evict storage. As before, I'm going to evict the least recently used block to disk. Once I do that, I might keep acquiring execution memory and evict more storage as I need.

What about the other way around, when storage tries to acquire memory? Storage now is the one that's growing: I'm putting in a new block, and another block, and eventually I run out of storage memory. So what happens now? Any guesses? Yeah, so it turns out that, as before, we also evict storage memory. The key thing to note here is that, to make space for the new block, we go ahead and kick out some old blocks. That's what the LRU is doing.

So there
are some design considerations here. You might have noticed that in both cases we chose to evict storage rather than execution. This might be a little surprising to some of you, but the main reason behind this is that spilled execution data is always going to be read back from disk, whereas cached data may not. What I mean by this is, if you're spilling something while you do a sort, you're going to have to return those values to the caller, so either way you're going to have to read them back from disk, and there's just no getting around that. But anecdotally I've seen many applications where the user just overly aggressively caches things; they put cache after every single line. It doesn't do anything; at best it just ruins your cache locality. So my point here is that it's a very real scenario where the cached data is never read again.

You might have another concern: what if the application relies heavily on caching, such as a machine learning application? We can't just blow away all the cached data nonchalantly. So instead, the solution here is to simply allow the user to specify an amount of storage memory that cannot be evicted. Note that this is not a reservation: we do not preallocate a chunk of memory in advance such that execution cannot borrow from it. Rather, only once there is already cached data does this value come into play.

So let's move on to the next challenge, where we try to solve memory contention across tasks running in parallel. Again, the simple solution here is static assignment. Suppose my worker machine has four cores. Essentially what this means is I will have four fixed slots, no matter how many tasks are actually currently running. So even if I have one task running, it is only going to get at most a quarter of the total memory. Perhaps a more efficient alternative is what I call dynamic assignment, where how much memory a task gets depends on the total number of running tasks. So if I'm the only task
running, I can feel free to acquire all the available memory there is. However, as soon as another task comes in, I'm going to have to spill to disk and free space for the new task, and this is because we want to ensure fairness, which is important. So what we end up with is essentially something similar to what we had with static assignment, where we have two slots; however, the number of slots is determined dynamically based on the number of tasks, and the spill threshold for each one of these tasks is now much smaller than before. And so on: if we have more tasks, we're going to evict existing tasks to make room for them.

One notable change in behavior here concerns a straggler, which is a last remaining task that blocks computation. These are potentially expensive because everyone else is already done but you're the last one, kind of chugging along. Ideally you should get all the memory available, and that's what this model allows, because the number of actively running tasks is now one, so you can feel free to get all the memory you want. This has been the solution since Spark 1.0. In these past two years we haven't really noticed major problems with it; it seems to have been battle-tested pretty well in production code, so we haven't found any reason to change it.

A simple comparison between the two models will quickly reveal more similarities than differences. In particular, both of them are fair and starvation-free. For the static case you can easily see why it's fair; it's pretty much the definition of fair, you literally can't get more. The dynamic assignment case is also fair, because it is basically the static assignment model but the number of slots is determined dynamically, such that each actively running task gets the same amount of memory as everyone else. It goes without saying that the static assignment model is a little simpler; however, it doesn't handle stragglers, which dynamic assignment, which is what Spark
does, does.

So now that we've handled memory contention across tasks running in parallel, let's address the memory contention across operators running within the same task. Due to time reasons I'm going to sort of zip through this part a little bit, because I have more stuff at the end that I want to talk about; you can look up the slides afterwards, they'll be posted online.

Suppose I want to run a simple query that looks something like the following: I want to do a group by and an order by. The resulting query plan is something that you see on the right. First it'll do a scan, then it will do a project to filter some columns, and then aggregate and sort. It's worth noting that all this happens within one task, so there's memory contention across the different operators.

To illustrate this, let's say my worker has six pages of memory. A page here is essentially a fixed unit of memory; it doesn't really matter how much it actually is. Aggregate is going to grab all six pages, and then it's going to use those pages to build a hash map that looks something like this. Then when it comes to sort, it realizes that it actually can't acquire any memory, so it has no choice but to fail.

A simple solution here is to reserve a page in advance for each one of these memory-intensive operators. In this case we know sort and aggregate are both memory intensive, so we're going to give them a page in advance. This solves the starvation problem, but it's actually still not fair, because if we go ahead and run through this, aggregate is just going to grab five pages, leaving sort with only one. The bigger problem here is that it's simply not feasible for some query plans. What if I have, say, seven operators that require memory? It's just not doable.

So perhaps a better solution is what I call cooperative spilling. It works like the following: suppose aggregate grabs all six pages, and then when it comes to sort's turn, it's going to cooperatively
force aggregate to spill a page. So essentially it steals a page of memory from aggregate, and then when it determines that it needs more pages, it's going to force aggregate to spill more pages, and so on. Here they both end up with three pages, so it's fairer than the one before, and aggregate doesn't have to spill its remaining pages. This has been the solution adopted since Spark 1.6.

Oh, also I'd like to mention that I didn't just talk about the first solution to make this solution look better. The first solution was a real approach that we actually thought of implementing in Spark, until we realized later on that there are some problems. So these are both solutions that we've considered.

Now, as a quick recap, I'd like to summarize the three major sources of contention that we saw: between execution and storage, between tasks running in parallel, and across operators running in the same task. Perhaps a common theme across all three of these is that it's generally a bad idea to statically reserve memory in advance, because that means someone's not going to get some memory even though the other guy is not really using it. Instead, a better way to deal with this is to force members to spill when there is memory contention. So essentially we're solving memory contention lazily rather than eagerly.

Now that we've dealt with the issues with memory contention, I'd like to talk about the optimizations that we can do. You might have heard of this term Project Tungsten floating around. Essentially the high-level goal is to rewrite the execution engine in such a way that it is more aware of the lower-level constructs. For example, it introduces a very compact binary format to represent rows, and there are many advantages to that, which I'll lay out shortly. It also introduces data structures that are more aware of the L1, L2, and L3 caches to improve the hit rates. There's also code generation, but that is not really relevant to this talk, so
I'm not going to talk about it.

Perhaps the main motivation for Tungsten is that Java objects are very expensive. A simple string like "ABCD" actually takes 48 bytes for some reason, even though you can really encode it with just 4 bytes. It gets worse if you try to represent a row with Java objects. Here we have a simple schema, an integer and two strings, so we're going to go ahead and box all the primitives, introduce a bunch of objects, and make the hash code really expensive.

With Tungsten, a better way to format the row would be a binary format, where we just put the primitive types directly into the row as bytes, because we know exactly how large those are. Then for variable-length data like strings, we can store the offset and have it point to the length of the data and the actual data that follows. The details here don't really matter. The takeaway is that this compact storage format allows us to reduce our memory footprint significantly, and that's one obvious advantage, because now you can pack more rows into memory. Perhaps a less obvious advantage is that it allows you to skip a lot of serialization and deserialization, because the row is already serialized, and because you know exactly what the schema is, you also know the offsets, so you can operate directly on the serialized data.

Another aspect of Tungsten is what I call cache-aware computation. The high-level idea here is to lay out data in memory in such a way that is more conducive to a higher hit rate for the L1 to L3 caches. For example, if I want to sort a list of records, a common and very sensible approach might be to put all the pointers in an array and sort those pointers instead of the actual records. However, this actually has pretty poor layout in terms of cache locality, because every time we want to do a comparison we have to dereference two pointers, and that leads to a lot of random
memory accesses. Instead, a better way to lay out the data is perhaps to store at least part of the key along with the pointers, such that every time you do a comparison you compare two things that are pretty near each other, probably much nearer than having to do a dereference. This is just one of the many optimizations that Tungsten allows; there are many other ones that I'm not going to talk about.

Finally, off-heap memory usage is another very important way to make use of memory efficiently. This has been available for execution since Spark 1.6, and today I'm happy to announce that it will be available for caching as well in Spark 2.0. A little bit of motivation as to why this is important: it's actually super important for large heaps. Suppose you have a heap of 256 gigabytes. What's going to end up happening is at some point you're going to see GC storms, so you're going to see GC pauses that last for minutes, on the order of minutes, and that's very bad. It's not only bad because your executor is not doing anything while it's in a GC pause; it's also bad because in a communication-intensive system like Spark, when you send a message you expect the other guy to respond more or less quickly, but if it's stuck in a GC pause then it might respond too late and you might have timed out the connection, and that could stall progress quite significantly.

There are also many other potential advantages. I'm not going to talk about all of them, but I would like to talk about dynamic allocation. Dynamic allocation is a feature in Spark that allows you to scale the size of your cluster up and down based on the workload. For example, if you realize that you need more resources to do your computation, you're going to ask for more nodes, and then when you realize that your cluster is idle, you can go ahead and kill your executors. How this applies to off-heap memory is that when you kill an executor, if it had cached data, then previously you would
have just blown away all the cached data on that executor. But here, with off-heap memory, we decouple the lifetime of the JVM from the lifetime of the cached data, such that you can reason about them separately. So if you kill an executor, the cached data is still going to be there, and you can access it later.

OK, so for more information you can look at these other talks and the JIRA. Oh, before I finish, I'd like to point everyone to Community Edition; it's a very cool platform for you to try things out with Spark. Thank you very much.

Thanks very much to Andrew for this awesome presentation. We've got about two minutes left for questions, so if you do have any questions, please come to the center aisle; there are some microphones here, and by all means ask away. All right, well then, thanks very much... we're sorry, go ahead please, the first microphone here.

Yes, hi. We use PySpark extensively, and I would like to understand how I can better monitor the spawning and the memory usage of my Python workers. Right now what I do is I just SSH into the worker and run top, and I have no way of knowing which part of my code the Python workers correspond to, or how the memory usage is related to the Python worker memory I set in the Spark config. It's a complete black box for me.

Yeah, so the question is how do we monitor memory usage when we use PySpark. Unfortunately, the answer is there's currently not a great way; I'd probably do the top thing too if I were you. In the future we might expose some of that memory information on the Spark UI. This is not really specific to Python, but more generally, users could go to the Spark UI to understand more about the memory usage of their application. All right, not a great answer, I'm sorry.

All right, thanks. So we only have time for one more question, so the person in the blue shirt at the second microphone. We also have booth duty, actually, at the Databricks booth, so you can also ask questions there. OK, so just as a quick call
out: for the person in the back, you're the last question.

OK, so you're trading Java memory issues for malloc scalability and fragmentation issues. How do you expect to deal with those instead?

That's a great question. Currently we don't really have a great answer to that either. We manage the memory ourselves inside Spark, so actually it just depends on whatever the underlying platform currently does. In the future we might address that as well.

Yeah, they're terrible.

OK, it's definitely a real concern, though.

The next session is in five minutes. Thanks a lot, everybody.
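
The unified memory management described in the talk (one region shared by execution and storage, least-recently-used eviction of cached blocks, and a user-specified amount of storage that execution cannot evict) can be sketched as a toy model. This is a minimal sketch in plain Python, not Spark's implementation: the class name UnifiedMemoryPool, its method names, and the unit-less sizes are all hypothetical, and eviction only records the block id instead of writing anything to disk.

```python
from collections import OrderedDict


class UnifiedMemoryPool:
    """Toy model (hypothetical) of one region shared by execution and storage."""

    def __init__(self, total, protected_storage):
        self.total = total
        # Not a reservation: this floor only matters once data is cached.
        self.protected_storage = protected_storage
        self.execution_used = 0
        # block id -> size, least recently used first.
        self.blocks = OrderedDict()
        # Ids of evicted blocks, in eviction order (stands in for "to disk").
        self.evicted = []

    @property
    def storage_used(self):
        return sum(self.blocks.values())

    @property
    def free(self):
        return self.total - self.execution_used - self.storage_used

    def _evict_lru(self, needed, floor):
        """Evict LRU blocks until `needed` units are free, or until the next
        eviction would push cached data below `floor`."""
        while self.free < needed and self.blocks:
            block_id, size = next(iter(self.blocks.items()))
            if self.storage_used - size < floor:
                break
            del self.blocks[block_id]
            self.evicted.append(block_id)

    def acquire_execution(self, amount):
        """Grant up to `amount` units; the caller spills whatever is not granted."""
        self._evict_lru(amount, floor=self.protected_storage)
        granted = min(amount, self.free)
        self.execution_used += granted
        return granted

    def release_execution(self, amount):
        self.execution_used -= min(amount, self.execution_used)

    def cache_block(self, block_id, size):
        """Cache a block, evicting older blocks (never execution memory)."""
        self._evict_lru(size, floor=0)
        if self.free < size:
            return False
        self.blocks[block_id] = size
        return True

    def read_block(self, block_id):
        # Reading a cached block marks it as most recently used.
        self.blocks.move_to_end(block_id)


if __name__ == "__main__":
    pool = UnifiedMemoryPool(total=100, protected_storage=20)
    pool.cache_block("a", 30)
    pool.cache_block("b", 30)
    print(pool.acquire_execution(50), pool.evicted)
    print(pool.acquire_execution(40), pool.evicted)
```

In this sketch, a request for execution memory that cannot be fully granted returns a smaller number and the caller would spill the difference, which mirrors the talk's point that contention is resolved lazily, by spilling or evicting only when memory actually runs out.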
Info
Channel: Spark Summit
Views: 45,496
Rating: 4.871212 out of 5
Id: dPHrykZL8Cg
Length: 26min 12sec (1572 seconds)
Published: Thu Jun 16 2016