Tuning Apache Spark for Large Scale Workloads - Sital Kedia & Gaoxiang Liu

Captions
Hello everyone, my name is Sital Kedia and this is my colleague Gaoxiang Liu, and we are going to be talking about tuning Apache Spark for large scale workloads. At a high level, in this presentation we'll first talk about the nature and scale of the Spark workload at Facebook. Then we'll talk in detail about some critical configuration tuning and optimizations we made to various components of Spark in order to scale for our workload. After that we will talk about some application-specific configuration tuning, and lastly we will highlight some tools which we use to find performance bottlenecks in our Spark jobs.

At Facebook we use Spark to run large scale batch workloads, so the configuration tuning and optimizations you are going to hear about are mainly focused on large scale batch workloads; they might not be applicable for other use cases like streaming, interactive, or small workloads. To give you a sense of the scale at which we operate: we run tens of thousands of jobs every day on compute machines spanning thousands of nodes. In terms of single-job scalability, we are able to scale Spark to run workloads which process hundreds of terabytes of compressed input data and shuffle data and can run hundreds of thousands of tasks. Of course, running Spark at such a large scale does not work right out of the box; we need to make various configuration tunings and optimizations to the different components of Spark, like the driver, the executors, and the shuffle service, in order to run the jobs at scale. Before we talk about those optimizations, I want to point out that all the improvements we are talking about here are either already present in open-source Spark or under review, so the community can benefit from them.

Let's talk about scaling the Spark driver. The Spark driver is the entity which hosts the SparkContext; it coordinates with the cluster manager to launch executors on the cluster, and it schedules tasks on the cluster. So it becomes obvious that while running jobs which run thousands of tasks, the Spark driver can become the bottleneck. Let us talk about some configuration tuning we do for the Spark driver to scale.

First is enabling dynamic executor allocation. Dynamic executor allocation is a feature which enables a Spark job to add and remove executors on the fly, as opposed to the traditional static resource allocation where we have to request and reserve resources beforehand. With dynamic executor allocation you get as many resources as you need and no more. For example, if the task queue in the scheduler piles up, the driver talks to the cluster manager to spawn more executors on the cluster, and when the task queue shrinks and executors become idle, the driver coordinates with the cluster manager to release the idle executors. This is really helpful in a multi-tenant environment where the cluster resources are shared across multiple jobs and users; in those cases you should enable dynamic executor allocation to get better resource utilization and fair resource allocation across jobs. Apart from simply enabling dynamic executor allocation, there are certain other configurations, like the minimum executors, maximum executors, and executor idle timeout, which you should tune based on the nature and scale of your workload.
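As a rough illustration of the settings mentioned here, below is a minimal sketch of how dynamic allocation might be wired up when building a SparkSession; the numeric values are illustrative placeholders, not the settings used at Facebook, and should be tuned for your own workload.

    import org.apache.spark.sql.SparkSession

    // Minimal sketch: enable dynamic executor allocation with explicit bounds.
    // The numbers below are placeholders, not recommendations.
    val spark = SparkSession.builder()
      .appName("dynamic-allocation-example")
      .config("spark.dynamicAllocation.enabled", "true")
      // Dynamic allocation relies on the external shuffle service so that
      // shuffle files survive executor removal.
      .config("spark.shuffle.service.enabled", "true")
      .config("spark.dynamicAllocation.minExecutors", "10")
      .config("spark.dynamicAllocation.maxExecutors", "500")
      // How long an executor may sit idle before it is released.
      .config("spark.dynamicAllocation.executorIdleTimeout", "60s")
      .getOrCreate()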
The Spark scheduler architecture is event-driven, in the sense that the scheduler generates events, for example task start and task end events, and different components of the driver implement a listener interface to listen for those events and take action based on them. The event processor is currently a single-threaded entity which loops through all the listeners and processes the events synchronously. While running jobs with many tasks in parallel, producing a large number of events, we found that the single-threaded architecture of the event processor can easily become the bottleneck, and the event processing latency can be up to several minutes. This is really bad for the performance of the job, and in the worst case it might even fail the job. So we made a change to multi-thread the event processor, with a dedicated queue and thread per event listener, and the event processing latency went down from several minutes to the order of a few milliseconds for large jobs. Please note that this pull request is still under review in open source, and we expect to get it into master soon.

Next is better fetch failure handling. While running jobs at this scale, one of the major issues we observed is inefficient fetch failure handling in the Spark driver. As the size of the cluster increases, the chances of encountering failures like network timeouts, node reboots, or shuffle service restarts increase, and as a result so do your chances of encountering a fetch failure. Unfortunately, the Spark driver is not robust enough to handle all kinds of fetch failures. A classic example: we were seeing a single fetch failure cause multiple retries of a stage, which is not desirable. This not only increases the latency of the job, it can also fail the job, because the maximum number of fetch failures per stage that is currently allowed is four. This was due to a bug in the scheduler, which was not re-running all the map tasks needed in case of a fetch failure; we fixed the issue and our jobs were much more stable after this change. Apart from that, we also improved the scheduler logic to handle various other failure scenarios, which I am not going to cover in detail, but you can refer to the open-source JIRAs to learn more about them. One thing I do want to highlight is the maximum number of fetch failures, which was hard-coded to four: we have made it configurable, so if in your environment you are encountering many fetch failures and job failures because of them, you can bump it up to stabilize your jobs.

Next is tuning the RPC server threads. While running jobs at scale, we observed that the driver was frequently OOMing. Looking into the driver heap dump, we found a huge backlog of RPC requests built up on the driver, and those RPC requests were consuming most of the driver memory. The default number of RPC server threads, which is set to eight, is just too low for running large jobs, and when many events are generated in parallel the RPC server can easily get backlogged. So you can try increasing the number of threads; that may get rid of the OOMs and let the jobs run stably.
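For the driver-side thread tuning just described, here is one plausible way to raise the RPC server thread count. The key shown follows Spark's per-module transport configuration pattern, but treat the exact key and the value of 64 as assumptions to verify against your Spark version, not as the talk's exact settings.

    import org.apache.spark.SparkConf

    // Sketch: give the driver's RPC server more threads (the talk cites a default of 8)
    // so a burst of events from thousands of tasks does not back up in driver memory.
    val conf = new SparkConf()
      .setAppName("driver-rpc-tuning-example")
      // Per-module transport setting; 64 is an illustrative value, not a recommendation.
      .set("spark.rpc.io.serverThreads", "64")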
Let's talk about scaling the Spark executor. One of the major challenges we face while scaling the Spark executor is executors failing the job, so proper memory configuration for a Spark job is very crucial to running jobs reliably. Before talking about the specific memory configurations, it is important to understand the memory layout of the executor. The executor's memory is divided into four sections: shuffle memory, user memory, reserved memory, and the memory buffer. Of these four sections, the most interesting for us are the shuffle memory and the user memory, and we should work towards tuning them. The shuffle memory is used to buffer the shuffle's internal data structures: as the map task runs and reads more and more data, it stores that data in the shuffle memory, and when you run out of shuffle memory the buffered data is spilled to disk. So in theory, if you increase the shuffle memory, you can spill to disk less often, and that will speed up your job. The user memory, on the other hand, is used for user-specific data structures, and how much user memory to allocate depends entirely on the application. The split between shuffle memory and user memory is controlled by a single configuration, spark.memory.fraction, and by default 40% of the executor's memory is allocated to user memory. If your application does not need that much user memory, you can bump up spark.memory.fraction to trade user memory for shuffle memory and avoid frequent spills to disk, which will increase the performance of your job.

Okay, now you have tuned spark.memory.fraction; what else can you do to speed up your job? You can enable off-heap memory for Spark. What off-heap memory does is let you allocate the shuffle data structures off-heap, in native memory, which means they are not allocated or managed by the JVM memory manager and so are not subject to garbage collection. By enabling off-heap memory for shuffle, you can avoid garbage collection overhead for the shuffle data structures, which will help speed up your job.

The third aspect of memory management we found important is tuning for garbage collection. The shuffle internals allocate large contiguous in-memory buffers, and this frequent allocation of large contiguous buffers does not play well with some garbage collectors, like G1 GC. We found that G1 GC suffers from fragmentation when the size of an object exceeds the G1 region size, which is 32 MB, so you can switch to the parallel collector instead of G1 GC to avoid OOMs caused by memory fragmentation.

Let's talk about tuning for disk I/O. Before tuning for disk I/O, we should know where Spark actually does disk I/O. As discussed previously, the shuffle memory is used for storing intermediate shuffle rows; as the map task reads more and more rows, the shuffle memory buffer grows, and when you hit the limit the data is sorted and spilled to disk. A map task can produce several such intermediate spill files on disk, which are later read back and merged to produce the final shuffle output on disk. Since disk access can be on the order of 100,000 times slower than memory access, the disk I/O that happens during spill and shuffle can become the performance bottleneck for the jobs. You might think that your data is small enough to fit in memory, so you don't need to care about disk tuning, but that's not entirely true: even if your data is small enough that you can avoid spilling to disk, the final shuffle output still has to be written to disk. So it becomes vital, even for small jobs, to tune the disk I/O for better performance. Fortunately, the buffer sizes used by shuffle and spill are already configurable, but the default size of 32 KB is just too small for large jobs; we recommend increasing these sizes to the order of a few MB to amortize the disk cost and speed up the jobs.
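Pulling the executor-side knobs from this part of the talk together, here is a minimal sketch of how they might be set; the specific values (0.7, 4g, 1m) are illustrative assumptions, not the settings used at Facebook.

    import org.apache.spark.SparkConf

    // Sketch of the executor-side memory and I/O settings discussed above.
    val conf = new SparkConf()
      // Give more of the heap to execution/storage (shuffle) and less to user memory.
      .set("spark.memory.fraction", "0.7")                // default is 0.6
      // Keep shuffle data structures off the JVM heap to avoid GC overhead.
      .set("spark.memory.offHeap.enabled", "true")
      .set("spark.memory.offHeap.size", "4g")             // illustrative size
      // Prefer the parallel collector over G1 to avoid large-object fragmentation.
      .set("spark.executor.extraJavaOptions", "-XX:+UseParallelGC")
      // Larger shuffle write buffer than the 32 KB default to amortize disk I/O.
      .set("spark.shuffle.file.buffer", "1m")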
Even after configuring the buffer sizes for disk I/O, we found that the spill merge process was still taking a lot of time. The reason is that the default spill merge process does not do buffered reads and writes. You can switch to a buffered spill merge by setting spark.file.transferTo to false; the buffered merge process will then read multiple partitions into memory, buffer them, merge the partitions, keep the output in memory, and flush to disk when the buffer fills up. The buffer size used for the spill merge is also configurable, and the default size is, of course, too small; you can try increasing it for better performance.

The third part of tuning for disk I/O is compression block size tuning. We found that the default compression block size used by Spark is suboptimal, and you can cut down the size of the shuffle and spill output by as much as 20% by increasing the compression block size from 32 KB to 512 KB. Apart from this, we also made various memory leak fixes and improvements to the Spark executor, which I am not going to cover here, but you can refer to the open-source JIRAs to learn more about them.

Lastly, let's talk about scaling the Spark external shuffle service. While running jobs which can shuffle hundreds of terabytes of data, it's easy to see that the shuffle service becomes the bottleneck. We were observing that our large jobs were spending as much as 50% of their time trying to read shuffle files. Profiling the shuffle service, we found that it was spending significant time reading index files from disk. The shuffle data produced by a map task is split into two files, an index file and a data file, as shown on the slide. The data file contains the data which is consumed by the reducer, and the index file acts as a dictionary to look up into the data file; a typical shuffle read involves reading both the index file and the data file. Let us take an example of how a shuffle read happens. Say we have two reducers and they want to read shuffle data from one shuffle service. Reducer one issues a shuffle fetch; the shuffle service goes and reads the index file, finds the offset, reads the data file, and sends the data back. Then reducer two issues a shuffle fetch; the shuffle service again goes and reads the index file, finds the corresponding offset, reads the data, and sends the data back. We can see that the index file is being read over and over, so we made a change to cache the index file contents. After our change, when the first reducer issues a shuffle fetch, the shuffle service reads and caches the index file, so when reducer two then issues a shuffle fetch, the shuffle service does not have to read the index file from disk; it just uses the cached content in memory and sends the data back. In this way you can save one disk I/O operation per shuffle fetch. Of course, each mapper produces one index file, and when you have thousands of mappers running there will be thousands of index files, and you do not have enough memory to cache them all, so we use an LRU cache to evict the unused index files. Depending on the size of your workload and the amount of memory available to the shuffle service, you can tune the LRU cache size to increase the cache hit rate and speed up your shuffle fetches.
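A minimal sketch of the shuffle-path settings just discussed is below. Note two assumptions: the index cache key shown (spark.shuffle.service.index.cache.size) is the name used in newer Spark releases and may differ by version, and in a YARN deployment it typically belongs in the external shuffle service's own configuration rather than the application's SparkConf; the 512k and 256m values are illustrative only.

    import org.apache.spark.SparkConf

    // Sketch of the spill-merge, compression, and index-cache settings discussed above.
    val conf = new SparkConf()
      // Use the buffered spill-merge code path instead of NIO transferTo.
      .set("spark.file.transferTo", "false")
      // A larger compression block size shrinks shuffle/spill output (32k is the default).
      .set("spark.io.compression.lz4.blockSize", "512k")
      // Cache shuffle index files; key name and placement are version/deployment dependent.
      .set("spark.shuffle.service.index.cache.size", "256m")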
Apart from this, there are a few other configurations we find useful to tune for the shuffle service, mainly the number of worker threads, the backlog, and the shuffle registration timeout and retries. We often observe that the shuffle service becomes unresponsive when it is transferring a large amount of shuffle data, and increasing the worker threads, the backlog, and the registration timeout and retries helps us recover from those transient issues. That's it from my side on scaling the different components of Spark, but if you are interested you can read more about it in our blog post on scaling Spark. Next I will hand over to my colleague to talk about application tuning and tools.

Hello everyone. My colleague Sital just talked about tuning large Spark workloads from the system perspective, so I'm going to switch gears a little bit and talk about performance tuning and tooling from the application point of view. My name is Gaoxiang; I'm a software engineer on the ads infrastructure team, which is a large user of Spark at Facebook. We have been running a lot of large and medium-sized workloads on the Spark cluster, and the first experience I want to share, from the user's perspective, is about automatically tuning the parameters for some of Spark's key configs. Why do we want to do this? There are primarily two motivations. First, from our customers' perspective, we care about the latency of the critical jobs, with the same amount of resources of course, because we want to make sure our critical workloads meet their SLA criteria rather than violate them. The second motivation is the usability of Spark, especially for new users: a user may be confused by the large number of Spark parameters, so the first time they try the Spark engine they cannot get optimal performance compared to their previous experience, for example with Hive or Presto. We want to limit the amount of manual tuning as much as possible while achieving performance comparable to manually tuned parameters.

That's the reason why we have auto-tuning of the number of mappers and reducers, or partitions, whatever you call them. The basic idea is that, for large workloads, if the job is granted too few mappers and reducers it will take forever to finish; but on the flip side, if the job is granted too many mappers and reducers, it will overload the cluster, adding a lot of burden that may take over the whole cluster and block other people's jobs, and it may also increase the burden on the external shuffle service. We have found that, in general, the number of mappers and the number of reducers are roughly proportional to your input table size. This is not a catch-all solution, but we do find it practical for a lot of use cases, so we developed a heuristics-based approach based on the input table size, and we also cap the maximum number of mappers and reducers and set a minimum, as sketched below, so that it neither overloads the cluster nor falls short of a minimum amount of resources. Currently this feature is rolled out to all the production jobs in Facebook's Spark cluster, and it has been used to onboard new Spark use cases as well as to automatically migrate existing Hive queries from Hive to Spark with better performance and CPU reservation time.
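As an illustration of the kind of heuristic described here, the snippet below scales the partition count linearly with the input size and clamps it between a floor and a ceiling; the proportionality constant and the bounds are made-up placeholders, not Facebook's actual formula.

    // Sketch of a size-proportional partition heuristic with min/max clamping.
    // bytesPerPartition, minPartitions and maxPartitions are illustrative placeholders.
    def choosePartitions(inputBytes: Long,
                         bytesPerPartition: Long = 256L * 1024 * 1024,
                         minPartitions: Int = 200,
                         maxPartitions: Int = 20000): Int = {
      val proportional = (inputBytes / bytesPerPartition).toInt
      math.min(maxPartitions, math.max(minPartitions, proportional))
    }

    // Example use: derive spark.sql.shuffle.partitions from the input table size.
    // spark.conf.set("spark.sql.shuffle.partitions", choosePartitions(tableSizeInBytes))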
Next I'm going to talk a little bit about the tools we have been using to debug Spark jobs. Some of these tools are not specific to Facebook, but we will still talk about them here, and some are Facebook-specific. The first one is not Facebook-specific, but we do find the Spark UI metrics to be very useful for breaking down each of the stages to see where the time is being spent, on average and at the p99, so it gives the user a rough idea of why their job is slow. The second tool I'm going to talk about is the flame graph. We have a service that periodically dumps the stack traces on the worker nodes; we aggregate the counts and write the metrics to our internal tool called Scuba. Scuba is a Facebook-internal tool used for real-time metrics aggregation and breakdown, so the user can slice and dice the Scuba dataset to see how their job is performing in real time. We find that this makes it much easier for the user to find the CPU hot spots. For example, sometimes the bottleneck might be a user-defined function (UDF); we use this to find the bottleneck, and if it turns out to be in the UDF, we go and fix it.

Last but not least, we have built a couple of monitoring tools on top of Scuba, which I just mentioned. This one is about task-level metrics: we collect the task-level metrics and feed them to the Scuba dataset in real time. This turns out to be very useful for the user to answer questions such as: how many jobs failed in the last hour due to out-of-memory errors, or are there any stragglers in my job? The user can very easily see on a time-series graph how many stragglers there are in the job run, or how long the job takes to acquire resources. Sometimes the job is slow not because of itself but because it is blocked by other jobs, so the user can figure out the root cause more easily in real time by using this tool. Also, for some of the critical jobs, users can very easily set up dashboards, monitoring, and alerts, so if something goes wrong they receive an alert and can go and fix it. With that, I have just one minute left, and that concludes my part. There are some references: one about Scuba, which is a Facebook-internal tool, on our developer blog, and one on Apache Spark at scale, about the big pipeline we have productionized at Facebook, which you can go and look at. With that I conclude my talk; if you have any questions, feel free to talk to me or Sital.
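The flame-graph pipeline described above is Facebook-internal, but as a rough illustration of the sampling side, the sketch below (purely hypothetical, not their agent) collects JVM stack traces periodically and counts identical stacks, which is the raw data a flame graph or a Scuba-style breakdown is built from.

    import scala.collection.mutable

    // Sketch: sample all thread stack traces a few times and count identical stacks.
    // A real deployment would run this as a long-lived agent and ship the counts
    // to a metrics store for aggregation and breakdown.
    object StackSampler {
      def sample(iterations: Int, intervalMs: Long): Map[String, Int] = {
        val counts = mutable.Map.empty[String, Int].withDefaultValue(0)
        for (_ <- 1 to iterations) {
          val it = Thread.getAllStackTraces.entrySet().iterator()
          while (it.hasNext) {
            val frames = it.next().getValue
            // Collapse the stack into a single key, leaf frame first.
            val key = frames.map(f => s"${f.getClassName}.${f.getMethodName}").mkString(";")
            if (key.nonEmpty) counts(key) += 1
          }
          Thread.sleep(intervalMs)
        }
        counts.toMap
      }
    }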
Moderator: Okay, we've got time for a few questions for Gaoxiang and Sital. Any questions? One here.

Q: Hi. As I know, Facebook has a lot of compute engines, including Spark, Presto, and probably also Hive. When you do performance tuning, you can choose any engine to tune, so from your experience, which engine do you tune first, or which one gives the most benefit for a little tuning? I just want to hear some experience.

A: Which one to tune actually depends on your use case and workload, what you want to achieve, and also the scale at which you are operating.

Q: From your experience inside Facebook, what has it been?

A: All the configuration parameters I mentioned need to be tuned to run Spark at the scale at which we operate.

Q: Okay, thank you.

Moderator: Any other questions?

Q: You had a slide about application tuning with the number of mappers and reducers. That's not related to Spark, right? Because in Spark you don't specify the number of mappers and reducers.

A: I think it's a general concept; it basically controls the degree of parallelism, how to split your data. So it's not a Spark-specific config, but we do that calculation in the query planning phase based on your input size.

Q: I guess this follows on from the last one: is the number of mappers and reducers analogous to spark.sql.shuffle.partitions? Because that's 200 by default, and if you've got a lot of data and more than 200 cores, then half your cluster is going to sit there doing nothing, right?

A: Right, the number of mappers and reducers is equivalent to the SQL shuffle partitions we are talking about, and you have to tune it depending on the workload size, because 200 is probably not the optimal number for a large workload. That's why we auto-tune it based on the input size.

Q: I always wonder about that, because I usually just lazily set it at runtime to the number of cores times the number of instances, so at least every core has something to do. Do you have a similar thing, or, as you say, do you do it based on the size of the data rather than the number of executors you have?

A: The thing is, the number of cores is not really fixed: as I said, we are using dynamic executor allocation, and there are multiple jobs running on the cluster, so you are not going to know upfront how many tasks or executors you are going to get. Also, even if you had, let's say, infinite resources, you cannot just bump up the number of reducers to infinity, because then your shuffle service becomes a bottleneck, or you're doing too much disk I/O. So tuning those becomes very important for the performance of the job.

Q: Thanks.

Moderator: Okay, thank you very much, please show your appreciation. Thank you.
Info
Channel: Databricks
Views: 37,008
Keywords: apache spark, spark summit
Id: 5dga0UT4RI8
Length: 32min 41sec (1961 seconds)
Published: Mon Jun 12 2017