Moving on from RocksDB to something FASTER - Matthew Brookes

Video Statistics and Information

Captions
Sitting between us now and the coffee break, there are a few things I'd like to discuss today. First, I'll go over some of the current approaches to storing operator state in popular data streaming platforms that we all know and love. Second, I'd like to introduce both RocksDB and FASTER. Just to get an idea: how many people are familiar with RocksDB? A lot, okay, great — never talk to people who know more about your area than you're presenting. And how many people know about FASTER? Okay, a lot less; that's good for me. After introducing both RocksDB and FASTER, I'd like to compare their suitability for different streaming workloads; quite what that means will hopefully make more sense later on. Finally, I'll conclude by explaining what all of that means for Flink.

So, first, the current approaches to storing operator state. The topic of state management is massive; over these next two days there are lots of talks with state management somewhere in the abstract. What I mean here when I talk about state is specifically: where do we store it? In the first case, when we start out with a dataflow program, the obvious place to store state is on the heap, within the process's memory. This is really good for things like debugging and fast iteration on our new dataflow programs, but at some point we realize we need to step it up and start storing more state than we have available memory on a single machine. Then we might start storing it in a file, possibly on a remote filesystem. Using a remote filesystem gives us advantages over a single machine for some fairly obvious reasons, like being able to rely on the fault tolerance of a remote filesystem such as HDFS. But at some point a file abstraction is not really what we need as a data store for stream processing, so we move on and decide to use a dedicated data store. Here we really have two options: we can embed it in the process, which is what we do with RocksDB, or we can use a remotely accessed data store, something like Cassandra or HBase.

So why would we use a dedicated data store? One of the main reasons, I think, is that we all love abstractions. A data store often provides us with a simple abstraction, something like a key-value model, and that moves us away from a physical model — by physical I don't mean low-level implementation details so much as the idea of how state is arranged on hard drives and so on. Secondly, it allows us to outsource responsibility for fault tolerance: most data stores have some form of fault tolerance, including the two I'm talking about today, RocksDB and FASTER. It also allows multiple processes to share state: especially with a remote data store, we can have multiple processes on multiple machines accessing the same shared state — although I'll argue that this may not actually be that useful, and I'll come back to this point later on. Finally, it lets us separate responsibilities: the data store gets on with its optimizations, looking after how our state is stored and made available, and we as dataflow programmers get on with writing the actual code and solving the problem we care about.

As I said, there are really two options with a dedicated data store — store remotely or store locally — and both options are equally valid.
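Whether the store is embedded or remote, what the dataflow program sees is roughly the simple key-value abstraction mentioned above. Here is a minimal sketch of such an interface in Rust; the trait and the names are hypothetical, not the API of any particular system:

```rust
use std::collections::HashMap;

/// A hypothetical key-value state backend abstraction, as a dataflow
/// operator might see it: state is just bytes behind get/put/delete.
trait StateBackend {
    fn put(&mut self, key: &[u8], value: &[u8]);
    fn get(&self, key: &[u8]) -> Option<Vec<u8>>;
    fn delete(&mut self, key: &[u8]);
}

/// The simplest possible implementation: state lives on the heap,
/// which is where many dataflow programs start out.
struct HeapBackend {
    map: HashMap<Vec<u8>, Vec<u8>>,
}

impl StateBackend for HeapBackend {
    fn put(&mut self, key: &[u8], value: &[u8]) {
        self.map.insert(key.to_vec(), value.to_vec());
    }
    fn get(&self, key: &[u8]) -> Option<Vec<u8>> {
        self.map.get(key).cloned()
    }
    fn delete(&mut self, key: &[u8]) {
        self.map.remove(key);
    }
}

fn main() {
    let mut backend = HeapBackend { map: HashMap::new() };
    backend.put(b"auction:42", b"highest_bid=100");
    assert_eq!(backend.get(b"auction:42"), Some(b"highest_bid=100".to_vec()));
    backend.delete(b"auction:42");
    assert_eq!(backend.get(b"auction:42"), None);
}
```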
Looking at systems using remote data stores, we have Apache Storm with the Trident API relying on Cassandra, and Google Cloud Dataflow using Bigtable. Apache Samza and Apache Flink, both very popular stream processing systems, instead choose to store their state locally, within the process operating the dataflow. And when choosing a local data store — if I'm Samza, Flink, or indeed Apache Kafka — there's really only been one option chosen over the last few years, and that's RocksDB. RocksDB is a very, very popular embedded key-value store; it is really the de facto choice for storing state within a process, and what I'd like to do throughout the rest of this presentation is challenge the assumption that RocksDB should be the go-to embedded data store.

So, an overview of RocksDB. It started life at Google as LevelDB and was then forked off. For people who don't know, it's an embedded key-value store for arbitrary byte arrays, which means that within a single RocksDB instance I can store whatever state I like, as long as I serialize it down to a byte array. It provides the basic operations you might expect — inserts, deletions, gets — but also more advanced operations such as merging multiple values together and applying compaction filters when deciding which values to retain. It allows very efficient range scans using prefix bloom filters, and its architecture is based on a log-structured merge tree, an LSM tree.

I'm going to give a brief description of how an LSM tree works, to motivate an understanding of the implications of FASTER's structure later. When writing key-value pairs, we start by appending them to a log. This top structure is called a memtable in RocksDB; it's just an append-only structure residing in memory. At some point it becomes full, so we mark it as immutable and spawn a new one, which is where we now start appending new key-value pairs. This process continues until at some point we've run out of space and have to perform a compaction. A compaction takes multiple memtables from the level above — two in this example, for simplicity — and effectively performs a merge sort, combining the two higher-level memtables to give us the compacted view. A nice feature of this merge sort is that the result is sorted, which gives us a clue as to why range scans are so efficient: all the data in a level is sorted. The process continues: as more appends happen at the top level we can trigger further compactions further down the tree — it's a recursive process. For simplicity I've shown compactions happening immediately, but in reality we can configure how often these things happen. So in RocksDB we have this top-level memtable in memory, and then these SSTables, the ones actually stored on disk. RocksDB, as you'll know if you've worked with it, has a million configuration options: we can control the sizes of these structures, how many of them exist before compactions start, and so on, and we can also trigger manual compactions of our LSM tree. I hope this has given a rough intuition for the design of RocksDB.
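To make the memtable-and-compaction intuition concrete, here is a toy sketch in Rust of the same idea: appends go into an in-memory map, full memtables are frozen, and a compaction merge-sorts frozen runs into a single sorted run. This is purely illustrative and not RocksDB's actual implementation:

```rust
use std::collections::BTreeMap;

/// Toy LSM sketch: a mutable memtable, frozen (immutable) memtables,
/// and "on-disk" sorted runs produced by compaction.
struct ToyLsm {
    memtable: BTreeMap<String, String>,
    frozen: Vec<BTreeMap<String, String>>,
    sorted_runs: Vec<Vec<(String, String)>>, // stand-in for SSTables
    memtable_limit: usize,
}

impl ToyLsm {
    fn new(memtable_limit: usize) -> Self {
        ToyLsm { memtable: BTreeMap::new(), frozen: Vec::new(),
                 sorted_runs: Vec::new(), memtable_limit }
    }

    /// Writes are just appends into the in-memory memtable.
    fn put(&mut self, key: &str, value: &str) {
        self.memtable.insert(key.to_string(), value.to_string());
        if self.memtable.len() >= self.memtable_limit {
            // Mark the memtable immutable and spawn a new one.
            let full = std::mem::take(&mut self.memtable);
            self.frozen.push(full);
        }
        if self.frozen.len() >= 2 {
            self.compact();
        }
    }

    /// Compaction: merge frozen memtables into one sorted run.
    /// BTreeMap keeps keys sorted, so this is effectively a merge sort;
    /// later writes overwrite earlier values for the same key.
    fn compact(&mut self) {
        let mut merged = BTreeMap::new();
        for table in self.frozen.drain(..) {
            for (k, v) in table {
                merged.insert(k, v);
            }
        }
        self.sorted_runs.push(merged.into_iter().collect());
    }

    /// Reads may have to look in several places: the memtable first,
    /// then frozen memtables, then sorted runs (read amplification).
    fn get(&self, key: &str) -> Option<&String> {
        self.memtable.get(key)
            .or_else(|| self.frozen.iter().rev().find_map(|t| t.get(key)))
            .or_else(|| self.sorted_runs.iter().rev()
                .find_map(|run| run.iter().find(|(k, _)| k == key).map(|(_, v)| v)))
    }
}

fn main() {
    let mut lsm = ToyLsm::new(2);
    for (k, v) in [("a", "1"), ("b", "2"), ("c", "3"), ("a", "4")] {
        lsm.put(k, v);
    }
    println!("a = {:?}", lsm.get("a"));
}
```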
If we look at its full architecture now, the simplest part is that on disk we have the SST files storing the lower levels of the LSM tree, and in memory we have both our mutable and immutable memtables. Remember, the mutable one is the one we're currently writing to, and the immutable one is the one we've closed and are waiting to compact and flush to disk. We then have a block cache for when we're accessing blocks of our SST files, a manifest file storing metadata about those files, and a block index so we can easily find blocks within SST files.

Having seen the design and architecture of RocksDB, it's important to look at what that means for using RocksDB to store state. One consequence is very high write throughput: the memtable is an append-only structure, so writing to it is a really fast, concurrent operation. Compactions can happen asynchronously: in the earlier diagram I showed all four memtables filling up and then a synchronous compaction, but in the real world it's not like that — maybe three of my four memtables are full, and I run a background compaction while still appending to the fourth, open memtable. Read requests, however, may need to access multiple levels of the tree: my data becomes replicated across multiple levels, and a read has to look in different levels to find the latest value. This is called read amplification. Amplification is a measure of how many operations I have to perform compared to the one operation I actually wanted; if I want to write something and end up having to write it three times, I have a write amplification of three. With RocksDB we find quite a high factor of read amplification. The final point is that, because of the append-only structure, when we want to change a value we have to read the old value and copy it, with the update applied, to the end of the log we're appending to. This read-copy-update is an expensive operation to perform.

Moving on now to FASTER, I'll do the same thing: an overview, the design, and its implications. FASTER is an embedded key-value store, just as before, but now for arbitrary data types — let me say that again: arbitrary data types. It's not as before, where we had to serialize our values down to a byte array; we can now encode, in the design of the FASTER instance, the actual representation of our data. We decide how it gets represented on FASTER's log, which lets us do some clever things and avoid the need for serialization. FASTER exploits the strong temporal locality in streaming workloads. What does that mean? In streaming workloads we typically find that a subset of the total state is very frequently accessed; we call this the hot set. What FASTER does is separate the entire state it stores into an in-memory hot set and an on-disk cold set, so that state we access frequently is available in memory, and state we access less often is stored on disk, waiting for us when we actually need it. FASTER is much simpler in terms of the operations it supports: point reads, blind upserts, and read-modify-write operations. These operations are all user-defined functions: since we're storing arbitrary data types, FASTER pushes the responsibility onto the application developer to decide how an upsert or a read-modify-write should be implemented. For example, if my data type is a pair of sum and count because I'm computing an average, I want my read-modify-write to add onto both of those fields; if my data type is a list of ints, I might want my read-modify-write to enqueue on the back or push onto the front. It's my responsibility as the application developer to decide how to implement these operations.
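As a rough sketch of what such user-defined operations look like, here is the sum-and-count example in plain Rust. This is not the actual FASTER or faster-rs API — just an illustration of the read-modify-write idea, under the assumption that the store calls back into user code:

```rust
/// Hypothetical trait standing in for FASTER's user-defined operations:
/// the store asks the application how to create and update values.
trait Rmw {
    type Input;
    /// How to build an initial value when the key is not yet present.
    fn initial(input: &Self::Input) -> Self;
    /// How to update the value in place for a read-modify-write.
    fn modify(&mut self, input: &Self::Input);
}

/// State for a running average: a sum and a count.
#[derive(Debug)]
struct SumCount {
    sum: i64,
    count: u64,
}

impl Rmw for SumCount {
    type Input = i64;
    fn initial(input: &i64) -> Self {
        SumCount { sum: *input, count: 1 }
    }
    fn modify(&mut self, input: &i64) {
        // In-place update: no read-copy-update, no serialization.
        self.sum += *input;
        self.count += 1;
    }
}

fn main() {
    let mut avg_price = SumCount::initial(&100);
    for bid in [250_i64, 75, 125] {
        avg_price.modify(&bid);
    }
    println!("average = {}", avg_price.sum / avg_price.count as i64);
}
```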
The core structure of FASTER is a hybrid log, which supports in-place updates. Here's FASTER's design: the hybrid log, which we can think of at a logical level as a contiguous address space divided into a portion stored on disk and a portion stored in memory. Starting from the left, on disk we have the elements which are part of the cold set. These elements began life at the right-hand end, as new records at the tail of the mutable section; because they weren't accessed, they effectively fell off the end of this logical address space as the stable, on-disk region overtook them, and they became resident on disk. Then, between the on-disk portion and the mutable section, we have the read-only section of the in-memory part. This read-only section is actually pretty clever, because it acts as a second-chance cache, which reduces the chance that a hot item gets spilled to disk: as the stable region grows it approaches a hot item, which first lands in this read-only section; if the item is accessed again, it gets copied to the tail of the mutable section and won't be evicted to disk. Finally we have the mutable portion, where in-place updates are supported. That makes changes very quick: we don't have to do the expensive read-copy-update we needed with RocksDB, or that we need when we touch something sitting in the stable region or the read-only section.

The entire architecture can be summarized on one slide with only two things, and I really like this about FASTER — it's a really simple architecture. We have the hybrid log, which as developers we can imagine as just a contiguous logical address space, and a hash index. The hash index resides in memory and is very cache optimized; a lot of care is taken in the implementation to make sure, for example, that a hash bucket fits in a single cache line regardless of the architecture being used. It's considerably simpler than RocksDB, with a lot fewer configuration options. Obviously there are arguments about whether that's good or bad; for me, as a developer, it's really nice to be able to spawn a FASTER instance, provide a size for the hash index and a size for the hybrid log, and go — as opposed to spending hours and hours tweaking a RocksDB configuration.

The design implications of having this hybrid log and hash index architecture: it has very high read and write throughput, and this is especially true when we're accessing the hot set. State which is accessed often is simply available in memory, almost as if we'd gone back to the original approach of storing our state on the application heap.
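A minimal sketch of the hybrid log idea, assuming three offsets that split one logical address space into stable (on-disk), read-only, and mutable regions; the names and the one-unit record size are simplifications, not FASTER's actual code:

```rust
/// Simplified hybrid-log bookkeeping: one logical address space,
/// split by two offsets into three regions.
#[derive(Debug, PartialEq)]
enum Region {
    Stable,   // below head_offset: lives on disk (the cold set)
    ReadOnly, // in memory, read-only; acts as a second-chance cache
    Mutable,  // tail of the log: in-place updates allowed
}

struct HybridLog {
    head_offset: u64,      // everything below this address is on disk
    read_only_offset: u64, // below this (and above head) is read-only
    tail_offset: u64,      // next address to allocate new records at
}

impl HybridLog {
    fn region_of(&self, address: u64) -> Region {
        if address < self.head_offset {
            Region::Stable
        } else if address < self.read_only_offset {
            Region::ReadOnly
        } else {
            Region::Mutable
        }
    }

    /// An update in the mutable region happens in place; anywhere else
    /// the record is copied to the tail (read-copy-update), which is
    /// also how a hot record in the read-only region gets its
    /// "second chance" and avoids being evicted to disk.
    fn update_address(&mut self, address: u64) -> u64 {
        match self.region_of(address) {
            Region::Mutable => address,
            _ => {
                let new_address = self.tail_offset;
                self.tail_offset += 1; // pretend records are one unit long
                new_address
            }
        }
    }
}

fn main() {
    let mut log = HybridLog { head_offset: 100, read_only_offset: 200, tail_offset: 300 };
    assert_eq!(log.region_of(50), Region::Stable);
    assert_eq!(log.region_of(150), Region::ReadOnly);
    let moved = log.update_address(150); // hot record copied to the tail
    assert_eq!(log.region_of(moved), Region::Mutable);
    println!("record moved to logical address {}", moved);
}
```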
Obviously, if we have to fetch something from disk it's more expensive, because we have to do a retrieval, but in the general case — the hot case — it's very cheap. FASTER's internal design is lock-free, which allows very scalable multi-threaded access. FASTER supports a kind of publish/subscribe mechanism so that threads can dynamically come and take part in its internal operations without having to take locks, which is quite a contrast to RocksDB, where a lot of operations are controlled by locking. FASTER's in-place updates reduce write amplification massively: if I'm making frequent updates to a hot item — say I'm counting the number of tweets made by Beyoncé in the last day — that only takes up one slot in the whole hybrid log; I'm not performing read-copy-updates and driving up write amplification the way RocksDB would. Finally, though, range scans are expensive: if I want to do a scan, I basically have to start at the beginning of the log and work all the way to the end to find everything in it. As we'll see later, this is not very good for something like windowing in a stream processing system. Oh, and one final point: a single FASTER instance can only be used for one key-value type. There's a subtlety in saying that so quickly. In the past, with RocksDB, we said: here is our one database, use it for the whole of your stream processing job, store whatever you like in there. Now that we tailor the store to one data model, we can only use one instance per state primitive, per key-value type.

So I think that needs some explanation of what it means for using FASTER to store operator state. If we like the RocksDB way of doing things — a very generic mechanism to store state however we like — and we want the same from FASTER, then we have to look at how we actually represent keys and values, and that goes back to serialization. As with RocksDB, we represent our keys and values in some kind of byte buffer. If we want a generic key-value type, there are some things we need besides the actual buffer storing the key or the value. Clearly we need the size of the buffer, which we use to make sure we read the correct amount back out. But for our value we need a couple of extra pieces of information, which have an impact on the size of the value type and on its scalability. First, a value length: we support in-place updates in this generic approach, but an in-place update doesn't necessarily produce a value of the same size. It might be smaller; if so, we can still store it in the same byte buffer — say we have 20 bytes reserved and the new value is 12 bytes, we store it in there and record that only 12 bytes are actually in use. On the flip side, if the value grows — we have 20 bytes reserved but need 30 — then we have to do a read-copy-update and create a new record, because we need more space. The last thing in this value type is an atomic generation lock. I said a couple of slides earlier that FASTER was really nice, lock-free, super scalable, but the reality is that if we want to store an arbitrary byte array, we have to protect against multiple threads changing a value at the same time; so we put a lock in there, and writers have to acquire the lock before they're able to make a change.
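A simplified sketch of what such a generic value might look like, assuming a fixed-capacity byte buffer with a used-length field and a crude spin lock for writers. This is illustrative only — not the actual layout used by faster-rs — and in the real store multiple threads may reach the same record concurrently, which is why the lock exists at all:

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// A hypothetical generic value: a reserved byte buffer, the number of
/// bytes currently in use, and an atomic flag that writers must take
/// before mutating the buffer in place.
struct GenericValue {
    buffer: Vec<u8>,        // reserved capacity, e.g. 20 bytes
    used_len: usize,        // how many bytes of the buffer are meaningful
    write_lock: AtomicBool, // needed because arbitrary values aren't atomic
}

enum UpdateOutcome {
    InPlace,           // new value fit into the reserved buffer
    NeedsRcu(Vec<u8>), // too large: caller must allocate a new record
}

impl GenericValue {
    fn new(capacity: usize) -> Self {
        GenericValue { buffer: vec![0; capacity], used_len: 0,
                       write_lock: AtomicBool::new(false) }
    }

    fn update(&mut self, new_value: &[u8]) -> UpdateOutcome {
        if new_value.len() > self.buffer.len() {
            // Larger than the reserved space: a read-copy-update is needed.
            return UpdateOutcome::NeedsRcu(new_value.to_vec());
        }
        // Spin until we hold the write lock (crude, for illustration only).
        while self.write_lock.swap(true, Ordering::Acquire) {}
        self.buffer[..new_value.len()].copy_from_slice(new_value);
        self.used_len = new_value.len(); // smaller values just shrink used_len
        self.write_lock.store(false, Ordering::Release);
        UpdateOutcome::InPlace
    }

    fn value(&self) -> &[u8] {
        &self.buffer[..self.used_len]
    }
}

fn main() {
    let mut v = GenericValue::new(20);
    assert!(matches!(v.update(b"hello"), UpdateOutcome::InPlace));
    println!("stored {} of {} reserved bytes", v.value().len(), 20);
    assert!(matches!(v.update(&[0u8; 30]), UpdateOutcome::NeedsRcu(_)));
}
```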
This design, by the way — this way of storing an arbitrary key-value — is what we do in faster-rs. faster-rs was co-designed by myself and Max Meldrum, who I believe is giving a talk later; we created a very generic wrapper around FASTER that can store any serializable state, and there's a link at the end if you'd like to check it out. But the question is: is all this overhead for arbitrary state necessary? We're allowed to store arbitrary state in RocksDB, and we've now come up with a way of doing it for FASTER, but is it necessary? I think we can make two observations. The first is that streaming jobs are data parallel: one worker is probably responsible for a subset of the input that no other worker has to worry about at any point during processing. As a result, do we need all these locks? If I'm the only one who will ever edit a piece of data — the number of tweets by Beyoncé, say — then I don't need to acquire a lock every time I modify it, because nobody else is going to modify it under my nose. The second observation is that we know the types of state our stream processing job uses in advance. We're not being super dynamic, receiving new data types at runtime and having to store them the way RocksDB is designed for; we know what we're going to store, so let's optimize our system to store that kind of data.

We can now start looking at the advantages of using a hand-tuned store. In this experiment we implemented the YCSB benchmark from Yahoo: it's basically a store of integer keys and integer values, and we process a sequence of transactions that store and read integers from the database. We ran this with both a hand-tuned implementation — one which knows it will be storing integers and can write them directly — and the generic one, the faster-rs we saw earlier, which has to do the serialization, acquire the lock, and store the extra information such as the size and the length. In our general case of a 50/50 read/write workload, we see a 1.6x speedup from using the hand-tuned store rather than the generic store.
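To illustrate the difference being measured here, a rough sketch of the two paths under comparison, with hypothetical types rather than the actual benchmark code: in the hand-tuned case the value is just an integer updated in place, while the generic case pays for (de)serialization and a length-prefixed byte buffer on every update (and, in a real shared store, a lock as well):

```rust
use std::collections::HashMap;
use std::convert::TryInto;

/// Hand-tuned store: the value type is known to be a u64,
/// so an update is a single in-place addition.
struct HandTunedStore {
    map: HashMap<u64, u64>,
}

impl HandTunedStore {
    fn rmw_add(&mut self, key: u64, delta: u64) {
        *self.map.entry(key).or_insert(0) += delta;
    }
}

/// Generic store: values are opaque byte buffers, so every update
/// deserializes, modifies, and reserializes the value.
struct GenericStore {
    map: HashMap<u64, Vec<u8>>,
}

impl GenericStore {
    fn rmw_add(&mut self, key: u64, delta: u64) {
        let buf = self.map.entry(key).or_insert_with(|| 0u64.to_le_bytes().to_vec());
        let current = u64::from_le_bytes(buf[..8].try_into().unwrap()); // deserialize
        let updated = current + delta;
        buf[..8].copy_from_slice(&updated.to_le_bytes());               // reserialize
    }
}

fn main() {
    let mut tuned = HandTunedStore { map: HashMap::new() };
    let mut generic = GenericStore { map: HashMap::new() };
    for i in 0..1_000u64 {
        tuned.rmw_add(i % 10, 1);
        generic.rmw_add(i % 10, 1);
    }
    println!("both stores hold the same counts; only the hand-tuned path skips (de)serialization");
}
```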
Now, to get to the crux of the issue: comparing FASTER and RocksDB in a streaming workload setting. For this benchmarking we implement a range of Nexmark queries. Nexmark is a streaming benchmark designed to replicate traditional relational operators — maps, joins, filters, aggregations — and it models an online auction service: we have streams of bids, auctions, and people, and the different queries replicate relational operators over them. We run these on an EC2 instance equipped with an SSD, because our experiments are based on larger-than-memory workloads. We built an integration between faster-rs, the Rust wrapper, and timely dataflow. Timely dataflow is a stream processing system based on the Naiad dataflow model; I don't want to talk about it too much, because it fundamentally works the same way as something like Flink, apart from supporting cyclic dataflows. Everything is designed to be data parallel: it takes a logical dataflow graph and is just a runtime for working out how to execute it.

The first query we'll look at is Nexmark query 3 — queries 1 and 2 don't contain state, so if it seems strange that I jump straight to 3, that's why. Query 3 is an incremental join between two streams, an auctions stream and a persons stream, and the state we maintain is the auctions and persons relations, so that when we receive an auction or a person we can look it up in the corresponding relation and output the result of the match. It's an incremental join, which means it continually accumulates state throughout processing. The graph I've put up needs a little explaining before jumping into it. To collect these results we measure the per-record latency while processing tuples and calculate the CCDF, the complementary cumulative distribution function. The simpler way to read it, instead of reading the ten-to-the-something numbers on the left, is to think in terms of a p90 latency, a p99 latency, a p999, and so on. What the graph shows is that mostly the performance is not very different between FASTER and RocksDB, until we reach around the p999 or so, and then we start seeing an astronomical difference: RocksDB's latency is basically two orders of magnitude worse than FASTER's. That's really quite terrible — records are taking over a second to process with RocksDB.

The next experiment is Nexmark query 4, which reports the average closing price of auctions grouped by their category. The state we store for this is a sum and a count for each category, and we also record the bids for each auction. Once again the performance is similar — in fact for some of the distribution RocksDB looks like it's beating FASTER — up until around the p95, and then we see an increase for RocksDB. Here it's only about an order of magnitude difference, but RocksDB is still losing considerably to FASTER.

Query 5 is a bit different, because now we have windowing. Here we have a sliding window reporting the auction with the most bids, and the state we maintain is a count for each auction, on top of which we apply our windowing logic. Looking at the graph, it's much closer this time, but we still see FASTER outperforming RocksDB. This is where it begins to look like I'm just giving a sales pitch for FASTER — that FASTER is simply better at everything — which we don't believe is the case. That brings us to some research we're now doing at ETH, which looks at what happens if we change our window implementation: perhaps we can elicit better performance from RocksDB, potentially better than FASTER, by changing the way we do windowing. These next couple of slides show early results from ongoing work, so bear with us a little; I'm happy to talk more in the break about how we're deriving these results.
In the first alternative, we change our windowing logic so that the window operator buffers records in state, ordered by their timestamp, and when a window triggers we retrieve the records belonging to the window using a range scan: we go to the start of the window, scan for all the records in the window based on timestamp, and then apply our evaluation function — in this case I think we're just counting the number of records. Let me first say what we're comparing here in terms of colors, that might be easier: green and yellow are both RocksDB, and purple and blue are both FASTER. What this shows is that RocksDB is beating FASTER. There are actually two experiments here, because we've varied the slide of the sliding window: for FASTER, purple is a two-second slide and blue is a fifty-second slide, and for RocksDB, green and yellow are the same, respectively. What we see for RocksDB is similar performance regardless of the slide: whether it's two seconds or fifty seconds, we get basically the same result. Because it's a range scan, I just go to the beginning of my window and scan through a nicely sorted list, picking up records, and it's largely agnostic to whether I'm scanning over two seconds or fifty — they're laid out contiguously, one after the other, so there's no big performance difference. FASTER, on the other hand, degrades as we increase the slide, because we have to do basically fifty lookups instead of two when going from a two-second to a fifty-second slide. That's why FASTER does worse here, and it suggests that for this kind of window implementation we do better sticking with good old RocksDB rather than moving to the new kid on the block, FASTER.

But now let's look at a different way of doing windowing, and I think this one is the Flink approach. We assign window IDs in our windowing operator and store state keyed on those window IDs; then, when a window expires, we fetch the window contents with a lookup on the window ID. For this, FASTER performs better. FASTER is the purple and green lines this time, and RocksDB is the blue and yellow, but fundamentally RocksDB is further to the right, which is bad, compared to FASTER. There are two types of window here based on the amount of slide: a small sliding window with a five-second slide gives a small performance difference, and a tumbling window — effectively a 100-second slide — gives a much bigger difference between the two. The reason is that we're back to what we saw previously: FASTER outperforming RocksDB for these simple get/put-style lookups.
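As a rough sketch of the window-ID approach just described (not Flink's actual implementation): for a sliding window of a given size and slide, each event timestamp maps to a small set of window IDs, state is kept per window ID, and triggering a window is a single keyed lookup rather than a range scan.

```rust
use std::collections::HashMap;

/// IDs of all sliding windows containing `timestamp`.
/// Window `k` covers the interval [k * slide, k * slide + size).
fn window_ids(timestamp: u64, size: u64, slide: u64) -> Vec<u64> {
    let last = timestamp / slide;
    let first = if timestamp >= size { (timestamp - size) / slide + 1 } else { 0 };
    (first..=last).collect()
}

fn main() {
    let (size, slide) = (10, 2); // seconds
    // Per-window state keyed on window ID: here, a count of bids per auction.
    let mut state: HashMap<u64, HashMap<u64, u64>> = HashMap::new();

    // Each incoming (timestamp, auction) event updates every window it falls into.
    for (ts, auction) in [(3u64, 7u64), (5, 7), (11, 9)] {
        for w in window_ids(ts, size, slide) {
            *state.entry(w).or_default().entry(auction).or_insert(0) += 1;
        }
    }

    // Triggering window 0 ([0, 10)) is a single keyed lookup, no range scan.
    if let Some(counts) = state.get(&0) {
        println!("window 0 bid counts: {:?}", counts);
    }
}
```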
So what does this mean for Flink? With a Flink conference disclaimer here: we were really hoping to have an early-stage integration between Flink and FASTER to show off now, especially with the windowing logic. Unfortunately we haven't managed to achieve that just yet; it's something we can definitely leave for the community. We do have the integration with Rust, which you could use as a basis for an implementation in Java; we just didn't have the JNI experience to push it through in time for this talk. I think the lesson we learned, and which can carry forward for Flink, is that when it comes to choosing a state backend, the easiest one to integrate is definitely not going to be the best one. We've become quite settled using RocksDB, but it's definitely time to consider other options if we want low-latency data stores. We'd say we've shown that generic data stores provide great flexibility, but by actually looking at what our state access patterns are, how we're using our data, and how it's represented, we can consider different state backends which might be better suited. And also, in a way, death to the monolithic backend. This becomes really important when using FASTER: because of FASTER's caching behavior, if you imagine one monolithic backend for an entire job, we'd start seeing different operators kicking another operator's state out of the hybrid log just because they share the one backend. Also, if FASTER only allows one key-value type per instance, then as we store different types of data we'll need different instances anyway.

So I have a little proposal to make: when we do eventually come to integrating FASTER with Flink, we create a suite of hand-tuned FASTER instances for storing common data types. When a dataflow developer knows that what they want to store is a sum and a count, they can pick up a ready-made, off-the-shelf FASTER instance optimized for storing pairs of integers; similarly for things like lists of primitives. We could have community contributions of different hand-tuned FASTER instances, which would be really good for storing state. The proposal is also to continue the work that is ongoing at the moment: looking at the different ways in which state is used — not just in window operators, as I've shown here, but generally, when people are doing stream processing, what are the state access patterns, and which state backends could suit them?

Some open questions to lead on to more research, for anybody who's interested. How do we accommodate fault tolerance in all this? FASTER has its own way of doing fault tolerance, RocksDB has its own way, and say we did an integration with something like Cassandra — how do we combine all these different fault tolerance mechanisms? That's an open question. How do these different stores compare for something like queryable state, which is very much a Flink thing? It's not something we have in timely dataflow, so we have no research on that. And really, what other key-value stores would it make sense to compare alongside FASTER and RocksDB?

With that, I have to put up the obligatory we're-hiring slide. This talk is the result of a lot of work I did during my master's thesis at ETH Zurich; they're doing lots more cool work in the Strymon group, lots of interesting research in stream processing systems, so feel free to talk to me, or to Vasia, who was on just before me, about what's going on there.
As for me, I now work at Monzo in London; if you've heard of Monzo and are interested in opportunities there, feel free to come and talk to me. I don't work with Flink there, so maybe that's a shout into the void, but I'd like to leave you with some resources: links to the original FASTER repository; to faster-rs, Max's and my implementation of the FASTER wrapper in Rust; and to all the Nexmark queries, which are available on my GitHub. Finally, there's a link to my master's thesis from ETH, which has lots more information, lots more graphs, and a description of how to get the best out of FASTER. With that, we have time to take some questions.

[Applause]

Q: One of the advantages of an append-only log is that you don't delete from the middle. The impression I got is that FASTER's log does delete from the middle of the log — is that correct?

A: FASTER's way of doing it is to apply a tombstone header, and then when we eventually spill the record to disk we can inspect it, and if the tombstone is set we don't spill it to disk.

Q: So if you move something to the front of the log, meaning it's fresh or hot, do you apply a tombstone at the back and copy it to the front?

A: No, because if we do a lookup later on, the hash index just points to the most recent entry. If I delete something and never reinsert it, then at the deletion point it's taken out of the hash index as well, so it won't be found. The contiguous logical address space gets reclaimed once I would, in theory, have spilled the record to disk — I just don't actually spill it.

Q: And I'm guessing you spill the oldest records, the ones furthest to the left?

A: Yeah, exactly.

Okay, thank you. Let's thank the speaker once again.

[Applause]
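A minimal sketch of the tombstone idea mentioned in the answer, with hypothetical types rather than FASTER's actual record layout: deleted records are flagged in a header, and the flag is checked when deciding whether to write a record out to disk.

```rust
/// Hypothetical record header; real layouts pack this much more tightly.
struct Record {
    key: u64,
    value: u64,
    tombstone: bool, // set on delete instead of removing the record in place
}

/// When the tail of the in-memory log is flushed, tombstoned records
/// are simply skipped rather than being written to disk.
fn spill_to_disk(records: &[Record]) -> Vec<&Record> {
    records.iter().filter(|r| !r.tombstone).collect()
}

fn main() {
    let log = vec![
        Record { key: 1, value: 10, tombstone: false },
        Record { key: 2, value: 20, tombstone: true }, // deleted: never reaches disk
        Record { key: 3, value: 30, tombstone: false },
    ];
    let spilled = spill_to_disk(&log);
    println!("spilled {} of {} records", spilled.len(), log.len());
}
```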
Info
Channel: Flink Forward
Views: 5,179
Rating: 4.8933334 out of 5
Keywords: flinkforward, apacheflink, stream, datastreaming, streamingapplications, ververica, scale, realtime, Read-Copy-Update
Id: xWNbbkQMtfI
Length: 36min 52sec (2212 seconds)
Published: Tue Oct 22 2019