Accelerating Data Ingestion with Databricks Autoloader

Captions
Hello and welcome to Accelerating Data Ingestion with Databricks Autoloader. My name is Simon Whiteley and I'm a director of engineering, which in practice means I build a lot of data lakes. I spend a lot of time with clients figuring out how to get all their various bits of data, arriving at different times - every minute, every hour, every day - into a lake: how do I work out what data I've already read, what I haven't, and how do I land it? Data ingestion is fairly complicated; it's a bit of a challenge.

Autoloader is a fairly new feature in Databricks that simplifies some of that process, but there are some gotchas, some things you need to know and a bit of config to understand before you tackle it, so I'm going to take you through that in this session. We'll look at why incremental loading is hard, the patterns we've all been using for years and their limitations; the components that go into Autoloader; and some of the patterns we actually use in production, which I've implemented with some major US manufacturing companies. Then we'll look at evolution: what happens if my data changes? Managing schema drift and schema evolution is one of the most painful parts of any data ingestion. Finally I'll share some life lessons, some config things we missed the first time around that are important to know.

So let's first look at why incremental loading is hard, and the patterns people use to get over it; they're all trying to solve the same problems. Firstly, the scenario: I've got a couple of CSV files and I'm loading them into a Delta table, getting them into my lake via a load process. That's nice and easy; I can do it in batch and suck all my data up. The challenge comes when the next CSV arrives and I have to kick off the next process: how do I work out what I have and haven't read?

There are some golden rules our ingestion engine needs to follow, whatever pattern we use. One, only read new files: I don't want to re-read a growing pile of files each time and have the process get slower and slower. Two, don't miss files: the worst thing in the world is a file coming in, my process not noticing it, and the figures in my reporting being wrong. Three, trigger immediately: that's not always a hard requirement, but if a file can land and automatically get processed and streamed into my lake, that's fantastic, so we'll use that as one of our patterns. Four, be reusable: I don't want to build a new thing every time there's a new data set - a vendor added a new table to their database, there's a new source that might be interesting - I just want to run the same script with different parameters. And five, don't slow down: a lot of existing approaches
list out the entire set of files each time; they might not read the data, but that growing list of files becomes trickier and trickier to manage. Those are the problems I'm normally facing.

There are three ways I've built this in the past that I want to talk through as examples of how you can build ingestion engines. Number one, which we see all the time, is the ETL metadata-driven approach. I've got my list of files currently in blob storage, and I keep track of the last one I read: I'm manually managing metadata that says I've read that file, I haven't read that one, and then I pass that directly into my data frame and say "go and load that specific piece". That's great, it's nice and repeatable, it's all parameter-driven, but it's not reactive: I have to poll, I have to trigger it, and I have to manage where that metadata lives, which has its own challenges. Still, it's the most common approach you see all over the place.

Number two, I can just use file streaming. Spark can read a directory of files, keep track of what it has read so far, and pick up anything new. The challenge is that on each micro-batch - each time it finishes writing and starts a new batch - it reads the entire directory structure again, compares that against its entire history of files read, works out what's missing and then loads those files. That gets challenging when we're talking about hundreds of thousands of files in a long-term landing blob; the directory listing grows and grows and really starts to slow things down. So that streaming approach has its own challenges.

Number three, we can build something ourselves using Azure or AWS components and extend our own architecture. On Azure, a blob landing can raise a file trigger, which pumps out a message, which triggers a Logic App; the Logic App decides what kind of file it is, calls a function, calls the Databricks Jobs API and passes a parameter in. That's fine, I can build that, but what about the next data set, the next time I need to spin this up? Am I constantly deploying new Logic Apps, new Azure Functions, new Azure infrastructure every time I want to track a new file?

So, across our three approaches: the metadata-driven one is nice and repeatable and fairly simple, but I have to poll it and build it manually; it's not out of the box. File streaming is repeatable, parameterisable and immediate - it can sit there constantly streaming, so as soon as a file lands it flows through - but it gets slower as my directories get bigger and it's not very good when my data changes, so there are limitations in the existing file streaming approach. And I can build my DIY version, stitching things together, which is immediate and triggers as soon as a file lands, but it's not that repeatable. There are challenges across all the ways we currently work.
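For reference, approach two above - the plain Spark file-streaming source - looks roughly like the sketch below (the schema and path are invented for illustration). It's this per-batch directory listing that gets slower as the landing folder grows:

```python
from pyspark.sql.types import StructType, StructField, StringType

# Illustrative schema for a landing feed
landing_schema = StructType([
    StructField("id", StringType()),
    StructField("payload", StringType()),
])

# Plain structured-streaming file source: every micro-batch re-lists the directory
# and compares it against the full history of files already processed.
raw_df = (
    spark.readStream
        .format("json")
        .schema(landing_schema)
        .load("/mnt/landing/some_feed/")   # hypothetical landing folder
)
```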
So let's talk about Autoloader, which exists specifically to resolve that challenge: we've got several approaches, but none of them are perfect, and that's what we're trying to fix. When Databricks came out with Autoloader, this is how they described it: an optimised cloud file source for Spark that loads data continuously and efficiently. Makes sense; that's what we're after. That's from Prakash on the original announcement blog.

When we compare it to those earlier approaches, this is what it's doing. Autoloader stores metadata about the files we've read so far, so it's doing our metadata-driven thing, except it's doing it for us. It uses Spark structured streaming, so it can be reactive: as soon as a file lands it starts processing. And it uses cloud-native components - things like Event Grid and a message handler - to do the file watching. Essentially Autoloader takes the lineage of those three ways we've historically worked and brings the best of them together into one out-of-the-box solution. That's the whole point: it works the way we normally work, except it's done for us, which makes a lot of sense.

Let's dig into what's actually inside Autoloader and how it works. There are two main parts. First, the cloudFiles data reader: a normal Spark data reader format, the same as a JSON or CSV data reader, except this one is specifically for Autoloader. Second, as an extension to that, the cloud notification services: the bit that goes and creates the file watchers and the message queue, and we'll talk about what that actually means.

When we use the cloudFiles reader with notifications, it expects a queue to exist already. We've got our blob storage at the bottom with the list of files in our landing blob, and we've got a queue containing a list of messages, each of which is a file we haven't read yet: a list of "here's what to read the next time you execute". The cloudFiles data reader expects that to be set up in advance. When we kick off the data frame, the first thing it does is look in the queue and ask "is there anything I need to read?"; the queue says "yes, you haven't read file four yet", and then the data reader kicks off, reads that JSON file and processes it as normal. The cloudFiles reader always looks for that queue if we're using notification services: we can switch it on and say "use a queue rather than doing the directory listing", rather than working the way Spark file streaming works, where it has to read the entire directory structure each time. Just look in the queue and read what you need to; that makes it much easier.

In terms of the data frame, there are a couple of bits of information we need to give it. We've got spark.readStream - it's always a streaming data frame, never a batch one - with a format of cloudFiles, which tells Spark to use Autoloader and to load from a queue of messages. And because we're telling it that, we also need to tell it that the
data itself still has a format. So there's a separation between the two things we're keeping track of: Spark is told to use Autoloader via the format of cloudFiles, and Autoloader is told what to expect via cloudFiles.format, for example that the files it should look for are JSON. You'll see a few of these cloudFiles configurations; we'll step into what they look like. We're also using cloudFiles.useNotifications, which is the switch between doing a directory listing, which gets slower and slower over time, and expecting a queue with the list of files to read. Get used to those configurations; we'll look at some of the more interesting ones as we get deeper in.

On the other side, the cloud notification services: what happens if we say "yes please, use notification services"? In Azure, we've got blob storage where our files are landing. Underneath that, every blob storage account has an Event Grid system topic: an automatic watcher that keeps track of anything that happens in that storage account - a blob is created, deleted or updated, permissions are edited and so on. It's a whole event tracker, and it fires whenever anything happens. As with any other Kafka-style topic, we can have subscribers on it, so we say: I want to subscribe to changes on this Event Grid topic when they relate to files landing in this folder, and put those messages into a storage queue. For each data set we're tracking - my customers table, my products table, my sales table - I might have a different subscription, each with its own queue. When I kick off my Autoloader job, it looks at the relevant queue and grabs the messages saying "here are the files you need to load".

To break that down a little more: a CSV lands in blob storage; as soon as it lands, a message fires into the event topic; my subscriber is watching that topic with certain filters set; and if the event matches the filter, the message goes into my queue. That's all the notification services are. You can build that manually yourself - go into Azure, create a new event subscription, set the filters, point it at your queue - or you can let Autoloader build it for you, and that's all down to the config you set.

So, a bit of config. There are different parameters we can pass in the cloudFiles settings: do you want to use notifications; what's the queue name, if we've already set one up ourselves and just want Autoloader to use that existing queue; or, alternatively, here's a connection string and the details of a service principal - essentially, here's how to connect to Azure as an administrator who can create resources for me - and I want you to create the Event Grid topic subscription, create a queue in a storage account, and set up that small bit of architecture to start file watching and keeping track of those messages. So you do need to have that information set up.
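As a rough sketch of what that looks like in a notebook (the option names are the real cloudFiles ones; the schema, path and choice of values are made up for illustration):

```python
df = (
    spark.readStream                                    # Auto Loader is always a streaming read
        .format("cloudFiles")                           # tells Spark to use Auto Loader
        .option("cloudFiles.format", "json")            # tells Auto Loader the files themselves are JSON
        .option("cloudFiles.useNotifications", "true")  # queue-based file discovery, not directory listing
        # Either point at a queue you created yourself:
        #   .option("cloudFiles.queueName", "<existing-queue-name>")
        # or supply service-principal details so Auto Loader can create the
        # Event Grid subscription and queue itself (shown in the next sketch).
        .schema("id string, product_name string")       # illustrative schema
        .load("/mnt/landing/products/")                 # hypothetical landing path
)
```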
OK, so those are the basics, the core parts we expect to see in Autoloader. What we're going to do now is hop into a notebook and look at how I set that up, what we expect to see on the blob storage side, and what it looks like when we start reading some files in.

Here's a picture of my blob storage; I've got some files landing in here, and we'll work through a couple of different examples throughout this talk. Starting off, I've got the classic New York taxis data set: a single CSV, some data living in my blob store, and I want to start loading it. I've got a notebook set up to read that data in. First things first, I'm telling Autoloader the schema of the data to expect. Previously this was always required; there are advancements now that mean we no longer have to, but we'll come to that later. For now I'm giving it a struct: I've got a schema defined in JSON and I'm converting it into a normal Spark struct, the same as I would for any other data set, and I'll pass that into my data frame.

Then I've got a whole load of secrets. Inside my Key Vault, my secret store within Azure, I've got things such as the subscription ID, the service principal ID and its secret - all the bits of information needed to connect to Azure and ask it to create an Event Grid subscription, create a queue, and do that resource creation on my behalf. I've also got a key that lets me read the storage account in which it's going to create that queue. So there's a little bit of pre-work, a little bit of setup you need to have done for this to work. I populate those variables from my Key Vault, then build up a config: a cloudFiles options object into which I pass the values I've just grabbed from my secrets, plus two extra things: the format it's expecting to see (I've uploaded a CSV, so it expects CSV) and useNotifications set to true, so it uses a queue when it goes and creates things. Finally, because I'm reading from a blob store, I give it a key for connecting to that blob store - normal Spark things.

Then we can actually build the data frame. I've got a variable for the file path, telling it to look at the nyc-taxi folder inside my landing container, and I'm spinning up a new data frame with spark.readStream - again, always streaming - with a format of cloudFiles, and then unpacking that config into the data frame using the options attribute. That's the same as writing .option() for each of those settings individually; I'm just packing them together and sending them over as one set of options.
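Roughly, that notebook pattern looks like the sketch below. The secret scope, key names, columns and paths are invented for illustration; dbutils.secrets.get and .options(**conf) are the real mechanisms being described:

```python
# Pull connection details from a secret scope (scope and key names are hypothetical)
subscription_id = dbutils.secrets.get(scope="ingestion", key="subscription-id")
tenant_id       = dbutils.secrets.get(scope="ingestion", key="tenant-id")
client_id       = dbutils.secrets.get(scope="ingestion", key="sp-client-id")
client_secret   = dbutils.secrets.get(scope="ingestion", key="sp-client-secret")
queue_conn_str  = dbutils.secrets.get(scope="ingestion", key="queue-connection-string")
resource_group  = dbutils.secrets.get(scope="ingestion", key="resource-group")

# Bundle the cloudFiles settings into one dict so they can be unpacked in one go
cloud_files_conf = {
    "cloudFiles.format": "csv",
    "cloudFiles.useNotifications": "true",
    "cloudFiles.subscriptionId": subscription_id,
    "cloudFiles.tenantId": tenant_id,
    "cloudFiles.clientId": client_id,
    "cloudFiles.clientSecret": client_secret,
    "cloudFiles.connectionString": queue_conn_str,
    "cloudFiles.resourceGroup": resource_group,
}

taxi_schema = "vendor_id string, pickup_datetime timestamp, fare_amount double"  # illustrative columns
file_path = "/mnt/landing/nyctaxi/"                                              # hypothetical mount point

df = (
    spark.readStream
        .format("cloudFiles")
        .options(**cloud_files_conf)   # same as chaining .option() for each setting
        .schema(taxi_schema)
        .load(file_path)
)
```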
With that I can go ahead and create the data frame. It uses the JSON schema I passed in, so it knows what to expect; at this point it hasn't actually done anything. Then let's do a quick test: a new data frame where I add the file each row came from and do a quick aggregation, just telling me how many rows I'm getting from each file. What we see is the stream initialising, and because I've set the useNotifications option it goes and creates some objects for me inside Azure, on the fly: an Event Grid subscription (in this case on the storage account's existing system topic) and a queue for me to read from. We'll go and see what that looks like.

So there we go: the query ran, it found a load of rows, and it'll just keep running; the second batch had no rows in it, and it keeps looking to see what's happening. Keep that query ID in mind, and you can see I've got one file with a row count. I'll leave that running and step back to our landing area to upload a new file - taxi file number two - and while that's uploading we can think about what will happen: the file gets uploaded, the watcher notices, a message goes onto the queue, and that gets picked up by Autoloader.

Inside the landing storage account, under Events, we can see the built-in system topic and a couple of subscriptions, one of which matches our streaming query's ID, so it has created that whole subscription for us on the fly on the back of that query. On the other side it has also created the queue: in my storage queues I've got a queue for each time I've run a query, and you can see the one I ran earlier has no messages left in it, because my stream is constantly running. If I step back over to the notebook, it has already updated: I've got my second file in there and a new file count, and I can cancel that. So in that case Autoloader was just quietly watching the messages being fed from that drop zone and updating as soon as something came in.

Now, in reality we don't tend to just display the stream; that's a one-off query and it's not going to help us much. We'd be doing something like dataFrame.writeStream in Delta format, appending to an existing table. I can give it a query name, which really helps with debugging, as we'll see, and I can give it a checkpoint location.
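A sketch of that write side, reusing the df from the earlier sketch and a hypothetical table path and checkpoint location:

```python
stream = (
    df.writeStream
        .format("delta")
        .outputMode("append")                                            # plain append into the landing table
        .queryName("nyctaxi_ingest")                                     # shows up in the Structured Streaming UI
        .option("checkpointLocation", "/mnt/lake/checkpoints/nyctaxi/")  # file-load log lives here
        .start("/mnt/lake/bronze/nyctaxi/")                              # target Delta path (could also be .table(...))
)
```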
When you're dealing with Spark structured streaming these days, especially with Autoloader, the checkpoint uses RocksDB behind the scenes, essentially a key-value store: every time it loads a file it writes a log entry saying it has loaded that file, so even if it saw the same file again in the metadata it would know it had already been processed. So not only is it clearing down the message queue and tracking what's outstanding there, it also has a checkpoint log of what it has actually processed. That means I can use this as part of a fairly clean ETL stream: go get data, constantly watch this area, push it down there when you're done. It's a really simple little ETL example that says I don't want to have to care about which files I have and haven't read; turn it on, keep it running, and it'll process files as they arrive.

OK, so that was a nice easy example of basic ETL, but it doesn't really show what we do in production, because in reality it's rarely as simple as "get the data and append it into an existing table". There are a couple of scenarios I want to step through to show how we do some more advanced things, and a few tricks we used to get the most out of Autoloader. As a reminder, we're trying to stream data in as new CSVs land. That can be a challenge, because we don't necessarily know when files are going to arrive, and one of the biggest challenges we had with Autoloader is what happens if files don't arrive that regularly. In this scenario I've got a cluster that's turned on 24/7: that's a fairly expensive, big, distributed processing engine sitting there waiting for files to arrive, and if they send one once a day, for most of the day my cluster has nothing to do. Obviously I might use the cluster for loads of other things and get tons of value out of it, which makes sense, but what if it's a dedicated cluster just for this? I don't want to leave it turned on.

Just as with any other streaming, we can use a trigger mode. We can say: on my data frame's writeStream, I only want to trigger once. That means when I execute the notebook, it initialises the stream, goes to the message queue, finds any files that have arrived, sets up a micro-batch to process them all, and then stops. Combined with auto-termination on my cluster, that's really nice: I can have an ETL process that starts the notebook, reads any files it finds, then turns itself off, and I don't have to worry about anything. For low-frequency file drops it works really nicely, as long as you use that trigger-once pattern.

So you've got a choice. You can use it streaming, always turned on, so that as soon as a file lands it goes straight through and you start seeing it in your downstream tables. Or you can say: I'm still going to do things in batch - run it every hour, every day, every week if I want, with some limitations we'll get to - and it will just look after itself; essentially I'm using Autoloader only for the file watching and file management side of things. So there are definitely some interesting use cases there.
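A minimal sketch of that trigger-once batch pattern, reusing the hypothetical df and paths from above:

```python
(
    df.writeStream
        .format("delta")
        .outputMode("append")
        .queryName("nyctaxi_ingest_batch")
        .option("checkpointLocation", "/mnt/lake/checkpoints/nyctaxi/")
        .trigger(once=True)              # process everything outstanding in one micro-batch, then stop
        .start("/mnt/lake/bronze/nyctaxi/")
)
```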
The other scenario is merge. Streaming and merge don't normally talk to each other: with streams you've got output modes like append, update and complete, a few different ways of managing them, and there's complexity there because things like dropping duplicates or merging into an existing Delta table expect some kind of stateful stream, and that's genuinely hard. So if we're using Autoloader to suck data in automatically and land it in a usable state, what happens if we want to update records - to upsert the data rather than do a flat append?

In that case there's a standard streaming trick: foreachBatch. I take my data frame, call writeStream, and in foreachBatch I give it the name of a function. That function expects an input data frame, which is the micro-batch data frame: each time the streaming query runs a micro-batch, it hands that data frame over to the function, which then runs as a normal batch. Anything you can do in a normal batch process - writing to multiple outputs, writing to sinks that aren't supported streaming sinks, doing a Delta table merge - you can include inside that function. So by building up that function and plugging it into foreachBatch, you suddenly unlock everything you can do in a normal batch process; you just have to wrap it in a function. We use that a lot. Being able to say "I want to use Autoloader and merge into an existing data set" is pretty cool, and it unlocks a huge amount of functionality: not just getting data into that first landing, bronze layer of your lake, but streaming into your silver and gold curated areas, because you can merge things properly.

So we'll have a look at that next: how we run it as a batch with trigger once; what it looks like to take an existing Delta table and use Autoloader to merge updates into it; what that foreachBatch function looks like; and what we do with logs - how we see what's going on inside those queries using the Spark structured streaming UI.

OK, so we're back in the Azure portal, back in my blob store, and this time I've got employees: a basic employee CSV, which I've previously loaded into my Delta table. In my Autoloader 2 notebook I've got the schema defined at the top, and I've bundled all of the setup into one cell: the schema, getting the secrets from my Key Vault, building the cloudFiles config, and the blob storage key, all combined into a single block, and then I create my data frame as normal. In this case, because I'm reading a CSV, I can also include the things I'd normally include in a batch CSV reader: header set to true so it ignores that first row, bad-record handling, and so on; I can mix the normal reader settings in with the cloudFiles settings.
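For example, something along these lines (path and schema invented; header is a standard CSV reader option passed alongside the cloudFiles ones):

```python
employee_schema = "employee_id int, name string, department string, salary double"  # illustrative

employees_df = (
    spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.useNotifications", "true")
        .option("header", "true")          # normal CSV reader option: skip the header row
        .schema(employee_schema)
        .load("/mnt/landing/employees/")   # hypothetical landing folder
)
```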
With that in place, before we kick things off, let's look at what this currently looks like. I've got a Delta table that I created in advance and ran that first file through, so inside it we've got some people, departments and salaries - a variety of characters, I think from Pulp Fiction. And I've got this extra CSV, another data set of people containing updates to existing people: I can see Jimmie Dimmick, I think, at the top, who is going to get a big whack of salary added, so he'll match an existing employee ID, and there will be some new people too. It's a normal update/insert - an upsert operation: I want to take this CSV, drop it into Autoloader, and have that table become updated with those values. I want to do a merge; makes sense.

So I've created my data frame as normal: I've passed in my cloudFiles options with those various settings, I've passed in the schema, it's all good to go. Then how do I get this working? There are two bits: my upsertToDelta function and my stream query. Let's take the stream query first, perhaps in the wrong order. I'm saying dataFrame.writeStream, Delta format, append mode - I just want to send all the data through; I'm not trying to aggregate or look at any existing state in the stream, I basically want to treat each incoming file as a separate chunk of data to be processed independently - and I'm using foreachBatch, which says: for every chunk of data you try to process, go and run this function, which is the upsertToDelta function in the cell above that we'll look at in a moment. I'm giving it a query name, which is very useful for logging, and I'm setting trigger once to true. So when I kick this off, it'll start running, do a single micro-batch, and then, instead of continuing to run like the first example, where dropping in a new file automatically gets processed, it'll stop and turn itself off as soon as that first micro-batch finishes. That gives us a bit more control for using it as a batch process, incrementally: whenever I want to trigger it, I hit go and it does whatever it needs to do since it last ran. I'm again giving it a checkpoint location so it has somewhere to store that state, and then I say start. Slightly unusually, I don't need to give it a path, because the destination is handled inside the upsertToDelta function.

So let's look at that upsertToDelta function. I'm importing DeltaTable because I'm doing a Delta table merge. The function expects two inputs: the first is a data frame of some kind, call it what you will, and the second is basically an integer saying which batch we're on - how many micro-batches the stream has run, which keeps incrementing even if you stop it, because it uses the checkpoint to work out where it's up to. Inside, I grab my Delta table from an existing registered table and do a normal, straightforward merge: I alias the target and the source, they've both got an employee ID, so I give it that match criteria so it knows what to merge on, and then if the record already exists, update all the columns; if it doesn't, insert it as a new record. The most bog-standard of merges, but that's all I want here.
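A sketch of that pair - the foreachBatch upsert function plus the trigger-once stream query - assuming the employees_df and checkpoint path from the earlier sketches and a hypothetical registered table called employees:

```python
from delta.tables import DeltaTable

def upsert_to_delta(micro_batch_df, batch_id):
    """Merge one micro-batch of employee updates into the registered Delta table."""
    target = DeltaTable.forName(spark, "employees")
    (
        target.alias("t")
            .merge(micro_batch_df.alias("s"), "t.employee_id = s.employee_id")
            .whenMatchedUpdateAll()       # existing employee: update every column
            .whenNotMatchedInsertAll()    # new employee: insert the row
            .execute()
    )

stream = (
    employees_df.writeStream
        .foreachBatch(upsert_to_delta)    # each micro-batch is handed to the function as a batch data frame
        .outputMode("append")
        .queryName("employees_merge")
        .option("checkpointLocation", "/mnt/lake/checkpoints/employees/")
        .trigger(once=True)               # one micro-batch, then stop
        .start()                          # no path needed: the sink lives inside the function
)
```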
OK, so let's run that. First we register the function - you could have a generic function that you call from many different Autoloader jobs; it's up to you how that works - and before we call it, I'm going to drop in my new file. I grab my employees 2021-05 file and upload it; it's a much smaller file, so much quicker, and that should now trickle through. You saw in the first demo that I already had a queue set up: the queue mechanism, the Event Grid subscription, all the stuff it needs is already there. So when we kick off, it should realise it has one file with, I think, ten rows in it, run that through, and then we can do another select and see that it has successfully merged my updates into the table, and it should then stop.

So it ran a single job, and in the raw telemetry I can see it's batch ID 1, not 0, so this is the second time it has run: it has picked up the previous history from the checkpoint and knows what batch ID it's up to. Number of input rows: 10, so that's the update file, and it has put some data in. There's a load of telemetry, a lot of information about what happened inside that batch, which is super useful. And if I look at my employees table, Jimmie Dimmick has been updated with that extra salary, and I've got new rows for people who didn't previously exist. Essentially I've merged in a stream of data coming live from a file that was just dropped in, with no config needed to manage metadata about what I have and haven't read, and that becomes really quite useful: just turn it on, start merging things in, and manage that data.

As a final thing, I want to switch over to the Spark UI, which has the structured streaming tab. That can be really hard to find things in when you've got a pile of anonymous display queries, and that's exactly why I named my query: because I set that queryName on the merge, I can go into Structured Streaming, see my nicely labelled query and its run ID, and get some stats about it - I can see the earlier run and batch one, so I get a history of it running. It's always a fantastic idea to name your queries; then you can start pulling the logs out and inspecting the job.

OK, so that gives us a fairly good idea of the more advanced patterns: how I merge into things, how I plumb Autoloader into existing data. But it still doesn't give us a huge amount over plain structured streaming - I can use foreachBatch with an ordinary file stream, I can do that merging approach, I've already got all that logging - so at the moment the only real advantage is that it's not doing directory listings, which keeps things nice, efficient and quick.
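That telemetry is also available programmatically from the StreamingQuery handle, which is handy when you're not looking at the UI. A small sketch, assuming the stream handle from the previous sketch; the fields shown are standard progress fields:

```python
# 'stream' is the StreamingQuery handle returned by .start() above
stream.awaitTermination()                   # trigger-once: wait for the single micro-batch to finish

progress = stream.lastProgress              # dict describing the most recent micro-batch
if progress:
    print(progress["batchId"])              # e.g. 1 on the second run
    print(progress["numInputRows"])         # e.g. 10 rows from the update file

# For always-on streams you can also find queries by the name you gave them:
for q in spark.streams.active:
    print(q.name, q.id)
```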
That changes when we start talking about evolving schemas. This is a really big one: if there's a disconnect between you and the people sending you data - you've got a storage layer and you're at the mercy of whatever they send you - schema drift can be a genuinely hard problem. We've got something like this: some JSON files, where first I had an ID and a product name, then an ID, product name and size, and then an ID, product name, size plus some nested structs with more details, machine-washable flags and that kind of thing. That gradual growth of data complexity can be really hard to look after and manage, and that's what we're talking about with schema evolution: we're getting extra data, so what do we do with it?

We have a choice about how to handle it. We can say "no, broken, stop": fail the stream, then go and fix it - reload the data, change the schema we're passing in so it reflects the new version of things, and figure out how to get that into the existing data set. We can manually intervene and update our table so it looks the right way. Or we can automatically evolve, and that's the dream, right? If I can just keep my stream running and it automatically figures out "oh, there's new data, that's cool, we'll capture it, we'll put it somewhere, we won't fail" - that really is nirvana for dealing with this ongoing schema drift problem.

To do that we need a few bits of information: what the schema is expected to be (in the previous demos we gave that up front: "this is the schema, this is what I want you to deal with"); what the schema is now (does it still match or not?); and what to do in the case where they don't match. So let's dig into each of those and work out what's actually happening.

Schema inference is brand new as of the time of recording: Databricks Runtime 8.2, currently the most recent runtime, lets you skip passing a schema into the data frame and do schema inference instead. When you start the Autoloader stream it takes a look at the files, works out what they look like and generates a schema for you - the same way the inferSchema option works on a normal batch data frame, but inside a stream - and that's super useful. We can then manipulate how well it figures out that schema; by default it's going to assume everything is a string.

There are a few pieces to it. We've got a schema location: where it stores the metadata about the schema it inferred. Unlike batch data frames, where an inferred schema isn't stored anywhere and is just used for the lifetime of that session, here we're streaming and we need a reference of "what did we think it was, what do we think it is now", so it writes down metadata about what it thought the schema was. And we can say "infer column types": should it just list the columns, treat everything as a string and not
worry too much about it, or do we want it to do some data sampling and say "that looks like an integer, that's a long, that's a decimal" and put some intelligence behind it? Or, finally, we can use schema hints: just infer most of the schema, but if you come across the product ID, that really has to be a long, because we've had problems with that in the past. We don't have to provide the full schema, but we can nudge it in the right direction for certain bits and pieces. So there are several things we can do to get on top of schema inference inside Autoloader.

In this case, with that first straightforward file, the first time we turn Autoloader on it takes a look at the initial file and says "cool, two columns", and stores that as a metadata file inside my schemas folder: there's a "0" file with a little bit of JSON in it - some metadata wrapping, then the Spark struct object it expects to use for the data frame: an id that's a string, a product name that's a string. As I said, it assumes everything is a string unless we tell it otherwise. That's new: the whole idea of a schema metastore that keeps track of what it currently thinks the schema is is fairly cool.

Now, if we're not happy with that - strings aren't good enough, I want it to do a bit of work to figure out what the data really looks like - we can set the cloudFiles.inferColumnTypes option, and the "0" metadata file it generates will be a bit more intelligent: id comes out as an int, product name it knows is a string. It does that by sampling some files, and we've got options for how many files or how many bytes it should read before it decides that's enough to work out the data types. And if we're still looking at that and saying "no, we know better, that ID should be a long", we can use schema hints. That's an alternative to, or can sit on top of, letting it infer: we pass in what is essentially a comma-separated list of column definitions, a bit like a SQL table definition - "id long, product_name string" - and it uses that to improve the metadata it generates. So if I set a schema hint saying id should be a long, it generates the same schema file but updates the data type for any column matching the hint.

So we've got a lot of options for how we do schema inference and how we tweak and adjust the metadata object that gets created. That's useful: it saves me the effort of creating the schema I pass in when I first turn Autoloader on.
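A sketch of those inference options together (the option names are the real cloudFiles ones; the paths and column names are illustrative):

```python
inference_df = (
    spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        # Where Auto Loader persists the inferred schema metadata (the 0, 1, ... files)
        .option("cloudFiles.schemaLocation", "/mnt/lake/schemas/products/")
        # Sample the data to pick int/long/decimal etc. instead of defaulting everything to string
        .option("cloudFiles.inferColumnTypes", "true")
        # Nudge specific columns without supplying a full schema
        .option("cloudFiles.schemaHints", "id long, list_price decimal(10,2)")
        .load("/mnt/landing/products/")    # no .schema() call: the schema is inferred
)
```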
But that's not the really cool stuff; it's not where we get the most value. The interesting part is schema evolution, and we've got some options there. As we said at the start, we need to know what we thought the schema was and what it is now - schema inference and the schema metastore are doing that for us - and what to do if the schema differs is where these options come in. There are a few schema evolution modes we can tell it to work in.

We can say "add new columns". If, during a micro-batch, it checks the schema of the file it's opening and it's different from the schema it's expecting, then, slightly unexpectedly, it fails the job; it goes to the metadata store and creates a new object with the updated schema; and when I next restart the job, it passes, it works. So it does stop the stream: if you're running in streaming mode it's going to break, if you're running trigger once it's going to break, but you can just restart it and it recovers from where it was with the updated schema information. So addNewColumns updates the schema for you. FailOnNewColumns doesn't: it just stops - "no, broken, you need to go and fix it". That's your manual-intervention option; you can go and edit the schema object or create a new one yourself, essentially DIY, playing with the metadata directly.

I can also do this thing called rescue, which is really cool and I love it. Instead of changing the structure and the schema, you have a dumping-ground column: anything I don't expect, anything that doesn't conform to my expected schema, just gets put in that column. I don't evolve or change my schema, but I have this place where I can go and write some queries and explore what's been held in there, without affecting the rest of my schema. That's super usable functionality when dealing with unexpected drift. Or I can say "none": ignore it. If it doesn't match my schema, throw away the extra columns; I only ever want these three bits of information from that table, and if it gives me five, ignore the other two. It's up to you how you deal with it.

As a quick reminder of what we're looking at in terms of evolution, we've got those three files: first we're expecting ID and product name, then ID, product name and size, and then a load of extra information. If we're using rescue, we'll end up with something that looks a bit like this: three records; the first has its ID and product name and nothing in the rescued data column, because we didn't rescue anything; the second has just "size: XL" as a little bit of JSON - basically a JSON string that I can query and pull bits and pieces out of; and the same for the third one: despite the fact it's more complicated, with a nested struct and some complex things inside, all that complexity has been hidden away and pushed down into the rescued data column. That does have limitations: if you're trying to get something really optimised, using Bloom filter indexes, file skipping, Z-ordering and all that good stuff, you can't really do that when the attribute is hidden inside the rescued data column. But if you're just trying to create a plain querying layer that still lets people query those bits of information without worrying about schema drift, that's awesome. So that's one way of doing things.
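A sketch of setting an evolution mode and then digging into the rescued data afterwards. cloudFiles.schemaEvolutionMode and its values (addNewColumns, failOnNewColumns, rescue, none) are the real settings being described; the paths and the JSON attribute are made up, and I'm assuming the default rescued column name of _rescued_data:

```python
from pyspark.sql.functions import col, get_json_object

products_df = (
    spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", "/mnt/lake/schemas/products/")
        .option("cloudFiles.inferColumnTypes", "true")
        # one of: "addNewColumns", "failOnNewColumns", "rescue", "none"
        .option("cloudFiles.schemaEvolutionMode", "rescue")
        .load("/mnt/landing/products/")
)

# The rescued column holds the unexpected attributes as a JSON string,
# so they can still be queried without evolving the schema:
sizes = products_df.select(
    col("id"),
    get_json_object(col("_rescued_data"), "$.size").alias("size"),
)
```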
The alternative, addNewColumns, gives us this kind of pattern: when it fails, it creates a new metadata object representing the new schema, so the next time I start it, it won't go to the "0" file, it'll go to the "1" file and say "I'm now expecting size; I can now see size as part of my data frame", and it updates and caters for it. So let's go and have a look at those things: when we're using Autoloader with a changing, evolving schema, what does that look like, how do we play with these settings to change schema inference and create that schema metastore, and how do we deal with schema evolution?

We're back in the Azure portal, and I've got my product list now: a really basic bit of JSON with product ID, name, colour, list price - some basic information - and one file in there. In my Autoloader 3 notebook, the first thing you can see is that I can play with some settings. If we're going to use schema inference, I can set how many bytes or how many files it reads when inferring. In this case I'm using tiny data, a couple of JSON files, so it doesn't really matter, but each time it goes and infers the schema it has to read a certain number of files, so you can tune how many files it should look at each time, and how much of a rolling window to use based on the last-modified date in the blob; you can play with those settings and tailor them to whatever you're trying to do. Then the normal stuff: get my secrets, build my cloudFiles config, connect to my blob store.

This time I've got a whole host of extra options that determine how we do schema inference. Number one, there are schema hints as an example: if I knew something about my data - product ID is an int, list price is a decimal - I could include that option to give it some hints and help it figure out what it's dealing with. Instead, I'm going to let it infer: "you can sample the data, you can figure things out, I trust you", and it'll go and figure that out for me. If we're doing schema inference we have to give it a schema location: somewhere to store that metadata file, the little bit of JSON that describes the schema, so that whenever it does a micro-batch, or I stop and restart the stream, it knows what schema to expect. There are some extra things I can do, like partition columns: if my source data set had a folder structure representing some kind of Hive partitioning, I could tell it to infer those columns into my data set - I'm not using that at the moment. And then we can set which schema evolution mode we want, how it should deal with differing data.

For now let's start with rescued data: I set my schema evolution mode to rescue, so if the data ever veers away from what I'm expecting, it just gets dumped inside this rescued data column. I can change what that column is called; I'm leaving it at the default. We'll have a look at the other modes later, so let's stick with
that as our basic setup: we're going to infer the column types, store the schema in our lake, and use rescue mode while we do it. So let's go and do that, passing our cloudFiles and drift config in. You can see it has already gone and inferred our columns - it didn't have a structure until I inferred one, and that's the structure - and it has automatically appended this rescued data column onto the end, because it knows it's doing schema inference. So we can kick that off and see what it looks like.

While it's running - it's creating a queue for me, doing the messaging, all the normal Autoloader stuff - I can go and have a look inside my lake. Inside this "tokyo drift" Autoloader 3 folder it has created the schemas folder automatically, and you can see that "0" metadata object it created because I kicked off that job. If we look at what's inside, there's a little bit of metadata wrapping, and then it's just the normal struct: I can see colour, I can see list price - a normal JSON schema, inferred on my behalf, that represents the same structure we've got in the data.

OK, so shockingly enough, nothing has been rescued, and that's because I inferred my schema from this file and - shock horror - the file hasn't changed schema. So let's upload a new file: I'll grab the drift version, upload that 2020 file and push it in; nice and quick, that's gone in. What we should see, when this goes through and refreshes, is the extra attributes pushed into that rescued data column. On the next micro-batch we get the update: we're expecting a load more records - we're just playing here, not trying to merge anything in this one - and those new records come with a load of rescued data. Because the file had extra data - it had weight, it had size, attributes we weren't expecting to see - that's fine, we've just rescued them. The rest of the data that conforms to the schema is fine and visible, and we populate the rescued data column with the things we didn't expect. It doesn't change my schema at all: I've still got my schema at "0", no change to that; it has just handled the drift naturally.

So let's look at the other way of doing it. Now I don't want to rescue; I want to update my schema. With addNewColumns, I want it to fail and then tell me my structure has been updated. I'll run this again: it still has that same schema - it hasn't re-inferred the structure, because this time there's a schema in the metadata, so rather than working it out fresh it says "no, this is the structure I'm expecting" - and we run from that. Now we're expecting this to fail: it's going to say "no, there are columns in here that aren't in your schema". So once it has initialised, got to the queue and started reading the data in, we see it fail: "encountered unknown fields" - it's got weight and size, which it wasn't expecting. But because we're using addNewColumns, it has gone
and created a new schema: there's a new JSON file, and towards the end of that structure we should see that weight and size have now been added. So it breaks the stream immediately, but the next time I run that data frame it picks up the latest schema, now with size and weight in it, and then it just works. So you've got these two approaches: do you want to put a pin in it with rescued data and say "just put everything in there and don't worry about it", or do you want to actually evolve the schema, represent the newest schema and have that moving description of what your data looks like - at the cost of a failure, because you basically have to restart the stream each time? Both have their pros and cons, depending on how you like working and what you want it to look like. And you can see we've now got size and weight in there and nothing in our rescued data, because we're now using the updated schema.

OK, so hopefully that's fairly inspiring: we've gone from something fairly similar to the approaches we've used before, but we're building on them and enhancing them with things like schema drift and schema evolution out of the box, and suddenly it becomes a much more attractive proposition. Now, there are one or two gotchas I want to make sure you're aware of when you're using Autoloader, because of the nature of some of the underlying systems. Some of these recommendations are very much on the Azure side of things, so they'll be different if you're using this on AWS or GCP. The lessons: one, Event Grid quotas - we didn't even think about this, but you need to be aware of the limits on the number of objects you can create - and then there are a few settings and tweaks for streaming and for batching that you just need to know about.

First, some out-of-the-box quotas if you're working in Azure. Custom topics per Azure subscription isn't something we've been too worried about in the way we've been working, because we've just let Autoloader use a system topic: a storage account, when you create it, automatically has its own Event Grid system topic. We can add custom topics on top of that storage account, but we can only create 100 of them, so if you go down the custom topic route, be a little careful. Generally, though, custom topics are what we use to get over the next limitation, which is event subscriptions per topic: if I've got my storage account and I'm creating lots of different subscriptions - for this folder, for that folder, for another folder - I can only have a maximum of 500 subscriptions on that topic. That doesn't seem too bad, but think about a huge enterprise-style data platform: it means only 500 different data feeds coming in, 500 different tables being ingested, and you're going to hit that limit. Especially because it's good to know that when you do something like a display - just quickly running something to check it - that creates its own one-off queue and subscription that is then abandoned and left there; you need to be cleaning up and managing those services. So be very aware of those limitations: we can have 500 feeds from a single storage account, so be a little careful about that.
and we can also trip ourselves up if we delete the checkpoint folder so if you're writing out to a delta table you've got the checkpoint metadata stored inside there and that has the id of the streaming query inside it which natively links it to the queue and to the topic if you delete your checkpoint because you want to restart from scratch and re-pull things in it's going to ignore the existing queue and topic and start a new one and suddenly that 500 you had just gets chipped away with orphaned subscriptions being left there now you do have the cloud notification libraries and it's worth checking those out because they allow you to say actually i don't want to use a system topic i want to use a custom topic so instead of having 500 i can have up to 100 custom topics across my subscription each of which can have 500 subscribers so that suddenly gives us much much greater scalability in terms of how many things i can read from a single storage account and it allows us to tear down the ones we're not using so dig into that that's very very important if you're trying to run this at scale if you've got thousands of files and thousands of different data feeds you're going to need to do some custom topic management and essentially some maintenance and cleanup of orphaned notification services

okay so streaming wise there's a few things to tune and it's all normal streaming stuff so you've got things like max bytes per trigger and max files per trigger you can set those outside of trigger once trigger once will override those settings but if you're leaving it turned on as a stream and you want it continuously running you can say well how often should it micro batch should i actually micro batch per file so i can treat each file independently in my for each batch or should it go in bigger chunks it depends on how many files you've got coming in how big they are and how often they're arriving but you can tweak that and you can manage it and you can combine it with the trigger settings rather than saying trigger once you can trigger every minute trigger every 10 minutes trigger every two seconds you've got a lot of nuance in how you set that stuff the other thing to be aware of is fetch parallelism now that is how many threads it's going to use to read your blob queue that's not a big thing if you're dealing with a handful of files it's going to use one thread by default and go off and just get a bunch of messages back but if you've got a very very heavily used queue with thousands and thousands of files going into it that one thread becomes a bottleneck in itself you'd have one worker just trying to read back all those messages to tell autoloader which files to load so you can manipulate that you can have a play with the dials for how often it micro batches how often it triggers what's included in each trigger and then how many threads it's using to go back and read that queue so you can go fairly deep into that and you need to be aware of those settings if you're leaving it turned on as a stream
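building on the sketch above, this is roughly where those dials live; the numbers are arbitrary examples rather than recommendations and the paths are still placeholders

    # arbitrary example values for the throughput dials described above
    df = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.useNotifications", "true")
        .option("cloudFiles.maxFilesPerTrigger", 1000)     # cap each micro batch by file count
        .option("cloudFiles.maxBytesPerTrigger", "10g")    # and/or cap it by total size
        .option("cloudFiles.fetchParallelism", 8)          # threads reading the notification queue (default is 1)
        .option("cloudFiles.schemaLocation", "/mnt/lake/_schemas/sales")
        .load("/mnt/landing/sales/")
    )

    (
        df.writeStream
        .format("delta")
        .option("checkpointLocation", "/mnt/lake/_checkpoints/sales")
        .trigger(processingTime="10 minutes")   # or .trigger(once=True) to run it as an incremental batch
        .start("/mnt/lake/bronze/sales")
    )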
now on the batch side of things there are a couple of real gotchas on an event grid subscription there are two settings that are very important you've got this storage queue message time to live essentially when event grid picks up the fact there's a new file it puts a message into the queue and if nothing touches that message for a week it gets dequeued it disappears so if you're running this in batch mode and you pause it you pause for maintenance you pause over the christmas holidays and then you come back and try to start things again if it's been longer than seven days you'll have lost those messages with the default settings and autoloader does just use the default settings it doesn't provide a number of days and the default is seven now you can go and update that you can update it via the python sdk you can use the azure cli you can go and update it manually there are various ways to do it and you can set it to never expire in which case if a message goes into your queue it's never going to disappear i'd love it if we could just update that as part of the cloud notification services but i don't think you currently can

the other thing is dead lettering now that is if for some reason your event grid subscription can't get the message into the queue it's going to retry i think 30 times by default but if it fails if it can't get there because the queue is not available it's just going to give up and go okay well that message is going nowhere unless you've got dead lettering turned on in which case it's essentially going to store the event somewhere else so you've got a record of all the events it didn't manage to deliver properly again that's not a setting you can get to from the autoloader options but if you're running in batch mode and you're trying to build a production grade system you need to be aware of these settings so you can put a little bit more robustness behind it a little bit of tracking especially if you're going to be pausing things now it kind of makes sense that there hasn't been much consideration of this kind of stuff given autoloader is built from a streaming perspective it's expecting to be turned on and constantly streaming so the chance of a message sitting in a queue for seven days is incredibly unlikely but it's definitely worth being aware of

so on the whole autoloader as we've seen does reduce the complexity of figuring out which files have i read and which ones haven't i read we haven't even thought about that across all of those examples we're just dropping a new file in and it magically appears in our data frames it magically gets picked up and processed but it does add some complexity where we might previously have just taken a data frame and merged it in we now have to use some for each batches we need to be aware of streaming and things like stateful streaming if we're going to do drop duplicates we need to do that inside the for each batch so it's done just on the new files not on the whole stream including all the files that have been streamed before so there are a few quirks about how we build these patterns depending on how often we're actually running them one of the biggest killer features for me is the schema evolution and the schema drift handling that ability to say i don't care what's going on with the data just always run no matter how much the data's changed get it into my initial tables and capture the changed data in a separate place don't lose the data don't throw it away put it in a place where i can handle it so despite some of the extra complexity we get from the streaming elements you get a huge amount of benefit from the schema drift and schema evolution elements and it's growing and growing in terms of the functionality we have at our fingertips
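to tie the last couple of points together, here is a rough end to end sketch of that pattern: schema handling on the read side, where rescue mode pins the schema and shunts new columns like weight and size into _rescued_data while addNewColumns fails the stream once and picks up the wider schema on restart, and the drop duplicates plus merge inside for each batch on the write side; the table name, paths and the id key are all invented for illustration

    # a sketch only, assuming a delta target called bronze_devices keyed on id
    # that already contains the source columns (including _rescued_data);
    # everything named here is illustrative rather than taken from the demo
    from delta.tables import DeltaTable

    df = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", "/mnt/lake/_schemas/devices")
        # "rescue" pins the inferred schema and captures unexpected columns in _rescued_data;
        # "addNewColumns" (the default) fails the stream on new columns and evolves the schema on restart
        .option("cloudFiles.schemaEvolutionMode", "rescue")
        .load("/mnt/landing/devices/")
    )

    def upsert_batch(batch_df, batch_id):
        # de-duplicate only within this micro batch of newly arrived files,
        # rather than holding dedupe state across the whole stream
        deduped = batch_df.dropDuplicates(["id"])
        (
            DeltaTable.forName(spark, "bronze_devices").alias("t")
            .merge(deduped.alias("s"), "t.id = s.id")
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute()
        )

    (
        df.writeStream
        .foreachBatch(upsert_batch)
        .option("checkpointLocation", "/mnt/lake/_checkpoints/devices_merge")
        .trigger(once=True)   # run as an incremental batch; drop this line to leave it streaming
        .start()
    )

the trigger once line is what turns this into the incremental batch pattern; leave it out and the same code simply runs as a continuous stream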
okay so that is all the lessons i have for you again my name is simon wiley and i do this stuff a lot so feel free to reach out on twitter and email and whatnot and i do a lot of this kind of stuff on youtube as well so feel free to drop by if you've got any questions or if you want to look at any more demos digging into this kind of stuff now as always feedback is super super important to the conference all of us speakers really really appreciate any kind of feedback that we get so don't forget to fill it in and i hope you have a fantastic rest of the conference cheers
Info
Channel: Databricks
Views: 5,454
Keywords: Databricks
Id: 8a38Fv9cpd8
Length: 59min 24sec (3564 seconds)
Published: Thu Aug 19 2021