From Zero to Hero with Kafka Connect

Captions
Hi, my name is Robin Moffatt. I'm a developer advocate at Confluent, and this is From Zero to Hero with Kafka Connect.

Kafka Connect is part of Apache Kafka. It's the integration API for Apache Kafka, and it lets you build streaming integrations from systems upstream into Kafka topics, and from Kafka topics out to systems downstream. So it means you can integrate all of these systems, with their data flowing into Kafka and out of Kafka. The great thing about Kafka Connect is that it's just configuration files to use: you set up a lump of JSON and you say, I want to use this particular connector, I want to get the data from this particular place and stream it into this particular topic.

There are different uses for Kafka Connect, but if you think about the applications and pipelines that you build with Kafka, any time you're doing integration with another system you should probably be reaching for Kafka Connect as your first option. Some people use Kafka Connect for building out streaming data pipelines — perhaps, as we can see here, streaming data from a transactional database through Kafka and on to somewhere else to do analytics. We want to offload that data from our transactional system and put it somewhere else where we're going to do our big crunching, our ad hoc analytics, our machine learning. By streaming it through Kafka we benefit from Kafka providing the back pressure and the integration capabilities; we can add additional databases to this pipeline, and we nicely, loosely couple the consumers of the data from the producers of the data. Since Kafka stores data, we can take that same data we've ingested from the database once and write it out to other places: we can take it to S3 in the cloud, we can have it on premises on HDFS — wherever we want to have that data, it's only coming from the source system once.

We also use Kafka Connect to help us with our applications, where we're writing data into Kafka from an application and we'd like to get that data on to somewhere else. Perhaps we want to stream the data to a NoSQL store, or write it somewhere else for audit or analytics purposes. Instead of our application having to take on responsibility for connecting to that target system and handling things like scaling and network problems and all the rest of it, our application just writes its data to Kafka, and then we hook up Kafka Connect and that manages getting the data reliably from Kafka to the target system.

Another pattern in which we see Kafka Connect being used is helping us evolve existing applications towards newer ways of doing things. A lot of the time our existing applications are built with databases holding the state — an existing in-house application or a third-party application — and whatever it does, it probably ends up writing stuff to a database. When it writes to a database, those are events, and we can capture those events and use them to drive new applications. We can say: in this CRM system that exists somewhere else, when someone writes a record, we would like to build a new application that responds to the fact that a record has been created, or updated, or even deleted. We can capture those changes through Kafka Connect into Kafka and use them to drive applications.

Those are some of the uses for Kafka Connect, some of the patterns in which we see it being used. So now let me actually show you it in action and give you an idea of what it's like to actually build things with it.
There's a repository called demo-scene — this is where we put a lot of the demos that you see us presenting — and we've got one in there called From Zero to Hero with Kafka Connect. Within this you've got a Docker Compose file, so all you need locally is Docker and Docker Compose; you can say docker-compose up and then follow the full script through. I'm going to be looking at that one here, and you'll see me moving between my screens, because I'm going to take the commands from there and use them to build out this demo.

You can see I've started the stack up already, and we've checked that Kafka Connect is available. The next thing we're going to do is check that the particular connectors we want to use have been installed. Kafka Connect uses a REST API: you use it to configure Kafka Connect and to find out about its state, and we can also use it to find out which connectors we've got installed at the moment. After I've shown you Kafka Connect in action, I'm going to talk about how it actually works internally, so that as end users we understand enough to go about using it. I'm also going to talk about running it and how we go about deploying it — so when I mention installing connectors and things like that, we'll come on to that later, don't worry.

We can see we've got the different connectors installed, which means we can now crack on with getting some data into Kafka from our source system. What we're going to build is an integration between a relational database, streaming its data into Kafka, and then we're going to take that data and stream it out to two different sinks: Elasticsearch and Neo4j. Our relational database is MySQL, so let's launch a MySQL prompt — you can see that's started up there — and if we say SHOW TABLES we can see we've got a table called ORDERS. Let's have a look at that order data: within the table you've got information about an order that's been placed — the ID of the order, the value of it, what the order was for, who placed it, when it was created and when it was updated.

I'm going to set a little data generator running — I'll just create a separate window here and set that going — and if I go back to MySQL and query it again, saying just show me the latest record, you can see you've got new data arriving in it: order ID 509, order ID 519, 521. These orders are being created all the time and arriving continually in this relational database. That represents our existing application, somewhere in our business or at a third party. We're going to stream data from that database into Kafka — and I'm using a database, but it could be a message queue, a flat file, anywhere that we want to integrate with — and we're going to do it using Kafka Connect.

Let's come out of MySQL. I'm going to set this running, which simply polls MySQL every second or so and shows us the latest record, because we can compare this to the data as it arrives into Kafka once we've set that up. So now we'll go and create the connector. This is the actual integration piece, this is the configuration, and it's quite a long lump of JSON because there are lots of different things we need to set up within it.
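Roughly, the request looks something like this — a sketch only: the connector name, hostnames, and credentials are placeholders for the demo stack, and the exact property names vary between Debezium versions, so check the Debezium documentation for your version rather than copying this verbatim.

```bash
# Sketch: creating a Debezium MySQL source connector via the Kafka Connect REST API.
# Connector name, hostnames, and credentials below are illustrative placeholders.
curl -s -X PUT -H "Content-Type: application/json" \
  http://localhost:8083/connectors/source-debezium-orders/config \
  -d '{
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.server.id": "42",
        "database.server.name": "asgard",
        "table.include.list": "demo.ORDERS",
        "database.history.kafka.bootstrap.servers": "broker:29092",
        "database.history.kafka.topic": "dbhistory.demo"
      }'
```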
Basically, we're saying we would like to use the MySQL connector from the Debezium project; here is where MySQL is, here's how we connect to it, and here's the particular table that we want to get data from — this ORDERS table — plus a couple of transformations, which I'll talk about later, describing how we're going to process that data as it passes through.

We've sent that to the REST API and it says it's been created. Now we can have a look at the status of that particular connector, and it says the connector is running, which means we should have data flowing into our Kafka topic. So now we can go and have a look at that data. I'm going to run a consumer here using a tool called kafkacat; it's going to go to our ORDERS topic and show us the data that's flowing in. Let me show you the raw payload to start with, and then we can see what else is happening. Each message that's come from the database we can now see in the Kafka topic: the order ID, the customer ID, the order total and so on. As anything changes in the database, it flows straight through into the Kafka topic.

Let's go back to our previous screen, where we could see the output from MySQL with the latest data as it arrives. I'm going to split that screen vertically, and we're going to run that same command, but this time we'll say to the Kafka consumer: just show me the ID and the create timestamp. With just those two fields we can track that the data arriving is actually in sync with what's happening in the database. If we look on the left-hand side here, we've got the database: ID 834, 836. And if you look over in the Kafka topic on the right-hand side: there's 834, there's 836. As the data changes in the database, it gets reflected in the Kafka topic straight away.

Now let's take that data and stream it somewhere else. We've got the data flowing in from a database into a Kafka topic, and now we can push it from the Kafka topic down to somewhere else — let's send it over to Elasticsearch. Again we're using the REST API of Kafka Connect; we're creating a new configuration, this time with the Elasticsearch sink connector, taking data from this particular topic and sending it over to Elasticsearch. We set that up, and it says it's been created. We're also going to send the data over to Neo4j, and you'll notice that for each of the different connectors we create there's some standard configuration — here is the connector class, here is the Kafka topic — and then configuration specific to the connector: where is Elasticsearch, what do we want to do with the schemas and the keys. Depending on where the data is coming from and where it's going to, you'll have different configuration to specify. So when we set up the configuration for the Neo4j sink, again there's our standard connector class, but Neo4j needs to understand how to model this data. Neo4j has a language called Cypher, so we express in Cypher how to model the data that's coming through from the source. We go and create that, and then we have a look at the status of these three. We can see that the Elasticsearch one is running, but the Neo4j one has failed.
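Checking that status looks something like this — the connector names here are the ones from this demo, and jq is optional, just to make the output readable:

```bash
# Check the status of one connector via the REST API
curl -s http://localhost:8083/connectors/source-debezium-orders/status | jq '.'

# Or list every connector together with its status in one call
# (the ?expand=status parameter is available in recent Kafka Connect versions)
curl -s "http://localhost:8083/connectors?expand=status" | jq '.'
```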
That's terrible, because this is a live demo — or a recorded live demo — so we're going to go and check out Elasticsearch, and then we're going to poke around and see if we can understand what's up with the Neo4j one.

Let's have a look at Elasticsearch. I'm actually going to use a tool called Kibana on top of it. Here's Kibana; we'll just refresh the fields, then open it up in a window there and put it on the screen over here so you can see it. Within this we've got the data as it flows into that topic. Let me arrange the windows so you can see what's going on: we've got orders arriving in the database here, the same data flowing into our Kafka topic on the right-hand side of that pane, and then here at the top of the screen is Kibana, sitting on top of the data in Elasticsearch. It's set to refresh every second or so, and as the data arrives — there's order 1142, 1149 — that's the data arriving in the database at the bottom left of the page, flowing into a Kafka topic at the bottom right of the page, and then over into Elasticsearch and rendered in this dashboard at the top of the page. Neo4j we'll figure out later — if we get time we'll fire up a bit of a debug session — but for now I want to go back and explain more about how Kafka Connect works internally.

Kafka Connect is part of Apache Kafka, so there's an open API: you can go and look at the Javadocs, you can look at the Kafka Improvement Proposals — the KIPs — the design documents behind it all. If you're a developer of plugins, coding that kind of stuff, it's super interesting and super relevant. As users of Kafka Connect — as developers, as data engineers — we only need to understand some of it. We don't need to dig all the way down into it, for a deliberate reason: Kafka Connect abstracts this concept of integration for us and exposes a configuration interface. We don't need to understand all the bits and bytes all the way down; if we want to do that, we go and write separate code, but the whole point of Kafka Connect is that you don't need to write that code, you just use the configuration. You do, however, need to understand what's happening in the pipeline that we're building.

The first piece of the puzzle is the connector plugin. This is the piece that speaks the specific technology of our source or target. If we're pulling in data from a database, it speaks JDBC or it goes to the MySQL binlog — it speaks the actual API that's necessary to get the data in from that source system or write it down to the target system. We specify it using the connector class, and when you install the connector, the documentation will tell you the name of the class to use. That connector is the only bit that's specific to that source or target technology. If we're using a connector reading data from a message queue, it's going to speak the particular API for that message queue, but internally within Kafka Connect it passes on a representation of that data — the actual payload, the value, and the schema of that data — and it doesn't have anything specific to do with that source system anymore; it's now just a generic representation of the data.
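Incidentally, you can see which connector plugins a worker has installed — and the class names you'd use for connector.class — through the same REST API, with something like this:

```bash
# List the connector plugins installed on this worker; these class names are
# what goes in a connector's connector.class setting
curl -s http://localhost:8083/connector-plugins | jq '.[].class'
```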
Taking that data and writing it into Kafka is the job of the next plugin: the converter. When we write data to Kafka, it's written as just bytes — we just write bytes to Kafka topics. Kafka is not opinionated about how we serialize that data. I am opinionated about how you serialize that data, and you should be opinionated about how you serialize that data, because there are good ways to do it and there are less good ways to do it. When we take data from a source system, it has a schema — almost all the time it has a schema — we just sometimes choose to ignore that fact because it makes things a bit more complicated. But they're complicated for a good reason: when we're building applications and pipelines, schemas act as our API, our contract between the different pieces of the puzzle. If we chuck them away, it makes life so much more difficult and it makes the processes that we build much more brittle. So use something like Avro, use Protobuf, use JSON Schema — use something where we have a schema for the data, because it makes things so much better in the long run.

With a schema, the schema gets stored in the Schema Registry, and then Kafka Connect — or any other tool that's writing to Kafka — writes the message itself onto the Kafka topic. If you're using Avro or Protobuf, that's a nice, small binary representation of the data, with the schema available separately. And since the Avro schema is available separately, any consumer of that data — whether it's Kafka Connect as shown here, or your own consuming application, or something like ksqlDB — can fetch that schema from the Schema Registry and deserialize the data. So if we're writing the data down to a database, because we've got the schema, the connector can go and build the target table automatically; we don't have to do it manually and work out the schema by hand, because we already have it from the Schema Registry.

So the converters are a very, very important part of this — it's really worth your time understanding exactly what they are, what they do and how to use them properly. You specify them as a global configuration in the Kafka Connect worker (I'll explain workers later on): I would like to serialize the value of my message using this converter, I'd like to serialize the key of the message using this converter, and when you're deserializing, which converter do you want to use. Obviously you want to make sure these match up — you can't serialize using one method and deserialize using another, because that just would not work. If you're writing Avro onto the value of the message, you need to read Avro from the value of the message, and if you don't, you hit the kind of errors we'll talk about briefly later on.

Now, in previous versions of Apache Kafka there was a configuration called the internal converters which was exposed, and this caused lots and lots of confusion and wailing and gnashing of teeth when people ran into trouble with converters. They'd say: right, I've got a value converter, and then I can see there's also an internal value converter, and things aren't quite working, so I'm going to add in another one. I saw this thing on Stack Overflow, or I googled something and found a blog that said I should also have an internal key converter, and things still aren't working, so I'll add in more and more of these and just hope that something works.
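For reference, the worker-level converter settings are only a few lines. A minimal sketch, assuming a Schema Registry running at http://schema-registry:8081 as in a demo stack like this one:

```
# Worker-level converter settings (in the Docker image these become CONNECT_*
# environment variables). String keys, Avro values via the Schema Registry.
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081
```

You can also override the converters for an individual connector in its own configuration, if one particular topic is serialized differently from the rest.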
Since Apache Kafka 2-point-something — it was quite a while ago now — those internal converter configurations have been deprecated, and you just specify your value converter and your key converter.

The third piece in our little box of plugins is single message transforms, and these are optional. When you're building a Kafka Connect pipeline you need to specify your connector plugin (where the data is coming from or going to) and the converter (how we serialize or deserialize that data with the Kafka topic). Transforms give us a really nice ability to apply transformations — as the name suggests — to the data as it passes through the pipeline. As we're taking data in from a source system, we can say: drop these particular fields, we don't want to write those to the Kafka topic. As we're reading data from a Kafka topic and writing it down to a target system, we could say: I'd like to change this data type here, or I'd like to add in this piece of data lineage.

Transforms are part of the configuration of the connector that you create. They're not the most straightforward things to set up, but the syntax looks like this: there's a transforms prefix which lists the transformations, and we give each one a label — say, one that adds a date, and one labelled something like fooBar. Now, fooBar is a silly label to give, because it tells us nothing about what it's going to do, but we've got two transformations with two different labels. Then for each transformation, with transforms-dot-label as a prefix, there's a type and then other configuration elements. Here we've got a TimestampRouter, which takes the topic name and appends a timestamp with the year and month on it, and we've got the one labelled fooBar which, if we look at it carefully, is actually renaming delivery address to shipping address.
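A sketch of what those two transforms could look like in a connector's configuration — the labels are my own choices here, the field names are assumed, and these lines would be expressed as JSON keys if you're creating the connector over the REST API:

```
# Two single message transforms on one connector. The labels (addDateToTopic,
# fooBar) are arbitrary names; each label then gets a type plus its own settings.
transforms=addDateToTopic,fooBar
transforms.addDateToTopic.type=org.apache.kafka.connect.transforms.TimestampRouter
transforms.addDateToTopic.topic.format=${topic}_${timestamp}
transforms.addDateToTopic.timestamp.format=yyyy-MM
# "fooBar" tells us nothing about what it does -- which is actually a field rename
transforms.fooBar.type=org.apache.kafka.connect.transforms.ReplaceField$Value
transforms.fooBar.renames=delivery_address:shipping_address
```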
So we've got these three different plugins that we use in building out our pipelines, and it's all extensible — you can write your own if you want to. For connector plugins there's a huge ecosystem out there, from the community and from vendors, so a lot of the time they already exist; but if you've got some new technology for which one doesn't exist, go and write it. If you're writing your own plugin, don't start mucking around thinking "my plugin must handle writing to the Kafka topic", because you don't do that: your connector plugin just takes the data from the source system and passes it on internally within Kafka Connect, and the converter is responsible for serializing that data. So if you've got a different serialization method, you can go and write your own converter — and transformations as well; people quite often write these for particular purposes. You can get all of these from Confluent Hub: go to Confluent Hub, type in a technology or transformation that you're interested in — say you want to transform data coming in as XML, you type in XML, you search for it, and you find there's a single message transform that will do that for you.

That's enough of what you need to know about what's happening within the Kafka Connect box to set it up as a developer and build some pipelines and integrations with it. But then you need to run it — you need to actually set these things going, manage them and operate them. You could just use Confluent Cloud, where we have managed connectors, so you just configure the connector and it gets run for you. But if you're running Kafka Connect on premises yourself, then you need to understand a bit about how we're going to deploy it and what the runtime looks like.

So we have some more terminology. We've talked about connectors, converters and plugins; now let's talk about connectors and tasks. When we run a connector in Kafka Connect, we have this idea of a logical connector — say, a sink writing data from a Kafka topic to Amazon S3. That gets carried out internally, within the actual runtime, by a task. We could have a different connector — a JDBC source pulling in data from a database — and again there's a task that runs it. But Kafka Connect can also parallelize work within a given connector, so we might find the JDBC connector saying: actually, I'm going to run two tasks, because you asked me to pull in data from two tables and I'll pull both tables in parallel. That's down to the author of the connector plugin, who decides whether to enable it to scale out, if the user allows it, and do the work in parallel. Depending on the integration you're doing, parallelism might not make sense — you might want to force it to be serial — so that can be configured.

These tasks are executed within a worker. Now we're down to the nitty-gritty of the JVM, the process we actually run to run Kafka Connect. When you go to the command line and say "run Kafka Connect for me, please", you're instantiating a Connect worker — that's the JVM process you'll see if you look at what's running on the box. Within the worker, the actual tasks get executed.

There are two deployment models for Kafka Connect workers: one called standalone and one called distributed. It's worth understanding this carefully, because you may not choose the one you might guess is appropriate. Standalone is not usually what I would recommend, even if you have a single node to run it on, and this is why: the standalone worker is not fault tolerant at all. If it dies, you lose your stuff — you have to revert to a backup or something like that. It's not scalable either: if you want to add additional capacity, you have to set up a separate worker and partition your connectors across them. In this example here, we've got the JDBC source connector with two tasks running within it; if you need to scale that, you just can't, because you have one worker running one connector — that's as low down as you can go.

Whereas if we run a distributed worker — and a distributed worker can run on a single node; it doesn't have to be distributed to run in distributed mode — we have a single node running all of our tasks, but it uses Kafka itself to store its configuration and progress and so on. Which means that if we add a second worker, it can find out the configuration from Kafka, and it scales out beautifully. We can have tasks running across one or more workers, and Kafka Connect allocates the work accordingly. If we lose a worker, Kafka Connect will say: OK, we've lost something here, we need to move that work around to make sure it's all still being carried out. So we can scale it out as much as we want to, both for redundancy and for throughput, and if you go from a single node to another node, you just add in an additional worker.
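As a rough sketch of what that single-node distributed worker involves — the broker address, topic names, and replication factor of 1 here are my assumptions for a one-broker dev stack, not the demo's exact configuration:

```bash
# Minimal single-node "distributed" worker sketch
cat > connect-distributed.properties <<'EOF'
bootstrap.servers=broker:29092
group.id=kafka-connect-01
# Config, offsets, and status live in Kafka topics, which is what lets a second
# worker with the same group.id join the cluster and share the work
config.storage.topic=_connect-configs
offset.storage.topic=_connect-offsets
status.storage.topic=_connect-status
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://schema-registry:8081
EOF

bin/connect-distributed.sh connect-distributed.properties
```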
Whereas if you're using standalone and you then say, oh, I need a second worker, or I want to switch from standalone to distributed, it's a different way of configuring it and a different way of running it. It's generally just easier to start with a distributed worker on one node; if you need to scale out later you can, and if you don't, that's fine, because it's just as easy to work with on a single node.

You don't have to build one great big cluster of Kafka Connect workers. You can — and some people do — but you also see organizations who partition their clusters of distributed workers. You might have one cluster of workers that's resilient, fault tolerant and scalable handling one set of connectors, or one business unit's work, and a separate one running a bunch of other stuff. It's up to you how you deploy it, and it could run against a single Kafka cluster or different Kafka clusters — you can carve it up entirely how you want to.

You run Kafka Connect as a JVM process, so you can run it on bare metal, and you can also run it in containers. There's a base image provided — if you go along to Docker Hub you can find it there — and then to install the particular connector plugin that you want, you need to install it into the container when it's actually run; otherwise you just have the runtime. The important thing about installing these connector plugins — and converters and transformations as well, if you're using them — is that they need to be installed before the worker runs; you can't install them afterwards. Which means we need to think about how we're going to do this. You usually use Confluent Hub to pull down the particular JAR for the connector or plugin you need, and then there are two ways to do it. We can either say: at runtime, we're going to instantiate the base image, and before we launch the worker process we'll put in a little bit of code to pull down the additional plugins we want and then launch the worker. Or we can go and build our own image, which is probably the more proper way of doing it, but it adds additional steps if you're just doing this as a prototype on your laptop. In the demos we publish on demo-scene, a lot of the time we use the first pattern, because it's just: here's the base image, edit the Docker Compose YAML to add a few more things, and off we go. If we were doing this for real in production, you'd probably end up building your own images. You can also automate creating the connectors themselves, because you create connectors by using the REST API — so you can put that into your Docker Compose as well, or into however you're deploying your containers.
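The runtime approach looks roughly like this — the sort of command override you'd give the Connect container in Docker Compose. The plugin coordinates and versions are illustrative, and /etc/confluent/docker/run is the launch script shipped in Confluent's cp-kafka-connect base image:

```bash
# Install connector plugins at container start-up, *then* launch the worker
bash -c '
  echo "Installing connector plugins"
  confluent-hub install --no-prompt debezium/debezium-connector-mysql:1.2.2
  confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:10.0.2
  confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:1.0.9
  #
  echo "Launching Kafka Connect worker"
  /etc/confluent/docker/run &
  #
  # Keep the container alive while the worker runs in the background
  sleep infinity
'
```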
Now, I mentioned we'd talk a little bit about troubleshooting as well. We've seen what Kafka Connect is for, we've seen how to configure it and understand it from an end-user point of view, and we've seen how to understand it from an operations and runtime point of view. Now let's think about what happens when things go wrong — because things do go wrong, let's be realistic. There are different concepts within the runtime that we've talked about: workers and tasks. One of the first things that throws people off is that you build something out, you've deployed your configuration, it says yes, I've got the configuration and things are running — but there's no data. Why is there no data flowing between my source and my target? You go along to Kafka Connect, you use the REST API to ask for the status of the connector, and it says the status is RUNNING, and you think: so where is my data? If you drill down within the status, you find that each task has its own status, and if the tasks have failed — if all the tasks have failed — you're not going to get any data. So you have this situation where the connector is logically running, but in practice it isn't: with all its tasks failed, no data flows.

To troubleshoot it further, a lot of the time you'll end up going into the log itself, or you can use the REST API to pull out the status information and get the stack trace that way. In the example we've got here, it's actually showing what the problem is, and we got that through the REST API. A lot of the time, though, we end up down in the log. Where you get the log from depends on how you've deployed Kafka Connect — did you deploy it using the Confluent CLI, is it in Docker, is it on bare metal? One really useful thing added to Apache Kafka recently is dynamic log levels: if you've got a Kafka Connect worker running, you're having problems with a connector and you're not quite sure what's going on, you can change the log levels dynamically while it's running, and target specific loggers. If you set all the logging to DEBUG or TRACE, you just end up with screenfuls of stuff you can't find anything in — a whole load of haystacks with no needles to be found. But you can say: I'm interested in this particular logger, just for a particular connector or a particular connector's specific operations, and set that to DEBUG or TRACE, which makes it much easier to find out what's going on.
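Something like this, for example — the connector name and logger name here are just examples, not the demo's exact ones:

```bash
# Drill into the tasks: the connector can report RUNNING while its tasks have
# FAILED, and a failed task's "trace" field carries the stack trace
curl -s http://localhost:8083/connectors/sink-neo4j-orders/status | \
  jq '.tasks[] | {id, state, trace}'

# Dynamic log levels (recent Kafka Connect versions): turn up one specific
# logger rather than the whole worker
curl -s -X PUT -H "Content-Type: application/json" \
  http://localhost:8083/admin/loggers/io.debezium.connector.mysql \
  -d '{"level": "TRACE"}'
```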
Sometimes the connectors go wrong and you need to troubleshoot them like that. Sometimes, though, you end up with logical problems in what you've built. Kafka Connect supports error handling when it hits messages it wasn't expecting, and it can write problem messages out to a separate area — a dead letter queue, as we call it. Here's an example of how you would use that. A very, very common error people hit with Kafka Connect is "unknown magic byte", which sounds fun, but I promise you it isn't. The TL;DR of this error is that you're trying to deserialize data using the Avro converter and the data it's reading is not Avro — the magic byte refers to the Avro wire format.

Let's look at this in a bit more detail. Say we've got a topic in Kafka and the messages are JSON — we've written them using the JSON converter — and then we set up our sink connector to read from the topic using the Avro converter. Well, you can't do that: you can't read JSON using the Avro converter, and it will throw that specific error, because it just doesn't make sense. The solution here is simply to use the JSON converter for the sink, and then it works as it should.

But what if you think: I'm supposed to be using Avro, I've set up my Avro sink converter, I'm writing Avro messages to the topic it's reading from — and I'm still getting this error? A lot of the time, particularly when people are starting out with a Kafka Connect pipeline, they'll inadvertently write non-Avro data to that topic. They'll try one configuration and write some JSON to the topic, try another bit of configuration and write some Avro, try something else and end up with a bit more JSON. The topic ends up with a mishmash of things, and if the Avro converter reads a message that's not Avro, it's just going to stop.

So let's look at the different behaviours Kafka Connect has for handling this. By default, Kafka Connect will fail fast: if it hits a message it has a problem with, it says nope, I'm done, I'm tapping out, I can't process it, and it stops. That's sometimes very useful, if you want to make sure no data goes through that really shouldn't be there. Sometimes, though, you just want the damn thing to work, and for that there's the errors.tolerance setting: with errors.tolerance set to all, if you hit an error — screw it, we don't care, just let it be — and things keep on flowing. But perhaps that's not a great way to do things, just ignoring errors as they happen, because they could be indicative of a real problem somewhere — duff data coming through that you're completely unaware of. A much better approach is to say: if you hit an error, don't stop, because we want the rest of the good data to keep flowing, but write the bad message — or messages — to a separate topic called a dead letter queue.

In our example, that dead letter queue now has the JSON messages on it. We're reading through, we're reading Avro messages, this is all good; if we hit a JSON one, we don't stop, we write that JSON message out to the dead letter queue and carry on processing. We can then take that dead letter queue — which is just a Kafka topic — and process it with our sink connector, but using the JSON converter this time. So now we can handle both JSON and Avro on that source topic: first we try to deserialize a message as Avro; if that fails, we write it out to the dead letter queue, which we then process as a source topic with our sink connector and the JSON converter, and we try to process the messages that way.
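The error-handling settings that get added to the sink connector's configuration look something like this — the dead letter queue topic name and replication factor of 1 are illustrative choices for a single-broker dev stack, and over the REST API these would be JSON keys like everything else:

```
errors.tolerance=all
errors.deadletterqueue.topic.name=dlq_orders_avro
errors.deadletterqueue.topic.replication.factor=1
# Optionally record why each message was rejected, in headers and in the log
errors.deadletterqueue.context.headers.enable=true
errors.log.enable=true
errors.log.include.messages=true
```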
There's a bunch of monitoring you can get out of Kafka Connect. You can use the REST API to check on the status of connectors; you can also use Confluent Control Center — or Confluent Cloud's web interface — to look at the rate of throughput you're getting and the lag you've got on different topics as they're consumed by Kafka Connect. There are also JMX metrics exposed, so you can get a bunch of lower-level information. You can do things like: we're using a dead letter queue, we're not expecting messages on it, so let's watch the JMX counter for the dead letter queue, and if the rate goes above a certain threshold we can sound sirens and page people and say maybe there's something broken with our pipeline. So there's a range of monitoring available with Kafka Connect.

Hopefully you found that useful and informative. Kafka Connect is part of Apache Kafka; it lets you do streaming integration between source systems and target systems. You go to Confluent Hub to download the connector plugins and converters — you get connector plugins from the community, from vendors, from all sorts of places — and it lets you integrate with all of these technologies that you want to stream into Kafka and out of Kafka. To learn more about Kafka, Kafka Connect, Kafka Streams, ksqlDB and all sorts of other things in the Kafka ecosystem, head over to developer.confluent.io, and if you have any questions and want to join in the community, head over to our Slack community group — there are thousands of people there, it's a very warm and welcoming place, and I hope to see you over there soon. Thank you very much for your time.
Info
Channel: Robin Moffatt
Views: 8,163
Keywords: apache kafka, kafka connect, data integration, kafka, data streaming, mysql, elasticsearch
Id: dXXfkoXXBbs
Length: 33min 49sec (2029 seconds)
Published: Fri Oct 23 2020