- Perfect, okay. It's a pleasure to virtually
meet everyone today. Thanks for joining this session. Just wanted to quickly introduce myself. As Karen said, I'm a Senior Solutions
Architect here at Databricks, where I work with some
of our largest customers here in the Bay Area. And really, what I wanted
to talk about today, was a topic that a lot of my customers are asking me about in the field. And the topic at hand is how do we capture change
data from a Delta table? Before we get too far into
the weeds on that topic, lots to talk about on that today. I wanted to also welcome Danny Lee, who's my colleague at Databricks. He's a developer advocate
here at Databricks. And he's run about 26
of these online meetups in the last week, so you're gonna hear my voice a little bit more today than his. He'll be popping up with
some color commentary and also moderating the chat as well. So what are we speaking about today? So the topic of the talk is how do we capture change
data from a Delta table? And we're first gonna begin with motivation behind this, okay? Why are we even here today? Then we're gonna look at three different architectural patterns of how we can actually
achieve this using Delta. And in these patterns, we'll look at some slide
based architectures. But we'll also have live demos, if the demo gods smile
on us favorably today. We'll wrap that up with a summary and have lots of time
for questions at the end, so please get your questions in
throughout the presentation. We may find good points to break in and answer some of those throughout the presentation as well, okay. So capturing change
data from a Delta table. Like why is this an important subject? And I think it's really well summed up by a conversation that I had
with my colleague yesterday. I told her that I was gonna
do an online Delta tech talk on this subject and she said, "Paul, that's a really confusing topic." She said, "You're mixing concepts from two different worlds." And I completely
understand what she means. So on one hand, Change Data Capture is a topic that enterprise architects have been speaking about
for the past 30 years. And generally, it's in
relation to large SQL stores, and perhaps in this
data warehouse paradigm. And every technology has a way to do it, whether it's scraping bin logs or stored procedures or triggers. And you might be familiar
with a schema like this that MySQL gives you, where you get the cdc_tables
that give you this change data from each of these tables. But Delta Lake operates
up in the cloud, right? It's in the Cloud Data Lake paradigm. So why are we really talking about Change Data Capture again? And my counter to my colleague was, this is kinda what I really like about Delta and part of the appeal behind it. Many of the things that we know and love about data warehouses
like ACID transactions and schema reinforcement and fine grained updates and deletes, we are reestablishing in this paradigm of the Cloud Data Lake. And personally, I think
that Change Data Capture will be one that we look to
capture in this setting as well. And this is just a really natural cycle of innovation and disruption, right? We were forced to innovate
from the data warehouse, because data volumes are exploding and data variety was exploding. And the data no longer fit into these nice tabular structures. So we arrived at the Cloud Data Lake where we're using object
storage, which is super cheap. We're using Managed Cloud Services that allows us to be really agile, and we have cloud elasticity. But that's the innovation. The disruption is we
lose some of the things that we know and love, right? So the part of the appeal
behind Delta for me, is reestablishing some
of these good practices that we had in previous generations. So let me introduce the topic
a little more formally here. Or at least as formally as a slide with a picture of
a dog on it allows me to do. But perhaps you're a data
engineer at your organization, and you love Delta Lake because you can mix Appends
and Updates and Deletes, you can create these huge
petabyte scale Delta Lake tables using Scala Spark computations, and have multiple consumers of this data all with ACID guarantees, right? So you love that, you're there. And then you get a call
from your business lead, and they say, "You know, that
Delta Lake table you created, every time a record is inserted or changes or is deleted from that table, can you send that downstream for me because we've got a business
process that relies on this." And you're left very much
like the dog in the photo here with rubber gloves on. You're like, "Now what?" Okay, so that's really what
we're here to talk about today. How do we detect those changes that are happening in our Delta Table, so we can propagate them downstream? So the first pattern that we're gonna look at is the so-called
Bronze-Silver-Gold propagation. And you might have heard this, if you subscribe to previous tech talks, but I'll give a brief
synopsis of what this is. So there's an architecture that's grown up around Delta Lake. It's called the
Bronze-Silver-Gold architecture, or perhaps the medallion architecture. And essentially, what this is, is different areas of your lake that you separate for
different qualities of data. So your Bronze area is an area of the lake where
you collect the raw data coming in from its source. So perhaps you've got
events coming into Kafka, and you might write them
into a Delta Lake Table, and just keep on appending on
to the bottom of that table. And you don't wanna perform any cleaning. You wanna capture data in its purest form so that no mistakes can creep
into this data whatsoever. But then you want to actually
apply some cleaning to it and perhaps apply a schema
and do some filtering and create a Silver table, right? And this might be a table that data scientists are consuming. So they get something close to raw, but don't have to do all the boring stuff. And then finally, you might perform some
aggregates to this data, produce these business level aggregates that multiple consumers will
be really interested in. That's the Gold area. And so in each area of this lake, we are propagating data throughout, okay? And actually, this is a really good place to start talking about
Change Data Capture, right? Because if we've got events that are being pumped into our
Bronze Delta Lake table here, then we want a really
efficient way to propagate them into our Silver Delta Lake table, okay? Having cleaned them. So specifically if we insert 1000 records into our Bronze table, Then we want to be able to
just read those 1,000 records, clean them and write
them to our Silver table. So how do we do this in Delta? So we can actually use an API, which possibly you all are familiar with, we know and love from
Spark Structured Streaming: it's spark.readStream. And that fully supports the Delta format here. And we can just simply call spark.readStream with format delta and point it at our
underlying Delta store. So here, this is a
Customer Bronze Delta table that's stored in S3, right? So the first point here: if this table is Append Only, you can simply read this as a stream using spark.readStream. And the second point is you
should totally use this pattern, because it's something
that we really encourage and lots of customers do, and I'll give some of
the reasons behind that. There were previous methods where you could point spark.readStream at an S3 directory and use wildcards, and it would basically
read it as a stream. However, anyone who's
actually tried to do that in production realizes that soon, you're performing list bucket operations on millions and millions of files, which is very costly and just degrades in
performance over time. So Delta doesn't have that penalty because, if you watched the last Delta Tech Talk, you know we keep this transaction log that essentially tells Delta exactly which files make
up this Delta table. So we don't have to go back
to S3 to do the list bucket, we already have that information. It's a super scalable way to read your Delta tables as a stream. So we've read all our
Bronze table as a stream. We can perform our cleaning using all of our
structured streaming APIs, which I'll show you in a moment. And then all we need to do now, is write that stream as format delta and write it into our Customer
Silver Delta table, okay? So we're propagating from
Bronze to Silver here. And the missing piece of the puzzle that we haven't talked about yet, is how do we ensure that every time that 1,000 records is inserted as a Bronze, I only read that 1,000 records versus the full petabyte worth
of data that's in that table. And here, we borrow a concept from Structured Streaming
called Checkpointing, right? It takes care of this. So you'll notice the second line here, we are using this checkpoint location. Again, we're just pointing
to a location on S3, which is safe to do so on Databricks. And this is gonna take care
of all of the semantics to do with managing offsets and just reading the
data exactly once, okay? So this guarantees exactly-once processing.
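(For reference, here's a minimal PySpark sketch of this Bronze-to-Silver streaming propagation. The table paths, the checkpoint location, and the name-splitting step are illustrative assumptions rather than the exact demo notebook code, and it assumes a `spark` session as in a Databricks notebook.)

```python
from pyspark.sql import functions as F

# Hypothetical locations; substitute your own paths.
bronze_path = "s3://my-bucket/delta/customer_bronze"
silver_path = "s3://my-bucket/delta/customer_silver"
checkpoint_path = "s3://my-bucket/checkpoints/bronze_to_silver"

# Read the Bronze Delta table as a stream, apply a simple cleaning step,
# and continuously append the result to the Silver Delta table.
bronze_stream = (spark.readStream
    .format("delta")
    .load(bronze_path))

cleaned = (bronze_stream
    .withColumn("last_name", F.split(F.col("name"), " ").getItem(0))    # e.g. "Seinfeld"
    .withColumn("first_name", F.split(F.col("name"), " ").getItem(1)))  # e.g. "Jerry"

(cleaned.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_path)  # offsets tracked for exactly-once processing
    .start(silver_path))
```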
So, let's have a look at the first live demo of today's session. And we're gonna have a look
at exactly this pattern that we just described in the slide, which is how can we
automatically propagate data from our Bronze Delta table
into our Silver Delta table after applying some cleaning to it, okay? So the data that we're
gonna be looking at today, and we'll share all these notebooks, there's gonna be a companion
blog post to this webinar. So all of these will be shared as well. So we're gonna begin, and let me just make this
writing a bit bigger for you. We're gonna begin by just generating some
fake data here, okay? So there are 2 million records that we're gonna put into this table. Half of them say Seinfeld Jerry, and half of them say David Larry, okay? So it looks something like this: very duplicated data. This is our Delta table here. So we're just gonna begin by inserting this data into
our Bronze table, okay? So we just simply use our data frame write API to insert this data in Append mode into our Bronze Delta Lake table.
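(As a rough sketch, that initial load is just a batch append with the DataFrame writer; the DataFrame name and the path here are hypothetical stand-ins for the notebook's own.)

```python
# fake_customers_df is the generated DataFrame from the previous cell (name assumed).
(fake_customers_df.write
    .format("delta")
    .mode("append")
    .save("s3://my-bucket/delta/customer_bronze"))  # hypothetical Bronze table path
```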
Then what we need to do is to define the pipeline, which will automatically scan this table and propagate any of the new data to our Customer Silver
table, having cleaned it. So I'm just gonna start
my streaming job here while I talk through it. So this is very similar to
the slides that we just saw. So firstly, we're just gonna
use our helpful read stream API and point it at our Delta table. We're then gonna perform
our cleaning of the data. So here we are just simply
splitting Seinfeld Jerry into two columns, Jerry and Seinfeld. And then we're gonna write that data to our Customer Silver Delta table. And we should be able to see that we've processed some records here. Some records were initially processed. And we'll be able to see the results if we run this next cell here. So this is a visualization of
the Customer Silver data set. So you can see that we've split that one column of Seinfeld
Jerry into two, okay? So we've propagated data
from Bronze to Silver here. Okay, let's run some quick
diagnostics on this table. So let's have a look at
this Silver Delta table. And we can see that in fact, yes, we have a million records
corresponding to Jerry Seinfeld, and a million Larry
David records, perfect. Okay, but let's really
put ourselves to the test. Nothing like a live demo as a test. So since we have defined
the streaming pipeline, and it's still running now, that is automatically
scanning our Bronze table and propagating it to Silver, we should be able to insert more records into our Bronze table and have them automatically picked up and propagated. So let's do exactly that. Let's generate a million more records that now say Elaine Benes in them, and write them into our
Customer Bronze Delta table. And we should be able to see
a spike appear in our chart. There we go. So if we look back at
our streaming pipeline, we can see that we've
processed a bunch of data. This spike is the data
that I just inserted. And we should be able to see them automatically propagated, both in... Well, they should obviously
be in the Bronze table here. So now we see in Bronze we have three different types of records and here is Elaine Benes, we should be able to see
them in Silver as well. Okay, so here's our
Customer Silver data set. And indeed, we can see that we now have Seinfeld, Benes and David automatically propagated there. So I'll just do that one
more time for demonstration, so we could choose our favorite Seinfeld character here, Cosmo Kramer perhaps. And we could just
insert those records here. We should be able to see
a spike in our chart here. Okay, corresponding to the
records that have been processed, and then they will be
automatically propagated into our Silver Delta Lake
table here, fantastic. So now we have four
different records here. We have Cosmo, we have
Seinfeld, Benes and David. So it's really cool, right? Because we could just
be pumping records into our Bronze table via Kafka, and we've got this automated pipeline that's just sucking records out of there, cleaning them and pushing them into Silver. And then people could
be consuming that data straight away, right? So it's a really awesome way of quickly cleaning your records so that people can start
consuming that data. So this is our first pattern of reading change data from a Delta table. I will just mention one change that you can make to this pipeline that many of our customers choose to do. So in the example that we just saw, the data was really duplicated. And you might be saying, "Paul, that's not a clean data set, right? Like, I don't want my
users consuming that data." So what you might wanna do is add a deduplication step there as well. So what you can actually do and there'll be links to
this in the blog post, is when you actually read
from Bronze to Silver, you can perform some
deduplication against the sink, so that you don't get all of those duplicates entering your Silver table.
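(One hedged way to do that deduplication against the sink is a foreachBatch writer that merges each micro-batch into Silver and only inserts keys it hasn't already seen. The paths and the "id" key column below are assumptions; the companion blog post has the exact version.)

```python
from delta.tables import DeltaTable

bronze_path = "s3://my-bucket/delta/customer_bronze"                 # hypothetical paths
silver_path = "s3://my-bucket/delta/customer_silver"
checkpoint_path = "s3://my-bucket/checkpoints/bronze_to_silver_dedup"

def dedup_into_silver(microbatch_df, batch_id):
    silver = DeltaTable.forPath(spark, silver_path)
    (silver.alias("s")
        .merge(microbatch_df.dropDuplicates(["id"]).alias("b"), "s.id = b.id")
        .whenNotMatchedInsertAll()   # only records not already in Silver get inserted
        .execute())

(spark.readStream.format("delta").load(bronze_path)
    .writeStream
    .foreachBatch(dedup_into_silver)
    .option("checkpointLocation", checkpoint_path)
    .start())
```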
So that's pattern one, and I just wanted to check with Danny: any thoughts or comments or any questions coming through the line there? - Oh, no problem. We answered a bunch of the questions right away, basically on the Q&A panel, including whether the read stream only works with the Delta format. One thing I did wanna clarify right away, which you alluded to, but I want to call it out specifically, was that, yeah, if you were to try to do this, where you were running
Delta format right away. One thing I did wanna clarify right away is that which you alluded to, but I want to call it out specifically, was that, yeah, if you were to try to do this, where you were running
this on Parquet instead, what will end up happening is that you'll have one
Parquet stream right, and the second Parquet stream will actually just fail to process because when you're using Parquet, you don't have those ACID
transactions to protect the data. So hence the importance
of why to do this process, to do this pattern that
Paul's describing here, you really need to go
ahead and utilize Delta. Even though it's using
Parquet under the covers, that transaction log really does a lot. So yeah. I think that was the only
clarifying statement here. - Fantastic. All right, so let's move on
to pattern number two then. So we know that you don't just use Delta because you want to do
Append Only pipelines, right? You love the fact that you can do updates against your Delta tables, right? So what about if our table is being updated? So let's introduce this concept. So now let's assume that we have
a source of updates, which is perhaps coming from Kafka, right? And now we're streaming
Inserts and Updates into our Delta table. And we want to be able to
capture anytime we have a change and propagate it downstream somewhere. So for example, we might insert a record, we definitely want to capture that. We've already discussed that. We might update a record that
was inserted 30 days ago. And we definitely want to
be able to capture that and push it downstream. So this is kind of a classic
Change Data Capture use case. We're not considering
Deletes yet, just FYI. So I actually came across a really interesting example
of this in the field, which I think will
illuminate this use case. So I was working with an
e-commerce website company here in the Bay Area. And they were using Delta to store a big feature
store for their customers. So you can imagine each one of the records
in this feature store was a different customer. And all of the attributes or
columns in this Delta table were the different features
associated with that customer. And every time one of
the features changes, they want to automatically detect that and pass it to a model
to score that customer so they can make better
recommendations to them on their website. So that's a really natural
Change Data Capture use case because we need to be scanning
this feature store table and finding anytime
anything's changed, right? So how would we do that in Delta. So we can still do that using
our favorite read stream API. However, this time, we need to use what's called
the ignoreChanges flag, okay? And I will explain
exactly what that means. So using the ignoreChanges flag will emit any file rewrite operations to the stream. That might sound cryptic, so I have a diagram for you here. So let's imagine that we have our feature
store Delta table right here. And we run this arbitrary update command against my Delta table, or you could be using
the Merge Syntax as well. And this Update command
is gonna change the name to four X's, where customer ID is 101. So what Delta will do under the hood is use all of its data skipping to really efficiently find the file where customer 101 lives, right? Then it will perform the rewrite logic to swap out the name, and it will rewrite this file. And it will rewrite it as a
different file, File Number 10. And write that into the transaction log and sunset the old file in
the transaction log as well. And what ignoreChanges will do, is we will get the whole
file admitted to us when we call a read stream
on this Delta table. So if we just go back to
my previous slide here, this might make a little
bit more sense now, the changes that we get in our stream, will be a file level CDC. So they're guaranteed to include all the data that's changed, but it might be a superset of that. Meaning there might be extra records that weren't the one
customer we've changed. In our case, I'm completely
fine with that trade-off. It's a really easy way to
get change data out of Delta. And in this case, we just
wanna rescore customers who have a changed feature and make sure they get
a new recommendation. But if we're re-scoring some
other customers as well, I'm not too concerned about that because their recommendation
won't change, okay? So this is a really easy way to do it if you understand the trade-offs.
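(As a hedged sketch of this pattern: the table paths, the checkpoint location, and the scoring UDF name are placeholders, not the exact notebook code.)

```python
from pyspark.sql import functions as F

feature_store_path   = "s3://my-bucket/delta/feature_store"     # hypothetical paths
customer_scored_path = "s3://my-bucket/delta/customer_scored"
checkpoint_path      = "s3://my-bucket/checkpoints/feature_store_cdc"

changes = (spark.readStream
    .format("delta")
    .option("ignoreChanges", "true")   # re-emits whole rewritten files when updates occur
    .load(feature_store_path))

scored = (changes
    .withColumn("score", score_customer_udf(F.struct("*")))   # score_customer_udf: assumed model UDF
    .withColumn("score_timestamp", F.current_timestamp()))

(scored.writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_path)
    .start(customer_scored_path))
```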
So again, let's have a look at a live demo of this scenario. So the scenario that we're gonna try and replicate here is we're gonna build a feature store Delta table. We're gonna throw some updates at it and see whether we can
automatically propagate them downstream having scored them. Okay, so let's have a look
at this architecture here. So we're gonna use our favorite
characters so Seinfeld here, or at least two of them. So Jerry Seinfeld and
Larry David right here. So this time, the data
frame, or the Delta table is gonna look like this, right? So it's got a few extra columns here, last purchase day and a customer ID. And we're gonna begin
by, again, writing this into our feature store Delta table using our data frame write API, and we're just writing it into S3 here, as our feature store Delta table. We're then gonna define a model to actually score these customers and we're not gonna go
too much into this model. All you need to know is it's a UDF, which means that essentially
you give it a customer record and it will produce a
numerical score for you. And now what we need to do is to define this pipeline that's able to automatically
detect these changes, and score them and write them to our
customers score Delta table. So let me start the streaming
job while I talk through this. So again, we've seen a lot of this in the slide already, but we're gonna call our read stream API with the ignoreChanges flag and point it at our feature store. We're then going to apply our model UDF, which is the model that's
gonna score the customer. And we're gonna create
a new column with that. And then we're simply
just gonna write this as Delta format again to our
customers score table, okay? So we should be able to see, yep, some records have been processed through our streaming pipeline already. So let's have a look at the output here. So here is the customer-scored Delta table and we can see that it looks very similar to what we saw before. We've got two extra columns, which is the score of the
customer and the score timestamp. Okay, and in this live demo session, we'd like to, again put
ourselves to the test here. So let's throw an update
at this feature store table and see whether it gets
automatically propagated into our customer-scored
Delta table, okay? So I'm gonna run this command right here. And those of you familiar
with Delta APIs, this is the Scala API that allows you to update a record or a series of records in a Delta table. So what I'm saying here is: find customer zero, and update the last purchase date, right? In our feature store Delta table here.
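(The notebook cell is in Scala, but a roughly equivalent Python sketch of that update looks like this; the path, column names, and the new value are illustrative.)

```python
from delta.tables import DeltaTable
from pyspark.sql import functions as F

feature_store = DeltaTable.forPath(spark, "s3://my-bucket/delta/feature_store")  # hypothetical path
feature_store.update(
    condition=F.col("customer_id") == 0,                # find customer zero
    set={"last_purchase_date": F.lit("2020-05-21")}     # update the last purchase date (example value)
)
```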
And when I do that, I should be able to see, if we scroll back up to our chart here, look, yeah, so we already have a spike in our stream, which means some data was propagated into our pipeline, right? So we have detected this change and we should be able to see our customer has been re-scored in our customer-scored data set. So we can see that now we have Jerry Seinfeld in here twice, customer ID zero. And one of them was this minute, right? So in fact, we have been
able to detect the changes in our Delta table, score it, and automatically propagate it downstream. The one caveat we've already discussed, but I just wanted to make sure that you're very clear on this point. So let's read this
customer-scored data set again. And if you see what
actually happened here, there are 31,000 records
that were re-scored as part of the last update, okay? So we made one change and that triggered 31,000 re-scorings in a fraction of a second, so it didn't really matter. So there were 2 million records in the Delta table in total, and we re-scored 31,000 because of this one update to the Delta table. Fantastic. So this is what I would
call a really good pattern for really easily reading
change data from a Delta table when you've got updates being
applied to that delta table. I just wanted to take a
break and check Danny. Any questions that I can answer or any comments that I should be aware of in the chat there? - Oh, actually, the quick
call out here, right now, it actually had to do less with the pattern that you designed, because I think a lot of people are completely grokking
the questions around that, but more about the internals and so to help clarify
some of these things, just because it would
take a long time, right? To clarify all all those things. What I did is I went ahead
and put three YouTube links inside the chat for everybody on unpacking the transaction log if you wanna know more about how the Delta Lake
transaction log works. Also about schema evolution and enforcement: by default, Delta
Lake enforces the schema. And then just as Paul
has been talking about, if you want to evolve the schema, you can, and then how that works, and we've put a link inside there as well. And finally also we're
talking about updates and a precursor to Delete, how the internals of that
work in Delta Lake as well. And we've included that link
as well inside the chat. Now, for those of you who are watching on YouTube Live as opposed to on Zoom, not a big deal. We're gonna be placing these links also in the description of the YouTube video when the session is finished. So that was the key call out, at least for this set of questions. - Thank you very much, Danny. - Of course. - Alright, so let's crack on with the third and final pattern here. And probably the title now makes sense. Can we do better? So last time, we saw how we can produce
file level change streams by using the ignoreChanges flag. But can we actually get
record level change streams, and can we deal with Deletes? So that's pattern number three. And the first part of
pattern number three, is you might call this a cheat pattern for the really simple use cases. Okay, so let me describe this. So let's say we have a source of Updates,
Inserts and Deletes. Perhaps, again, this is
coming from Kafka again. The first of the ways that we can achieve this record
level change stream from a Delta table is to first write this into
a Bronze table as Append. So even if a record is saying, update a particular record, we're gonna write that as an
Append into our Bronze layer. We'll then subscribe to
that table as a stream like in pattern one. And we will apply those Updates
and Deletes and Inserts, how they're meant to be applied, right? So we'll apply them as those things and produce our Silver table. And the Silver table will be the one that people are consuming. Now the question we have here is how do we detect the
changes that are happening in our Silver table? Well, in fact, because we have a very
simple setup here, right? We've made the assumption that we've just got one source of Updates and Deletes and Inserts, we don't actually have to
detect the changes, right? There's only one pipe coming
into our Silver table, so we can simply redirect
that pipe elsewhere as well. So we can send that data, we can do a second read
stream on this Bronze table, and they're completely
independent of each other using different checkpoint directories, and we can send that downstream. So the first way of reading
record level change data from a Delta table is to
kind of avoid it altogether. Which is kind of the best way to solve anything. So this will work in
really simple use cases where we have one source of
Updates and Deletes and Inserts and we have very simple
update logic happening in our Delta layer, right? If there's very complicated
logic when we apply our updates, then we perhaps don't want to have to replicate
that downstream, right? But if it's very simple,
then that's less of a risk.
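(A minimal sketch of that "cheat" idea: two independent streams read the same append-only Bronze table, each with its own checkpoint. The paths and the apply_changes_to_silver helper are assumptions, not the exact code.)

```python
bronze_path = "s3://my-bucket/delta/changes_bronze"          # hypothetical append-only Bronze table
downstream_path = "s3://my-bucket/delta/changes_downstream"  # hypothetical CDC destination

# Stream 1: apply the inserts/updates/deletes to the Silver table.
(spark.readStream.format("delta").load(bronze_path)
    .writeStream
    .foreachBatch(apply_changes_to_silver)   # assumed helper that runs the MERGE into Silver
    .option("checkpointLocation", "s3://my-bucket/checkpoints/to_silver")
    .start())

# Stream 2: the very same change records, independently redirected downstream.
(spark.readStream.format("delta").load(bronze_path)
    .writeStream
    .format("delta")
    .option("checkpointLocation", "s3://my-bucket/checkpoints/to_downstream")  # separate checkpoint
    .start(downstream_path))
```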
But how do we do this in a more general pattern when we don't have just
this one input source of Updates and Deletes. And we can actually borrow a trick from the data warehousing world. And we can introduce a change-log table. Many of you might be familiar with this. So what this allows us to do, let me speak through the
architecture diagram here. So we have our source of Updates, Inserts and
Deletes streaming in and we're gonna perform
a transactional write to two different Delta tables here. So firstly, we're gonna apply the Updates, Inserts and
Deletes to our main Delta table, the table that people
will be consuming and using for analytics. But then, we'll also
perform a second write into our change-log table, and we'll write everything
as an append, right? So you can imagine that, over time, this change-log (the name is very self-descriptive) is gonna be a record of all the changes that have been happening
in our main Delta table. And because we're writing them as Appends, we can actually read this as a stream, like we've been talking about and perform a join against
the main Delta table to retrieve the full record. So let's have a look at how we do that. So how can we begin by
reading our streaming source and writing it to two different
Delta tables in this manner? So the magic we use here for those of you familiar with the structured streaming world, is we use this concept
of For Each Batch, okay? So Spark Structured Streaming operates in this micro batch format, and we can use this For
Each Batch function or API to implement a custom function with what we want to do
with that batch, okay? So this upsert-to-Delta-and-capture-CDC function is the thing that's gonna tell Delta, or Structured Streaming rather, to write to two tables. I'll share the full code of how you actually
implement that function in the blog post. But I wanted to really call
this out diagrammatically because I think the pictorial is a little more appropriate
to 9 a.m. on a Thursday. So let's begin with the following diagram. So here's our main Delta
table that has three columns; ID, some string and
some other string, okay? And this is the table that all
of our users are consuming. Then we have this source of
updates that's coming in, and essentially, it's gonna
change one of the columns in our main Delta table to oranges. So you can imagine if we
apply this to this table, we're gonna have apples and oranges sprinkled throughout our main Delta table. These are all type updates. So, in our For Each Batch function, the first thing we're gonna do is actually just drain our records out of our streaming source. And we're going to apply them as updates to our main Delta table. So here we have apples and oranges sprinkled through our table. But then we're also gonna
write them as Inserts into our change-log and record the batch they happened in. So we can see that we are recording what is happening in our main Delta table in a separate table called change-log. And then all we need to do is read this change-log as a stream and join it with our main Delta table to assemble the full records and pass it to our
downstream CDC consumer. So now we have these two records, the zero record and the second record here and these are the changes that
happened in our Delta table. So this is how we can perform this record-level CDC stream.
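(Here's a hedged sketch of that For Each Batch function, writing each micro-batch to two Delta tables: a MERGE into the main table plus an append to the change-log. The table paths and the "id" key are hypothetical; the blog post has the full version.)

```python
from delta.tables import DeltaTable
from pyspark.sql import functions as F

main_table_path = "s3://my-bucket/delta/main_table"      # hypothetical paths
changelog_path  = "s3://my-bucket/delta/change_log"
updates_path    = "s3://my-bucket/delta/updates_source"

def upsert_and_capture_cdc(microbatch_df, batch_id):
    # 1) Apply the micro-batch as updates/inserts to the main Delta table.
    main = DeltaTable.forPath(spark, main_table_path)
    (main.alias("m")
        .merge(microbatch_df.alias("u"), "m.id = u.id")
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute())
    # 2) Record the same changes as appends in the change-log, tagged with the batch ID.
    (microbatch_df.withColumn("batch_id", F.lit(batch_id))
        .write.format("delta").mode("append").save(changelog_path))

(spark.readStream.format("delta").load(updates_path)
    .writeStream
    .foreachBatch(upsert_and_capture_cdc)
    .option("checkpointLocation", "s3://my-bucket/checkpoints/upsert_and_cdc")
    .start())
```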
I just wanted to talk a little bit about how you perform this bit here before we go into the live demo. So we're talking about the second part of this structure here, when we perform the join. So we're simply gonna do that by again using our read stream API, pointing at our change-log here. And then we're gonna join
it onto our main table, which is our delta table
that contains all of the data to assemble the full
record here using the ID, which is our unique identifier. And there's a really
awesome property of Delta that we're using here, which
is so nuanced, but so useful. Which is when we perform this
kind of stream to static join, the static side will actually
automatically refresh with changing data under the hood. So this main table is actually changing
under the hood, right? We're applying updates to it and we don't need to do anything; every time this executes, in each batch of the structured stream, it will read the latest data, right? Which doesn't happen in Parquet. And it's a really, really
nice feature of Delta, which you get for completely free.
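(A hedged sketch of that stream-to-static join: the change-log is read as a stream and joined back to the main Delta table, the static side, to assemble the full records. Paths and the "id" key are illustrative assumptions.)

```python
changelog_path  = "s3://my-bucket/delta/change_log"        # hypothetical paths
main_table_path = "s3://my-bucket/delta/main_table"
output_path     = "s3://my-bucket/delta/change_stream_full_records"

change_stream = spark.readStream.format("delta").load(changelog_path)
main_table = spark.read.format("delta").load(main_table_path)  # static side; fresh data each micro-batch

full_records = change_stream.join(main_table, on="id", how="inner")

(full_records.writeStream
    .format("delta")
    .option("checkpointLocation", "s3://my-bucket/checkpoints/change_stream_join")
    .start(output_path))
```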
So let's have a look at a demo of this architecture. So here's what we're gonna try and do. So this is the architecture
that we had on the slide before, we have a source of updates. Here, it's being captured
in a Delta table. This could be Kafka or Kinesis,
or Event Hubs or anything. We're gonna read from here and we're gonna automatically
apply our updates to our main Delta table and simultaneously
write to our change-log. Then we're gonna read our
change log as a stream and join it back to our main Delta table and send it downstream somewhere. So let's begin by making some fake data and writing it into our Delta table, okay? So our main Delta table is
gonna look like this, okay? So it's as you saw in the slides. We have three columns here and everything says apples and cakes, it's all a happy world. Here's our update table
that we're gonna use to update that one. So on every second ID, it says oranges. So you can imagine if we apply
this to our main Delta table, it's gonna be apples and oranges sprinkled amongst the data set, okay? Now what we're gonna do is define this part of
the streaming pipeline that performs this simultaneous write. So here is the function I alluded to, which will be operated on as
part of the For Each Batch. You can check this out
when we provide the links. But what I wanted to do, was just to start my
streaming pipeline here. So again, what we're doing here is just reading our update table, the source of our updates, as a stream, and then we are calling For Each Batch with our custom function here to write to this main Delta table and our change-log. Now the stream is just initializing; right now it should be processing its data, finishing up any second now. So here we go. So we can see that our data
has actually been processed through this part of the pipeline. And we should be able
to see to start with, that our main Delta table
is being updated, okay? So we should be able to
see apples and oranges sprinkled through this data set. There we go. So every even ID has
been changed to oranges. So we have indeed, applied the
updates to this Delta table. And the second thing we need to check, is are we actually logging the changes to our change-log table? So let's query our change-log table here. Okay, cool. So you can see every second ID is being logged to the change-log, and it's recording that there
was a change to this record in batch ID zero, okay? Now, what we need to do is assemble the second
part of the pipeline, which is read the change-log as a stream and join it to our main Delta table. So, let me start that
part of the processing while I talk you through the code here. So, we are reading as a stream here. We are reading this
change log as a stream. We are simply joining it
onto our main Delta table. This is a stream to static join. And then we are writing it
out to our destination table, a CDC consumer if you like, the change stream full records, okay? And we've already sucked
some data into this pipeline and processed it. So we should be able to query this table, the change stream for records, and see what the CDC
consumer will be getting. So indeed, we are sending every second ID to the downstream consumer. And we're sending the full record that was changed in the Delta table. So we have actually managed to perform some record levels CDC. So, again, the structure of these demos, let's put ourselves to the test again. And we should be able to write more data into our update table here for it to be propagated
through the pipeline. So let's just throw some data in there. And the data that we're
gonna throw in there is: every third ID, we're gonna change to pears now, right? So let's have a look at that update set here. So every third ID, we want to change some string to pears, okay? So we've written that
into our update table. Since my streaming pipelines are running, it should be getting propagated
through my pipeline here. So let's just have a
quick look at my pipeline. So here's my streaming pipeline. And we see that we have
a spike in my stream. That means I read some data
and some data was processed. So that's the first part of the pipeline. So that looks good. And then the second part of the pipeline, I have a second spike, which means the data has been
propagated through there. So if I go down here, we should be able to see that some more data has arrived
in our downstream location. And here we see it. So we see that we have data
corresponding to Batch ID 1, and some string has now
been changed to pears, okay? So this is what we can send downstream. So we were actually able
to implement a pipeline which captures changes that are happening in our main Delta table
and sends them downstream and we're doing that at a
record level which is fantastic. So the last comment I'll make on this, is we haven't talked about Deletes, right? But this can really easily be extended to capture Deletes as well. So let's say that one of our
records in our streaming source was a Delete, it says "Delete record 10." So in this pattern, what we would do, is go ahead and apply that
Delete to our Delta table, but we would then also log
that in our change-log, that a Delete happened. And then we would just
modify our join a little bit to do a left join here, this
is the left side of the join, and we can send this downstream. So actually, this change-log
pattern is really flexible and allows us to capture
all types of changes that are happening in our Delta table, which is one reason that I
really like this pattern.
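(To make the Delete extension concrete, here's a hedged sketch: a left join keeps change-log entries whose id no longer exists in the main table, which is exactly the Delete case. Paths, the "id" key, and the change_type labeling are illustrative assumptions.)

```python
from pyspark.sql import functions as F

changelog_path  = "s3://my-bucket/delta/change_log"    # hypothetical paths
main_table_path = "s3://my-bucket/delta/main_table"

change_stream = spark.readStream.format("delta").load(changelog_path)
main_table = spark.read.format("delta").load(main_table_path)

# Change-log entries with no matching id in the main table must have been deleted;
# everything else is an insert or an update.
changes_with_deletes = (change_stream.alias("c")
    .join(main_table.alias("m"), F.col("c.id") == F.col("m.id"), "left")
    .withColumn("change_type",
                F.when(F.col("m.id").isNull(), F.lit("DELETE")).otherwise(F.lit("UPSERT"))))
```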
And Danny, I'll just stop there and see if there's any questions or comments on that third part. - No, actually right now a lot of the questions
are sort of revolving around how to do the
orchestration around this. And so for example, this
is a really cool pattern, they're seeing that you do
the Updates and Deletes. So I just did wanna do
two little call outs. One is that in terms of orchestration, it really is up to you on terms of your particular environment. So if you're using, for example, if you're running it locally, you could always do like a cron job, for example, like good old
fashioned cron jobs or Jenkins. If you are going ahead
and doing things in the... You have a more scheduler type of design then perhaps a system like Apache Airflow, or to be suited to the orchestration or if you're in Azure, Azure Data Factory, The other call out that I usually do is that of course, if
you're using Databricks, you would use Databricks Jobs
basically to align or flow, the way to run this. The other question basically, is what's the best way to
do this as a nightly batch rather than a stream? And so the quick call that I would say is that you absolutely
can run this as a batch if you wanted to, in terms of like just
processing as a change set. And in fact, what's great
about Paul's example here is that you just change
read stream to read, right? So in other words, to
go from batch to stream. What's great about Spark in general, and then with Delta Lake
underneath the covers is that you just switch
from read stream to read in order to go from streaming to batch. Saying this, a really common reason or popular reason on
why you wanna keep this as a stream anyways, is because of the fact that if you need to orchestrate
many, many different batch jobs, instead of having like 50
or 70 different batch jobs, you could potentially just
run this as a stream instead. So in other words, basically, a bunch of mini-batch jobs. And so for example,
customers like Comcast, which talked about how they did this back in Spark Summit, San Francisco 2019, in the keynote, they went from 84 different
jobs down to three. And because they were using Delta Lake underneath the covers, they were able to reduce their batch jobs from 84 batch jobs to
under three streaming jobs. And so it was operationally much simpler to be able to maintain and
operate this type of design. And so, so when you're looking
or thinking about streaming, it's not just about going in specifically the context of streaming per se. It is also about managing or simplifying the management
of many batch jobs as well. So that's an important call out that I figured that
should be called out here that as of why, when you're looking at
a session like this one in terms of the CDC, even if you initially
wanna start as a batch, and again, just switch from
read stream to read, right? You may wanna seriously consider the context of staying
as a stream job anyways. Just because of the potential
operational benefits as well. - Great point, and just one thing I'll add to that: another really common pattern here is using a Trigger Once philosophy that basically turns a read
stream into a batch execution, and allows you to kind
of then go backwards. So Trigger Once will essentially, every time it goes to
the streaming source, it will just read all the
records that are in there and process them in batch, right? So this is a really
common pattern as well.
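(For reference, a minimal sketch of the Trigger Once idea: the same streaming pipeline, but each run drains whatever is new and then stops, so it can be scheduled like a nightly batch job while keeping the checkpointing. Paths are hypothetical.)

```python
bronze_path = "s3://my-bucket/delta/customer_bronze"            # hypothetical paths
silver_path = "s3://my-bucket/delta/customer_silver"
checkpoint_path = "s3://my-bucket/checkpoints/bronze_to_silver"

(spark.readStream.format("delta").load(bronze_path)
    .writeStream
    .format("delta")
    .trigger(once=True)                              # process all available data, then stop
    .option("checkpointLocation", checkpoint_path)   # offsets still tracked for you
    .start(silver_path))
```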
Fantastic, so coming up on the last 10 minutes. So I just wanna wrap this up in a summary. What have we seen today? So we've looked at
three different patterns of reading change data from a Delta table. The first one of these is what I call an Append Only pipeline. Really commonly used in this
Bronze-Silver-Gold propagation across your Lake. The second was what I like
to call "The easy button" For the update pipelines
using the ignoreChanges flag. And remember what happens here, we get this file level CDC stream, but it's really, really easy to implement. And then the third pattern here
is this change-log paradigm, borrowing a concept from data warehousing. And this actually allows us to capture this record level change stream, like you would perhaps
get in a data warehouse. So I'd love to hear your
feedback on the session. As I've said throughout, there's gonna be a companion blog post with links to all of the
notebooks and code here, and all the data which is generated in
those notebooks as well. So you'll be able to check
this out on your own. So I really appreciate you spending the time with us this morning. And Danny, if there's any more questions, happy to take it, or we can go from there. - Absolutely, so we've got
about five minutes left, because I'd like to
always finish our webinars a little early so that way people can prep
for the next set of meetings, since all of our meetings are online these days, but we should be able
to answer most of them, so I'll answer the first one, okay? The first question is
if we have any questions from this webinar, how can we follow up with you after the fact? So, as we've been talking about, there's our YouTube channel. So youtube.com/databricks. This video actually is posted there. And so we actually do go out
of our way to answer questions also on the YouTube channel itself. So pop right into the comments, and we'll do our best to answer questions. It usually takes us about
48 hours to answer them, because we have a lot of questions with a lot of videos these days, so we don't wanna be able to
promise you immediate answers, but we do review them on a
pretty regular basis, okay? And so hopefully, that will help you jumpstart
this process, okay? All right, the next
question is basically... Oh, here's a good one. And Paul, you can answer
it or I can answer it. If orchestration steps are
dependent on each other. In other words, step A has to be complete before
step B will have to start. Basically, it's like you
have these dependencies on your jobs between like your Gold table
versus your Silver table. What do you think is the best approach? And I realize there are multiple answers, so why don't we just start
that discussion right now? - Yeah, sure, a really
common question we get. And if you want sort of a system to be able to run
arbitrarily complex DAGs, a lot of dependencies, which is what you're describing there. We really recommend the integrations that Spark and Databricks has with technologies like Apache
Airflow or Azure Data Factory, those are really good options for that which provide that kind
of arbitrarily complex DAG manipulation and execution. - Exactly, yeah, I was just about to add. So hence the reason for me also
calling out Apache Airflow, that's a very common one for folks to use in the open source land. But the context is that it's as simple or as
complicated as you want it to be when it comes to tracking
those dependencies. Because for example, you could always just
create a shell script that places in like,
basically, case statements in your cron job. You could always start with that, right? And it can go further: for example, when it comes to talking
about Databricks Notebooks, you could potentially just go ahead and actually build workflows
within your notebooks that have that stack. But then exactly as
Paul put it out, right? Whether it's an Azure Data Factory style, or if it's Apache
Airflow or whatever else, the point is that those DAGs get more and more complicated, and usually when we involve those latter ones, it's not just because of the dependencies between Silver and Gold, but because of the dependencies
with other systems that you're orchestrating way beyond like in terms of your own system, so partners like Informatica
or partners like Streamsets, which also talk to Delta Lake, they also technically,
will get involved as well. So again, there's a lot of
really good ways to do it. I don't think there's
any wrong way per se, it's just more a matter of understanding what is the level of
complexity you actually need. And then basically designing for that, because in the end when it comes to this
type of orchestration, if you were able to do it with Spark, you are able to do it with Delta Lake, because Delta Lake is the underlying storage layer. So whatever you were trying
to do before with Spark, it's pretty much applicable
here as well, okay? All right. Oh, and then we got some quick call outs saying they really
appreciate the session, Paul, I think they love you. So good, I can quit now. No, I'm joking. But yes, the PowerPoints and
the code will be available. So two quick call outs. We actually put all the code and the Notebooks and the PowerPoints directly in the Databricks
Tech talks GitHub, which is also linked in
the YouTube channel, okay? And so we put them all there. And then just as Paul
alluded to multiple times, we are actually currently writing a blog that provides a lot more of the details and goes much deeper than just what we're able to do in a 50-55 minute Tech Talk. So we will be following up with that, with additional information linked to this video, but also linked to the tech talks GitHub repo, which contains the code and the Notebooks, okay? All right, cool. I think the final question
I wanna tackle here before we head back to Karen: you're alluding to the Trigger Once, Paul, and so basically, if you change the read stream to read, this is going back to the stream-versus-batch discussion, can you still use the checkpoints or do you have to track the watermarks yourself? And great question. - Yeah, so if you use just the read method, then you don't get to use the checkpoints. So you have to track
the watermarks yourself. That's the real benefit
of using the Trigger Once, is that you can execute
in like a batch mode, but still take advantage
of all of those nice things in structured streaming,
like the checkpointing. And the other thing to say, is that it allows you
to evolve in the future. So if you wanna move
from batch to streaming with a five-minute or 30-second trigger interval, then it's really easy to change. - Excellent, and that's exactly... By the way, just to add to Paul's point. That's precisely what I meant by simplifying your orchestration
of batch jobs to streaming because the idea is that you have things like checkpoints and watermarks that are included in streaming that are not included in batch that you basically can take advantage of for the purpose of orchestration, okay? And cool, with that, I
did wanna finish it up. Thank you very much for
the great questions. Thank you, Paul, for an
amazingly awesome session. And Karen, back to you to wrap it up and I appreciate your time. - Thanks, Danny, thanks
Paul for your presentation and thanks everyone for participating and asking all of your
questions and comments; it really helps make the presentation.