From Query Plan to Performance: Supercharging your Apache Spark Queries using the Spark UI SQL Tab

Captions
- Hello everyone, and welcome to this Data + AI Summit presentation, in which we will talk about supercharging your Spark queries using the Spark UI SQL tab. My name is Max Thone, I'm a Resident Solutions Architect at Databricks, and my co-presenter is Stefan van Wouw, who is a Senior Resident Solutions Architect, also at Databricks.

So what are we going to talk about today? In the first part of this presentation we will give an introduction to the Spark SQL tab, which is one of the tabs in the Spark UI that most of you are probably familiar with. Next, Stefan will introduce you to the most common components that you will find in the query plan, that is, the Spark execution plan shown in the SQL tab. Finally, I will walk you through a couple of real-world Spark scenarios in which we use the knowledge we gained about the query plan and the SQL tab to make your Spark queries as optimal and as fast as possible.

So in this first part I will just give you a brief introduction to the Spark SQL tab. First of all, why should you know about the SQL tab? It really gives us two important pieces of insight. First, it shows how exactly a Spark query is executed. As most of you probably know, when you submit a DataFrame or Spark SQL query, it first goes through Spark's optimization engine, the Catalyst optimizer, before the final query plan is generated and handed to the JVM. The SQL tab gives us insight into this final query plan, so we can use it to reason about whether the selected physical plan is actually the most optimal plan that could have been executed. Second, we can use the query plan in the SQL tab to reason about query execution time, and to think critically about whether it is the lowest execution time we could possibly achieve for this query.

So how do we actually get to the SQL tab? In the lower part of the slide you see a screenshot of the Spark UI. You simply go to the Spark UI of your cluster and click on the SQL tab, highlighted by the little yellow rectangle. You then see a list of all the SQL queries that have been executed on the cluster, and when you click on one of the descriptions, where one description is associated with one SQL query, you get a visual overview of the query plan, which I will show you later in this presentation.

So what exactly is a query plan? As I just mentioned, a Spark SQL, DataFrame or Dataset query goes through the Catalyst optimizer before it is finally executed by the JVM. The Catalyst optimizer runs a series of optimization steps before arriving at the final execution plan, the selected physical plan that you can see on the right. So what we typically mean by the query plan is this final selected physical plan, the output of the Catalyst optimizer.
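The same selected physical plan that the SQL tab visualizes can also be printed from code. A minimal sketch, assuming PySpark 3.0 or later and a toy DataFrame made up purely for illustration:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("sql-tab-demo").getOrCreate()

# Toy DataFrame purely for illustration.
df = spark.range(1000).withColumnRenamed("id", "item_id")

# mode="formatted" (Spark 3.0+) prints the selected physical plan plus per-operator
# details -- essentially the same information the SQL tab renders as a graph.
df.groupBy("item_id").count().explain(mode="formatted")
```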
So where does this query plan actually sit in the hierarchy? I think most of you are already aware of the famous hierarchy that goes from Spark job to stage to task. Going from right to left: one Spark stage executes a number of tasks, and on your cluster each executor core can execute one task at a time. One Spark job in turn executes multiple stages. As you may have noticed, a Spark job is divided by what we call shuffle boundaries, and these shuffle boundaries dictate how many stages one Spark job has. The query plan sits on top of these Spark jobs: one Spark query can generate multiple Spark jobs. And on top of that again is the DataFrame action, something like DataFrame.count or DataFrame.write.parquet, which can in turn generate one or more queries.

In this slide I just want to show you a simple example of how this hierarchy plays out. On the top left we have a very simple Spark query: we have a DataFrame that contains some sales data, and we want to filter on certain item_id codes and, for each of those item_ids, get the total number of sales. So it is a filter, followed by a group by, and then an aggregation of the sales per item_id. We then trigger the query by doing a write on this DataFrame. That write is the actual DataFrame action, the first step in the hierarchy.

This query generates, in this case, one SQL query, which you can see on the top right next to number two. That is the SQL query associated with this DataFrame query, and in the following slides we will click on it and see what kind of information we get. This query in turn generates one Spark job, job 24 on the top right, and that job generates the two stages you can see on the bottom right, stage 50 and stage 51. Finally, on the bottom left in the summary tab, you can see that these two stages together generate nine tasks. One thing I want you to pay attention to: one way to get to the SQL query is to simply go to the job associated with your DataFrame query and click on the blue number next to its associated SQL query on the top right.

So what does the SQL query look like? We see a visual overview of it on the right, and the actual generated physical plan, the query plan, at the bottom. We can do a simple mapping exercise with this simple query. At the very top of the SQL plan we see what is called an InMemoryTableScan, which references the fact that we read this DataFrame from a cached DataFrame; it is reading the DataFrame from memory, and that is what InMemoryTableScan means. Next we do a filter operation, which you can see by following the yellow arrow: a filter in the DataFrame query simply leads to a Filter operator in the final physical plan. Then we see a group of operations that have to do with the aggregation: it actually performs two HashAggregates with a shuffle exchange in between. Stefan will explain a bit more about why it does two HashAggregates; for now it is enough to know that the group by plus aggregation leads to these aggregation steps in the final physical plan.
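As a rough sketch, the DataFrame query described above could look like the following in PySpark (the data, column names and output path are made up; only the shape of the query matters):

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Hypothetical cached sales DataFrame with item_id and sales columns.
df_sales = spark.createDataFrame(
    [(1, 10.0), (2, 5.0), (1, 7.5), (3, 2.0)], ["item_id", "sales"]
).cache()  # a cached input is what shows up as InMemoryTableScan in the plan

result = (
    df_sales
    .filter(F.col("item_id").isin([1, 2]))     # filter on certain item_id codes
    .groupBy("item_id")                        # group by item_id
    .agg(F.sum("sales").alias("total_sales"))  # total sales per item_id
)

# The write is the DataFrame action that triggers the query, job(s), stages and tasks.
result.write.mode("overwrite").parquet("/tmp/total_sales_by_item")
```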
The final step of this physical plan is then the actual write step, which is denoted by the OverwriteByExpression operator that you can see on the bottom right. So this is really a very simple query, and you can already intuitively see how all the steps in the DataFrame query map to the physical plan operators that you see on the right and in the physical plan at the bottom.

What we now really want to cover in this presentation is: what possible operators exist in this physical plan? We have now seen a Filter and a HashAggregate, but what do they actually mean, and what other operators like these do we have? Next, we also have the details section within this plan that you can see on the right. The details convey all the Spark metrics that belong to this physical plan, and Stefan and I will go into a bit more detail on how to use these details to potentially optimize a query further. Finally, once we have given you this overview of all the operators and metrics you can analyze using the SQL tab, I will go over a couple of scenarios that use that knowledge to optimize some example SQL queries.

With that slide I have finished the introduction to this presentation, and Stefan will now take you through an overview of the common components of the physical plan. - Yeah, thank you Max, for this gentle introduction. I will go over some common components of the physical plan here, to give you a broad overview of what kind of components exist. This is quite technical, but I think it's important, and the source code of the Spark project can give you a lot of information. So let's first discuss how a physical plan is actually represented in the source code of Spark. It is represented by the SparkPlan class, which functions both as a data structure that can be iterated over and manipulated as a tree of operators, and as a specification of the properties each operator has to adhere to.

To make it a bit more visual, on the left you again see an example physical plan as shown in the Spark UI. Each of the boxes is basically an operator. You see scans at the top, and you see transformations, for example ColumnarToRow, but also Union and OverwriteByExpression. If you generalize these, you can see on the right that there are operators that extend LeafExecNode in the source code; these are typically scans that read from disk or from memory, or reuse results from other queries. They are followed by one or more binary or unary transformations: the Union in this example on the left is a binary transformation, which extends BinaryExecNode, and ColumnarToRow, for example, is a unary transformation extending UnaryExecNode. Sometimes you will see a specific operator dedicated to writing the output. It will not always be visible, but in the code there is no difference: there is no specific write-output ExecNode, it's all just a UnaryExecNode. If you go through the source, there are over 150 subclasses of this physical plan class. Again, this is just to give you an idea of how many operators exist in each of the categories.
The biggest category is actually the UnaryExecNode, which contains all transformations that take one DataFrame as input and produce one DataFrame as output. There are only very few operators that take multiple DataFrames as inputs, such as joins and unions, et cetera. And there are also quite a few operators dedicated to reading data, either from cache, from files, from RDDs or from other queries. Because there are so many different subclasses, we of course cannot cover everything here. What we will cover is the most common narrow transformations, the most common wide transformations such as aggregates and joins, and how exchange operators and sort operators get into the query plan, because sometimes you simply will not get a one-to-one mapping from the DataFrame transformations that you wrote. We will not cover advanced topics like Adaptive Query Execution or Streaming specifics, or other things that are too specific for this introductory talk.

So let's start with the very basics and look at what happens if we just read from a data source and write the data back out. On the left, you see that we read from a CSV data source, which is a row-based data source, and then we write out to a Delta Lake table. These DataFrame transformations on the left actually translate to two queries on the right. The first query, denoted by Q1, contains one operator called SaveIntoDataSourceCommand. This in itself doesn't start any jobs; instead it just waits for the second query that it spawns to be completed. Query Two, Q2, is responsible for actually reading the data from the CSV file and writing it out in the Delta format.

As you can see on the right, at the Scan csv operator there are quite a lot of metrics, and you can use these metrics to detect potential performance issues in your setup. I have put numbers here: one, two, three, four. Starting with number one, the number of files read: that is a very important metric to take into account. It denotes how many files were read, which sounds pretty simple, but it becomes useful when you combine it with the other metrics, such as number two, the file system bytes read, so the actual data that has been processed by this operator, and number three, the total size of the files still to be processed. You can see that number two is much smaller than number three here, but it is expected to grow from three gigabytes to 120 gigabytes as the job progresses; I took the screenshot when it had just started. If you compare numbers two and three with number one, so relatively speaking the number of files with the size of the data read, you do not want to be in a scenario where the number of files is a very big number while the total size read is a very small number, because then you might be suffering from the small-files problem, as has been discussed in many other performance talks. In that case the Spark job might spend more time on discovering files than on processing data.

The fourth metric that is worth highlighting is the number of output rows. This is interesting to check when your query is very selective: if you only want certain values, using a filter for example, do you actually see that reflected here, or does the scan still output a lot of rows? The latter might show you that the filter predicate is not pushed down to the data source, and hence you might need to change something in your query to optimize it.
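A minimal sketch of the read-and-write pattern described here, assuming a cluster where the Delta Lake format is available (for example Databricks, or open-source Spark with the delta-spark package); the paths are placeholders:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Row-based source: CSV (hypothetical input path).
raw = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("/data/raw/sales_csv")
)

# Writing to Delta spawns the SaveIntoDataSourceCommand query (Q1 above), which in
# turn runs the query that scans the CSV and writes out Delta files (Q2 above).
raw.write.format("delta").mode("overwrite").save("/data/bronze/sales_delta")
```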
Here is another example of just reading and writing, but instead of a row-based data format we read a columnar format. In this case we read from Delta Lake and we write to Delta Lake, and people who are more familiar with the internals of Delta Lake know that the Parquet file format is used to store the actual data inside a Delta Lake table. On the right, we see that Query Two starts with a Scan parquet operator, but then we see something new: two new boxes, namely the WholeStageCodegen box and the ColumnarToRow operator.

So what are these? Starting with ColumnarToRow: you will see this operator in Spark 3 whenever a columnar data format is read. It converts the columnar format provided by Parquet, in this case, to the row-based format that is used internally by Spark for all its operations. The other box that you see here is the WholeStageCodegen box on the right. You will see this box a lot in different queries. It essentially means that all the operators inside the box support whole-stage code generation, which means, in essence, that they operate on an optimized data format and use Java bytecode generation to execute the operations they perform. So it is both more memory efficient and more performant in terms of execution time. You actually want to see these WholeStageCodegen boxes everywhere, because that means you probably have the most performant implementation.

Zooming in on the Scan parquet operator, still in the same query: there are lots of different statistics here. Some have to do specifically with the Delta IO cache that is available in Databricks; other metrics are similar to the ones we discussed for the CSV scan operator. One to highlight, number one, is the total scan time. This shows you how long it takes to actually scan the data from the data source. This kind of metric is only reported for columnar formats and not for row-based data formats, because it is too expensive to keep track of these timings per row. If this is much higher than you expected, you might have some network connectivity issue or other slowness there. By the way, this metric says six hours, but that is the cumulative time spent by all tasks on the entire cluster; it is the total time spent as if you would use just a single core to process everything.

Then, still the same query, but now looking at the WholeStageCodegen and ColumnarToRow operators. I just want to highlight number one here; the others are fairly self-explanatory. Number one shows the duration of the WholeStageCodegen stage, and this will be your main indicator of where to look in a query plan to find performance bottlenecks. In this case it says 52 hours, but again, that is just the sum of the time spent across all tasks in the cluster. You always want to look at this metric relative to the other WholeStageCodegen blocks in the same query and find the one with the highest duration. It doesn't really matter whether it says hours or minutes: just find the highest one, and then you probably have your performance bottleneck. You can then look at the operators inside that WholeStageCodegen box to see if there is anything you can optimize.
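A small sketch of reading the (hypothetical) Delta table back and inspecting the plan; in the text output of explain(), operators that run inside a WholeStageCodegen stage are prefixed with "*(n)", which corresponds to the boxes in the SQL tab:

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Delta stores its data as Parquet files, so the plan shows Scan parquet
# followed by ColumnarToRow (placeholder path).
events = spark.read.format("delta").load("/data/bronze/sales_delta")

filtered = events.filter(F.col("sales") > 0)

# Operators prefixed with "*(n)" in this output run inside whole-stage codegen.
filtered.explain()
```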
So this brings us to the common narrow transformations, and we will extend the example by adding some simple transformations. Recall that narrow transformations are transformations on DataFrames that do not require a redistribution of the data across the cluster; they can be applied locally, wherever the data resides at that point in time. Prime examples are filter, which simply shows up as a Filter operator in the query plan, and withColumn or a select statement, which shows up as a Project operator. If you hover over this Project operator, the black box shows you the details of which columns are selected and modified in this case.

To give some more examples, these are basically straightforward: if you do a range, you get a Range; if you do a sample, you get a Sample; a union gives a Union, et cetera. These also do not give you much more information apart from the number of rows they output.

But here is a more interesting case: sorting. What if you want to sort your data? Remember that your DataFrame is just a distributed dataset that is spread out over the cluster, so it depends on what you actually want: do you want to sort it globally, or locally? Sometimes a local sort is fine. If you, for example, apply the sortWithinPartitions transformation on your DataFrame, here on item_id, then the only operator you will find, if you look at the center, is the Sort operator, and it is part of the WholeStageCodegen box. It doesn't require any redistribution of the data: it just sorts the data as it is already distributed over the cluster, within the partitions you already have. On the right, if you look at the table, you can see, going from left to right, what happens to the data after certain operations are applied. In the first column we have an example dataset: some partition X that contains item_id values such as 33 and 53, and another partition Y that contains other values like 34, 4 and 8. If we then apply the Sort operator, the result is shown in the red column in the middle: there is a local sort, where within each partition the values are sorted. But if you look at it from a global perspective, you can see that it is not a total ordering; the dataset is not globally sorted, because value 53 from partition X comes before values 4 and 8 of partition Y.

So let's see what happens if we actually want a global sort, or global ordering, these terms are interchangeable. Say we do an orderBy on the DataFrame. Semantically, that means we want a global ordering of all the data in the DataFrame. Now, if you look at the query plan in the center, before the sort, all the way at the bottom, there is an Exchange that gets added to the query plan.
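The difference between the two cases can be reproduced with a small sketch (toy data; only the shape of the two plans matters):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.range(0, 100).withColumnRenamed("id", "item_id")

# Local sort: rows are sorted inside each existing partition, so the plan
# contains only a Sort operator and no Exchange.
df.sortWithinPartitions("item_id").explain()

# Global sort: orderBy asks for a total ordering, so the planner inserts an
# Exchange (range partitioning) before the Sort.
df.orderBy("item_id").explain()
```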
Why is this? Well, in the local sorting example we saw that just doing a local sort will not give you a globally sorted result, and hence we need to redistribute the data over the cluster to make sure the global ordering is guaranteed. Again, we can look at an example. In the table on the right we have the same example, but now we have added an additional operator, the Exchange, shown in yellow. After doing this exchange, which basically means shuffling the data over the cluster and redistributing it according to a certain partitioning scheme, we see that the new partition X contains the values 8 and 4, and the new partition Y contains the other values, including value 33, which used to be in partition X. If you then sort the data after this shuffle, you end up with the result in the red column, and if you ignore the partitions and look at it from a global perspective, you can see in the rightmost column that value 33 is now in the correct place and the DataFrame is sorted globally.

This was an example of a wide transformation. Wide transformations are transformations that require a redistribution, or potentially a redistribution, of the data across the cluster. Examples are global sorting, as I just showed you, but also joins and aggregations. So how does the query planner know that it has to put in these exchanges? It does this through distribution requirements that operators can specify. Here again is a bit of theory and a bit of looking at the code. Every SparkPlan operator, every operator in the physical plan, can specify a requiredChildDistribution. This is basically a requirement on how the data should be distributed over the cluster before this operator can be applied to the data. And after doing its operation, it can also guarantee a certain output partitioning, which in turn can be used by downstream operators to check whether their requiredChildDistribution is met.

On the right you can see what kinds of required distributions there are, how they are satisfied, and some example operators. I want to highlight two of them here. The first one is the OrderedDistribution requirement, which is the requirement for the global sort that we have seen. It is satisfied if the data is range partitioned, which means that if you distribute the data over the cluster according to a certain key, values of that key that are close together, for example values 1 to 10, or 10 to 20, need to end up in the same partition, in the same task, on the same machine, in order to be able to do the sort. The other example is the HashClusteredDistribution, which is satisfied by hash partitioning. Example operators here are the HashAggregate, which we will discuss later, as well as the SortMergeJoin. What this distribution requires is not that values that are close together in a range go to the same machine, but that every occurrence of the same key value consistently goes to the same partition. So if you have a key with value 10, it always needs to go to the same partition, and value 9 also always needs to go to the same partition, but not necessarily the same partition as value 10.
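The two partitioning schemes can also be requested explicitly on a DataFrame, which makes the corresponding exchanges easy to see in explain() output; a sketch with toy data:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.range(0, 1000).withColumnRenamed("id", "item_id")

# Range partitioning: nearby key values land in the same partition -- the kind
# of distribution a global sort needs (OrderedDistribution).
df.repartitionByRange(8, "item_id").explain()

# Hash partitioning: every occurrence of the same key value lands in the same
# partition -- what aggregations and sort-merge joins need
# (HashClusteredDistribution).
df.repartition(8, "item_id").explain()
```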
To make this a bit more concrete, let's go back to the sort operator. If you want a local sort, there is no requiredChildDistribution, and thus, when the EnsureRequirements rule is applied, no exchange is added, because no particular distribution is required for a local sort. But if you want a global sort, the required distribution is an OrderedDistribution, and when EnsureRequirements is applied in this case, it injects an Exchange operator with range partitioning before the sort. The sort then guarantees the range partitioning as its output partitioning, so that if you have multiple sorts in a row, Spark might not have to shuffle the data again: it already knows the data is distributed in a way that meets the requirements.

All right, so let's look at some details of the exchange. There are actually two kinds of exchanges: the shuffle exchange and the broadcast exchange. The shuffle exchange is shown on this slide. A shuffle exchange always spans two consecutive stages, as you can see in the exchange operator visualization on the right, stages four and five. One stage writes out shuffle files to disk, and the second stage reads those shuffle files back in according to the required distribution of the operation that requires the shuffle. Two things to note here. If you look at the metrics, there is the size of shuffle bytes written all the way at the bottom, and there are two other metrics, the local and remote bytes read, whose sum should correspond to the total shuffle bytes written. These data sizes are important to take into account: if you see a very big number here and you only have a limited number of shuffle partitions configured for the stage writing or reading, you might start spilling to disk, because there may not be enough memory to deal with that much data per partition. So that is something to keep in mind.

The other implementation is the broadcast exchange. This is the exchange used for datasets that are small enough to be broadcast, so they don't need to be spilled to disk; they are collected on the driver and then broadcast to all the executors. The most common operators that use it are the BroadcastHashJoin and the BroadcastNestedLoopJoin, which Max will tell you more about.
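Related to the spill remark above: the number of shuffle partitions is a plain configuration setting, so it is easy to inspect and adjust before a heavy wide transformation. A sketch; the value 800 is purely illustrative, and the right number depends on the data sizes shown in the Exchange metrics:

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Default is 200; with very large shuffles that can mean too much data per
# partition for the memory available per core, which leads to spill.
print(spark.conf.get("spark.sql.shuffle.partitions"))

# Spread the shuffled data over more (smaller) partitions -- illustrative value.
spark.conf.set("spark.sql.shuffle.partitions", "800")

df = spark.range(0, 10_000_000).withColumn("item_id", F.col("id") % 100_000)
df.groupBy("item_id").count().explain()  # the Exchange now targets 800 partitions
```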
Before I give the floor back to Max, I first want to zoom in on the aggregate operators, which are an example of wide transformations. On the left you see that we do a group by aggregate on a DataFrame: we want the sum of all sales per item_id. In the center you see the actual physical plan as shown in the Spark UI. You'll notice at the bottom that there is a HashAggregate operator, which is what you typically get when you do a group by aggregate, and because this HashAggregate has a distribution requirement, an exchange is added above it. The distribution requirement is that all equal item_ids end up on the same machine, so that their sales can actually be added up.

On the right you see the example again, but now with item_id and sales columns. In the first column you see that we have an item_id A and an item_id B, with different sales values per item_id, and they are spread out over different partitions: A and B are both present in different partitions, which might be on different machines in the cluster. First, to get all the values of identical item_ids to the same machine, we do the exchange, with the result shown in the center column in red: all values for A are brought to a new partition X, and all values for B to a new partition Y. Finally, we can do the HashAggregate, which in this case sums all the values to get the final result shown in the green column. It is called a HashAggregate because a hash table data structure is used to store the results, where item_id is the key and the values are the sales numbers.

Apart from HashAggregate, there are other implementations for doing an aggregation. The default, and you will almost always see this one, is the HashAggregate we showed on the previous slide. As I said, it is based on a hash table structure and it supports code generation. This works for most cases; even when your cluster doesn't have enough memory you may still be able to use the HashAggregate, because it will simply build a first hash table, flush it to disk when it runs out of memory, start a new hash table, and finally merge all the results using sort-based aggregation.

Then there are occasions where the HashAggregate cannot be used. One example is when you do a collect_list instead of a sum: you are then collecting data into a buffer of variable size. In our example we are just summing sales, so we are adding up a number and can use a fixed-size buffer for that, for example a 64-bit integer, and hence we can use the HashAggregate implementation. For some operations this is not possible: with collect_list, for example, we keep accumulating values in an array, which is not of fixed size, and Spark then falls back to the ObjectHashAggregate. This works similarly to the HashAggregate, but with some intricate differences, such as supporting aggregation buffers of non-fixed size and supporting aggregations on custom Java-based objects for the Dataset API. If all else fails, and nowadays this happens only in rare occasions because the HashAggregate and ObjectHashAggregate support most aggregations, Spark falls back to a SortAggregate, which does sort-based aggregation. That has a higher time complexity: instead of accumulating values in a hash map with constant-time lookups, it first has to sort all the rows and then do a single pass to collect the results, which is slower because sorting has a higher time complexity than looking up values in a hash table.

The main metric you typically want to look at in a HashAggregate is the spill size. You always want this to be zero. If it is higher than zero, it means the fallback I just explained has kicked in: it first tries to do the aggregation with one hash table in memory, runs out of memory, and has to flush it to disk and start a new one, which obviously adds overhead from merging the resulting hash tables and from reading and writing to disk, et cetera. If you run into this situation, you typically want to ask: are my dataset sizes per partition too big, or do I have too little memory per core in my cluster?
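A sketch that contrasts the two buffer types described above (toy data; whether the second plan actually shows ObjectHashAggregate also depends on spark.sql.execution.useObjectHashAggregateExec, which is on by default):

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

sales = spark.createDataFrame(
    [("A", 1), ("A", 2), ("B", 3), ("B", 1), ("B", 2)], ["item_id", "sales"]
)

# Fixed-size buffer (a sum fits in a single long): HashAggregate, typically as
# partial aggregate -> Exchange -> final aggregate.
sales.groupBy("item_id").agg(F.sum("sales")).explain()

# Variable-size buffer (collect_list grows with the data): typically falls back
# to ObjectHashAggregate (or SortAggregate if that is disabled).
sales.groupBy("item_id").agg(F.collect_list("sales")).explain()
```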
Then one note on partial aggregation. I just showed the example of first doing an exchange and then a HashAggregate, to show how aggregation works in this setting, but as Max pointed out at the beginning of this talk, you will typically see a double HashAggregate: first a HashAggregate, then an exchange, and then another HashAggregate. In this example we just want to show what that is: partial aggregation. The example is exactly the same as before, but now we have the yellow column in the table again. Before exchanging the data over the cluster, Spark attempts to already partially aggregate the results before sending them over. In this case, for partition Y, you see that there are multiple rows with the key B, three of them, and what the partial aggregation operator does is simply consolidate these into one row, B with sales value four, before doing the exchange. This reduces the amount of data that is shuffled over the network, which in turn may improve the performance of your job; the remaining operations are identical. I think I've talked enough for now, so I want to give the floor back to Max, who is going to talk a bit more about joins.

- Cool, so yeah, joins are the last wide transformation we are going to cover. On this slide is a typical example of a join. On the left we see a typical join query in which we join a sales sample DataFrame to an item dimension table on the item_id key, and this will be an inner join. On the right we see the associated query plan. First of all, we see the actual join operator on the bottom right, which in this case is a SortMergeJoin operator. But what is really interesting is that there is a lot happening before we even arrive at the join operator. First of all, we see two sorts: both DataFrames are being sorted. Before that, there are also two shuffle exchanges before both DataFrames are sorted. And even before that, the two DataFrames are being filtered. This entire query plan is generated just by doing this one join.

So what we now really want to know is, first of all: we are now seeing a SortMergeJoin, but what join algorithms actually exist within Spark? Second, how does Spark choose which join algorithm to use; on the basis of what rules does it choose, for example, a BroadcastHashJoin, a BroadcastNestedLoopJoin, or a SortMergeJoin? And lastly, where are the sorts and the filters coming from? Stefan already explained a bit about the required child distribution, so we can already guess that the SortMergeJoin operator probably has child distribution requirements that lead to these two shuffle exchanges, and we will find out in the next few slides that a similar kind of requirement also leads to the two sorts that happen before the SortMergeJoin operator is executed.
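A sketch of the kind of equi-join described above (toy DataFrames; note that inputs this small would in practice be broadcast, whereas the plan discussed on the slide, with inputs too large to broadcast, shows isNotNull filters, two hash-partitioning Exchanges, two Sorts and a SortMergeJoin):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Hypothetical fact and dimension DataFrames joined on item_id.
df_sales = spark.createDataFrame(
    [(1, 10.0), (2, 5.0), (1, 7.5)], ["item_id", "sales"]
)
df_items = spark.createDataFrame(
    [(1, "apples"), (2, "pears")], ["item_id", "item_name"]
)

joined = df_sales.join(df_items, on="item_id", how="inner")
joined.explain()
```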
So what kinds of join implementations do we have? On this slide you see a table of the four most common join operators, with their requiredChildDistribution, their requiredChildOrdering, a brief description of what each operator does, and a ballpark complexity estimate.

The quickest join implementation is the BroadcastHashJoin. For its requiredChildDistribution, one side has to be broadcast: one of the two DataFrames being joined should be small enough that it can be broadcast to all the executor cores. There is no required child ordering, because the DataFrames do not have to be sorted before doing this join. What does this join actually do? It performs a local hash join between the broadcast DataFrame in its entirety and the particular partition of the left DataFrame on that executor core. Because there is no sorting going on, just a local comparison between the broadcast DataFrame and a partition, the complexity is only O(n).

The other very common join operator is the SortMergeJoin, which we just saw in the previous example. Here the requiredChildDistribution is a HashClusteredDistribution, which is the same required distribution as you saw for a typical aggregation query. This essentially means the DataFrames have to be hash partitioned, so that all keys with the same value are on the same partition before the join can be executed. The SortMergeJoinExec also has a requiredChildOrdering: it requires both DataFrames being joined to be sorted before the join can be executed. Once this is done, the SortMergeJoin operator really just compares the keys of the sorted DataFrames and merges the rows when there is a match. The complexity of the SortMergeJoinExec comes entirely from this required sorting: sorting a DataFrame is typically O(n log n), so that is really the complexity of the SortMergeJoinExec, while the actual joining itself is only O(n).

The other two join operators are significantly more complex. The first one is the BroadcastNestedLoopJoinExec. Again, this requires one of the DataFrames to be broadcast, while for the other DataFrame there is no distribution requirement, and again there is no sort ordering requirement. What actually happens in a BroadcastNestedLoopJoin? It is somewhat similar to a BroadcastHashJoin, but instead of performing a local hash join it operates in a loop: for each row of the left DataFrame partition, it compares that row with the entire broadcast DataFrame to see if any keys match, and if so, it merges them. So for each row it loops over the entire broadcast DataFrame.
This means the complexity becomes O(n × m), where n is the size of the left DataFrame and m is the size of the broadcast DataFrame. The last join operator is the CartesianProductExec. What happens in this one is essentially a cross join between the two DataFrames, followed by a filter to retain only the rows that actually match the join condition. What do I mean by a cross join? It generates all possible combinations of rows between the left and the right DataFrame. This is again a high complexity, O(n × m), and compared to the BroadcastNestedLoopJoinExec the second DataFrame typically tends to be bigger, so this operation tends to be even more expensive than the BroadcastNestedLoopJoinExec.

So how does Spark choose its join? On this slide you see a flow diagram of how Spark ends up choosing a join implementation. The first thing Spark looks at is whether the join is an equi-join or not: is there an equality condition in the join statement, for example that item_id has to be equal between the two DataFrames, or does it only compare ranges of item_ids? If it is an equi-join, Spark looks at whether one of the DataFrames is small enough to be broadcast, given a threshold configuration in the Spark code. If the DataFrame to be broadcast is small enough, Spark chooses the BroadcastHashJoin, which is the quickest join possible. If both DataFrames are not small enough to be broadcast, it chooses the SortMergeJoin. If the join is not an equality join, so there is no equality condition within the join condition, it cannot do a SortMergeJoin or a BroadcastHashJoin; it has to do a BroadcastNestedLoopJoin or a Cartesian product, which you can see at the bottom of the flow diagram. Again, it first looks at whether one of the DataFrames is small enough to be broadcast. If it is, it executes a BroadcastNestedLoopJoin; if not, it has to resort to a Cartesian product, but that is only possible when doing an inner join, not when doing a left join or a full outer join. If you do, for example, a left outer join, Spark will try to do a BroadcastNestedLoopJoin anyway, but given that both DataFrames are not small enough to be broadcast, this can lead to out-of-memory errors, so it can be very dangerous. The key takeaway is that you should really try to avoid any kind of non-equi join, especially as a left join or a full outer join.
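For the equi-join branch of that flow diagram, the broadcast decision is driven by a size threshold and can also be forced with a hint. A sketch with toy DataFrames (only force a broadcast when the hinted side genuinely fits in memory):

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

df_sales = spark.createDataFrame([(1, 10.0), (2, 5.0)], ["item_id", "sales"])
df_items = spark.createDataFrame([(1, "apples"), (2, "pears")], ["item_id", "item_name"])

# The automatic choice uses spark.sql.autoBroadcastJoinThreshold (10 MB by default):
# tables estimated below it are broadcast for equi-joins.
print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

# The broadcast hint forces a BroadcastHashJoin even when size estimates are
# missing or pessimistic.
df_sales.join(F.broadcast(df_items), "item_id").explain()
```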
Lastly, I briefly want to touch upon ordering requirements. I already covered this a bit: next to the distribution requirements that Stefan mentioned, some operators also have an ordering requirement, most notably the SortMergeJoin, which requires the two DataFrames it is going to join to both be sorted on their join keys. On the left you can see, for example, that the DataFrames are joined on left.id = right.id, and the SortMergeJoinExec operator then basically says: I require these DataFrames to be ordered on left.id and right.id respectively. This means the Spark planner will generate two Sort operators before the SortMergeJoin to meet this sorting requirement. As you can see on the right, there are now two Sort operators, one for each DataFrame, which make sure the DataFrames are ordered on their join key.

So let's revisit our join example. Given all the information I just talked about, we can now analyze a bit better what is actually going on in the join. First of all, there is a filter. It turns out that in a step before the physical plan, in the logical plan of the Catalyst optimizer, Spark is able to figure out that, because it is an inner join, it can already remove all join keys that have a null value, so it adds an isNotNull filter for you in the query plan. Next, we know that the SortMergeJoin has a required distribution of hash partitioning, and this causes Spark to add a shuffle exchange for each of the DataFrames that are going to be joined. Then there is also the requiredChildOrdering: because the SortMergeJoin requires both DataFrames to be sorted, two Sorts are added before the SortMergeJoin operator. Finally, Spark has seen that this join is an equi-join, because it contains an equality condition on item_id, and it has determined that neither of these DataFrames is small enough to be broadcast, and so it has ended up with the SortMergeJoin. This should give you all the information you need to understand what is going on when looking at the query plan of a join.

In the last section of this presentation, I quickly want to go over a couple of real-world scenarios of Spark queries that can be optimized by looking at the query plan. In the first example, we have a DataFrame that contains a category column, for example the product category in a supermarket. A typical use case is that for each category you want to apply, say, a different product discount. One way to solve this is to union four DataFrames: for each of these DataFrames you filter on a specific category and then apply a discount function, with a different discount function for each category value. If you write a query like this, what Spark will actually end up doing is read the Parquet files four times, as you can see on the right. Because these are repeated reads of the same data, this is typically a suboptimal query, and by looking at the query plan we have now found that out. The next question is then: can we somehow rewrite this query so that it only reads the data once? It turns out we can. Instead of doing a union with four filters, we can apply a case when statement. For each of these product categories we can say: when the category value is this particular value, apply this function; when it is another value, apply that function; and so on, creating a chain of case when statements. By doing it this way, we can write a query where the DataFrame is only read once, as you can see on the right. With only one read, the Spark query is much quicker compared to the previous query in which we did four reads.
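A sketch of both versions of this scenario in PySpark (categories, discount factors and column names are made up):

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Hypothetical product DataFrame with a category and a price column.
df = spark.createDataFrame(
    [("fruit", 1.0), ("dairy", 2.0), ("bakery", 3.0), ("drinks", 4.0)],
    ["category", "price"],
)

# Suboptimal pattern: one filtered branch per category, unioned together,
# which makes Spark scan the source once per branch.
discounts = [("fruit", 0.9), ("dairy", 0.8), ("bakery", 0.7), ("drinks", 0.95)]
branches = [
    df.filter(F.col("category") == cat).withColumn("discounted", F.col("price") * f)
    for cat, f in discounts
]
unioned = branches[0].union(branches[1]).union(branches[2]).union(branches[3])

# Single-scan rewrite: a chained when/otherwise expression applies the right
# discount per category in one pass over the data.
discount_col = (
    F.when(F.col("category") == "fruit", F.col("price") * 0.9)
     .when(F.col("category") == "dairy", F.col("price") * 0.8)
     .when(F.col("category") == "bakery", F.col("price") * 0.7)
     .otherwise(F.col("price") * 0.95)
)
rewritten = df.withColumn("discounted", discount_col)
```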
The second scenario I want to cover is partial aggregations. As Stefan already explained, in most aggregation queries partial aggregation is done to reduce the amount of data that has to be shuffled around before doing the final aggregation. However, this turns out not always to be helpful: what if, for example, you have a grouping key with very high cardinality, that is, a lot of distinct values? In this particular DataFrame, of which we show an example on the left, the number of distinct item_ids on which we want to group is almost the same as the total number of rows. This means that partial aggregation is really not going to help much with reducing the number of rows that are shuffled, and in fact it only increases the query duration. In the first example, in which partial aggregation is still enabled, the query duration is 23 seconds. However, if we turn off partial aggregation, which can be done with the configuration shown on the left side of the slide, we remove this partial aggregation step from the query plan. In this particular case it leads to a speedup of almost 22%, bringing the query duration down to 18 seconds. I do want to note that partial aggregations tend to be really good: in a normal use case they genuinely reduce the amount of data that has to be shuffled before the final aggregation. But there can be specific cases, like this one, where a query benefits from turning them off.

The final example I want to show you concerns join strategies. In this example we have a DataFrame with ships and their respective longitude and latitude coordinates, and we also have a DataFrame that contains ports, each with a bounding box of coordinates. What we want to figure out is whether a given ship is within a port or out in open sea. One way to do this is of course a join: in the join condition on the left we check whether the latitude of a ship is between the minimum and maximum latitude of a port, and whether the longitude of the ship is between the minimum and maximum longitude of the port. But this is a non-equality join, as you can see on the left, and as you may recall from our join strategy flow diagram, this means Spark will try to choose a BroadcastNestedLoopJoin. Luckily, the ports DataFrame turns out to be small enough to be broadcast, so it does not have to do a Cartesian product join. Nonetheless, a BroadcastNestedLoopJoin is still quite slow, and as a result this query takes three and a half minutes.

The question is then: can we do something about it so that Spark chooses a different join strategy? It turns out we can. One way to solve this, because we are talking about coordinates, is to define geohashes, which are basically large rectangular areas covering these coordinate boxes. To figure out whether a ship is near a port, we can already say that the geohash of the ship and the port should be the same, before even looking at whether the ship is exactly within the port's bounding box. So what we do to the join query on the left is add an equality condition saying that the geohash of the ship should be the same as the geohash of the port, and by doing this we ensure that the join is an equi-join.
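A sketch of the two variants of this join (the data is invented, and the geohash columns are assumed to be precomputed, for example with a geohashing library or UDF):

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Hypothetical ship positions and port bounding boxes, each with a geohash column.
df_ships = spark.createDataFrame(
    [("s1", 52.37, 4.89, "u173")], ["ship_id", "lat", "lon", "geohash"]
)
df_ports = spark.createDataFrame(
    [("Amsterdam", 52.3, 52.5, 4.7, 5.0, "u173")],
    ["port", "lat_min", "lat_max", "lon_min", "lon_max", "geohash"],
)

# Pure range predicates make this a non-equi join, so Spark falls back to a
# BroadcastNestedLoopJoin (or a Cartesian product if nothing can be broadcast).
range_cond = (
    F.col("lat").between(F.col("lat_min"), F.col("lat_max"))
    & F.col("lon").between(F.col("lon_min"), F.col("lon_max"))
)
slow = df_ships.join(df_ports, range_cond)
slow.explain()

# Adding an equality on the geohash turns it into an equi-join, so the planner
# can pick a BroadcastHashJoin (or SortMergeJoin) instead.
fast = df_ships.join(df_ports, (df_ships.geohash == df_ports.geohash) & range_cond)
fast.explain()
```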
Recalling the join flow diagram, this means Spark can now choose a BroadcastHashJoin or a SortMergeJoin. In this case the ports DataFrame is small enough to be broadcast, so Spark opts for the BroadcastHashJoin, which is the quickest join implementation there is. And we can see that the query duration has been drastically reduced, from three and a half minutes to a mere six seconds. This is really an example where, by changing the join strategy, we accomplish a drastic reduction in query time.

Having discussed these three scenarios brings us to the end of the presentation. I quickly want to go over what we discussed. We have seen, in the beginning and through Stefan's explanations, that the SQL tab provides a lot of insight into how a Spark query is executed: it really gives you a breakdown of all the operators Spark uses to execute the query. We can use the SQL tab to reason about query execution time, especially by looking at the total duration of each of the WholeStageCodegen blocks, as Stefan explained. Using these insights, we can answer important questions like: which part of my Spark query takes the most time, and is Spark actually choosing the most efficient operators for the task? As we saw in the last part of this presentation, we can analyze the query plan and sometimes conclude that it is not the most effective plan, and then rewrite the Spark query in such a way that a more optimal query plan is chosen, which can lead to large reductions in query time.

If you want to practice or learn more, there are a couple of tips we can give you. What we typically suggest is that whenever you write a Spark or DataFrame query, you first mentally visualize what the physical plan would look like without actually looking at the SQL plan, and then look at the SQL query plan and compare whether you were correct; if you were not, try to reason about why that is. Next, we suggest you go into the Spark source code and look up the SparkPlan class, because this class contains all the methods and all the operators, and from there you can dive deeper into the Spark code to see how the Catalyst optimizer computes the query plan. You will be able to learn a lot more from there. So this is the end of the presentation, thank you very much for attending.
Info
Channel: Databricks
Views: 2,346
Rating: 5 out of 5
Keywords: Databricks, Query Plan, Apache Spark, Spark UI, SQL, Queries
Id: _Ne27JcLnEc
Length: 62min 35sec (3755 seconds)
Published: Tue Dec 08 2020