Martin Kleppmann — Event Sourcing and Stream Processing at Scale

So my name is Martin Kleppmann, and I'd like to start with a confession, which is that I actually don't know very much about DDD at all, so I'm really kind of badly placed to be speaking here. My background is in building large-scale data systems for internet companies, so I've done a bunch of research in that area and built various production systems as well. At the moment I'm on the academic side at the University of Cambridge, looking at some of the more fundamental parts of building large-scale, scalable data systems, and I'm writing this book here for O'Reilly, which looks at the architecture of the systems used by internet companies.

As part of the research for this book, I've been giving conference talks and writing blog posts and exploring various ideas to see how people respond to them. In particular, I've been talking about building systems that use events at their core, and people keep coming to me and saying, "This looks a bit like event sourcing; how is it different?" My first reaction was, "What is event sourcing? I've never heard of it." And they replied to me saying, "It's a concept from domain-driven design, you should look at it." And I go, "What the hell is domain-driven design?" Because in the kind of community of people that I've come from, this is a completely unknown thing; barely anybody knows about DDD at all.

And perhaps that's actually okay, because if I look at the kind of systems that I've worked on, the business domain hasn't been particularly complicated. So generally these are probably not really situations that need DDD or that would benefit from DDD. That doesn't mean that they're easy; it's just that the complexities and the hard aspects are elsewhere. So I'd like to draw this dichotomy here (it's maybe a bit of a false dichotomy, but I think it'll serve for purposes of making an argument) between enterprise software on the one side and the systems of internet companies on the other side. Of course,
in reality you may well find both within the same organization, so they're not really mutually exclusive. But the main distinction I want to make here is that on the enterprise software side, it seems to me that the biggest problem that software faces is the complexity of the domain. Of course, this is why DDD exists: to give us tools to deal with that complexity and manage it. So this is complexity in the business logic that we try to manage.

On the other hand, in the internet infrastructure domain, the problem is not so much the complexity of the domain; the domain can be quite simple. The biggest problem is that there's simply so much data, vast volumes of data, and it's coming in at really high speed. That leads to complexity of the data infrastructure: the servers and the infrastructure needed to handle that onslaught of data become complex. So we've got complexity on both sides, but for very different reasons.

What I found surprising is that the solutions to managing this complexity actually have some similarity across these two domains, and that's really the main point I want to make here. As part of DDD, event sourcing has become one way of structuring data, where we say: okay, we can describe the state of a system not in terms of mutations to some mutable data store, but in terms of the sequence of changes that led to the current state of the system, and each of these changes is an immutable event that is stored somewhere. On the other side, the internet company side, which is what I've mostly worked on, there are stream processing systems, which also deal with sequences of immutable events.

So what I'd like to do here is first illustrate some examples of stream processing from my experience of the systems that I've worked on, and then we can see some of the parallels, and maybe there's something that we can learn from each other here, because I think it's very
good to try and connect different communities. That's why I'm coming here, in the hope that maybe there are some interesting things that I can bring from the internet infrastructure side of things; but likewise I'd like to learn from DDD and see what I can carry back to the infrastructure side as well.

I worked at LinkedIn until about a year and a half ago, so I'd like to take an example that is actually taken from LinkedIn production systems and a workflow that I worked on. You can imagine LinkedIn has rather a lot of data, and a lot of it is in the structure of events. One example of an event, of which there are quite a lot, is a page view event, or in the past-tense verb nomenclature you might call that UserViewedPage or suchlike. This is an event that is generated any time a user views a page on the website or in the mobile app.

A typical event looks something like this. It's got a type. It's got a timestamp, like a UNIX timestamp. It's got the ID of the user who viewed the page. It's got maybe a session ID, which might have come from a session cookie, which tells you which browser is doing that. It's got a page key or URL, which tells you which page was viewed. In this case, maybe one user viewed another user's profile, so you might also want to log the ID of the profile that was viewed; now we've got a relationship between two different profiles. And you might log a few other things, for example how the user got to that page in the first place: did they click a link in an email, did they tap a push notification, did they come from the search feature on the website, whatever.

If you have these events, collecting them is rather useful; you can do many interesting things with them. For example, LinkedIn has this feature called Who Viewed Your Profile, which you might have seen. It gives you a dashboard view, a Google Analytics-style summary, of all of the people who viewed your profile in the last few months. It
tells you how many people per week have viewed your profile, and gives you some happy smiling faces of some of the people who viewed your profile, and various other stats. So that's one thing you can do.

You can also build recommendation systems. This is something that LinkedIn is quite big on: trying to use machine learning systems to build recommendations that help the people using the site. One simple feature there, for example, is "people who viewed profile X also viewed profile Y"; this is called collaborative filtering in the jargon. If you know all of the data about who viewed which profile, you can start building these kinds of features, where if you look at my profile, on the side you get this little box suggesting various other profiles of people who are in some way related to me.

And with these events there are many more things you can do. You can train relevance models for search, for example, so that you rank search results in an order that's useful to people. You can monitor the performance of the site. You can evaluate the results of A/B tests, to see whether some change you made to the site improves metrics or not. You can use it for reporting to management, and so on and so on.

I just want to focus on one of these examples and how we might implement it, and this example is Who Viewed Your Profile. Zooming in a bit on that screenshot from earlier, this is what the data looks like. This is the read view onto the collection of page view events: all of the views for a particular profile. That might be a fairly large number, so it gets summarized: there's a bucket per week, and we have a counter per week. We also have some interesting breakdowns: okay, 70 of the people who viewed my profile have a job title of software developer (that kind of makes sense, because I'm also a software developer), and lots of them work at LinkedIn. But if you think about the structure of the event we had earlier, there's nothing here
telling us what the job title of the viewer is. It just gives us the ID of the viewer, which is enough to identify them, but at some point some kind of join has got to happen, so that we can take an event that contains only a viewer ID and somehow enrich it, so that we also have the job title and the company name for the viewer in that event. Once we have that, we can index it and aggregate it and create these snippets of information, like how many of the people who viewed your profile have a certain job title. So there's clearly some kind of multi-step process that has to happen here, and this is where stream processing comes in.

You could imagine a simple option might be to log the events with all of this additional denormalized data in the first place. But then these events would get really large, and you'd have to stuff into those events all sorts of things that might be needed for some purpose in the future. So in general it seems to be better, and this is what LinkedIn does as well, to keep the raw events as clean and simple as possible, and if you need to join with some additional datasets, then do that as a separate step, as an enrichment step.

With a stream processor you would do this something like this. You have a stream of page view events happening live on the site: every time a user loads some page, some web server emits this event and dumps it on a message queue of some sort. Then you have some kind of job which sits there consuming those events, and it somehow has to add the profile information about the viewer to these events. The obvious way of doing that is to query a database: every time you get an event with, say, user ID 1234, you query that database ("Can I please have the profile for 1234?"), you get it back, and then you emit a composite event with that profile data included. And so the way
that works, you can think of as a join between the stream of page view events and a table of data, the table of user profiles: for every element in the stream you do a lookup in that table, and then you generate the output.

The problem here is mainly a performance problem, and that is that these event streams can be quite high-volume, and we've built efficient systems for dealing with high-volume streams. The stream processing system that I worked on would handle hundreds of thousands of messages per second per node, and so, distributed across multiple machines, you can get a throughput of millions of messages per second. But if you're going to send that kind of query volume to a database, your database is not going to be particularly happy in most cases. There seems to be at least two orders of magnitude of throughput difference between the rate at which we can sensibly query a database and the rate at which these events are coming in. So that's simply a bit too slow.

Okay, when things are too slow, the standard approach is: okay, we add caching, because caching makes everything faster, doesn't it? And now you have two problems, of course: you have the ugly cache invalidation problem. The problem there is that at some point somebody will update their profile, and so this cache might be stale. Now you say: okay, for the purposes of analytics we don't care too much if the profile is a little bit stale. So what do you do here? Do you have some kind of invalidation time, like maybe an expiry of one day in that cache, so that the data won't be more stale than one day? But it's really a difficult trade-off, because if you make that expiration time too short, then you've got up-to-date profiles, but the cache isn't giving you much benefit, because most of the requests to the cache will be misses; and if you make the expiration time too long, then you've got really stale data. And there's actually a more insidious problem
there as well, besides just the staleness of the data. The problem is illustrated in this graph. It might be a bit hard to see from the back, I'm afraid, so I'll just explain it. This is a graph, over the course of 12 hours on the x-axis, of queries per second to one of LinkedIn's production database clusters. You can see that this is daytime, so during this time it's about 6,000 to 7,000 requests per second and it's fairly stable. Then evening comes, and towards the evening it kind of goes down. And then at about 5:30 p.m. the throughput suddenly drops to almost zero, and it stays at almost zero for about half an hour. Then, after half an hour, there's a huge spike, and the number of queries per second spikes up to about 24,000, so that's three or four times the normal daytime peak traffic, which is not that nice to the database.

So what happened here? Well, we had a stream processing job which was taking in these page view events, and that stream processing job got shut down for half an hour, because it's not a particularly critical job; it's just generating some analytics which don't need to be super up to date, so it's okay for it to be down for half an hour. Then, after the maintenance is over, it's restarted again, and now this job wants to catch up on all of the events that it missed while it was shut down, so it churns through that whole backlog as fast as it can. We built these systems to be quite efficient, so it can churn through this whole backlog of events pretty quickly. But as a result, it's hammering this database really hard, and now some completely unrelated part of the system, which also happens to be talking to this database, suddenly goes really slow. Suddenly we've impacted users by doing what we thought was routine maintenance somewhere on the back end. So this is pretty bad news.

So what we want to do is build these kinds of stream-table joins, but we want
to do so in a way that doesn't cause lots of operational problems. I was talking a bit about this cache and cache invalidation earlier. Rather than having just an expiry time on the cache, we could have a kind of active invalidation policy. Most of the time, people don't update their LinkedIn profile that often; you might update it every few months at most, maybe every few years. So in this case we can just afford to take the changes: whenever a change to a profile happens, we update the cache, and otherwise we just leave it there in the cache long-term.

So we need some kind of mechanism for detecting changes to people's profiles; we need some kind of stream of profile changes. We could call that, say, a profile change event or a profile edit event. This is probably horrendous from a DDD point of view, but just as a first draft of how you might structure that kind of event: you could say that at some point somebody changed their profile, and their old location was London, their new location is Brussels, and they changed the industry as well. You could imagine maybe this should be two separate events, like a location changed event and an industry changed event or so, but the principle remains that we've captured this change to the profile in an immutable event which we can stick on the stream.

Now, if you've got this stream of profile edit events, we can feed that into a stream processor as well. I've just labeled this Samza here at the moment, because that's the particular system that I happened to work on, but really any stream processor will do. So you take this stream of events, every time somebody updates their profile, and you use that to invalidate your entry in the cache, or actually we might as well just call that a database, because at this point we're keeping a copy of the
profiles database locally for the stream processing job. This database now doesn't need to be shared with anybody else anymore. We assume, in this event sourcing style of thinking, that the stream of profile edit events contains all of the events that we might ever need in order to reconstitute the entire state of this profiles database, and so therefore we can just say: okay, this is our own private copy of the profiles database. That's quite nice from an operational point of view.

Now we've got the page view events, and those also feed into the same stream processing task, and now it can query that database for the latest profile every time an event comes in, and then of course emit this composite output event. So it looks not too different from what we had, except the big difference is that this profiles database is now not somewhere shared with various other systems; it's our own private copy of the database, which means we can actually stick it inside the stream processor. In fact, we can put it in the same process, on the same operating system, on the same machine as the code handling the data, so we don't have a network hop for every single message lookup. That's great for the throughput, because the whole problem with this discrepancy of two orders of magnitude in throughput is that if you're making a database query over the network for every single message, that's just going to take some round-trip time, whereas here we can actually embed this profile database right there inside the job.

And with that, this problem goes away, because although we still have this issue that the throughput goes down for a while and then spikes up, the database that we're hammering is just local on the stream processing machines, so it's not affecting any of the rest of the system at all. So from an operational point of view this is great. This is a way we can handle this sort of pattern of a join of a stream with
table-like data, except the table is also expressed in the form of a stream of events, because that is what allows us to do this updating of the table whenever something changes.

Okay, so let's make this a bit more concrete. I'll talk about one particular open source project that I've been involved with, called Apache Kafka. This was originally developed at LinkedIn for exactly this kind of purpose, of aggregating logs and processing events. It's now an open source project at the Apache Software Foundation and is fairly widely used. The principle it's built on is actually astonishingly simple. It's very much like the log files in your typical application server: it outputs these plain text files which contain the log messages that were emitted somewhere within the code. These files are append-only files which consist of records. Every record is like a line in the log, so the record separator is the newline character, and they contain whatever information this thing happens to want to log; in a typical web server log it might be some details like this. So you've just got this ordered sequence of records.

The way Kafka was intended to be used, the way it was initially designed, was actually very much for just taking log files across a whole fleet of different servers and bringing them together, so that you can then feed them into analytics systems and the recommendation systems that we talked about earlier, but also for operational purposes, like tracking down where the errors are happening and suchlike. On the whole this is a fairly mundane but useful operational task to do, and the main thing that Kafka does is this kind of aggregation of log files at a rather large scale. The scale we're talking about here is over a trillion events per day, which is the rate at which LinkedIn is currently pumping data through Kafka. Netflix is doing almost the same rate of events. And, you know, Uber and
Twitter and various other companies are similarly using Kafka at fairly large scale. This kind of volume you obviously can't pump through a single machine, because it's way more than a single network interface will allow you to push through, so of course this is distributed across multiple machines. What it also does is replicate data across multiple machines, so that when you lose one machine, another one can take over and you don't lose any data in the process; it tries really quite hard not to lose data. So this kind of thing already starts having database-like characteristics, even though it's not really a database: it stores all of its data on disk and keeps the history of all of these events for quite a long time.

Kafka is not just for these web server log events, but really for streams of events in general, where the event could be anything that happened. It could be a web server log, where an HTTP request literally happened. It could be product-level activity events, which are used for tracking and analytics purposes, like we talked about. It could also be sensor-style events, where once per minute you're measuring the CPU utilization of a server, or the temperature of a CPU core, or whatever, and you just emit that as periodic events. Kafka works pretty well for this style of event too, and is actually used for that too.

But the most interesting one, as I see it, is these database change events, and that's where we get closest to the world of event sourcing. This is really just another kind of event where, as we discussed, the content of the event is some change that happened to data. It might be expressed as a sort of low-level before and after, or it might actually be an event-sourcing-like, fully designed event describing the change that happened from a business logic point of view. But all of these deal with these streams of events
where each event just describes the fact that something happened at some point in time, and that's an immutable fact.

Now, the way that Kafka works internally is actually rather like the UNIX tool tail, if you've used that before. You can do tail -f on a file, and it'll just follow the file for anything that is appended to the end of the file and print it out on the console whenever something is appended. This is actually exactly how Kafka works, except it's distributed; it's like a distributed version of tail -f. Rather than having one file, we've actually got several files in parallel, because that allows us to distribute and partition the whole thing across multiple machines, but each of these is, at least conceptually, an append-only file of records. So whenever somebody wants to publish a new message to Kafka, to the stream of messages or the stream of events, it simply gets appended to the end of one of these files.

When you want to consume the stream of messages, what you have is like tail -f: each consumer has a current position in each of these partitions, and it just reads sequentially from that position. Because these messages, these events, are totally ordered, we can say that each event in a partition has a sequence number, or offset as we call it, and the consumer simply points at the offset that is the latest message it has seen. Then it knows, because it consumes the messages sequentially, that all of the messages with a lower offset have already been processed and all of the messages with a higher offset have not yet been processed. So it simply iterates through the messages in a partition in a single-threaded way. That makes it really simple to deal with, because it just needs to periodically checkpoint its offset to stable storage somewhere, and that way, even if it crashes and starts up again, it can just resume processing at the point where it last was. However, when the thing is split across multiple
partitions, you have to keep an offset per partition. But the splitting across multiple partitions also helps with scaling, because if each individual partition has a single-threaded consumer, you can have different partitions being consumed independently on different threads, on different machines, and that way you still get your parallelization.

Now, if the type of data we're dealing with is this sort of database change, event-sourcing-like data, then you can imagine each event being taken in and being written to an index. This is somewhat like when event sourcing talks about taking the events and applying them to an aggregate, which might just be an in-memory object, or might actually be something that's persisted on disk somewhere; that's what I'm talking about here. So a consumer simply takes the events on a partition; over time more events get appended at the end, and as they get appended the consumer takes them and applies them to its local version of this data. So it just keeps moving forward in time.

At this point it might be interesting to contrast Kafka with other types of messaging systems. You're probably aware of AMQP or JMS-style message queues, which seem to serve a similar purpose of getting messages from some producers to some consumers, and on that high level they are similar, absolutely. The biggest difference between Kafka and these JMS/AMQP-style queues is to do with message acknowledgments: that is, how does the broker, or how does the system as a whole, know which consumer has successfully processed which message, so that if something crashes you know which messages you have to replay? As you're probably aware, the model in AMQP and JMS is that every time the broker delivers a message to a consumer, the consumer then has some time to process it, and eventually, once it has successfully processed it, it'll acknowledge the message to the broker, and that way the broker knows: okay,
tick, that message was successfully processed, I don't need to keep that message anymore. Occasionally something fails: maybe a message was explicitly marked as failed for some reason, or the consumer crashed while processing, or somebody pulled out the power on the machine, in which case it simply times out after a while. What these message brokers do in this case is, well, of course we don't want to simply lose messages; we have to make sure that every message is reliably delivered, so a failed message will get redelivered. But the consequence of this redelivery is that the messages now potentially appear out of order at the consumers.

Now, in a job queue use case, where the messages you're sending on the queue are requests like "please can you send this email for me" or "please can you charge this credit card for me", it doesn't really matter too much in which order those things are processed; as long as they all get done eventually, that's fine. So for this job queue type of use case, the AMQP and JMS-style message queues are exactly what you want. Whereas for an event log, if you're describing the sequence of changes to some database, the ordering of those events is really important, because whether X was first set to 5 and then set to 6, or the other way around, first set to 6 and then set to 5, makes quite a big difference for the final outcome: whether the final outcome is 5 or 6. So this type of event log is where you want to totally preserve the ordering of the messages and make sure that all of the consumers always see the messages in the same order, and this is the kind of world that Kafka lives in.

For this reason I don't really see Kafka as a competitor to JMS or AMQP-style message queues at all; they really target totally different types of use cases. In fact, a Kafka-type use case looks a lot more like database replication. So if you think about the typical
master-slave replication (or I prefer to call it leaders and followers, or primary and secondary, whatever you want to call it), what they do is accept all of the writes on the leader database and then replicate this change stream, either through a physical write-ahead log that gets shipped over the network or through logical change sets, a logical change stream. This gets replicated over to the replicas, and the ordering of the events there is of course totally critical, because what you want is that after all of the messages have been exchanged, all of the replicas are in the same state. That's the whole point of replication: to have a copy of the same data on multiple machines. So these order-preserving message streams look a lot like these kinds of databases.

Now, there's one very nice thing that event sourcing talks about, which is keeping a complete history of all of the changes that have ever happened. Perhaps you make checkpoints and compact it down from time to time if it's in danger of getting too big, but conceptually at least you have this full history, and what that means is that you can rebuild the state of something off that complete history of changes whenever you need to. This is actually used quite effectively in Kafka-esque systems as well.

For example, we talked about maintaining an index earlier. You can imagine this being a full-text search index: for purposes of serving the search feature on your website, you've got to take all of the items that are in your database and somehow index them by keyword. There are some databases which do full-text search integrated into the same database engine as your typical OLTP storage, but in most cases you actually want a separate system for that. So what happens in that situation if you want to build a new index, because you've got a new product feature and you need to take your existing data but somehow present it in a
new way? This stream of changes gives you exactly the tool that you need for that, because you can start off with a brand new index server containing no data at all (it's completely empty), and you start it off at the beginning of the stream, and then you just go through all the messages in a single-threaded manner, even very slowly, until eventually you reach the head. Even if this takes a few days, that's not a problem: as long as we can keep the complete history of all of the changes that have happened, we can run through this entire thing and build a new index which is now up to date with all of the data that ever happened. From a data infrastructure and operational point of view, this is a wonderful thing to have.

You can imagine this running as a background process while the users are still reading from the old index, and that's why it doesn't matter if this process takes a while: it's just a one-off batch process that you leave running, and your operational systems are still being kept up to date with the latest changes being fed in from the log. At some point the new index is up to date, and you can then start switching users over to reading from the new index instead of the old one. Then you can do your A/B tests, whatever, and make sure that everything is working correctly, and eventually you decide: okay, everything is working correctly, now we can stop updating that old index, we no longer need it, and you can just throw it away. So this has been a really useful way of building systems that can gradually migrate from one data system to another without downtime.

Of course, this depends on keeping this complete history of all of the changes that ever happened. In certain low-volume use cases it might actually be sufficient to simply keep literally all of the events, but in general there will be some situations in which the data gets too expensive to keep, because we simply
don't have infinite disk space and so for that Kafka has a feature called log compaction which is kind of its only database-like feature really so Kafka doesn't have indexes or anything like that it's really just like log files streams of events that are appended onto the end of files but one thing it can do is if you have a key on events in this case each event has a key and a value the key is A B or C the value is a number Kafka can perform log compaction on this and what that means is that it guarantees that it will keep at least the most recent message for a given key if there are several different messages with the same key then it's allowed to throw away the older messages for that particular key but the most recent message for a key is kept forever until at some point a later message with the same key appears or unless that message is explicitly deleted and so what this means is that eventually the size of this log actually compacts down to be proportional to the number of keys in the database ie the size of the database it's no longer proportional to the number of updates that have ever happened it's only proportional to the size of the database and well Kafka keeps all of this stuff on disk anyway so if you can keep it in a database you can also keep it in Kafka and then once you've got it there well you can do this index building and building downstream systems and feed it into your analytic systems and do your stream-table joins and so on so all of these things become very nicely possible now and so that kind of just leaves the operators of stream processing so now that we've taken these activity streams like page view events and we've taken the database change events like these profile edit events we've put them all in streams so now we can operate on them in a uniform way so we can write these operators which like for example perform a map operation on every message they see they'll potentially transform it into some other message or
filter out messages that they don't want and so you can now start producing these derived streams where for example you filter down to only those where X is greater than 10 to give a stupid example or multiply each by two or whatever you want or you can perform aggregations now this is aggregations in the database sense not in the DDD aggregate sense although it's somewhat related I guess like for example count the number of messages you've seen with a particular key and so what stream processing frameworks like Samza the one that I worked on what they do is they give you tools for writing these kind of operators so the streams are provided by Kafka and the implementation of these operators you can do in a stream processing framework of course we're talking about these high volume streams here so they also need to be scalable but the way that is done is simply by partitioning the input so we've got each partition here like each individual partition can handle maybe say 10 megabytes per second order of magnitude throughput but you can horizontally scale that by just having as many partitions across as many servers as you want and now each processor will take the events that happen on exactly one partition and just process them in a single threaded way and because each message is quick to process you know all you've got to do is increment a counter or write it to an index you're not like sending emails based on every single message and so this is feasible to run in a single threaded way but then the output could be fanned out to many different partitions of the output streams and so this allows you to do regrouping type things so if you want to make sure that all of the messages with a certain key appear in the same partition you can have a job like this which repartitions and effectively routes them to the correct destination and then a second stage which then actually processes all of the messages with the same key and this allows us to build these kind of systems quite
nicely if we want to do these joins of several streams we can do that with these partitioned streams as well by co-partitioning so we make sure that all of the page view events for user one appear in partition one of this stream and we make sure that all of the profile edit events for user one appear in partition one of the profile edit stream so that means partition one contains all of the data for user 1 and similarly like user 5300 will maybe be in partition 3 or whatever and now the processing can then simply be partitioned so that partition one of the profile edits and partition one of the page views get routed to the same task now this task is again just a single threaded process which is going to sit there churning through all of the events and it sees page views and it sees profile updates and so it can maintain its little local copy of the index and that index contains now only the profiles for the users within that particular partition so now we've taken our hundreds of millions of users and broken them down into manageable chunks but we can still be sure that we'll see all the right events because we've routed all of the events for the same user to the same processing task and now this is really the kind of stuff that these distributed stream processing frameworks help with this little database here in which you can maintain for example that index of profiles is provided by an embedded database such as RocksDB which is this thing that Facebook open sourced and so it lives actually within the same JVM process as the actual stream processing task so access to this thing is very fast now it's also not very fault tolerant because if you lose this processing machine you lose the local state that was stored there but actually that's fine because we designed this whole thing so that we can restore the state from the log of events and so in the worst case if you lose the data well no problem you just spin it up on
a new machine process that long history of events that we logged in Kafka and thus you can restore the state that you lost here again and so this is really where I see these interesting parallels where in DDD you're using events for the purpose of modeling a business domain and really understanding what is going on in a business domain and reducing complexity there and so that helps us understand the business logic part of the system better in parallel it seems that people independently in this Internet companies world have come up with a very similar-looking idea under the name of stream processing solving a very different problem though solving this problem of scaling large data systems but I find it very interesting that these similar ideas have appeared in two very different communities that seem to barely talk to each other which suggests to me that maybe there's something fundamental underlying this idea that makes it important that's about as far as I am with my thinking other people have written lots about this so you can find a full list of references on the website where I put up the slides more thinking about this also in the book which is not finished yet but you can find an early release of the first eight or nine chapters on dataintensive.net if you're so inclined and I hope we still have a few minutes for questions because I'd love to hear what you think thank you how do we handle the ordering problem when it's co-partitioned yes a good question so in this case the ordering of events across two different partitions actually is not defined so there's nothing in Kafka that says this event in this partition happened before or after this event in this other partition there's no ordering information there so by default what these things will simply do is asynchronously consume both streams and you'll end up with some kind of random interleaving so it's arbitrary at that point and for these kind of
analytics type use cases whether a profile change happened just before or after a particular page view is normally not particularly important and so in that case this non determinism is actually acceptable what they have been looking into recently actually is techniques for giving stronger ordering guarantees here and so you have to be kind of careful there to not destroy the scalability of the system because if you have to pump all of the events through a single CPU core then this would grind to a halt so some of the stuff I'm researching at the moment is actually about providing stronger ordering guarantees across different partitions using effectively a kind of time slicing approach so you could define slices across all partitions saying okay all of these correspond to one logical point in time and then you know that messages before and after that checkpoint have to be ordered correctly across all partitions and so that gives you a certain granularity you could take these checkpoints once per second for example so that way it's not a big overhead on the infrastructure but it still gives you reasonably good granularity of ordering a question was how reliable is the persistence on Kafka it's been gradually developing over time so it was originally designed for these kind of web server log aggregation purposes where you know if you lose 1% of your messages it's actually not a big deal and so like the first version of Kafka didn't actually have any replication so every broker was a single point of failure effectively but what's been interesting is that as companies like LinkedIn and others have been developing more operational experience with Kafka and it's become more mature actually its durability expectations have been gradually increasing as well and so the state as of now is that every message gets synchronously replicated across multiple brokers you can tune the exact replication factor and how many synchronous acknowledgments it waits for but
you know in terms of replication strength that's already on the level of many relational databases' replication systems and some of the stuff I'm looking at at the moment is about adding integrity proofs to Kafka so that the system can actually prove cryptographically that all of the data is there and correct and so it can be continuously audited that way some other people are also looking into adding transaction support for Kafka which would allow you to atomically publish messages to several different partitions as one atomic action and get a definite commit or abort on that and so this is still fairly young technology so I won't oversell it definitely but the trend is definitely that it has been moving towards more and more database-like guarantees and so LinkedIn for example has this internal document database called data bus and it's now actually using Kafka for its cross data center replication and you know that's really a situation where you can't afford to lose any messages at all because it's database replication it has to be totally correct so it's been gradually moving to those stronger types of use cases yes please stop-the-world GC problems so well with RocksDB in particular it uses JNI for off-heap memory so that the memory used there actually doesn't affect the GC times for the JVM as a kind of more general point like yes sometimes people have problems with long GC pauses which is most critical of course in online systems which are serving user traffic where actually users get error messages if your system is GCing most of the stream processing things we've been building have been these kind of not quite online systems where we have maybe some latency expectations of normally getting messages through the pipeline within a second say occasionally we can tolerate it spiking up to thirty seconds and so in that case having a GC pause somewhere actually isn't all that critical
because the system has been designed to accept a certain amount of latency anyway example you gave of that people viewing profiles as they don't have all information in a black video references or profile ID that Oldman's taste the data to use that these work openings opening cook up something like spark or pent out about just to the cattle screens so what was the question about embedding the denormalized data in the event so about the joining so there's kind of different tools for that people are using and developing it's quite a fast-moving area actually so if you want to do these kind of joins like say this kind of thing if you implement that as an operator with Samza this system I talked about then you actually write code to do that so it's quite a manual process there are efforts to make nicer higher-level tools where you can just declaratively say okay I want to join this stream with this stream on this particular key and it generates these operators automatically internally and so Spark is doing some work in this area and Samza is actually adding support for SQL queries which get compiled down to these kind of operators so it's a rapidly evolving area I think over the next few years we'll probably see a lot more of that so at the moment a lot of these tools are quite low level and deliberately so because they're trying to become really operationally robust at the low level before adding fancy high-level features and then gradually moving up to higher levels one let's back this so my instinct is that things like Kafka are designed to be used in a distributed setup from the start for fault tolerance as much as anything else so if you run it on just a single node I think that the benefits would be limited but if you start running it across two or three servers and just not even partitioning the data but just replicating it then at least you're taking advantage of that
replication already and at that point it starts maybe becoming interesting and actually two or three servers will be enough for many use cases like there's some benchmarks pushing I think two million messages per second through three servers and they're not expensive servers just kind of standard small rack-mounted ones so at that point I guess it comes down to like to what degree do you want to be exploratory with technology because admittedly these are still young fairly immature tools you can get some interesting benefits from them but it won't be as smooth a ride as just using a standard relational database say so I think it's probably worth starting to look at these things and just kind of starting to think about use cases where you might use them but I wouldn't like jump on it immediately and start doing everything this way yes please yes yes it's fundamentally asynchronous there's not very much you can do about that without really coupling systems very closely and synchronously together well the key is you would need somehow some linearizable queries to an external system so like if that's super critical to you and you're willing to pay that latency cost of making a query to an external system then sure you can do that there's nothing stopping you from doing that for kind of the middle ground where like you want certain guarantees but you don't want to make a network request for every single query that then comes back to what we were discussing about ordering so there if you can then enforce some message ordering across different partitions then you could say okay at this particular point in time we know that the page view event happened before the profile edit event or it happened after the profile edit event so in that case you know there's a well-defined ordering and so you have to process them in the correct order in the consumer then so that starts becoming possible if that ordering information is
reflected explicitly last question okay so I'll be around so feel free to come and talk to me thank you very much
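As an illustration of the log compaction and state-rebuilding ideas described in the talk, here is a minimal sketch in Python. It is a toy in-memory model, not Kafka's API: the function names `compact` and `rebuild` are hypothetical, the log is just a list of (key, value) pairs, and a value of None is assumed to stand for an explicit delete. Real Kafka compaction works on segment files in the background and keeps delete markers around for a while, which this sketch ignores.

```python
def compact(log):
    """Return a compacted copy of the log.

    Keeps only the most recent record for each key, in original log order.
    A record whose value is None is treated as a delete marker (an assumption
    of this sketch) and is dropped together with the key's older records.
    """
    latest_offset = {}
    for offset, (key, _value) in enumerate(log):
        latest_offset[key] = offset  # later records overwrite earlier ones
    return [
        (key, value)
        for offset, (key, value) in enumerate(log)
        if latest_offset[key] == offset and value is not None
    ]


def rebuild(log):
    """Replay a log from the beginning into an empty key-value store."""
    state = {}
    for key, value in log:
        if value is None:
            state.pop(key, None)  # delete marker removes the key
        else:
            state[key] = value
    return state


# Keys A, B, C with numeric values, as in the example from the talk.
log = [("A", 1), ("B", 2), ("A", 3), ("C", 4), ("B", None), ("A", 5)]
compacted = compact(log)

# Replaying the compacted log yields the same state as replaying the full
# history, while the compacted log holds one record per live key.
full_state = rebuild(log)
compacted_state = rebuild(compacted)
```

The point of the sketch is the property the speaker relies on: a new index or a lost local store can be restored by replaying the compacted log, whose size depends on the number of keys rather than the number of updates.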
Info
Channel: Domain-Driven Design Europe
Views: 42,062
Keywords: event sourcing, stream processing, cqrs, ddd, dddesign, domain-driven design, software, apahe kafka, apache samza, software architecture, messaging, ddd europe, dddeu, domain-driven design europe
Id: avi-TZI9t2I
Length: 51min 34sec (3094 seconds)
Published: Mon May 23 2016