- Hello everyone, and welcome to this Data + AI Summit presentation, in which we will talk about supercharging your Spark queries using the Spark UI SQL Tab. My name is Max Thone, I'm a Resident Solutions Architect at Databricks, and my co-presenter will be Stefan van Wouw, who is a Senior Resident Solutions Architect, also at Databricks. So what are we going to talk about today? In the first part of this presentation, we are going to give an introduction to the Spark SQL Tab, which is one of the tabs in the Spark UI that most of you are probably familiar with. Next, Stefan will introduce you to the most common components that you will find in the query plan, that is, the Spark execution plan shown within the SQL Tab. And finally, I will walk you through a couple of scenarios, some real-world Spark scenarios, in which we can use the knowledge we gained about the query plan and the SQL Tab to optimize your Spark queries, to make them really as fast as possible. So in this first part, I'll just give you a brief introduction to the Spark SQL Tab.
So first of all, why should you really know about the SQL Tab? Well, it really gives us two important pieces of insight. First, it shows how exactly the Spark query is executed. As most of you probably know, when you submit a DataFrame or Spark SQL query to Spark, it first goes through this optimization engine called Catalyst before Spark actually generates the final query plan that gets executed on the JVM. The SQL Tab gives us insight into this final query plan before it goes into the JVM, so we can use the SQL Tab to reason about whether this final physical plan is actually the most optimal plan that could be executed. Second, we can also use the query plan in the SQL Tab to reason about the query execution time, and again, critically think about whether it is the lowest execution time that we could possibly achieve for this query.

So how do we actually get to the SQL Tab? On the lower part of the slide you will see a screenshot of the Spark UI. What you simply do is first, of course, go to the Spark UI of your cluster, and then you can click on the SQL Tab, as you can see in the little yellow rectangle. When you click on the SQL Tab, you'll then see a list of all the SQL queries that you have executed on your cluster. And when you click on one of the descriptions, one description is associated with one SQL query, we then actually get a visual overview of the query plan, which I will show you later on in this presentation.
So what exactly is a query plan? As I just mentioned, a Spark SQL, DataFrame or Dataset query will go through this optimization engine called the Spark Catalyst optimizer before finally being executed by the JVM. The Catalyst optimizer really executes a series of optimization steps before arriving at the final execution plan, which is called the selected physical plan, the one you can see on the right. So what we typically mean by query plan is this final selected physical plan, which is the output of the Spark Catalyst optimizer.

So where does this query plan actually sit in the hierarchy? I think most of you are already familiar with this famous hierarchy that goes from Spark job to stage to Spark tasks. If we go from right to left, we know that one Spark stage actually executes a number of tasks, and in your cluster each executor core can execute one task at a time. One Spark job, in turn, executes multiple stages. As you may already have noticed, one Spark job is usually cut up by what we call shuffle boundaries, and these shuffle boundaries dictate how many stages you have for one Spark job. The query plan actually sits on top of these Spark jobs: one Spark query can generate multiple Spark jobs. And on top of that again is the DataFrame action, so that could be something like DataFrame.count or DataFrame.write.parquet, which can in turn generate one or more queries.
So in this slide, I just want to show you a simple example of how this hierarchy actually plays out. On the top left, we have a very simple Spark query in which we have a DataFrame that contains some sales data, and what we want to do here is filter on certain item_id codes, and for all these item_id codes we want to get the total number of sales per item_id. So this is a filter, after which we do a group by, and then we aggregate the sales per item_id. We then trigger the query by doing a write command on this DataFrame. So this is the actual DataFrame action, which is the first step. This query will then generate, in this case, one SQL query, which you can see on the top right next to number two. That is the SQL query associated with this DataFrame query, and in the following slides we will actually click on this query and see what kind of information we get. This query, in turn, generates one Spark job. That will be job 24, which you can see on the top right, and this job has generated two stages, which you can see on the bottom right, namely stage 50 and stage 51. Finally, you can see on the bottom left, on the summary tab, that these two stages in total generate nine tasks. What I just want you to pay some attention to is that one way to get to the SQL query is to simply go to the job that's associated with this DataFrame query, and then click on the blue number of the associated SQL query on the top right.
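To make that concrete, here is a minimal sketch of what such a query could look like in code. This is my own approximation, not the exact code from the slide; the DataFrame name, the item_id values and the paths are made up, and `spark` is assumed to be an existing SparkSession (as in a notebook or spark-shell).

```scala
import org.apache.spark.sql.functions.{col, sum}

// Hypothetical sales DataFrame; the source path and format are assumptions.
val salesDf = spark.read.format("delta").load("/tmp/sales").cache()

// Filter on certain item_id codes, then aggregate total sales per item_id.
val totalSales = salesDf
  .filter(col("item_id").isin(1, 2, 3))
  .groupBy("item_id")
  .agg(sum("sales").alias("total_sales"))

// The write is the DataFrame action: it triggers one SQL query, which in turn
// spawns the Spark job(s), stages and tasks you see in the UI.
totalSales.write.format("delta").mode("overwrite").save("/tmp/total_sales_per_item")
```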
So what does the SQL query look like? We see a visual overview of it on the right, and we see the actual generated physical plan, so that is the query plan, at the bottom. We can actually do a simple mapping exercise using this simple query. At the very top right of the SQL plan, we see what is called an InMemoryTableScan, and this InMemoryTableScan references the fact that we read this DataFrame from a cached DataFrame. So it's actually reading the DataFrame from memory, and that is what InMemoryTableScan means. Next, we're doing a filter operation, and that is what you can see by following the yellow arrow: a filter operation in the DataFrame query just leads to a Filter operator in this final physical plan. And then we see a bunch of operators that have to do with the aggregation: it actually performs two HashAggregates, with a shuffle exchange in between, and Stefan will explain a bit more about why it's actually doing two HashAggregates. For now, it's okay to know that this group by plus aggregation leads to these aggregation steps in the final physical plan. The final step of this physical plan is then the actual write step, which is denoted by the OverwriteByExpression that you can see on the bottom right. So this is really a very simple query, and you can already intuitively see how all the steps in the DataFrame query map to the physical plan operators that you see on the right, and in the physical plan at the bottom.
But what we now really want to cover in this presentation is: what possible operators exist in this physical plan? We have now seen a Filter and a HashAggregate, but what do these actually mean, and what more operators like these do we have? And next, we of course also have the details section within this plan, which you can see on the right. These details convey all the Spark metrics that come with the operators of this physical plan, and Stefan and I will also go a bit more into detail on how to use these details to potentially optimize your query further. Finally, once we have all this knowledge, so once we have given you this brief overview of all the operators and all the metrics that you can analyze using the SQL Tab, I will go over a couple of scenarios that use that knowledge to optimize some example SQL queries. So with that slide I've finished the introduction to this presentation, and Stefan will now take you through an overview of all the common components of the physical plan.
- Yeah, thank you Max, for this gentle introduction. So I will actually go over some common components of the physical plan here, to give you a broad overview of what kind of components exist. This is quite technical, but I think it's important. Essentially, the source code of the Spark project can give you a lot of information, right? So let's first discuss how a physical plan is actually represented in the source code of Spark. It's represented by the SparkPlan class, which both functions as a data structure, it can be iterated over and manipulated like a tree of operators, and dictates what properties each of the operators has to adhere to.
So to make it a bit more visual: on the left you again see an example physical plan as shown in the Spark UI. Here you see different operators; really, each of the boxes is basically an operator. You see scans at the top, and you see transformations represented as well, for example the ColumnarToRow, but also Union and OverwriteByExpression. If you generalize these, you can see on the right that there are certain operators that extend from LeafExecNode in the source code. These are typically scans, which read from disk or from memory, or reuse results from other queries. This is then followed by one or more binary or unary transformations: the Union in this example on the left is a binary transformation, which extends from BinaryExecNode, and the ColumnarToRow, for example, is a unary transformation extending from UnaryExecNode. Sometimes you will also see a specific operator dedicated to writing the output. It will not always be visible, but actually in the code there's no difference: there's no specific write-output ExecNode, it's all just a UnaryExecNode.

So actually, if you go through the source, there are over 150 subclasses of this SparkPlan class. Again, just to give you an idea of how many operators exist in each of the categories: the biggest category is the UnaryExecNode, which contains all transformations that take one DataFrame as input and produce one DataFrame as output. There are only very few operators that work with multiple DataFrames as inputs, such as joins and unions, et cetera. And there are also quite some operators dedicated to actually reading data, either from cache, from files, from RDDs or from other queries. Because there are so many different kinds of subclasses, we of course can't cover everything here. So what we will cover are the most common narrow transformations, the most common wide transformations such as aggregates and joins, and we will discuss how exchange operators and sort operators get into the query plan, because sometimes you simply will not get that one-to-one mapping from the DataFrame transformations that you wrote. We will not cover advanced topics like Adaptive Query Execution, Streaming specifics, or other things that are too advanced for this introductory talk.
So let's just start with the very basics. Let's look at what happens if we just read from a data source and write the data back out. On the left, you see that we read from a CSV data source, which is a row-based data source, and then we write out to a Delta Lake table. These DataFrame transformations on the left actually translate to two queries on the right. The first query, denoted by Q1, contains one operator called SaveIntoDataSourceCommand. This in itself doesn't start any jobs; instead, it will just wait for the second query that it spawns to be completed. So Query Two, Q2, is the one that is actually responsible for reading the data from the CSV file and writing it out to the Delta format.
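A minimal sketch of what this read-and-write could look like; the paths and CSV options are assumptions on my part, not the exact code from the slide:

```scala
// Read a row-based source (CSV) and write it back out as a Delta Lake table.
// Requires the Delta Lake dependency on the cluster; `spark` is the active SparkSession.
val rawDf = spark.read
  .option("header", "true")        // assuming the CSV files have a header row
  .option("inferSchema", "true")
  .csv("/data/raw/sales_csv")      // hypothetical input path

rawDf.write
  .format("delta")
  .mode("overwrite")
  .save("/data/bronze/sales")      // hypothetical output path
```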
As you can also see on the right, at the Scan CSV operator there are quite a lot of metrics, and you can actually use these metrics to detect potential performance issues in your setup. If we look at these metrics, I put numbers here: one, two, three, four. Starting with number one, the number of files read: that's actually a very important metric to take into account. It denotes how many files were read, as it says, so it's pretty simple, but you can combine it with the other metrics, such as number two, the file system bytes read, so the actual data that was processed going through this operator, and number three, the total size of the files that might still be processed. You see that number two is actually much less than number three here, but it's expected to grow from three gigabytes to 120 gigabytes as the job progresses; I actually took the screenshot when it had just started.

Now, if you compare numbers two and three with number one, so if you compare, relatively speaking, the number of files with the size of the files read, you do not want to be in a scenario where the number of files is a super big number and the size of the files read is a super small number, because then you might be suffering from the small files problem, as has been talked about in many other performance talks. In that case the Spark job might spend more time on discovering files than on processing data. The fourth metric worth highlighting is the rows output. This is also interesting to check if you're doing a query that's very selective, so you only want to get certain values using a filter, for example. Do you actually see this reflected here, or does it still output a lot of rows when scanning the data source? The latter might show you that the filtering predicate is not pushed down to the data source, and hence you might need to change something in your query to optimize it.
So here is another example, again just reading and writing, but instead of reading a row-based data format, we read a columnar format. In this case we read from Delta Lake and we write to Delta Lake, and people who are more familiar with the internals of Delta Lake know that the Parquet file format is used to store the actual data inside a Delta Lake table. On the right we see that Query Two starts with the Scan parquet operator, but then we see something new. We do not just see this Scan parquet, we also see two new boxes, namely the WholeStageCodegen and the ColumnarToRow operator. So let me just highlight what these are. Starting with the ColumnarToRow: this operator you will see in Spark 3 whenever a columnar data format is read from. It is an operator that converts the columnar format, provided by Parquet in this case, to a row-based format that is used internally by Spark to do all the operations on. The other box that you see here on the right is the WholeStageCodegen. This is a box you will see a lot in different queries. It essentially just means that all the operators inside this box support whole-stage code generation, which means, in essence, that they operate on an optimized data format and also use Java bytecode generation to execute the operations they perform. So it's both more memory efficient and more performant in terms of execution time. You actually want to see these WholeStageCodegen boxes everywhere, because that means you're probably running the most performant implementation.
So just zooming in, it's still the same query, on the Scan parquet. There are lots of different statistics here. Some have to do specifically with the Delta IO cache that's available in Databricks; other metrics are similar to the ones we discussed on the CSV scan operator. One to highlight here is the scan time total, number one. This shows you how long it takes to actually scan the data from the data source. This kind of metric is only reported for columnar formats and not for row-based data formats, because it's too heavy or too inefficient to keep track of these timings row by row. Here you can again see: if this is much higher than you expected, then you might have some network connectivity issues or some slowness there. And by the way, this metric says six hours, but that is the cumulative time spent by all tasks on the entire cluster; it's the time that would be spent if you used just a single core to process everything.

Then, still the same query, but now looking at the WholeStageCodegen and ColumnarToRow. I just want to highlight number one here; the others are kind of self-explanatory. Number one shows the duration of the WholeStageCodegen, and this will be your main indicator of where to look in a query plan to find performance bottlenecks. In this case it says 52 hours, but again, that's really just the sum of time spent across all tasks in the cluster. So you always want to look at this metric relative to the other WholeStageCodegen blocks in the same query and then look for the highest duration. It doesn't really matter whether it says hours or minutes: just find the highest one, and then you have probably found your performance bottleneck. And then you can look at the operators inside that WholeStageCodegen box to see if there's anything you can optimize.
So this brings us to the common narrow transformations. We'll extend the example by just adding some simple transformations. Recall that narrow transformations are transformations on DataFrames that do not require a redistribution of the data across the cluster; they can be applied locally, where the data resides at that point in time. Prime examples are filter, which is quite simple and just shows up as a Filter operator in the query plan, and another one that you often see is the withColumn or select statement. If you execute that, it will show up as a Project operator, and if you hover over this Project operator, this black box will show you the details of which columns are selected and, in this case, modified. Just to give some more examples: this is basically super straightforward. If you do a range, you get a Range; if you do a sample, you get a Sample; union gives you a Union, et cetera. These also do not give you much more information apart from the number of rows that they output.
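As a small illustrative sketch (my own toy example, not the one from the slides), you can see these mappings for yourself with explain; none of these transformations should introduce an exchange:

```scala
import org.apache.spark.sql.functions.col

val df = spark.range(1000).withColumnRenamed("id", "item_id")        // Range

val narrow = df
  .filter(col("item_id") > 10)                                       // -> Filter
  .withColumn("item_id_doubled", col("item_id") * 2)                 // -> Project
  .select("item_id", "item_id_doubled")                              // -> Project (often collapsed)

// No Exchange should appear: none of these require redistributing the data.
narrow.explain("formatted")
```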
But here is a bit more interesting case, actually: sorting. What if you want to sort your data? Remember that your DataFrame is just a distributed dataset that is spread out over the cluster. So it depends on what you actually want: do you want to globally sort it, or do you want to locally sort it? Sometimes it's fine to use a local sort. If you, for example, do this sortWithinPartitions transformation on your DataFrame, and here we do this on the item_id, then the only operator that you will find, if you look at the center, is the Sort operator, and it's part of this WholeStageCodegen box. It doesn't require any redistribution of the data, so it will just sort the data as it is distributed over the cluster, within the partitions that you already have. On the right, if you look at this table, you can basically see, going from left to right, what happens to the data after certain operations are applied. In the first column, we have an example dataset: we have some partition X in the dataset that contains item_id 33, among other values, and then another partition Y which contains some other values of item_id, like 34, four, eight, et cetera. If we then apply the Sort operator, the result will be as shown in the red column in the middle. So there will be a local sort, where within each partition the values are sorted. But if you look at it from a global perspective, you can see that it's not a total ordering, the dataset is not globally sorted, because value 53 from partition X comes before four and eight of partition Y.

So let's see what happens if we actually want a global sort, or global ordering, these terms are interchangeable. Let's say we do an orderBy on your DataFrame; then, semantically speaking, it is assumed that you want a global ordering of all the data in your DataFrame. Now suddenly, if you look at the query plan in the center, before the Sort, all the way at the bottom, there's this Exchange that gets added into the query plan. Why is this? Well, in the local sorting example we have seen that the result of just doing a local sort will not give you a globally sorted result, hence we need to redistribute the data over the cluster to make sure that this global sorting is guaranteed. Again, we can look at an example. In the table on the right, we have the same example, but now we have added an additional operator, the Exchange operator, shown in yellow. If you look at what happens after doing this exchange, which is basically shuffling the data over the cluster and redistributing it according to a certain partitioning scheme, we see that the new partition X will contain the values eight and four, and the new partition Y will contain the other values, including value 33, which used to be in partition X. If you then sort the data after doing this shuffle, you will end up with the result in the red column, and if you just forget about the partitions for a moment and look at it from a global perspective, you can see in the rightmost column that value 33 is now in the correct place, and the DataFrame is sorted from a global perspective.
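A quick sketch to compare the two plans yourself (again my own toy DataFrame, assuming an active SparkSession):

```scala
import org.apache.spark.sql.functions.col

val df = spark.range(1000).withColumnRenamed("id", "item_id")

// Local sort: only a Sort operator, no Exchange; order is only guaranteed per partition.
df.sortWithinPartitions(col("item_id")).explain("formatted")

// Global sort: an Exchange with range partitioning is injected before the Sort,
// so the ordering holds across the whole DataFrame.
df.orderBy(col("item_id")).explain("formatted")
```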
So this was actually an example of wide transformations. Wide transformations are transformations that require a redistribution of the data, or potentially a redistribution of the data, across the cluster. Examples of these are the global sort I just showed you, but also joins and aggregations. Now we want to see: how does the query planner know that it has to put in these exchanges? Well, it does this through distribution requirements that operators can specify. So here, again, a bit of theory and a bit of looking at the code. Every SparkPlan operator, every operator in the physical plan, can specify a requiredChildDistribution. This is basically a requirement on how the data should be distributed over the cluster before this operator can be applied to the data. And after doing its operation, it can also guarantee a certain output partitioning, which in turn can be used by downstream operators to check whether their requiredChildDistribution is met.

On the right, you can again see what kinds of required distributions there are, how they are satisfied, and some example operators. I want to highlight two of them here. The first one is the OrderedDistribution requirement, and this is actually the requirement for the global sort that we have seen. It is satisfied if the data is range partitioned, which means that if you partition the data, so if you distribute the data over the cluster according to a certain key, values of the key that are close together, like for example values one to 10, or values 10 to 20, need to be on the same node, in the same task, on the same machine, in order to be able to do this sort operation. Another example is the HashClusteredDistribution, which is satisfied by hash partitioning. Example operators here are the HashAggregate, which we'll discuss later, as well as the SortMergeJoin. What this distribution requires is not that values close together in terms of a range, like values one to 10, go to the same machine, but that every value consistently goes to the same machine. So if you have a key with value 10, it always needs to go to the same partition, and value nine also always needs to go to the same partition, but not necessarily the same partition as value 10.

Just to make this a bit more concrete, let's go back to the Sort operator. If you want to do a local sort, you do not have a requiredChildDistribution, and thus, when the EnsureRequirements rule is applied, it will not add an exchange, because no particular distribution is required for this local sort. But if you want a global sort, then the required distribution will be an OrderedDistribution, and when EnsureRequirements is applied in this case, it will actually inject an exchange operator with range partitioning before the sort. The sort will then also guarantee the range partitioning as its output partitioning, so that if you have multiple sorts in a row, Spark might not have to shuffle the data again, but can already know that the data is distributed in a way that meets the requirements.
some details of the exchange. There're actually two kinds of exchanges The the Shuffle exchange
and Broadcast exchange. So the Shuffle exchange
here shows on this slide. So what does the Shuffle
exchange you basically, Shuffle exchange always consists of two consecutive stages, as seen here in the exchange operator, visualization on the right. You see stages four and five. So one stage will actually
write out shuffle files to disk, and then the second stage
will read those shuffle files in according to the requiredDistribution of the operation that
requires the shuffle. So two things to note here. One is, if you look at the metrics, there's the size of shufflebytes written all the way at the
bottom and then there are two other metrics that should correspond to like the some of those
metrics shoots correspond to the total shufflebytes written. So in this case, there's
a local and a remote side of data rates. These data sizes are actually important to take into account. If you see a very big number here and you only have certain number of shuffle partitions configured for the, either the
stations writing or reading, then you might actually
start spilling to disk because it might not have a lot of memory or enough memory to deal with
so much data per partition. So this is something
to keep in mind, yeah. Another implementation is
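As a rough sketch of the kind of knob you would reach for in that situation (the value 400 below is just an arbitrary illustration, not a recommendation):

```scala
// Number of partitions used on the read side of a shuffle.
// The default is 200 unless your platform overrides it.
println(spark.conf.get("spark.sql.shuffle.partitions"))

// If each shuffle partition handles too much data and starts spilling,
// one option is to increase the partition count so each partition is smaller.
spark.conf.set("spark.sql.shuffle.partitions", 400)
```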
Another implementation is the broadcast exchange. This is the typical exchange that's used for datasets that are small enough to be broadcast. They don't need to be written to disk as shuffle files; they will actually be collected on the driver and then broadcast to all the executors, and the most common operators that use this are the BroadcastHashJoin and the BroadcastNestedLoopJoin, which Max will tell you more about.

So before I let Max speak again, I first want to zoom in on the aggregate operators, which are an example of wide transformations. Here on the left you see that we do this group by aggregate on a DataFrame: we want the sum of all sales per item_id in this case. In the center, you see the actual physical plan that's shown in the Spark UI. You'll notice at the bottom that there's this HashAggregate operator, which typically appears when you do this group by aggregate, and then, because this HashAggregate has a distribution requirement, this exchange is added at the top. It has a distribution requirement that all item_ids that are equal should end up on the same machine, so that they can actually be added up with all their sales. On the right, you see this example where we again have a dataset, but now I've added item_id and sales. In the first column, you see that we have an item_id A and an item_id B, we have different sales values per item_id, and they are also spread out over different partitions. So A and B are both present on different partitions, which might be on different machines in the cluster. First, in order to get all the values of identical item_ids to the same machine, we do this exchange, and the result is shown in the center column with the red colors. We see that all the values for A are brought to a new partition X, and all the values for B are brought to a new partition Y. And then finally, we can actually do this HashAggregate, which in this case will sum all the different values to get the final result that's shown in the green column. It's called a HashAggregate because a hash table data structure is used to store these results, where in this case item_id is the key and the values are the sales numbers.
Apart from the HashAggregate, there are other kinds of implementations for doing an aggregation. The default, and you will almost always see this one, is the HashAggregate that we showed on the previous slide. As I said, it's based on the hash table structure, and it supports code generation. This works for most cases; even when your cluster doesn't have enough memory, you might still be able to use this HashAggregate, because it will simply build the first hash table, and when it runs out of memory, it will flush that to disk, start a new hash table, and then finally merge all the results using sort-based aggregation.

Then there are occasions where the HashAggregate cannot be used. One example is if you do a collect_list instead of a sum: you're then collecting data in a buffer that has variable size. In this example here, we're just adding up a counter for sales, so we're just adding up a number and we can use a fixed size for that, for example a 64-bit integer value, and hence we can use the HashAggregate implementation. But for some operations this is not possible. If we do, for example, a collect_list, we might keep accumulating more numbers in an array, which is not of fixed size, and then it will fall back to this ObjectHashAggregate, which works similarly to the HashAggregate, but with some other intricate details, such as supporting operations of non-fixed size, and also supporting aggregations for the Dataset API that work on custom Java-based objects. If all else fails, and nowadays this is actually very rare, because the HashAggregate and ObjectHashAggregate together support most aggregations, then it will fall back to a SortAggregate, which does sort-based aggregation. That actually has a bigger time complexity: instead of just accumulating values in a hash map, with constant-time lookups to add new values, it first has to sort all the rows and then do a single pass to collect all the results, which is slower, because sorting has a higher time complexity than just looking up values in the hash table.
to look at in HashAggregate typically it's the spill size. Actually, you always want this to be zero. If this is higher than zero, that means that it has
started the fallback. As I explained before
where it tries to first do the aggregation with
one HashTable in memory, but then it runs out of memory and it has to flush this to disk and start new one which
will obviously add more over at the off merging
the resulting HashTables of reading and writing
from disc, et cetera. So if you run into situation,
you typically want to look at are my dataset sizes
per partition too big, or do I have too little
memory per core in my cluster. Then one note on partial aggregation. So I just showed the example
of first doing an exchange and then on HashAggregate to show like how aggregation
works in this setting. But as Max pointed out way in the beginning of this talk. Typically you'll see this
double base HashAggregate. So you first see a
HashAggregate then exchange and then and then a HashAggregate. So in this example, we just
want to show what that is. It's actually Partial Aggregation. So then, the example's was
exactly the same as how I showed but now we have this yellow
column in the table again. So what it does before
exchanging the data after cluster it will attempt to actually already partially aggregate the results
before sending these over. So in this case for partition Y, you see that there are multiple
rows that have the key B so that's like three rows of them. So what this partial
aggregation operator does it will simply call consolidate
all these into one row in this case, B with sales value four before actually doing the exchange. So this will actually reduced
the number of data points that are shuffled over
the network which might in turn improve the
performance of your job and the remaining
operations are identical. So I think I've talked enough for now. So I actually want to give the floor back to Max who is going talk
- Cool, so yeah, joins are the last wide transformation we are going to cover. On this slide is a typical example of a join. On the left, you can see a typical join query in which we join a sales sample DataFrame to some item dimension table on the item_id key, and this will be an inner join. And on the right, we then see the associated query plan. First of all, we see the actual join operator on the bottom right, which in this case is a SortMergeJoin operator. But what is really interesting is that there are a lot of things happening before we even arrive at the join operator. First of all, we see two sorts: both of these DataFrames are being sorted. Even before that, there are also two shuffle exchanges, before both DataFrames are sorted. And even before that, the two DataFrames are also being filtered. So this entire query plan is generated just by doing this one join.
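A sketch of roughly what such a join looks like in code; this is my own approximation of the slide, with made-up DataFrame names and paths:

```scala
// Hypothetical fact and dimension DataFrames; both contain an item_id column.
val salesDf   = spark.read.format("delta").load("/data/sales")     // assumed path
val itemDimDf = spark.read.format("delta").load("/data/item_dim")  // assumed path

// A plain equi-join on item_id. If neither side is small enough to broadcast,
// this typically produces Filter (isnotnull), Exchange and Sort on both sides,
// followed by a SortMergeJoin.
val joined = salesDf.join(itemDimDf, Seq("item_id"), "inner")
joined.explain("formatted")
```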
So what we now really want to know is, first of all: we are now seeing just a SortMergeJoin, but what join algorithms actually exist within Spark? Second, we also want to know how Spark actually chooses which join algorithm to use. On the basis of what kind of rules does Spark, for example, choose a BroadcastHashJoin, a BroadcastNestedLoopJoin, or a SortMergeJoin? And lastly, as I already said, where are, for example, those sorts and those filters coming from? Stefan already explained a bit about the child distribution requirements, so you can probably already guess that the SortMergeJoin operator has these child distribution requirements, which lead to the two shuffle exchanges, and we will find out in the next few slides that a similar type of requirement also leads to the two sorts that happen before the SortMergeJoin operator is actually executed.
So what kind of join implementations do we have? On this slide you will see a table of the four most common join operators, with their requiredChildDistribution, their required child ordering, a brief description of what these operators do, and a ballpark complexity estimate. The quickest join implementation is actually the BroadcastHashJoin. Logically, for the requiredChildDistribution, one side has to be broadcast, so one of the two DataFrames that are going to be joined should actually be small enough that it can be broadcast to all the executor cores. There's no required child ordering, because the DataFrames do not have to be sorted before doing this join. So what does this join actually do? It just performs a local hash join between the broadcast DataFrame in its entirety and the particular partition of the left DataFrame on that particular executor core. Because there's no sorting or anything like that going on, it's just a local comparison between the broadcast DataFrame and the partition, and the complexity is only O(n).

The other very common join operator is the SortMergeJoin, which we just saw in the previous example. Here the requiredChildDistribution is a HashClusteredDistribution, which is the same required distribution as you saw for a typical aggregation query. This essentially means that the DataFrames have to be hash partitioned, so all the keys with the same value have to be on the same partition before the join can be executed. Next, the SortMergeJoinExec also has a required child ordering: it requires both of the DataFrames that are going to be joined to be sorted before the join can actually be executed. Once this is done, the only thing the SortMergeJoin operator actually does is compare the keys of the two sorted DataFrames and merge the rows if there's a match. The complexity of the SortMergeJoinExec actually fully comes from this required sorting: sorting a DataFrame is typically O(n log n), so that is really the complexity of the SortMergeJoinExec. The actual joining itself is only O(n), so again, all the complexity comes from the sorting.

The other two join operators are significantly more complex. The first one is the BroadcastNestedLoopJoinExec. Again, this requires one of the DataFrames to be broadcast, while for the other DataFrame there is no distribution requirement, and there is also no sort ordering requirement. So what actually happens in a BroadcastNestedLoopJoin? It's kind of similar to a BroadcastHashJoin, but instead of performing a local hash join, it operates in a loop: for each row of the left DataFrame, which is basically the partition, it compares that row with the entire broadcast DataFrame, sees if any keys match, and if yes, merges those rows. So for each row it loops over the entire broadcast DataFrame, and this basically means that the complexity becomes O(n times m), where n is the size of the left DataFrame and m is the size of the broadcast DataFrame. The last operator is the CartesianProductExec. What happens in this one is essentially a cross join between the two DataFrames it tries to join, and afterwards it does a filter to only retrieve the rows that actually match the join condition. So what do I mean by a cross join? It basically generates all possible combinations of rows between the left and the right DataFrame. This is also quite a high complexity, again O(n times m), but compared to the BroadcastNestedLoopJoinExec, m typically tends to be a bit bigger here, so this operation tends to be more expensive than the BroadcastNestedLoopJoinExec.
So how does Spark actually choose its join? On this slide, you see a flow diagram of how Spark ends up choosing a join implementation. The first choice Spark has to make, the first thing it actually looks at, is whether a join is an equi-join or not. So is there an equality statement in the join condition or not? For example, does the item_id have to be equal between the two DataFrames, or should it only look at a range of item_ids? If it's an equi-join, it will then look at whether one of the DataFrames is small enough to be broadcast, given some threshold configuration that's in the Spark code. If the DataFrame to be broadcast is small enough, it will choose the BroadcastHashJoin, which is the quickest possible join. If the broadcast threshold is exceeded, so if neither DataFrame is small enough to be broadcast, it will choose the SortMergeJoin. If the join is not an equality join, so there's no equality statement within the join condition, then it cannot do a SortMergeJoin or BroadcastHashJoin; it will have to do a BroadcastNestedLoopJoin or a Cartesian product, which you can see at the bottom of the flow diagram. Again, it will first look at whether one of the DataFrames is small enough to be broadcast. If it is, it will execute a BroadcastNestedLoopJoin. If not, it will have to resort to a Cartesian product, but this is actually only possible when doing an inner join, not when doing, say, a left join or a full outer join. So if you do, for example, a left outer join, Spark will try to do a BroadcastNestedLoopJoin anyway, and given that neither of the DataFrames is small enough to be broadcast, this can lead to out-of-memory errors. So this can be very dangerous, and basically the key takeaway here is that you really should try to avoid doing any kind of non-equi join as a left join or a full outer join.
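In open-source Spark, the threshold mentioned above is controlled by spark.sql.autoBroadcastJoinThreshold, and you can also nudge the planner explicitly with a broadcast hint. A small sketch, with made-up DataFrames and paths:

```scala
import org.apache.spark.sql.functions.broadcast

// The size limit (in bytes) below which Spark considers a side for broadcasting.
println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

// Hypothetical DataFrames: a large fact table and a small dimension table.
val salesDf = spark.read.format("delta").load("/data/sales")
val itemDim = spark.read.format("delta").load("/data/item_dim")

// Explicitly hint that the dimension side is small enough to broadcast,
// which steers an equi-join towards a BroadcastHashJoin.
salesDf.join(broadcast(itemDim), Seq("item_id")).explain("formatted")
```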
So lastly, I briefly want to touch upon ordering requirements. I already covered this a bit, but next to the distribution requirements that Stefan mentioned, some operators also have an ordering requirement, most notably the SortMergeJoin. The SortMergeJoin requires the two DataFrames it's going to join to both be sorted on their join keys. On the left, you can see, for example, that the DataFrames are going to be joined on left.id equals right.id, and the SortMergeJoinExec operator then basically says: I require these DataFrames to be ordered on left.id and right.id respectively. This means that the Spark planner will generate two Sort operators on the inputs of the SortMergeJoin to meet this ordering requirement. So as you can see on the right, there are now two Sort operators, one for each DataFrame, that make sure the DataFrames are ordered on their join key.

So let's just revisit our join example. Given all the information I just talked about, we can now analyze a bit better what is actually going on in this join. First of all, it does a filter. It turns out that in a step even before the physical plan, in a step called the logical plan of the Catalyst optimizer, Spark is able to figure out that, because this is an inner join, it can already remove all the join keys that have a null value, so it will actually add a filter on isNotNull for you in the query plan. Next, we know that the SortMergeJoin has a required distribution of hash partitioning, and this causes Spark to add a shuffle exchange for each of the DataFrames that are going to be joined. Then there are also the two requiredChildOrderings, because the SortMergeJoin requires both DataFrames to be sorted, and this causes two sorts to be added before the SortMergeJoin operator. Finally, Spark has seen that this join is actually an equi-join, because it contains the equality statement on the item_id, and it has also determined that neither of these DataFrames is small enough to be broadcast, and as such it has ended up with the SortMergeJoin. So this should really give you all the information you need to see what is going on when looking at the query plan of a join.
So for the last section of this presentation, I quickly want to go over a couple of real-world scenarios of Spark queries that can be optimized by looking at the query plan. In the first example, we have a DataFrame that contains a category column, for example a product category in a supermarket, and a typical use case is that for each category you want to apply, say, a different product discount. One way to solve this is to union four DataFrames: for each of these DataFrames you filter on a specific category and then apply the discount function, so for each category value you have a different discount function. If you create a query like this, what Spark will actually end up doing is read the Parquet files four times, as you can see on the right. And because these are repeated reads of the same data, this is typically a suboptimal query. So by looking at this query plan, this is something we have now found out. The next question is then: can we somehow rewrite this query to optimize it, so that it only reads the data once? Well, it turns out we can. Instead of doing a union with four filters, we can just apply a case-when statement. For each of these product categories we can say: when the category value is this particular value, apply this function; when it's another value, apply that function; and so on. So you can create this chain of case-when statements. By doing it this way, we can actually write a query where the DataFrame is only read once, as we can see on the right. So it's now only one read, and this essentially means that this Spark query is much quicker compared to the previous query, in which we did four reads.
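A sketch of that rewrite; the path, category names and discount factors are invented for illustration, not taken from the slides:

```scala
import org.apache.spark.sql.functions.{col, when}

val products = spark.read.parquet("/data/products")   // hypothetical path

// Instead of union-ing four filtered copies (four scans of the same files),
// express the per-category discount as one chained case-when over a single scan.
val discounted = products.withColumn(
  "discounted_price",
  when(col("category") === "fruit",     col("price") * 0.90)
    .when(col("category") === "dairy",  col("price") * 0.95)
    .when(col("category") === "bakery", col("price") * 0.85)
    .otherwise(col("price") * 0.99)
)

discounted.explain("formatted")   // the plan should now contain a single file scan
```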
So the second scenario I want to cover is partial aggregations. As Stefan already explained, in most aggregation queries there's actually a partial aggregation done, to reduce the amount of data that has to be shuffled around before doing the final aggregation. However, this turns out not to always be very helpful. What if, for example, you have a grouping key with a very high cardinality, so a lot of distinct values? In this particular DataFrame, of which we show an example on the left, the number of distinct item_ids on which we want to group is almost the same as the total number of rows in the first place. This means that doing a partial aggregation is really not going to help much with reducing the number of rows that are going to be shuffled, and in fact it only increases the query duration. So in the first example, in which we still have partial aggregation enabled, we see that the query duration is 23 seconds. However, if we now turn off partial aggregation, and this can be done by turning on the configuration that is denoted on the left side of the slide, we are able to remove this partial aggregation step from the query plan, and in this particular case it actually leads to a speed-up of almost 22%: we brought the query duration down to 18 seconds. I do want to note that partial aggregations tend to be really good, because in the normal use case they really do reduce the amount of data that has to be shuffled before the final aggregation. But again, there can be specific examples, like this one, where a query actually benefits from it being turned off.
So the final example I want to show you concerns join strategy. In this example, we have a DataFrame with ships and all their respective longitude and latitude coordinates, and we also have a DataFrame that contains ports, which have a bounding box of coordinates. What we want to figure out in this example is whether a given ship is within a port or on the open sea. So what is one way to figure this out? Well, we can of course do a join, and what we do in this join, as you can see on the left, is check whether the latitude coordinate of a ship is within the min and max values of a port's latitude coordinates, and whether the longitude coordinate of the ship is within the min and max longitude coordinates of the port. But this leads to a non-equality join, as you can see on the left, and as you may recall from our join strategies flow diagram, this means that Spark will try to choose a BroadcastNestedLoopJoin. Luckily, it turns out that the ports DataFrame is small enough to be broadcast, so it does not have to do a Cartesian product join. Nonetheless, a BroadcastNestedLoopJoin is still quite slow, and as a result, this query takes three and a half minutes.

So the question is then: can we do something about it, so that Spark chooses a different join strategy? And it turns out we can. One way to solve this, because we're talking about coordinates, is by defining geohashes, which are basically large rectangular areas that define these coordinate boxes. And in order to figure out whether a ship is near a port, we can already say: that probably means that the geohash of the ship and the port is the same, before even looking at whether the ship is exactly within the port or not. So what we now do to the join query on the left is add an equality statement in which we say: the geohash of the ship should be the same as the geohash of the port. By doing this, we ensure that the join is now an equality join. This means, again recalling the flow diagram of joins, that Spark can now actually choose a BroadcastHashJoin or a SortMergeJoin. In this case, the ports DataFrame is small enough to be broadcast, so Spark will opt for a BroadcastHashJoin, which is the quickest join implementation there is. Next, we can see that the query duration has been drastically reduced, from three and a half minutes to a mere six seconds. So this is really an example of how, by changing the join strategy, we can accomplish a drastic reduction in query time.
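A rough sketch of that rewrite. Note that Spark has no built-in geohash function, so geohashUdf below is a hypothetical placeholder (in practice you would use a geohashing library or your own implementation), and the DataFrame and column names are my own guesses rather than the ones from the slides:

```scala
import org.apache.spark.sql.functions.{col, udf}

// Hypothetical geohash function: maps (lat, lon) to a coarse grid cell identifier.
// A real implementation would come from a geohash library; this stub is just for shape.
val geohashUdf = udf((lat: Double, lon: Double) =>
  s"${math.floor(lat).toInt}_${math.floor(lon).toInt}")

val ships = spark.read.format("delta").load("/data/ships")   // lat, lon columns assumed
val ports = spark.read.format("delta").load("/data/ports")   // center/min/max lat-lon columns assumed

val shipsWithHash = ships.withColumn("geohash", geohashUdf(col("lat"), col("lon")))
val portsWithHash = ports.withColumn("geohash", geohashUdf(col("center_lat"), col("center_lon")))

// Adding the geohash equality turns the non-equi join into an equi-join, so Spark
// can pick a BroadcastHashJoin (or SortMergeJoin) instead of a BroadcastNestedLoopJoin,
// while the range predicates still enforce the actual bounding-box check.
val shipsInPort = shipsWithHash.join(
  portsWithHash,
  shipsWithHash("geohash") === portsWithHash("geohash") &&
    col("lat").between(col("min_lat"), col("max_lat")) &&
    col("lon").between(col("min_lon"), col("max_lon"))
)
shipsInPort.explain("formatted")
```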
Yeah, so having discussed these three scenarios, that brings us to the end of the presentation. I just quickly want to go over what we actually discussed. We have seen, at the beginning and throughout Stefan's explanations, that the SQL Tab provides a lot of insight into how a Spark query is executed: it really gives you a breakdown of all the operators that Spark is using to execute the query. We can use the SQL Tab to reason about the query execution time, especially by looking at the total duration of each of the WholeStageCodegen blocks, as Stefan explained. And by using these insights, we can answer important questions like: what part of my Spark query takes the most time? And is Spark actually choosing the most efficient operators for the task? As we saw in the last part of this presentation, we can then analyze this query plan and sometimes conclude that it's actually not using the most effective plan, and then you can rewrite your Spark query in such a way that a more optimal query plan is chosen, and this can really lead to large reductions in your query time.

So if you want to practice or learn more, there are a couple of tips we can give you. What we typically suggest is that whenever you write a Spark query or a DataFrame query, you first mentally visualize what the physical plan would look like without looking at the SQL plan, and then actually look at the SQL query plan and compare whether you were correct; if you were not, try to reason about why that is. The next thing we would suggest is to go into the Spark source code and look up the SparkPlan class, because this class contains all the methods and all the operators, and from there you can dive deeper into the Spark code to really see how the Catalyst optimizer computes this query plan, and you will be able to learn a lot more from there. So this is the end of the presentation, thank you very much for attending.