Kafka Exactly Once with NiFi

Captions
Hey everybody, welcome back. Users ask me all the time how they can use Apache NiFi to take advantage of Apache Kafka's exactly-once semantics. NiFi is one of the easiest and most common ways to get data into and out of Kafka, but what we're talking about here is pulling data from one Kafka topic, doing some sort of processing (perhaps some routing, some transformations, filtering, maybe some enrichment), and then pushing the result into another Kafka topic, and we want to make sure that for every record or message in the source topic we have exactly one correlating message in the destination topic.

NiFi has never really supported Kafka's exactly-once semantics. It never fit very well with the NiFi architecture, because NiFi provides a loose coupling between components on the canvas and it persists data across restarts, while Kafka's exactly-once semantics require a much tighter coupling between the Kafka consumer and the Kafka producer and work better in a stateless environment where data is not persisted across restarts. The NiFi community has been working really hard on a new capability referred to as stateless NiFi. It provides a different architecture to run your data flows in, with different design decisions, and that architecture is much better suited to exactly-once semantics, but it hasn't been very easy for users to consume. That is, until now: with version 1.15 of Apache NiFi we are introducing tons of new features, including the ability to use Kafka's exactly-once semantics.

Before I jump into how all of this works, I want to take some time to look at the problems that keep NiFi's architecture from jibing well with Kafka's exactly-once semantics. That way you'll understand not just how to build a data flow that takes advantage of exactly-once semantics, but also why we need to build the data flow the way we do. If you're not interested in the why and just want to know how to do it, feel free to jump ahead.

In order for NiFi to support Kafka's exactly-once semantics, there are three big obstacles to overcome. Firstly, NiFi persists all data across restarts. That is hugely important for a huge percentage of use cases, but here it means NiFi can pull some data from Kafka and begin processing it, and then get restarted before the processed results are pushed to Kafka. Upon restart, NiFi will finish processing that data and send the processed results to Kafka, while the Kafka consumer has its offsets reset. The consumer then re-pulls the same data, and we end up with data duplication.

Secondly, Kafka's exactly-once semantics require a tight coupling between the consumer and the producer. This matters because if there is a failure when publishing the results to Kafka, or some processing failure along the way, we need to be able to roll the Kafka consumer back so the data can be processed again. NiFi provides a loose coupling between components, which makes it really difficult to get that information back to the Kafka consumer so that it can roll back.

Thirdly, we have to deal with the notion of data splitting and reordering. For a really simple flow, like consuming data from Kafka, maybe converting JSON to Avro, and publishing the results back, that's pretty easy to handle.
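To make that consumer/producer coupling concrete, here is a minimal sketch of the raw Kafka transactional consume-transform-produce loop that this kind of exactly-once processing is built on. This is not NiFi code, and the broker address, topic names, group id, and transactional id below are illustrative assumptions; the point is simply that the consumer's offsets are committed as part of the producer's transaction, so a failure rolls everything back together.

    // Sketch of Kafka's transactional consume-transform-produce pattern (Kafka clients 2.5+).
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.time.Duration;
    import java.util.*;

    public class ExactlyOnceLoop {
        public static void main(String[] args) {
            Properties cp = new Properties();
            cp.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // assumed broker
            cp.put(ConsumerConfig.GROUP_ID_CONFIG, "enrich-charges");            // assumed group id
            cp.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");           // producer commits offsets
            cp.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");     // only see committed data
            cp.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            cp.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

            Properties pp = new Properties();
            pp.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            pp.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "enrich-charges-tx"); // enables transactions
            pp.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            pp.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(cp);
                 KafkaProducer<String, String> producer = new KafkaProducer<>(pp)) {
                producer.initTransactions();
                consumer.subscribe(List.of("charges"));

                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                    if (records.isEmpty()) continue;

                    producer.beginTransaction();
                    try {
                        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                        for (ConsumerRecord<String, String> r : records) {
                            // enrich() stands in for whatever processing the flow performs
                            producer.send(new ProducerRecord<>("enriched-charges", r.key(), enrich(r.value())));
                            offsets.put(new TopicPartition(r.topic(), r.partition()),
                                        new OffsetAndMetadata(r.offset() + 1));
                        }
                        // The consumer's offsets ride along in the producer's transaction:
                        // this is the tight consumer/producer coupling described above.
                        producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                        producer.commitTransaction();
                    } catch (Exception e) {
                        // A real implementation would distinguish fatal from abortable exceptions.
                        producer.abortTransaction(); // nothing is published and offsets do not advance
                    }
                }
            }
        }

        private static String enrich(String json) {
            return json; // placeholder for the lookup/enrichment step
        }
    }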
But for a more complex flow that results in data being split apart and reordered, it becomes a lot more complicated to support exactly-once semantics in NiFi. We could support just the simple use cases, but that would really limit the capabilities that make you love NiFi in the first place, so what we want is a much richer, more holistic solution. So now we have three issues to tackle: avoiding duplicate processing, providing a tighter coupling between the Kafka consumer and the producer, and dealing with data splitting and reordering.

Stateless NiFi is an alternate runtime engine that is capable of running a data flow that we design and build in traditional NiFi, but it makes different design decisions. It's single-threaded, and it can optionally keep the content of your FlowFiles in memory, which can make processing really fast. Regardless of whether you choose to keep the content in memory or write it to disk to avoid heap exhaustion, no data is persisted across restarts. So we've handled one of our obstacles: we don't have to worry about duplication caused by data being persisted across restarts. Stateless also runs the entire data flow as a single cohesive unit, meaning that if there is a processing failure anywhere in the flow, the entire transaction is rolled back. That solves the issue of tight coupling between the consumer and the producer. So two of our obstacles are addressed just by using the stateless engine.

Thirdly, we have to consider the issue of data reordering. It's not a hard requirement that data consumed from Kafka be published back to Kafka in exactly the same order. Rather, the requirement is that the data cannot be reordered between transactions: if we consume a bunch of data, do all the processing, and then publish it back to Kafka in a single transaction, we don't have to worry about reordering. We can still support Kafka's exactly-once semantics even if the data gets reordered within that transaction, and that can be handled in the data flow; we'll talk about how to do that in just a minute. Once we've built the data flow, we need some way of running it with the stateless engine, and that's where the ExecuteStateless processor comes in. It's a new processor in NiFi 1.15, and we'll take a look at how to use it to run our data flow once we've built it.

Now that we understand the obstacles and the underlying solution, let's take a look at how to build that data flow. I'm going to start with a ConsumeKafkaRecord_2_6 processor. It pulls JSON records from a topic named "charges", and because we want exactly-once behavior, we need to ensure that Commit Offsets is set to false. This is a new property introduced in version 1.15 of NiFi, and it tells the consumer not to commit offsets to Kafka once it has pulled the data, so that if we hit a processing failure we're able to roll back all of the records we've already consumed. Instead, it lets the publisher commit those offsets for us once everything has completed successfully.
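For reference, the consumer-side settings that matter here look roughly like this. Property names are paraphrased from the ConsumeKafkaRecord_2_6 configuration dialog, and the broker address and reader/writer services are placeholders rather than values from the video:

    ConsumeKafkaRecord_2_6
        Kafka Brokers   = localhost:9092        (placeholder)
        Topic Name(s)   = charges
        Record Reader   = JsonTreeReader        (any JSON record reader service)
        Record Writer   = JsonRecordSetWriter
        Group ID        = enrich charges
        Commit Offsets  = false                 (new in NiFi 1.15; the publisher commits instead)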
Next, we partition the data. Some of the records coming in have a customer id field and others have a credit card number field, so we route each record to one of two LookupRecord processors. We have two different lookups because we need to hit two different MySQL tables, based on the type of identifier we have, and each lookup enriches the data with a first name and a last name.

Once we've performed that enrichment, we send the data to a PublishKafkaRecord_2_6 processor, which takes all of the enriched data and pushes it into a different Kafka topic. It's important to note that this publisher is configured with Use Transactions set to true; we have to use transactions in order to get Kafka's exactly-once semantics. Notice, though, that there's no new property on this processor telling it to commit the consumer's offsets. Instead, when the consumer skips committing the offsets, it adds an attribute to the FlowFile, so the producer automatically knows that it's responsible for committing those offsets. Once the enriched data has been sent to Kafka, we route it to either success or failure, depending on whether the push succeeded.

We're almost done, but remember I said we have to ensure that all of the data sent to the publisher goes in a single transaction. Here's the trick to make that happen. First, we remove the connections going to the publisher. Then we select all of the other components and create a new process group that encompasses them. Inside that process group we create an output port named "success" and connect the LookupRecord processors to it; then we move up a level and connect the process group to the PublishKafkaRecord processor. We also create a "failure" output port, route the failure connections there, and connect the publisher's failure relationship as well.

When we configure this process group, we want to make sure it uses the same parameter context, and, more importantly, we want to change the process group's Outbound Policy from Stream When Available (the default) to Batch Output and click Apply. That changes the flow so that nothing leaves the process group through its output ports until there is no data anywhere in the process group except what is queued up at the output ports. So even if we pull a thousand records and only two go to one LookupRecord processor while 998 go to the other, and that one takes a lot longer, that's okay: all of the data leaves the process group in a single batch, and it arrives at the Kafka publisher in a single batch.

Now that we've finished building the data flow, it's time to save it to the registry, so I'll right-click, go to Version, Commit Local Changes, and save it to the registry.
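Putting it all together, the versioned flow ends up shaped roughly like this. The processor and port names are the ones used in this walkthrough, and the exact wiring of the publisher's success path is my summary rather than something shown on screen:

    Versioned flow "Enriched Charges"
        Process Group (Outbound Policy = Batch Output, shared parameter context)
            ConsumeKafkaRecord_2_6 (Commit Offsets = false, topic "charges")
              -> route by identifier type
                   -> LookupRecord (customer id table)        -> Output Port "success"
                   -> LookupRecord (credit card number table) -> Output Port "success"
                   lookup failures                            -> Output Port "failure"
        Process Group "success" -> PublishKafkaRecord_2_6 (Use Transactions = true)
        Process Group "failure" -> Output Port "failure" (also receives the publisher's failure relationship)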
But of course we don't want to run the data flow like this; we want to run it with the stateless engine. In version 1.15 of NiFi we can use a new processor called ExecuteStateless, which lets us take a data flow we've already created in NiFi and run it using the stateless engine. That's extremely powerful because it provides a great integration between traditional NiFi and stateless NiFi, so we can capitalize on the advantages of both. So let's configure it.

The first configuration option is where to find the data flow. We can use a local file or a URL, or we can point it at NiFi Registry, which is what I'm going to do here. The registry URL is my NiFi Registry instance on port 18080, followed by the registry bucket name and the name of our flow. I don't remember what that was, so I can go to Change Version and see that the name of the flow is "Enriched Charges". Back in the processor configuration I could choose a specific version of the flow to run; if I leave that blank, it just fetches the most recent version, which is what I want for now. Alternatively, I could right-click and download a flow definition and point this processor at that file, or I could even upload the flow definition to some URL and reference that URL from this processor.

For the input port I don't need to specify anything, because I'm not planning to send data into this processor; I'm only going to get data out of it. For the failure port I'll use "failure", because that's the name of the port we used in our data flow for failures. There are several other options we can configure, and I think all of the defaults are fine except for the Content Storage Strategy. I could store the content on disk to avoid running out of memory, but we know these messages are going to be pretty small, so I'm going to store the content on the heap; it'll be a little faster that way.

Now I need to connect this to something. I could auto-terminate the success and failure relationships, but I can also add other processors to my data flow, so that anything that comes out of this processor after it has finished running in the stateless engine can be processed further with traditional NiFi. I'll add an UpdateAttribute processor for the output, so anything successfully transferred out of the stateless data flow goes to the output relationship, and another UpdateAttribute for the failure and timeout conditions.

Remember that we configured a few parameters in our parameter context, so we'll want to set those here as well. For instance, I had a consumer group id set to "enrich charges", so I'll use that same value. If I don't enter a value for a particular parameter, it uses whatever was exported with the parameter context. Sensitive properties, however, are not persisted with the data flow, so we do need to supply those: for the database password, I'll go ahead and set it, and then we can apply those changes.
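The ExecuteStateless configuration I ended up with looks roughly like this. The property labels are paraphrased from the processor's configuration dialog, the registry host, bucket name, and password are placeholders, and parameters are supplied as dynamic properties whose names match the parameters in the flow's parameter context:

    ExecuteStateless
        Dataflow Specification Strategy = NiFi Registry
        Registry URL                    = http://<registry-host>:18080    (placeholder)
        Bucket                          = <your bucket>                   (placeholder)
        Flow Name                       = Enriched Charges
        Flow Version                    = (blank: always fetch the latest version)
        Input Port                      = (blank: no data is sent into the flow)
        Failure Ports                   = failure
        Content Storage Strategy        = Store Content on Heap
        Consumer Group ID               = enrich charges                  (dynamic property -> parameter)
        Database Password               = <sensitive>                     (dynamic property -> parameter)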
Now we can start it. It takes just a second to download the flow and initialize, and then we can see data streaming out. As we refresh, we see data coming through, but if we want a little more insight into what's going on inside this processor, we can look at the counters: any counters updated within the flow show up as updated by this ExecuteStateless processor. So we can see the number of records received by ConsumeKafka and the number of records sent by PublishKafka, and since we're using exactly-once semantics, those numbers should be equal. As we hit refresh, we'll see those numbers continue to increase.

So that's all there is to it. Go ahead and give it a go: leave it running, and you can kill your Kafka cluster, terminate your network connection, kill NiFi, whatever you'd like, and you'll still see it providing you with those coveted exactly-once semantics. If you have any problems, hit me up on the NiFi mailing list, users@nifi.apache.org, and of course I'll post a link to the data flow in the comments. If you find this content helpful and you'd like to learn more, please like and subscribe to the channel. Thanks for watching, take care.
Info
Channel: NiFi Notes
Views: 619
Id: VyzoD8eh-t0
Length: 19min 49sec (1189 seconds)
Published: Mon Nov 01 2021