Kafka Connect in Action: Elasticsearch

Captions
Hi, and welcome to another of my YouTube videos. Today I'm going to show you how you can use Kafka Connect to stream data from Kafka to Elasticsearch. We're going to use Kafka Connect, which is a framework that's part of Apache Kafka. If you'd like to know more about it, check out my video "From Zero to Hero with Kafka Connect", the recording from Kafka Summit online; I'll link to it in the show notes, and it should be on screen right now. Whilst you're at it, make sure you subscribe to my channel and check out the other videos I've got about using Kafka Connect to get data from Kafka into a database, from Kafka to S3, and so on. But today we're going to use Kafka Connect to get data from Kafka into Elasticsearch.

Let's start off simple: we're going to put some data into a Kafka topic. All the instructions I'm going to show you here are on GitHub, so I'll put the URL in the show notes as well. It's all in the Confluent demo-scene repository; you can go through it, try the demo for yourself, and see step by step exactly what I'm doing. What we've got is a Docker Compose stack, and I'm going to make sure that the stack is running (docker-compose ps), and we can see that everything's running there. OK, that's good.

Now we're going to double-check that ksqlDB is up, because we need ksqlDB. It's part of Confluent Platform, it's source-available under the Confluent Community Licence, and we're just going to use it as an interface for easily getting data in and out of a Kafka topic and also for managing Kafka Connect. If you'd rather not use it, you don't have to: you can use Kafka Connect directly, it has its own APIs and you can configure it using the REST API, or you could use Confluent Control Center if you wanted to. However you do it, the underlying API is the same; it's part of Apache Kafka, and you can just configure it in different ways. As I say, I'm using ksqlDB just because it's that much easier. So let me show you this: I'm going to copy the command in here, it starts the ksqlDB CLI, and whilst that's running we're going to create ourselves a topic. We create a topic in ksqlDB by creating a stream. This isn't a talk about ksqlDB; if you'd like to learn more about it, check out the video which I'll link to on screen now, which goes into what ksqlDB is. At this stage we're just using it as a layer for creating and managing topics and putting some data onto them.

So I'm going to create a stream in ksqlDB, which is basically just a Kafka topic. This one here, TEST01, is going to have a schema; it's got some fields in it. This stream called TEST01 creates us a topic with one partition, and if we say SHOW STREAMS we can see we've got the stream there. Because it's got a schema I can DESCRIBE it, and you'll notice we've got a schema of two columns, an integer and a VARCHAR. We've also got the key, because Kafka messages are key/value pairs, and there's also a timestamp; there's metadata within the message as well. So whilst we've only declared a schema of two fields, there are actually four things available to us: the timestamp, which is part of the metadata; the key, which is part of how Kafka stores data, as keys and values; and the value, which has a schema within it of those two separate columns, COL1 and COL2.
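(As a reference for that key/value/timestamp structure: once the topic has some data in it, which we do in the next step, one way to see all three parts of a message directly is kafkacat. The broker address and topic name here are assumptions based on how the demo's Docker Compose stack is usually set up.)

    # Consume the TEST01 topic, showing each message's metadata timestamp, key, and value
    kafkacat -b localhost:9092 -t TEST01 -C \
      -f 'Timestamp: %T\tKey: %k\tValue: %s\n'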
Let's put some data into it. We could use a producer API, or we could use the console producer, but I'm going to use ksqlDB because it's quicker: INSERT INTO the stream, giving values for the key and the two fields, and then we insert another one. OK, so we've created a topic with that CREATE STREAM command up here, and we've populated the topic using this INSERT INTO. If you don't quite believe me, check this out: SHOW TOPICS shows we've got a topic called TEST01, and I can say PRINT TEST01 FROM BEGINNING and it dumps out the contents of that particular topic. This is just a consumer against the topic, and you can see here you've got the ROWTIME, which is that metadata timestamp I was talking about, you've got the key here, and then you've got the value. So we've got a Kafka topic with some data in it. I've populated it using ksqlDB just because I find it easier; you can populate your Kafka topic however you would like.

Now let's get that data from Kafka into Elasticsearch. To do that we're going to use Kafka Connect, and we're going to create ourselves a connector, like this. Let me clear my screen and paste this in here. We're going to create a connector, it's going to use the Elasticsearch sink connector, here's where our Elasticsearch is, and then here are a couple of other bits that we're going to discuss as we go through this tutorial. So we've specified the topic there, and we've created a connector. Now, here I'm using ksqlDB to control Kafka Connect, but in the background this is actually just making a REST call down to Kafka Connect itself. We've got a Kafka Connect worker running, and I could just as easily run a REST call myself directly against it; it's just easier in ksqlDB if you're in ksqlDB already. You get the idea: you can use it, you don't have to use it, it's Kafka Connect under the covers that we're interfacing with. SHOW CONNECTORS: we can see our connector there and it's running, so that means we can head over to Elasticsearch and actually check out the data. Obviously Elasticsearch has a REST API (we'll get on to Kibana later), so with that REST API we're just going to go to Elasticsearch and say: show me what's in this particular index. It says OK, you've got two documents in your index, here's the actual payload that you sent through, and then we've got some other stuff in here as well.

If you know Elasticsearch you'll know that documents, which are what we've created here (we've created two documents), have an ID, and the ID that's come through here is not what you'd necessarily expect. The ID that's been set here is actually based on the properties of the Kafka message: it's made up of the topic, the topic partition, and the offset. It's a unique identifier for that message from that Kafka topic. You can see here we've got TEST01, which is our topic, it comes from partition zero, and there's offset one and there's offset zero. This is an important thing to understand about the connector, because we can use it in different ways. If all you want to do is take data from a Kafka topic and just append it continually to an Elasticsearch index, then leaving the key like this is probably fine, and the way it's set is this setting up here: key.ignore is true. What that means is: ignore the key of the Kafka message, and just create the document ID based on the topic/partition/offset triplet.
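(For reference, the connector created here through ksqlDB is, under the covers, just configuration submitted to the Kafka Connect REST API. An equivalent direct REST call might look roughly like this; the connector name, worker address, and Elasticsearch URL are illustrative, and older versions of the connector may also need a "type.name" setting.)

    curl -X PUT http://localhost:8083/connectors/sink-elastic-test01-01/config \
      -H "Content-Type: application/json" -d '{
        "connector.class" : "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "connection.url"  : "http://elasticsearch:9200",
        "topics"          : "TEST01",
        "key.converter"   : "org.apache.kafka.connect.storage.StringConverter",
        "key.ignore"      : "true"
      }'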
We don't always want to do that, though; let me show you why. Let's go back to our topic and put some more data into it. We're going to create another message with a new key, so we had keys X and Y and now we've also got Z, and then we're going to create another message which has got a new value for an existing key. We inserted Y before (we can see we've got Y there), and now I've inserted a new value for it. What we can do is actually SELECT from this. I'm going to set the offset back to earliest; with ksqlDB you can query from the beginning of a topic or from the end, so we're going to query from the beginning of the topic, or the beginning of the stream rather. We can select the key, COL1, and COL2 from TEST01, and limit it to the first five messages. It says: here are the first four messages in your stream. So here are the first two that we put in, X and Y, with the values that we sent through; there's the new one that we sent through; and then here's the other one that we put through, which has got the same key as that second one but a different value (COL1 is 4 instead of 2, and COL2 is different too). If we go over to Elasticsearch now and say "what's in the index?", it says OK, we've got four different documents, because it sees these as four unique messages. So here are our messages up here in Kafka; but in terms of the key, the unique identifier we decided to use for the message, we've actually only got three unique values, and this one here, the fourth one that we put in, is actually a new value for an existing key. So if you want to get your data into Elasticsearch in a way where you can update existing records, and even do deletes against existing records, then you need to tell the connector to use the key of the Kafka message for the ID on the Elasticsearch side.

Let's see how to do that. First off I'm just going to get rid of the connector: SHOW CONNECTORS, here's the connector name, DROP CONNECTOR. Again, this is just a REST call to Kafka Connect in the background. Then in Elasticsearch we're going to delete the actual index, because if we don't delete it then things just get confusing; we're about to recreate it. Delete the index. OK, that's gone, and now we're going to recreate the connector. I've got my cheat sheet down here on a separate screen; as I say, all of these instructions are on GitHub so you can try it out for yourself. In ksqlDB we recreate the connector, and I'm going to change this setting here: key.ignore is now not true but false. Don't ignore the key; use the key of the Kafka message when you set the ID of the target Elasticsearch document. We also create it with a new name: previously it was 01, this is 02. The reason for this is that any Kafka Connect sink connector uses its name as the basis of the consumer group it reads with, and Kafka is really clever: it lets you have consumers that join and leave consumer groups. So if we left it with the same name and recreated it, it would think "this is just the same connector as before, and it's already processed those four messages", and it wouldn't process them again. Instead we create a new connector with a different set of settings, and see how it handles those four messages that we've got on the Kafka topic.
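(For reference, the drop-and-recreate described here maps onto REST calls roughly like the following; connector names, index name, and hosts are illustrative.)

    # Remove the old connector and the index it populated
    curl -X DELETE http://localhost:8083/connectors/sink-elastic-test01-01
    curl -X DELETE http://localhost:9200/test01

    # Recreate under a new name, this time using the Kafka message key as the document ID
    curl -X PUT http://localhost:8083/connectors/sink-elastic-test01-02/config \
      -H "Content-Type: application/json" -d '{
        "connector.class" : "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "connection.url"  : "http://elasticsearch:9200",
        "topics"          : "TEST01",
        "key.converter"   : "org.apache.kafka.connect.storage.StringConverter",
        "key.ignore"      : "false"
      }'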
So let's remind ourselves what the topic looks like: PRINT TEST01 FROM BEGINNING, and there we go, here are the four messages on the source topic. We've got the connector, and I can say SHOW CONNECTORS and see if it's running, and it is running. Now we go down to Elasticsearch and have a look in this index, and in the index we've got three documents. Three documents, not four, because now the ID matches the key of the Kafka message. So here are the keys in the Kafka messages, X, Y, Z, and then a new value for the same key Y; and down here you can see X, Y, and Z, and the Y key here has got the new value (COL1 is 4), which matches the new message that came through on the source Kafka topic. So that's what the key.ignore setting does. It says: do you want to ignore the key of the Kafka message and just use topic plus partition plus offset as a unique identifier, or do you want to use the key of the Kafka message, which means that when you get a logical update on the Kafka side, an update or a delete, you can model that on the Elasticsearch side. So we can write updates to Elasticsearch that are being made on the source topic; but what about deletes?

We've seen how we can set up the keys to be handled correctly so that we can identify a target document in Elasticsearch to make an update to; well, we can also use those keys to identify a target document to delete. To delete a message in Kafka, or rather to mark a message as logically deleted, we use a tombstone, and a tombstone is where you've got a key and then a null value. To write one I'm going to use kafkacat. So here's kafkacat: we're going to produce a message to our test topic, TEST01; we're going to use -Z, which means an empty value gets sent across as a null, a tombstone; and then -K specifies the key separator. So here's our producer: we specify the key for the document, key Y, then a colon as the separator, and then we hit enter, and that produces a null record. Let's come out of that, go back into ksqlDB, and check what the topic looks like now: PRINT TEST01 FROM BEGINNING, and you can see now here are the values as they were before, and here's the actual tombstone message. If we'd set this topic as compacted, then when compaction runs it would actually delete out the earlier values for that key; as it is, it's just a bog-standard topic. In practice we'd probably be populating it from an upstream source, maybe a database: maybe we've got a database source connector which, if you delete a record in the database, writes a logical deletion, a tombstone message, into the Kafka topic, and that gets pushed down to Elasticsearch.

So let's see. If we head over to Elasticsearch and run our REST API call again, we can see that it's not deleted, which is a shame. If we go back into ksqlDB and SHOW CONNECTORS, the connector is running, but it's not doing what we want, which is indeed a shame. So perhaps we need to make a configuration change, and we're going to go and look at the documentation for the Elasticsearch sink connector; when stuff's not working as you think it should, you go to the documentation. In the Kafka Connect Elasticsearch documentation we can see this: behavior.on.null.values, which is how to handle records with a non-null key and a null value, i.e. tombstone records. By default it's just going to ignore them, which is what it's done. We could set it to fail, which means just fall over, but we don't want to do that; we want to set it to delete.
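(For reference, the kafkacat tombstone write described above, key Y with a null value, looks roughly like this; broker address and topic name as used throughout this demo.)

    # Produce a tombstone: key "Y" with a null value
    # (-Z sends the empty value as NULL, -K: sets the key separator)
    echo "Y:" | kafkacat -b localhost:9092 -t TEST01 -P -Z -K: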
So let's head over here and drop our connector: DROP CONNECTOR, that one there. Then over in Elasticsearch, delete that index again. OK, so we've deleted the index and we've deleted the connector; now let's create ourselves a new connector. CREATE SINK CONNECTOR here (I'm using Ctrl-R to search back through my typing history, which is a useful way to bring back stuff that you've typed before). We give it a new name so that it processes the topic afresh, and then down here we're going to copy in this particular configuration: behavior.on.null.values, which we're going to set to delete. OK, create the connector, SHOW CONNECTORS, it says it's running, so let's head down to Elasticsearch. I'm going to gamble here (I have actually tried this out). Let's check our topic first, PRINT TEST01 FROM BEGINNING, and see how many different keys we've got: X, Y, Z. So we've got three different keys with varying changes of state, and what I reckon is going to happen is that we'll end up with two documents, because the latest message for key Y is a null, a tombstone. So when we go to Elasticsearch I'm hoping we'll see two documents, one with key X and one with key Z. Let's see if this thing actually works (not delete it, we want to query it), and there we go, it works: two documents, X and Z. We've deleted our target document based on the key, and all we had to do was make that change to the configuration. Let's just remind ourselves what that looked like: we added this setting, which by default ignores tombstones, and we're saying: actually, if you get a tombstone, if you get a null value, delete the target document.

OK, let's now talk about schemas. When we put data into Elasticsearch we can tell Elasticsearch what the mapping is for the data we're putting into it (this field is this type, that field is that type), or we can just say to Elasticsearch: here are the values, you figure it out. Elasticsearch has got this clever thing called dynamic field mapping, where it figures out what it thinks the data types are, which is useful a lot of the time. If we don't want it to guess, we can also use dynamic templates, where we actually explain to it: if a field matches this condition, then set it to this type. The way that we tell Kafka Connect how to interact with this behaviour is with the schema.ignore setting. If we say schema.ignore is false, it means we're going to take the schema from our Kafka message; in other words we're going to not ignore the schema (that's the "schema ignore" bit), so don't ignore the schema, send the schema across to Elasticsearch, and use it to define the mapping. If we say we don't want to use the schema from the Kafka message, or if the Kafka message doesn't have a schema, then we say schema.ignore equals true, and the connector will simply take the value and send it over to Elasticsearch. So let's see how that works. If we're using Avro, we have a schema, so we can choose: would we like to use that schema and send it to Elasticsearch, or would we like not to, and just let Elasticsearch guess (or use dynamic templates to define how we want Elasticsearch to handle those value types)? If we're using JSON or CSV or some other format that doesn't have an explicit schema, then we need to make sure that we don't tell the connector to try and use a schema to send across, because there isn't one.
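(For reference, the connector recreated just above, with tombstone handling enabled, corresponds to configuration roughly like this in its REST form; connector name and URLs are illustrative.)

    curl -X PUT http://localhost:8083/connectors/sink-elastic-test01-03/config \
      -H "Content-Type: application/json" -d '{
        "connector.class"         : "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "connection.url"          : "http://elasticsearch:9200",
        "topics"                  : "TEST01",
        "key.converter"           : "org.apache.kafka.connect.storage.StringConverter",
        "key.ignore"              : "false",
        "behavior.on.null.values" : "delete"
      }'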
So I want to walk through now some of the common errors that people see when they get these things misaligned, because there are a few different permutations. To start off with, I'm going to create myself a Kafka topic and put some JSON data into it. So let's create a ksqlDB stream, backed by a Kafka topic called TEST_JSON, storing the data in JSON; so there's no schema to this data. Let's create that, with the value format set to JSON. There we go; OK, so we've created the stream, now let's put some data into it. I put that in; that one doesn't work because we're supplying too many values; that one works; and let's put another one in as well to make sure we've got enough data. OK, so we've put those in, and now we can say PRINT TEST_JSON FROM BEGINNING to see what's in the topic. It says: here's the topic, and the value has been detected as JSON; it doesn't really know what the key is, because it's null, so it just guesses (it could be anything, who knows); and there's our payload, and we know it's in JSON.

So now, let me get a bit more screen space here, and let's go and create ourselves a connector. I'm going to use some different settings here. What I'm also going to do is bring up the Kafka Connect log: docker-compose logs for the Kafka Connect container, there we go. This is the Kafka Connect worker, and if stuff doesn't work we'll get to see it at the bottom of our screen here. First up I'm going to create a connector, and I'm going to say: we're going to read from this JSON topic here, and schema.ignore equals false. If you're following along closely, we're probably not going to expect this to work, because JSON data does not have a schema, and if we're saying "don't ignore the schema", i.e. "do use the schema", this isn't going to work so well. So we create that, and it says we've created it; but if we say SHOW CONNECTORS (which goes to the REST API in the background), it's going to say there's a warning, and it's a warning because none of the tasks are running, i.e. it's all broken. Let's have a look at the errors in the Kafka Connect worker log. We page up a little bit here, and the first thing we see is it saying there was an error and so it's stopping; then we go up a little bit further and we get to this one here: failed to deserialize data for the topic to Avro, unknown magic byte. This is everyone's favourite error when it comes to Kafka Connect. What it means is that we've inadvertently told Kafka Connect to use the Avro deserialiser for the data, but our data's not Avro, so Kafka Connect says: well, I couldn't deserialise it; you've told me to deserialise it as Avro and I'm trying to do that. When you deserialise data using the Avro deserialiser it looks for a magic byte at the beginning of the message; that's how it works, and here it's an unknown magic byte because it's just JSON, which doesn't have that necessary magic byte. So what we can do here is page up, or search back up through the Kafka Connect worker log, and what we should see somewhere is the value converter. Here we go: it's set to the Avro converter, even though we've not specified it in our configuration for the connector. Let's cancel that, go back up to here, and page up; you can see here's our configuration for the connector. We said create the connector, and we specified the key converter (we override that to a string), but we didn't specify the value converter, so it inherits whatever is set in the
Kafka Connect worker. Our Kafka Connect worker is set to use the Avro converter by default, and we've not overridden that in the configuration for this connector. So the first thing we need to do is say: for this particular sink connector, the topic we're reading from has got data that's serialised as JSON (and we know that, we just saw it when we created it), so we're going to override the global default that's in the Kafka Connect worker and use the JSON converter instead. I hope you followed that, because I only just about did. So the first thing we're going to do is drop this connector here, because we need to recreate it: DROP CONNECTOR. Now we're going to create it again, but this time we're going to specify the JSON converter. OK, so here's our JSON converter, and the default for it is that schemas.enable is true. This is the next little wrinkle along the way: if I'd not specified it here it would default to that anyway, so I'm just putting it in explicitly to point out that this is what's going to happen. We go and create that, cancel the output here on the Connect worker so we can see what's happened, and if we say SHOW CONNECTORS, is it working? It's not working. Oh no. Let's go down to the Connect worker log and see what's actually happening. Again it says it's stopping, again it says there's a problem. What was the problem? There was an error, and now we've got a different error: a DataException. It's another exception to do with the converter: JsonConverter with schemas.enable requires "schema" and "payload" fields. This is to do with the support that Kafka Connect has for JSON with an embedded schema; not JSON Schema, which is new in Confluent Platform 5.5, but the support that's been in Kafka Connect for a long time, where you can take a JSON message and embed within it the schema and the payload. If you do it like that, you tell Kafka Connect, by setting the converter's schemas.enable to true, that when it deserialises a message it should expect to find both of those in the value part of the message. But in the value part of our message (let's remind ourselves by printing the topic again: PRINT TEST_JSON FROM BEGINNING), all we've got is the payload. It's just the value; we don't have the schema and payload fields that the converter is expecting. So what we need to do is recreate the connector. Back up to here, cancel that, and we're going to drop the connector again (a different one this time, this one here), and now we're going to recreate it and change that setting. So, still using the JSON converter because we've got JSON data, I'm going to say schemas.enable is false. That says: when you use the JSON converter, take the message off the Kafka topic and deserialise it as JSON, and with schemas.enable equals false the converter will not look for the payload and schema fields within the body. OK, we set that running and see if it's going to work any better, and you can see at the bottom there that it hasn't, and it looks like the task has failed. So let's page up; we can see it's stopping, and what's the problem now? We keep on going up here, and now we can see that it says: worker task stopping due to an unrecoverable exception. Oh no. And then down here we've got the actual root cause, a DataException: cannot infer mapping without schema. Now what's happening?
We are at least getting a bit further along the way each time. What's happening here is that we're saying: we've got data that's in JSON, so we're going to read it from the topic and deserialise it as JSON; we don't have an embedded schema, so we say schemas.enable equals false, don't look for the payload and schema fields within the body of the message. But we've also told the connector "don't ignore the schema", i.e. when you write the data to Elasticsearch, take the schema of the message that's coming through and create the mappings on the target using that schema. The problem is, we don't have a schema; we've said there is no schema in this JSON message. That's what this error means: we can't infer a mapping without a schema. So what do we do? We go and recreate the connector. Let's drop that, and all we're going to change now is schema.ignore equals true. This is the only thing we can do, because we don't have a schema (we've used JSON without an embedded schema). So, schema.ignore equals true: take the data off the topic, deserialise it as JSON, there is no schema within it, so just take that payload, send it to Elasticsearch, and let Elasticsearch figure out what the mappings are. Now if we say SHOW CONNECTORS, we can see it's not working, because why would it be? And now we can see what else is happening: "Key is used as document id and can not be null." OK, we weren't expecting that one. So let's drop it again, DROP CONNECTOR, and recreate the connector; and this one is what I explained earlier about the keys. We had said key.ignore equals false: take the key from the Kafka message and use that as the document ID. This error message is actually really good: it says the key is used as the document ID and it can't be null, and when we inserted those messages earlier (let's go and have a look at the PRINT output up here), the key of those messages is null. You can't say "use the key of the Kafka message as the document ID" when the key is null; that doesn't make sense. So now we say key.ignore equals true, and finally, finally, I think this is hopefully going to work. SHOW CONNECTORS, and it is running, which is great. Now what I can do is go and have a look at the data in Elasticsearch. It's going to be in an index called test_json (the connector forces the name to lowercase), and there's our data. You can see it's taken the data and sent it across, COL1 and COL2, and we can see the ID is the topic name plus the partition plus the offset, because it's not using the key of the Kafka message: we said key.ignore is true, because the key is null in the source Kafka message. If you look at the mappings for test_json (let's make that a bit bigger), you can see Elasticsearch has figured out that COL1 looks like a long value and COL2 is text, and it makes sure that you can analyse it and do all that kind of funky stuff.

So that is how you work with schemas in the Elasticsearch sink connector. schema.ignore equals true means: ignore any schema in the source message if there is one, and if there isn't one then you must set schema.ignore equals true, because there's no schema we can use. Just send the values alone across to Elasticsearch and let Elasticsearch use dynamic field mapping to figure out what types to use for the mapping (you could use dynamic templates as well to override that, and tell it "if a field looks like this, then set it as that type").
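(For reference, the combination that finally works for this schema-less JSON topic corresponds to connector configuration roughly like the following; names and URLs are illustrative.)

    curl -X PUT http://localhost:8083/connectors/sink-elastic-test-json/config \
      -H "Content-Type: application/json" -d '{
        "connector.class"                : "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "connection.url"                 : "http://elasticsearch:9200",
        "topics"                         : "TEST_JSON",
        "key.converter"                  : "org.apache.kafka.connect.storage.StringConverter",
        "value.converter"                : "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable" : "false",
        "schema.ignore"                  : "true",
        "key.ignore"                     : "true"
      }'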
If we do have a schema in the Kafka message, because we're using Avro, or Protobuf, or JSON Schema, you can then say schema.ignore is false: take the schema from the Kafka message, send it across, and use that for the mapping in the index that gets created.

OK, so let's talk about timestamps. When we've got data in our Kafka topics there's often a timestamp element to it, and we want to make sure it ends up in Elasticsearch mapped correctly as a timestamp field. We might want to make sure that when we're plotting the data out in Kibana it gets analysed correctly with a timestamp across the horizontal axis; however we want to use the data, we need to make sure that the timestamp field from our Kafka data ends up recognised as such in Elasticsearch. So I'm going to create myself a topic in Kafka using ksqlDB, and this topic is going to have some fields within it, just some dummy data, and then we're going to store a couple of timestamps in different formats in our Kafka message, just to show the kinds of ways in which people store timestamps in Kafka messages and how we can handle them when we send them to Elasticsearch. In one example we're going to store a timestamp, maybe it's when an order was placed or something, as an epoch, using the BIGINT data type in ksqlDB. Then I'm going to store another timestamp, maybe when something was shipped, as a string, so that's a VARCHAR field. It's going to get written to a topic called TEST02, and I'm going to serialise it all as Avro. We've created that stream and the topic underneath it, so let's put some data into it, just using INSERT INTO (I could use any producer, it's just a Kafka topic). So, INSERT INTO here: we put a string timestamp in, and then we can use a ksqlDB function to work out, given that string timestamp, what it is as an actual epoch BIGINT. We insert that, and we insert this, so I've got a couple of rows of data, and I can say PRINT TEST02 FROM BEGINNING and it will show me the data that's in it. I can see that it's Avro, and we've got an epoch value here and a string value here.

First off, let's send it over into Elasticsearch without really thinking too much about what we're doing. We create ourselves a sink connector and say: take the data from this topic here, use the key from the Kafka message as the document ID in Elasticsearch, and don't ignore the schema, so take the schema from the Kafka message and use it when creating the mapping in Elasticsearch. We go and do that and check that it's running: SHOW CONNECTORS, and we can see it's running. If we now go and have a look at our data first in Elasticsearch, we can see there's our key, and here's our payload: we've got the epoch and we've got the string, and they've come across, as you would expect, as literal values. We can also have a look at the mapping for the test02 index, and we can see what it's done: it says, well, that's an integer, this one is a long (which maps to the BIGINT), and the string has come across as text, because it's taken the schema that we declared in ksqlDB when we serialised it as Avro and used that to create the mapping in Elasticsearch. Which means we don't have a timestamp field.
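(Checking what Elasticsearch actually received and how it mapped it, as done here, is just a couple of REST calls; index name and host as used in this demo, with jq only there to pretty-print.)

    # Inspect the documents and the field mappings for the target index
    curl -s http://localhost:9200/test02/_search  | jq '.hits.hits'
    curl -s http://localhost:9200/test02/_mapping | jq '.'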
So if we go over to Kibana, for example, and try to create ourselves an index pattern against that particular index (here's our Elasticsearch), it'll say there aren't any time fields. We can look at the data; it'll say "well, you can create an index pattern", and we'll do that, but what we won't be able to do is actually look at it in a time-based manner. It'll just say: here's the data, that looks like a number, that's a string; but it doesn't understand that it is a date. And we want to get to the point where we can take data from the Kafka topic, stream it into Elasticsearch, and actually use the dates as proper timestamps for analysing the data.

So what I'm going to do is delete the connector that we created in ksqlDB (we drop the connector), and then go and delete that particular index from Elasticsearch, because we're going to repopulate it in a different way. Let's drop that from there, and this time, instead of saying "use the schema from my Kafka message", we're going to say to the connector: send the data to Elasticsearch and let Elasticsearch figure out what it thinks the data types are. So we say schema.ignore is true: ignore the schema in the source message, or the fact that there maybe isn't one (we're using Avro here, so there is a schema, but we're saying ignore it), send the data to Elasticsearch, and let Elasticsearch use dynamic field mapping to figure out what it thinks the data types are. We create that, using a different connector name, because otherwise Kafka Connect would think "this is the same connector as before, and I've already sent those two messages, so I'm not going to send them again"; we give it a new connector name, and it sends over those messages from the topic again. Now we go over to Elasticsearch and say: have I got the data? Yep, you've got the data, and it looks exactly the same as before, because it is the same as before. But if we have a look at the mapping you can see what it's done with it: this COL1 is an integer, so it maps it as a long; this epoch here is a BIGINT, so it maps it as a long; but for this string here, Elasticsearch looked at the value in the field and said "well, that kind of looks like a date, so let's map it as a date". And if we go over to Kibana, go down to index patterns, take this test02 one and delete it so we can recreate it and see what it now thinks about the timestamp fields that are present, and then create ourselves an index pattern for test02, it says: OK, you've got a time field. It's offering the string-based time field, because it looked like a timestamp when we sent it to Elasticsearch, so Elasticsearch said "I reckon that's a time field" and mapped it as such, which means we can now use it for things like Kibana, or whatever time-based analytics we're doing in Elasticsearch. We create that index pattern, we can poke around the data using the Discover view, and it will actually plot it out over time; you can summarise it and look at it over time, because this field here is an actual timestamp. This one here we can see is a timestamp value; the epoch one is still just a long, because it's a BIGINT. So this works great if we're storing our values as strings in Kafka topics and those strings match the format that Elasticsearch uses for detecting date fields.
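(For reference, the "let Elasticsearch guess" variant just described corresponds to configuration roughly like this; the connector name is illustrative, and the value converter is left to the worker's default, which in this demo is Avro.)

    curl -X PUT http://localhost:8083/connectors/sink-elastic-test02-02/config \
      -H "Content-Type: application/json" -d '{
        "connector.class" : "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "connection.url"  : "http://elasticsearch:9200",
        "topics"          : "TEST02",
        "key.converter"   : "org.apache.kafka.connect.storage.StringConverter",
        "key.ignore"      : "false",
        "schema.ignore"   : "true"
      }'
    # schema.ignore=true: the Avro schema is not used for the Elasticsearch mapping;
    # dynamic field mapping decides the types (the date-like string becomes a date).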
But what happens if we want to use that epoch field, the order timestamp, as our timestamp field for analysis? Or what happens if the ship-timestamp string format that we're using doesn't match the format that Elasticsearch expects it to be in, some different kind of date formatting that Elasticsearch doesn't recognise automatically and so can't do this for us? Let's go through a couple of other options. We're going to drop the connector again and recreate it: drop the connector down there, drop the index, and now we're actually going to explicitly tell Elasticsearch that this field is a timestamp. We're going to create it like this, using what's called a single message transform. Single message transforms are part of the Kafka Connect framework, and they let us modify messages as they pass through the Connect pipeline. In the case of a sink connector, we read a message from Kafka, it gets deserialised, then we optionally apply single message transforms to it (one or more), and then it gets sent from the sink connector down to the target. So here we're creating a connector against the test topic, we're saying don't ignore the schema (i.e. the connector is going to tell Elasticsearch what the mapping types are), and then here we're applying a transform: the TimestampConverter, and we're saying this field here is a timestamp. We create that, go down to Elasticsearch and say "show me the data" (yes, we've got the data), then look at the mappings, and this time the mappings look a bit different. COL1 is an integer; the shipment timestamp is a string and has come across as text, because we said schema.ignore is false, so the connector tells Elasticsearch "this field is a VARCHAR on our side, so it's a text field on your side"; but this one here, the epoch field, we said we want to set as a timestamp, so Kafka Connect told Elasticsearch that this field is a timestamp and it's mapped it correctly as a date field. And if we go over to Kibana and check this out, using that test02 Elasticsearch index, next step, it now says the time field available to us is the order timestamp, which we were storing as an epoch. So you can store your timestamps in different ways, and we can configure the Kafka Connect sink connector to send them across according to the data types that we want to use.
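(For reference, the TimestampConverter transform described here is configured on the connector roughly like this; the transform label, the field name, and the connector name are illustrative.)

    curl -X PUT http://localhost:8083/connectors/sink-elastic-test02-03/config \
      -H "Content-Type: application/json" -d '{
        "connector.class" : "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "connection.url"  : "http://elasticsearch:9200",
        "topics"          : "TEST02",
        "key.converter"   : "org.apache.kafka.connect.storage.StringConverter",
        "key.ignore"      : "false",
        "schema.ignore"   : "false",
        "transforms"                          : "setTimestamp",
        "transforms.setTimestamp.type"        : "org.apache.kafka.connect.transforms.TimestampConverter$Value",
        "transforms.setTimestamp.field"       : "ORDER_TS_EPOCH",
        "transforms.setTimestamp.target.type" : "Timestamp"
      }'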
There's another way we can do it too. Depending on how you're storing your data and how you want to handle your data mappings, you can choose between these different ways of doing it, and I want to show you all the permutations we've got. So far we've said we can either let Elasticsearch just look at the fields, guess, and hope that it figures out that this string here is a timestamp and uses it correctly; or we can force it, and tell it that this BIGINT, this long field, is actually an epoch, so interpret it as such and use it as your timestamp. What we can also do is use a dynamic template, so that when the fields come through from Kafka Connect into Elasticsearch it says: if a field matches this pattern, then map it as a date. To do this we're going to create ourselves a dynamic template in Elasticsearch first. We've dropped the connector, we've dropped the index; I don't think this works if the index already exists, I think the template is applied when the index gets created, but you could test it out for yourself and see. So we're going to create ourselves a template here (the template name doesn't matter), with a dynamic template in it that says: match anything that's got _TS_ in the field name and map it as a date. I guess that's kind of dangerous if you've got lots of different fields, and you don't want to try and force everything into a date (we could list out the literal field names if we wanted to), but we'll use that for the moment, because it works. Now we're going to create ourselves the connector yet again, again using a different name so that we reprocess the data that's already in the topic. This bit is important: we're saying schema.ignore is true, i.e. ignore the schema that we have on the Kafka side (or that we maybe don't have, if we're using plain JSON) and let Elasticsearch figure out the field mapping types itself; and because we've created this dynamic template, Elasticsearch is going to use it to inform those field mappings when it creates them. So we create that, we check that we've got the data there (yep, we've got the data there), and now we have a look at the mappings over here. OK, this looks a little bit different this time: within this index we've got the dynamic template which we defined, and then these two fields here are both date. The reason they're both date is that they both have _TS_ in the name, which matches the pattern that we defined here, so Elasticsearch said "OK, that matches", and these fields therefore become a date type. Which means that when we go back to Kibana and refresh, we can use either of them, because perhaps you want to analyse your data based on different time fields; if it's a time field you want to be able to map it as such, and then when it comes to analysing and using that data in Elasticsearch you choose which field to use.

The last thing I want to show you when it comes to timestamps is how you can use the timestamp of the Kafka message itself in Elasticsearch. The payload that we've been working with so far, if we just remind ourselves (DESCRIBE TEST02), has various different fields: the main schema that we defined, with its columns, including the two different timestamp fields that we've been working with back and forth, casting them and so on. But within a Kafka message you also have a key (we've talked already about how you manage keys and document IDs and so on), and you also have the metadata which sits within every Kafka message, including a timestamp. The timestamp of a Kafka message can be set explicitly by the producer, so the producer can say "the timestamp of this message is whatever", or you can configure the brokers so that when a message arrives on the broker it gets given the timestamp of when it arrived, like a processing-time timestamp. That timestamp is useful, and it's embedded within the Kafka message; in fact, sometimes you don't even need a timestamp as part of your value payload, because you effectively have the same timestamp as part of the metadata anyway. You've got that metadata in every Kafka message, and sometimes you want to take it and expose it out into your target system, like Elasticsearch.
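(For reference, the dynamic template created at the start of this step would be put in place with a call along these lines; the template name and index pattern are illustrative, and the exact template API shape can vary between Elasticsearch versions.)

    curl -X PUT http://localhost:9200/_template/kafkaconnect_ts \
      -H "Content-Type: application/json" -d '{
        "index_patterns": ["test02*"],
        "mappings": {
          "dynamic_templates": [
            { "dates": { "match": "*_TS_*", "mapping": { "type": "date" } } }
          ]
        }
      }'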
So let's do that: let's go and create ourselves a new connector. Let's drop the existing one, like so (let's drop this existing one down here), and I'm also going to get rid of my dynamic template, just so that we're starting from scratch; I'll use this one here and delete that one. OK, so we're back to square one, and now let's create ourselves another new connector. All of this code, by the way, is on GitHub; I'm copying it from a different screen, which is why I'm looking down, but it means that you can go and look at it and try it all out for yourself. We're taking the data from the same topic as before, we're saying don't ignore the schema (use the schema that Kafka Connect gives you), and we're saying extract the timestamp. It's a single message transform; "extractTimestamp" is simply the label, we could call it "foo" if we wanted to. The type of this transform is InsertField: we're going to insert a field into the value part of the message, and what we're going to insert is the timestamp. The "timestamp" configuration here is a special part of that transform; you can also insert things like the topic name, or just some literal text if you want to. We're going to insert the timestamp into a field called MESSAGE_TS. So we set that running, and now if we go over to here and say "let's have a look at the data"... we've actually got no data, because something's going wrong. SHOW CONNECTORS: OK, it says it's running, so why is it not working? It's not working because I think I've reused a connector name: we created that one as "d", so this one should definitely be "e" or something different. Let me show you what happened; here's my history. Up here we created that sink connector (this is when we were using the dynamic templates), and then we said drop it, OK, we dropped it and got rid of it, and then we've just created it again. So we've reused that connector name, and because we've reused it Kafka Connect says "well, it's the same connector", and it does the sensible thing, which is to say "I've already processed this data for this particular connector, so I'm not going to reprocess it, because you might end up with duplicates in your target system". So what we need to do is drop the connector, like this, DROP CONNECTOR, and simply create it again with a different suffix. OK, let's see if that's worked. We've created the connector, SHOW CONNECTORS, and this time it's running; and this time, if we have a look at Elasticsearch, we've got some data, and we've got some new data. We've got the key, as before, set to the key of the Kafka message; we've got COL1, we've got the order timestamp, we've got the ship timestamp, and we've also got the message timestamp. OK, so that message timestamp has come across as an epoch value, and if we have a look at the mappings for this index we can see that the message timestamp has come across as a date, because that field came through as a timestamp type from the single message transform.
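(For reference, the InsertField transform described here looks roughly like this on the connector; the transform label and connector name are illustrative.)

    curl -X PUT http://localhost:8083/connectors/sink-elastic-test02-05/config \
      -H "Content-Type: application/json" -d '{
        "connector.class" : "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "connection.url"  : "http://elasticsearch:9200",
        "topics"          : "TEST02",
        "key.converter"   : "org.apache.kafka.connect.storage.StringConverter",
        "key.ignore"      : "false",
        "schema.ignore"   : "false",
        "transforms"                                 : "extractTimestamp",
        "transforms.extractTimestamp.type"           : "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.extractTimestamp.timestamp.field": "MESSAGE_TS"
      }'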
So now if we head over into Kibana and refresh the field names that are available, we've now got the message timestamp. We don't have the other two, because I got rid of the changes I made in the previous step with the dynamic templates and so on; it's gone back to taking the literal data types from the Kafka message, which are a BIGINT (so a long) and a VARCHAR (which becomes a text field), but the message timestamp that we extracted comes out as a timestamp data type. That got pushed across because of the schema.ignore equals false setting: use the schema from the Kafka side when we push the data over to Elasticsearch. So now we can use the timestamp of the Kafka message itself, and I'm going to create the index pattern on it. Always bear in mind that your data has got different timestamps: the timestamp of when it was created or updated, when it landed in Kafka, when we processed it, whatever. If we change this here and have a look at the data over all time, you'll see the timestamp associated with these particular values is from the 1st of May, which is when I'm recording this; if I go and have a look at the timestamps within the data, the ship timestamp is from February (and the epoch, if we resolved it out, I think is February as well). So I've got different timestamps in the data, and we can use the Kafka Connect sink connector to identify those different timestamps as we want to and set the data types correctly when the data lands in Elasticsearch.

Now, when we send documents through from a Kafka topic down to Elasticsearch, the connector will take the topic name and use it as the basis for the index that it creates in Elasticsearch. If it's an uppercase name it forces it down to lowercase, because that's what's required in Elasticsearch. We can see here we've taken a topic called TEST02 and pushed it down through into Elasticsearch; you can list out the indices, and it shows we've got a test02 index in Elasticsearch.
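(Listing the target indices, as done here, is another one-line REST call; host as used in this demo.)

    # List the indices that the connector has created
    curl -s "http://localhost:9200/_cat/indices?v"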
But what if we want to change that target index name? Well, this is where single message transforms come in again: we can use a single message transform that modifies the topic name, because the topic name is what the connector uses to define the target index name. So let's take this example here: instead of test02, we're going to replace the "test" part of the name with a different piece. Let's move that down here on the screen. We've got the same topic, TEST02, but we're going to use a single message transform called the RegexRouter. This lets you specify a regular expression, so here's our regular expression, and what it means is "take anything followed by 02", and that "anything" is captured; that's what the brackets do, they're a capture group. So TEST02 is going to match: the "TEST" bit falls within the brackets, and the "02" is outside the brackets. We capture the "TEST" string, and then in the replacement part of the transform we say: use this literal here, and then $1 for the first capture group, which is this. It sounds more complex than it actually is. We run that and check it's running (not that, sorry, SHOW CONNECTORS), and it says it's running; and if we go to Elasticsearch and use the API to look at the indices, we can see we've got the existing test02 that was there already, and now we've got one called foo-test. So "foo" is the hard-coded literal here, then the hyphen, and then $1 is the capture group we specified in our regular expression. If you want to do things like changing the index name, dropping a prefix, adding stuff in, you can do that using the RegexRouter.

Another thing that we sometimes want to do with indices is timestamp-based partitioning: we want the same base index name, but appending maybe the year and month, or year, month, and day, to help with how we organise the data. We can do that too, and for this we're going to use a different single message transform, this one called the TimestampRouter. You can find all of these in the documentation: there's the TimestampRouter, there's the RegexRouter that we used, all sorts of different transformations that you can use when you're working with Kafka Connect. And remember, single message transforms are just part of the Kafka Connect framework, so if you're using a properly written connector it'll support single message transforms; you just plug them in, the same way we plug in a converter to say we're using Avro or JSON. You plug these transforms together with a connector, and you can use them with any Kafka Connect pipeline that you're building.

So let's head over here and create ourselves another connector. This one is going to use the TimestampRouter, and it's going to say: take the timestamp of the Kafka message (the Kafka message timestamp, that piece of metadata that we talked about before) and append it to the topic name. When we create single message transforms, the transform just has a label which the configuration hangs off; you can use something usefully descriptive like this, but it's just a label, it doesn't actually matter what it's called. That's the value there, which we then repeat in the configuration, and we say it's got a type, which is the TimestampRouter. Different transforms have got different types of configuration; this one needs a topic.format, where we say "take the topic name and add on the timestamp". How do we want to format the timestamp? That's another piece of configuration for the transformation, so here's that configuration, and we say we'd like to add on the timestamp in this particular format. We set that running, check that it's working, and now down here in Elasticsearch we have a look at the indices: we've got the one that we created before, and the other one we created before, and now we've got this one here, the topic name with the date of the Kafka message appended to it. I sent the messages through on the 1st of May, so that's what gets written in there; if we add new messages onto that Kafka topic on the 2nd of May, we'll get a new target index in Elasticsearch, but it's all coming from the same source.

And we can combine these transformations as well: we can say, I'd like to modify the name, so instead of test02 I want to call it foo-test, and also append the timestamp to it.
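(A sketch of roughly what such a chained configuration might look like, with the rename running before the timestamp routing; the transform labels, the regular expression, and the date format are illustrative.)

    curl -X PUT http://localhost:8083/connectors/sink-elastic-test02-routed/config \
      -H "Content-Type: application/json" -d '{
        "connector.class" : "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "connection.url"  : "http://elasticsearch:9200",
        "topics"          : "TEST02",
        "key.converter"   : "org.apache.kafka.connect.storage.StringConverter",
        "key.ignore"      : "false",
        "schema.ignore"   : "false",
        "transforms"      : "renameIndex,routeByTimestamp",
        "transforms.renameIndex.type"                  : "org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.renameIndex.regex"                 : "(.*)02",
        "transforms.renameIndex.replacement"           : "foo-$1",
        "transforms.routeByTimestamp.type"             : "org.apache.kafka.connect.transforms.TimestampRouter",
        "transforms.routeByTimestamp.topic.format"     : "${topic}-${timestamp}",
        "transforms.routeByTimestamp.timestamp.format" : "yyyy-MM-dd"
      }'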
So on screen it looks like this: we take the index-name transformation and the timestamp transformation together. First we change the name using the regex, as we saw before, taking the "TEST" part of the topic name and appending it to this hard-coded literal here, the "foo". Then we've got the TimestampRouter saying: take the topic name (and at this point in the transformation chain the topic name is now foo-TEST) and append to that the timestamp, in this different format. We set that running, say SHOW CONNECTORS and check it's working, and then down here we go to Elasticsearch using the API and say: what indices have we got? And we've got this one here, foo-test-<timestamp>. So you can modify the target index names in Elasticsearch using single message transforms like the RegexRouter and the TimestampRouter.

So let's finish up this tour of the Elasticsearch sink connector by talking about when things go wrong: how do you handle messages coming through from a topic that are maybe badly formed, or don't fit the format that's expected by the sink connector? I'm going to dive down into the console now, leaving ksqlDB behind, and we're just going to create ourselves some data using kafkacat. kafkacat is my favourite Kafka tool: you can use it as a producer, as a consumer, as a troubleshooter; it's a Swiss Army knife of awesomeness for Kafka. Here we're going to use it as a producer, sending data to a topic called test03, and I'm also going to run a second one down here which is going to be a consumer, so we can see everything that we send to the topic also being echoed out to the screen. So there's my consumer, here's my producer up here, and we're going to produce ourselves a message onto the topic. It's just going to be a very, very simple lump of JSON, and it's got a key on it here: kafkacat lets you send keys, so -K means there's a key, and the colon is the separator that we're using. So 1 is my key, there's my separator, and then here's my value. We send that across, and we can see down here there's my key, here's my value; you can also see the topic, partition, and offset, just because that's kind of useful to know.

Now let's create ourselves a very simple sink connector using this data. We'll shove that over onto this screen as well and give ourselves a bit more space, and now we're just going to use the REST API of Kafka Connect directly. As I showed you before, all that stuff where I was using ksqlDB was simply a wrapper on top of the Kafka Connect REST API; it's all Kafka Connect under the covers, it's just that now I'm going to use curl to call that REST API directly. Elasticsearch sink connector, here's my topic, and we run that and it goes off and creates the connector. If I go and have a look at the data that's sat on the topic test03, we can see that the data's come through, and that's as it should be. What happens, though, if I go and write a message onto the topic that's badly formed? At the moment we're using the JSON converter, so if I go and put something on the topic like this, where I can "forget" to do something, and send that, now we've got some data on the topic that's badly formed JSON, and that's no good to us. If we go down here we can have a look at the status of the connector, and we can see here that it's failed. (I'm using the REST API again, with a bit of bash magic around it to format it nicely.)
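(For reference, the direct REST calls described in this step look roughly like this; the connector name, topic, and hosts are illustrative, and jq is just there to pretty-print the output.)

    # Create a simple Elasticsearch sink connector for the JSON topic, directly against the Connect REST API
    curl -X PUT http://localhost:8083/connectors/sink-elastic-test03/config \
      -H "Content-Type: application/json" -d '{
        "connector.class"                : "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "connection.url"                 : "http://elasticsearch:9200",
        "topics"                         : "test03",
        "key.converter"                  : "org.apache.kafka.connect.storage.StringConverter",
        "value.converter"                : "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable" : "false",
        "schema.ignore"                  : "true",
        "key.ignore"                     : "false"
      }'

    # Check the connector and task status
    curl -s http://localhost:8083/connectors/sink-elastic-test03/status | jq '.'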
We've got a sink connector; the connector itself, the logical entity, is running, but the task within it has failed. Let's find out why it's failed by having a look at the logs; the logs are always where we're going to go. (There's also a REST API for finding out the status directly, but I just have the habit of looking at the logs.) In the logs we can see that things are stopping, the task is aborted, and if we page up, and page up some more, we can see this here: a DataException, "error converting bytes to Kafka Connect data failed due to serialization error", blah blah, and here, the JSON parser that Kafka Connect uses was expecting a double quote to start the field name. If we look at this here, that's the thing that we sent across, and it's saying the unexpected character is "k", and it was actually expecting a double quote to start the field name, which is what valid JSON would have: in valid JSON you've got quotes around the field names, whereas here's the JSON I sent across and it's just a lump of gibberish. So it's bad JSON, and it's crashed the connector; the connector said "well, I can't read that, so I'm going to stop".

Kafka Connect has got different error-handling characteristics depending on where something goes wrong in the pipeline. Here the problem lies in the deserialisation, so it's at the converter stage, and converters, if we remember, are pluggable things that are separate from the actual connector itself. At this stage we're at the converter stage, and the converter said "I can't read that", and therefore the whole thing stops. But Kafka Connect as a framework supports being able to continue processing if you hit problems at certain stages: if you hit problems during the converter or the transformation, you can tell Kafka Connect how you want that handled. (We'll talk later about how we deal with problems in the actual connector itself, like connecting out to the end system and various errors in its API.)

So now we go and create ourselves a new version of the same connector. We can clear that there and just create it here. I'm using the PUT API because you can just override the existing configuration; you don't have to delete the connector and recreate it like we were doing in ksqlDB previously. I just do a PUT: it creates the connector if it's not there, and updates it if it is. What we're adding in is this: errors.tolerance set to all. By default it's none, which means it'll just stop, and sometimes you want a data pipeline to do exactly that: if you get data that you weren't expecting, that's not well formatted, then stop, because we need to check it out. But sometimes we want to say "it's fine, let the good stuff pass on through", and that's what this does. So the tolerance is all, and then we're also going to log those errors and include the messages associated with them. We create that, and if we go and have a look now at Elasticsearch we can see we've still got that one document there; but if we have a look at the status of the connector we can see it's still running, and that's good, because if we go up here and send another message over (let's send a message with a new key, the same field, which is called "a", and a different value, like that), we send some good JSON over, you can see it's arrived in the topic there, and if we go down to Elasticsearch, hey, we can see that it's made it over into Elasticsearch.
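(For reference, the error-handling settings added here with the PUT would look roughly like this on top of the previous configuration; names and hosts are illustrative as before.)

    # Same connector as before, updated in place (PUT), now with error tolerance and error logging added
    curl -X PUT http://localhost:8083/connectors/sink-elastic-test03/config \
      -H "Content-Type: application/json" -d '{
        "connector.class"                : "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "connection.url"                 : "http://elasticsearch:9200",
        "topics"                         : "test03",
        "key.converter"                  : "org.apache.kafka.connect.storage.StringConverter",
        "value.converter"                : "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable" : "false",
        "schema.ignore"                  : "true",
        "key.ignore"                     : "false",
        "errors.tolerance"               : "all",
        "errors.log.enable"              : "true",
        "errors.log.include.messages"    : "true"
      }'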
If we hadn't set this error handling in place, the pipeline would have stopped because the connector had stopped, and if we restarted the connector it would have stopped again, because it would try to reprocess the data it hadn't processed yet and get stuck on that same message: "I've not processed this message, I need to process it, I'll try to process it... well, I can't", and it would stop. Setting this option lets the pipeline continue processing; it just drops that message, ignores it, and writes it out to the log file.

We don't always want to just ignore these messages, or write them to a log file and hope that someone notices. We can also use what's called a dead letter queue, and the idea is that the good stuff passes on through, and the stuff we can't process gets written out to a separate topic: our dead letter queue. That separate topic we can then consume from, because it's just a Kafka topic, and process or monitor it however we want. So let's set that up. We're going to amend the pipeline: again we're just recreating the existing connector, we've got our previous settings, the errors tolerance, and now we're saying: with a dead letter queue, write out to this particular topic if we hit an error deserializing the data or running a single message transform against it, just at those certain stages of the Kafka Connect pipeline, and include information about the error in the headers of the Kafka message that we write to the dead letter queue. So we've created that; now let's send another bogus message. This one's going to look like this; we send it, and that's a bad message. We should see that it doesn't actually go to Elasticsearch, which we wouldn't expect it to because it's badly formed, and we should hopefully see that the connector is still running, which it is, so the pipeline hasn't failed. But if we open a new window and have a look at the dead letter queue topic itself using kafkacat, because it's just a Kafka topic we can consume from, we'll see that on that topic we've actually got the data. Here we're formatting the output to write the key, then the value, then the headers, so here's the topic name, there's the key, there's the value, and here are the headers. The headers carry all this information: stuff like the topic the message came from, the partition, the offset, and then the actual error, "converting byte[] to Kafka Connect data failed due to serialization error", and there's the actual stack trace; we can see down here "was expecting a colon", and there's the offending message. So messages that we can't process, that we can't deserialize or can't run the transformations against, can get routed to a dead letter queue without actually breaking the pipeline.

All of this is covered in an article I wrote on the Confluent blog, "Kafka Connect Deep Dive – Error Handling and Dead Letter Queues"; it goes through all of the options I'm showing you here, and it also has an important table down at the bottom. That table tells us which things are handled by the Kafka Connect framework and which things need to be handled by the connector itself: things to do with writing to or reading from the specific technology you're integrating with are handled by the connector itself. So let's look at an example of this.
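Roughly, the dead letter queue settings and the kafkacat inspection being described look like this; the DLQ topic name, the replication factor (dropped to 1 for a single-broker demo stack) and the format string are assumptions:

    # Sketch: route unprocessable records to a dead letter queue topic,
    # with the error details carried in the message headers
    curl -s -X PUT -H "Content-Type: application/json" \
         http://localhost:8083/connectors/sink-elastic-test03/config \
         -d '{
               "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
               "connection.url": "http://elasticsearch:9200",
               "topics": "test03",
               "type.name": "_doc",
               "key.ignore": "true",
               "schema.ignore": "true",
               "value.converter": "org.apache.kafka.connect.json.JsonConverter",
               "value.converter.schemas.enable": "false",
               "errors.tolerance": "all",
               "errors.log.enable": "true",
               "errors.log.include.messages": "true",
               "errors.deadletterqueue.topic.name": "dlq_sink-elastic-test03",
               "errors.deadletterqueue.topic.replication.factor": "1",
               "errors.deadletterqueue.context.headers.enable": "true"
             }'

    # Consume the dead letter queue and print key, value and headers
    # (%h for headers needs a reasonably recent kafkacat)
    kafkacat -b localhost:9092 -t dlq_sink-elastic-test03 -C \
             -f 'Topic: %t\nKey: %k\nValue: %s\nHeaders: %h\n--\n'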
If we remind ourselves of the data we've put into Elasticsearch, it looks like this. We can also look at the mapping: we can say "show me the mapping for this particular index", and it says it's got one field with a long data type. So what if we go and send some data to that topic that's valid JSON, with the field name "a", but instead of a long-compatible number we send the value as a string? This is valid JSON, so it isn't going to fail at the deserialization stage, and it isn't going to fail at the single message transform stage either. We send it across, we can see it in the topic, and down here the status of the connector still says it's running; but if we go and have a look at the logs we'll see that we've got an error writing it to Elasticsearch: the bulk request failed, because the input string isn't a valid value for that particular field's type. Now if we go back to the topic and try to write some valid data, we'll see what the impact of this is. So here's a valid message, behind the invalid one; we send it, and if we go down and have a look at what's in Elasticsearch, we'll see that we don't have that message. Even though we've sent a valid message, it's not in Elasticsearch, because the invalid one is bunging up the pipeline: the pipeline has basically stalled behind it, because the connector says "I can't write that one, so I can't process the rest."

This is where the last setting comes in, which is where we say: if you've got a malformed document, just ignore it, or write a warning. So let's change the connector definition and set the behaviour on malformed documents to just write a warning. We do that, we check that the connector is running, and it says it's running; if we have a look at the logs we'll see a warning down at the bottom saying this document is malformed; but if we go and have a look at the actual data that's in Elasticsearch, we can see that the valid document has now come through. So the valid message from the source topic has made it through to Elasticsearch, and the malformed one, where we tried to send a string value into a number field, just gets skipped and written to the log file, but it doesn't actually break the pipeline for us.

So that's Kafka Connect streaming data from a Kafka topic down to Elasticsearch. I hope it's been useful. Remember to subscribe to the channel, and go and check out the Confluent YouTube channel as well, and the Confluent blog for lots of Kafka Connect, Kafka and Confluent related posts.
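For reference, a hedged sketch of the mapping check and the malformed-document setting covered above; the index name, connector name and ports are again assumptions rather than the demo's exact values:

    # Inspect the field mapping that Elasticsearch has inferred for the index
    curl -s http://localhost:9200/test03/_mapping

    # Sketch: warn (rather than fail) when Elasticsearch rejects a malformed document
    curl -s -X PUT -H "Content-Type: application/json" \
         http://localhost:8083/connectors/sink-elastic-test03/config \
         -d '{
               "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
               "connection.url": "http://elasticsearch:9200",
               "topics": "test03",
               "type.name": "_doc",
               "key.ignore": "true",
               "schema.ignore": "true",
               "value.converter": "org.apache.kafka.connect.json.JsonConverter",
               "value.converter.schemas.enable": "false",
               "errors.tolerance": "all",
               "errors.log.enable": "true",
               "errors.log.include.messages": "true",
               "errors.deadletterqueue.topic.name": "dlq_sink-elastic-test03",
               "errors.deadletterqueue.topic.replication.factor": "1",
               "errors.deadletterqueue.context.headers.enable": "true",
               "behavior.on.malformed.documents": "warn"
             }'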
Info
Channel: Robin Moffatt
Views: 11,552
Keywords: kafka, kafka connect, streaming data, Confluent Platform, Elasticsearch, Kibana, JSON, Avro
Id: Cq-2eGxOCc8
Length: 65min 16sec (3916 seconds)
Published: Tue May 05 2020