How to create a near real-time data pipeline with Kafka, Databricks and Azure Functions?

Captions
Hi, I'm Mukash, and in this tutorial I will show you how to build a near real-time data pipeline with Apache Kafka, Azure Databricks and Azure Functions.

The first thing we need is, of course, a data source. In our case we will use an API that provides the current GPS location of city buses in Gdańsk; every 20 seconds it refreshes the location of each bus in the city.

The next thing is a message broker. We will use Apache Kafka, set up on CloudKarafka. I use this service because it has a free tier, so we don't have to care about configuration: we just create a new instance and that's all. Of course there are some limitations, but for our case it's totally fine and totally enough.

Next we need a producer, an application that gets data from the API and puts it directly into Kafka. I will use a Python application hosted on Azure Functions: every 30 seconds it will get the current status of the buses from the API, do a little transformation and push the messages to Kafka.

Then comes the consumer, a service that consumes our messages. We will use Azure Databricks: in a notebook we will read the messages from Kafka in streaming mode, connecting to it directly, and save them in a Delta table.

The last layer of the solution is visualization. We will use a Databricks SQL dashboard, where we can show on a map the current location of each bus in Gdańsk together with some additional information, like the head sign or the ID of the bus.

A little disclaimer: all of these steps could be done, for example, entirely in Databricks with Python code in a notebook, but I would like to show you the whole path of a near real-time data pipeline, from the data source via the producer and Kafka to the consumer. This setup is only for the demo; you could achieve the same functionality more easily, but I want to show you these particular services and solutions.

Okay, so let's start coding, beginning with a short introduction to Kafka; I will not talk about it too much. Kafka is a message broker: from our point of view it is a service that takes messages from producers, stores them in topics and makes them readable for consumers. A producer is an application that sends messages to Kafka, and a consumer is an application that reads data from Kafka. I will not go deeper here; if you would like to know more, there are plenty of videos with good explanations of what Kafka is and what the use cases are. In our case I will just show you how to quickly set up Kafka on CloudKarafka.
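To make the producer and consumer concepts above concrete, here is a minimal round-trip sketch using the confluent-kafka Python client against a CloudKarafka-style cluster. The broker address, credentials and topic name are placeholders, not the real connection details; you would replace them with the values shown on the instance page.

```python
from confluent_kafka import Producer, Consumer

# Placeholder connection details -- copy the real values from the
# CloudKarafka instance details page (servers, username, password, topic prefix).
conf = {
    "bootstrap.servers": "rocket-01.srvs.cloudkafka.com:9094",  # assumed host, replace
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "SCRAM-SHA-256",
    "sasl.username": "myuser",      # placeholder
    "sasl.password": "mypassword",  # placeholder
}
topic = "myuser-default"            # free-tier topics are prefixed with the username

# Producer: send one test message to the topic.
producer = Producer(conf)
producer.produce(topic, value=b"hello from the producer")
producer.flush()

# Consumer: read the same message back from the topic.
consumer = Consumer({**conf, "group.id": "demo-group", "auto.offset.reset": "earliest"})
consumer.subscribe([topic])
msg = consumer.poll(timeout=10.0)
if msg is not None and msg.error() is None:
    print("consumed:", msg.value().decode("utf-8"))
consumer.close()
```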
The first step is to go to cloudkarafka.com and click on "Get Apache Kafka cluster". I'm already logged in; to log in you can use your GitHub or Gmail account, or just create a new one. As you can see there are a lot of plans, but I will use the free one, which is called Developer Duck. Let's start with the name of the cluster, leave the plan as it is, and select a region. There are not many regions available on the free tier, but Ireland is fine for me. Go to the review step, and that's all: create the instance. Good, we have an instance. Let's go inside and look at the details.

The most important part here is the connection details; we will use them to connect to Kafka. As you can see, there is a username, a password, a topic prefix and the servers. Let's create a new topic; a default one is already created, which is good. In the browser you can also produce and consume messages, so let's paste the topic name, type some text and try it. It seems to work: we send the message and consume it in one place. There is also a lot of other stuff, for example nodes, metrics and logs; those are only available on the higher tiers, but for test purposes the free tier is totally fine.

Okay, let's start pushing our messages into Kafka. In this demo I will send messages to Kafka from the API that provides the GPS location of the city buses in Gdańsk. As you can see, the API returns JSON with a lot of columns, in particular the longitude and latitude of each bus, so we will be able to see where a specific bus is. What I will do is create a simple piece of code that runs on Azure Functions, reads this data from the API, splits it into individual messages (the vehicles field is an array, so I will split it into rows) and sends every row to Kafka. From Kafka we will then be able to consume it, for example with Databricks. Let's go to Visual Studio Code.

I have already created the Function App, but if you don't have one you can create it from here: create a Function App in Azure, provide a name (let's try cloud-bi-bus-monitor), select the runtime stack (Python 3.9), pick a location (West Europe) and wait until it is created. Of course, to make this work you need the Azure extension installed in Visual Studio Code. I will use a Function App that was created previously.

Now let's create a function. Go to the workspace, click on plus, create a new project, set the location of the project and choose the language and interpreter. Then you need to set up the trigger. In our case it will be a timer trigger, because every 30 seconds I want my function to get the data from the API and push it to Kafka. Choose the trigger name, let's call it cloud-bi-monitor-30-seconds, and set the cron expression; in my case it fires every 30 seconds (a sketch of the generated skeleton appears below). Open the project in the current window and wait until it is set up. The first thing to do afterwards is select the Python interpreter.

Now that the project is created, I need to install the libraries. Let's start with confluent-kafka; as you can see, I already have it in my virtual environment, but if you don't, you need to install it. I also need the requests package. It seems I have everything I need. The first step is to create a class that lets us get the data from the API and put it into Kafka, so let's add a folder called src and a file called kafka_producer.py. We can grab the code from my repository; the link is in the description of this video.
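Before pasting the producer code, here is what the timer-trigger skeleton generated by the Azure Functions tooling roughly looks like for a Python function. The schedule itself is an NCRONTAB expression stored in function.json; the expression shown in the comment is the one for every 30 seconds.

```python
# __init__.py -- a minimal sketch of the generated timer-trigger entry point.
# The schedule lives in function.json; "every 30 seconds" would be the
# NCRONTAB expression "*/30 * * * * *".
import datetime
import logging

import azure.functions as func


def main(mytimer: func.TimerRequest) -> None:
    utc_now = datetime.datetime.utcnow().isoformat()
    if mytimer.past_due:
        logging.info("The timer is past due!")
    # The API-to-Kafka logic will be added here once the producer class exists.
    logging.info("Python timer trigger function ran at %s", utc_now)
```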
In the repository, go to the src folder, open the file and copy the code, then go back to Visual Studio Code and paste it. What do we have here? First we import the necessary libraries, and then we create the class. In the __init__ method we set the properties: the brokers (our servers), the bus API URL, the username, the password, the topic, and the column name. In our case the column name will be "vehicles", the name of the array that contains all of the records.

As you can see, we read these values with os.getenv. For that we have a special configuration file, local.settings.json: when you run the function locally it reads the values from there, but when you deploy it to the cloud it reads them from the application configuration. For example, in the configuration tab of my Function App I have everything I need; most of the values, I mean the brokers, password and username, are stored in Key Vault. First, though, we need to add these properties to the local.settings.json file in the local Visual Studio project, and thanks to that we will be able to test the code locally. In the repository you can find a sample of this JSON file; copy it and paste it. It contains the brokers (a normal string, comma-separated), the username, the password, the topic name, the column name for the API and the bus API URL. You need to copy these parameters from CloudKarafka.

Back to our producer. The next function in the class is get_data_from_api: it calls the API, takes the response text and returns it. Then we have the delivery callback, which is used when producing a message: if the message is sent successfully, you get the information that it was delivered; otherwise, that the delivery failed. Finally there is the run function, which runs the whole process. Here we set up the necessary properties: the servers, the session timeout and so on; it is based on the example from the official CloudKarafka repository. There are also the username, the password and the topic name. We create the producer with this configuration, get the JSON from the API (we only want the vehicles), and for each row of this array we create a message and send it directly to Kafka. That's all, our class is ready.

Now let's go to the __init__.py of the function. There is a sample there, so what we are going to do is import our class and call the producer's run method; that should be enough. We also need to extend the requirements.txt file, which tells Azure Functions which libraries have to be installed: we have two of them, requests and confluent-kafka. Let's save it and try to run our function. Go to debug and run it; we get some information from the debugger, and in the console it looks like our messages are correctly delivered to Kafka. Let's stop debugging.
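The actual code lives in the repository, but as an illustration of the class described above, here is a hedged sketch of how such a producer could look. The environment variable names, the class name and the "vehicles" column are assumptions based on the description; the connection settings follow the CloudKarafka Python example.

```python
# src/kafka_producer.py -- a sketch of the producer class described above.
# Settings come from environment variables (local.settings.json locally,
# the Function App configuration in the cloud); the names are assumptions.
import json
import os

import requests
from confluent_kafka import Producer


class BusKafkaProducer:
    def __init__(self):
        self.brokers = os.getenv("KAFKA_BROKERS")        # comma-separated server list
        self.username = os.getenv("KAFKA_USERNAME")
        self.password = os.getenv("KAFKA_PASSWORD")
        self.topic = os.getenv("KAFKA_TOPIC")
        self.bus_api = os.getenv("BUS_API_URL")          # Gdańsk bus-position endpoint
        self.column_name = os.getenv("API_COLUMN_NAME")  # "vehicles": the array of records

    def get_data_from_api(self):
        # Call the API and return the raw JSON text.
        response = requests.get(self.bus_api)
        response.raise_for_status()
        return response.text

    @staticmethod
    def delivery_callback(err, msg):
        # Called once per message: report success or failure.
        if err:
            print(f"Message delivery failed: {err}")
        else:
            print(f"Message delivered to {msg.topic()} [{msg.partition()}]")

    def run(self):
        # Connection configuration based on the CloudKarafka Python example.
        conf = {
            "bootstrap.servers": self.brokers,
            "session.timeout.ms": 6000,
            "security.protocol": "SASL_SSL",
            "sasl.mechanisms": "SCRAM-SHA-256",
            "sasl.username": self.username,
            "sasl.password": self.password,
        }
        producer = Producer(conf)

        # Take only the "vehicles" array and send every row as its own message.
        data = json.loads(self.get_data_from_api())
        for row in data[self.column_name]:
            producer.produce(self.topic, value=json.dumps(row),
                             callback=self.delivery_callback)
            producer.poll(0)
        producer.flush()
```

The function's __init__.py would then only need to instantiate this class and call run() inside the timer-triggered main function.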
Now let's go to CloudKarafka and check whether we can see something: paste the topic name and try to consume. Unfortunately, sometimes we are not able to see the data; I'm afraid this is related to the GUI, and this is the reason why I created my own consumer to look at the data. Let's wait a bit and first deploy our function directly to the cloud, to Azure Functions. Go back to Visual Studio Code and open the Azure tab. Here is the Function App that we created. To deploy the project, click here, choose deploy to Function App and select the app, then just click deploy. I won't do this because I already have it working, so it's not necessary, but clicking deploy should be enough. Just remember to set up the application settings we already used, I mean the ones in the configuration: the Kafka brokers, the password, the username, the topic and also the bus API URL. That should be all.

Let's start my Function App, because I had stopped it; it's a good idea to stop it when you finish working, so you don't pay money for nothing. Go to the Azure portal, open the Functions, select my function and go to Monitor: we can see the executions of our function. We can also go to Logs and look at them directly to check that everything is working. We are connected and able to see the messages, so let's wait for confirmation that our function is working correctly and the data is being sent to CloudKarafka. By the way, in CloudKarafka you can see the messages, but not all of them. It seems that something is working right now; it's UTC time, of course, but it looks correct, and our data is arriving. Let's also open Live Metrics: there we can see that our messages are delivered to Kafka, which is quite good.

So it seems that everything is working. To be sure, even though we can see it in CloudKarafka, I will open my own consumer in Visual Studio Code and try to read the data from our topic. The consumer is, of course, based on the example from the repository provided by CloudKarafka. Let's run it: as you can see, we are able to consume the messages, which is quite good. Here is the "generated" timestamp, 52 minutes past five, which is correct, as expected. So our producer is working. Make sure that after work you stop your Azure Function so you don't lose money.
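My debug consumer follows the example in the CloudKarafka repository; a hedged sketch of that kind of poll loop, with placeholder environment variables for the connection details, could look like this:

```python
# A simple standalone consumer to verify that the bus messages arrive.
# Connection details are read from environment variables (names are placeholders).
import os

from confluent_kafka import Consumer, KafkaError

conf = {
    "bootstrap.servers": os.getenv("KAFKA_BROKERS"),
    "group.id": "bus-monitor-debug",
    "session.timeout.ms": 6000,
    "auto.offset.reset": "earliest",
    "security.protocol": "SASL_SSL",
    "sasl.mechanisms": "SCRAM-SHA-256",
    "sasl.username": os.getenv("KAFKA_USERNAME"),
    "sasl.password": os.getenv("KAFKA_PASSWORD"),
}

consumer = Consumer(conf)
consumer.subscribe([os.getenv("KAFKA_TOPIC")])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            continue
        if msg.error():
            # End of partition is informational; anything else is a real error.
            if msg.error().code() != KafkaError._PARTITION_EOF:
                print(f"Consumer error: {msg.error()}")
            continue
        print(msg.value().decode("utf-8"))
except KeyboardInterrupt:
    pass
finally:
    consumer.close()
```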
Okay, then let's go to Databricks. The first thing I need is the schema of the topic; I have it already, and of course this code will also be on GitHub, so if you would like to set it up in your own Databricks instance it's not a problem, you can just download it. First we set up the schema of the topic, then some variables: the Kafka brokers (in this case I read them from Key Vault), the username and the password.

Let's go to the next command, the main one, where we load the data into a data frame. We start by importing some functions and then read a stream from Kafka. In the options we need to provide the Kafka brokers (here passed as a variable), then authenticate, using a configuration in which we pass the username and the password, and then set the topic that should be consumed. The topic name is hardcoded here, but you can also parameterize it. There is also the group ID, and we ask for the earliest messages: the topic may already contain messages, and with the offset setting we decide where to start, so if I wanted data from a specific point in time I could set it here. We also have the security configuration, and the last step is to load the data. The data from Kafka comes in binary format, so we need to cast it to a string, and because it is JSON we parse it and put it into a column called json; all of our data ends up in that json column. The next step is to select the columns we want: generated, vehicle code, route short name, latitude, longitude and head sign. That's all, so we are ready.

What is the next step? We need to save the data somewhere; I will save it to a Delta table. Here is our data frame: we write the stream in Delta format, in append output mode, so all new rows are appended to our table, and we trigger this procedure every 30 seconds, which means that every 30 seconds we go to Kafka, take the new records and put them directly into our Delta table. We also need to set a checkpoint location; the checkpoint ensures that we only pick up the latest data, so if the stream breaks we can start it again and there will be no duplication. Finally we save the data to the bus_monitor table.

Let's start the streaming. The stream is initializing; sometimes it takes a bit more time, but as you can see it is now running. Let's check whether the data is saved to the table: paste the query, here is the table, and run it. We have some data; let's sort it. It's still showing the old version, so let's select the maximum of the generated column. The latest generated date is the 20th of July, 4 p.m. UTC, which is correct, because I am recording this video at exactly this time.
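For reference, here is a sketch of what that notebook logic might look like in PySpark, running in a Databricks notebook (where spark and dbutils are available). The secret scope and key names, the field names in the schema, the topic, the checkpoint path and the table name are assumptions based on the description above, not the exact notebook from the repository.

```python
# Read the bus positions from Kafka as a stream and append them to a Delta table.
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

# Assumed schema for a single message (one element of the "vehicles" array).
schema = StructType([
    StructField("generated", StringType()),
    StructField("vehicleCode", StringType()),
    StructField("routeShortName", StringType()),
    StructField("headsign", StringType()),
    StructField("lat", DoubleType()),
    StructField("lon", DoubleType()),
])

# Connection details kept in a Databricks secret scope (scope/key names assumed).
brokers = dbutils.secrets.get(scope="kafka", key="brokers")
username = dbutils.secrets.get(scope="kafka", key="username")
password = dbutils.secrets.get(scope="kafka", key="password")

raw = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", brokers)
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "SCRAM-SHA-256")
    .option(
        "kafka.sasl.jaas.config",
        "org.apache.kafka.common.security.scram.ScramLoginModule required "
        f'username="{username}" password="{password}";',
    )
    .option("subscribe", "myuser-bus-topic")      # hardcoded topic name, replace
    .option("startingOffsets", "earliest")
    .load()
)

# Kafka values are binary: cast to string, parse the JSON and pick the columns.
buses = (
    raw.select(from_json(col("value").cast("string"), schema).alias("json"))
    .select("json.*")
)

# Append to the Delta table every 30 seconds, with a checkpoint so the stream
# can be restarted without duplicating data.
(
    buses.writeStream.format("delta")
    .outputMode("append")
    .trigger(processingTime="30 seconds")
    .option("checkpointLocation", "/tmp/checkpoints/bus_monitor")  # assumed path
    .toTable("bus_monitor")
)
```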
Perfect. So what is the next step? Now that we have the data in our table, I would like to visualize it and show the current location of the buses on a map, so let's go to Databricks SQL. I have already prepared a query in the SQL editor (this query will also be in the repository); it returns only the latest location of each bus. To get the map, go to edit visualization, choose a map, select the latitude and longitude columns, and group by the head sign, i.e. the direction the bus is going. Let's save it. I also need one more query, a short one that tells us from when we have data; I save it as well, and in this case the visualization will be a counter.

Now let's go to dashboards and create a new one, let's say "bus test". Add a visualization: the first one will be the refresh date, and then add the map visualization, place it here and enlarge it. Quite good. Let's finish editing and schedule the refresh, so this dashboard will be refreshed every minute; of course you need a SQL endpoint for that. Also make sure that when you finish your work you stop the schedule and stop the SQL endpoint, so you don't lose money.

Let's save it and wait a bit for the SQL endpoint to start. As you can see, our dashboard is refreshed; the date shown here is correct, so let's look at the map, and in a minute we should see the updates. Let's make it even larger: when you hover over a point you see information about the bus, and when you click you get more details, like the latitude and longitude, before it disappears again. And as you can see it's working; you can watch where a specific bus is. I added a condition to show only one bus on the map, so let's save it and go to the dashboard. We can see two points, maybe there was some exception, but let's wait for the refresh (I will do it manually). Here we can see that our bus changed its location; let's wait 30 seconds more, and once again the bus changed its location. So we are able to see the current location of our bus, which means that our whole pipeline is working. Of course, this query will also be in the repository, so you can just copy it (a sketch of such a query follows below). I hope you enjoyed it, and thanks for watching.
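The dashboard queries themselves are in the repository; as an illustration only, a query that keeps just the most recent position per bus could look roughly like this, shown here through spark.sql with the same assumed column and table names as in the notebook sketch above.

```python
# Latest known position of each bus, for the map visualization (assumed names).
latest_positions = spark.sql("""
    WITH ranked AS (
        SELECT
            vehicleCode,
            routeShortName,
            headsign,
            lat,
            lon,
            generated,
            ROW_NUMBER() OVER (
                PARTITION BY vehicleCode
                ORDER BY generated DESC
            ) AS rn
        FROM bus_monitor
    )
    SELECT vehicleCode, routeShortName, headsign, lat, lon, generated
    FROM ranked
    WHERE rn = 1
""")

# Simple data-freshness value for the counter visualization.
refresh_date = spark.sql("SELECT MAX(generated) AS last_refresh FROM bus_monitor")
```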
Info
Channel: Cloud BI
Views: 2,041
Id: DcLp8liySDA
Length: 30min 39sec (1839 seconds)
Published: Mon Aug 01 2022