Demo: How to Connect to a Relational Database Using Apache NiFi

Captions
Welcome to this session on how to connect a relational database management system to a data lake, or to anywhere else you want to have the data, using Apache NiFi. Apache NiFi can connect to any kind of data source and can put data into any kind of data sink, and today we look specifically at how to do that with a relational database management system as the data source, putting the data, in this case, into a simple file.

I already have a MySQL database up and running as my relational database management system, and as you can see, I've created a very simple table in a database called company. The table is called people and has two fields, id and name, an integer and a character field, and I've already inserted one row with the id 1 and the name Stefan. That is the database and the table that I'm going to connect to and try to replicate into a local file. This MySQL database runs on a node with the host name c228-node3, and my NiFi instance, which you can see here, runs locally.

The first thing I need whenever I connect to a database is, of course, a database connection. I'm using the QueryDatabaseTableRecord processor here, which does exactly what it says: query a database table. There are two options, QueryDatabaseTable and QueryDatabaseTableRecord, and there is always this difference between the normal and the record processors. Whenever you have the possibility to use the record processors, use them: they perform better, they allow you to use schemas and a schema registry, and they also make it easy to convert from one file format to another. So whenever possible, use record processors, and that's what I'm going to do now.

You can see the processor on the NiFi canvas. Let's also put this processor in a process group, which I'll call "database connection demo". A process group basically confines a set of flows, a set of processors, in NiFi. I drag and drop it here, double-click, and as you can see in the bottom bar, I'm now inside "database connection demo" and can build my data flow here.

As the first step, I configure the database connection, and as with every database connection, I first need to define the pooling service. I already have a MySQL connection pool prepared that I'm going to use here, and it is not configured yet: as you can see, there are a lot of warnings and a lot of missing information, so I'm going to configure it now. I need the database connection URL; in my case that's a JDBC connection string to my MySQL database: the host name the MySQL server runs on, the standard port, and the company database I want to connect to. The database driver I've already installed on my system is the regular MySQL JDBC driver that you can download from the MySQL website; that's also where I got the driver JAR. This will be different for every installation, especially if you use different systems; I'm using mysql-connector-java-8.0.18.jar. Then I just need to configure the database user; I'm going to use stefan. I didn't set a password, so the database is only secured by network access, and I'm the only one who has network access to it. Apply: there are no more warnings, so we've configured everything we theoretically need, hopefully correctly, and I'm starting the pooling service. You don't see any issues here right now, but only because we haven't fired a query yet; any issues will become apparent once we start querying.
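For reference, this is roughly what the demo setup looks like. The captions don't show the exact DDL or connection string, so the column types, port, and host name spelling below are assumptions:

    -- assumed schema and seed row for the demo (column types are a guess)
    CREATE DATABASE company;
    USE company;
    CREATE TABLE people (
      id   INT,           -- integer field, later used for incremental fetching
      name VARCHAR(100)   -- character field
    );
    INSERT INTO people VALUES (1, 'Stefan');

And the DBCPConnectionPool properties, approximately as set in the demo:

    Database Connection URL     : jdbc:mysql://c228-node3:3306/company   (host/port assumed)
    Database Driver Class Name  : com.mysql.cj.jdbc.Driver
    Database Driver Location(s) : /path/to/mysql-connector-java-8.0.18.jar   (path assumed)
    Database User               : stefan
    Password                    : (none set in the demo)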
Now, with the QueryDatabaseTableRecord processor and the configured, running MySQL connection pool, we can go ahead, connect the processor to that connection pool, and define the query we want to fire. I'm using MySQL here, and I want to connect to the table called people that I created in advance. I want to return all columns, so I leave that field empty; if I only wanted to return the id (as you remember, we have id and name), or only the name, I could put that here. This is basically the SELECT statement, or rather the terms after the SELECT keyword. We could also use a filter, a WHERE clause, here, for example if I only wanted certain names. I could even enter a custom query that would override everything above, but I won't do that right now.

What we do need to define, because this is a record processor, is the record writer, which determines the file format we want the output in. I'm going to create a new record writer service, an AvroRecordSetWriter. It's now there, and I need to configure and start it. The defaults are fine; what you could do is define a certain schema, a certain schema registry and a schema name, but we won't do that. We will embed and inherit the schema, so for the sake of this demo we won't use external services. We enable the record set writer, go back to the processor, and that's it: there's nothing more we need to do to connect to the database and be able to query records, and that's what we're going to do now.

There is still a warning. If you've never used NiFi before: it warns you that the relationship "success" is invalid, because success is neither connected to any component nor auto-terminated. So we do what we always have to do in NiFi, since it's flow based: we connect this processor to another processor (which also has warnings now, for similar reasons, because we haven't configured it yet), routing whatever we query to that processor, and we define the relationship we want to connect. The QueryDatabaseTableRecord processor only has the success relationship, so that's the one I'm using. You see the warning turn into a stop symbol, meaning the processor is currently stopped. When we press the play button here, or right-click on it and choose start, it will indicate that it's performing the query. We need to click the refresh button, and we see that we performed the query, that we are still performing it, and that we are receiving flow files, which is great.

I'm stopping this processor for a second and refreshing, as I'm impatient: we have 39 flow files here. Let's have a look at what we have with "list queue". I can inspect the flow files we've read so far, and we see that all of them have the same file size, so they are probably identical. Looking into one, you see it's an Avro file; you can format it to make it readable, and each of those files contains id 1, name Stefan, in Avro format: exactly what we currently have in our database. If we look at, say, number 17 and format it, it contains the same entry.

Now, we usually do not want to query the database again every time a query finishes, always doing a full table scan and always migrating the full table. So we need to be able to tell NiFi either not to query the table all of the time, or to only query certain parts of the table: that is, not to replicate what it has already queried before.
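As a reference, a sketch of the QueryDatabaseTableRecord configuration at this point; the property names come from the processor, the values from the demo, and everything not mentioned in the video is left at its default:

    Database Connection Pooling Service : (the DBCPConnectionPool configured above)
    Database Type                       : MySQL
    Table Name                          : people
    Columns to Return                   : (empty, meaning all columns)
    Record Writer                       : AvroRecordSetWriter (defaults, schema embedded)

Each flow file it emits is an Avro file; rendered as JSON in the queue viewer, a record looks roughly like {"id": 1, "name": "Stefan"}.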
Let's have a look at option one. I'm going to empty the queue here, deleting all the files from the flow, so that we can start from the beginning. What we could do is use the scheduling tab: with timer-driven scheduling we can define a run schedule here, and we could alternatively use cron-driven scheduling to trigger the event, the query in our case, on a certain schedule. I want the query to be performed every 60 seconds. So what happens now: I restart this, refresh, and we have one flow file from one query; 60 seconds later (I'm not going to wait for it now) we would have the same query again and another flow file. If you need to keep a very small table up to date and in sync, maybe for some lookup system, then this is a good way. But what if it's a large table and you really only need the new entries? Then there's a second way to do this.

Emptying the queue to start from the beginning again. I still want the run schedule to fire every time a query finishes, because I want to be notified immediately when something changes, but now I'm also using the Maximum-value Columns property: I enter id, which is the column that is going to increase and that I want to filter on. NiFi keeps the largest id it has seen as processor state and, on subsequent runs, only fetches rows with a larger id. So what happens now, when we start the processor, is that we get the one entry we already have, and that's it: the QueryDatabaseTableRecord query will still be performed, but we won't get new flow files here, since nothing changes on the source system.

So let's enter a new row on the source system: the id is 2 and the value of name should be Alex. Inserted; refreshing the UI, and we see two flow files here. Listing the queue, the second flow file arrived just a few seconds ago; inspecting it: yes, it's the flow file with exactly this id. Now if I stop that processor and restart it, it performs the same query again, but as you can see, it won't create duplicates here.

What if I want to query the full table again? Deleting those files, emptying the queue. To query the full table again, I need to clear the state. Viewing the state, you see the key here refers to the people table's id column, currently at the value 2. I clear the state and start the query, and we have one flow file in here. Which flow file is it? The one that contains both entries of the current database: since this was one query, it contains all the records currently in the table. And whenever I add a new row, with id 3, and refresh the UI, there's a new flow file in the queue, 12 seconds ago, containing only the new entry.

Finally, not as important, but we don't want to keep these events in the queue forever; we want to store them somewhere. Here I configure a processor that simply puts each flow file into a certain directory on the local file system. I name the directory people_updates and configure it with a certain file conflict resolution strategy, have it create missing directories if they don't exist, and could also set the owner, the group, the permissions, and so on. I also, of course, have to handle the success and failure relationships that route the incoming flow file; in both my cases I don't want anything to happen beyond putting the file, so I'm terminating the success and failure relationships, and I'm good to go.
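To make the incremental behavior and the file-writing step concrete, here is a sketch. The generated incremental SQL is an approximation of what the processor issues, the file-writing processor is presumably PutFile (the captions only say "putting the file"), and the directory spelling and property values are assumptions:

    -- the new row inserted on the source system during the demo
    INSERT INTO people VALUES (2, 'Alex');

    -- with Maximum-value Columns = id and stored state id = 2, subsequent
    -- runs behave roughly like (the exact generated statement may differ):
    SELECT * FROM people WHERE id > 2;

And the PutFile configuration, approximately:

    Directory                    : /path/to/people_updates   (name as heard; exact path assumed)
    Conflict Resolution Strategy : fail (default; replace or ignore also available)
    Create Missing Directories   : true
    Relationships                : success and failure both auto-terminated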
Instead of writing this to a file, I could put it anywhere: into a message queue, into HDFS, into Kafka, into an event hub, S3, Azure, Elasticsearch, Solr, or an FTP server; I could even send an email with the flow file contents, or put it into Hive, InfluxDB, and so on. As you can see, there are something like 284 officially supported processors, and there are many, many more in the community. You can also write your own processors, but that shouldn't be necessary these days. That was the basic introduction to what we can do with databases, or rather how to connect to relational databases using Apache NiFi. Thanks for watching.
Info
Channel: Stefan Dunkler
Views: 16,718
Keywords: Demo, Apache NiFi, NiFi, Walkthrough, RDBMS, MySQL, Databases, Data Flow, Data Flow Management, Data Integration, Data Ingestion
Id: OHLYJUOTaYc
Length: 18min 12sec (1092 seconds)
Published: Sat Nov 09 2019