- This is the third talk in the Delta Lake series, where we're gonna talk about how the Delete, Update, and Merge operations work in Delta Lake. We're gonna take a bit of a sneak peek into the internals of how they work and how you can tune their performance. So, I'm TD. Thank you, Karen, for the introduction. A little bit of background about myself: I have been working in the Apache Spark ecosystem since 2011. Back then, I was a grad student in the AMPLab at UC Berkeley, where in 2012 I, along with Matei Zaharia, who started the Spark project itself, started working on Spark Streaming together. And now, eight, nine years later, I'm a staff engineer at Databricks and a core developer of Structured Streaming and Delta Lake. And, Denny, you wanna say a
little bit about yourself? - Well, thanks very much. So I'm not as smart as TD, but I'm a staff developer advocate at Databricks. (laughs) I've worked with Apache Spark since version 0.6, so roughly around the same time, around 2011, as well. I'm a former Senior Director of Data Science Engineering at Concur, and also a former Microsoftie, and yeah, there you go, I think those are the key components here. So I'm glad to be here, and hopefully you'll enjoy today's session. - Alright. So, for those who are
completely new to this series, this is the third talk of the series. So just to bring you up to speed, there's a one-slide intro on what Delta Lake is. It's essentially an open-source storage layer that brings ACID transactions to Spark workloads. The format is open, and you create Delta Lake tables the same way you would create Parquet tables. But unlike Parquet tables, where things can get bottlenecked on the metadata handling in Spark, with Delta Lake the metadata handling is far more scalable than with Parquet tables. The whole table is versioned, so you can travel back in time and query earlier versions of the table. You get full schema enforcement and schema evolution properties, which ensure that you can maintain a much higher quality of data without corruption and such. You also get the ability to audit all the operations you have done, with transaction guarantees, in the history of the table, through APIs that allow you to query that history. And fundamentally, how Delta Lake works is that it maintains a
transaction log within the table, within a subdirectory called _delta_log. It maintains a transaction log of all the operations that have happened, by writing these JSON files, which have descriptions of all the operations that happened: all the files that got added and removed in each of the operations. And so each of these JSON files is essentially a new version of the table, and this log is maintained within the same directory where all the rest of the data files are also maintained. So all the information, both the data and metadata, is co-located in one location, and this log is updated atomically on every operation; therefore, you get full ACID transaction guarantees on any operation on the table. That's like a two-minute
version of what Delta Lake is. So if you want to know
a little bit more detail of how this transaction
log works, et cetera, go take a look at the
part one of the series; the YouTube link is here. It's also in the YouTube channel that you hopefully have subscribed to. And then if you want to learn more about schema enforcement (I have written the title wrong here), that is, how schema validation and schema evolution work, that's part two of the series. So in this part three, I'm gonna talk about the DML operations, Update, Delete, Merge: how they work under the hood, and how you can tune them to get the best performance out of them. And I'm going to end by talking about some common design patterns we have observed that our users of Delta Lake have used, and how they can probably help your use cases as well. So, the first operation we're
going to talk about is Update, and Update is simply SQL UPDATE: update a table, set X equal to Y, based on a predicate. And what we have provided is not just SQL support, but also Scala and Python APIs for doing this update operation. Now, incidentally, the SQL support is currently only available in Databricks Delta Lake, but we are trying really hard to get it out in open-source Delta Lake; it will come out as soon as Spark 3.0 comes out, which has the parser-level support for these DML operations like Update, Delete, Merge. So as soon as Spark 3.0 comes out, we will have a Delta Lake release with actual SQL support for these; until then, you can always use Scala and Python for doing these sorts of operations.
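As a minimal Python sketch of that API (the table path and column names here are hypothetical, loosely following the loan-data demo later in this talk):

```python
from delta.tables import DeltaTable

# Hypothetical Delta table of loans.
deltaTable = DeltaTable.forPath(spark, "/delta/loan_risks")

# Equivalent of: UPDATE loan_risks SET paid_amnt = funded_amnt WHERE addr_state = 'WA'
deltaTable.update(
    condition = "addr_state = 'WA'",
    set = {"paid_amnt": "funded_amnt"})
```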
And the way it works under the hood is that Delta Lake maintains these files, so it only tracks data at the granularity of files. Let's say you have version 12 of the table, where you have four files. Now, let's say you run an Update. What it will do underneath is use two scans on this data to update it. First, it will do a scan to find the files that contain the data that needs to be updated, based on the predicate
that you've provided. So, let's say out of these four files, two of the files have data that matches the predicate. Now, not all the rows in those files match; by the way, these are Parquet files, that's how Delta stores the data. So not all the rows in the Parquet files may match: there'll be some rows that actually match the predicate and some rows that do not, as you can see marked here as green and red. Now, to identify these files, it uses the predicate, column stats, partition pruning, et cetera, all the things that Spark provides, to narrow down which files it needs to actually read to find whether each file contains matches or not. Now once it finds those
files, it selects them, does another scan, and re-writes those files. So those two files got re-written as completely new files, because we cannot go ahead and update files in place; Parquet files are not designed for in-place updates. So, we have to re-write the entire data in those Parquet files as new files, where the data that matched actually got updated, and the data that did not match just got copied into these new files. And the files that got replaced are essentially tombstoned, which means that in the transaction log, we add the information that two new files got added and the files that got replaced were marked as removed. They're not physically removed from the directory, so that we can do time travel, which we'll talk about later; we can still query those files as a previous version of the table. So, that's what happens underneath. Let me actually hand it over to Denny, who can demonstrate this in a notebook and give a cleaner idea of how this works underneath. Over to you, Denny, let
me stop sharing my screen. Denny, we can't hear you. - It would help if I unmuted myself one day. - Yes, it would help. - It would help, alright, cool. So I want to make sure you guys can see my screen perfectly fine, this notebook. - Yes. - Perfect, excellent, excellent, alright. So exactly what TD was
talking about, right. So in other words, we created this Delta table, so I'm just scanning and skipping through, and what you'll notice is what the underlying file system looks like. So I'm just gonna run it live, so you can take a look. But basically, you'll notice that there's a _delta_log directory, and there's this parquet file, right, and so this is what it initially looks like, just like what TD was talking about, okay. I'm gonna skip past what the initial log and the metadata are, though I'm leaving them in there, so that way, from the standpoint of the notebook, you guys can have it for yourself, okay. Now, here's what the data looks like initially, okay, so basically you've got a state; if I was to just explore the data, here's what it looks like. And you notice that
Illinois and Washington have a value of 3.2. Okay, but now I want to run an Update, just as TD was talking about, so let's go ahead and do this. In this case, I'm gonna be rather arbitrary here. And I'm just gonna say, hey, let me look at the number of loans that I wanna go ahead and update, okay, in this case, 209. I'm gonna go ahead and, in this case, run this Update statement where I'm saying, for the state of Washington, update the paid amount to be equal to the funded amount, okay. So in other words, I want those values to be updated, okay, just as TD was talking about. Alright, so with Delta Lake, as TD just talked about, you can run an Update, but then what's actually happening underneath the covers? Alright, well, let's go look at the jobs real quick. Alright, so for example, if we have job 1838, let's go look at that now. Sorry, alright. Okay, it's gonna pop open. And we're going to, sorry,
I'm gonna scroll back in so you can see it better, alright. So here are all the jobs I've been running here; job 1838 is right here. We're gonna read the transaction log, we're looking at the DAG visualization. - Go to the SQL query. - Oh, sorry, yes, I meant to do that and forgot to do it, so there we go. Okay, perfect. So as you can tell, I wanna zoom back in: from the statistics, we read one file, okay. It's literally right here, actually, in the Spark UI, okay, so we've read one file; here's all the information, like the cache, write size, the time spent in the file system, all these different things, but here's the rows output, okay. And sorry, let me try to close that. So that's how it starts off. And then let me go ahead and go back, and was it 1839, I think? There we go. So again, here's the four
rows that are in the set of rows that are potentially going to be impacted, and all the exchange processing and everything it's actually gonna do to the data, okay. Now, if I instead just simply look at the history of this, the operation metrics are actually embedded right inside here, okay, so the total number of rows is 14705. The updated rows: there were 340 rows that were actually updated, because we actually updated all of Washington State, not just the ones where the values were different; we updated all of them because of the way I wrote that particular statement. And so the number of rows that were copied is right here: 14365 plus 340 equals the 14705 that you see. So underneath the covers, that's what we're doing. We're actually taking a copy of all the files into a new set of files, okay, and we're going ahead and making the modification as you see here. So here's what the result looks like; I just simply queried for Washington State, and then when I look at it,
see here's the file right here that you see, okay. So this is the file that
actually has the new set of data per the updates, that
just happened, okay, cool. TD let me switch it back
to you so you can go ahead and flip the next section. - Right. - Perfect, yeah. - Alright. So, now that you've seen how
you can create new versions by updating the data, you can still go back and query the old version; Delta Lake provides ways to query previous versions of the table that you saw in the history. So here, if I had created version 13 by updating version 12, I can still query version 12 using this option called versionAsOf.
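As a rough Python sketch (the path and version number here are just illustrative):

```python
# Read an older snapshot of the table (time travel).
v12 = spark.read.format("delta").option("versionAsOf", 12).load("/delta/loan_risks")

# Read the current snapshot and diff the two to see what the update changed.
current = spark.read.format("delta").load("/delta/loan_risks")
v12.exceptAll(current).show()
```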
So now you can compare two versions of the table very easily to see how the rows got updated, and if there was an incorrect update, you can debug it and stuff like that. That gives you a lot of power to actually go back in time and figure out what happened, whether all the updates that happened were correct; or if you see some sort of corrupted data, you can go back and check in which version the data got corrupted, and maybe roll back by reading the older version and overwriting the table with that correct older version once again. So, this is, again, a very powerful tool. So now let's talk about how do
you improve the performance? So as Denny showed you, there are two scans on the data. Now, somebody asked a question regarding whether those are two full scans or not. The simple answer is that the first one may be a full scan, and the second one is usually not. It all depends on what predicate you put. If your predicate, for example, says update all the rows in a particular partition, then the first scan, to find all the files that need to be updated, will only query that partition, and so it doesn't need to do a full scan. However, if your table is not partitioned (say it's a smaller table and therefore not partitioned), it can happen that it cannot narrow down, based on your predicate, which files to look at; in that case it will have to scan the entire table to find which files have matches that satisfy that predicate. But that's only the first scan; the second scan usually scans over only the files that need to be updated. So to answer the question, it doesn't have to be two full scans; it completely depends on your table setup. But now the key thing to remember here is that the more predicates you add in the Update clause, the easier it is for Delta Lake to narrow down the search space, and therefore the faster that first scan is going to be. So that improves the total runtime of the Update.
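For instance, assuming a table partitioned by a hypothetical addr_state column, adding the partition column to the predicate lets that first scan prune down to a single partition (SQL shown via spark.sql; as noted above, the SQL form needs Databricks Delta Lake or Spark 3.0+):

```python
# Only files in the addr_state = 'WA' partition are scanned to find candidates.
spark.sql("""
  UPDATE loan_risks
  SET paid_amnt = funded_amnt
  WHERE addr_state = 'WA' AND loan_id = 11
""")
```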
Now, the Databricks Delta Lake platform has a few more performance optimizations; that's one of the differences between OSS Delta Lake and Databricks Delta Lake, that we have a few more performance improvements in Databricks Delta Lake. There are tools for doing better data skipping; in particular, we have this thing called Z-order Optimize, which can optimize the layout of the data in the table in a more organized manner. It's essentially like multi-column sorting, but much better than that, using space-filling curves and such, and so it essentially improves the efficacy of storing column stats at the level of files. And then there's also support for Bloom filters. We will not go into detail on those, but the key item to remember is that if you understand what goes on underneath these operations,
it makes it easier for you to reason about how to optimize the performance of these operations, for example, narrowing down the search space by adding more predicates if you know where your matches are going to be present. So, similar to Update, there is Delete, with similar syntax: delete from a table based on a predicate. There's obviously SQL support, which is going to come with Apache Spark 3.0, but until then, there's also Scala and Python support directly. But it's interesting to remember that, as Denny showed, the update creates new files but doesn't replace the old files; it only tombstones them, marking them as deleted in the transaction log. It doesn't automatically delete them, so that you can do time travel and query that old data once again by specifying the exact version number. But now for deletion, the
setup is exactly the same: any data that needs to be deleted will result in re-written new files, but the old files will be preserved by default, so that you can go back and query the version of the table before the Delete; if things get accidentally deleted, you can roll back and such. But in some cases, you actually do want the data to be completely, permanently deleted from disk; you don't want any file to have the old data. For that, you have to run this additional operation called Vacuum. That's an operation provided specifically by Delta Lake, and what it does is let you say how long a window of previous versions you want to retain. So let's say you want to retain all the versions written in the last one day, because you know you don't care about querying any version of the table that is older than one day; then you will run VACUUM table RETAIN 24 HOURS. By default, if you don't specify RETAIN, it is seven days; that's what we have observed works well in most situations. But what Delta will
do is figure out which files present in the directory are not needed by any of the versions retained in the last one day. So, any file that is not needed by the versions retained within the last 24 hours will get deleted. And that includes any sort of partial or corrupted files that may have been left behind but not actually committed to the log because of failed writes and such, so Vacuum will delete all of that. Using that, you can actually control how much of the previous versions you want to retain and what you can delete, and eventually you can age data out of the data files this way; the data will actually get deleted. You can even run Vacuum with retain zero hours, which will keep only
the latest version of the table. If you really don't care about history, and you really want to ensure that after a deletion all the data has actually been physically deleted, you can run Vacuum with retention zero, which will preserve only the latest version. However, it's important to remember: do not run Vacuum with zero retention when other writes are in progress, because you may delete files that are currently being written, so just be careful of that.
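Putting Delete and Vacuum together, a minimal Python sketch might look like this (the path, predicate, and retention are just for illustration):

```python
from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, "/delta/loan_risks")

# Tombstone the matching rows; the old files stay around for time travel.
deltaTable.delete("addr_state = 'WA' AND paid_amnt = 0")

# Physically remove files not needed by any version from the last 7 days (168 hours).
deltaTable.vacuum(168)
```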
Anyways, on to Merge, which is probably the most interesting and most powerful operation among these three. So, Merge has a standard SQL syntax, where the idea is that you are merging into a target table using data from a source table, based on a match condition, say, matching the keys of a source row and a target row. If there is a match, then you can update the target row using the matched source data. If there is no match, that is, if there is a source row that does not match any target row, then you can insert a new target-table row based on the source row. Now, this is standard SQL syntax, and we support that obviously,
but we have additional support for a lot more extended syntax that makes things a little
bit more interesting. For example, we have support for additional clause conditions: say you have matched based on the merge condition, but you want an additional condition on top of those matches; here you can specify additional clause conditions, not just the match condition, but clause conditions that need to be satisfied before the update kicks in. And similarly, when not matched and an additional clause condition holds, only then does the insert happen. And where this is useful is that you can actually have multiple, up to two, matched clauses as well. And you have support for deletes as well, which the ANSI SQL syntax doesn't; so here you can say, when matched and the clause condition holds, then update; otherwise, given it's matched but the clause is not satisfied, delete. This is very powerful, as we will show using examples later, for doing a lot of complicated changes to the table, and we're gonna walk through use cases with you where this is very, very useful.
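As a rough illustration of that extended syntax (run here through spark.sql; the table and column names are made up, and the SQL form assumes Databricks Delta Lake or Spark 3.0+, as noted earlier):

```python
spark.sql("""
  MERGE INTO target t
  USING source s
  ON s.key = t.key
  WHEN MATCHED AND s.is_deleted = true THEN DELETE
  WHEN MATCHED THEN UPDATE SET t.value = s.value
  WHEN NOT MATCHED AND s.is_deleted = false THEN INSERT (key, value) VALUES (s.key, s.value)
""")
```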
The other very useful feature is the Star support to auto-expand target columns. As far as we know, this is the only engine that supports this sort of syntax; when we introduced it, it was a very, very popular syntax. What it does, essentially, is that for a table with a very large number of columns, you do not want to hand-write all the columns that you need to update. Let's say you have a bunch of changes which require you to update all the columns if there is a match, or to insert all the columns exactly as in the source; it's often tedious to hand-write all the columns when the number of columns is in the range of hundreds. So, UPDATE SET * essentially auto-expands the columns, to say UPDATE SET column one equals the source column one, column two in the target equals the source column two, and so on, which makes it much easier to express and actually manage these sorts of queries. So we also have support
for programmatic APIs, in Scala and Python. Again, we are one of the very few systems we know of that has programmatic APIs for doing this sort of SQL merge operation. I think, depending on what kind of user you are, whether a primarily SQL user or a programmatic software engineer, data engineer kind of person (which is more like me, I like type safety), this is very, very, very useful.
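Here's a minimal Python sketch combining the programmatic API with the auto-expanding UpdateAll/InsertAll clauses (the table path, the updates_df source DataFrame, and the loan_id key are all hypothetical):

```python
from delta.tables import DeltaTable

target = DeltaTable.forPath(spark, "/delta/loan_risks")

(target.alias("t")
    .merge(updates_df.alias("s"), "s.loan_id = t.loan_id")
    .whenMatchedUpdateAll()       # like UPDATE SET * : auto-expands all columns
    .whenNotMatchedInsertAll()    # like INSERT *     : auto-expands all columns
    .execute())
```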
So, under the hood, what happens is essentially the exact same thing as Update: there are two passes over the data, one to find the list of files that have matches and therefore need to be updated, and a second pass to update those files by re-writing them as new files. But the interesting bit is that now, unlike Update and Delete, here in both passes you have to do a join between the source and the target to actually find the matches. The first one is an inner join between the target and source, to find the matches. The second one is an outer join between only the selected files in the target and the source, to do the Update, Delete, Insert operations on the data in those files; some of the data may be copied, some of the data may be updated, deleted, inserted, et cetera. Again, Denny will demonstrate
this much more effectively, so, over to you, Denny. - Thank you, sir, and this time, I actually bothered remembering how to go ahead and unmute myself, so that usually helps. Cool, alright, so exactly what TD was talking about. And below, don't worry, I've asked you folks to go ahead and ask your questions in the Q and A panel; it's not that we're not willing to answer them, it's just that we're gonna most likely answer some of these questions after the session, once we do the regular presentation portion of it, okay. Alright, so here's just a quick animated GIF about the insert/update, i.e. upsert, process when you're running a merge, right. So typically, underneath the covers, basically what we're doing, just like TD had called out: you identify the new rows to be inserted, identify the rows that need to be replaced, i.e. updated, identify the rows that are not gonna be impacted by the insert or the update, create new temp tables, delete the original table, rename the temp table, and drop the temp table. Now, that's if you were to do it with the regular Parquet process, right; it's pretty inefficient and requires you to actually do all the writing. And I'm lazy, so I don't wanna do all that writing, so that's why TD showed us the awesome merge syntax, which actually simplifies that process massively. Alright, so in other words, when I do that, I can just simply write a merge statement. Okay, so let's use a quick example here. I'm gonna go ahead and take
a look at some data here. Okay, for example, if I was to look at just New York, with loan Id less than 30, these are the three rows that I have here, okay, in terms of what could potentially be impacted, okay. I have loan Ids of 11, 21, 28, each of them being funded with about $1000 to $1200, here are the different paid amounts, and the state being New York, alright. Now, let's create a new table against which we want to run the Merge. Okay, remember the first
table is the original table with the 14705 rows; three rows of the 14705 can potentially be impacted. Now, here's this new table that we're creating, which is listed here, where basically for loan Id 11, for $1,000, we're gonna go ahead and pay it off, right, this first row that you see here, okay. So that's why the funded amount is 1000 and the paid amount is 1000, okay. Alright, so then we're also gonna add a new loan, okay, and we're also gonna, for the fun of it, add a duplicate loan, okay. In other words, by accident, the source system screwed up, so there was a duplication, alright. So again, as opposed to going ahead and running all of those other statements, where I'd do an insertion specifically for loan Id 12, de-duplicate the data myself for 28, and go ahead and run an update specifically for loan 11, I'm just gonna run a single merge statement, and it's really simple, right here, actually, it's right here, okay. And so I'm using the Python syntax because, unlike TD, who loves type safety, I'm lazy so I don't like doing it, no, I'm joking. I just happen to like the Python language, that's all, even though the tabs irritate me a lot. Nevertheless, I'm gonna simply do a Merge where basically the source table s.loan_id is equal to the target table t.loan_id, okay. Where basically, when it matches, you update them all; when not matched, you insert them all; and then implied inside here also is the de-duplication as well, okay. So same idea, we have a
bunch of stages, okay. Now, there's a question that I was asked, which I'm gonna answer right now, which is about the query execution plan and query costs, okay. That's actually what the Spark UI is for, right. It actually provides you a lot of that context. I'm gonna look at 1844, using the SQL tab, just as TD had called out, so I'm gonna go look at this, right. When I look here at the query plan, actually, before I even show you the DAG here, let me do a quick callout: the logical plan is actually listed right here, so you can actually understand logically what Spark is doing underneath the covers. So you can actually go ahead and see, similar to when you're working with a relational database, you have a logical plan, same concept; all that's actually placed right here. Now, obviously, some people
are gonna turn around and say, I'd love to understand how it works graphically. And then that goes back up to here, right, where basically the timing is: seven milliseconds for the
WholeStageCodegen here. One second for this
particular WholeStageCodegen in order for it to do its
various processes, okay. So a lot of your query
information is actually all in that Spark UI. So you can figure out how
to improve the performance by understanding what's
going on underneath the covers. So for example, with this one, right away, actually, you know what, I wanna go look at a different one first, give me... - This is actually (mumble). - Exactly, I wanna talk about the other one first, so
I think it's this one. - Yes, the 45, I think
that should be the one. - Here we go, perfect 1845, okay. So here is that SortMergeJoin,
right, so in other words, here's the two files,
okay, so for example, when you open this one up, okay, it tells you right away
17 milliseconds was taken to read a single file. Alright, that file was
the file that contained the 14705 rows of data that
we were working with, okay. And so in fact, it actually
tells you right here, it tells you about the
cache, the file system reads and all these other statistics, which actually helps you understand why the query performance is
gonna be that particular way. But it lets you know right away. So exactly as TD was
explaining, about things like the number of files or partitions, right, you will be able to tell: if this is running in seconds versus minutes, right, if it takes a very long time, perhaps partitioning needs to come into play, because you need to go ahead and reduce the number of files that are gonna need to be read; because, for the sake of argument, you're only trying to process one state or one date of data, as opposed to three years of data, using that as a simple example, okay. Alright, let me close this up. There we go. Alright, so then, versus
this Scan ExistingRDD. That was the three rows that we created; remember, we had created this loan updates table? Well, this is what the Scan ExistingRDD is. Again, this is a small RDD, but here's the projection, you can see it right here: the loan Id, the funded amount, the paid amount; this is the RDD that we created, right. Then, exactly what TD was talking about in terms of the joins that happen, here's that SortMergeJoin
that actually had to kick off. So first, there was a Sort
right, of this information, and then we're gonna do the SortMergeJoin between the three rows that we originally had from the table that we generated, that we created, versus the 14705 rows that came from the parquet file, the single file that we had, in this case, on which we would now perform the join. And then all of the statements, oh, sorry, the projections here, right here, right, in terms of how it figures out the logic, that's actually all
shoved inside here, okay. And so, the idea is underneath the covers, then you can tell right away what's actually happening to the data, okay. In other words, it's grabbing one file, it's grabbing the three rows, it's gonna do a SortMergeJoin, that's what this example tells us, versus the previous step,
1844, I believe, yes. Right, this is actually doing
a broadcast exchange, right; in terms of the amount of data that's being pushed over, right, the data size itself actually helps you understand, okay: if you've got a ton of data that's being broadcast or exchanged over and over again for this broadcast hash join that's happening right here, yeah, this tells you that you're probably transferring, shuffling, or moving too much data across the wire, right. So again: can you reduce the shuffle, can you filter, can you partition, to reduce the sizes that you are working with, okay. And then, ultimately, back to this again; for example, the output of this is, when I look at New York,
there is loan Id 11, which is basically the one that we updated, right; here's loan Id 12, which basically is the brand new one that we just added; and here are the other loans that basically were unaffected, right. In other words, we actually put in a duplicate 28, but that duplicate never actually entered in there, because the merge statement automatically de-duplicated the data, okay. And then, same idea, when we
looked at the history, right, I can look at the metrics, right. The operation metrics are
right within the history, so you know how many rows were copied; in this case, of the 14705 rows, 14702 were actually copied, okay; we only looked at one single file in order to make sense of it. The target rows that were updated: three of them were actually updated, based on the three source rows. Oh, sorry. There we go. Okay, and the number of
bytes; so all that information is basically packed inside the operation metrics that you see within the history table, and within the Spark UI's SQL tab, to make sense of it from the UI perspective, cool. Hey, TD, anything else you
think I should be adding or we're good to go here? - Yeah, I think we are good to go. - Okay, perfect, alright,
well, I will stop sharing now. - Awesome. Alright, let's talk about how we can improve performance. So, just like with Update, it's important to understand what goes on underneath the covers. As Denny just showed
you, there's an inner join and there's an outer join. So, you have to really understand what the bottleneck is: the inner join or the outer join. If the inner join, which is finding the files to re-write, is the bottleneck, taking the most time in the query, then you have certain optimization techniques. If it's the other one, then you have certain other optimization techniques. So if the inner join is slow, that is, you're taking a lot of time just to find the files to update, then again, the standard techniques help: getting more predicates in to narrow down the search space. You can obviously adjust the shuffle partitions, which is a standard Spark optimization technique; the number of shuffle partitions controls the parallelism of the shuffle that needs to be done for the join. You can adjust the broadcast threshold towards broadcasting more, allowing larger data to be broadcast if the source is small enough to fit in a single machine's memory. Sometimes it slows down because there are too many small files in the table; if you have a million tiny, kilobyte-sized files, then the overhead of reading each one of them is much higher, and you should compact the Delta table; there is online documentation on how you re-write the layout of the Delta table to compact it. But then again, you shouldn't make extremely large files, like 10 GB files, because remember that we re-write at the granularity of files, and the larger the files you create, the more data may need to be re-written unnecessarily. So if there is only one row that needs to be updated in a file, it's cheaper to re-write an entire 100 MB file rather than a 10 GB file for that one-row update. So, you have to tune that based on your workload requirements. And then Databricks Delta Lake recently has a
few performance optimizations. The Z-order Optimize that I mentioned earlier helps to sort the data in certain smart ways, which allows you to exploit the locality of updates: if your changes are going to be in a certain range of values for a particular column, then you can Z-order optimize by that column to get better locality, so that a smaller number of files are touched and need to be updated. On the other hand, if the outer join, the second scan that actually re-writes the files, is slow, then there is a different set of techniques. Well, there are common ones, like adjusting shuffle partitions, which again controls the parallelism; but sometimes what happens is that if you parallelize too much, you can generate too many small files, especially with partitioned tables. For that, the solution, the knob we have provided, is that you can actually reduce the number of files by enabling automatic repartitioning of the data based on the partition columns before the write. Now, this is available as Optimized Writes in Databricks Delta Lake. But the next upcoming release, 0.6.0, which we will release tomorrow or the day after tomorrow, well, tomorrow actually, also has support for this automatic re-partitioning of data inside Merge before the write. Now, if it's a full outer join,
the outer join that you see, then Spark cannot do any sort of broadcast join; Spark doesn't support the broadcast threshold for full outer joins. But starting from Delta Lake 0.6.0, it's possible that it may be a right outer join instead, and Spark in that case can do broadcast joins. So if you observe in Spark that you're doing a right outer join, by looking at the logical plans as Denny showed in the Spark UI (you can also get them programmatically), then you can adjust the broadcast threshold to make it broadcast a larger volume of data than Spark's default. It also helps to cache the source table or dataframe, because you're gonna do two passes over both the source and the target, especially over the entire source; caching the source can speed up the second scan. But it's important to remember: do not cache the target Delta table, because caching the Delta table can lead to weird cache coherency issues; if the Delta table updates and the cache doesn't, it can lead to all sorts of confusion. So, generally, don't cache the target table if you're going to update it. Roughly, these knobs look like the sketch below.
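Here's a rough Python sketch of those knobs (the values are arbitrary, the source_df name is hypothetical, and the repartition-before-write flag is the one I recall from the Delta Lake 0.6.0 / Databricks docs, so double-check the exact name there):

```python
# Parallelism of the shuffles used by the merge joins.
spark.conf.set("spark.sql.shuffle.partitions", "200")

# Allow a larger source to be broadcast (value is in bytes).
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(100 * 1024 * 1024))

# Repartition by the table's partition columns before writing,
# to avoid producing many small files during the merge (Delta Lake 0.6.0+).
spark.conf.set("spark.databricks.delta.merge.repartitionBeforeWrite.enabled", "true")

# Cache the source (never the target Delta table), since merge scans it twice.
source_df.cache()
```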
Anyways, we're kind of short on time, so I'm going to squeeze through the common design patterns very quickly. The first common design pattern, which Denny showed a little bit of in his demo example as well, is deduplication during ETL: an ETL pipeline can generate duplicates, and you don't want the duplicates in your final Delta table. So, you write a merge query that only inserts: if there's a unique Id based on which you can deduplicate, then only if it doesn't match, meaning the new row being inserted is not already in the table judging by the unique Id, do you insert. So this is, again, another example of the extended merge syntax: you may not even specify a when-matched clause at all, you can specify only when-not-matched-then-insert; this is a kind of deduplication. Now, you can optimize it further. The problem you will face is that it may scan the entire table every time to find out whether the unique Id exists or not. So you can optimize this further: if you know that your duplicate data is going to come within only a certain time period, say you only have duplicate data over the last seven days and not older than that, then you can inject that seven-day constraint as part of the match condition, therefore forcing merge to only search the last seven days of data for matches. So these are optimizations you can do.
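A minimal Python sketch of that insert-only, time-bounded dedup merge (the table path, unique_id, and event_date columns are hypothetical):

```python
from delta.tables import DeltaTable

logs = DeltaTable.forPath(spark, "/delta/events")

(logs.alias("t")
    .merge(
        new_events_df.alias("s"),
        # Only search the last 7 days of the target for a matching unique_id.
        "t.unique_id = s.unique_id AND t.event_date > current_date() - INTERVAL 7 DAYS")
    .whenNotMatchedInsertAll()   # no when-matched clause: insert only if not already present
    .execute())
```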
Another common pattern is using Structured Streaming. You may compute aggregates using Structured Streaming very easily; I have previous Spark Summit talks on Structured Streaming, which you can look at if you want to learn more about it. But what comes out of Structured Streaming are essentially key-value aggregates, which are prime candidates for upserting, and you can do that very easily. Structured Streaming has this operation called foreachBatch, where for every batch of generated, updated key-value aggregates, you can upsert that data into Delta Lake by calling this merge operation within the foreachBatch. Again, more details are in our online documentation.
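A rough sketch of that pattern (the aggregate DataFrame, key column, and table path are all made up for illustration):

```python
from delta.tables import DeltaTable

target = DeltaTable.forPath(spark, "/delta/aggregates")

def upsert_to_delta(batch_df, batch_id):
    # Upsert one micro-batch of updated key-value aggregates into the Delta table.
    (target.alias("t")
        .merge(batch_df.alias("s"), "s.key = t.key")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())

(streaming_aggregates_df.writeStream
    .foreachBatch(upsert_to_delta)
    .outputMode("update")
    .start())
```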
Another common pattern is GDPR, which is very easy to do with Delete and Vacuum. You delete the user from the info table, and then vacuum the table to ensure that it is actually deleted physically from the files. But there are, again, better ways of doing that: if you want to maintain the entire history of a user, then rather than relying on the table's history, you can store the history of the user explicitly, by keeping all the previous records of the user in the latest version of the table using these sorts of operations, as an SCD Type 2 operation. Again, details and examples are there in our online docs. Another extremely common
pattern is Change Data Capture. People often want to take change data from their OLTP databases and do OLAP operations on it without affecting the OLTP databases. So they wanna take the changes made in the OLTP database and apply them to some table in the Delta Lake format, which is great for OLAP. For that, again, the merge operation supports Delete, Insert, Update, all in the same syntax. So based on your sequence of changes, you can very easily apply those changes from OLTP to OLAP; it's a very common pattern we observe.
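A rough sketch of applying a batch of captured changes with a single merge (the changes_df DataFrame, its op column, and the key are hypothetical):

```python
from delta.tables import DeltaTable

target = DeltaTable.forPath(spark, "/delta/customers")

(target.alias("t")
    .merge(changes_df.alias("c"), "c.key = t.key")
    .whenMatchedDelete(condition = "c.op = 'DELETE'")         # rows deleted upstream
    .whenMatchedUpdateAll(condition = "c.op = 'UPDATE'")      # rows updated upstream
    .whenNotMatchedInsertAll(condition = "c.op != 'DELETE'")  # brand new rows
    .execute())
```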
Anyways, to end my talk, let's talk a little bit about the community and ecosystem. We are very aggressively making releases, almost every quarter, making significant improvements in terms of the connector ecosystem as well as the performance of very popular operations like Merge. We are going to release something in the next couple of days, and in 0.7.0 we're gonna release Apache Spark 3.0 support, which brings a whole list of stuff like SQL DDL support, SQL DML support, support for defining tables in the Hive metastore, and all of that. And this connector ecosystem is really growing. There's online documentation on how to use Delta Lake with Amazon Redshift, Athena, Presto, Snowflake, and Hive; just a couple of weeks back we released a Hive connector where you can natively query Delta tables from Hive directly. So take a look at the online docs. We have a growing list of ecosystem partners that have promised to support Delta Lake within their platforms, and a very large and rapidly growing list of users of Delta. Thank you very much. We should answer questions now. - Yes, absolutely, so we've got about five to eight minutes left, so let's dive right into the questions, I'll ask them to you, TD. Alright, perfect. So, first question: does
vacuuming improve performance? - I was actually typing that answer, but I'd rather give it verbally now... - Exactly, I figured giving it verbally would actually make a ton of sense in this case. - In most cases it doesn't, but in some cases it can. When you're querying the table using Delta Lake, because of the transaction log, it doesn't need to list the directories to find out which files to read. So, from Delta Lake's point of view, it doesn't really matter whether there are many files in the directory from a very large number of versions or not. But in some cases, we have observed that with a very, very long history, one that leads to millions, tens of millions of files, the storage system itself often starts to behave in a weird manner, because it just slows down all file system operations. And this is true even for cloud storage file systems like S3 and Azure storage, with a very large number of files in the bucket, or in the container in the case of Azure. And that is something that is beyond what Delta Lake or any processing engine can do; all file system operations just slow down. So it's not advisable to keep a very large history that requires you to keep tens of millions of files; it's gonna slow everything down, and that is beyond our control or the control of any engine. - Perfect, and then related, again, we're gonna talk about
the file system a little more, just because there seem to be a lot of questions about that. What about the file distribution after you deal with the Updates and Deletes, right? I guess the concern is basically that, because we're adding new files, as we've been talking about, during an Update and Delete, and we're creating tombstones (I'll eventually be able to speak English), what about the final file distribution? - So yes, what can happen is that as you're re-writing these files, you can get a small-file problem that can lead to fragmentation. But that's where you can re-write the layout of the Delta table with transactional guarantees using these operations. In Databricks Delta, it
is the Optimize operation; in open source, you have this option called dataChange. You can do essentially no-data-change writes using dataframe writes, by which you can say: I am going to re-write this partition of the Delta table by reading all of the data and re-writing it back with a much smaller number of files. Essentially, in terms of Delta table operations, you just read the partition as a dataframe, re-partition it to a smaller number of files, and write it back into that partition. And there is additional support for an option called dataChange, set to false, which tells the transaction log that no data was changed, only data was re-arranged, and that helps that write operation not conflict with any other actual data-change operations. So again, all of this is documented in our online docs, so take a look.
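As a rough Python sketch of that compaction pattern (the path, partition predicate, and file count are made up):

```python
path = "/delta/events"

(spark.read.format("delta").load(path)
    .where("date = '2020-01-01'")                   # the partition to compact
    .repartition(16)                                 # far fewer, larger files
    .write
    .format("delta")
    .mode("overwrite")
    .option("replaceWhere", "date = '2020-01-01'")   # only overwrite this partition
    .option("dataChange", "false")                   # tell the log only the layout changed
    .save(path))
```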
to exactly to TD's point, during the unpacking the
transaction log section, we actually dived a little
bit into that as well. So just a speed up to the section where we talked about file compaction, and also the notebook itself that's associated with
that particular Tech Talk. We actually dive not only dive into it, but we show you just just like we do here, we show you in a demo
exactly how that works, okay. Alright, so I think that
answers those questions in terms of file size and
all this other fun stuff. I have more fun one here,
hey, Sparks port for Java, Python and Scala
language, which one provides the best performance and why? - Ah, okay, so in terms
of Delta Lake operations like reading and writing, there
is absolutely no difference between Scala, Java, Python. In terms of processing,
you want to do an addition on top of that, once you have
them as dataframes and stuff, that is where there may be
slight performance difference between Python and Java. It all depends on let's
say, if you're doing built in functions,
like explode and stuff, then there is absolutely no difference because everything gets boiled
down on the code generated on the Java side, even
if you're writing files. But if you're doing UDFs,
that's where the difference may arise because Python
UDF versus Pandas UDF versus Scala, Java UDF,
those differences can arise in terms of UDFs, but that is purely on the Spark side of things,
whatever place you wanna do in Spark that is independent of Delta. - Perfect, okay. I'm gonna switch gears a little bit now, we've talked about the languages. Oh, actually, no, I wanna
stay in the language just for this one quick answer. How can we use the table
name in the SQL query as opposed to the Scala and Python syntax, you already implied it with the spark 3.0, but I figured it was
worthy of us going ahead and totally out again. - Yes, I can explain that in more detail. So, Apache Spark, in the 2.4 line, or the 2X10 before 3.0 does not allow any data source like Delta to really customize what gets
written out to the metastore. For Delta, what we really
need is that customization, because unlike Parquet tables or CSV tables and such, Delta does not rely on keeping the metadata in the metastore; rather, it keeps all the metadata in the transaction log. That means that when you're querying or planning a query on a Delta table, it needs a certain amount of customization, such that it can specify: I do not want the metadata stored in the Hive metastore, rather I want to use the metadata in the transaction log. Since before Spark 3.0 there were no APIs to do those customizations, that's why OSS Delta Lake working on Spark 2.4 does not support Hive-metastore-defined tables; we could not do that customization. But with Spark 3.0, by working with the Spark community very closely, we added all the necessary APIs, so that the Delta Lake data source can do this customization and therefore read from the Hive metastore such that, given a table name, it can map it to just the location, ignore all the other metadata from the Hive metastore, just read the location from there, go ahead and read the rest of the metadata from the transaction log in that location, and then plan the query based on that. This customization will come with Spark 3.0, for which the timeframe we can only guess, because the Apache process is a little unpredictable for such a major release. It will probably land somewhere between June and July, most likely, and as soon as Apache Spark 3.0 lands, we will most likely have the Delta Lake 0.7.0
release on top of that. - Excellent, okay, I think we only have time for probably one more question, so I'm just gonna leave it at that. The question is: how do the connectors with Presto basically work when it comes to trying to read Delta Lake, right? Does it actually read the Delta log itself to figure things out? How does Presto connect, basically? - Very, very good question. So there is a two-part answer to that. One: the Presto support
we have added in open source does not read the log, it
rather it reads this thing called manifest files; again, details are present in the online docs. What manifest files are is basically a bunch of text files that contain the names of the data files, the Parquet data files, that someone needs to read to get a full snapshot of the Delta table. So, the Delta Lake APIs provide a command for generating these manifest files on a Delta table, which can then be read by Presto to figure out what the actual Parquet files are that need to be read to query the Delta table.
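For reference, a minimal Python sketch of generating those manifests (the path is hypothetical):

```python
from delta.tables import DeltaTable

deltaTable = DeltaTable.forPath(spark, "/delta/loan_risks")

# Write out manifest files that Presto/Athena can use to locate the current Parquet files.
deltaTable.generate("symlink_format_manifest")
```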
So Presto has built-in support for this manifest file, so it doesn't read the log. Databricks and Starburst worked closely to build a native Delta Lake connector that actually reads the log, and I think they just released it on their enterprise platform. So that one would actually read the log directly, bypassing the manifest file mechanism; in Starburst, it reads the log directly to find out what data to read. And hopefully at some point in time they will open (mumbles) - Perfect, well, hey, thanks very much, I apologize that we could not get to everything.