Data pipeline using Kafka - Elasticsearch - Logstash - Kibana | ELK Stack | Kafka

Video Statistics and Information

Video
Captions Word Cloud
Reddit Comments
Captions
[Music] [Music] hello people this is self turds and in this video we are going to learn about a data pipeline creation with the help of kafka logstash and elasticsearch so basically this logstash and elasticsearch are the components of the elk ecosystem so in the elk ecosystem we generally do data manipulation and then data viewing through some graph etc so basically that is a part of elk stack in this video we will integrate kafka with this elk stack and we will mimic a scenario wherein we'll publish some registration data and that will be pushed to elasticsearch and then that will be viewed to the kibana we will have a kafka server and there will be a kafka producer written in python so we'll be writing a kafka producer in python and that will publish some data to this kafka server now there is this elk ecosystem or elk stack which has this elastic search application running and there is a kibana application running so this kibana act as a graphical user interface for this elasticsearch so basically you can create some indexes you can create some graph you can view the data inside the indexes another thing the log stash here plays an important role because it consumes data from this kafka server and then it pushes that data to this elastic search so basically we'll be creating this whole pipeline in this video so before creating the pipeline there are some prerequisites that needs to be fulfilled the first one is the docker and docker compose must be installed on your system because docker and docker compose is system dependent or operating system dependent i'll leave it on you that how you install it by searching some block the next thing is the kafka that also should be installed the third thing is the elastic search then you have kibana and finally you have log stash so we will be installing kafka elasticsearch kibana and logstash in this video tutorial i'll also paste the link of other videos on this like installation of these applications or some post which will help you to install for your own operating system we will go to my website selftuts.com and here you can see i have written a lot of post related to these things so first thing is that we will do elasticsearch and kibana installation so this is one of the post the post link will be given in the descriptions below so for this elastic search and kibana installation there is this yaml file so i'll just copy and paste it and i'll go to my linux box is i'll go inside sandbox and here i'll create a new file that is ml file the name of the file will be elasticsearch dot yaml or email something whatever you want then i'll do what i'll just paste this content here i copied it from there and i pasted it here i'll also paste the link of a video which explains all these things what are this configuration and what are these data but the most important thing that you need to understand here is what elastic search will be running on the port 9200 and kibana will be running on the port 95601 so these are the two important ports that you need to know because you can access these two applications elasticsearch and kibana on the basis of these ports now there is no extra configuration needed this is just a simple docker compose file let's save this file and try to start our docker container so the command that we are going to use is stalker compose minus f the minus f says what is the file name so it is elasticsearch dot yaml and then i'll say up so this will start all the services in the foreground mode that is all the logs will be present here and you will be able to see all the logs now i have already uh having this elasticsearch and kibana images on my local docker that's why it didn't downloaded it but in your case if it is not there then it will try to download it and then it can take time this particular section will start what thing this will start two things that is the services are zookeeper not zookeeper it will be elastic search and kibana now the second thing or the second prerequisites that we saw here is the installation of kafka so we'll be installing kafka using this post i have already created a video and i will share the link of this post in my description for this video so i'll just copy this particular ml file i'll again come to my linux box and here i'll say kafka dot ml ok so it is eml now here again what i'm gonna do i'm gonna just paste this data and one configuration that you need to do here is the you need to provide your machine ip address in the kafka advertise host name so my machine ip address is 192.16 it will be different for your machine and then you need to provide that ip address so finding the api is is very easy you can use if config command ipconfig command any command whichever you want based on your operating system the important thing that you need to understand here is that this will start three services kafka manager kafka and zookeeper so kafka manager is a graphical user interface to create topics and other things so we'll just see it in some time the zookeeper is running on the port two one eight one kafka is running on the port 9092 and kafka manager is running on the port 9000 we have previously seen that elasticsearch and cabana have their own port which you can see here that is elasticsearch was running on 9200 and kibana was running on five six zero one you can see here five six zero one so let's save this file for our kafka and kafka manager and we'll just start the services so we'll again say docker compose minus f and the name of the file is kind of so it will create what it will create three containers that is kafka zookeeper and kafka manager let's go to another window here in my terminal and let's say docker ps so you will see all the containers that are running so you can see here zookeeper is one kafka is one kafka manager is one kibana is one and elastic search is one so all the prerequisites has been done except the logstash installation so you can follow this particular thing to install it on ubuntu 18.04 or you can search it on google or you can follow some blocks to install logstash on your operating system after the installation of logstash you will see one important thing is the configuration folder all those configurations are being provided inside this folder that is etc logs test and then this conf.t let's go inside that folder here so i'll come here and i'll create one more window or a pane here so i'll just say cd slash atc and then log stash and then conf dot t so you can see here i'm inside this file let's remove all the things that are present here so i'll just say rm minus rf star so it is giving me permission error so i'll just say sudo and then i need to provide my password fine so now if i do ls you can see the file is empty let's see that whether the log status is running or not so for that i just need to type the command system ctl and then status and then log stash okay fine so logstash is what it is inactive or dead it is not running so that is fine so still it is not running which means that if there is some config file it will not pick it up and it will not start the pipeline the important thing that i want to focus here is that you need to write your configuration in this folder where you can find the logs related to logstash it will be present inside this folder where log log stash so if i go there where log log stash so if i go here and if i do a ls you can see logstashplain.log so all the logs related to logstash will be present here if nothing is working or if there's some error then you can go into this log file and check what is the reason so let's go to our previous folder that is the con folder so i'm present here i'll do ls and it is still empty according to our pipeline first we need to create a producer so for that i will be using pycharm id to create my producer and in this pie chart we will be using some important modules which will come to know now so what i'm gonna do here is this is my pie chart i'll create a new project and it will be a pure python project let's select the directory where you want to create so inside my d drive there is a workspace and inside here it is python so here i am going to name this as uh kafka elastic search pipeline okay so it's a big name but it's satisfied that what we are going to do so it is saying kafka elasticsearch pipeline let's do a okay and here also okay and we will be running it inside the virtual environment if you are familiar with python then you must know what it was involvement we will just hit create so this will do what this will try to create a new virtual environment and once the watch environment is created we will be able to code our own producer i have already written a post for it you can follow that post also so if you go to my website you can see here i have already written a post for this particular video here you can see all the stuffs that are present here have explained it you can also follow this video for the explanation and the thing so inside here what we are gonna do we will be creating a kafka producer so we'll follow this one uh particular article you if you find something to copy and paste like the code you can just come to this particular post and then you can copy paste it let's see what is the status of our pie charms okay the folder has been created and now we'll create a new file here so the new file will be what new file will be producer dot pi this file is generated let's install the dependencies for our python application so the first thing is what i'll open my terminal and you can see my virtual environment is active and the folder is this so i'll just say pip install faker this is the python library that we use to generate fake data so let's say you want to create a fake name every time if you're because we don't need a real application we just want the real data kind of thing so we'll just say okay give me 10 different names or 20 different names in every request so this faker module will help you to get that okay so pip install faker will help you to install the faker module it has been successfully installed now let's install another module and the name of the module is kafka python so what does this kafka python do as you see the name already such as that it helps you to connect your python application to kafka so this has also been installed and now we will minimize it after that what we are gonna do is we will write our fake data that we will be generating so for that i'll say from faker import figure so this is one of the class and after the class has been imported we'll create an instance of it so we'll say fake is equal to faker so the instance of the fake figure class has been created and now we'll define a method that will be what get registered data so this is one of the method as the name suggests it will give you a registration data so the data will return what it will return you three things first is what it will be name okay so i'll just say name and the name will be what it will be fake dot name so every time this method runs this will give you a different name so that is the powerful that is how powerful faker module is that every time you if you run this method get register data it will give you a different name same way we will say address and here what we are going to do we will say fake dot address or c you just write this method and you will get a fake address every time then we will say created at and in the created ad we will say fake dot here because we want just the ear so this will return you a fake registration data that is get registered data okay now once the registration data is here we will try to send this data or push this data to kafka so for that we need to create a kafka producer so for that from kafka import you can see what are the differencing this kafka client admin client client kafka consumer broker connection we are interested in kafka producer so i just took this kafka producer and then what i'm gonna do is i'll create a producer instance so i'll say producer is equal to kafka producer and this kafka producer accept some arguments or parameters so the first argument is what bootstrap server so the name is important because it is a named argument and you need to pro you need to type it correctly now in our case the linux box that we are running here where we are running the kafka what is there it is running on a particular ip address which is equal to 192.168.0.10 this can be different in case of in your case because you will be using some different vm you can use aws you can use cloud system like google or anything that will be different so ip address is important because this is the broker or the bootstrap server ip address and the port on which our kafka is running is what so if you see the docker containers that are running you can see kafka and it is running on 909 to port so i'll just say 909 to port so the broker configuration is done second thing is what whenever you are sending data to kafka you need to serialize it serialize means what you need to convert it into a format that is able to be transferred over the wire if you send an object let's say we created an object and you are sending it so it is difficult to send because you can't send objects over the wire you can serialize it you can convert it to a string and then it will be sent as a bits so that's why there is an important technique in kafka producer that is called serialization and deceleration so when you are sending the data it is serialized and when you are again consuming it is this d serialized so what i'm gonna do here is i'm going to serialize the value because we are not sending anything with key it is empty key for kafka we are just serializing the value part so we will say value serializer will be equal to what we need to provide a method so name of the method so here we are saying serializer so the method is not created as of now so we need to create it so we will say def json serializer and as you can see the error has gone and the json serializer does what whenever this producer will be called to send the data the data will be sent through this json serializer so what we need to do here is we need to just say we need to create a new into a string and since the data is json so we will need some json library to perform some operations so i'll just import json library of python and here what i'm gonna do i will say return json.dumps json.dumps was it stringifies the json data so json.dumps dot the data is the input to it and we'll say encode as utf8 so it is encoding you can read it on um some blocks or online that what is the meaning of this encode it just helps you to vince during dc relation and simulation nothing is lost as symbols or anything that's why we use a particular encoding method so our json serializer is also complete and now what we're going to do we will have to run this file so that we can send data so in that case we'll use the important feature of python that is if name equal to equal to underscore underscore mean you can follow one of my videos where i've explained it and we are doing what we will run it in a while because we want stream of data we just don't want a single registration and then it should not end there should be a stream of data so what i'm going to say i'm going to say is registered data will be equal to get register data so this is the method that we are calling and it is created here once we have the registration data we will say print and then register data so just for the sake that okay data is being generated and it is getting printed i have just printed it here then finally we will be using the send command over the producer instance so you can see producer is here and we are using the send command and in the send command we will be using a topic name so the first parameter is topic name so i'll say registered user because this is the topic name of kafka we'll just now create it i'll show you how to create it we'll be sending this data in the registered user and the data is what register data so whenever you will send this data so the register data that we are passing first it will go through this json serializer so it will be received here and finally we will get a stringified format and then it will be pushed now one important thing if we run this file as of now then lots of data will be published every second it can be thousand based on your cpu speed but what i'm gonna do i will send data every three second or every four seconds so for that i am going to import the time module so with the time module i am gonna do what i'll say time dot sleep and the interval is three so three says three second so this application does what it takes the registration data fake registration data and then finally it send its to kafka producer with an interval of three now here is one problem i need to just change it to okay it will be created so there's a spelling mistake i generally do lots of spelling mistake now everything looks fine so our python producer is created let's go to our browser here and in the browser we will open our kafka manager so the kafka manager that we have seen is running on which port if you see kafka manager it is running on port 9000 so i'll just come here i'll say 192.168.0.1 and then 9000 so this kafka manager will give you a very good graphical user interface for your kafka so the cluster name first we need to add a cluster so if i again go here you need to define your cluster so the name of the cluster i am giving a self-test cluster and the zookeeper will be zookeeper 2181 now why this is zookeeper 2181 because when i'm creating the instances you can see the name of the service is zookeeper so that's why i have written zookeeper and the port on which it is running is 2181 that's why it is 2181 you need to select this enable jmx polling then poll consumer and then enable active offset cash just go here and just save this so this will do what this will and this will let you enter inside your cluster now inside the cluster you have a default topic that is consumer offset now we will create our own topic and the name of the topic is registered user where we will be publishing the data using this producer so you can see it is registered user so i'll copy it i'll just come here and i'll paste it so it is fine everything looks fine and i'll create so you got a new topic and you see there is no latest offset which means what there is no data present here as soon as we will start publishing the data if the sum of partition offset and in the partition information latest offset will be increased if you want to learn about the details of kafka everything every integrated details you can follow my uh tutorial i have created a series on kafka for beginners you can follow that and you can get the knowledge fine so the kafka part is done let's go to our slide and we can see what things have been completed so this part is complete like producer is there and we are publishing the data to kafka server now we have installed everything log stress is installed on my operating system and as i have shown you the data or the folder where we'll be creating the log stress pipeline is present where it is inside this slash atc logstestcon.d we will be creating this pipeline which is present here so our job is what our job is to read data from kafka and push it to elasticsearch so logstash ha has three responsibilities one is the consumption of data that is reading data from somewhere then is the filtering of data and finally it is pushing of the data so in this case what we are doing we are reading it from kafka and we are publishing it to elasticsearch so these are default configuration which is present inside logstash which you can use to consume from kafka and publish it to elastic you can see here that how easy it is to write this configuration so i'll just come here i'll copy it and then what i'm gonna do here i'll go to my linux box and here in this folder what i'm going to do if i do pwd this is the folder slash atc lock stash quantity i am going to write a new file the file will be pipeline.conf okay and i'll just paste this file now we need to do some modification the modification is what i'm going to provide the ip address so because i'm running everything here so the machine has the ip address of 192.168.0.10 so that's why i'm providing this now bootstrap is running where the kafka bootstrap is running on port 9092 which i have shown you previously the topic from where we'll be reading is registered user because we are publishing the data there itself now the output is elasticsearch and the elastic search is also running on this server only so that's why i need to change the local host here 192.168.0.10 now let's see where this elasticsearch is running so if i come here elasticsearch it is running on port 9200 so this is okay 9200 in elasticsearch we have something called as index and this index behaves as a database so we are saying that all the data that you are reading from this topic of this kafka just publish or push the data to this particular index or this particular database and we are providing a worker of one which is by default so if you want to increase the performance anything you just need to see the extra configuration in log stash documentation let's save this file so i'll just write and quit and the file has been saved what i'm going to do here is on the left hand side if you see i'll just split my screen and what i'm going to do i'll go to the lock folder of lockstash so i'll say where log lockstash and here you can see i just do a tail of minus f and this so this will help me to understand that everything is running okay previous locks are there so currently there is no lock and currently lock stash is not running which i have shown you previously now let's open elastic search here so the elastic search can be elastic search or kibana so the kibana is the graphical user interface so i'll say 192.168 and 0.10 and 5601 because we have seen kibana is running on that so i'll say here no explore with my own data and you can see this is the interface for kibana so you can see there is management where we do the index management and this discovery and anything because we don't have any indexes of now so the discovery is not working so i'll come here and since we are not having any data that's why we do we don't have any index pattern or anything so if i come here there's nothing no if you search if you create there's nothing so let's start our lock stash which will do what which will start consuming from this kafka and then it will publish the data to elasticsearch before that we need to start our producer so let's come to our pycharm here and what i'm going to do is i'll just say place this button so this will do what this will say unrecognized config bootstrap servers okay so it is bootstrap servers not servers okay so i told you the spelling is very important so bootstrap and servers so let's start it again and this time you can see we got the first name brandon good dennis johnson so after every three seconds that we have given here the data is being pushed let's go to our kafka manager and if i do a refresh you can see five sum of partition offset is there and also the latest of satisfy which means five registration data has been pushed now six you can see there is no consumer because we have not started the log stash or nothing is consuming from this topic that's why the consumer list here is empty so if i again reload it you can see the number is increasing but in the kibana there is no data there is nothing because we have not started our lock status let's start the log stash here so i'll go to my linux terminal and on the right hand side if you see on my cursor i'll just say sudo system ctl start log stash because i'm using ubuntu that's why i'm using this command but the process will be same for any operating system so the lock stack has been started and if i again do sudo system ctl status you can see the lock status is active and running it is saying now resetting offset of partition uses 0 offset to 33 so i think it is constant started consuming the data let's reload it here and if i come here you can see there is nothing let's go to see index management you can see it has come the index management registered user index that we have created in our logstest pipeline configuration it is present here now what we need to do is we need to define an index pattern for that we will come to this index pattern and here i am going to create a new index pattern as you can see i'll just say registered user and star which will say the your index pattern matches with one index which means if i go through this the data inside this registered user index will be visible in this index pattern so there is no rocket science in it just type the name and give it next and here you need to use the timestamp filter so that it is here now you can specify some more setting before we create another thing so time filter will be used so basically timestamp create the index pattern and it will start creating the index pattern okay so it has given all these attributes to it now let's go to the home here we'll click here and here in the discover we'll go so if you see here you will see discover so now what is there you can see the logs very cool so now you can see these are the things that are present here lewis williams george miller another thing let's do what let's do it a refresh after every two seconds so start the refresh and you can see here you will see that this pane is moving on the left and the data is coming so you can see every time you can see here this is changing after every two second because it is reading the data from elasticsearch in an interval of two seconds so the data is being sent here from the kafka producer so now this whole pipeline is working that is producer is sending here logstash is consuming it is sending the data to elasticsearch and finally kibana is reading so you can see here the new data is being produced now let's search see the power of elasticsearch so let's search someone with the name of john means how many people with the name john has been registered in our system so what i'm gonna do i'll just so i copied that name or i'll just say john and i'll just hit enter so you can see we got only one john brown let's see how many names got here with brown okay there's only one name so let's remove this and hit enter so this will do everything let's search how many came with the name christopher so i just come here okay so we got three people with the name christopher as the first name so christopher robinson christopher freeman and christopher hatfield so you can see that if you want to do all the searching in your logs and other thing you can just use this kibana feature on the elasticsearch to all the data searching another thing so we have completed this pipeline that is producing the registration user to this kafka server log stats which is consuming pushing it to elasticsearch and we have read it through kibana by creating an index i will provide you the links of all these posts for the installation in my video description you can go and look it there that what are the different things you can follow other post also here to understand how you can install docker compose how you can install docker jenkins and everything that is present here so hope you like this video hope you like my channel please subscribe to my channel and please share these videos with other people thank you
Info
Channel: SelfTuts
Views: 18,532
Rating: undefined out of 5
Keywords: data pipeline using elk, kafka elasticsearch pipeline, kafka logstash pipeline, how to create data pipeline using elk stack and kafka, kafka logstash elasticsearch kibana, kibana, elasticsearch, elk stack
Id: IFfULE1VxE0
Channel Id: undefined
Length: 28min 52sec (1732 seconds)
Published: Sun Aug 23 2020
Related Videos
Note
Please note that this website is currently a work in progress! Lots of interesting data and statistics to come.