Delta Live Tables A to Z: Best Practices for Modern Data Pipelines

Captions
We are very excited to have you here to chat about Delta Live Tables. We have Michael Armbrust, the original creator of Spark SQL, Structured Streaming, and Delta, and he's going to take it away.

Thank you so much for coming. I am super excited to be here today to talk about what Matt just alluded to: almost 10 years in the making, starting in 2013 with Spark SQL, then Structured Streaming, and then Delta, we now have Delta Live Tables, a single declarative framework that brings all of these technologies together into a simple system that allows you to create and update the data sets in your lakehouse.

So why do you need DLT? Let's start with the motivation. As I'm sure you all know, good data is the foundation of the lakehouse, and many people have bought into this bronze, silver, and gold architecture, where you start with messy data and you incrementally clean it up until it's ready to be those business-level aggregates that actually drive your business and help you make decisions. But this picture is a lie. No data pipeline is this simple; they're often much more complicated, with a bunch of different inputs and different intermediate tables, and this reality means that it gets increasingly complicated for you as an engineer to build these systems.

What does it actually look like, a day in the life of a data professional? A pretty typical thing is the CEO will email you: there's some really important question that he needs an answer to, and all you have is a giant directory full of JSON files, and your goal is to turn this into a beautiful dashboard that drives insights for the business. I'm a little biased, but when I get something like this I start with my two favorite tools, Spark SQL and Delta, and we write some queries. It's super easy to ingest this JSON data into a table, then it's super easy to clean it up, and then I can build this dashboard so my CEO knows what to do next for the business.

But then no good deed goes unpunished, and instead of just wanting the report one time, he wants a production pipeline that updates it every single day, or even worse, every single minute. What does it take to turn these two SQL queries into a production pipeline? Well, first you're going to have to figure out dependency management: there are multiple tables here that need to be updated in the right order, and we need to handle failures. We're going to have to do it efficiently, so we'll have to break it up into partitions and make it so that we only recompute individual days at a time rather than complete data sets. We're going to have to implement checkpointing and retries, because in the cloud things always fail. We have to add quality checks just in case bad data comes into the system; I don't want that confusing my downstream consumers. We need governance, because this is sensitive data and only the right people should have access to it. We need discovery, so the right people can find it and use it to make decisions. We need to handle backfills in case the requirements change, version control so I can work with my co-workers, and deployment infrastructure. And man, this is getting super complicated. What you realize is that the time spent on tooling starts to dominate, and you're not focusing on the one thing that you should be focusing on, which is getting value from data.

And this is the idea of DLT: instead of focusing on all of that infrastructure, you write these declarative SQL queries, and we provide the infrastructure that takes care of all of these other concerns underneath the covers.
Now, I've said this a couple of times: declarative programming. What do I mean when I say declarative programming? The key trick of a declarative program is that it says what should be done, but it does not say how to do it. Here I have two examples, a super simple one where we want to add up a bunch of numbers. The imperative program actually has to have a for loop, it has to iterate through the array, it has to add the numbers up, and it has to print out the answer; it says exactly how to do it. A declarative program, on the other hand, just says: I want to know the sum of these numbers. The idea here is that the query optimizer figures out the best way to execute it. If there are statistics that allow me to calculate that sum more quickly, it can automatically use those statistics instead of reading the raw data. Another pretty cool trick is that you get data independence: as the underlying physical data changes, the query optimizer can decide what is now the best strategy.

If we translate this into Spark, it's very similar. An imperative Spark program will tell you exactly what needs to be done; it will say things like "overwrite this partition." A declarative DLT program is just going to say "this table should exist, and it should have this answer stored inside of it," and then it is the DLT runtime's job to figure out what is the best way to create or update this table.
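To make that contrast concrete, here is a rough sketch of the two styles in Spark SQL (the table and column names are made up for illustration, not taken from the slides):

-- Imperative: spell out exactly how the update happens
INSERT OVERWRITE TABLE sales_report
SELECT order_date, sum(profit) AS profit
FROM sales
GROUP BY order_date;

-- Declarative: state what should exist and let the runtime decide how to maintain it
CREATE MATERIALIZED VIEW sales_report AS
SELECT order_date, sum(profit) AS profit
FROM sales
GROUP BY order_date;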
Now, a very common question that I want to make sure I answer before we get too deep into this: should I be using Databricks Workflows or should I be using DLT? Very commonly the answer is both. Workflows is used to orchestrate anything, including DLT. If you want to run your DLT pipeline on some schedule, or when some other tasks have completed, or when a file arrives in the cloud, or soon, when another table is updated inside of Unity Catalog, these are all things you would use Workflows to orchestrate, and DLT is one of those task types. DLT is closer to DBR than it is to Workflows: what it does is create or update Delta tables and run Structured Streaming, so it's specifically about data flow.

Now, why do we want to use declarative programming? The key idea here is that if we understand your objective, rather than a set of instructions about what to do, we can take care of all the boring stuff for you. You can eliminate a lot of boilerplate in your data pipelines: you don't have to worry about fault tolerance, you don't have to worry about state management and where your checkpoints are stored, you don't have to worry about object lifecycle like creating or deleting tables, you don't have to worry about scheduling all the dependencies between the different tables in your pipeline, and you don't have to solve a bunch of common data problems that are undifferentiated heavy lifting for your business. Doing change data capture, evolving the schema of tables: everybody has to do this, and we've created one system that can do it for you. The other really cool trick is that we can dynamically optimize based on the current status of the system, or even across multiple queries in the system, and it also allows us to do better operations.

I want to tell you about a really cool trick that we've developed in the last year called HMR, or health-mediated releases. I'm sure all of you don't like having to upgrade your DBR versions; it's always really scary to see what's changed, what behavior changes have happened, and how that is going to break your pipelines. HMR makes this our problem. What we do is automatically upgrade your pipelines under the covers, and if we ever detect that after an upgrade a pipeline is failing, we will roll it back to the previous version automatically using our retries. All of our code is idempotent, so we know this is safe, and if it works on the old version, we now have a very strong signal that there's a bug in DBR. We'll open a ticket on our side, we'll pin your pipeline to the old version, and it will keep running, and you have no idea that there's been a problem. Meanwhile, Databricks engineers are busy fixing that bug and eventually will roll you forward. In some cases we haven't even had to contact customers to tell them their pipeline broke, because it just kept working and they never noticed. So these are all the things that are enabled when you program declaratively instead of imperatively.

Now, DLT has two core abstractions, and I'm going to be talking about these for the next 90 minutes. The two core abstractions are two different types of data sets: you define them, and DLT keeps them up to date. The first abstraction is called a streaming table. A streaming table is a Delta table, and it behaves just like a Delta table in many ways; the only difference is that it has a stream writing to it. I'll talk about what you use that for, but in general, use it for ingestion, use it for low-latency transformations, and use it for processing huge amounts of data where you can never afford to do recomputation. On the other side we have materialized views. Materialized views are different: they're not a table you can write to; they are the results of a query that has been precomputed. We do store them in Delta, but your mental model should just be that a materialized view is the answer to a query. You write a SQL query, we materialize it. These are very powerful, and you can use them for transforming data, building up aggregate reports, and also speeding up BI queries in your dashboards.

So let's deep dive into streaming tables. Here I'm showing the syntax, which should look very familiar to anybody who knows SQL: you say CREATE STREAMING TABLE, you give it a name, and then you write a SQL query. In this case I'm reading from the cloud_files data source, which is the system we call Auto Loader. It can ingest any collection of files from any cloud storage system, guaranteeing exactly-once processing, picking up exactly where it left off. It basically gives you this read-only data set, and even as you add more data, it will only read the new data that has just arrived. The reason that's so important is because it allows you to reduce cost and latency by not repeating the same work over and over again. And like I said before, this is a table, so in addition to the stream writing to it, you can do DML on it: if you want to run UPDATE, DELETE, or MERGE, it's a table, and you can do all of those operations on it just like a normal table in Unity Catalog.

The other abstraction is a materialized view. Here's an example: I'm creating a report that tells us how much profit we made each day. The guarantee here is that any time you query a materialized view, it will return the results of that query as a snapshot at the last time the materialized view was updated. You can schedule it to refresh, and it will always contain exactly the answer to that query. You cannot modify the data in it; you cannot insert, update, or delete. If you want to change the contents of a materialized view, you change the query definition: if you want to remove a row from it, you add a filter to filter that row out.
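The two definitions Michael is describing look roughly like the following sketch (paths, table names, and columns are illustrative, not the ones on the slides):

-- A streaming table that incrementally ingests files with Auto Loader
CREATE STREAMING TABLE raw_orders AS
SELECT * FROM cloud_files('s3://my-bucket/landing/orders/', 'json');

-- A materialized view that is always exactly the answer to its query
CREATE MATERIALIZED VIEW daily_profit AS
SELECT order_date, sum(sale_price - cost) AS profit
FROM raw_orders
GROUP BY order_date;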
One really cool trick of DLT is this thing called expectations. All pipelines need to have data quality controls, because even if you know that all of your logic is correct, you're not controlling what happens upstream from you, and it's very common for someone to change a column name or a data type and have that break your pipelines. If you don't have quality controls in place, you won't even know that it's happening. Expectations are basically single Boolean expressions that are evaluated on every row. They're very similar to constraints in a traditional relational database, but the reason they're more powerful than constraints, which can only fail when they see something unexpected, is that they have flexible policies that let you choose exactly what should happen when your expectations are violated. You can choose to just track how many bad records there are, and we'll show you that in the UI as a percentage, and you can say, "I'm not worried as long as less than five percent of the data is bad." You can drop records so they don't affect downstream processing, which is much better than just putting a filter in your query, because the filter is not going to tell you if it accidentally starts filtering out 90% of the input data. And then, if it is some super important gold table that you're going to report to regulators and you really never want bad data to get into it, you can choose to abort processing as soon as the expectation is violated and roll that transaction back. We have an extra cool trick here: we actually add lineage. We rewrite the Spark query plan to track which row in the input caused the bad row in the output, and we'll show you that, so it makes it much easier to debug where the bad data was coming from.

Expectations might sound really simple. You might say, well, if I can only write a Boolean expression over a single row, how can I do powerful data quality constraints with that? Let me give a couple of examples, because once you start combining expectations with materialized views, you can start doing some pretty cool stuff. Let's say I want to enforce a primary key constraint: I want to make sure that a value is unique within a table. That's pretty simple: I just create a materialized view, I count how many occurrences there are of each key, and I have an expectation that there is exactly one. This will not only tell me if that primary key constraint is violated; it will make it much easier for me to figure out which key is violating it, and as I'll show you, there are actually some pretty cute ways that we can keep this materialized view up to date incrementally. Another example is joins. It's very common, when you're joining against some dimension table, that your foreign key constraint might be violated, and a pretty common problem in data pipelines is that those rows will just be dropped silently and you'll never know they were being dropped. A cool trick you can do with expectations is to translate that into an outer join: you don't want those bad rows to make it downstream, so you put an expectation on it and say, "I expect the other side of the join to be there," and now any missing records will show up as bad data in the UI when you look at your pipeline.
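A sketch of what these patterns look like in DLT pipeline SQL (names are made up; the expectation clauses use the documented CONSTRAINT ... EXPECT syntax):

-- Policies: track, drop, or fail
CREATE OR REFRESH STREAMING TABLE clean_orders (
  CONSTRAINT valid_price EXPECT (price > 0),                                   -- just track the violation rate
  CONSTRAINT valid_id    EXPECT (order_id IS NOT NULL) ON VIOLATION DROP ROW,  -- drop bad rows
  CONSTRAINT valid_date  EXPECT (order_date >= '2020-01-01') ON VIOLATION FAIL UPDATE
) AS SELECT * FROM STREAM(LIVE.raw_orders);

-- Primary key check: count occurrences per key and expect exactly one
CREATE OR REFRESH MATERIALIZED VIEW order_id_is_unique (
  CONSTRAINT unique_order_id EXPECT (cnt = 1) ON VIOLATION FAIL UPDATE
) AS SELECT order_id, count(*) AS cnt FROM LIVE.clean_orders GROUP BY order_id;

-- Foreign key check: outer join, then expect the dimension side to be present
CREATE OR REFRESH MATERIALIZED VIEW orders_with_customers (
  CONSTRAINT customer_found EXPECT (matched_customer_id IS NOT NULL) ON VIOLATION DROP ROW
) AS SELECT o.*, c.customer_id AS matched_customer_id, c.customer_name
    FROM LIVE.clean_orders o
    LEFT OUTER JOIN LIVE.customers c ON o.customer_id = c.customer_id;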
Cool. So I want to make my first announcement of the talk. Up until now, when you wanted to use DLT, you had to go into the workspace, you had to create a pipeline, you had to create a notebook; there was all this overhead. That's why I'm really excited to announce that we're releasing DLT in DB SQL. You now have the full power of streaming tables and materialized views in DB SQL, and this one line of code here is actually creating a robust, scalable, incremental ingestion pipeline with just one SQL command. People who have been using DLT might be saying, "Michael, where did live tables go? Where did streaming live tables go?" They didn't go anywhere; we're just unifying the terminology so we use the same words across the entire Databricks product. A streaming live table is a streaming table; a live table is a materialized view. They have the exact same semantics. We will continue to support the old syntax forever, but we want to have one simple way to talk about this stuff across the entire lakehouse. Cool, and with that I would like to bring up Franco, who's going to demonstrate creating MVs and STs with DB SQL, and, as a bonus, dbt as well. [Applause]

Thanks, everybody. Hopefully everyone's having a great Summit. Thank you, Michael, for that awesome introduction. If you all don't know me, my name is Franco Patano. I lead the data warehousing and BI practice at Databricks, predominantly around our Databricks SQL product. One thing that I have been longing for, and a lot of customers that I talk to have been asking for: you have streaming in Databricks, so why can't I stream a table from Databricks SQL, or in your SQL API? Why do my data warehousing people have to learn a little PySpark or Python to be able to leverage your streaming capabilities? And I was like, yeah, why not? Michael and Paula came up to me and asked, how would you like to talk about streaming tables and materialized views in DB SQL? I'd love to; people ask about this all the time. So it is my absolute pleasure to show you all streaming tables and materialized views in our Databricks SQL product.

Let's take a journey together. First off, I am a data warehousing professional. I work at an organization where we have these wearable devices; they track your calories, steps, and stuff like that, and so we have the device data and we have the user data. My company just asked me, the data warehousing person: hey, I get this JSON data and this CSV data, we need you to create pipelines, ingest them, and be able to put them in our BI reports. Great. Can I do this in SQL? Let's do it together.

First off, there's this brand new function that we have called read_files, and it is absolutely amazing. read_files is Auto Loader under the covers: it will infer the schema of the data, it will also understand that the first row is a header, and it will essentially display the results for you, so I can see exactly what's going on in this table. Fun times. So it will show me the results of the table, and then I can go ahead and write my CREATE STREAMING TABLE syntax. This is the new streaming table syntax that we have: you create a streaming table like Michael showed earlier, you define the table schema, and then you say AS SELECT. I always like to cast my types; implicit type conversion isn't always great, and you want to be explicit, especially when you're building durable pipelines.
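The flow Franco walks through looks roughly like this (paths and columns are invented for the sketch; exact read_files option names and keywords have shifted across releases, so check the current docs):

-- Explore the raw files first; read_files is Auto Loader under the covers
SELECT * FROM read_files('s3://my-bucket/landing/devices/', format => 'json');

-- Then create the streaming table, casting types explicitly
CREATE STREAMING TABLE devices AS
SELECT
  CAST(device_id AS BIGINT)      AS device_id,
  CAST(user_id AS BIGINT)        AS user_id,
  CAST(calories_burnt AS DOUBLE) AS calories_burnt,
  CAST(num_steps AS INT)         AS num_steps,
  CAST(miles_walked AS DOUBLE)   AS miles_walked,
  CAST(event_time AS TIMESTAMP)  AS event_time
FROM STREAM read_files('s3://my-bucket/landing/devices/', format => 'json');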
Now I create that streaming table. Awesome. So I go back over here; this is what the data looks like after the query completes, so I can go ahead and browse that data through read_files and understand it. I also have files coming in for my user data, except these are CSVs, so I did the same thing: I ran read_files on those CSVs, and within the same SQL browser, the same SQL experience, I was able to build the user table as well. So now I'm ingesting both of these tables as streaming tables, but my job's not done yet. My business asks: we need to join these two data sets together, because we need to understand what the users are doing, how they are interacting with these devices, and what the results are. In my user table I have their risk factor, a column called risk that describes how risky these people are, as well as their age, and the business wanted to understand the average calories burnt, how many records we're receiving (so, are we actually getting the records all the time off these devices), and then the average number of steps and miles walked. From here I can essentially look at my aggregates and how I would want to browse this.

One special thing that not a lot of people know about, and that I want you all to know about, is GROUP BY ALL. GROUP BY ALL is amazing when you have aggregates, because essentially it tells the optimizer, the planner: take all of my non-aggregate columns in my query and group by those. So you don't have to say GROUP BY 1, 2, 3, or group by your columns; you just say GROUP BY ALL, and it will group by all of your non-aggregate columns. That's awesome. Thanks, Reynold, for this, if you're out there; he actually made the commit. Then what I want to do is take that aggregate I just created and create my materialized view. I'm going to take that SQL statement, create a new materialized view, and you want to enable change data feed, because that's behind some of the capabilities of materialized views, and I'm going to pop that SQL statement in there, and now I can create a materialized view. Excellent. So essentially, in the past two minutes, as a SQL user, I never left the SQL API: I ingested JSON data off of object store, I ingested CSV data, I created an aggregate, and then I created a materialized view.

This is great, but now, here at this organization, we are a dbt shop. So I'm going to put my dbt hat on and we're going to flip over to VS Code. For anyone who wants to use dbt Cloud, you will be able to do that there too; it's just that we had to use a custom build, so we had to do this locally. Essentially, I'm going to take my dbt starter project here. I created my users table, I just brought that SQL statement into dbt, and I just need to say config: materialized equals streaming_table. The beautiful part here is that DLT handles the incremental ingestion off object store; as dbt users, we don't have to build the Rube Goldberg machine of figuring out how to do incrementalization. Streaming tables just handle all of that for us. This is amazing.
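A rough sketch of what those dbt models might look like with the dbt-databricks adapter (model names, columns, and the tblproperties config are illustrative; Franco mentions the demo used a custom build, so the exact config keys may differ in your adapter version):

-- models/users.sql
{{ config(materialized='streaming_table') }}
SELECT
  CAST(user_id AS BIGINT) AS user_id,
  CAST(age AS INT)        AS age,
  risk
FROM STREAM read_files('s3://my-bucket/landing/users/', format => 'csv', header => true)

-- models/user_activity.sql
{{ config(
    materialized='materialized_view',
    tblproperties={'delta.enableChangeDataFeed': 'true'}
) }}
SELECT
  u.risk,
  u.age,
  avg(d.calories_burnt) AS avg_calories_burnt,
  count(*)              AS num_records,
  avg(d.num_steps)      AS avg_steps,
  avg(d.miles_walked)   AS avg_miles_walked
FROM {{ ref('devices') }} d
JOIN {{ ref('users') }} u USING (user_id)
GROUP BY ALL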
And the other thing that people don't realize is that you can do this over the wire, over our wire protocols. So if you use JDBC, ODBC, our REST API, or Node, Go, the statement execution API, anything, you can now create streaming tables from virtually any application. So any partners out there, you can build streaming tables as well. I'm going to do the same thing for devices: I'm going to set its materialization to streaming table and bring in the SQL code that I used in the browser. And finally, I'm going to create that materialized view: again, the config for materialized is materialized_view, and then the table property, I want to turn on that change data feed, and I'm going to bring that SQL statement in here. Awesome.

Then what I'm going to do is go into Databricks Jobs. I kind of already did this beforehand, but I'll show you how it works: I committed this to Git, to my GitHub repo, I told the Databricks job to go grab that repo and run it on a Databricks SQL warehouse, and it did that. So now my dbt code is checked in, I have my version control, I ran my job, and let's check out my tables. From here you can see we have that dbt database with all our tables in it, so we go into users and we're able to browse users. I don't know if you all noticed this, but one of the awesome things I love about the Data Explorer: you'll notice the icons for streaming tables and materialized views. That is really cool, because you can tell exactly in the UI whether something is a streaming table or a materialized view, so your users can actually tell what they are. Here you can see the two tables that I ingested, and we can browse some of their sample data. I want to look at some of the sample data for that user activity materialized view I created; everything is great, let me make sure this is the right warehouse, I might get an error on this next one if I don't. And then in the details you can even see information about this query: it'll show you the SQL query, it'll tell you it's a materialized view, and it'll give you all the information that you need to know. So now, as a data warehousing user, this is amazing: I get incrementalization using streaming tables, it's super simple, I get materialized views, and it all works from dbt. So you don't have to decide, is it dbt or DLT? There is no decision; use them both. It's great. And with that, I'll give it back to Michael. [Applause]

Thank you, that was awesome. Anybody who knows even just a little bit of SQL is a full-fledged data engineer. Cool, so now that we understand the basic concepts and we know how to create streaming tables and materialized views, I want to do a deep dive into Structured Streaming and how it works inside of DLT. As I just hinted, all of Structured Streaming is available inside of DLT. It's based on exactly the same engine; we're just a really good way to run Structured Streaming. The streaming computation model is pretty simple: the basic idea is that the input is an append-only table. You might say, well, I don't have many of those, but it turns out there are many things that actually fit into this model. Files getting uploaded to cloud storage can be modeled in this way; message buses like Kafka, Kinesis, or Event Hubs; Delta tables that you're only appending to; transaction logs of other databases: these are all examples of append-only tables. The magic trick of Structured Streaming is that rather than waiting for all the data to arrive and then running a Spark SQL query over it, Structured Streaming allows you to produce results on demand as new data arrives, and we incrementally keep those results up to date.
And the reason this is great is because you get much lower latency when you're only processing one new record, and you get significantly reduced costs when you're avoiding all of that redundant work. A key thing that I want to make sure everyone understands, because I often hear people say, "I can't afford streaming, I don't care about latency, that's too expensive for me": that is not how Structured Streaming works. Structured Streaming is often cheaper than running a batch job, and the magic trick here is that you can run Structured Streaming in several different ways. You can run it manually, only when you want the table to be updated. You can use Databricks Workflows to schedule it, so you can say: I want this to be updated once an hour, once a day, once a week, once a month, whatever works for your business. Or, if you do have low-latency requirements, we can support that, and then you put the pipeline into what we call continuous mode: in this case a cluster is always kept running, and we're constantly polling for new data and updating the answer as soon as new data has arrived. The really cool thing about this is that as you move along this spectrum of cost versus latency, there are no changes to your code. You can build your pipeline once, and if the business requirements change, all you have to do is change the schedule; automatically you spend a little bit more money and you get fresher results.

Now, pretty much every single time you're doing ingestion, you should be asking yourself: should I be using a streaming table? It's kind of a no-brainer: you never want to ingest the same data over and over again. As we've been showing, we have this pretty cool table-valued function called cloud_files. What it does under the covers is use RocksDB to remember exactly which files have been ingested, and then it transactionally communicates this information to Delta using our transaction identifiers, to guarantee that even in the presence of failures and restarts we will ingest each row of each file exactly once. It works with all the cloud storage systems, and it supports arbitrary scale. The problem is this: if you have a directory with 10 million files in it, even just listing those 10 million files to figure out which ones are new takes a really long time. So what Auto Loader does is integrate with the notification systems of all three major clouds, so that they tell us when new files arrive and we don't have to scan all 10 million files each time. This lets us handle just massive scale. And then, finally, it does schema inference and evolution: it will figure out what the schema of those data sets is, and if new columns appear, you can set policies on how to handle them. You can say, any time something changes, let me know and throw an error, or you can say, just automatically add those columns to the table; it depends on your use case what the best thing to do is. And it supports all the different data formats: JSON, CSV, text files, and we're adding support for XML. So it's a pretty cool tool for ingesting data.
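A sketch of what those knobs look like on the cloud_files source (the bucket and option values are examples; the options shown are standard Auto Loader options, but verify the ones you need against the docs):

CREATE STREAMING TABLE raw_events AS
SELECT * FROM cloud_files(
  's3://my-bucket/landing/events/',
  'json',
  map(
    'cloudFiles.inferColumnTypes', 'true',
    'cloudFiles.schemaEvolutionMode', 'addNewColumns',  -- or 'rescue' / 'failOnNewColumns'
    'cloudFiles.useNotifications', 'true'               -- use cloud notifications instead of listing
  )
);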
Another really common use case is ingesting data from message buses like Kafka, and you can see here I've switched to Python syntax. I haven't talked about this too much, but everything I've been talking about so far can also be done in Python, and you can see it's pretty simple: you just have a function that's annotated as a DLT table, and it returns a DataFrame. This is just like the AS SELECT part of the SQL queries that we've been showing so far. The Kafka source, very similar to Auto Loader, automatically tracks what data has already been ingested: it keeps track of those partitions and offsets and makes sure that we ingest the Kafka data exactly once. You can use any Structured Streaming source included in the Databricks Runtime in this way, and this is often what you want to do when you need the lowest-latency ingestion, because uploading files is big and chunky: you want to wait until you have enough data for a file. Message buses are a very efficient way to ingest small numbers of records at a time, and if you have IoT use cases where many devices are sending records, a message bus acts as a nice buffer: it collects them so you can make nice big commits to the Delta table and have more efficient computation.

You can also stream from any Delta table. In this example I'm using the STREAM function, and you can see I can read from tables that are in the catalog; that's the first example, where I say FROM STREAM(prod.events). Or I can read from Delta tables that are stored out in cloud storage: in that case I use the other syntax, the virtual delta schema, and I just give a path to the table. This only works for append-only Delta tables, though, and I'll talk about that in a little bit, but as we said before, the computation model of Structured Streaming is an append-only stream; it does not understand how to handle changes. I'll go into some of those details in a bit.

One of the key things when you use streaming is state. I keep talking about exactly-once: the stream remembers which rows have been ingested, and this is the key thing you need to understand when you're using streaming. Let me give you a couple of examples of how state can change the results of your queries. Each row is processed only once; if you remember only one thing about streaming from this talk, this is the key detail. One consequence is that if I change the definition of a streaming table, say instead of a + 1 I change it to a * 2, we do not go back in time and recompute all of those old results. This is critical when you have huge tables: you cannot afford to recompute all of the data that's been computed in the past just because you wanted to add a column or fix a bug. When new data comes in, it will use the new definition, but the old data is going to remain unchanged. Another example where state matters is streaming joins. When you join a stream against a static Delta table, you're always joining with an up-to-date snapshot of that table: every single micro-batch, we pick up the latest snapshot of that other Delta table and take the latest results that are there. But if that table changes again, we don't go back in time and update records that have already been joined; those stay the same. In this example I'm joining with the cities table, and you can see I'm joining with Berkeley and it's misspelled. Let's say I go and update it to fix the spelling and make it correct: only the new records that came in after the change was made are going to have the new join value.
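The two ways of streaming from a Delta table that Michael mentions a moment ago look roughly like this in DLT SQL (the table name and path are placeholders):

-- Stream from a table registered in the catalog
CREATE STREAMING TABLE events_silver AS
SELECT * FROM STREAM(prod.events);

-- Stream from a Delta table addressed by path, via the virtual delta schema
CREATE STREAMING TABLE events_from_path AS
SELECT * FROM STREAM(delta.`s3://my-bucket/tables/events`);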
Now, you might say, well, what happens if I do want to update all of that data? There was a critical mistake and I need to update my pipeline. We make that easy: backfill is a built-in feature inside of DLT, and we can automatically perform it through what is called a full refresh. Let's go back to that original example: I changed the streaming query, and I now have this table that is in a heterogeneous state, and I want to make it all use the new query. I'm either going to run the SQL command to do a full refresh of the table, or in the UI just click this "full refresh" button, and there's also a REST API to do this. It will then recompute the entire table from scratch. This only works if your streaming source has full retention, so a good best practice here is to always put your data into Delta: with Kafka you can store a couple of weeks of data, similar with Kinesis, whereas with Delta you can store as much data as you can afford for as long as you want. When you start a stream from a Delta table, it always starts with a full snapshot of that table at the moment you started the stream, so it makes it very easy to do these full refreshes when you need to.

Stream-stream joins: this is, I would say, one of the least understood features of Structured Streaming. I regularly see users trying to use stream-stream joins for things that they cannot do, so let's talk about what stream-stream joins are actually for. A stream-stream join is for when you have co-arriving facts with a bound on how far apart their arrival can be. The canonical example is ad tech: you're Google, and you have impressions and you have clicks. Two different systems are producing these data sets, but they happen very close to each other, because users always click on ads right away; they don't come back to the page 10 days later. So the magic trick here is that you have these two streams, they both have watermarks on them, you have a join condition, and you have this time bound. If these things are not true and you use a stream-stream join, it's going to buffer all of the data forever, so you cannot use this for updating dimension tables or for cases where you don't have a bound between their arrival. But if you do have a bound between their arrival, this is a very powerful feature. I just wanted to clarify what it should be used for: it is not all types of joins in all cases, and I will talk about how to solve those in a moment.

A really cool trick that we just added to DLT is the ability to evolve your streaming sources, to add and remove them over time. First I want to start with the mental model of what's actually happening under the covers when you create a streaming table. Let's start with this simple syntax: create a streaming table, raw_data, from Kafka. What's actually happening under the covers is that this is syntactic sugar; it's creating two different objects. There is a streaming table, and there is this flow with the same name that is actually reading from Kafka and inserting that data into the table. The name of that flow is how the checkpoints work: raw_data is the name of the flow, and there is a checkpoint called raw_data that keeps track of the Kafka topic. Now you can see why there might be a problem if you start changing your streaming sources over time. If I switch from reading from this Kafka instance to reading from Auto Loader, or to reading from a different Kafka instance that has totally different offsets, it's not going to make any sense: you have to use a different stream if you're going to be reading from a different source. And this is where multi-flow tables come into play.
They allow you to evolve your streaming sources by giving each of those flows different names and different state. In this example I've created three different flows: one reading from my east instance of Kafka, one reading from my west instance of Kafka, and another one reading from the European instance of Kafka. You can add and remove these over time, and DLT will keep track of exactly which checkpoints should be used for which flows. This is really cool, because now you can add and remove sources without having to do a full refresh, and that is coming soon.

Another really cool trick with streaming is change data capture. The problem is as follows: you have an operational database, and you want to get that database into your lakehouse so you can do analytics on it, and you don't want to dump an entire snapshot of the database every single time; that's really inefficient. Fortunately, most databases have this pretty cool trick: they can give you a feed of the updates, deletes, and inserts that have happened, and you can then take that feed of changes and apply it to a Delta table. Many people today do this with foreachBatch and MERGE, and that's a totally reasonable way to do it, but many people today also do it wrong. I actually made them delete a whole bunch of broken examples from the Databricks docs, because this is really hard to do correctly, and the reason is that data does not always arrive in the correct order. I'll talk about how we solve that problem in DLT in a little bit, but first let's talk about the simple, declarative way to do change data capture.

We have this new command called APPLY CHANGES INTO, so let's break it down. APPLY CHANGES INTO has a source, where we are reading these updates from; in this case I'm reading from another Delta table called city_updates, and it's going to create this table called cities where the updates are applied. You have to tell it what the primary key is; this is how we identify each record, so when a new record comes in, we will look up the record with that key: if it doesn't exist we create it, and if it does exist we update it. And then the magic comes in with this sequence number: you say SEQUENCE BY and give us some column. This can be any sequencing in your data set: it can be a timestamp, a log sequence number, the upload time, whatever makes sense in your application, but this is how we're going to decide what came first and what came after. The guarantee here is that DLT will produce the correct answer as though the data arrived in the correct order, even if it doesn't. So in this example a new record comes in and we update it, because the sequence number is greater, and everything works. You can do CDC with DLT from any data source: there's a wide variety of systems that can produce these change feeds, and you can read them from any streaming source. You can read them from S3, you can read them from Debezium into Kafka, and we have customers that take Oracle GoldenGate, push it to Kinesis, and then read that data from Kinesis. This is the cool thing about DLT: because it's just Structured Streaming under the covers, you can use all of those same building blocks when you're doing something like APPLY CHANGES INTO.
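A sketch of the command Michael breaks down here (table, key, and sequencing columns are placeholders; the DELETE clause is an optional part of the documented syntax, shown for completeness):

-- Declare the target streaming table, then apply the change feed into it
CREATE OR REFRESH STREAMING TABLE cities;

APPLY CHANGES INTO LIVE.cities
FROM STREAM(LIVE.city_updates)
KEYS (city_id)
APPLY AS DELETE WHEN operation = 'DELETE'
SEQUENCE BY update_ts;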
And a really cool trick, this is also coming soon, I just wanted to give a quick preview: federated CDC with UC. Today it's often a big struggle to get that change feed: how do I authenticate with Postgres, how do I configure it so that it gives me the right change feed? There are tools to do this, and you can run them on your own, I mentioned Debezium already, but it's often a whole bunch of work. At this Summit we just announced query federation: you can now federate your Unity Catalog with these other data sources, and all of the tables from your Postgres or your MySQL instance will just show up in Unity Catalog. What we're going to do is make the table_changes function work with those federated data sources, so now all you need to do is say APPLY CHANGES from the table_changes of that database, and we will automatically handle all the governance, getting the right permissions, and bringing that data into your lakehouse.

Now I want to dive under the covers and talk about some of the secrets of how we actually make all this stuff work, because handling out-of-order data is not trivial, and the trick we use to do it is called reconciliation. Reconciliation allows us to hide all of the extra bookkeeping we have to do under the covers, so that you get the right answer even though we're doing all this extra machinery underneath. Let's start with an example. Let's say I have a CDC feed, arrival time working downwards, and we have two operations that are updating the same key. What you'll notice is that based on the timestamp, the delete should happen after the update, but it actually arrived before the update, because the update arrived late. If you were to naively do a foreachBatch merge, what would happen is you would have a permanent zombie row: that update would cause the row to be reborn even though it's supposed to be gone, and it would live there forever, confusing all of your downstream users.

So what we do in DLT is this magic trick where, inside of Unity Catalog, we actually create what we call a backing table. This is the Delta table that has the actual information in it, and then there's a layer on top called the reconciliation view. The backing table has extra columns and extra rows. The extra columns remember what version each row was last updated in, and if you're using some of our more advanced modes, where you're accepting partial updates in which only the columns that have changed are included in the record, you actually have to do something a little more complicated: you have to keep a vector clock of every column and when it was updated. We keep all of this in extra columns within that table, and we keep tombstones to remember when a row was deleted, so that we don't have a zombie. Then what the reconciliation view does is, when you query that streaming table through Unity Catalog, it just filters all of this stuff out and you see only the active rows, exactly as you would expect. And of course you can do DML on these things: you can run UPDATE, DELETE, and MERGE, and that goes through the reconciliation view, and if you absolutely have to, for example if some PII made it into one of our secret columns, you could modify them using this DML. Now, everything I just talked about is how it works in Unity Catalog. If you're using HMS, obviously we could not create a new streaming table object inside of HMS, so we just create a view. A question I get very often is: why, when I use APPLY CHANGES INTO, is it creating a view instead of a table? It's because we have to do reconciliation in order to give you the correct answer.
Another really cool trick with CDC is that it's not only about keeping an up-to-date snapshot of what's true at exactly this moment; you can also use it to keep history. This is often called slowly changing dimensions, type 1 or type 2: type 1 is updating in place, type 2 is keeping a history of every single change, and normally this is even more complicated to implement; I found many examples of it being done incorrectly in our docs. All you have to do with DLT is add one extra clause at the bottom: STORED AS SCD TYPE 2. Now when we get a change, let's say we change the value to Berkeley, a new row appears, and you'll notice there are these two extra columns, a start-at and an end-at, which tell you what time period this version of the record was valid for. Those timestamps are just your sequencing, so they are whatever number makes sense in your particular application. This is not something we make up: it's your timestamp, your log sequence number, however you are ordering the rows. This is really cool. People sometimes use Delta time travel for this, and that's great if you want to look back a couple of days, maybe a week, but using Delta time travel to keep years of history is a fundamentally inefficient way to do this. You might ask why. The reason is that time travel thinks about physical files: if you have a bunch of tiny files and you run OPTIMIZE, and now it creates these big beefy files, time travel doesn't know the data is the same; it's going to have to keep both the small files and the big files around forever, so you get a storage explosion when you try to keep history for a very long time using Delta time travel. The magic trick of SCD type 2 is that it's all inside the Delta table, and we're only keeping the rows that change, so you do not have the same storage explosion.
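The extra clause described here looks roughly like this (same placeholder names as the earlier CDC sketch; DLT adds the start/end columns for you, populated from the SEQUENCE BY column):

CREATE OR REFRESH STREAMING TABLE cities_history;

APPLY CHANGES INTO LIVE.cities_history
FROM STREAM(LIVE.city_updates)
KEYS (city_id)
SEQUENCE BY update_ts
STORED AS SCD TYPE 2;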
So, to summarize: you can keep history using SCD type 2. If you only want to keep a couple of days, you can use Delta time travel; if you need to keep years, SCD type 2 is usually the right approach.

Now, to get to some of the more complicated streaming concepts: chaining streaming, that is, having multiple hops of streaming where you use Delta as an intermediate. This works because Delta is both a streaming source and a streaming sink; this is actually one of the biggest things that helped Delta take off in the early days. In this example I'm reading a bunch of data into bronze from cloud_files, and then I'm doing some extra augmentation in my silver table. This is a best practice: you don't want to do transformations in bronze, because if there's a mistake in your transformation and you've thrown away the raw data, you can never recover it. So keep your raw data exactly as-is, and then transform it in a separate step, so if there's a bug in that transformation you can always recover. This is a great pattern when you have very large data or very low latency targets, but there's a gotcha, and it's this thing that I've said several times, one of the things that confuses people the most: you cannot do updates in streaming. Streaming assumes that everything is append-only. So if you do this chaining-streaming pattern and you have updates or deletes, if you have APPLY CHANGES writing into that table, which is also doing updates, or if you have a query with a GROUP BY that doesn't have a watermark, computing an aggregation and updating the values of those aggregates: all of these things represent changes in a Delta table, and what Structured Streaming is going to do when it sees those changes is say, "I don't know how to process these," and throw an exception. There is a way to handle this: we just added this pretty cool feature where you can manually handle these updates, and it's called skipChangeCommits. Basically what you're saying is, "Don't worry about it, Structured Streaming, I've got it: ignore those changes, I will propagate them manually." Streaming will do what it does best, which is propagate those appends down, and in a GDPR example, I have to delete from my bronze and I also have to delete from my silver when I'm using this option. So this is great when you have massive tables and you can never afford to recompute: you want to guarantee every row is only read once, and you're willing to deal with this extra complexity as a result. This is a great pattern.
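The bronze-to-silver chain on the slide looks roughly like this (paths and columns are placeholders; the silver stream only works as long as bronze stays append-only, which is the gotcha Michael describes, and handling upstream updates yourself means setting the skipChangeCommits reader option on the stream):

-- Bronze: keep the raw data exactly as it arrived
CREATE STREAMING TABLE bronze_events AS
SELECT * FROM cloud_files('s3://my-bucket/landing/events/', 'json');

-- Silver: transform in a separate step, streaming from the bronze Delta table
CREATE STREAMING TABLE silver_events AS
SELECT
  CAST(event_time AS TIMESTAMP) AS event_time,
  upper(country_code)           AS country_code,
  payload
FROM STREAM(LIVE.bronze_events);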
But this is getting pretty complicated. There's got to be a better way, where I don't have to think about all this extra state, and that's why I am super excited about materialized views. Materialized views simplify transforming data, and as we said before, a materialized view guarantees that it always returns the same results as its defining query. There's nothing for you to reason about: you don't have to think about what it was in the past, or how it's changed, or what the watermark is; it is exactly the answer to that query. Compared to streaming tables, materialized views can handle updates, deletes, and appends in their inputs. They can support any window function or aggregation that you can do in Spark SQL. Joins work exactly the same: there's no stream-stream versus stream-batch distinction, there are just standard inner and outer joins. And they automatically handle changes to the query definition: if you change the materialized view, the next time you run it you will get the updated answer.

But there's a reason why we can have all of this power in materialized views: it's because, unlike streaming, materialized views are allowed to read the input more than once. Consider the case where you're computing a max, and the current value of that max is 10. Somebody tells you the record with value 10 is deleted: what's the new max? The only way to answer that is to go back and look at the data and see what it is now. You can keep extra data structures and things, but you have to keep an arbitrary amount of state in order to compute this. This is why materialized views are so much more powerful: they can do whatever they need to do in order to compute the correct answer.

Now, of course, recomputing from scratch every single time would be prohibitively expensive; nobody would use materialized views if you always had to recompute the entire table, and that's why we built Enzyme. The idea of Enzyme is that instead of "one row is updated, replace the entire table," which is correct, trivially correct, which is why it's kind of a nice way to do it, we're going to do something a little bit smarter: we're going to notice that only one row is updated, and we're only going to update that one row. We have several techniques we use to accomplish this. One technique is very similar to streaming: we leverage what's called a monotonic query, which is a query that does not lose any tuples given new input. We know that appends will only lead to appends, so we just feed those appends right through; super efficient, as efficient as Structured Streaming. It only works for simple monotonic queries, though, selects, projects, inner joins, and it does not handle changes to the input. So we have another strategy: partition recompute. Every data engineer in the room has actually built this by hand; DLT automates it. If you have two tables that are both partitioned in the same way, for example by date, then when a date changes we will only recompute the corresponding date in the next table, rather than recomputing the whole table, and we do this very efficiently by just looking at the Delta metadata to ask which partitions have changed. This is great, because now we can do aggregations and we can handle updates and deletes in our input, but it requires that these tables have the same partitioning, and that's not always true: either you don't control it, or that partitioning doesn't make sense, like you should never partition by customer ID. So we have a third technique, which is merging. This is basically using the last 50 to 60 years of database research. The idea here is that we compute what is effectively the derivative of the query: we say, how is the output going to change when the input changes, we feed the changes through a new query that we've rewritten using Catalyst, we figure out that if these two changes happen, this row should change by 17, and we merge that into the final result. This is the most powerful: we can handle arbitrarily complicated queries, and it can handle updates, deletes, and inserts. But merge is expensive; it's not always the best thing to do. It's only a good idea if a tiny fraction of the table is changing: if the whole table is changing, it would have been cheaper to just recompute anyway. And it's pretty complicated to reason about; it is very hard to hand-code this, but fortunately we have some pretty awesome database engineers, including a professor who has been studying materialized views for many years, working on Enzyme.

So how does Enzyme know which one to do? It's got all three of these options.
Well, basically we're building on top of Spark and Catalyst. We add to it the Delta changes, so we can read the transaction log and see how the inputs are changing over time, and then we have a cost model, and we pick the optimal update strategy based on that cost model. A pretty cool trick that's happening under the covers is this thing called decomposition. Sometimes a SQL query, as written, isn't possible to incrementalize: how do you incrementalize something like a count distinct? The answer is that you break it up into simpler queries. For every distinct aggregate we'll create another table under the covers, and there we'll calculate the count grouped by the thing we're doing the distinct on. Now the number of rows in this materialization we've created is the answer to that count distinct query, and we can answer it very efficiently, because you can answer a count query over Delta just by looking at statistics. So this allows us to incrementally pre-compute the answer to that distinct query by keeping a little extra information around. We use a very similar trick to what we do with streaming tables: inside of Unity Catalog, your materialized view is not necessarily just one Delta table; it might be many Delta tables, where we've broken your complicated query down into these simpler queries, and then there's a reconciliation view that says how to take the intermediate results and return the final answer. Very similarly, you query the materialized view and you get the correct answer.

And now I'd like to make my second announcement: we're releasing at the Summit, into public preview and private preview, DLT Serverless. The really cool thing about DLT Serverless is that it has Enzyme enabled by default, so any materialized view that you create in DB SQL will automatically use Enzyme, and any MV in a DLT pipeline running with serverless turned on will also use it. We're coming out of the gate with three different techniques: we'll support the co-partitioned use cases, we'll support associative aggregation, and we'll support these monotonic queries. We have a pretty awesome roadmap, and I would love to hear from people: what queries are you writing that are not incrementalizing today? We will use that to drive our roadmap so we can improve the incrementalization engine over time. I'll be super honest with you: I think Enzyme is like a three- or four-year project, but I'm really excited about what we can do with it. This might sound too good to be true, so I would like to bring Nitin up, who's going to show us how it actually works. [Applause]

Thank you, Michael, for the great introduction to Enzyme. Let me switch gears. Hi everyone, my name is Nitin Sharma, I am an engineer at Databricks working on the Delta Live Tables team. For this demo I'm going to assume the role of a data engineer who works at an e-commerce company. My team is responsible for creating orders-related data sets, and these data sets can then be used for analyzing orders, finding daily trends, and stuff like that. For that I built this pipeline. Just to set some context, there are a bunch of MVs here, materialized views. In our data warehouse, our Delta Lake, new orders are ingested every few minutes as they are placed, and the first materialized view is basically picking up those raw order facts and then enriching them with dimensions: in this case we join the order table with the customer table and the nation table to create this enriched orders table.
On top of this, once we have this enriched order table, we then create a few other aggregated tables, as Michael mentioned. In one of those tables we basically compute, daily, the total number of orders placed and the total price, by nation, and this data set is becoming very, very critical for our marketing team. Right now they use it to tweak their daily or hourly spend on their online ad business, but they're not very happy at this point in time, and I'll tell you what the problem is: they complain that this is very slow. So let's take a quick look at the last few runs of this pipeline. This pipeline runs hourly, and roughly it's taking 20 minutes, and that's not good for our business: they want to look at what happened in the last few minutes so they can tweak their ad spend.

Let's look into what's going on here. I go to this run, and what I noticed was that this table is actually taking 18 seconds to compute, but on top of it there is another table, this enriched order table, which is taking roughly 11 minutes. What I also notice is that as more and more data arrives in the data warehouse, this table keeps on growing, and the time to compute it is also increasing. Let me take a look at what was going on here, let me scroll down. What I noticed was that this table, because it was a materialized view, was getting recomputed every time I ran this pipeline, so it's very slow, as you can see. Luckily, I heard Michael's talk, and I realized Enzyme is this cool new feature which can incrementalize these materialized views, so let me give it a try. It seems like it's part of the serverless offering, so all I had to do was click on serverless, I didn't make any code change, nothing, and I saved this setting, and let's run this pipeline.

Okay, great, so the pipeline has started running. There's a little bit of magic because of DLT Serverless: we pick up the cluster very fast. One thing that I notice is how it's getting planned: this enriched order table, this time, is getting executed as a partition overwrite. As Michael explained, what we noticed was that for the order enrichment table only the last few partitions were getting changed, so we detect those partitions and only recompute them. And if you look at it, I don't know whether it's visible or not, let me zoom in: it took just six seconds to finish. Excellent. Let's take a look at this other table, which our marketing team is really interested in. As Michael just explained, underneath what's happening here is that we evaluated four different techniques: there's a complete recompute, which was available to run at some cost, and there's a partition overwrite we also could run, but then we realized this row-based technique which Michael explained actually had the lowest cost, so we picked that. This is the one that was chosen, we ran it, and it finished in 12 seconds. Overall, the runtime for this is, let me see, 46 seconds. Earlier it used to take me 20 minutes to run this pipeline; this enriched order table was taking 600 seconds, and now it's taking about six seconds, so almost a 100x improvement. Wow, that's awesome. I think my marketing team is going to be really, really happy, and more importantly, my CFO is going to be happy, because I'm pretty sure the TCO for this is going to be really low.
Okay, pretty cool. And it turns out that Enzyme is not the only magic trick you can use to reduce TCO in your streaming pipelines as part of DLT serverless. We have this other pretty cool thing: in serverless, we will dynamically optimize both the compute and the scheduling of your streaming queries. What we did here was run a benchmark using that Auto Loader source we were talking about before, ingesting a huge data set. Auto Loader backfills, where you have terabytes to petabytes of data and you want to read them into Delta, are often a big tuning pain for customers: how do I make it run fast enough, how do I make sure I'm spending as little money as possible? DLT serverless does this automatically. If you look at these two techniques, we're comparing against a baseline of one for how long it took. Just by tuning the admission control, how big each batch is based on the amount of compute available, we were able to get a 1.7x speedup, and by adding more intelligent scheduling, which we call micro-batch pipelining, you get another 2.15x. Put together, it's 2.6 times more efficient to run your streaming queries on DLT serverless. I could give a whole talk on DLT serverless, but they told me I only had 90 minutes, so let's move on to the next part.

To wrap up what we've seen so far: most use cases will actually be a mix of streaming tables and materialized views. You should pretty much always use streaming tables for ingestion, and the best practice here is to avoid complicated transformations that could have bugs or accidentally drop important data in that first initial ingest. The reason you want to do that is because it allows you to delete the raw data. It's almost impossible to do GDPR retention over files in cloud storage, so you want a really short retention there: don't keep them around for more than 30 days, and you don't have to worry about it. Do all of that work inside of Delta, where you can retain infinite history and do all of your compliance tasks. Then use materialized views to read from that streaming table; these materialized views will automatically handle those complex joins and aggregations and automatically propagate GDPR updates and deletes into all of your reports, so you don't have to worry about it.
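Here is a minimal Python sketch of that ingestion pattern, with hypothetical paths, table names, and columns: a streaming table that ingests raw JSON with Auto Loader and does essentially no transformation, and a downstream table that does the real cleanup on top of Delta, where GDPR deletes applied to the raw table are picked up on refresh.

```python
import dlt
from pyspark.sql import functions as F

# Bronze: streaming table that ingests raw JSON as-is with Auto Loader.
# Keeping this step minimal means the raw cloud files can be expired quickly.
@dlt.table(name="events_raw", comment="Raw events ingested from cloud storage.")
def events_raw():
    return (
        spark.readStream.format("cloudFiles")   # `spark` is provided by the DLT runtime
        .option("cloudFiles.format", "json")
        .load("/Volumes/main/landing/events/")  # hypothetical landing path
    )

# Silver: the real cleanup happens here, inside Delta, where history,
# compliance tasks, and GDPR deletes can all be handled.
@dlt.table(name="events_clean")
def events_clean():
    return (
        dlt.read("events_raw")
        .where(F.col("event_type").isNotNull())
        .select("user_id", "event_type", F.to_timestamp("ts").alias("event_time"))
    )
```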
Okay, so now that we've made it through the basics, I am really excited to get to the cool stuff. Here's where we start talking about software engineering and best practices when you're doing software development, specifically for data. But first, let's talk about what software engineering is in general. In order to build these reliable data pipelines, we can learn a lot from the practice of deploying software. Pretty much all good software teams use version control: you always check all of your code into something like git. You use build systems: nobody's sitting there linking things together by hand. You have a system that understands the dependencies between components, knows how to package them, knows how to run them in the right order, knows how to use parallelism, and takes care of all of it for you; this could be Bazel, this could be Make, this could be Gradle, whatever. You have continuous integration: you should always be testing your code, and that testing has to happen in an isolated environment. There's no more testing in prod, no more just running DDL against your database; that's a bad idea. Continuous deployment is also really important: there should be no manual steps in how you get your data from development into production, because manual steps are where humans make mistakes. And finally, modularization: you don't want one giant four-or-five-page SQL query. Spark SQL can handle it, but it's just not very readable. You want to break it up, and you want clear abstraction boundaries, not only within your own code but between you and upstream teams and downstream teams.

Of course, everything I've talked about here is possible to do with plain off-the-shelf Spark, but it is a ton of boilerplate. I've seen so many Spark programs where most of the time is spent interpolating strings so that you have the table name with the environment name concatenated together, and again, this is a distraction from what you really should be doing, which is focusing on those SQL queries. Let me give an example of why this is so important. If my code is as simple as CREATE OR REPLACE TABLE production.report, and I hard-code the destination of that data into the code, then when I check it out and start doing some work on my laptop, if I make a mistake and run it, it's just going to clobber that production database. This is a pretty common pattern that I see, and this is why we created DLT pipelines.

So far we've only been talking about individual streaming tables and materialized views. Pipelines bring software best practices to a larger collection of multiple streaming tables and materialized views. A pipeline is very simple; it has three components. First, you list which notebooks or files should be read, and these notebooks and files just contain DLT code: create streaming table, create materialized view. They can be Python, they can be SQL, and you can mix and match all in the same pipeline, however you want to do it. Second, a target: where should this data be published to? You'll notice this is a property of the pipeline, not a property of the code; the code is independent of where the data is published. And third, whatever other configuration you might have: do I use serverless, what IAM role do I need, which workspace should it run in, all that kind of stuff. DLT then automatically manages the life cycle of creating, updating, or deleting these tables, handles the dependencies by executing them in the correct order, and provides isolation so that development and production can work on their own and not clobber each other, even though you're working off of the same code base. Creating a pipeline is super simple: open a notebook, write some code in it, create a pipeline, click Start, and you're ready to go.

A key idea of pipelines is live dependencies. If you're reading a table from outside of the pipeline, you just read it like normal, select star from the catalog, however you're going to do it. But when you're reading a table from within the pipeline, you want to make sure you're getting the right copy: if I'm in development I want to read the development copy, and if I'm in production I want to read the production copy. This is what the LIVE virtual schema does. It's a virtual schema inside of Unity Catalog, and when you say live.events, what you are really saying is: give me the correct copy of events for the environment that I'm running in. We figure out how to link these things up, and we automatically detect all of these dependencies, run them in the correct order, and handle the parallelism.
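In Python, the same distinction looks roughly like this sketch (table and column names are hypothetical): fully qualified names for tables that live outside the pipeline, and dlt.read() for datasets defined inside it, so development and production each resolve to their own copy.

```python
import dlt

# A table that lives OUTSIDE the pipeline: read it by its fully qualified name.
@dlt.view(name="customers_src")
def customers_src():
    return spark.read.table("main.crm.customers")  # hypothetical catalog.schema.table

# Datasets defined INSIDE the pipeline: dlt.read() plays the role of the LIVE
# virtual schema, so "events" (assumed to be defined elsewhere in this pipeline)
# resolves to the dev or prod copy depending on the pipeline's target.
@dlt.table(name="events_by_customer")
def events_by_customer():
    return dlt.read("events").join(dlt.read("customers_src"), "customer_id")
```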
We also give you this nice lineage graph, which Newton was just showing off.

So let's take a look at what it looks like to do development and testing in isolation now that we have this primitive. A best practice here: code lives in a shared repository, and there are separate checkouts for each environment. Every developer should have their own checkout, and production and staging should each have their own checkout, so when you're editing code you're not changing code out from underneath the production job. Then you create three different pipelines with three different targets: one that targets my development environment, another that targets the staging environment, and another that is the production environment that all of my downstream consumers are actually going to read from. These different pipelines are what provide the isolation. So, to go back to the example we just did, when I make a change it happens in my development environment, and only after I make that change correctly and validate it in continuous integration do I push it up to production, where it is automatically deployed with no manual steps.

This works even when you're publishing into multiple schemas inside the same database. This is a feature that just went into preview that I'm really excited about, because tons of people have been asking for it. Let's say I have a pipeline and I want to publish both to the marketing schema and to the reports schema; now we need to make sure there are isolated copies of those schemas as well. What you'll notice is that I just say marketing and reports, and then I say live.marketing.<table name>, and what it's actually going to do is concatenate these things together: it will create dev copies of the marketing and reports schemas and prod copies of the marketing and reports schemas. All of this interpolation is done automatically, and it automatically links all of these related copies together for you. So even for complex pipelines that publish to multiple places, you can handle all of this isolation.

Now that we've got pipelines, you might be asking yourself: how big should a pipeline be? Should it have one table, ten tables? There's really no correct answer; this is up to you and your business requirements, but let me give you some high-level guidelines. A larger pipeline is going to give you better TCO, because more things happening at once lets us multiplex the cluster more efficiently: when one node is idle, there's another query that can use it right away. So in general, bin-packing things into a smaller number of pipelines is better. But at some point you're going to hit scalability limits within the driver, at tens or hundreds of streams; it depends a lot on the volume of data, how many columns you have, and how complicated your transformations are, so it's something you'll have to experiment with and measure. There are some good places where you probably want separate pipelines, though. If you have two different teams with different deployment schedules, they should probably have their own pipelines so they can deploy on their own. If you have application-specific boundaries, like between training and serving, you might want different pipelines there so you can update those things independently. But in general, larger pipelines will give you less orchestration, fewer things to worry about, and better resource utilization.
Another trick is modularizing your code. I'm not going to spend too much time on this, because it's basically how you do it in SQL as well, but you can create views inside of DLT, and the right way to think of these views is just like in SQL: a view is just a name that gets substituted with a query. So instead of having a 10-page SQL query, you can have many views that then get queried together. Breaking it up not only makes your code more readable, but because of expectations you get a really nice superpower: you can now assert conditions about those intermediate results, for example making sure the input to a join satisfies certain conditions, and that's pretty powerful. The thing you have to be aware of is that views, just like outside of DLT, are recomputed every single time you query them. So if you are reading the same view four or five times, you're recomputing it four or five times; it might be more efficient to create a streaming table or materialized view instead, because then it gets computed once and everybody can share the results.

Another really cool trick of DLT pipelines is that they completely take care of life cycle management, and I want to clarify again that this is UC only: if you're still using the Hive metastore, you have to do these steps manually. In Unity Catalog, you need no migration scripts for your DLT pipelines. When would you need a migration script? Well, let's say I remove a table from my pipeline. In a traditional database you'd have to use Flyway or one of these other systems that would go and say, ah yes, this table was deleted, so now I'm going to delete it; you'd have to make sure it handles the case where the table isn't there; it ends up being pretty complicated. DLT is a fully declarative spec of what should exist in your lakehouse, so if you delete a table from the pipeline code, we will remove that table from Unity Catalog. If you don't want it deleted, do not remove it. If you remove a table property, we remove it. If you change the partitioning of the table, we make sure the partitioning is correct. The right mental model for DLT is a system that constantly compares your code with the state of the lakehouse, makes sure they are in sync with each other, and changes anything it notices is out of sync.

Now you might say that sounds really scary: what happens when I push something to production and I've accidentally deleted a table? Never fear: all of the deletes are lazy. We log exactly what's happening, and if you recreate that table, if you just revert that git commit and put it back into your code, we'll pick up right from where we left off with no data loss. All of this is recorded in the event log as well, so you can set up continuous integration scripts that tell you a table was removed. This is much more powerful, because if you think about what would happen in a traditional system, if you accidentally deleted part of your source code, that table would just go stale, and you wouldn't even know it wasn't being updated anymore until one of your users complained about the results being wrong. So this makes it much safer to ensure that you're always doing the right stuff in production.
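Going back to the intermediate-view idea for a moment, here is a minimal Python sketch of what that looks like, with hypothetical table and column names; the expectations assert conditions on the join output, and the view itself is recomputed every time something reads it.

```python
import dlt

# An intermediate view: just a named query, recomputed every time it is read.
@dlt.view(name="orders_with_customers")
@dlt.expect("has_customer", "customer_id IS NOT NULL")
@dlt.expect_or_drop("positive_amount", "amount > 0")
def orders_with_customers():
    return dlt.read("orders").join(dlt.read("customers"), "customer_id", "left")

# If several downstream tables read this view, each read recomputes it; promote
# it to a streaming table or materialized view if that becomes expensive.
@dlt.table(name="orders_gold")
def orders_gold():
    return dlt.read("orders_with_customers")
```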
I talked a little bit about Python, but I want to deep dive just a little bit more. As I said before, all you need is our Python API, and you can use everything we've been talking about in DLT; I've just used SQL so far because it's easier to put on slides. All you do is create a function, annotate it with @dlt.table, and return a DataFrame, and we will create that table for you. You can control all the options by passing different arguments to that decorator. You can use UDFs and pandas UDFs and all the different things; you can use PySpark, you can use Koalas, you can even use strings of SQL that you interpolate in Python, whatever works best for you. And like I said before, you can mix and match SQL and Python all in the same pipeline, so you can use the best tool: I often use SQL for my reports but Python for my data engineering, and whatever you're most comfortable with, you can use.

You can use all of the libraries available with pip, so if you want different functions to use inside of your UDFs, you can pip install them; you just put this in any of your DLT pipeline code. There's one important thing to understand here: because all of the tables are stitched together into one single data flow graph, you are running in one library environment. In your pipeline you can only have one version of each Python library, so if you install a Python library in one notebook, it is available globally to all of the other notebooks in that pipeline. This is pretty much fundamentally required, because views can reference UDFs and views can be included in other tables, so it has to be done that way. If you need to use separate versions of libraries, use different pipelines.

And here's where Python gets really cool: metaprogramming. A pretty common pattern that I hear over and over again is: I don't need to ingest just one table, I need to ingest a thousand tables, and it's very tedious to write out the same create table statement over and over, changing only a tiny part of it. Metaprogramming allows you to do this using control flow inside of Python. In this example, I have an array with a whole bunch of different regions, and I need to create the same sales report for every region, so all I do is say, for each r in regions, call this function, and then I create a different DLT table for each region. You'll notice something interesting about this code: that @dlt.table annotation is in its own function, and this is critical because of how Python does variable capture. When a Python variable is captured by a lambda function that we run later, it will have the final value of the loop. So if you were to put all of this directly in the loop, what's going to happen, and this is super confusing but it's how Python works, is that you will end up with ten tables with exactly the same arguments, because all of those functions are evaluated lazily after the loop has completed. By creating this extra function, you capture the values at that moment of the loop iteration and use them consistently. If you didn't understand that, that's okay; just copy this code and do it this way.

Another trick here is parameterizing DLT code. Metaprogramming isn't only for Python users; SQL users can do it too, and all you have to do is use this dollar-sign notation to substitute different things into your SQL code or into your Python code. You could do this for a variety of reasons: to improve code readability or maintainability, or to reuse the same notebook in multiple different pipelines with different input parameters. But remember, this is not an imperative system.
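As a concrete sketch of the metaprogramming pattern just described, assuming a hypothetical in-pipeline table called sales with a region column; the important part is that the table definition lives in its own helper function, so each region value is captured at that loop iteration.

```python
import dlt

regions = ["amer", "emea", "apac"]

def create_region_report(region):
    # Defining the table inside a helper function captures `region` at this
    # call; putting the decorated function directly in the loop body would
    # leave every table's body bound to the loop variable's final value.
    @dlt.table(name=f"sales_report_{region}")
    def report():
        return dlt.read("sales").where(f"region = '{region}'")

for r in regions:
    create_region_report(r)
```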
Because DLT is declarative, this is a question I get over and over again: how do I tell DLT which date to process? You are not telling DLT what to do. If you were to say, for example, update only this date, you are not saying insert-overwrite this partition; you are saying only this partition should exist, and we will delete all of the other partitions. So don't fall into this trap. If you want to do things incrementally, use streaming or Enzyme and let us figure out what data is new, and if you really have to insert-overwrite a partition, that is totally valid, but you should use Databricks Workflows for that; that is how you write imperative code inside of Databricks. A great, and actually valid, way to use parameterization is for configurable data volumes. Let's say I want to make it configurable whether I keep one year's worth of data, three years' worth, or ten years' worth: that's a great thing to parameterize, because you are saying I want this complete data set. You're not picking how it's done; you're saying what should be done.

Okay, almost done. All of this seems pretty complicated: creating all of these different checkouts, creating all of these different pipelines, making sure they stay in sync with each other. That's a ton of manual work, and I started off this section of the talk by saying no manual steps. That is why we have created this pretty cool new thing, which is my third announcement: Databricks Asset Bundles. The idea here is that instead of an onerous, error-prone process where you create these things manually, DABs allow you to version-control everything: not only your code but also the jobs, notebooks, clusters, and pipelines that exist inside of your Databricks workspace. If you're familiar with Terraform, this is actually built on top of Terraform; it's a simpler version of Terraform specifically for Databricks. The really cool thing is that we can now automate the creation of all of these separate, isolated environments.

A real quick description, although I'm going to go through this fairly quickly; there is documentation online, and I'll provide a link at the end. The basic way DABs work is that everything is in version control alongside those notebooks. We specify a resources section, which says which resources should exist; in this example I'm creating a pipeline called my pipeline, and it automatically interpolates which environment it's running in. Then I have two different environments where I override properties of that pipeline: in development I publish to the dev schema, and in production I publish to the production schema, and it takes care of all of the overriding and interpolation.

Now there's a pattern you want to follow to make your DLT pipelines work well here, which is to structure your pipelines correctly by separating sources and transformations. Sources are queries that read from external inputs; this is where I was talking about modularization, where you want interface boundaries between you and the teams upstream of you. Ideally these sources have little to no transformation, and this is a great use case for views because they have no extra cost inside of the pipeline. Transformations, on the other hand, are where the real work of your pipeline is done, and these are the same in all environments. The other best practice here is to use expectations for validation. Unit tests are great, but they're not going to protect you from the upstream team changing a data type and not telling you about it, so expectations are a much more robust way to ensure that your pipeline is working, not only in development but also in production.
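Going back to the configurable-data-volume idea, here is a hedged sketch of how a pipeline parameter might be read from Python. The configuration key (mypipeline.history_years), table, and column names are hypothetical; the parameter describes what the data set should contain, not how an update is performed.

```python
import dlt
from pyspark.sql import functions as F

@dlt.table(name="sales_history")
def sales_history():
    # A pipeline configuration value set in the pipeline settings, e.g.
    # "mypipeline.history_years": "3", read through the Spark conf.
    years = int(spark.conf.get("mypipeline.history_years", "1"))
    return (
        dlt.read("sales")
        .where(F.col("order_date") >= F.add_months(F.current_date(), -12 * years))
    )
```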
Let me give a concrete example of separating sources and transformations. Let's say I have a pipeline with some sources, and I'm reading from an upstream table called prod sales. In production, I just read the table; it's super simple, just select star. In development, however, I don't want to read that whole table, because that would take way too long for every iteration, so instead I only read the last day of data. The transformation doesn't have to know which version it's querying: it queries from live.sales and gets either the complete table or the small subset automatically. And there's a whole bunch of cool ways you can create test data inside of DLT to speed up development and testing. You can use this subset trick where you just filter to a time period. You can read hard-coded examples from JSON if there are specific pathological inputs you want to deal with. You don't even have to create JSON; you can just hard-code those values into a SQL view. In this example I have two pathological cases: do I handle the negative-profit case, do I handle the very-large-profit case, do the right things happen? Or, if you want to test the scale of your pipeline, you can randomly generate data using Python. So now, putting it all together: when I create my pipeline, I have all of the transformations up in the resources section and all of the sources down in the environment override section, and now my development is super fast and my production is correct. And with that, I would like to bring up Dylan, who's going to show us how it all works. [Applause]

Hi everyone, I'm Dylan Bostwick. In the real world I'm a solutions architect at Databricks, and I spend a lot of time helping folks in the field with Delta Live Tables, but for now, while I'm on stage, I'm going to pretend I'm one of you: basically a data engineer working on a pipeline. This is actually a loan pipeline, so we're trying to approve loans, I'm working for a bank, and there's a change I need to make. I'm working on all of this in VS Code, and I added this line here, a dlt expect-or-fail expectation: it basically says that I want all of my loans to be newer than 2014, and if not, I want the entire pipeline to fail. The first thing I'm going to do is check this in; you can see that change, and let me go ahead and push it to the branch. You can see I'm currently working on a separate branch: even though we're working with data, and with a lot of different objects like pipelines, I'm using traditional DevOps and software best practices. I'm using git flow, I've checked out a separate branch, and I'm using the VS Code extension for Databricks to run all of this in my development environment. On the left-hand side, like Michael was alluding to, I have multiple different Python files that I'm using, and perhaps I might have some SQL files as well, but this is basically how I'm building the loan pipeline in development.

So the next thing I'm going to do, like a good software engineer, is go into GitHub and create a new pull request for the change I made. You can see this is the branch I checked out, and you can see the change I made there.
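Going back to the source-versus-transformation split and the test-data tricks Michael described, here is a minimal sketch under hypothetical names: in production the source view is a plain read of the upstream table, while the development variant reads only the last day of data and unions in a couple of hand-written pathological rows, and the downstream transformation reads the same logical name either way.

```python
import dlt
from pyspark.sql import functions as F

# Production variant of the source: just read the upstream table.
# @dlt.view(name="sales")
# def sales():
#     return spark.read.table("prod.sales")   # hypothetical upstream table

# Development variant of the same source: only the last day of data, plus a
# couple of hand-written pathological rows (negative and very large profit).
@dlt.view(name="sales")
def sales():
    recent = (
        spark.read.table("prod.sales")
        .where(F.col("order_date") >= F.date_sub(F.current_date(), 1))
    )
    edge_cases = spark.createDataFrame(
        [("t-1", -50.0), ("t-2", 1_000_000_000.0)],
        "order_id string, profit double",
    )
    return recent.select("order_id", "profit").unionByName(edge_cases)

# The transformation reads the same logical name in every environment.
@dlt.table(name="sales_report")
def sales_report():
    return dlt.read("sales").groupBy().agg(F.sum("profit").alias("total_profit"))
```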
We're actually very busy at the bank I work at; we don't really have the resources or the time to manually do all of the unit tests and integration tests and then deploy those changes into production. But there are a lot of regulatory requirements where we need to test any change we make to the pipeline against data sitting in staging before we push it into production. So what we're going to do is look at the last decade of DevOps best practices and do this thing called CI/CD: CI is where we integrate and test all of the changes automatically, and CD is where we actually deploy them into the production workspace.

When I submit the pull request, I'm using GitHub Actions, and every single push I make runs some automated CI checks. There are actually two here: one succeeded and the other failed. If you look at them, the first one is where we do this thing called deploy bundle, and the second is where we actually do the run pipeline update. So it looks like we were able to deploy it, but it wasn't actually able to run. When we go to look at the files changed, you can see in the GitHub Actions output that we have it set up so that, inline, there's a big X on that expectation. So what actually happened here is that we ran an end-to-end test on staging, and the change I made failed against the real data, and as the developer I can pretty easily see where the issue was.

Going back here, if we open up that Action, you can see the deploy bundle step succeeded; these are all the different steps, and we can ignore a lot of the setup for now, but this is the part I really care about: this is running on the GitHub Actions box, and I run this command called databricks bundle deploy. That's part of the Databricks CLI, and what you'll also notice is that I'm passing this thing called a Databricks bundle environment, and it just so happens I want that to be the QA variable. What that does is tell the Databricks bundle to run the QA version of this deployment. So the deployment succeeded, but the problem was with the run: there was something in the QA, or what we call the staging, environment where it failed when it ran, even though the pipeline deployed successfully. Okay, so I'm going to go here. I talked to my manager, and my manager said it was actually 2012, not 2014. We got the date wrong, my bad, so I'm going to quickly make that change and then push it again.
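For reference, a change like the one Dylan describes might look roughly like this in the pipeline's Python code; the expectation name, column, and table are hypothetical, and expect_or_fail stops the update entirely if any row violates the constraint.

```python
import dlt

@dlt.table(name="approved_loans")
@dlt.expect_or_fail("valid_loan_year", "year(loan_date) >= 2012")  # was 2014 before the fix
def approved_loans():
    return dlt.read("loans_raw")  # hypothetical upstream dataset in the pipeline
```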
So what's actually happening, what is this bundle thing? Let me hop over to another file located in the same directory, next to all of the code that implements the pipeline. This is our bundle yaml file, and it specifies all of the different workspace objects in a modular, extensible way where everything has a single responsibility. If you've worked with infrastructure as code, or you have any DevOps background, this looks pretty familiar: all of the infrastructure, not just the code itself, is defined in code, so it's a lot easier to manage. It's kind of like Terraform, but for Databricks objects. So we have a name for it, and if you scroll down, we have this thing called resources, where we're creating the loan approval pipeline. What you'll immediately notice is that in the name we give it, we interpolate that bundle environment variable, so when we run this in the QA environment it's going to say QA loan approval pipeline, and we can do the same for development and production. These are the things that are shared across the different environments. But if you scroll down, we do this thing called substitution, where we define environments and start to mix in different properties. This makes it a lot easier, number one, for us as a team to see how the configuration changes between environments, but more importantly, it's a lot easier to make those changes as we go from development to staging to production, with a lot less friction.

Here you can see we're using different workspaces, so we're running in different environments with different data sitting in different cloud accounts, things like that. For example, development and QA both have development mode turned on; that's the DLT setting for development mode, which makes it easier to do cluster reuse. If you scroll down to production, we make a couple of different changes: for example, we say we always want to run Photon in production, and we turn off development mode. In the past you would actually have to go into the UI and click the button, and there wasn't really an automated, CI/CD, best-practice way to do that. We can be more opinionated about the kind of infrastructure we want Delta Live Tables to manage for us, and we can even specify other Databricks parameters like permissions; of course, in the production environment we lock it down, so we use Databricks ACLs as well.

Okay, jumping back over to GitHub, let's go back to the pull request. It looks like 2012 actually worked, and now both checks have passed, so we can go ahead and merge the pull request. Now that we're merged, this kicks off another databricks bundle deploy into production, and we're finished. I think everyone can appreciate the green check mark; it's a nice dopamine hit for all of our engineers and makes things a lot easier. Thanks. [Applause]

Okay, so if you thought that was pretty cool, you should definitely check out bundles.
There's actually a session, I believe right after this one, where they're going to be talking about it in more detail. It's a feature of the new Databricks CLI that I think we're just previewing as part of this Summit. There's a link to the documentation, and you can contact this email address if you want to learn more. And with that, I encourage you to try building your next data pipeline with DLT. Thank you so much. [Music]
Info
Channel: Databricks
Views: 56,712
Keywords: Databricks
Id: PIFL7W3DmaY
Length: 87min 52sec (5272 seconds)
Published: Tue Jul 25 2023