Tech Talk | Diving into Delta Lake Part 3: How do DELETE, UPDATE, and MERGE work

- This is the third talk in the Diving into Delta Lake series, where we're going to talk about how the Delete, Update, and Merge operations work in Delta Lake. We're going to take a sneak peek into the internals of how they work and how you can tune their performance. So, I'm TD. Thank you, Karen, for the introductions. A little bit of background about myself: I have been working in the Apache Spark ecosystem since 2011. Back then, I was a grad student in the AMPLab at UC Berkeley, where in 2012, along with Matei Zaharia, who started the Spark project itself, I started working on Spark Streaming; we built Spark Streaming together. And now, eight or nine years later, I'm a staff engineer at Databricks and a core developer of Structured Streaming and Delta Lake. And, Denny, do you want to say a little bit about yourself? - Well, thanks very much. So I'm not as smart as TD, but I'm a staff developer advocate at Databricks. (laughs) I've worked with Apache Spark since 0.6, so roughly around the same time, around 2011, as well. I'm a former Senior Director of Data Science Engineering at Concur, I'm also a former Microsoftie, and yeah, there you go, I think those are the key components here. So I'm glad to be here, and hopefully you'll enjoy today's session. - Alright. So, for those who are completely new to this series, this is the third talk of the series. So just to bring you up to speed, there's a one-slide intro on what Delta Lake is. It's essentially an open-source storage layer that brings ACID transactions to Spark workloads. The format is open, and you create Delta Lake tables in the same way as you would create Parquet tables.
But unlike Parquet tables, where things can get bottlenecked on the metadata handling in Spark, with Delta Lake the metadata handling is far more scalable. The whole table is versioned, so you can travel back in time and query earlier versions of the table. You get full schema enforcement and schema evolution, which ensure that you can maintain a much higher quality of data without corruption, and you get the ability to audit all the operations you have done, with transactional guarantees, through APIs that allow you to query the history of the table. And fundamentally, how Delta Lake works is that it maintains a transaction log within the table, in a subdirectory called _delta_log. It maintains a transaction log of all the operations that have happened by writing JSON files, which describe each operation and all the files that got added and removed in it. So each of these JSON files is essentially a new version of the table, and this log is maintained within the same directory where all the data files are also maintained. So all the information, both the data and the metadata, is co-located in one location, and this log is updated atomically on every operation; therefore, you get full ACID transaction guarantees on any operation on the table. That's the two-minute version of what Delta Lake is. If you want to know in a little more detail how this transaction log works, et cetera, go take a look at part one of the series; the YouTube link is here. It's also in the YouTube channel that you have hopefully subscribed to. And if you want to learn more about schema enforcement (I have written the title wrong here) and how schema validation works, that's part two of the series.
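To make the log description concrete, here is a minimal toy model in plain Python, not Delta Lake's actual implementation. It assumes each commit is a list of JSON lines carrying only `add` and `remove` actions with a `path` field; real commit files carry more information, and the file names and the `live_files` helper are hypothetical. Replaying the commits up to a version gives the files that make up that version, which is also the idea behind querying an older version.

```python
import json

# Hypothetical, simplified commit files: version number -> JSON lines.
# Real Delta Lake commits carry more fields (sizes, stats, metadata actions).
commits = {
    0: ['{"add": {"path": "part-0001.parquet"}}',
        '{"add": {"path": "part-0002.parquet"}}'],
    1: ['{"remove": {"path": "part-0002.parquet"}}',
        '{"add": {"path": "part-0003.parquet"}}'],
}


def live_files(commits, version):
    """Replay commits 0..version and return the files that make up that version."""
    files = set()
    for v in sorted(commits):
        if v > version:
            break
        for line in commits[v]:
            action = json.loads(line)
            if "add" in action:
                files.add(action["add"]["path"])
            elif "remove" in action:
                # The file is only dropped from the version; nothing is deleted on disk.
                files.discard(action["remove"]["path"])
    return files
```

Calling `live_files(commits, 0)` and `live_files(commits, 1)` returns two different file sets from the same directory, which is why older versions stay queryable as long as the removed files are still on disk.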
So in this part three, I'm going to talk about the DML operations Update, Delete, and Merge: how they work under the hood, and how you can tune them to get the best performance. And I'm going to end by talking about some common design patterns that we have observed users of Delta Lake using, and how they can probably help your use cases as well. So, the first operation we're going to talk about is Update, and Update is simply SQL Update: update table, set X equal to Y, based on a predicate. And what we have provided is not just SQL support, but also Scala and Python APIs for doing this update operation. Incidentally, the SQL support is currently only available in Databricks Delta Lake. We are really trying hard to get it out in the open-source Delta Lake, and it will come out as soon as Spark 3.0 comes out, which has actual parser-level support for these DML operations like Update, Delete, and Merge. So as soon as Spark 3.0 comes out, we will have a Delta Lake release with actual SQL support for these; until then, you can always use Scala and Python for these operations. And the way it works under the hood is that Delta Lake maintains these files, and it only tracks data at the granularity of files. Let's say you have version 12 of the table, where you have four files. Now, let's say you run Update. What it will do underneath is make two scans over this data to update it. First, it will do a scan to (audio blurs) the files that contain the data that needs to be updated, based on the predicate that you've provided. So, let's say out of these four files, two of the files have data that matches the predicate. Now, in an entire file, not all the rows will match. By the way, these are Parquet files; that's how Delta stores the data, as Parquet files.
So now not all the rows in the parquet files may match the data, so there'll be some rows that actually match the predicate, some rows that does not match the predicate, as you can see marked here as green and red. Now to identify these files, it uses the predicate and column stats and partitions et cetera partition pruning, all the things that Spark provides it uses that to narrow down what files it needs to actually read, to actually find whether the file contains matches or not. Now once it finds those files, it selects them and does another scan once again, we still re-write those files. So those two files got re-written as completely new files. Because we cannot go ahead and update files, parquet files are not designed for an in place Update. So, we have to re-write the entire data in the parquet files as new files, where the data the data in the files that matched actually got updated. And the data that did not match got just copied into these new files. And the files that got replaced are essentially tombstoned, which means that in the transaction log, what we do is that we add this information that two new files got added, and the files that got replaced were removed, marked as removed, they're not physically removed from the directory make so that we can do time travel, which we'll talk about later, we can still query those files as a previous version of the table. So, only this is what happens, so let me actually send it over to Danny who can actually demonstrate this in a notebook and give a better cleaner idea of how this works underneath. Over to you, Denny, let me stop sharing my screen. Denny we can't hear you. - It would help if I unmuted myself one day. - Yes, would help. - It would help, alright cool, so I want to make sure you guys can see my screen perfectly fine, this notebook. - Yes. - Perfect, excellent, excellent, alright. So exactly what TD was talking about, right. 
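The two-scan, copy-on-write behaviour of Update described above can be sketched as a small toy model in plain Python. This is only an illustration under stated assumptions: lists of row dictionaries stand in for Parquet files, and the `update` helper, the `rewritten-` file naming, and the sample rows are all made up for the example.

```python
def update(files, predicate, assign):
    """Toy copy-on-write update.

    files: dict of file name -> list of row dicts (stand-ins for Parquet files).
    Returns (new_files, added, removed), where `removed` lists tombstoned files.
    """
    # Scan 1: find the files that contain at least one matching row.
    touched = [name for name, rows in files.items()
               if any(predicate(r) for r in rows)]
    # Files without matches are carried over untouched.
    new_files = {name: rows for name, rows in files.items() if name not in touched}
    added = []
    # Scan 2: rewrite every touched file; matching rows are updated, others copied.
    for name in touched:
        rewritten = [assign(dict(r)) if predicate(r) else dict(r) for r in files[name]]
        new_name = "rewritten-" + name  # hypothetical naming scheme
        new_files[new_name] = rewritten
        added.append(new_name)
    return new_files, added, touched


# Made-up sample data: two "files", only the first contains a WA row.
files = {
    "f1": [{"state": "WA", "funded": 100, "paid": 40},
           {"state": "IL", "funded": 50, "paid": 50}],
    "f2": [{"state": "NY", "funded": 70, "paid": 10}],
}
new_files, added, removed = update(
    files,
    predicate=lambda r: r["state"] == "WA",
    assign=lambda r: {**r, "paid": r["funded"]},
)
```

Note that the original `files` dictionary is left unchanged, which mirrors the point that the old files stay in the directory and are only marked as removed in the log.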
So in other words, we created this Delta table, so I'm just scanning and skipping through and what you'll notice they're actually much what the underlying file system looks like. So I'm just gonna run it live, so you can take a look. But basically, you'll notice that there's an underscore Delta log, and there's this parquet file, right and so this is what it initially looks like, just like what TD was talking about, okay. I'm gonna skip past what the initial log is and the metadata though, I'm leaving it in there, so that way from a standpoint of the notebook, you guys can have it for yourself, okay. Now, here's what the data looks like initially, okay, so basically you've got a state, if I was to just explore the data, here's what it looks like. And you notice that Illinois and Washington has a value of 3.2. Okay, but now I want to run an Update, just as TD was talking about, so let's go ahead and do this. In this case, I'm gonna be rather arbitrary here. And I'm just gonna say, hey, let me look at the number of loans that I wanna go ahead and update okay, in this case, 209. I'm gonna go ahead and in this case, run this Update statement where I'm saying for the state of Washington updates the paid amount to be equal to the funded amount okay. So in other words, I want those values to be updated, okay, just as TD was talking about. Alright, so with a Delta Lake, as TD not talked about, you can run an Update, but then what's actually happening underneath the covers? Alright, well, let's go look at the jobs real quick. Alright, so for example, if we have job 1838, let's go look at that now. Sorry, alright. Okay, it's gonna pop open. And we're going to sorry, I'm gonna scroll back in so you can see it better, alright. So here's all the jobs I've been running here, job 1838 is right here. We're gonna read the transaction log, we're looking at the DAG visualization. - Go to the SQL query. - Oh, sorry, yes I meant to do that and forgot to do it, so there we go. 
Okay, perfect, so as you can tell, I wanna zoom back in from the statistics, we read one file, okay. It's literally right here, actually in the USpark UI, okay, so we've read one file, here's all the information we did like, the cache, write size, the time spent file system, all these different things, but here's the rows output, okay. And sorry, let me try to close that. So that's what how it starts off with. And then let me go ahead and go back and was it 1839 I think? There go. So again, here's the four rows that are gonna be in set of rows that potentially going to be impacted. And all the exchange processing and all it's actually gonna go do to the data, okay, now, if I was to go instead, just simply look at the history of this, the operation metrics are actually embedded right inside here, okay, so the total number of rows 14705. The updated rows, they were 340 rows, in fact they were actually updated because we actually updated all of Washington State, not just the ones that were the values weren't separate, we updated all of them because of the way I wrote that particular statement. And so the number of rows that were copied was right here. 14365 plus 340 equals the 14705 that you see. So underneath the covers, that's what we're doing. We're actually taking a copy of all the files into a new set of files, okay, and we're going ahead and making the modification as you see here. So here's what the result looks like, I just simply said Washington State's No, and then when I look at it, see here's the file right here that you see, okay. So this is the file that actually has the new set of data per the updates, that just happened, okay, cool. TD let me switch it back to you so you can go ahead and flip the next section. - Right. - Perfect, yeah. - Alright. 
So, now that you've seen how you can create new versions by updating the data, you can still go back and query the old version, Delta Lake provides these ways to query previous versions of the table that you saw in the history. So here if I had created version 13, by updating version 12, you can still query version 12 using this option called versionAsOf 12. So now you can just compare two versions of the table very easily to see how the rows got updated, if there was incorrect update, how you can debug it and stuff like that. So that, gives you a lot of power to actually go back in time and figure out what happened whether all the updates that happened were correct, or if you see some sort of corrupted data, you can go back and check in which version did the data get corrupted and maybe rollback using by reading the version overwriting the table with the correct older version once again. So, this is again, a lot of very powerful tool. So now let's talk about how do you improve the performance? So as Denny showed you that there are two scans on the data, now somebody asked question regarding whether those are two full scans or not? The simple answer is that the first one may be a full scan, and the second one is usually not. So it all depends on what predicate you put. So if your predicate for example, say that update all the rows in a particular partition, then the first scan to find all the files that needs to be updated will only query that partition, and so it doesn't need to go full scan. However, if your table is not partitioned, it's a smaller table therefore not partitioned, it can be that it cannot narrow down based on your predicate, what files to look at it will in that case we'll have to scan the entire table to find which files have matches that satisfy that predicate. But after that, that's for the first time, but the second scan usually scans over only the files that needs to be updated. 
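Why the first scan is not always a full scan can be shown with a rough sketch in plain Python. It assumes each file records its partition value and a min/max range for one column; the field names, paths, and the `files_to_scan` helper are hypothetical, and the real engine's pruning is more general than this.

```python
def files_to_scan(files, state=None, loan_id=None):
    """Return the paths that a predicate on `state` and/or `loan_id` cannot rule out."""
    selected = []
    for f in files:
        # Partition pruning: skip files in partitions the predicate excludes.
        if state is not None and f["state"] != state:
            continue
        # Min/max data skipping: skip files whose value range cannot contain the key.
        lo, hi = f["loan_id_range"]
        if loan_id is not None and not (lo <= loan_id <= hi):
            continue
        selected.append(f["path"])
    return selected


# Made-up file metadata for a table partitioned by state.
files = [
    {"path": "state=WA/a.parquet", "state": "WA", "loan_id_range": (1, 100)},
    {"path": "state=WA/b.parquet", "state": "WA", "loan_id_range": (101, 200)},
    {"path": "state=NY/c.parquet", "state": "NY", "loan_id_range": (1, 200)},
]
```

With no predicate every file has to be read; adding `state="WA"` drops one file, and adding `loan_id=150` on top narrows the scan to a single file. That is the sense in which more predicates shrink the search space.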
So to answer this question, it doesn't have to be two full scans; it completely depends on your table setup. But the key thing to remember here is that the more predicates you add in the Update clause, the easier it is for Delta Lake to narrow down the search space, and therefore the faster that first scan is going to be. That improves the total runtime of Update. Now, the Databricks Delta Lake platform has a few more performance optimizations; that's the difference between open-source (OSS) Delta Lake and Databricks Delta Lake. There are tools for doing better data skipping. In particular, we have this thing called Z-order Optimize, which can optimize the layout of the data in the table in a more organized manner. It's essentially like multi-column sorting, but much better than that, using space-filling curves, and so it improves the efficacy of the column stats stored at the level of files. And then there's also support for Bloom filters. We will not go into the details of those, but the key thing to remember is that if you understand what goes on underneath these operations, it becomes easier to reason about how to optimize their performance, for example by narrowing down the search space with more predicates if you know where your matches are going to be. So, similar to Update, there is Delete, with similar syntax: delete from table based on a predicate. There's obviously SQL support that is going to come with Apache Spark 3.0, but until then there's also Scala and Python support directly in...
But it's interesting to remember that, as Denny showed, an update creates new files but doesn't delete the old files. It only marks them as tombstoned, that is, as removed in the transaction log. It doesn't automatically delete them, so that you can do time travel and query the old data again by specifying the exact version number. Now, for deletion the setup is exactly the same: any file containing data that needs to be deleted will be re-written as new files, but the old files will be preserved by default, so that you can go back and query the version of the table before the Delete, and if things get accidentally deleted you can roll back. But in some cases, you actually do want the data to be completely, permanently deleted from disk; you don't want any file to have the old data. For that, you have to run an additional operation called Vacuum. That's an operation that is specific to Delta Lake, and with it you can say how long a window of previous versions you want to retain. So let's say you want to retain all the versions written in the last one day, because you know that you don't care about querying any version of the table that is older than one day; then you will run vacuum table retain 24 hours. By default, if you don't specify retain, it is seven days; that's what we have observed works well in most situations. What Delta will do is figure out all the files present in the directory that are not needed by any of the versions retained in the last one day. So, any file that is not needed by the versions retained within the last 24 hours will get deleted. And that includes any partial files or corrupted files that may have been written but never committed to the log because of failed writes; vacuum will delete all of that.
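The retention rule for Vacuum can be sketched as a toy model in plain Python, not the real implementation. It assumes times are plain hour counts, that each tombstoned file records when it was removed from the log, and that a file the log never mentioned is a leftover from a failed write; the `vacuum` helper and the sample paths are hypothetical.

```python
def vacuum(directory, live, tombstones, now_hours, retain_hours=168):
    """Return the paths that may be physically deleted.

    directory:  dict path -> last-modified time, in hours
    live:       set of paths that belong to the latest table version
    tombstones: dict path -> time the file was marked removed in the log, in hours
    retain_hours defaults to 168, i.e. the seven days mentioned in the talk.
    """
    cutoff = now_hours - retain_hours
    deletable = []
    for path, modified in sorted(directory.items()):
        if path in live:
            continue  # still needed by the current version
        if path in tombstones:
            if tombstones[path] < cutoff:
                deletable.append(path)  # removed before the retention window began
        elif modified < cutoff:
            deletable.append(path)  # never committed, e.g. left by a failed write
    return deletable


# Made-up example: "now" is hour 200.
directory = {"a": 0, "b": 0, "c": 100, "orphan": 10}
live = {"c"}
tombstones = {"a": 100, "b": 190}
```

With a 24-hour retention the model returns the old tombstoned file and the orphan; with zero retention everything outside the latest version becomes deletable, which is why a zero-hour vacuum is risky while other writers are active.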
And, using that you can actually control how much of the previous versions you want to retain and what you can delete and eventually, you can age data out from all the data files in this way, the data will actually get deleted. You can even run Vacuum retain zero hours, that actually will keep only the latest version of the table if you really don't care about history, and you really want to ensure that after deletion, all the data has actually been physically deleted, you can run Vacuum retention zero, which will preserve only the last version. However, it's important to remember that vacuum zero, do not run vacuum zero when other writes are in progress because you may delete files that are being correctly retained right now, so just be careful of that. Anyways, so then on to Merge which is probably the more interesting and most powerful operation among these three. So, Merge has a standard SQL syntax, where the idea is that you are merging into a target table using data from a source table, where based on a match that, if we match the keys of a source row and a target row. If it is matched, then you can Update the (audio blurs) using this matched source data. If there is no match that is, if there is a source row that does not match any target row, then you can insert a new target table column based on the source, new target table row based on the source row. Now, this is a standard SQL syntax, we support that obviously, but we have additional support for a lot more extended syntax that makes things a little bit more interesting. For example, we have support for additional clause conditions, say that for example, you have matched based on this condition, but you want to have additional condition on top of those matches then here you can specify additional clause conditions, not just the match condition, but clause conditions, that needs to be satisfied before the update kicks in. 
And similarly: when not matched and an additional clause condition holds, only then will the insert happen. And where this is useful is that you can actually have multiple, at least up to two, matched clauses as well. And you have support for deletes as well, which the ANSI SQL syntax doesn't. So here you can say: when matched and the clause condition holds, then update; otherwise, given it's matched but the clause condition is not satisfied, delete. This is very powerful, as we will show using examples later, for making a lot of complicated changes in the table, and we're going to walk through use cases where this is very, very useful. The other very useful feature is the star support to auto-expand target columns. As far as we know, this is the only engine that supports this sort of syntax, and once we introduced it, it became very, very popular. What it does is this: for a table with a very large number of columns, you do not want to hand-write all the columns that you need to update. Let's say you have a bunch of changes that require you to update all the columns if there is a match, or you want to insert all the columns exactly as they are in the source; it's tedious to hand-write all the columns when the number of columns is in the hundreds. So Update Star essentially auto-expands the columns to say Update Set column one in the target equal to column one in the source, column two in the target equal to column two in the source, and so on, which makes it much easier to express and manage these sorts of queries. We also have support for programmatic APIs in Scala and Python. Again, this is one of the very few systems we know of that has programmatic APIs for doing this sort of SQL-like operation. Depending on what kind of user you are, a primarily SQL user or a programmatic software engineer or data engineer kind of person (I'm more like that; I like type safety), this is very, very useful.
So, under the hood, what happens is essentially the exact same thing as Update. There are two passes over the data: one to find the list of files that have matches and therefore need to be updated, and a second to update those files by re-writing them as new files. But the interesting bit is that, unlike Update and Delete, here in both passes you have to do a join between the source and the target to actually find the matches. The first one is an inner join between the target and the source, to find the matches. The second one is an outer join between only the selected files in the target and the source, to do the Update, Delete, and Insert operations on the data in those files; some of the data may be copied, and some may be updated, deleted, inserted, et cetera. Again, Denny will demonstrate this much more effectively, so over to you, Denny. - Thank you, sir, and this time I actually bothered remembering to unmute myself, so that usually helps. Cool, alright, so exactly what TD was talking about. And don't worry, I've asked you folks to go ahead and ask your questions in the Q and A panel. It's not that we're not willing to answer them; it's just that we're most likely going to answer some of these questions after the session, once we've done the regular presentation portion of it, okay. Alright, so here's just a quick animated GIF about the insert/update process when you're running a merge. Typically, underneath the covers, just like TD had called out, you identify the new rows to be inserted, identify the rows that need to be replaced, i.e. updated, identify the rows that are not going to be impacted by the insert or the update, create a new temp table, delete the original table, rename the temp table, and drop the temp table. That's what you would do under the regular Parquet process, so it's pretty inefficient, and it requires you to actually do all the writing.
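The two passes of Merge can be sketched as a toy model in plain Python. It is a simplification under stated assumptions, not how the engine is written: lists of row dictionaries stand in for files, source keys are assumed unique, only "update all on match, insert all otherwise" is modelled, and the `merge` helper, the `merged-0` file name, and the sample values are made up.

```python
def merge(target_files, source_rows, key):
    """Toy merge: WHEN MATCHED update all columns, WHEN NOT MATCHED insert.

    target_files: dict of file name -> list of row dicts.
    Returns (new_target_files, touched_file_names).
    """
    source = {r[key]: r for r in source_rows}  # assumes source keys are unique
    # Pass 1 (inner join): which target files hold a key that the source also has?
    touched = [name for name, rows in target_files.items()
               if any(r[key] in source for r in rows)]
    # Pass 2 (outer join over the touched files only).
    matched_keys = set()
    rewritten = []
    for name in touched:
        for r in target_files[name]:
            if r[key] in source:
                rewritten.append(dict(source[r[key]]))  # matched: take the source values
                matched_keys.add(r[key])
            else:
                rewritten.append(dict(r))  # unaffected row, copied as-is
    # Source rows with no match in the target become inserts.
    inserted = [dict(r) for k, r in source.items() if k not in matched_keys]
    result = {n: rows for n, rows in target_files.items() if n not in touched}
    result["merged-0"] = rewritten + inserted  # hypothetical new file name
    return result, touched


# Made-up data loosely shaped like the demo: one file holds loans 11, 21, 28.
target_files = {
    "f1": [{"loan_id": 11, "funded": 1000, "paid": 100},
           {"loan_id": 21, "funded": 1200, "paid": 500},
           {"loan_id": 28, "funded": 1200, "paid": 0}],
    "f2": [{"loan_id": 99, "funded": 800, "paid": 800}],
}
source_rows = [
    {"loan_id": 11, "funded": 1000, "paid": 1000},  # pays off an existing loan
    {"loan_id": 12, "funded": 1500, "paid": 0},     # brand new loan
    {"loan_id": 28, "funded": 1200, "paid": 0},     # duplicate of an existing row
]
result, touched = merge(target_files, source_rows, key="loan_id")
```

Only `f1` is rewritten; `f2` is carried over untouched, and the duplicate row for loan 28 lands on the existing row instead of producing a second copy.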
And I'm lazy, so I don't wanna do all that writing, so that's why TD shows us awesome match syntax which actually simplified that process massively. Alright, so in other words, when I do that, I can just simply write a match thing. Okay, so let's use a quick example here. I'm gonna go ahead and take a look at some data here. Okay, for example, if I was to look at just New York, with loan Id less than 30, this is the three rows that I have here, okay, in terms of what that could potentially be impacted, okay I have a loan Id of 11, 21, 28, each of them being funded with about $1000 to $1200, here's the different paid amounts, and the state being New York, alright. Now, let's create a new table that we want to now run the Merge. Okay, remember the first table is the source table, it's of the 14705, three rows of the 14705 that potentially can be impacted. Now, here's this new table that we're creating, which is listed here, where basically for loan Id 11, for $1,000, we're gonna go ahead and pay it off, right, this first row that you see here, okay. So that's why the funded amount is 1000, and the paid amount is 1000, okay. Alright, so then, we're also gonna add a new loan, okay, and we're also gonna for the fun of it also add a duplicate loan, okay. In other words, by accident, the source system screwed up, so there was a duplication, alright. So again, as opposed to going ahead and running all of those other statements where I'm gonna do an insertion specifically for loan Id 12, I'm gonna de-duplicate the data myself for 28 and I'm gonna go ahead and up run an update specifically for loan 11, I'm just gonna run a single merge statement, and it's really simple right here, actually, it's right here, okay. And so I'm using the Python syntax because unlike TD, who loves type safety, I'm lazy so I don't like doing it, no, I'm joking. I just happen to like the Python language, that's all even though the tabs irritate me a lot. 
Nevertheless, I'm gonna simply do a Merge where basically, it's where the source table s.loan Id is equal to the target table t.loan Id, okay. Where you basically when it match, you update them all when not match you insert them all, and then implied inside here also is the duplication as well, okay. So same idea, we have a bunch of stages, okay. Now, there's a couple of one question in terms of that I was asked, which I'm gonna answer right now, which is about the execute plan query cool tool costs, okay. That's actually what the spark UI is for, right. It actually provides you a lot of that context, I'm gonna look at 1844, using the SQL tab, just as TD had called out, so I'm gonna go look at this, right. When I look here, the query plan is actually before I even show you the DAG here, let me do a quick call log, the logical plan, it's actually listed right here, so you can actually understand logically what Sparks doing underneath the covers, so you can actually go ahead and see similar to when you're working with a relational database, you have a logical plan, same concept, all that's actually placed right here. Now, obviously, some people are gonna turn on us and say, I'd love to understand how it works graphically. And then that goes back up to here here, right, which basically, the timing is here, seven milliseconds for the WholeStageCodegen here. One second for this particular WholeStageCodegen in order for it to do its various processes, okay. So a lot of your query information is actually all in that Spark UI. So you can figure out how to improve the performance by understanding what's going underneath the covers. So for example, this one implied right away, actually, you know what, I wanna go look at a different one right away, give me... - This is actually (mumble). - Exactly, I wanna talk about the other one first, so I think it's this one. - Yes, the 45, I think that should be the one. - Here we go, perfect 1845, okay. 
So here is that SortMergeJoin, right, so in other words, here's the two files, okay, so for example, when you open this one up, okay, it tells you right away 17 milliseconds was taken to read a single file. Alright, that file was the file that contained the 14705 rows of data that we were working with, okay. And so in fact, it actually tells you right here, it tells you about the cache, the file system reads and all these other statistics, which actually helps you understand why the query performance is gonna be that particular way. But it lets you know right away. So exactly as TD was explaining about like, the number of files or partition, right, you will be able to tell like if this is running in seconds or minutes, right you have a very long time perhaps partitioning needs to come into play, because you need to go ahead and reduce the number of files are gonna need to be read because for sake of argument, you're only trying to process one state or one date of data, as opposed to three years of data using that as a simple example, okay. Alright, let me close this up. There we go. Alright, so then versus this Scan ExistingRDD. That was the three rows that we created, remember, we had created this loan updates table? Well, this is what the Scan our ExistingRDD is, again, this is a small idea, but here's the projection you can see it right here. The loan Id the funded amount, the paid amount, this is the RDD that we explained, right. Then exactly what TD was talking about in terms of the joins that happen, here's that SortMergeJoin, that actually had to kick off. So first, there was a Sort right, of this information and then now, we're gonna do SortMergeJoin between the three rows that we originally had from the table that we generated, that we created, versus the 14, similar five rows that came from the parquet file, the single file that we had, in this case, that we would now perform the Join. 
And then all of the statements oh, sorry, the projections here, right here, right, in terms of how it figures out the logic, that's actually all shoved inside here, okay. And so, the idea is underneath the covers, then you can tell right away what's actually happening to the data, okay. In other words, it's grabbing one file, it's grabbing the three rows, it's gonna do a SortMergeJoin, that's what this example tells us, versus the previous step, 1844, I believe, yes. Right, this is actually doing a broadcast exchange, right, in terms of here's the amount of data that's being pushed over, right, that tells you the data size itself actually helps you understand, okay if you've got a ton of data that's being broadcast or exchanged over and over again for this broadcast hash join that's happening right here, yeah, this tells you that you're probably transferring or shuffling or moving too much data across the wire, right. So again, can you show shuffle, can you filter can you partition to reduce the sizes that you are working with, okay. And then, ultimately, back to this again, like for example, the output of this is, when I look in New York, there is the loan Id 11, which is the basically the one where we updated it, right, here's loan Id 12, which basically is the brand new one that we just added, and here's the other loans that basically were unaffected, right. In other words, we actually put a duplicate 28, but that duplicate never actually entered in there because the merge statement automatically had de-duplicate the data, okay. And then same idea when we looked at the history, right, I can look at the metrics, right. The operation metrics are right within the history, so you know how many rows were copied, in this case of the 1475, 1472, were actually copied, okay, we only looked at one single file in order to make sense of it. The target rows that were updated, three of them were actually updated. And based on the source rows of three, oh, sorry. There we go. 
Okay, and the number of bytes. So all that information is packed inside the operation metrics that you see within the history table, and within the Spark UI, in the SQL tab, to make sense of it from the UI perspective, cool. Hey, TD, anything else you think I should be adding, or are we good to go here? - Yeah, I think we are good to go. - Okay, perfect, alright, well, I will stop sharing now. - Awesome. Alright, let's talk about how we can improve performance. So, as with Update, it's important to understand what goes on underneath the covers. As Denny just showed you, there's an inner join and there's an outer join. So you have to really understand which is the bottleneck, the inner join or the outer join. If the inner join, which is finding the files to re-write, is the bottleneck taking the most time in the query, then you have certain optimization techniques. If it's the other one, then you have certain other optimization techniques. So if the inner join is slow, that is, you're taking a lot of time just to find the files to update, then again the standard technique of adding more predicates to narrow down the search space helps. You can also adjust the shuffle partitions; that's a standard Spark optimization technique, where the number of shuffle partitions controls the parallelism of the shuffle that needs to be done for the join. You can adjust the broadcast join threshold to allow larger data to be broadcast, if the source is small enough to fit in a single machine's memory. And sometimes it slows down because there are too many small files in the table.
Like if you have a million tiny kilobyte-sized files, then the overhead of reading each one of them is much higher, so you should compact the Delta table; there is online documentation on how to rewrite the layout of the Delta table to compact it. But then again, you shouldn't make extremely large files like 10 GB files, because remember that we rewrite at the granularity of files, and the larger the files you create, the more data may need to be rewritten unnecessarily. So if there is only one row that needs to be updated in a file, it's cheaper to rewrite an entire 100 MB file rather than a 10 GB file for that one-row update. So you have to tune that based on your workload requirements. And then Databricks Delta Lake has a few performance optimizations. The Z-order optimize that I mentioned earlier helps to sort the data in certain smart ways, which allows you to exploit the locality of updates: if your changes are going to be in a certain range of values for a particular column, then you can Z-order optimize by that column to get better locality, so that fewer files are touched and need to be updated. On the other hand, if your outer join, the second scan that is actually rewriting the files, is slow, then there is a different set of techniques. Well, there are common ones, like adjusting shuffle partitions, which again controls the parallelism. But sometimes what happens is that if you parallelize too much, you can generate too many small files, especially with partitioned tables. For that, the solution, the knob we have provided, is that you can reduce the number of files by enabling automatic repartitioning of the data based on the partition column before the write. Now, this is available as Optimized Writes in Databricks Delta Lake. But in the next upcoming release, 0.6.0, which we will release tomorrow or day after tomorrow, well, tomorrow actually,
0.6.0 also has support for this automatic repartitioning of data inside Merge before the write. Now, if it's a full outer join, the outer join that you see, then Spark cannot do any sort of broadcast join; it doesn't support the broadcast threshold for full outer joins. But starting from Delta Lake 0.6.0, it's possible that it may be a right outer join, and Spark in that case can do broadcast joins. So if you observe in Spark that you're doing a right outer join, by looking at the logical and physical plans as Denny showed in the Spark UI (you can also get them programmatically), then you can adjust the broadcast threshold to make it broadcast a larger volume of data than what Spark's default is. It also helps to cache the source table or DataFrame, because you're gonna do two passes on both the source and the target, especially on the entire source, so caching the source can speed up the second scan. But it's important to remember: do not cache the target Delta table, because caching the Delta table can lead to weird cache coherency issues. If the Delta table updates and the cache doesn't, it can lead to all sorts of confusion. So, generally, don't cache the target table if you're going to update it. Anyways, we're kind of short on time, so I'm going to squeeze through the common design patterns very quickly. So, the common design patterns, Denny showed a little bit of that in his example as well. One is deduplication during ETL: an ETL pipeline can generate duplicates, and you don't want the duplicates in your final Delta table. So you write a merge query that inserts only if the unique Id, based on which you can deduplicate, doesn't match. That means the new row that is being inserted is not in the table, judging by the unique Id, and only then do you insert.
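A minimal sketch of the insert-only merge just described, written against the merge builder of Delta Lake's Python `DeltaTable` API. The function name, the aliases `t` and `s`, and the `uniqueId` column are illustrative assumptions, not names from the talk; `delta_table` is assumed to be a `delta.tables.DeltaTable` and `new_rows_df` a Spark DataFrame with a matching schema.

```python
def insert_new_rows_only(delta_table, new_rows_df, key="uniqueId"):
    """Insert-only merge: source rows whose key already exists in the target are skipped.

    Assumes `delta_table` is a delta.tables.DeltaTable and `new_rows_df` is a
    Spark DataFrame; `key` is a hypothetical unique-id column present in both.
    """
    condition = f"t.{key} = s.{key}"
    (
        delta_table.alias("t")
        .merge(new_rows_df.alias("s"), condition)
        # No whenMatched clause: rows that match (the duplicates) are left untouched.
        .whenNotMatchedInsertAll()
        .execute()
    )
    return condition
```

Note that this only guards against rows that are already in the target. Duplicates inside `new_rows_df` itself would still need to be dropped first (for example with `dropDuplicates`), since the merge does not deduplicate the source against itself.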
So this is again another example of the extended merge syntax: you may not even specify a when matched clause at all, you can specify only when not matched, then insert. This is a kind of deduplication. Now you can optimize it further by... Well, the problem you will face is that it may scan the entire table every time to find out whether the unique Id exists or not. So you can optimize this further if you know that your duplicate data is going to come only within a certain time period. Like, if you have duplicate data only over the last seven days and not older than that, then you can inject that seven-day constraint as part of the match condition, thereby forcing merge to search only the last seven days of data for (mumbles). So these are optimizations you can do. Another common pattern is using Structured Streaming. You can compute aggregates using Structured Streaming very easily. I have previous talks at Spark Summit on Structured Streaming, if you want to look them up to learn more about Structured Streaming. But what comes out of Structured Streaming are essentially key-value aggregates, which are prime candidates for upserting, and you can do that very easily. Structured Streaming has this operation called foreachBatch, where for every batch of generated, updated key-value aggregates you can upsert that data into Delta Lake by calling this merge operation within the foreachBatch. Again, more details are there in our online documentation. Another common pattern is GDPR, which is very easy to do with Delete and Vacuum. You delete the user from the info table and then vacuum the table to ensure that the data is actually deleted physically from the files.
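The delete-then-vacuum step for GDPR can be sketched roughly as follows, using the `delete` and `vacuum` methods of Delta Lake's Python `DeltaTable` API. The function name and the `userId` column are hypothetical; `delta_table` is assumed to be a `delta.tables.DeltaTable`.

```python
def erase_user(delta_table, user_id, id_column="userId", retention_hours=None):
    """Delete a user's rows, then vacuum so old data files can be physically removed.

    Assumes `delta_table` is a delta.tables.DeltaTable; `id_column` is a
    hypothetical column name. `user_id` is interpolated into the predicate
    unescaped, which is acceptable only for trusted input in a sketch.
    """
    predicate = f"{id_column} = '{user_id}'"
    # Logical delete: a new table version is written without the user's rows.
    # The files of earlier versions still exist on storage at this point.
    delta_table.delete(predicate)
    # Physical cleanup: remove files no longer referenced by the table and
    # older than the retention period.
    if retention_hours is None:
        delta_table.vacuum()  # uses the table's default retention period
    else:
        delta_table.vacuum(retention_hours)
    return predicate
```

With the default retention (seven days), the old files holding the user's rows are only removed by a vacuum that runs after they have aged past that threshold, so the physical deletion is not immediate unless a shorter retention is deliberately configured.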
But there are, again, better ways of doing that. If you want to maintain the entire history of a user, then rather than relying on the table's history, you can store the history of the user explicitly by keeping all the previous records of the user in the latest version of the table, using this sort of SCD Type 2 operation. Again, details and examples are there in our online docs, so I'm not going to go into that. Another extremely common pattern is Change Data Capture. People often want to take change data from OLTP databases and do OLAP operations on it without affecting the OLTP databases. So they wanna take the changes made in the OLTP database and apply them to some table in the Delta Lake format, which is great for OLAP. For that, again, the merge operation supports Delete, Insert, and Update, all in the same syntax. So based on your sequence of changes, you can very easily apply those changes from OLTP to OLAP. That's a very common pattern that we observe. Anyways, to end my talk, let's talk a little bit about the community and ecosystem. We are very aggressively making releases, almost every quarter, and making significant improvements in terms of the connector ecosystem as well as the performance of very popular operations like Merge. We are going to release something in the next couple of days. In 0.7.0, we're gonna release Apache Spark 3.0 support, which brings a whole list of stuff like SQL DDL support, SQL DML support, support for defining tables in the Hive metastore, and all of that. And this connector ecosystem is really growing. There's online documentation on how to use Delta Lake with Amazon Redshift, Athena, Presto, Snowflake, and Hive. Just a couple of weeks back we released a Hive connector with which you can natively query Delta tables from Hive directly.
So take a look at the online docs. We have a growing list of ecosystem partners that have promised to support Delta Lake within their platforms, and a very large and rapidly growing list of users of Delta. Thank you very much. We should answer questions now. - Yes, absolutely, so we've got about five to eight minutes left, so let's dive right into the questions, I'll ask them to you, TD. Alright, perfect. So, first question: does vacuuming improve performance? - I was actually typing that answer, but I'd rather give it verbally, now... - Exactly, I figured giving it verbally would actually make a ton of sense in this case. - In most cases it doesn't. When you're querying the table using Delta Lake, because of the transaction log, it doesn't need to list the directories to find out which files to read. So it doesn't really matter from Delta Lake's point of view whether there are too many files in the directory for a very large number of versions or not. But in some cases, we have observed that with a very, very long history that leads to millions or tens of millions of files, the storage system itself often starts to behave in a weird manner, because it just slows down all file system operations. This is true even for cloud storage systems like S3 or Azure file systems, with a very large number of files in the bucket, or in the container in the case of Azure. And that is something that is beyond what Delta Lake or any processing engine can do; all file system operations just slow down. So it's not advisable to keep a very long history that requires you to keep tens of millions of files. It's gonna slow everything down, and that is beyond our control or the control of anyone. - Perfect, and then related, again we're gonna talk about the file system a little more just because there seem to be a lot of questions about that.
What about the file distribution after you deal with the Updates and Deletes? I guess the concern is that because we're adding new files, as we've been talking about, during an Update and Delete, and we're creating tombstones (I'll eventually be able to speak English), what about the file distribution? - So yes, what can happen is that as you're rewriting these files, you can get a small file problem that can lead to fragmentation. But that's where you can rewrite the layout of the Delta table with transactional guarantees using these operations. In Databricks Delta, it is the OPTIMIZE operation. In open source, you have this option called... You can do essentially no-data-change writes using DataFrame writes, by which you can say that I am going to rewrite this partition of the Delta table by reading all of the data and writing it back with a much smaller number of files. Essentially, in terms of Delta table operations, you just read the partition as a DataFrame, repartition it to a smaller number of files, and write it back into that partition. And there is additional support for an option called dataChange, set to false, which tells the transaction log that no data was changed, data was only rearranged, and that helps in that this write operation does not conflict with any other actual data change operations. So again, all of this is documented in our online docs, so take a look. - Perfect, and just to add to TD's point, during the unpacking the transaction log session, we actually dived a little bit into that as well. So just speed up to the section where we talked about file compaction, and also check the notebook itself that's associated with that particular Tech Talk. We not only dive into it, but just like we do here, we show you in a demo exactly how that works, okay.
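The open-source compaction recipe TD describes (read a partition, repartition it to fewer files, write it back with `dataChange` set to false) might look roughly like this in PySpark. The function name and its parameters are assumptions made for illustration; `spark` is assumed to be an active `SparkSession` with Delta Lake available, and `partition_predicate` a filter on the table's partition column.

```python
def compact_partition(spark, path, partition_predicate, num_files):
    """Rewrite one partition of a Delta table into `num_files` larger files.

    Assumes `spark` is a SparkSession with Delta Lake configured, `path` is the
    table location, and `partition_predicate` (for example "date = '2020-04-15'",
    a hypothetical value) selects exactly the partition to rewrite.
    """
    (
        spark.read.format("delta").load(path)
        .where(partition_predicate)
        .repartition(num_files)
        .write
        # Tell the transaction log that rows were only rearranged, not changed,
        # so this write does not conflict with real data-change operations.
        .option("dataChange", "false")
        .format("delta")
        .mode("overwrite")
        # Overwrite only the selected partition, not the whole table.
        .option("replaceWhere", partition_predicate)
        .save(path)
    )
```

Choosing `num_files` is the tuning decision discussed earlier in the talk: too many files brings back the small-file overhead, while very large files make later single-row updates rewrite more data than necessary.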
Alright, so I think that answers those questions in terms of file size and all this other fun stuff. I have a more fun one here: hey, Spark supports the Java, Python and Scala languages, which one provides the best performance and why? - Ah, okay, so in terms of Delta Lake operations like reading and writing, there is absolutely no difference between Scala, Java, and Python. In terms of any additional processing you want to do on top of that, once you have the data as DataFrames, that is where there may be a slight performance difference between Python and Java. It all depends. Let's say you're doing built-in functions, like explode and stuff: then there is absolutely no difference, because everything gets boiled down to the code generated on the Java side, even if you're writing Python. But if you're doing UDFs, that's where the difference may arise, because Python UDF versus Pandas UDF versus Scala or Java UDF, those differences can arise in terms of UDFs. But that is purely on the Spark side of things; whatever processing you wanna do in Spark is independent of Delta. - Perfect, okay. I'm gonna switch gears a little bit now, we've talked about the languages. Oh, actually, no, I wanna stay on the languages just for this one quick answer. How can we use the table name in the SQL query as opposed to the Scala and Python syntax? You already implied it with Spark 3.0, but I figured it was worth us going ahead and calling it out again. - Yes, I can explain that in more detail. So, Apache Spark in the 2.4 line, or the 2.x line before 3.0, does not allow any data source like Delta to really customize what gets written out to the metastore. For Delta, what we really need is that customization because, unlike Parquet tables or CSV tables and stuff, Delta does not rely on keeping the metadata (audio blurs) metastore; rather it keeps all the metadata in the transaction log.
That means that when you're querying or planning a query on a Delta table, it needs a certain amount of customization such that it can specify: I do not want all the metadata stored in the Hive metastore, rather I want to use the metadata in the transaction log. Since before Spark 3.0 there were no APIs to do these customizations, OSS Delta Lake working on Spark 2.4 does not support Hive metastore defined tables, because we could not do that customization. But with Spark 3.0, by working with the Spark community very closely, we added all the necessary APIs so that the Delta Lake data source can do this customization and therefore read from the Hive metastore such that, given a table name, it can map it to just the location, ignore all the other metadata from the Hive metastore, go ahead and read the rest of the metadata from the transaction log in that location, and then plan the query based on that. This customization will come with Spark 3.0. The timeframe we can only guess, 'cause the Apache process is a little unpredictable for such a major release. It will probably land somewhere between June and July, most likely, and as soon as Apache Spark 3.0 lands we will have a Delta Lake release, most likely Delta Lake 0.7.0, on top of that. - Excellent, okay, I think we only have time for probably one more question, so I'm just gonna leave it at that. The question is how do the connectors with Presto work when it comes to trying to read Delta Lake? Does it actually read the Delta log itself to figure things out? How does Presto connect, basically? - Very, very good question. So there is a two-part answer to that. One, the Presto support we have added in open source does not read the log; it rather reads this thing called manifest files. Again, details are present in the online docs.
What manifest files are is basically a bunch of text files that contain the names of the data files, the Parquet data files, that someone needs to read to get a full snapshot of the Delta table. So the Delta Lake APIs provide the command for generating these manifest files on a Delta table, which can then be read by Presto to figure out what are the actual Parquet files that need to be read to query the Delta table, and then it runs the query accordingly. So Presto has inbuilt support for this manifest file, and it doesn't read the log. Databricks and Starburst worked closely to build a native Delta Lake connector that actually reads the log, and I think they just released it on their enterprise platform. So that would read the log directly, bypassing the manifest files, in Starburst, to find out what data to read. And hopefully at some point of time they will open (mumbles) - Perfect, well, hey, thanks very much, I apologize that we could not get in.
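To make the manifest idea from TD's last answer concrete, here is a simplified plain-Python model, not Delta Lake's implementation: it replays the add and remove actions recorded in the transaction log and lists the Parquet files that make up the latest snapshot, which is the information a manifest carries. The function names, the action dictionaries, and the example bucket path are assumptions for illustration. In Delta Lake itself the manifest is produced by the library, for example with `DeltaTable.generate("symlink_format_manifest")` in the Python API.

```python
def live_data_files(commits):
    """Replay add/remove actions in commit order; return files in the latest snapshot.

    `commits` is a list of commits, each a list of simplified actions shaped like
    {"add": {"path": ...}} or {"remove": {"path": ...}} (a reduced, assumed form
    of the actions stored in the transaction log's JSON files).
    """
    live = {}  # dict keeps insertion order, so output order is deterministic
    for actions in commits:
        for action in actions:
            if "add" in action:
                live[action["add"]["path"]] = True
            elif "remove" in action:
                live.pop(action["remove"]["path"], None)
    return list(live)


def manifest_text(table_root, commits):
    """Render one data-file path per line, the way a manifest lists files to read."""
    return "\n".join(f"{table_root}/{path}" for path in live_data_files(commits))


# Hypothetical two-commit history: an update rewrote a.parquet into c.parquet.
example_commits = [
    [{"add": {"path": "a.parquet"}}, {"add": {"path": "b.parquet"}}],
    [{"remove": {"path": "a.parquet"}}, {"add": {"path": "c.parquet"}}],
]
print(manifest_text("s3://example-bucket/loans", example_commits))
```

Because the manifest is a snapshot written out at generation time, an engine reading it sees the table as of the last time the manifest was generated, whereas a connector that reads the log directly sees the current version.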
Info
Channel: Databricks
Views: 10,538
Id: 7ewmcdrylsA
Length: 54min 33sec (3273 seconds)
Published: Thu Apr 16 2020