Introducing Kafka Connect and Implementing Custom Connectors - Kobi Hikri @ Independent (English)

Captions
Okay, hi everyone, and welcome to the [unclear] virtual event, Introducing Kafka Connect and Custom Connectors. My name is Daya Fez Nelson and I will be moderating this session. This is the first time we have all attended such a virtual event, so we hope it will go as planned. Today Kobi Hikri will give a short intro to Kafka Connect and then show an actual code example of developing and Dockerizing a custom connector. Kobi is passionate about software systems in general and distributed systems in particular. A software design and architecture consultant, he has designed and implemented large-scale systems, and he loves to talk about it. So take it away, Kobi.

Okay, hi everyone, thank you for joining this Introducing Kafka Connect and Implementing Custom Connectors session. This talk has a bit of background about what Kafka Connect is, what it provides us, what we can do with it, when it is suitable for our needs and when it might not be. That is going to be a short intro, and then we'll do everything hands-on: we'll bring up a Kafka Connect cluster in Docker and see how we can work against it using the REST API, and eventually we will develop a web page links data source, meaning a web crawler. We'll implement it and see how we can install it in our cluster. Then, if we have additional free time, we'll also implement a sink connector to consume this data. We'll leave some time for Q&A at the end, and if there is additional interest in the group, we can dive into more advanced topics in a following talk. So that's basically what this talk is going to be about. It's about an hour and a half, or a bit less, so let's get started.

In this talk we are going to talk at the beginning about what Kafka Connect is. We will talk about the world of data as it is today. Most of you are experiencing it daily and noticing that things have shifted: it's not a single database
anymore, a simple developer world where you only knew SQL and worked against a SQL database or an Oracle database. Things have changed; our data sources have changed. We'll talk about why we need Kafka Connect and when we would consider using it, and then we'll get to the part about how to work with Kafka Connect, which will be most of this talk. We will see how to set up the infrastructure, how to interact with our deployed cluster, and how to implement a custom source connector. Then we'll see how to work with the REST API: basically all the interaction with the Kafka Connect cluster, deployment and status checking, will be done with the REST API.

What we will not cover in this talk, but I am open to doing in a follow-up session, is Kafka Connect transformations, which is a notion about mutating data as it is being received. I consider it a bad practice, but if you want to go into it, I can go into it. The other thing is working with custom schemas, which is a big topic. I will mention it when we get to the point of sending and receiving the data, but it deserves a whole talk of its own, so it's not going to be fully covered in this talk.

Okay, so what is Kafka Connect? Basically it's a Kafka integration tool; that's what we need to understand. It's a tool which is meant to be used in a domain where we are using Apache Kafka, whether it is the vanilla implementation, the Confluent implementation or the Lenses implementation. It's centered around Apache Kafka; it uses its capabilities and extends them. And that's the most important thing to understand: it's a Kafka integration tool. It allows us to import data into Kafka and export data out of Kafka.

Kafka Connect runs as a cluster of one or more system processes, and each process is referred to as a worker. Now, those of you who are coming from the world of Storm will notice that the terminology and the grouping and
everything is very similar, because some of the main contributors to the project are the same people, by the way. So we are saying that each such process is called a worker. Workers are grouped together in worker groups, and we have connector instances that are managed by the Connect cluster. Each connector has tasks, and tasks are configured by connectors and distributed across workers. The workers take care of making sure that the tasks run at the desired parallelism that we defined, and if a task dies, they will bring up a new one. Then we get to the last abstraction layer, which is data pulling and pushing: data is pulled by source tasks, which are configured by source connectors, or pushed by sink tasks, which are configured by sink connectors.

Now, a short word about the world of data today. You all know it, but keep in mind there are a lot of definitions out there; there is no official definition for the world of data, or for data at all. I like to think of it like the book, for those of you who studied physics: there was this very famous book by Halliday and Resnick, Fundamentals of Physics, and what they did there was very nice. At the beginning of each chapter they had a "what is physics" definition, and for each chapter they gave a definition for physics. What we are doing now in this talk is something similar. I'm defining the world of data today as a world where data is scattered all over. We basically need a jigsaw and plenty of glue to create the data that our company, our enterprise, our users are interested in. We need to glue it for them. We have so many sources for data, and so many ways to represent it and ship it to the users, that we need to create our own puzzle per client, and you probably all feel it in your day-to-day work.

Okay, so why do we need Kafka Connect? This relates to the previous slide, and in one
word: integration. Kafka Connect is all about bringing different data-oriented technologies together, about connecting things that aren't able to connect natively. We want to connect a new data source with a new technology or a proprietary technology, and we need some way to pass this data into Kafka. I remind you, it's a Kafka-centric technology: we want to pass this data into Kafka and analyze it. When we want to do that, we need to consider that we have so many APIs for different technologies and communication protocols that it's really impossible to have these integrations built into Kafka. So we need an integration layer, so that we can implement the specifics we need as we need them, in an ad hoc manner.

And in more than one word: if I said that in one word it's integration, then in more than one word we also want to make sure that our integration layer is fault tolerant and scalable. When we are bringing data from this wild west of sources and sinks of data, we want to make sure we don't lose anything on the way, or that we lose as little data as we can. So we want to rely on a fault-tolerant, scalable and mature technology such as Kafka. Kafka Connect leverages a lot of the capabilities of Kafka; it's basically based on them, to make sure that we can scale our pipelines and not lose our data if something breaks.

So when will we use Kafka Connect? I will generally say that if we are able to implement a Kafka producer and consumer within the technologies, then we might not need Kafka Connect. We could just produce data into Kafka topics and then consume it from other Kafka topics, if our needs are very simple and we are not expecting a very high bandwidth. But in many cases, I would risk saying most cases, implementing a Kafka producer or consumer within the endpoints, within the technologies, is impossible. We usually
don't have the source code. If we want to crawl a website, like we're going to do in a few minutes, then we cannot plant a Kafka producer in the web server's hosting machine, in the web server service itself. We cannot do it; we don't have the source. And sometimes we do have the code, but it's very expensive to modify it. For example, we are writing in a coding language for which we don't have a native Kafka producer, and implementing one could be very expensive. Or we have a system that is already in legacy mode, and every time we touch the code we break something; it's very untestable. In these cases, using Kafka Connect would be a natural choice to integrate externally.

The other case is that even if we have the source code, and even if modifying it is very cheap, and we have Kafka producer and consumer drivers available for us and they are up to date, then sometimes it won't be feasible, because the bandwidth we desire will be limited by the hosting process which creates the consumer or the producer. In these cases we would like to take out the integration components and make sure that they are able to receive more resources for producing or consuming this data.

So this was me talking about Kafka Connect, and the slides part is done. What we're going to do now is open the code and start seeing some stuff, because that's the way people learn best, I think, especially practical people. What we're going to do in this demo is bring up an infrastructure, a Kafka cluster, in Docker Compose. Then we'll add Kafka Connect to it, we will examine how we can use Kafka Connect's REST API, and at the end implement a custom source connector. If we have time, I will also show implementing a sink connector to consume the data of the incoming source connector. Okay, so let's get started. Let me just skip here, and can
everyone see my screen? Just if someone can say yes. Yes? Oh yes, okay.

So what we're going to do in this demo: first of all, I'm not going to show how to bring up a Kafka cluster, but I'll just mention that in this Docker Compose file I have a Kafka cluster which comprises a Kafka broker and ZooKeeper. This is vanilla Kafka, okay? I'm downloading it from an official Kafka release; it's not from any specific company. It's configured per the documentation, and you can follow it. I can also provide this Compose file at the end if you want to play with it. Then, on top of this cluster, we are connecting a Kafka Connect cluster to it and exposing its REST API port.

I want to show you the Dockerfile so that you'll believe me that it is the vanilla Kafka version. This is Kafka 2.5.0. All I'm doing in this Dockerfile is taking the OpenJDK JRE Alpine image, and then I'm just installing wget and downloading the official Kafka version. Then I'm doing something magical here, which I will explain shortly: configuring the broker and running it. But let's see what I'm doing here. If you just bring Kafka Connect up, it will not have your connectors, right? You still haven't shipped any custom connector to it. What I'm doing here is telling, not Kafka Connect, I'm telling my Dockerfile: take whatever is in this compiled connectors directory here, in Kafka Connect, take whatever is found here and copy it to the bin compiled connectors directory in the container. Now you see this directory is currently empty, but after we implement a connector, we'll see that as part of our build process we are delivering the output to this folder, and then when we are building a Docker image, we are copying the connectors into it.

Eventually, in order to start the Kafka
Connect cluster, I'm invoking a script, configure and run. First I'm copying it into the image, okay? Then, before I start Kafka Connect by calling the connect-distributed command and passing it the configuration properties, I'm first populating the properties file. The most important thing I'm telling it is: hey, here is your Kafka cluster, use it by connecting to this bootstrap server. This is the Kafka server that I declared here. Its name is Kafka, the host name is kafka, so I'm telling my configuration: your bootstrap servers, bootstrap against the broker kafka at port 9092.

The group ID for this Connect cluster will be connect cluster. I know it's not the best name, but by the way, this is also the default value that is in the connect-distributed properties; if you do not provide it, the cluster name will be connect cluster. I will come back to the converters immediately, but then we have the plugin path. I'm telling my container: look for additional plugins in this folder, bin compiled connectors. And I remind you, if we go back to our Dockerfile here, I copied the contents of this folder into the image; I made sure that it will be there when a container is started from this image. So once I put files here, they will be copied into the image, into bin compiled connectors, and when I'm starting the container, Connect will be configured to read the plugins from this folder. So this is my build process, to make sure that when I'm creating connectors they are delivered into my Dockerized environment and then pointed at by my connect-distributed process that is being started. By the way, in the plugin path you can provide multiple paths, and for the
sake of simplicity I just used one.

Let's get back to these configurations. They are very important. They also have default values, but it's very important to understand them, and I will now try to touch on a problem that a lot of people experience when they begin with Kafka Connect: the data conversion problem. If you understand it well, you will experience fewer problems. Kafka Connect, as an integration service, relies on an existing Kafka cluster. Now, it might be the case that you are writing into or reading from Kafka topics that were already created. They are not your topics; they are not topics that you can do whatever you want with. In this case you will have to provide what is called a converter class. It's really a class, a Java class, and it tells Kafka Connect what to do when it serializes and deserializes data when working with these Kafka topics.

In our case we are going to use JSON. So I'm telling Kafka Connect that when it writes something into Kafka, into the topics that we're going to write to, it should convert the key to JSON, and when it reads from them, convert it from JSON. The same thing goes for the value: I'm telling it, make sure that when you write something into the value field of the message, you write it converted to JSON. Now keep in mind, obviously, everything in Kafka topics is stored as bytes. Kafka doesn't know about our types. So the key and value converters are mostly for developers; they are not for Kafka. Kafka doesn't need this knowledge. Kafka can take whatever you throw at it: you throw a byte array at it, it will manage it, it will pass it to the other side. When we consume data in a few minutes, we will all see how this data comes out, and you won't be
surprised now when you see that we are writing a string and then the data comes out in JSON format. So now you'll understand why it happens.

Okay, so at this point we understand that we have ZooKeeper and Kafka instances, which make a Kafka cluster, and we talked about how we can bring Kafka Connect up, connect it to a Kafka cluster and configure it. I also did something here in case we want to investigate the logs: I installed the Elastic Stack, Elasticsearch, Kibana and Filebeat, to collect all the logs into Elasticsearch so that it will be easier for us to search. I also installed a tool called CMAK, which is a Kafka administration tool. It was initially developed by Yahoo and it was recently renamed to CMAK; it was called Kafka Manager before that, but because of trademark concerns the name was changed to CMAK. By the way, it's getting some long-awaited support recently, and that's a good thing.

So let's first bring up our cluster without the compiled connectors, since we still didn't build anything, right? And let's see how we can interact with our cluster. In order to bring up the cluster, what I do is either write docker-compose up or click on this button here, and it will bring up the entire stack. We see here that everything was deployed successfully. We'll see that the services are up; it will take a few seconds for everything to stabilize.

Let's switch now to Postman, and let me explain something. The way that Kafka Connect allows people to interact with it is through a REST API. The REST API is actually well documented. Let me show you the documentation. It's important to know that it's there, because in some cases you... well, it's not this. Okay. So with the REST API, which is well documented here, you can see all the REST calls that
you can make to the cluster in order to interact with it. What I did before this session is prepare some of the most common REST calls in Postman, just to show you how it works. The first call we can make to the cluster is just a call to the port, to the main route, the root of the REST server, to get its version and the Kafka cluster ID. Then we can invoke it and tell it: give us all the connector plugins that are loaded into the server.

Now, what you see here: you see both connectors that I've implemented, because I didn't run the docker-compose build command before I brought it up, so my images, which are stored on my machine, still remember the connectors even though they are not in the folder that I've just showed you. So this is my bad. I should have run docker-compose build before running up, and then these two connectors would not be there.

Then we can see if we have any active connectors that are currently up. We see that we don't have any active connectors. We can request a specific connector's information: write its name here and get the configuration that it has, how many tasks, and all that. Then we can request the tasks of a specific connector and the status of a specific task. We can also post, to create new connectors; we can update them using put; and we can delete them. We will do all these operations after we implement our first connector. I don't want to do it right now; we'll do it after we write some code and deploy it.

Okay, so let's get back to IntelliJ. So we have our cluster up. Let me take it down, because I want to show you that we don't have the custom connectors. Sorry, I didn't do it before the talk. It's a good point. Yeah, okay, excellent.

Yeah, yep, so I'm using the Python
implementation too, specifically the Confluent one. And, for example, there I'm using a string serializer for the key, just as an example, and a JSON serializer for the value, and implementing the schema registry and everything connected to it.

Yeah, so I don't have to change anything, I think. I just have to set it to JSON; it's a JSON converter. And by the way, that's when you are working with a schema registry. I don't know if you were here at the beginning of the talk, but I said that working with schemas is a topic on its own. If you want, I can give a follow-up session. But in general you should know that including the schema within each message has an overhead, and that's why people are working with a schema registry and tools like that: to reduce the overhead and also to manage the evolution cycle of schemas. But what you just said is true for working with the specifics of Confluent. You just give it the schema registry address, and that's it, and specify the name of the schema. Then, when it goes to the schema registry with a new schema, it is registered, and if you are specifying the name of a schema that it knows, then it will retrieve it from the schema registry. Okay, we can talk about it at the end.

Okay, so what I've done here while we were talking: I ran a docker-compose build operation to make sure that I refresh my images to represent the fact that I do not have any compiled connectors, and I want to make sure that it is represented in Postman. So I'm asking Postman now: is the fresh cluster up? Yes, you can see that the cluster ID changed. And now I'm telling it: can you give me the available connectors? Yes, and you see the connectors that it has now are only the official connectors that are shipped with Kafka Connect. We don't see any custom connectors here. Okay, any more questions before I proceed to the
implementation of a custom connector? No questions? Okay, so I'll continue, and you'll get the chance to ask more questions later.

Okay, so once we have a fresh cluster with no connectors in it, or it may have connectors that we already deployed, let's say that we want to implement our own connector. What I created here is a multi-module Maven project. Why is it multi-module? Just because I want to manage everything in it with a single Maven command. All you see here is that it has two modules: a source connector module that I wrote and a sink connector module that I wrote. Let's focus on the source connector for now, because it will be the vast majority of this talk, and if we have time we'll talk about the sink connector.

I took the liberty of choosing the topic for this connector, and I defined it like this: we want to write a source connector that takes all the links from a specific web page, a web crawler. Let's just make sure we understand what we're doing. This is the site that we are going to go to: engadget.com, or whatever. Then my source connector's task is to do the following. It's as if it did right click and view page source, and then I am looking for all the links within this page. I want to know all the hrefs; I want to collect all the links that are in the source code of this page. I want to collect this one and this one; I want to collect all the clickable links that this page has. This is a task that is very common for web crawlers, but I want to do it from within Kafka Connect. I want to have a scalable cluster that can manage as many web crawling requests as I need, and I want to be able to scale it up and down per my needs.

So the first thing we are going to do when we are implementing a connector, any connector, whether
it is a sink or a source, is to extend the base class from Kafka Connect. The Kafka Connect API ships in an artifact called connect-api. It is an official release; it comes with every version of Kafka, every Kafka version increments its Connect API, and we are using the official package. We will also use two more dependencies. We'll use jsoup, which will serve as an HTTP client to get web pages. You can use something else if you want to, but I'm going to use this friend here. And the HTTP client I'm not using anymore, so that's a redundant part.

So we are going to implement a connector, a source connector. This is the name that we gave it, and in order to implement it we are first extending the class SourceConnector, which comes from the official package of the Connect API. Now, before we continue, I want to remind you of something from the slides, just a second. So these represent tasks, okay? Because in Kafka Connect, tasks are the functional part of the connector. The connector itself is responsible for configuring tasks and the degree of parallelism we expect from this connector, but the functional part of a connector is a task, or many tasks. So the purpose of the source connector is eventually to define which task classes we are going to instantiate, how to configure them, and to what degree of parallelism. Then, when we start them, we are going to relate to this configuration. And that's it; that's the purpose of the connector class itself. It's to configure tasks; that's what it's responsible for. The task itself will do the work of actually fetching data, or writing it, or calculating it.

So let's go method by method and see what we need to override. When I'm extending the SourceConnector, you'll see that the first
method that I have here is a declaration of my task class. Here I am telling the connector: when you are instantiated, you are responsible for tasks of this type, and in our case it's a web page links source task. When you are created, you are responsible for creating web page links source tasks; this is your sole managed task. So when Connect calls on this connector and asks it which class it is implementing, it says: I'm implementing this class, which is here, and we'll see it shortly.

The next thing that the connector does is configure tasks. What it does here is create an ArrayList. When I'm saying "it", I mean the connector, but I wrote this code; you, as developers or data scientists, will have to implement this code per your needs. Here I'm telling the task all the configuration it needs in order to do its job. Everything it needs to know will be passed from a single instance of a map; we are passing properties to it.

Why do we have an ArrayList? That relates to the max tasks property. Max tasks is an integer which tells the component the maximum degree of parallelism that it is allowed to run in. Each task is instantiated in a new thread: a Kafka Connect worker will create a thread per task. So this number that we have here is the maximum. You, as developers, have the freedom to supply configurations up to this number. You can give fewer, and then your connector will run below the maximum. Let's say that you have a max tasks of 10, but you make sure in your code that you do not provide more than 5 configs. Then you will never get to the degree of 10 tasks; you will get at most 5. We'll see it when we run the samples. So here I'm creating configurations, one map per task, and adding each to the result of the taskConfigs method, which the connector provides to the worker. So the worker knows, through the connector, how many tasks it needs to deploy across the workers in the worker group.
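The rule that a connector may return fewer task configurations than max tasks, one map per task, can be sketched in plain Java. This is a minimal sketch with no Kafka dependency, not the speaker's code: the class name `TaskConfigSketch`, the property keys `uris.to.analyze` and `output.topic`, and the idea of passing several URIs at once are assumptions made for illustration. In a real connector, logic like this would sit inside the connector's `taskConfigs(int maxTasks)` method.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class TaskConfigSketch {

    // Hypothetical property keys, chosen for this sketch only.
    static final String URIS_KEY = "uris.to.analyze";
    static final String TOPIC_KEY = "output.topic";

    /**
     * Builds one configuration map per task and never more than maxTasks maps.
     * URIs are dealt out round-robin, so every task receives its own share of pages.
     */
    static List<Map<String, String>> buildTaskConfigs(List<String> uris, String topic, int maxTasks) {
        List<Map<String, String>> configs = new ArrayList<>();
        // Fewer URIs than maxTasks means fewer tasks: the maximum is only an upper bound.
        int taskCount = Math.min(maxTasks, uris.size());
        if (taskCount <= 0) {
            return configs; // nothing to crawl, or no tasks allowed
        }

        // One bucket of URIs per task.
        List<List<String>> groups = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            groups.add(new ArrayList<>());
        }
        for (int i = 0; i < uris.size(); i++) {
            groups.get(i % taskCount).add(uris.get(i));
        }

        // Each task gets a single map holding everything it needs to do its part.
        for (List<String> group : groups) {
            Map<String, String> config = new HashMap<>();
            config.put(URIS_KEY, String.join(",", group));
            config.put(TOPIC_KEY, topic);
            configs.add(config);
        }
        return configs;
    }

    public static void main(String[] args) {
        List<String> uris = List.of("https://a.example", "https://b.example", "https://c.example");
        // With maxTasks = 2, three URIs are spread over two task configurations.
        for (Map<String, String> config : buildTaskConfigs(uris, "links", 2)) {
            System.out.println(config);
        }
    }
}
```

The choice of round-robin here is arbitrary; what matters is that the connector, not Connect, decides how the work is cut, and that the number of maps returned decides how many tasks are actually started.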
After that, taskConfigs is connected, not directly, but it is connected, to the config method, which returns a ConfigDef class. The ConfigDef we are defining here is our way of telling the Connect cluster how to validate the configuration we passed into the connector. Here we can define that this connector receives a URI to analyze. This is the property key, how we will represent it in the request we send to the Connect API. We can tell the type of this property, whether it has a default value or not, what its importance is, whether it is mandatory or not, and the description to display in case you want to present it in UIs, as is done in several commercial products. So this ConfigDef is for the configuration of the connector, and then we can read, per task, the configuration that task needs.

We also provide the version of the connector, which you'll also see in the API, and eventually we get to the most interesting part, the actual work that the connector is supposed to do. Just before that, we have a start and a stop for the connector. In the start operation, I'm just writing a message to system out. I remind you, I'm working with Filebeat to collect the logs to Elasticsearch and Kibana, and this stack has helped me a lot, but you can also use a logger and configure the logger as you wish. What I'm doing in start is taking all the connector properties and copying them into private members, to help me then use them in taskConfigs. So I'm copying the URI to analyze, the web page that I want to crawl, and the name of the output topic. You'll see how I'm telling the worker where to write, in which topic to write the data;
we'll see it in the task. In the stop method I'm basically just writing something to the console, just to know that it was invoked. You'll see these logs shortly when we run the connectors; you'll see it in the log and you'll understand.

Okay, we now switch to the task itself. The connector is instantiating tasks, and as I mentioned before, we extend the base class, which is called SourceTask. Then we have to implement the following methods. We have to implement the method version, which returns the version of the connector itself; see, I'm instantiating the connector and calling its version. Then I have a method to start the task with the configuration, with a single instance. You remember, here we created a list of configurations, a list of maps, but a task only gets a map, not the list. It gets a single member from the list of maps. So a task is configured with one item from the set of configurations that the connector provided.

This is a good point to stop and make sure we all understand this, because parallelism in a custom connector is defined by what we do here, in taskConfigs, in the connector itself. Not in the source task, but in the source connector. When we build the task configs, it is our responsibility to split the work among tasks. We have to do it for Connect. Connect doesn't know the strategy to use to parallelize the data we are getting. We just invented something; we invented a web page links source connector. How would Connect know how to distribute the load, how to properly cut the data and pass it into tasks? So, for example, if I had here URIs to analyze, in the plural, and when I'm feeding the connector with the configuration here, I passed multiple
websites, then here would be the proper place to create multiple configurations, not with a single URI to analyze but with the URIs split up, and that would make sure that each task is configured with one web page. Then, when we get to the task, the task would receive all the properties it needs to do its part of the entire job. Is this point clear, or do you want me to try to clarify it more? Does anyone have a question about how Connect distributes the work between tasks? Okay, no questions, so I hope that after the demo it will be clearer. So, when the task is started, what I'm doing is the following. In start, the task is expected to allocate resources: when start runs, we are expected to allocate all the resources this task needs. When stop is called, the task is expected to release all the resources that it created. It's very important to understand. For example, in my start, what I'm doing is writing a log to the console, saying: hey, I started. Then I'm reading my configuration into private members: this is the URL that I'm going to analyze, this is the output topic that I'm going to write to, and this is the source partition; I will explain what the source partition is. Eventually I am doing resource allocation: I am creating an HTTP client behind the scenes by using Jsoup. It connects to the URI, and then I can do GET requests against the web server, get the data, parse the links and pass them forward to the worker. So I was invoked by start; somebody called me. It was the worker, by the way, not just somebody. The worker called me and said: start a task with this configuration. Just a second.
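Editor's sketch: the splitting of work among tasks described above could look roughly like the following. This is plain Java using only the standard library, not the speaker's code and not the Connect API itself; in a real connector this logic would sit in the connector's taskConfigs method. The property keys "uris.to.analyze" and "output.topic" and the round-robin strategy are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch of how a connector might split a list of URIs among its tasks. */
class TaskConfigSketch {

    // Hypothetical property keys; the talk does not show the real ones.
    static final String URIS_KEY = "uris.to.analyze";
    static final String TOPIC_KEY = "output.topic";

    /** Returns one configuration map per task, at most maxTasks of them. */
    static List<Map<String, String>> taskConfigs(Map<String, String> connectorProps, int maxTasks) {
        String[] uris = connectorProps.get(URIS_KEY).split(",");
        // Never create more tasks than there are URIs, and always at least one.
        int taskCount = Math.max(1, Math.min(maxTasks, uris.length));

        // Distribute the URIs round-robin over the tasks.
        List<List<String>> groups = new ArrayList<>();
        for (int t = 0; t < taskCount; t++) {
            groups.add(new ArrayList<>());
        }
        for (int i = 0; i < uris.length; i++) {
            groups.get(i % taskCount).add(uris[i].trim());
        }

        // Each task receives only its own share, plus the shared output topic.
        List<Map<String, String>> configs = new ArrayList<>();
        for (List<String> group : groups) {
            Map<String, String> config = new HashMap<>();
            config.put(URIS_KEY, String.join(",", group));
            config.put(TOPIC_KEY, connectorProps.get(TOPIC_KEY));
            configs.add(config);
        }
        return configs;
    }
}
```

With three URIs and a maximum of two tasks, the first task would get the first and third URI and the second task the second one. With a single URI, only one configuration is produced regardless of the maximum, which matches the point that there is nothing to parallelize in that case.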
There's something in the chat. Yeah, I'm going to answer these questions in the coming demo; I have a demo flow for changing the number of tasks, and we're going to do it. Okay, so in start we allocated the resources and read all the configuration, and then comes a call to poll. With poll in a source task, the worker schedules me and calls poll: I want you to go and get data from your source. It doesn't know the source; that's why our interface is a SourceRecord. This is a Connect API class: SourceRecord is a strongly typed type from the Connect API. I am expected to go get whatever data I have and put it in a source record, so that Kafka Connect will be able to take the polled data and pass it forward to Kafka. The task gives it a list of source records, and according to this list it knows how to create the messages and in which topic to deposit them. We still didn't mention the name of the topic, so let's see what we are doing in poll. Poll is invoked for us; we do not call it, we implement it, and we expect Kafka Connect, through the worker, to call us and tell us: give us data. When it calls us, what we are going to do is the following. First we instantiate the result set: we create a list of source records, which we are expected to return. Then I'm going to the website, using the connection I started. I have an HTTP client with an open connection, a connection object which was already instantiated, and every time I'm polled I'm telling it: go and get the website that we defined. It will go and give me the HTML document. Once I have the document, I'm taking a timestamp. Why am I taking a timestamp here and not when writing into Kafka? Because I want all the messages which were taken from one crawl to have the same timestamp, so that I could later
query on this data, using it as a key. So I'm saying: give me the page. The page returns to me, and then I'm taking all the a href elements from it by using the select method of the Jsoup library, and I'm getting an Elements object. This now contains the information that I want, but the information is not in the format that Kafka Connect is expecting. Kafka Connect expects to get a list of source records; it doesn't know anything about HTML elements or URIs. So I'm iterating over the elements, and this is the part where I am using the knowledge that I have about the integrated technology. If I'm integrating with a web server, then I need to know that I'm going to get HTML elements, or in this case each element is already a link, and it is my responsibility to pass this knowledge into the result set, which is the list of source records. This is the part where I'm going back to the domain of Kafka Connect. So I'm looping over the elements, and per element I need to create a source record and add it to the result set. Once I have added the result to the result set, this was the last time that my task had anything to say about this specific technology. From this point forward, Kafka Connect doesn't care about this specific technology anymore. The result set will be passed to the worker, and the worker will deposit the data according to what I wrote in the source record. Let's take a look at what we put inside the source records, so that Connect can relate to them. The first thing we put in a source record when we instantiate it is the source partition. The name is a bit tricky, because we call it a partition and we are in the world of Kafka, but the source partition is basically an object; I'm calling it an object because it's a map in which I can say everything I want
about myself. For example, there can be a key saying: I came from web crawler version 3, or I came from Redis, this Redis cluster and that version, and this was the URL to get to the server. Here we are placing data about the source that we came from, and when someone at the back end is trying to read this data, they might use it to decide how to move forward with analyzing the data that we have. So the source partition is something that we can put whatever we want in, as long as it's in the form of a map, and it is supposed to contain data that represents the source we came from. The source offset, again, uses the terminology of Kafka, but the source offset is anything we want to say about how to know where our location is within the source partition. Let's go back to the Redis example: if I came from a Redis server and the source collection was called X, the source offset would be something like the third element from the top, and then it could be the fourth element from the top, and the fifth. We decide how to represent the offset within our source. It's our way of knowing where to go back to in case we had a failure. This is not a Kafka offset and this is not a Kafka partition; it's important to understand. Where is it Kafka? Here: the output topic name tells Connect in which topic to put the data we are now giving it. The Connect worker will take the list of source records, iterate through them, and put each one in the topic we mentioned. In our case it's the same topic; we are writing all of our records into the same topic. Then we define the schema of the key, and we will see
what happens when we look at the output; you'll understand. What we're now telling Kafka is: the key that I'm giving you here is represented with the string schema. The same thing for the value: I'm giving you a link. I took the value of the href from the element, the link itself that we found in the HTML page, and we wrote it as a string, not that we have any other choice in this case. So I am declaring that I'm giving you a key and its schema is string, and I'm giving you a value and its schema is string also. Then we write all the source records into our result and send it back. Once we return it, the worker takes it, manages the records for us, and takes care of writing them to Kafka. A very important thing is error handling within tasks, and here, by the way, I didn't do it. It's not best practice to do what I just did, to assume that the user configured the connector properly and passed all the input they are supposed to give. We need to add a few checks here, and we need to remember that we are working with an external technology, which might have some downtime or a bad configuration. We may have passed a URI for a web page that doesn't exist, so we'll keep getting an exception that the page doesn't exist. Exception handling is important because, if our task continues to fail, the task will be marked as failed, and the connector will eventually stop working if all the tasks are failing. Okay, at this point, because we are now going to run the connector, I want you to notice that I've just added a System.out print for the result count.
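Editor's sketch: the record-building step described above can be illustrated in plain Java. The LinkRecord class below is a hypothetical stand-in for Connect's SourceRecord, holding only the fields discussed in the talk (source partition, source offset, topic, key, value), and the links are passed in as a ready-made list instead of being fetched with Jsoup. Using the crawl timestamp as the key and a "position" entry as the offset are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Sketch of turning the links found in one crawl into a result set. */
class PollSketch {

    /** Hypothetical stand-in for the fields of a Connect source record. */
    static final class LinkRecord {
        final Map<String, String> sourcePartition; // where the data came from
        final Map<String, Long> sourceOffset;      // position within that source
        final String topic;                        // Kafka topic to write to
        final String key;                          // written with a string schema
        final String value;                        // written with a string schema

        LinkRecord(Map<String, String> sourcePartition, Map<String, Long> sourceOffset,
                   String topic, String key, String value) {
            this.sourcePartition = sourcePartition;
            this.sourceOffset = sourceOffset;
            this.topic = topic;
            this.key = key;
            this.value = value;
        }
    }

    /** Builds one record per link; all records of a crawl share one timestamp key. */
    static List<LinkRecord> toRecords(String uri, String topic, List<String> links, long crawlTimestamp) {
        List<LinkRecord> resultSet = new ArrayList<>();
        Map<String, String> sourcePartition = Map.of("uri", uri);
        // The timestamp is taken once per crawl, not once per message.
        String key = Long.toString(crawlTimestamp);
        for (int i = 0; i < links.size(); i++) {
            Map<String, Long> sourceOffset = Map.of("position", (long) i);
            resultSet.add(new LinkRecord(sourcePartition, sourceOffset, topic, key, links.get(i)));
        }
        return resultSet;
    }
}
```

A real task would call something like System.currentTimeMillis() once at the start of each poll and return Connect's own SourceRecord objects; the shape of the loop would stay the same.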
When we run the web page links source connector, I want you to see results, so just notice that I added this System.out. In order to deploy our connectors into Kafka Connect, I remind you, we must compile them and ship them to this compiled connectors folder in the Kafka Connect folder, so that the Dockerfile, when we build the image, will copy them into the compiled connectors path. Then, in the configure-and-run script, before we start Connect, we configure the plugin path to include the files we just added. So what I need to do now is this. First of all, we have a cluster up (excuse me), and at this point we do not have our connectors deployed, so I'm going to deploy the connector now. The way I'm going to do it is, first of all, to take down the cluster. The next thing I'm going to do is run a Maven package command on this Maven build configuration. I'll show you what we have here. This is a multi-module POM; I'm telling it I want to build this module and this module. Within the web page links source connector, I told it that I want to use the Maven compiler plugin and the Maven assembly plugin, and I'm asking it to build a jar with dependencies, because we have to ship all the dependencies to Connect, which doesn't have them. So I want it to build a jar with dependencies and copy it to the Kafka Connect compiled connectors folder. You'll see that they were now created here, both the web page links source connector and the sink connector, which we haven't implemented yet; I implemented it before, so it was shipped here. Now, after we build the project, we need to make sure to update all images. We run a docker-compose build operation, which means that these two files, the connectors we built, will be copied into the image, and when we run Docker
Compose now, we expect to see that we have these connectors. So let's bring our system up again, go to Postman and see that the cluster is up. The cluster is up; now let's see the connectors. We see that we have here the web page links source connector and the system output sink connector. You know what, I'm going to show it in a minute, because I want to use it. Just as we implemented the web page links source connector, I implemented a system output sink connector; what it does is write to System.out. Just as with the source connector, we extend a class from Connect, SinkConnector. The connector is responsible for configuration, including the max tasks part, and the task is responsible for writing data. Instead of poll, which means get data, it has a put, which gets a collection of sink records, and what it does is simply write them to System.out. The magic here, in your case, will be to write to whichever technology you want: it could be Cassandra, a database, anywhere you want. I just created it to write to the output because I wanted to demonstrate parallelism, which we'll now do. So we have both connectors deployed. At this point I want to create an instance of the web page links source connector; I want to start crawling a web page. So I'm going to the connectors endpoint. This is a RESTful API, and I'm going to send it the following data: I want you to create a new connector, a new source connector. That's its name: web page links source connector instance. Here is its configuration. The connector class is the web page links source connector that we have here; this is the class of the connector, and this is what you should look for. How does it know the class? Because you have the jar file, which we copied to the plugin path. I want you to instantiate it with a degree of parallelism of one. Why? Because there is no
use in parallelizing a web crawling operation when we are working with a single URI. So I'm telling it here: I want you to run a single task, a single thread. I'm giving you a URI to analyze, which is http://www.engadget.com; this is the configuration that you should pass to this task. And when you are creating data, write it to the following topic, the one called engadget URIs. I am sending this data to the cluster and I'm getting a response that tells me basically everything is okay; it is confirming what we just did. Now I can go to this REST API call and do a GET operation on the active connectors, and I see that my connector is active. I can also request the tasks for this connector by doing a tasks REST API call, and we see that we have one task running. We can also ask for the status of this task, and we see that it is running; this is the worker ID, by the way, so it runs on this worker. So let's see what happened here when we created the connector. And we have 10 minutes left. Excuse me, yeah. Here is my Kibana instance. I remind you that we sent all the logs to Kibana using Filebeat, and I am now going to look at the logs. If you remember, here we wrote the result count to System.out, right? So let's go and see in Kibana if we have any results from the connector. We see that it does return data, but let's set an auto refresh of 5 seconds. We see that every five seconds (this is our time now), every time it is invoked, it brings back 647 URIs. We have to finish shortly, and I really haven't covered everything I wanted, but I just want to demonstrate the consumption of data. We might have to do a follow-up, another session, but let me just show you.
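Editor's sketch: the REST calls made from Postman above could be built in Java roughly as follows. The paths /connectors, /connectors/{name}/tasks and /connectors/{name}/status and the config keys "connector.class" and "tasks.max" are part of the Kafka Connect REST API; everything else is an assumption for illustration, including the base URL (8083 is Connect's default REST port), the connector name, the class name, and the "uri.to.analyze" and "output.topic" keys. The requests are only built here, not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Sketch of the Connect REST requests used in the demo (built, not sent). */
class ConnectRestSketch {

    // Assumed address of a Connect worker; adjust to your cluster.
    static final String BASE = "http://localhost:8083";

    /** Builds the JSON body for creating a connector. No JSON escaping is done. */
    static String createConnectorJson(String name, String connectorClass, int tasksMax,
                                      String uri, String topic) {
        return "{"
                + "\"name\":\"" + name + "\","
                + "\"config\":{"
                + "\"connector.class\":\"" + connectorClass + "\","
                + "\"tasks.max\":\"" + tasksMax + "\","
                + "\"uri.to.analyze\":\"" + uri + "\","   // hypothetical key
                + "\"output.topic\":\"" + topic + "\""    // hypothetical key
                + "}}";
    }

    /** POST /connectors creates a new connector instance. */
    static HttpRequest createConnector(String json) {
        return HttpRequest.newBuilder(URI.create(BASE + "/connectors"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    /** GET on e.g. /connectors, /connectors/{name}/tasks or /connectors/{name}/status. */
    static HttpRequest get(String path) {
        return HttpRequest.newBuilder(URI.create(BASE + path)).GET().build();
    }
}
```

To actually send a request, one could pass it to java.net.http.HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) against a running worker.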
I'm going to create an instance of my output connector, my sink connector, and I'm going to configure it: I want you to write the output, and I want you to write it with a degree of parallelism of 4. And yes, of course, I still need to give it a regex. If we want to do it, we'll get back to this point in the next session. Okay, so what I did now was instantiate a system out connector, because I wanted to demonstrate the output of the thing. So it runs, and we see that it runs with a degree of parallelism of four, which is good. Let's see if we have any output. Okay, we see that our crawler is working. We have tasks from zero to three, a degree of parallelism of four, and it gives us all the links that it sees on the website. So we completed the circle: we have a source which takes data from a web page, and a sink which writes the data into our system output with a desired degree of parallelism, and we can see it here. But at this point, since we have five minutes, I have to stop now. I want to stop for questions, and I also want to ask you if you want a follow-up session so that we can complete what we just did here and clarify, because at the end I had to run a bit quicker than I wanted. So I'm now open for questions. I have a question; it's a more general question. First of all, thank you so much for the talk, and if you can share the code with us, that would also be super helpful. Yeah, not a problem. So I wanted to ask: when, in general, would you find yourself wanting to write a Kafka connector rather than implementing just a regular consumer, using Java or something like that? Yeah, okay, this is a good question. So in general I would say that I would want to
implement a connector in two main use cases. The first one is that I'm unable to open the code that I have and integrate into it a Kafka producer or consumer in a good enough version, one that I can rely on to send my data reliably to my Kafka cluster. That's the first. But if you do have the source code, then the next reason would be that I do not want to put a bottleneck on my data shipping. If I have a component, a service, and I'm now telling it that I also want it to send another five thousand messages a second, that might not be feasible for a lot of services. So I want to decouple; I want to take my integration layer into a separate component so that I can scale it if I need to. Does that answer your question, Natasha? Okay, welcome. Any more questions? Did we have something in the chat? What is the limit, the number of partitions? Okay, interesting. For source connectors, no: the limit of max tasks will not be the number of Kafka partitions. When will it be the number of Kafka partitions? In sink connectors. When we are implementing a sink connector, a consumer group is created for the topic that it listens on, and if we have three partitions and we run four tasks, one of them will be a zombie: it will not be an active consumer, and it will not get a partition assigned to it. So this is a good question. Is there any information about the costs of running different types of Kafka Connect? That's something that's extremely interesting for us. We're running Spark clusters to do some writing as consumers of Kafka, and there are all kinds of reasons to move to Kafka Connect, but cost is definitely something that's relevant. Yeah, I was wondering if you're aware of anything.
Yep, so the straightforward answer is no, there is no cost, because everything we ran here was vanilla: a vanilla ZooKeeper, a vanilla Kafka. By the way, my ZooKeeper, Kafka and Kafka Connect use the same installation; it's the same download of the official version of Kafka, which is completely free to use, and I just invoked different services. In one of them I ran ZooKeeper, in the other I ran Kafka. You'll see that these files, for ZooKeeper, Kafka and Kafka Connect, are at the base the same document; I just added a different configuration and invoked a different service in my configure-and-run script. So if you are running ZooKeeper, Kafka and Kafka Connect, and you are implementing your own custom connectors, there will be no cost for licenses. You will have a computation cost, the cost of processing, but no licenses. How efficient is Connect compared to other technologies that you could use as a consumer for Kafka? I would say it is very, very efficient. It is integrated within Kafka: Kafka Connect uses the consumers and producers that come from the same codebase. It's not an envelope around them; it is Kafka producers and consumers. The integration is very tight here. It is very efficient, and if you look at my machine now, you will see that it is mostly idle. My machine is now running this entire cluster, constantly polling websites and passing thousands of messages a second. Let me just show you: if we go to CMAK, I want to show you the Kafka cluster behind the scenes. It is working now, but you'll see that it's not working hard at all. It has about 100 links, 100 messages a second, and you hardly see any effect on the machine. It's very efficient. Okay.
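Editor's sketch: the earlier answer about sink connectors, where four tasks on a three-partition topic leave one task without work, can be illustrated with a small simulation. This is plain Java, not Kafka's actual consumer group assignor; the round-robin distribution is an assumption that only serves to show why the number of partitions caps the number of active sink tasks.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified simulation of partitions being spread over sink tasks. */
class SinkAssignmentSketch {

    /** Returns, for each task, the list of partition numbers it would consume. */
    static List<List<Integer>> assign(int partitions, int tasks) {
        if (tasks <= 0) {
            throw new IllegalArgumentException("tasks must be positive");
        }
        List<List<Integer>> perTask = new ArrayList<>();
        for (int t = 0; t < tasks; t++) {
            perTask.add(new ArrayList<>());
        }
        // Each partition is owned by exactly one task in the group.
        for (int p = 0; p < partitions; p++) {
            perTask.get(p % tasks).add(p);
        }
        return perTask;
    }

    /** Counts tasks that received no partition and therefore have nothing to do. */
    static int idleTasks(int partitions, int tasks) {
        int idle = 0;
        for (List<Integer> assigned : assign(partitions, tasks)) {
            if (assigned.isEmpty()) {
                idle++;
            }
        }
        return idle;
    }
}
```

Calling idleTasks(3, 4) gives 1, the idle task from the answer, while idleTasks(3, 3) gives 0. Source connectors are not bound by this, since their parallelism comes from how the connector splits its own work.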
Someone wrote something here, I think. Kobi, okay, yeah, now you can breathe for a second. So we might do a follow-up; it seems like there was interest in that. On behalf of the entire group, I would like to thank you for doing it and for preparing it so quickly, so thank you very much. Thank you, guys. Thanks to everyone who joined today. Yeah, thank you, guys. And if you want a follow-up session (I do, because I want to complete the last few things that I did here very fast, and I would like to take the time, and I see there are more questions), then just tell Daya, and I'll complete this session, maybe with one more hour, and we can take a lot of specific questions for your use cases. Okay, thanks, guys. Thank you. Thank you, everyone. Thank you very much.
Info
Channel: Big Things
Views: 3,425
Keywords: Kafka, Kafka Connect, Docker, Big Things, big data
Id: 1eNjrMYII84
Length: 86min 58sec (5218 seconds)
Published: Mon Apr 27 2020