Optimized ETL's with QueryDatabaseTable and PutDatabaseRecord | Apache Nifi | Part 10

Video Statistics and Information

Video
Captions Word Cloud
Reddit Comments
Captions
welcome back i'm steven and before we get started today i'd like to thank you and everyone else watching these videos for uh taking the time to go ahead and watch them and hopefully you're getting something out of them as i enjoy making them as well and uh thank you for your comments and your questions and your feedback as well in those comments they're great to read and i enjoy taking the time to try to solve or figure out how i can help you with some of your questions as well uh if you want to help me please take your time to hit that subscribe button and the benefit is you can get notified the next time i have a video out if you want to as well when you do subscribe and uh leave a like as it does help the videos spread across the youtube there and maybe there's someone else out there who might benefit from these videos spreading out there and helping them solve the problems they're looking to solve all right so in the last video we covered the how to basically do etls with a generate table fetch and it's a very powerful tool in a very powerful processor i mean because it breaks up our the source table or the source data we're trying to retrieve data from into partitions based off the size that we'd like so it makes it easy to handle it can solve the problem of out of memory errors that we can get in ifi if we consume and over allocate ourselves by trying to consume too much data at once if we don't have the resources available on our server so it could be a good way to manage our resources and just keep things moving smoothly and allowing us to insert the desired amount of records at a time into our tables all right so the next one we're going to look at which is another way about going about doing etls or just getting data from tables in general so this is our query database table let's go ahead and grab that processor [Music] query database table there are two versions of it there is a record version as well so keep that in mind that is a that can be a benefit to using this one you can use either the normal one which just outputs the data and the flow files into avro format or you can use the record version of it which will let you pick after you set up your record reader and record writer or actually in this case would be the writer only right so you set that up you can pick the format you want to use and then you build your schemas for that as well there is a benefit where you can you have the record version of the query database table available but inside of a generate table fetch that's not available over here uh you'd actually have to if you wanted to do that you would be using the execute sql record so sql this one right here so that would still take in your sql statement flow files from the generate table fetch and then from here you could go ahead and automatic convert it to json or csv or any other format that you're looking for in order to get it to your source destination or to your destination so let's go ahead and continue now with our etl with a query database table so we're gonna be working the same data set again nothing's gonna change there let's move this up okay so first thing we want to do and we'll look at two different ways to mess with this one but first we are going to do our standard stuff so we know we want to set this one up and to set this one up we are going to use sql server again we'll go ahead and specify the mysql sql and then the table name is still going to be active flights we are not using in this case for this example we're not going to be using the column return or the where clause or the custom query now we could do a quest custom query here uh and it could be very i've used it i use a lot actually when um like if i have tables i have like audit uh audit timestamps on the end i don't really need those from me because those are only my for most of time they're valuable only for that table and that source table for it keeping track of the audit information where i would have my own set of audit information inside my own table to keep track of so i don't need to bring that one over from there so a custom query to make sure you're only getting all the tape all the columns except for maybe those type of columns is very helpful uh we want to use id again as our maximum value column and just reminder this allows us to this allows the processor to keep track internally of the max value for the last uh row that i retrieved and it can be a comma separated list as this description says in a hierarchy type of structure you want to test those out and make sure they're working the way you want them to if you do decide to add a common separated list to this make sure you're getting the results that you intend and then the effect size so in this case we're going to limit uh not the fetch but the max flow file per to 100 and the reason for that is it could be very dangerous to do too many at once you can easily so when we're doing this one versus the generate fetch we're partitioning here when we go to run this one over here this query database table is just gonna grab everything in that table on the first try uh it won't be until it's caught up where it starts to get incremental right so we want to be careful how we set this one up first time or uh or we could run into very we could easily cause that up memory issues inside of wi-fi if we're not prepared on how to handle that take down our entire knife environment have to go restart it and just deal with hassles we don't want to deal with you have some other action options as well just like normal uh let's go ahead and leave this alone though so we're done with that one we need to handle the no success is good we're gonna schedule this for every five seconds and it will kick off again all right so from here we know we have avro coming out of us so we need to convert the avro to json so we can prep it to go into a new table and let me move this over i don't know for um put on the side there we go that one's there just trying to keep it nice and compact today now from here we know we need to take it into json format so that's where we're gonna do our we're taking the json now which is the output and we are going to turn into sql all right so we have that there and like i said we're going to do the json sql grab that success done now right now the way this is designed that we have set it up is we query we're gonna get flow files with 100 rows each so it's gonna it's going to do one big query where the table is like almost i think it's above 200 000 now so our 200 000 records there are going to get queried all at once but when the flow files get built and generated they're going to be rows of 100 so one flow file one under rec 100 rows inside of it right now there's a problem right now which is uh the json to sql this really only deals this can only deal with uh or the put sql only deal um one flow file with one record each so oops i didn't grab the right one so where that becomes a problem is if we try to just do you know let's put that wrong if we just do it like this and we leave things as they are this is not going to give us the desired outcome because we have 100 we have a hundred rows in a flow file so this convert will easily convert this for us and give us 100 comma separated or give us uh json that is row by row right uh or depending on how we set it up the convert will not do that so let's look at the usage on this convert so ideally when we use it we send one flow file in we get one result out because it converts the json into attributes and you can't shove multiple rows into one attribute so you have to do one to one now if we look here in this it tells us um you need to do a flat it needs to be a flat json so we got that consists of single json element for each field back our for each field maps and signal type and then it does get to a part where [Music] uh okay so if the input is an array of json elements each element in the array is output as a separate flow file to the sql relationship so this is telling us that we have a hundred we're bringing in 200 000 here we're splitting them out as one full file per 100 so one to 100 uh this is not going to do anything it's just gonna leave it as is and then when they go into when the profile passes into the convert json to sql it's gonna see if it's if it's um passed as an array of json then this is going to automatically break that array of 100 into flow valves into a one-to-one relationship so now 100 will come one will come in at 100 and we'll get one full file will go in 100 will come out that's what we expect as the outcome in this case now that only works if during this convert stage from avro to json we make sure that we leave it in the default of array if we put it in none it's line by line and they're comma separated but if we leave them the array then they'll be they'll be encasing a container or the array container so let's go ahead and do that because that's desired that's what we need so let's terminate the failure terminate the failure in the original down here very original what the name is oh set it up okay so set that one up we're doing an insert this is going to be to let me look at my table real quick so i have a new one called active flights temp 2 for this data and oops [Music] is our catalog game and everything else is good to go all right so let's take a look and see what we get from this so first thing is we run the main one i'm going to stop it right away oops what did i miss okay so i set this up wrong um and you make sure oh i do want to in this case i want to use a custom my custom is going to specify the table name and the catalog name together so i'm just saying select all from aviation.activeflights that's what i want to do for this one go ahead and apply that one now we can go ahead and run it so the timer it doesn't have a max value yet because we haven't ran it successfully so we can go ahead and start it stop it real quick just because i don't want to run again it's running we have two tasks right now running they start and stop there we go so we're almost at 200 000 in there so at 100 each where we have 1932 we go and take a look at these flow files uh we can look at their attributes we can see the max value id is being recorded and added as an attribute here and we can see the row count inside the slow file is 100 so exactly what we're looking for and then we do the convert keeps them in the array container and now let's go ahead and set this to one every five seconds real quick so we have 1932 here we're going to take in one and we're gonna get we should get a heart out there we go exactly what we're looking for so there's our expected outcome go ahead and take a look at that now if we look we can see this is just one in here uh we can't see it because attributes now so we looked in here and we have our attributes applied now so there you go the values the types are all there all good to go which means that excuse me now we can go ahead and insert these into the table set this one up oops wow that was wrong just like that we're done we can terminate everything because we're not handling these and there we go we should be able to make these hundred go straight into the table and let me check the table i'll check the table screen real quick and we have 100 rows in there so that worked perfectly we can go ahead and get rid of that artificial five second weight start it and we can see flow files just flow on through and the bottleneck becomes the put sql uh processor as it's only processing 100 flow files at a time and a batch of 100 because we have that set for about size of 100 so 100 full files get inserted at a time into the table so this is not the fastest way we can do this though inside of knifei uh there's let's look at another way that we can handle this one and maybe make it go a little bit faster so the way we're going to do that is we're going to stop the processors because we don't want to wait for this by the way i have updated to the newest version of nifi so the most important thing to me in this new version has been the long waiter for where i can right click on the canvas now say empty all cues this is i tell you what i've been dying to have this option and i'm thrilled to have it in here now so there we go we've cleared up all the cues all at once and from here we're gonna modify things and make a little change on how we handle this so we can actually cut this to a two-step process and i'll show you how so from the query database table we're going to now add a put database so i don't think i've used it before in a video but i use this all the time at work i mean this is one of my favorite processors to use because this allows me to handle bulk flow files uh or let me show you what i'm doing here so we right click on this one we say view the usage and in this one we have the put database record processor uses a specified record reader to input possibly possibly multiple records from an incoming flow file unlike the other one this one can handle all by itself a it can use a record reader to take in any format we want in one state as part of the processor so it cuts out the extra processors and does it all in one and at the same time this can deal with multiple rows at a time no more having to split these out so it will ingest them and just deal with it which is exactly what we're looking for so we want to take the first of all we need to reset the counter on this because we're going to put these back in the table i'm going to truncate the table same time so i'm going to delete this copy paste there we go that new one will have a zero value we'll go ahead and connect it here just because and just so we can still flow and show the difference how long it takes to process this all right so we have that there and yeah i know it's not straight there we go all right bring this one over here a little bit so this is still important we don't have to change this this is just fine we know it's still it still outputs an avro format flow file for us right so we can take this and build that um the difference here will be is as you can see we're going from here to here two processors doing the work of four down here on this side right that's all we're going to need so over here we need to select our record reader so the format coming into this one and i have a uh let's go ahead and make a new one avro or just leave it not give it any special name apply go back in there now let's go set that one up or enable it i have a reader when we look at it there's nothing we want to infer the schema information because it's embedded into the flow file because it is avro format so go ahead and enable that when we're done here all right now we come back out here let's finish setting this one up back to my sql statement type is a insert there is a so in this new version of nifi some processors now have the ability to do upserts this is only applicable if the database platform that you're going into has functionality to use an upserve so uh we're not using that anyways we're just doing a insert my sql server information the catalog is back to [Music] i see catalog is must be aviation and then tim table 2 is my table name and we are looking good there none of these are ones we need to worry about uh we're not doing updates or anything so we don't worry about the id field here uh we're gonna leave that alone first let's go ahead and set up the failure retry and success that's terminated and we're done there let me terminate off off screen here i'm going to truncate the table i'm going to verify it truncated it is empty and there we go and then in this case i'll let these guys run over here but i will not run the put because we'll get duplicate uh ids because i'm not i'm using the same idea i pull from the source as the id i put into the new table and it's set up as key so that would cause a conflict and an error all right so here we go now the benefit here matter of fact you know what let's just stop these two here here's one huge benefit so right now we have query database table set to do max flow file of 100 rows per file but we don't need to do that here this really just comes down to how many rows i want to insert into my target table my destination at one time uh comes down to performance stuff like that so you decide or find out for your admin what's the most what's the optimum amount i can insert at once that would give me optimum performance uh but you could do one big maybe i just want to duplicate this guy real quick and put it all over here i can do that all at once in one flow file and we'll just demonstrate that real quick so we go ahead and configure this one and we say you know what zero so maximum size we'll go ahead and start it this will take longer to run because it has to query that entire table and build the flow file which is going to be a pretty fat flow value and in the this case if i go back and look at my off screen i'm going to look at that table and that table is 193 200 roads so what we're expecting is wow that was actually pretty quick right so we look at this flow file we check check out details look at the attributes and we can see that we have a row count of one hundred ninety three thousand two hundred and here's the best part all we have to do as long as we set it up correctly it says start it's not how many we really want to insert at once most of the time but we're going to do that and they will just all go in this one big batch especially because i don't think that that should set up uh nope no batch so give that one probably take a couple seconds maybe a minute maybe a little longer but this is a really strong way to use this one right and the best one of the cool things is just like the generate fetch the generate table fetch over here where it's using a max value column to keep track of not the partition but the column here this knows that every time i run it every five seconds it will grab only new information new rows inside that table this will do the same thing over here i have a set of five seconds it's using the id as my max value column so it's not it's very performant and it costs the cost for this to query into that source table is very low as it's only starting from where it left off last time and as we saw inside of here when we looked at this oh it's already done well it's getting there uh when we looked at this inside providence don't see it there uh we have to wait for it i don't think it wouldn't show up here yet no okay but when we looked at the attribute on here there was a attribute for max value for the max value column which is how this processor is keeping track internally of where it left off so we get it we get an awesome little feature from that they can just move forward so if we say uh normally i think i set mine for like a 1 000 per flow file that way i'm only inserting a thousand at a time there we go that one's all done yeah it took a little bit longer because it was really fat that was a really big flop out there but it got done i'm going to look at my table off screen real quick and 193 200. so all of them got inserted and that was just two steps that's all that is there right i mean for what i'm doing here it works perfectly now if i was to let this one pass through here this would just break and it would probably crash and if i get out of memory exception uh because it's trying to handle this and it can't deal with that big fat phone file there so let's go ahead and empty that before i make a mistake and run it and there you go so two different ways to get things done uh using the query database table we can go to long route which there might be situations where this is more ideal especially where you're queer you're recording the table you split out into one full file each and the reason could be because uh maybe you have a couple extra steps in here that are enriching that flow file with some extra information and it would make sense to do that way but another cool thing about that is uh just like we did in the past over here oh i can't remember i think i'm doing yeah so when we get results from this api pool of 100 at a time in the json the jolt can manipulate those 100 together and the query record can work against a batch of 100 or whatever's in that flow file so we could have a query record in here somewhere that enriches the we could put one in here and an extra step could be query record here and from here we are using that query record to enrich the data row by row but because the query record will query against the entire flow file it can enrich and do our sql statement against all that data at one time so it would still be shorter this route just sometimes there's reasons why you need to go this way and this may not be the best way to handle it here all right so that covers the using our query database table processor it is a very good processor i use it all the time i like using it for a lot of different things and i also use generic i use it for a couple things as well it's very good for etling and i like to use it for duplicating the data on the other side from that source table just have it raw and be duplicated there you go so don't forget to hit that like and subscribe button down below uh subscribing is definitely helpful to me yeah let me know which videos you are lets me know that you appreciate all these videos and encourages me to want to make more of them for you and unless you get notified and don't forget to leave comments if you have any questions or anything and hit that like button that way i know if this video is helpful to you and if you enjoyed this type of video i'll catch you next time
Info
Channel: Steven Koon
Views: 4,908
Rating: undefined out of 5
Keywords: apache nifi, steven koon, apache nifi examples, data streaming, data flow processing, learn nifi, nifi training, nifi training videos, nifi training online, nifi training courses, nifi for beginners, learn nifi in a day, big data, data science, data scientist, dataflow, etl, apache kafka, streaming, data, data engineer, datascience, bigdata, kafka, spark, data pipeline, data pipelines, data engineering, Kinesis, nosql, aws, azure, api, computer science, data analytics, database
Id: 9X8DJGXMra4
Channel Id: undefined
Length: 25min 59sec (1559 seconds)
Published: Thu Sep 24 2020
Related Videos
Note
Please note that this website is currently a work in progress! Lots of interesting data and statistics to come.