Building event sourced systems with Kafka Streams (Amitay Horwitz, Israel)

Captions
Thank you very much for coming. This is my first ever conference talk, so I'm very excited to be here, and I hope you'll find it interesting. Event-driven architectures and event sourcing are not a new thing, but they have regained traction in the cloud era, as traditional architectures such as CRUD-based applications are becoming more difficult to scale and manage in a distributed environment.

My name is Amitay and I'm a software engineer at Wix. I'm into functional programming, distributed systems, testing, and exploring new programming languages and technology stacks. Like I mentioned, I work at this awesome company called Wix, where we create a platform that allows our users to build beautiful websites. This is actually a real website from one of our users: he has an online store selling unicorn adoption kits. Don't ask me what that is, but it exists, so you can check it out, it's very cool. We have over 120 million users, 2,100 employees, of which around seven hundred are engineers, and over 600 microservices running in production. Our engineering teams are spread across various locations in Lithuania, Ukraine and Israel, and we have an amazing R&D center right here in Kiev, so check out our booth outside afterwards.

I want to share with you my story with event sourcing. Before we do that, I need to explain what event sourcing actually is. I will then describe Eventim, the library we wrote as an infrastructure for event-sourced systems at Wix, and the challenges we faced after running it in production for about two and a half years. Then I will give a quick introduction to Kafka Streams, a stream processing library written on top of Apache Kafka. Finally, we will see how it all fits together and how you can create a reactive event-sourced system using Kafka Streams as the backbone.

But first, some background. Our story begins back in 2015. We were asked to create a new service as part of Wix's CRM suite, called Wix Invoices. Wix Invoices is meant to help business owners manage their invoices, send them to their customers, and receive payments online. It doesn't sound like a very exciting field, but it actually has a quite interesting and complex domain model, with a lot of pieces of data and lifecycle phases. And since we're dealing with payments, taxes and revenues, auditing must be heavily integrated into the product to ensure the integrity of our customers' data.

For example, this is the invoice details page. We need the business information here, we also need the customer information, and other information related to the invoice itself, like the line items and tax information, on which we perform various calculations. Now, this isn't too bad when we're viewing a single invoice. However, we also want to display a list of all invoices, give the ability to search, filter and sort, and show all sorts of aggregations for reports and stats, and this view also captures other information like payment events and page views. So we decided to implement Wix Invoices using event sourcing.

Let's start by describing what event sourcing is. I won't get into too much detail here, because this is a very big topic and there is much to say about it, and we already had a great talk about it at this conference yesterday; if you didn't attend it, I highly recommend checking out the recording afterwards.

The obvious solution for dealing with such a problem is to just create a normalized schema in a relational database. This is how we used to work, and it's a method that has proven itself successful for many years, with really nice advantages in terms of data integrity and consistency. But as the schema evolves, the queries become more and more complex, and you need to join many tables together in order to satisfy them. So you can't keep your domain model aligned with your DB tables for long, and if you model your domain based on your data persistence, you're going to have a bad time eventually. This problem is called the object-relational impedance mismatch.

The basic idea with event sourcing is quite simple: rather than saving mutable state in a database and updating it in place, we save the succession of events that brought us to this state. We can always derive the current state by replaying all the events from the beginning of time; in functional programming terms, this is called a left fold over the event stream. This method has many benefits. Since we're not overwriting our data, but merely accumulating facts about what happened in our domain, the big advantage is that we don't need to decide in advance which queries we're going to need and model our data with that bias, which might be wrong or change over time. We can always project the event stream into whatever query we need after the fact, which gives us great flexibility. I'm sorry in advance that all the code samples will be in Scala, even though this isn't a Scala conference, but I'm sure you'll be able to pick them up.

Let's take, for example, the life cycle of an invoice and how it develops over time. An invoice is first created, and below it we can see the state derived from that event. Over time, a line item is added, another line item is added, a line item is removed, we send the invoice to the customer, and we receive a payment. We can project the state from any point in time and create exactly the queries we need, reducing the complexity of the reads quite dramatically compared to doing it with traditional methods.
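As a concrete illustration of that "left fold", here is a minimal sketch in plain Scala, using hypothetical event and state types modelled on the invoice lifecycle just described (this is not the actual Wix Invoices domain model):

```scala
sealed trait InvoiceEvent
case class InvoiceCreated(invoiceId: String, customer: String)                      extends InvoiceEvent
case class LineItemAdded(invoiceId: String, description: String, amountCents: Long) extends InvoiceEvent
case class LineItemRemoved(invoiceId: String, description: String)                  extends InvoiceEvent
case class InvoiceSent(invoiceId: String)                                           extends InvoiceEvent
case class PaymentReceived(invoiceId: String, amountCents: Long)                    extends InvoiceEvent

case class Invoice(
  id: String                   = "",
  customer: String             = "",
  lineItems: Map[String, Long] = Map.empty,   // description -> amount in cents
  sentToCustomer: Boolean      = false,
  paidCents: Long              = 0L
)
object Invoice { val empty: Invoice = Invoice() }

// Applying a single event to the current state...
def applyEvent(state: Invoice, event: InvoiceEvent): Invoice = event match {
  case InvoiceCreated(id, customer)   => state.copy(id = id, customer = customer)
  case LineItemAdded(_, desc, amount) => state.copy(lineItems = state.lineItems + (desc -> amount))
  case LineItemRemoved(_, desc)       => state.copy(lineItems = state.lineItems - desc)
  case InvoiceSent(_)                 => state.copy(sentToCustomer = true)
  case PaymentReceived(_, amount)     => state.copy(paidCents = state.paidCents + amount)
}

// ...and deriving the current state is a left fold over the whole event stream.
def replay(events: Seq[InvoiceEvent]): Invoice = events.foldLeft(Invoice.empty)(applyEvent)

// e.g. replay(Seq(InvoiceCreated("inv-1", "ACME"), LineItemAdded("inv-1", "unicorn kit", 4999)))
```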
This brings us to Eventim, the actual implementation we created for Wix Invoices. We had several design goals when we started the project. We wanted to keep the library small, simple and unintrusive. We wanted to maintain data integrity, meaning we don't want to save events that violate any business invariants. And we wanted the ability to easily add custom views, because we had lots of read models and we wanted the views to be quick and fast.

Eventim is actually split into two parts: the write path and the read path. Let's start with the write path. We have the user interface, which is just your regular browser; it creates a command JSON and pushes it to the server. The first thing we do is decode the command JSON. This is the point where we can run simple validations on the payload itself; these are not yet the validations that relate to the business rules, but checks for empty strings, numbers in range, and so on. Then we have a command object that we can pass to the command dispatcher. The command dispatcher holds several command handlers; each handler can handle a certain command, and the dispatcher finds the handler that can deal with this specific command and passes the command to it. Up until this point, this has nothing to do with event sourcing, but one type of command handler is the event-sourced command handler.

The event-sourced command handler takes the command, and the first thing it does is load the current aggregate. The aggregate is the domain object, in this case the invoice. This usually gets done just by loading the events from the event store and rebuilding the current state. Once we have the current state and the command, we can derive the events that the command will produce, and if the command is invalid we can reject it at this point and not produce any events. If we did create events, we have a new state of the aggregate, we can validate that everything is fine, and we publish the events back to the event store. We use optimistic locking to ensure that no concurrent writes are taking place. Once we save the events in the event store, we can acknowledge back to the client that the command was executed, and we also publish these events to an event bus. This basically ends the write path.
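To make that flow concrete, here is a rough sketch of such a handler in plain Scala, reusing the hypothetical Invoice and InvoiceEvent types from the previous sketch. The EventStore trait and its version-based append are illustrative only; this is not Eventim's actual API.

```scala
trait EventStore {
  // Returns the stored events for an aggregate plus the version we read them at.
  def load(invoiceId: String): (Seq[InvoiceEvent], Long)
  // Appends only if the stored version still equals expectedVersion (optimistic locking);
  // otherwise the append is rejected and the caller can retry.
  def append(invoiceId: String, expectedVersion: Long, events: Seq[InvoiceEvent]): Either[String, Unit]
}

class AddLineItemHandler(store: EventStore, publishToEventBus: Seq[InvoiceEvent] => Unit) {

  def handle(invoiceId: String, description: String, amountCents: Long): Either[String, Unit] = {
    val (history, version) = store.load(invoiceId)
    val current            = replay(history)            // rebuild the aggregate: the left fold again

    // Business validation against the current state; reject without producing any events.
    if (current.sentToCustomer)
      Left("cannot change an invoice that was already sent")
    else {
      val events = Seq(LineItemAdded(invoiceId, description, amountCents))
      store.append(invoiceId, version, events).map { _ =>
        publishToEventBus(events)                        // hand the new events to the read path
      }
    }
  }
}
```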
On the read path, we have the events coming into the event bus, and we can attach several event handlers to these events. One event handler will be a view projector, which means it reads the events and updates some persistent view, for example a flat, already denormalized table in a database, or an Elasticsearch index, whatever you need in order to satisfy your read needs. But you can also have other event handlers, for example for sending emails or generating reports. And once you have your denormalized view, your user interface can query against it very efficiently.

So this is a really high-level overview of Eventim, but we had some pain points along the way. Despite the simple design, the library became quite big. Also, this architecture has inherent eventual consistency, because it takes time until the view gets updated. This was a conscious decision we made: to sacrifice strong consistency in order to achieve better performance. But it was not correctly integrated into the product, and there are many situations where the client performs a read after a write, for example creating a new invoice and then loading the invoices list, where the new invoice is not there yet. Most of the time it's fast enough that you won't notice, but there are times it will not update fast enough, and this compromises the user experience.

I think the biggest challenge we had, though, was rebuilding the views, which is quite a complex operation and involves many different moving parts. I'll try to explain why this is complex. On the read side, it's quite easy to add a new event handler that listens on the event stream, but if you just subscribe a new event handler and it starts listening on a live system, you can't simply handle the events and write them to the database, because they might rely on past data being there. So you must ensure that all past data was already written before you can handle the new events coming in. We have a similar problem on the write side: if we add a new event handler but no events come in to trigger the view rebuild, the UI will already try to query this view and there is no data in there. We created a mechanism to deal with these problems, but like I said, it was quite complex and not easy to do in a distributed environment where events are coming from various servers.

This brings us to Kafka. The problems I described brought me to search for alternative architectures. We wanted to keep the benefits of event sourcing, but with simpler ways of dealing with inconsistencies and rebuilding views. Let's see a quick show of hands: how many of you are using Kafka in production? OK, about a quarter. Keep your hands up if you have ever used stream processing applications like Kafka Streams or others. OK, cool.

So Kafka is basically a distributed, append-only log. It is most often used as a queue or a pub-sub system, but as we'll see, it can do much more. It's a really battle-tested technology, used at really high-scale companies and proven to deal with really high load. The basic abstraction Kafka gives us is a topic. This is basically a logical queue. It is partitioned, meaning the queue divides its messages across multiple partitions, which are configurable, and producers push messages to each partition depending on the key of the message. The important thing to notice here is that order is maintained within a single partition, so if you need to keep ordering, you need to produce the messages with the same key.
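As a small illustration of that last point, this is roughly what producing with an explicit key looks like using the standard Kafka producer client from Scala (topic name and payload are made up); every message for the same invoice uses the invoice id as its key, so they all land in the same partition and keep their relative order:

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

val producer = new KafkaProducer[String, String](props)

// Key = invoice id, value = the command payload; same key means same partition, preserved order.
producer.send(new ProducerRecord[String, String]("invoice-commands", "invoice-42", """{"command":"send-invoice"}"""))
producer.close()
```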
Now, after these messages are produced into the queue, you can consume them. Kafka keeps a retention policy for each topic, which is also configurable, so you can tell it how long you want to keep the messages. This gives you great flexibility, because you can add consumers that will replay all the messages that already happened; you don't lose messages once they are consumed. You can also add additional nodes to what's called a consumer group, which is basically just one logical consumer that you can scale up to multiple nodes, and Kafka handles the balancing of which node handles which partitions. So if one consumer can't keep up, you can simply add another node to the consumer group, and Kafka will do its magic and rebalance everything. And like I said, because of the partitioning model that Kafka uses, you still get the ordering guarantees per partition. So that's a really high-level overview of Kafka.

Kafka Streams is a library, also from Apache Kafka, that is meant to do stream processing on top of Kafka. It uses two main abstractions: streams and tables. A stream you can think of as data in flight: an unbounded, continuously updating data set, which means it starts somewhere but never ends, like an infinite collection, and it is an ordered, replayable sequence of key-value pairs. That makes it sound interesting for an event-sourced system. A table, on the other hand, you can think of as data at rest: it is what the data looks like at a specific point in time, and if another message comes with the same key, it overrides the previous one, like you know from a database.

One important observation is that there is a stream-table duality; they are basically two sides of the same coin. Let's see an example. You can take a table and stream all changes to it, so you turn the table into a stream; but you can also take that stream and materialize it back into a table. This is how databases replicate and persist their data internally, it's just not visible to you. So let's see a real example: a table that shows page views per user. We start with Alice with one page view, the table develops over time, and we see that each change can be streamed out as key-value pairs, and we can materialize that stream back into a table and get the same table back.
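In Kafka Streams terms, that round trip looks roughly like this. This is a sketch with a made-up topic of page-view counts per user, and it assumes the kafka-streams-scala module (whose import paths differ slightly between Kafka versions):

```scala
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.kstream.{KStream, KTable}

val builder = new StreamsBuilder()

// A table of page-view counts per user, read from a (hypothetical) topic...
val pageViews: KTable[String, Long] = builder.table[String, Long]("page-views-per-user")

// ...streamed out as its changelog of key/value updates...
val changelog: KStream[String, Long] = pageViews.toStream

// ...and materialized back into a table by keeping the latest value per key.
val rebuilt: KTable[String, Long] = changelog.groupByKey.reduce((_, latest) => latest)
```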
Another key ingredient we want from any stream processing application is the ability to do stream transformations. We're used to doing these transformations in the small: with Scala collections or Java 8 streams, you can take a collection and do, for example, a map, a filter, a flatMap or other operations, and get a new collection out of it. Here we get the ability to do this in the large, on streams which are infinite and where you never know when they end, and that's a really nice ability to have; you declare the transformations programmatically and declaratively.

Another thing you can do is take two streams and join them into a third stream; this basically takes the messages from both streams and pushes them into a new stream that contains all the messages. And a less obvious thing you can do is take a stream and a table and join them together. This is possible because of the duality we talked about between a stream and a table, and it's an interesting transformation to have.
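For example, a stream-table join might look roughly like this. The topics are made up for the sketch: a stream of payment amounts keyed by invoice id is enriched with a table of customer emails per invoice (again assuming the kafka-streams-scala module):

```scala
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.kstream.{KStream, KTable}

val builder = new StreamsBuilder()

val payments: KStream[String, Long]   = builder.stream[String, Long]("payments")             // invoiceId -> amount in cents
val customers: KTable[String, String] = builder.table[String, String]("customer-by-invoice") // invoiceId -> email

// Stateful join: for every payment, look up the customer in the locally materialized table.
val notifications: KStream[String, String] =
  payments.join(customers)((amountCents, email) => s"notify $email: received $amountCents cents")

notifications.to("payment-notifications")
```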
So Kafka Streams gives us the ability to create stream processing apps that transform and enrich our data. It supports stateless and stateful operations: stateless, for example, the mapping and filtering we saw, and stateful if you need some sort of aggregation. It supports windowing operations for dealing with time windows; this is a really strong feature that I won't talk about too much, but we will see an example afterwards. And the cool thing is that it's embedded inside your app. You don't need to deploy a separate cluster for doing this processing; you just use the library as a jar inside your application, and all transformation happens inside your app. You also get the properties of elasticity and scalability from Kafka itself, because everything relies on Kafka.

In order to define your stream processing app, what you basically need to do is tell it where Kafka is, which is the bootstrap servers config, and give it an application ID. By giving it an application ID, it will use Kafka's consumer group mechanism, and you are able to scale out your application just by deploying your service with the same application ID; Kafka will automatically distribute the load.

Let's take a closer look at what each of these stream processing apps looks like. What you basically need to do is define your processor topology, and the topology is just a graph of transformations: you define each of the nodes and how they are connected together. You have two special kinds of nodes, the source processors and the sink processors. Most of the time these will simply be Kafka topics: you read information from a topic and push information back downstream to Kafka topics, but you have other options as well. You can see here that we have several stateless operations, like the mapping and filtering, but we also have joins, which are stateful. For this, Kafka Streams uses what's called a state store. This is how Kafka Streams internally saves local state on the machine itself; by default it uses RocksDB, but you can plug in other backends, and it uses Kafka itself to replicate these state stores, so they are also resilient.

So how do you define each of these nodes? You have several options. The most basic one is the Processor API. This is the most low level, so most of the time you probably won't need it, but it's worth mentioning that all the standard operations like map and filter use this API under the hood, so if you need very custom processing logic, you can use it.

My personal favorite is the Streams DSL, which lets you programmatically describe your topology. Let's see a quick example of how that looks. The first thing we need is a streams builder. We create a stream from the topic of text lines; this gives us a stream that updates every time a new message comes into that topic. Now we can take these text lines and flat-map the values by splitting each line into words, so where we had a stream of text lines, we now have a stream of words. Then we group this stream by the word, count the values per key, and we get back a table, which we materialize as a state store called the count store. Later we can take this table, transform it back into a stream, and push it to a new Kafka topic. So we had a topic of text lines, and we created a topic of word counts.

We have another option, which is called KSQL. This is an SQL dialect for streaming data, and it's more high level. This example shows how you can create a table of possible fraud attempts by selecting authorization attempts for the same card number that had more than three attempts within five seconds. It shows you an example of the time-window capabilities I talked about, and it shows how concise it is to define really complex semantics, so it's really nice.
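Putting the configuration and that word-count walkthrough together, the whole thing looks roughly like this with the Kafka Streams Scala DSL. Topic and store names are made up, and the import paths for the Scala DSL vary slightly between Kafka versions:

```scala
import java.util.Properties
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.kstream.{KStream, KTable, Materialized}

// application.id doubles as the consumer-group id, which is what lets you scale out
// by starting more instances with the same id; bootstrap.servers says where Kafka is.
val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-app")
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

val builder = new StreamsBuilder()

val textLines: KStream[String, String] = builder.stream[String, String]("text-lines")

val wordCounts: KTable[String, Long] =
  textLines
    .flatMapValues(line => line.toLowerCase.split("\\W+")) // stream of lines -> stream of words
    .groupBy((_, word) => word)                            // group (and repartition) by the word
    .count()(Materialized.as("count-store"))               // counted table, backed by a local state store

wordCounts.toStream.to("word-counts")                      // publish the changelog to a new topic

val streams = new KafkaStreams(builder.build(), props)
streams.start()
```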
So let's see how we can use these tools to create a simplified invoices app on top of Kafka Streams. The high-level design: again we have the user interface, we have a Node server, and we have our Kafka cluster. The user interface publishes a command, for example a create command, to the Node server, and all the server does is push the command to Kafka. At this point we can acknowledge that the command was accepted and that sometime in the future it will be processed. Then we have a stream processing application which handles these commands, and once the processing is done, it pushes the result back to a different topic. The Node server can listen on that topic and push live updates to the clients using a WebSocket. So we still have the inherent eventual consistency we had in Eventim, but here we embrace it rather than ignore it, and really take advantage of the async nature of the system. We can also create another stream processing application, a projector, whose job is to project the events into some view, and the Node server can read straight from that view. And like we said, because this is Kafka Streams, we can easily scale each part of the system up or down as much as we need.

Let's take a deeper dive and see how these stream processing apps are implemented. We'll start with the command handler. The first thing we do is listen on the command stream. We take each command and we need to transform it into some result; the result is either that the command succeeded or that it failed. For that we need to read the current state of the aggregate, so we have a state store that keeps the snapshot of each invoice, and now we can decide whether this command is valid or not and create the failed or succeeded result. This result is pushed back to our command-results topic. For successful results we produce events, so we flat-map these successful results into the events, which we push downstream to the events topic. We can later listen on that event stream and aggregate it to update the state store, and it's also useful to push the snapshots to their own topic, because why not. All of this command handler logic is just 660 lines of Scala code, so again, you get a lot of benefit in a really concise, declarative way.
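This is not the actual Wix code, but a rough sketch of that topology, reusing the hypothetical Invoice and InvoiceEvent types from the fold sketch earlier. The topic names, the Command and Result types and the SerDes are all made up, and reading the current snapshot is sketched here as a join against the snapshot table; the real implementation might instead use a state store through the Processor API.

```scala
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.kstream.{KStream, KTable}

// Hypothetical command/result types; the real model is richer.
case class Command(invoiceId: String, name: String, payload: String)
sealed trait Result
case class Accepted(invoiceId: String, events: List[InvoiceEvent]) extends Result
case class Rejected(invoiceId: String, reason: String)             extends Result

// SerDes for the case classes (e.g. JSON or Avro based) are assumed; omitted in this sketch.
implicit val commandSerde: Serde[Command]           = ???
implicit val resultSerde: Serde[Result]             = ???
implicit val invoiceEventSerde: Serde[InvoiceEvent] = ???
implicit val invoiceSerde: Serde[Invoice]           = ???

// Domain decision: validate the command against the current state, then accept or reject.
def decide(cmd: Command, current: Invoice): Result =
  if (current.sentToCustomer) Rejected(cmd.invoiceId, "invoice was already sent")
  else Accepted(cmd.invoiceId, List(LineItemAdded(cmd.invoiceId, cmd.payload, 0L))) // derive real events here

val builder = new StreamsBuilder()

val commands: KStream[String, Command] = builder.stream[String, Command]("invoice-commands")
val snapshots: KTable[String, Invoice] = ??? // materialized from the events topic, see the next sketch

// Map each command to a succeeded/failed result, using the current snapshot of its invoice.
val results: KStream[String, Result] =
  commands.leftJoin(snapshots)((cmd, current) => decide(cmd, Option(current).getOrElse(Invoice.empty)))

results.to("invoice-command-results")

// Successful results are flat-mapped into the events they produced and pushed to the events topic.
results
  .flatMapValues {
    case Accepted(_, events) => events
    case _: Rejected         => List.empty[InvoiceEvent]
  }
  .to("invoice-events")
```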
Let's take a closer look at how the aggregation node is implemented, how we aggregate the events into snapshots. This is basically the code: we take a stream from the invoice events topic, we group it by key, which is the invoice ID, then we aggregate it, giving it a function for doing the aggregation itself, and we materialize the aggregated table into a state store called the snapshot store.

The projector is even simpler. We listen on the snapshots stream, we map each snapshot, which is the current state of the invoice, to how the record will look in the database, and then for each record we persist it, we just save it, and we can also push it downstream to a new topic which contains the records as they look in the database.
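Sketched with the same hypothetical types, builder and placeholder SerDes as in the previous sketch (the store, topic and function names here are illustrative, not the real ones):

```scala
import org.apache.kafka.streams.scala.kstream.{KStream, KTable, Materialized}

// Aggregating the event stream into per-invoice snapshots, materialized as a state store.
val snapshotTable: KTable[String, Invoice] =
  builder
    .stream[String, InvoiceEvent]("invoice-events")       // events keyed by invoice id
    .groupByKey
    .aggregate(Invoice.empty)((_, event, state) => applyEvent(state, event))(
      Materialized.as("snapshot-store")
    )

// The projector: map each snapshot to its database-record shape, persist it,
// and also push it downstream to a topic of view records.
def toDbRecord(invoice: Invoice): String          = invoice.toString // stand-in for a real row/document mapping
def saveRecord(id: String, record: String): Unit  = ()               // stand-in for a DB or search-index upsert

val records: KStream[String, String] = snapshotTable.toStream.mapValues(toDbRecord)
records.foreach((id, record) => saveRecord(id, record))
records.to("invoice-view-records")
```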
So we get this architecture where Kafka is in the middle of it all, and we have small microservices talking to Kafka. For example, the invoices service just takes commands from the Kafka cluster and pushes information back to Kafka, and we can have other microservices that communicate the same way among themselves. We can easily push information into Kafka, for example changes in inventory, or get information back out of Kafka, like producing analytics reports. This gives us a really decoupled, nice architecture which is really scalable.

So what did we gain with this design compared to Eventim? Like I said, it is very simple and declarative, eventual consistency is handled gracefully, we embrace it, and it's very easy to add or change custom views. And obviously we got all the benefits we had with event sourcing, plus the scalability and fault-tolerance properties that Kafka provides.

Now let's see a quick demo. Let me mirror the displays. I'll start the command handler and I'll start the projector, just a second... yeah, live demos, you know. Let's try that again... OK, so it doesn't work, but you know, live demos. I have this prepared as a video from yesterday, so let's see it, and imagine I'm doing it live now. I start the command handler and the projector, like I told you. I have this UI for displaying the list of invoices, with some simple data, and I create some invoices. You see the little lag there between the time the command is accepted and the time it's acknowledged and processed, because I'm pushing the changes back to the client. I can open another client and create invoices from that client, and everything syncs up magically with WebSockets. If you issue invalid commands, you also see a result saying the command is not valid, with the reason. Another nice property: you can kill the command handler altogether, and if you try to create a new invoice in that state it will just hang, nothing happens, but the command was already sent to Kafka, so if I bring the command handler back up and wait a while, it picks up where it left off, the command gets processed, and you can see it got updated. Wow, live demos. All right, let's get back to the slides; we already saw the demo.

So what are the takeaways? Event-driven systems and event sourcing can help build very flexible and scalable systems, but you should always know your trade-offs regarding consistency guarantees, schema evolution and data integrity. And Kafka and Kafka Streams are powerful tools that can be employed in many use cases. That's basically what I have for you, so stream on. Here are some resources, including the demo code, which will hopefully work for you, and some other information. We actually have quite a lot of time for Q&A, so if you have any questions...

Q: Thanks for the talk. You mentioned there is a state store, right? Is it persistent? And is it a Kafka feature, a Kafka Streams feature, or your own?
A: The state stores are an internal part of Kafka Streams. Each time you do some stateful operation, like aggregating or joining, these are not operations that can take a value and return a value right away; they need to keep some data persisted, and Kafka Streams uses a state store to do that persistence. They are saved locally to disk on the machine that does the transformation, but they are also replicated via Kafka, using special internal topics, to other nodes, so you can lose a node without losing data. So yes, it's a Kafka Streams feature, and you can plug and play the internal backend of the persistence layer: the default is RocksDB, but you can use other custom state stores if you need them.

Q: Thank you for the talk, it was nice. In which way should we handle errors during Kafka streaming, for example if we got an invalid message, or we can't send a message to the destination topic?
A: You mean how to handle invalid messages coming into the queue? You handle errors like in any other application: you need to catch exceptions, because the streams application will shut down if an exception escapes, so you need to make sure exceptions are handled. And if you have, for example, different message types pushed into your topic and you can't handle them, this is where Kafka uses what's called a SerDe, which is how Kafka does serialization and deserialization. You can use, for example, Protobuf or Avro, which are tools for dealing with schema evolution of your messages, so if an invalid message comes in, you will know how to deal with it.

Q: Is it possible to receive duplicates of events, for example if the consumer handler dies?
A: Regarding delivery guarantees, this is also configurable in Kafka Streams. You can define which semantics you need, whether it be at most once, at least once, or exactly once. The exactly-once option comes with a caveat, because you have to make sure all your processing is idempotent; there are things that are not possible to do exactly once. But Kafka will run the entire stream processing topology and push all messages downstream in what you can think of as a single transaction, so all of this is handled for you by Kafka Streams, and for each message that comes into your topology, everything must be finished before the next one can be processed.
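For reference, the guarantee mentioned in that answer is a single configuration setting in Kafka Streams (shown here with a made-up application id; exactly-once requires brokers that support transactions, i.e. Kafka 0.11 or later):

```scala
import java.util.Properties
import org.apache.kafka.streams.StreamsConfig

val props = new Properties()
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "invoices-command-handler")
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
// Default is AT_LEAST_ONCE; switching to EXACTLY_ONCE makes the library use Kafka transactions internally.
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE)
```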
Q: Thank you for the great talk. My question is about a distributed group-by. For example, you have some map operation and then you group the stream by some new key; for this you actually need a shuffle of some kind, you need to move the data with the same key to the same node to be aggregated. How does Kafka Streams handle this, and what happens if one of these nodes fails?
A: Basically, all operations use Kafka's partitioning model to distribute the load. If you do an operation which preserves the key, for example if you're just mapping over the value, the record stays in the same partition and you can handle it on the same node. If you change the key, obviously the record needs to go to a different partition, so this is basically how Kafka handles it: the data is redistributed across the partitions.
Q: But who needs to handle this repartitioning? Do we need to push the mapped data with the new key to another Kafka topic ourselves, or does Kafka Streams handle it?
A: Like we saw in the code examples, the code you need to write is very declarative, and the library does all this work for you.

Q: My question is this: we have a very simple use case, a data publisher and a data processor, and we get some error, for example a database error, when processing some event. Can we skip this message, leave it in our topic, and return to processing it after five seconds, for example?
A: Let me see if I understand your question: your database is down, for example, so you can't update it, but you want to process this message?
Q: We need to repeat the operation after, for example, five or twenty seconds. Can we leave the message in the topic?
A: Kafka topics are configured with retention policies, so messages are not lost once they are processed; you define how much time it takes for them to be deleted. It's a matter of trade-off between how much storage you want to invest and how important the messages are for you. In such a case, if your processor fails, for example your database is down and an exception is thrown, then when you start it again it will remember its position and try to pick up from there; it will not skip this message. If you want to handle it explicitly, you can catch the exception and, for example, build some kind of retry mechanism with your own semantics. You have to handle it somehow, but the messages are not lost, and that's the important part: you can always go back to the messages and reprocess them.
Q: So I should save this event for a retry mechanism in another topic?
A: That's a possibility, depending on your product needs.

Q: What about committable sources? In Akka Streams I can connect a committable source to Kafka, and when I process a message I commit to Kafka manually. Can you control the commit?
A: I'm not too familiar with Akka, but I talked about the delivery guarantees that you need to configure. Again, the library handles this for you: it makes sure that a message is fully processed inside your topology before it consumes the next one. You need to define whether you need at-least-once, exactly-once or at-most-once semantics; this is tunable according to your needs.

Q: What do you actually use for the event store?
A: It seems strange to some people, because we didn't see any database that holds the events, but you can think of Kafka itself as a sort of database, because it already has durability, fault tolerance and replication. If you define your topic with a retention policy of keeping the events forever, it basically acts like a database. If it's really important for you, for example, to perform various queries on the events afterwards, you can always add a new stream processor that will read the event stream and push the events to some other database, but you don't have to; you can treat Kafka as a database in these cases. This is what I meant when I said it's not only a queue or a pub-sub system, it can do much more.
Q: So you can actually configure Kafka not to delete any data at all?
A: Yes.

Q: Is there a possibility to somehow support distributed transactions with Kafka Streams?
A: Since version 0.11, I think, Kafka added a feature for distributed transactions, where you can commit messages to different partitions transactionally and safely, and when you enable the exactly-once property in Kafka Streams, it internally uses this transaction mechanism. So yes.

Q: Hi, I have a question. For example, we have two topics and a window of five minutes, and we want to join these two topics into a topic A using Kafka Streams, but not by the key, by two fields. Is that possible, or should we first create a new topic with the needed key and only then join the two topics?
A: In Kafka Streams you provide a function that takes the key and the value from the first stream and from the second one, and then you can choose what you want to join on.
Q: But we want to join not by the key but by some field which we have in the message.
A: That function also lets you look not just at the key but at the value: you get the values from both topics and you can decide what you want to join on. If I remember correctly you can do it that way, or you can do it by transferring to a different topic keyed by the new key and then joining; it depends on your needs and your ordering semantics.
Q: OK, thank you very much.
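Regarding that last question, the "transfer to a different topic keyed by the new key" approach the speaker mentions is usually expressed with selectKey followed by a windowed join; Kafka Streams then creates the internal repartition topic for you. A hedged sketch with made-up topics and a hypothetical field extractor, using the JoinWindows API as of Kafka 2.x:

```scala
import java.time.Duration
import org.apache.kafka.streams.kstream.JoinWindows
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.Serdes._
import org.apache.kafka.streams.scala.kstream.KStream

val builder = new StreamsBuilder()
val attempts: KStream[String, String] = builder.stream[String, String]("auth-attempts")
val payments: KStream[String, String] = builder.stream[String, String]("payments")

def cardNumber(payload: String): String = ??? // extract the join field from the message payload (not shown)

// selectKey changes the key, so records with the same new key end up in the same partition
// (via an internal repartition topic) before the join runs.
val attemptsByCard = attempts.selectKey((_, v) => cardNumber(v))
val paymentsByCard = payments.selectKey((_, v) => cardNumber(v))

// Windowed stream-stream join on the new key, within five minutes of each other.
val joined: KStream[String, String] =
  attemptsByCard.join(paymentsByCard)(
    (attempt, payment) => s"$attempt | $payment",
    JoinWindows.of(Duration.ofMinutes(5))
  )

joined.to("joined-by-card")
```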
Info
Channel: jeeconf
Views: 14,271
Rating: 4.8060608 out of 5
Keywords: JEEConf, event sourcing, Kafka, Kafka Streams
Id: b17l7LvrTco
Length: 45min 11sec (2711 seconds)
Published: Tue Jun 05 2018