Tech Talk | Using Delta as a Change Data Capture Source

Captions
- Perfect, okay. It's a pleasure to virtually meet everyone today. Thanks for joining this session. Just wanted to quickly introduce myself. As Karen said, I'm a Senior Solutions Architect here at Databricks, where I work with some of our largest customers here in the Bay Area. And really, what I wanted to talk about today was a topic that a lot of my customers are asking me about in the field. And the topic at hand is how do we capture change data from a Delta table? Before we get too far into the weeds on that topic, and there's lots to talk about today, I wanted to also welcome Denny Lee, who's my colleague at Databricks. He's a developer advocate here at Databricks. And he's run about 26 of these online meetups in the last week, so you're gonna hear my voice a little bit more today than his. He'll be popping up with some color commentary and also moderating the chat as well. So what are we speaking about today? The topic of the talk is how do we capture change data from a Delta table? We're first gonna begin with the motivation behind this, okay? Why are we even here today? Then we're gonna look at three different architectural patterns for how we can actually achieve this using Delta. And in these patterns, we'll look at some slide-based architectures, but we'll also have live demos, if the demo gods smile on us favorably today. We'll wrap that up with a summary and have lots of time for questions at the end. Please get your questions in throughout the presentation; we may find good points to break in and answer some of those throughout the presentation as well, okay. So capturing change data from a Delta table. Why is this an important subject? I think it's really well summed up by a conversation that I had with my colleague yesterday. I told her that I was gonna do an online Delta tech talk on this subject and she said, "Paul, that's a really confusing topic." She said, "You're mixing concepts from two different worlds." And I completely understand what she means. So on one hand, Change Data Capture is a topic that enterprise architects have been speaking about for the past 30 years. And generally, it's in relation to large SQL stores, perhaps in the data warehouse paradigm. And every technology has a way to do it, whether it's scraping bin logs or stored procedures or triggers. And you might be familiar with a schema like this that MySQL gives you, where you get these CDC tables that give you the change data from each of your tables. But Delta Lake operates up in the cloud, right? It's in the Cloud Data Lake paradigm. So are we really talking about Change Data Capture again? And my counter to my colleague was, this is kinda what I really like about Delta, and part of the appeal behind it. Many of the things that we know and love about data warehouses, like ACID transactions and schema enforcement and fine-grained updates and deletes, we are reestablishing in this paradigm of the Cloud Data Lake. And personally, I think that Change Data Capture will be one of the things we look to reestablish in this setting as well. And this is just a really natural cycle of innovation and disruption, right? We were forced to innovate away from the data warehouse, because data volumes were exploding and data variety was exploding. And the data no longer fit into these nice tabular structures. So we arrived at the Cloud Data Lake, where we're using object storage, which is super cheap. We're using Managed Cloud Services that allow us to be really agile, and we have cloud elasticity.
But that's the innovation. The disruption is we lose some of the things that we know and love, right? So part of the appeal behind Delta for me is reestablishing some of these good practices that we had in previous generations. So let me introduce the topic a little more formally here, or at least as formally as a slide with a picture of a dog on it allows me to do. Perhaps you're a data engineer at your organization, and you love Delta Lake because you can mix Appends and Updates and Deletes, you can create these huge petabyte-scale Delta Lake tables using Scala Spark computations, and have multiple consumers of this data, all with ACID guarantees, right? So you love that, you're there. And then you get a call from your business lead, and they say, "You know, that Delta Lake table you created, every time a record is inserted or changed or deleted from that table, can you send that downstream for me, because we've got a business process that relies on this." And you're left very much like the dog in the photo here with rubber gloves on. You're like, "Now what?" Okay, so that's really what we're here to talk about today: how do we detect those changes that are happening in our Delta table, so we can propagate them downstream? So the first pattern that we're gonna look at is the so-called Bronze-Silver-Gold propagation. And you might have heard this if you've watched previous tech talks, but I'll give a brief synopsis of what this is. So there's an architecture that's grown up around Delta Lake. It's called the Bronze-Silver-Gold architecture, or perhaps the medallion architecture. And essentially, what this is, is different areas of your lake that you separate for different qualities of data. So your Bronze area is an area of the lake where you collect the raw data coming in from its source. So perhaps you've got events coming into Kafka, and you might write them into a Delta Lake table, and just keep on appending to the bottom of that table. And you don't wanna perform any cleaning. You wanna capture data in its purest form so that no mistakes can creep into this data whatsoever. But then you want to actually apply some cleaning to it, and perhaps apply a schema and do some filtering, and create a Silver table, right? And this might be a table that data scientists are consuming. So they get something close to raw, but don't have to do all the boring stuff. And then finally, you might perform some aggregations on this data and produce these business-level aggregates that multiple consumers will be really interested in. That's the Gold area. And so in each area of this lake, we are propagating data throughout, okay? And actually, this is a really good place to start talking about Change Data Capture, right? Because if we've got events that are being pumped into our Bronze Delta Lake table here, then we want a really efficient way to propagate them into our Silver Delta Lake table, okay? Having cleaned them. So specifically, if we insert 1,000 records into our Bronze table, then we want to be able to just read those 1,000 records, clean them, and write them to our Silver table. So how do we do this in Delta? We can actually use an API which possibly you're all familiar with, which we know and love from Spark Structured Streaming: it's spark.readStream. And that fully supports the Delta format here. We can simply call spark.readStream with format delta and point it at our underlying Delta store. So here, this is a Customer Bronze Delta table that's stored in S3, right?
So the first point here: if this table is Append Only, you can simply read it as a stream using spark.readStream. And the second point is you should totally use this pattern, because it's something that we really encourage and lots of customers do, and I'll give some of the reasons behind that. There were previous methods where you could point spark.readStream at an S3 directory and use wildcards, and it would basically read it as a stream. However, anyone who's actually tried to do that in production realizes that pretty soon you're performing list bucket operations on millions and millions of files, which is very costly and just degrades in performance over time. Delta doesn't have that penalty because, as you'll know if you caught the last Delta Tech Talk, we keep this transaction log that essentially tells Delta exactly which files make up this Delta table. So we don't have to go back to S3 to do the list bucket; we already have that information. It's a super scalable way to read your Delta tables as a stream. So we've read our Bronze table as a stream. We can perform our cleaning using all of our Structured Streaming APIs, which I'll show you in a moment. And then all we need to do now is write that stream in Delta format into our Customer Silver Delta table, okay? So we're propagating from Bronze to Silver here. And the missing piece of the puzzle that we haven't talked about yet is how do we ensure that every time 1,000 records are inserted into Bronze, I only read those 1,000 records, versus the full petabyte worth of data that's in that table. And here, we borrow a concept from Structured Streaming called checkpointing, right? It takes care of this. So you'll notice in the second line here, we are using this checkpoint location. Again, we're just pointing to a location on S3, which is safe to do on Databricks. And this is gonna take care of all of the semantics to do with managing offsets and reading the data exactly once, okay? So this guarantees exactly-once processing. So, let's have a look at the first live demo of today's session. And we're gonna have a look at exactly this pattern that we just described in the slide, which is how can we automatically propagate data from our Bronze Delta table into our Silver Delta table after applying some cleaning to it, okay? So the data that we're gonna be looking at today, and we'll share all these notebooks, there's gonna be a companion blog post to this webinar, so all of these will be shared as well. So we're gonna begin, and let me just make this writing a bit bigger for you. We're gonna begin by just generating some fake data here, okay? So there's 2 million records that we're gonna put into this table. Half of them say Seinfeld Jerry, and half of them say David Larry, okay? So it looks something like this, very duplicated data. This is our Delta table here. So we're just gonna begin by inserting this data into our Bronze table, okay? So we simply use our DataFrame write API to insert this data in Append mode into our Bronze Delta Lake table. Then what we need to do is define the pipeline which will automatically scan this table and propagate any of the new data to our Customer Silver table, having cleaned it. So I'm just gonna start my streaming job here while I talk through it. So this is very similar to the slides that we just saw. Firstly, we're just gonna use our helpful readStream API and point it at our Delta table. We're then gonna perform our cleaning of the data.
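As a rough sketch of what this Bronze-to-Silver pipeline looks like in PySpark (this is an illustration only, not the demo notebook; the S3 paths and the name-splitting logic are hypothetical):

```python
from pyspark.sql.functions import col, split

# Read the Bronze Delta table as a stream. Delta's transaction log tells Spark
# exactly which files make up the table, so there is no expensive S3 listing.
bronze_stream = (spark.readStream          # `spark` is the notebook's SparkSession
                 .format("delta")
                 .load("s3://my-bucket/customer_bronze"))

# "Cleaning" step: split a single name column into two (hypothetical schema).
cleaned = (bronze_stream
           .withColumn("lastName", split(col("name"), " ").getItem(0))
           .withColumn("firstName", split(col("name"), " ").getItem(1)))

# Write the cleaned stream to the Silver Delta table. The checkpoint location
# is what gives exactly-once processing of newly appended Bronze records.
(cleaned.writeStream
 .format("delta")
 .outputMode("append")
 .option("checkpointLocation", "s3://my-bucket/_checkpoints/bronze_to_silver")
 .start("s3://my-bucket/customer_silver"))
```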
So here we are just simply splitting Seinfeld Jerry into two columns, Jerry and Seinfeld. And then we're gonna write that data to our Customer Silver Delta table. And we should be able to see that we've processed some records here. Some records were initially processed. And we'll be able to see the results if we run this next cell here. So this is a visualization of the Customer Silver data set. You can see that we've split that one column of Seinfeld Jerry into two, okay? So we've propagated data from Bronze to Silver here. Okay, let's run some quick diagnostics on this table. So let's have a look at this Silver Delta table. And we can see that, in fact, yes, we have a million records corresponding to Jerry Seinfeld, and a million Larry David records, perfect. Okay, but let's really put ourselves to the test. Nothing like a live demo to put it to the test. So since we have defined the streaming pipeline, and it's still running now, automatically scanning our Bronze table and propagating it to Silver, we should be able to insert more records into our Bronze table and have them be automatically picked up and propagated. So let's do exactly that. Let's generate a million more records that now say Elaine Benes in them, and write them into our Customer Bronze Delta table. And we should be able to see a spike appear in our chart. There we go. So if we look back at our streaming pipeline, we can see that we've processed a bunch of data. This spike is the data that I just inserted. And we should be able to see them automatically propagated, both in... Well, they should obviously be in the Bronze table here. So now we see in Bronze we have three different types of records, and here is Elaine Benes; we should be able to see them in Silver as well. Okay, so here's our Customer Silver data set. And indeed, we can see that we now have Seinfeld, Benes and David automatically propagated there. So I'll just do that one more time for demonstration, so we can choose our favorite Seinfeld character here, Cosmo Kramer perhaps. And we can just insert those records here. We should be able to see a spike in our chart here. Okay, corresponding to the records that have been processed, and then they will be automatically propagated into our Silver Delta Lake table here, fantastic. So now we have four different records here. We have Cosmo, we have Seinfeld, Benes and David. So it's really cool, right? Because we could just be pumping records into our Bronze table via Kafka, and we've got this automated pipeline that's just sucking records out of there, cleaning them and pushing them into Silver. And then people could be consuming that data straight away, right? So it's a really awesome way of quickly cleaning your records so that people can start consuming that data. So this is our first pattern of reading change data from a Delta table. I do need to mention one change that you can make to this pipeline that many of our customers choose to make. So in the example that we just saw, the data was really duplicated. And you might be saying, "Paul, that's not a clean data set, right? Like, I don't want my users consuming that data." So what you might wanna do is add a deduplication step there as well. So what you can actually do, and there'll be links to this in the blog post, is when you read from Bronze to Silver, you can perform some deduplication against the sink, so that you don't get all of those duplicates entering your Silver table.
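One way to do that deduplication against the sink, sketched here under assumptions (the table path and key columns are hypothetical; the real code will be in the companion blog post), is to write the stream with For Each Batch and run a Delta merge that only inserts records not already in the Silver table:

```python
from delta.tables import DeltaTable

def dedup_into_silver(microbatch_df, batch_id):
    """Insert only records from this micro-batch that don't already exist in Silver."""
    silver = DeltaTable.forPath(spark, "s3://my-bucket/customer_silver")
    (silver.alias("t")
     .merge(
         microbatch_df.dropDuplicates(["firstName", "lastName"]).alias("s"),
         "t.firstName = s.firstName AND t.lastName = s.lastName")
     .whenNotMatchedInsertAll()   # no update clause: duplicates are simply skipped
     .execute())

# `cleaned` is the cleaned Bronze stream from the earlier sketch.
(cleaned.writeStream
 .foreachBatch(dedup_into_silver)
 .option("checkpointLocation", "s3://my-bucket/_checkpoints/bronze_to_silver_dedup")
 .start())
```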
So that's pattern one, and I just wanted to check with Denny, any thoughts or comments or any questions coming through the line there? - Oh, no problem. We answered a bunch of the questions right away, basically on the Q&A panel, around whether readStream only works with the Delta format. One thing I did wanna clarify right away, which you alluded to but I want to call out specifically, is that, yeah, if you were to try to do this running on Parquet instead, what will end up happening is that you'll have one Parquet stream, right, and the second Parquet stream will actually just fail to process, because when you're using Parquet, you don't have those ACID transactions to protect the data. So hence the importance: to do this pattern that Paul's describing here, you really need to go ahead and utilize Delta. Even though it's using Parquet under the covers, that transaction log really does a lot. So yeah, I think that was the only clarifying statement here. - Fantastic. All right, so let's move on to pattern number two then. So we know that you don't just use Delta because you want to do Append Only pipelines, right? You love the fact that you can do updates against your Delta tables, right? So what about if our table is being updated? So let's introduce this concept. Now let's assume that we have a source of updates, which is perhaps coming from Kafka, right? And now we're streaming Inserts and Updates into our Delta table. And we want to be able to capture anytime we have a change and propagate it downstream somewhere. So for example, we might insert a record; we definitely want to capture that, we've already discussed that. We might update a record that was inserted 30 days ago. And we definitely want to be able to capture that and push it downstream. So this is kind of a classic Change Data Capture use case. We're not considering Deletes yet, just FYI. So I actually came across a really interesting example of this in the field, which I think will illuminate this use case. I was working with an e-commerce website company here in the Bay Area. And they were using Delta to store a big feature store for their customers. So you can imagine each one of the records in this feature store was a different customer. And all of the attributes or columns in this Delta table were the different features associated with that customer. And every time one of the features changes, they want to automatically detect that and pass it to a model to score that customer, so they can make better recommendations to them on their website. So that's a really natural Change Data Capture use case, because we need to be scanning this feature store table and finding anytime anything's changed, right? So how would we do that in Delta? We can still do that using our favorite readStream API. However, this time, we need to use what's called the ignoreChanges flag, okay? And I will explain exactly what that means. So using the ignoreChanges flag will emit any file rewrite operations to the stream. That might sound cryptic, so I have a diagram for you here. So let's imagine that we have our feature store Delta table right here. And we run this arbitrary Update command against my Delta table, or you could be using the Merge syntax as well. And this Update command is gonna change the name to four Xs where the customer ID is 101.
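As an illustration, an update like that could be issued through the Delta Lake Python API roughly like this (a sketch only; the path and column names are hypothetical, and the slide's command could equally be SQL or the Scala API):

```python
from delta.tables import DeltaTable

feature_store = DeltaTable.forPath(spark, "s3://my-bucket/feature_store")

# Set the name to 'xxxx' for customer 101. Delta locates the file(s) containing
# that customer, rewrites them, and records the swap in the transaction log.
feature_store.update(
    condition="customer_id = 101",
    set={"name": "'xxxx'"})
```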
So what Delta will do under the hood is use all of its data skipping to really efficiently find the file where customer 101 lives, right? Then it will perform the rewrite logic to swap out the name, and it will rewrite this file. And it will rewrite it as a different file, File Number 10, write that into the transaction log, and sunset the old file in the transaction log as well. And what ignoreChanges will do is we will get the whole file emitted to us when we call readStream on this Delta table. So if we just go back to my previous slide here, this might make a little bit more sense now: the changes that we get in our stream will be a file level CDC. So they're guaranteed to include all the data that's changed, but it might be a superset of that, meaning there might be extra records beyond the one customer we've changed. In our case, I'm completely fine with that trade off. It's a really easy way to get change data out of Delta. And in this case, we just wanna re-score customers who have a changed feature and make sure they get a new recommendation. But if we're re-scoring some other customers as well, I'm not too concerned about that, because their recommendation won't change, okay? So this is a really easy way to do it if you understand the trade offs. So again, let's have a look at a live demo of this scenario. So the scenario that we're gonna try and replicate here is we're gonna build a feature store Delta table, throw some updates at it, and see whether we can automatically propagate them downstream having scored them. Okay, so let's have a look at this architecture here. So we're gonna use our favorite characters from Seinfeld here, or at least two of them, Jerry Seinfeld and Larry David right here. So this time, the data frame, or the Delta table, is gonna look like this, right? So it's got a few extra columns here, a last purchase date and a customer ID. And we're gonna begin by, again, writing this into our feature store Delta table using our DataFrame write API, and we're just writing it into S3 here as our feature store Delta table. We're then gonna define a model to actually score these customers, and we're not gonna go too much into this model. All you need to know is it's a UDF, which means that essentially you give it a customer record and it will produce a numerical score for you. And now what we need to do is define this pipeline that's able to automatically detect these changes, score them, and write them to our customer-scored Delta table. So let me start the streaming job while I talk through this. So again, we've seen a lot of this in the slide already, but we're gonna call our readStream API with the ignoreChanges flag and point it at our feature store. We're then going to apply our model UDF, which is the model that's gonna score the customer, and we're gonna create a new column with that. And then we're simply gonna write this in Delta format again to our customer-scored table, okay? So we should be able to see, yep, some records have been processed through our streaming pipeline already. So let's have a look at the output here. So here is the customer-scored Delta table, and we can see that it looks very similar to what we saw before. We've got two extra columns, which are the score of the customer and the score timestamp. Okay, and in this live demo session, we'd like to again put ourselves to the test here.
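For reference, the scoring pipeline just described could be sketched roughly like this (not the demo notebook itself; the paths are hypothetical and score_udf stands in for the model UDF mentioned above):

```python
from pyspark.sql.functions import current_timestamp, struct

# ignoreChanges re-emits whole rewritten files, so the stream may contain a
# superset of the records that actually changed.
features = (spark.readStream
            .format("delta")
            .option("ignoreChanges", "true")
            .load("s3://my-bucket/feature_store"))

# score_udf is a placeholder for the model UDF: it takes the feature columns
# and returns a numerical score.
scored = (features
          .withColumn("score", score_udf(struct(*features.columns)))
          .withColumn("score_timestamp", current_timestamp()))

(scored.writeStream
 .format("delta")
 .option("checkpointLocation", "s3://my-bucket/_checkpoints/customer_scoring")
 .start("s3://my-bucket/customer_scored"))
```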
So let's throw an update at this feature store table and see whether it gets automatically propagated into our customer-scored Delta table, okay? So I'm gonna run this command right here. And for those of you familiar with the Delta APIs, this is the Scala API that allows you to update a record or a series of records in a Delta table. So what I'm saying here is find customer zero and update the last purchase date, right? In our feature store Delta table here. And when I do that, I should be able to see, if we scroll back up to our chart here, look, yeah, we already have a spike in our stream, which means some data was propagated into our pipeline, right? So we have detected this change, and we should be able to see our customer has been re-scored in our customer-scored data set. So we can see that now we have Jerry Seinfeld in here twice for customer ID zero. And one of them was from this minute, right? So in fact, we have been able to detect the change in our Delta table, score it, and automatically propagate it downstream. There's one caveat that we've already discussed, but I just wanted to make sure that you're very clear on this point. So let's read this customer-scored data set again. And if you see what actually happened here, there are 31,000 records that were re-scored as part of the last update, okay? So we made one change, and that triggered 31,000 re-scorings in a fraction of a second, so it didn't really matter. So there were 2 million records in the Delta table in total, and we re-scored 31,000 because of this one update to the Delta table. Fantastic. So this is what I would call a really good pattern for really easily reading change data from a Delta table when you've got updates being applied to that Delta table. I just wanted to take a break and check with Denny. Any questions that I can answer or any comments that I should be aware of in the chat there? - Oh, actually, the quick call out here right now actually has less to do with the pattern that you designed, because I think a lot of people are completely grokking the questions around that, and more to do with the internals. And so to help clarify some of these things, just because it would take a long time, right, to clarify all those things, what I did is I went ahead and put three YouTube links inside the chat for everybody: one on unpacking the transaction log, if you wanna know more about how the Delta Lake transaction log works; also one about schema enforcement and evolution, because by default Delta Lake enforces the schema, and just as Paul has been talking about, if you want to evolve the schema you can, and we've put a link inside there on how that works; and finally, since we're talking about updates, and as a precursor to Deletes, one on how the internals of that work in Delta Lake as well. We've included that link inside the chat too. Now, for those of you who are on YouTube Live as opposed to on Zoom, not a big deal. We're gonna be placing these links in the description of the YouTube video when the session is finished. So that was the key call out, at least for this set of questions. - Thank you very much, Denny. - Of course. - Alright, so let's crack on with the third and final pattern here. And probably the title now makes sense: can we do better? So last time, we saw how we can produce file level change streams by using the ignoreChanges flag. But can we actually get record level change streams, and can we deal with Deletes? So that's pattern number three.
And the first part of pattern number three is what you might call a cheat pattern for the really simple use cases. Okay, so let me describe this. Let's say we have a source of Updates, Inserts and Deletes. Perhaps this is coming from Kafka again. The first of the ways that we can achieve this record level change stream from a Delta table is to first write this into a Bronze table as Appends. So even if a record is saying, update a particular record, we're gonna write that as an Append into our Bronze layer. We'll then subscribe to that table as a stream, like in pattern one, and we will apply those Updates and Deletes and Inserts how they're meant to be applied, right? So we'll apply them as those things and produce our Silver table. And the Silver table will be the one that people are consuming. Now the question we have here is how do we detect the changes that are happening in our Silver table? Well, in fact, because we have a very simple setup here, right? We've made the assumption that we've just got one source of Updates and Deletes and Inserts, so we don't actually have to detect the changes, right? There's only one pipe coming into our Silver table, so we can simply redirect that pipe elsewhere as well. So we can send that data, we can do a second readStream on this Bronze table, and the two streams are completely independent of each other using different checkpoint directories, and we can send that downstream. So the first way of reading record level change data from a Delta table is to kind of avoid it altogether. Which is kind of the best way to solve anything. So this will work in really simple use cases where we have one source of Updates and Deletes and Inserts and we have very simple update logic happening in our Delta layer, right? If there's very complicated logic when we apply our updates, then we perhaps don't want to have to replicate that downstream, right? But if it's very simple, then that's less of a risk. But how do we do this in a more general pattern, when we don't have just this one input source of Updates and Deletes? We can actually borrow a trick from the data warehousing world, and we can introduce a change-log table. Many of you might be familiar with this. So what this allows us to do, let me speak through the architecture diagram here. So we have our source of Updates, Inserts and Deletes streaming in, and we're gonna perform a transactional write to two different Delta tables here. So firstly, we're gonna apply the Updates, Inserts and Deletes to our main Delta table, the table that people will be consuming from and using for analytics. But then, we'll also perform a second write into our change-log table, and we'll write everything as an append, right? So you can imagine, over time, this change-log, which is very self-descriptive, is gonna be a record of all the changes that have been happening in our main Delta table. And because we're writing them as Appends, we can actually read this as a stream, like we've been talking about, and perform a join against the main Delta table to retrieve the full record. So let's have a look at how we do that. How can we begin by reading our streaming source and writing it to two different Delta tables in this manner? So the magic we use here, for those of you familiar with the Structured Streaming world, is this concept of For Each Batch, okay?
So Spark Structured Streaming operates in this micro-batch format, and we can use this For Each Batch function or API to implement a custom function for what we want to do with that batch, okay? So this upsert-to-Delta CDC function is the thing that's gonna tell Delta, or rather Structured Streaming, to write to two tables. I'll share the full code of how you actually implement that function in the blog post. But I wanted to really call this out diagrammatically, because I think the pictorial is a little more appropriate for 9 a.m. on a Thursday. So let's begin with the following diagram. So here's our main Delta table that has three columns: ID, some string and some other string, okay? And this is the table that all of our users are consuming. Then we have this source of updates that's coming in, and essentially, it's gonna change one of the columns in our main Delta table to oranges. So you can imagine if we apply this to this table, we're gonna have apples and oranges sprinkled throughout our main Delta table. These are all update-type changes. So in our For Each Batch function, the first thing we're gonna do is actually just drain our records out of our streaming source, and we're going to apply them as updates to our main Delta table. So here we have apples and oranges sprinkled through our table. But then we're also gonna write them as Inserts into our change-log and record the batch they happened in. So we can see that we are recording what is happening in our main Delta table in a separate table called the change-log. And then all we need to do is read this change-log as a stream and join it with our main Delta table to assemble the full records and pass them to our downstream CDC consumer. So now we have these two records, the zero record and the second record here, and these are the changes that happened in our Delta table. So this is how we can perform this record level CDC stream. I just wanted to talk a little bit about how you perform this bit here before we go into the live demo. So we're talking about the second part of this structure here, when we perform the join. So we're simply gonna do that by again using our readStream API, pointing at our change-log here. And then we're gonna join it onto our main table, which is our Delta table that contains all of the data, to assemble the full record here using the ID, which is our unique identifier. And there's a really awesome property of Delta that we're using here, which is so nuanced, but so useful. Which is, when we perform this kind of stream to static join, the static side will actually automatically refresh with changing data under the hood. So this main table is actually changing under the hood, right? We're applying updates to it, and we don't need to do anything; every time this executes, in each batch of the Structured Streaming job, it will read the latest data, right? Which doesn't happen with Parquet. And it's a really, really nice feature of Delta, which you get completely for free. So let's have a look at a demo of this architecture. So here's what we're gonna try and do. So this is the architecture that we had on the slide before; we have a source of updates. Here, it's being captured in a Delta table; this could be Kafka or Kinesis, or Event Hubs or anything. We're gonna read from here and we're gonna automatically apply our updates to our main Delta table and simultaneously write to our change-log.
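Both halves of this pattern can be sketched roughly as follows. This is a hedged illustration only, not the function from the blog post; the paths, the id key, and the exact change-log columns are hypothetical:

```python
from delta.tables import DeltaTable
from pyspark.sql.functions import lit

MAIN_PATH      = "s3://my-bucket/main_table"        # hypothetical paths
CHANGELOG_PATH = "s3://my-bucket/change_log"
UPDATES_PATH   = "s3://my-bucket/updates_source"

def upsert_and_log(microbatch_df, batch_id):
    """Apply the micro-batch to the main table and append the changed keys to the change-log."""
    main = DeltaTable.forPath(spark, MAIN_PATH)

    # 1. Apply the Updates/Inserts to the main Delta table.
    (main.alias("t")
     .merge(microbatch_df.alias("s"), "t.id = s.id")
     .whenMatchedUpdateAll()
     .whenNotMatchedInsertAll()
     .execute())

    # 2. Record which keys changed, and in which batch, in the change-log.
    (microbatch_df.select("id")
     .withColumn("batch_id", lit(batch_id))
     .write.format("delta").mode("append").save(CHANGELOG_PATH))

# First half: drain the update source and fan it out to both tables.
(spark.readStream.format("delta").load(UPDATES_PATH)
 .writeStream
 .foreachBatch(upsert_and_log)
 .option("checkpointLocation", "s3://my-bucket/_checkpoints/upsert_and_log")
 .start())

# Second half: read the change-log as a stream and join it back to the main
# table. This is a stream-static join; the static Delta side is re-read each
# micro-batch, so it always reflects the latest updates.
change_keys = spark.readStream.format("delta").load(CHANGELOG_PATH)
main_static = spark.read.format("delta").load(MAIN_PATH)

(change_keys.join(main_static, on="id", how="inner")
 .writeStream
 .format("delta")
 .option("checkpointLocation", "s3://my-bucket/_checkpoints/change_stream")
 .start("s3://my-bucket/change_stream_full_records"))
```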
Then we're gonna read our change-log as a stream and join it back to our main Delta table and send it downstream somewhere. So let's begin by making some fake data and writing it into our Delta table, okay? So our main Delta table is gonna look like this, okay? As you saw in the slides, we have three columns here, and everything says apples and cakes; it's all a happy world. Here's our update table that we're gonna use to update that one. So on every second ID, it says oranges. So you can imagine if we apply this to our main Delta table, it's gonna be apples and oranges sprinkled amongst the data set, okay? Now what we're gonna do is define the part of the streaming pipeline that performs this simultaneous write. So here is the function I alluded to, which will be run as part of the For Each Batch. You can check this out when we provide the links. But what I wanted to do was just start my streaming pipeline here. So again, what we're doing here is just reading our update table, the source of our updates, as a stream, and then we are calling For Each Batch with our custom function here to write to this main Delta table and our change-log. Now the stream is just initializing; right now it should be processing its data. Finishing up any second now. So here we go. So we can see that our data has actually been processed through this part of the pipeline. And we should be able to see, to start with, that our main Delta table is being updated, okay? So we should be able to see apples and oranges sprinkled through this data set. There we go. So every even ID has been changed to oranges. So we have, indeed, applied the updates to this Delta table. And the second thing we need to check is, are we actually logging the changes to our change-log table? So let's query our change-log table here. Okay, cool. So you can see every second ID is being logged to the change-log, and it's recording that there was a change to this record in batch ID zero, okay? Now, what we need to do is assemble the second part of the pipeline, which is to read the change-log as a stream and join it to our main Delta table. So, let me start that part of the processing while I talk you through the code here. So, we are reading this change-log as a stream. We are simply joining it onto our main Delta table. This is a stream to static join. And then we are writing it out to our destination table, a CDC consumer if you like, the change stream full records, okay? And we've already sucked some data into this pipeline and processed it. So we should be able to query this table, the change stream full records, and see what the CDC consumer will be getting. So indeed, we are sending every second ID to the downstream consumer. And we're sending the full record that was changed in the Delta table. So we have actually managed to perform some record level CDC. So, again, in keeping with the structure of these demos, let's put ourselves to the test again. We should be able to write more data into our update table here for it to be propagated through the pipeline. So let's just throw some data in there. And the data that we're gonna throw in there is every third ID we're gonna change to pears now, right? So let's have a look at that update set here. So for every third ID, we want to change some string to pears, okay? So we've written that into our update table. Since my streaming pipelines are running, it should be getting propagated through my pipeline here. So let's just have a quick look at my pipeline.
So here's my streaming pipeline. And we see that we have a spike in my stream. That means I read some data and some data was processed. So that's the first part of the pipeline; that looks good. And then in the second part of the pipeline, I have a second spike, which means the data has been propagated through there. So if I go down here, we should be able to see that some more data has arrived in our downstream location. And here we see it. So we see that we have data corresponding to Batch ID 1, and some string has now been changed to pears, okay? So this is what we can send downstream. So we were actually able to implement a pipeline which captures changes that are happening in our main Delta table and sends them downstream, and we're doing that at a record level, which is fantastic. So the last comment I'll make on this is we haven't talked about Deletes, right? But this can really easily be extended to capture Deletes as well. So let's say that one of our records in our streaming source was a Delete; it says "Delete record 10." So in this pattern, what we would do is go ahead and apply that Delete to our Delta table, but we would then also log in our change-log that a Delete happened. And then we would just modify our join a little bit to do a left join here, this is the left side of the join, and we can send this downstream. So actually, this change-log pattern is really flexible and allows us to capture all types of changes that are happening in our Delta table, which is one reason that I really like this pattern. And Denny, I'll just stop there and see if there's any questions or comments on that third part. - No, actually right now a lot of the questions are sort of revolving around how to do the orchestration around this. And so for example, this is a really cool pattern, they're seeing that you do the Updates and Deletes, so I just did wanna do two little call outs. One is that in terms of orchestration, it really is up to you in terms of your particular environment. So, for example, if you're running it locally, you could always do a cron job, like good old fashioned cron jobs, or Jenkins. If you have a more scheduler type of design, then perhaps a system like Apache Airflow would be suited to the orchestration, or if you're in Azure, Azure Data Factory. The other call out that I usually do is that of course, if you're using Databricks, you would basically use Databricks Jobs to run this. The other question, basically, is what's the best way to do this as a nightly batch rather than a stream? And the quick call out that I would say is that you absolutely can run this as a batch if you wanted to, in terms of just processing it as a change set. And in fact, what's great about Paul's example here is that you just change readStream to read, right? So in other words, to go from streaming to batch. What's great about Spark in general, and then with Delta Lake underneath the covers, is that you just switch from readStream to read in order to go from streaming to batch. That said, a really common or popular reason why you wanna keep this as a stream anyway is that if you need to orchestrate many, many different batch jobs, instead of having like 50 or 70 different batch jobs, you could potentially just run this as a stream instead. So in other words, basically, a bunch of mini-batch jobs.
And so for example, customers like Comcast, who talked about how they did this back at Spark Summit, San Francisco 2019, in the keynote, they went from 84 different jobs down to three. Because they were using Delta Lake underneath the covers, they were able to reduce their 84 batch jobs down to three streaming jobs. And so it was operationally much simpler to maintain and operate this type of design. So when you're thinking about streaming, it's not just about streaming per se; it is also about simplifying the management of many batch jobs as well. So that's an important call out that I figured should be made here, as to why, when you're looking at a session like this one on CDC, even if you initially wanna start as a batch, and again, you just switch from readStream to read, right? You may wanna seriously consider staying as a streaming job anyway, just because of the potential operational benefits as well. - Great point, and just one thing I'll add to that: another really common pattern here is using a Trigger Once philosophy, which basically turns a readStream into a batch execution, and allows you to kind of go back and forth. So with Trigger Once, essentially, every time it goes to the streaming source, it will just read all the records that are in there and process them in batch, right? So this is a really common pattern as well. Fantastic, so we're coming up on the last 10 minutes, so I just wanna wrap this up in a summary. What have we seen today? So we've looked at three different patterns of reading change data from a Delta table. The first one of these is what I call an Append Only pipeline, really commonly used in this Bronze-Silver-Gold propagation across your lake. The second was what I like to call "the easy button" for update pipelines, using the ignoreChanges flag. And remember what happens here: we get this file level CDC stream, but it's really, really easy to implement. And then the third pattern here is this change-log paradigm, borrowing a concept from data warehousing. And this actually allows us to capture a record level change stream, like you would perhaps get in a data warehouse. So I'd love to hear your feedback on the session. As I've said throughout, there's gonna be a companion blog post with links to all of the notebooks and code here, and all the data which is generated in those notebooks as well. So you'll be able to check this out on your own. So I really appreciate you spending the time with us this morning. And Denny, if there are any more questions, happy to take them, or we can go from there. - Absolutely, so we've got about five minutes left, because I always like to finish our webinars a little early so that people can prep for the next set of meetings, since all of our meetings are online these days, but we should be able to answer most of them, so I'll answer the first one, okay? The first question is, if we have any questions from this webinar, how can we follow up with you after the fact? So that's what we've been talking about: our YouTube channel. So youtube.com/databricks. This video will actually be posted there. And we actually do go out of our way to answer questions on the YouTube channel itself. So pop right into the comments, and we'll do our best to answer questions.
It usually takes us about 48 hours to answer them, because we have a lot of questions across a lot of videos these days, so we don't wanna promise you immediate answers, but we do review them on a pretty regular basis, okay? And so hopefully, that will help you jumpstart this process, okay? All right, the next question is basically... Oh, here's a good one. And Paul, you can answer it or I can answer it. What if orchestration steps are dependent on each other? In other words, step A has to be complete before step B can start. Basically, it's like you have these dependencies in your jobs between, say, your Gold table versus your Silver table. What do you think is the best approach? And I realize there are multiple answers, so why don't we just start that discussion right now? - Yeah, sure, a really common question we get. If you want a system to be able to run arbitrarily complex DAGs, with a lot of dependencies, which is what you're describing there, we really recommend the integrations that Spark and Databricks have with technologies like Apache Airflow or Azure Data Factory. Those are really good options for that, which provide that kind of arbitrarily complex DAG manipulation and execution. - Exactly, yeah, I was just about to add that. So hence the reason for me also calling out Apache Airflow; that's a very common one for folks to use in open source land. But the context is that it's as simple or as complicated as you want it to be when it comes to tracking those dependencies. Because, for example, you could always just create a shell script that places, basically, case statements in your cron job. You could always start with that, right? And it can go further; for example, when it comes to Databricks Notebooks, you could potentially just go ahead and actually build workflows within your notebooks that have those dependencies. But then, exactly as Paul put it, right? Whether it's an Azure Data Factory style, or if it's Apache Airflow or whatever else, the point is that those DAGs get more and more complicated, and usually when we involve those latter ones, it's not just because of the dependencies between Silver and Gold, but because of the dependencies with other systems that you're orchestrating, way beyond your own system. So partners like Informatica or partners like StreamSets, which also talk to Delta Lake, they will also technically get involved as well. So again, there are a lot of really good ways to do it. I don't think there's any wrong way per se; it's just more a matter of understanding what level of complexity you actually need, and then basically designing for that. Because in the end, when it comes to this type of orchestration, if you were able to do it with Spark, you are able to do it with Delta Lake, because Delta Lake is the underlying storage layer. So whatever you were trying to do before with Spark, it's pretty much applicable here as well, okay? All right. Oh, and then we got some quick call outs saying they really appreciate the session, Paul. I think they love you. So good, I can quit now. No, I'm joking. But yes, the PowerPoints and the code will be available. So two quick call outs. We actually put all the code and the Notebooks and the PowerPoints directly in the Databricks Tech Talks GitHub, which is also linked in the YouTube channel, okay? And so we put them all there.
And then, just as Paul alluded to multiple times, we are actually currently writing a blog that provides a lot more of the details, where we can go much deeper than what we're able to do in a 50 to 55 minute Tech Talk. So we will be following up with that, with additional information linked to this video, but also linked to the Tech Talks GitHub repo, which contains the code and the Notebooks, okay? All right, cool. I think the final question I wanna tackle here before we head back to Karen: you were alluding to the Trigger Once, Paul, and so basically, if you change the readStream to read, this is going back to the stream-to-batch question, can you still use the checkpoints, or do you have to track the watermarks yourself? And great question. - Yeah, so if you use just the read method, then you don't get to use the checkpoints, so you have to track the watermarks yourself. That's the real benefit of using the Trigger Once: you can execute in like a batch mode, but still take advantage of all of those nice things in Structured Streaming, like the checkpointing. And the other thing to say is that it allows you to evolve in the future. So if you wanna move from batch to streaming with a five-minute or 30-second trigger interval, then it's really easy to change. - Excellent, and that's exactly... By the way, just to add to Paul's point, that's precisely what I meant by simplifying your orchestration by going from batch jobs to streaming, because the idea is that you have things like checkpoints and watermarks that are included in streaming, and not included in batch, that you basically can take advantage of for the purpose of orchestration, okay? And cool, with that, I did wanna finish it up. Thank you very much for the great questions. Thank you, Paul, for an amazingly awesome session. And Karen, back to you to wrap it up, and I appreciate your time. - Thanks, Denny, thanks Paul for your presentation, and thanks everyone for participating and asking all of your questions; your comments really help make the presentation.
Info
Channel: Databricks
Views: 13,380
Id: 7y0AAQ6qX5w
Length: 52min 42sec (3162 seconds)
Published: Thu Apr 30 2020