Introduction to Druid by Fangjin Yang

Captions
Thanks. Hey guys, I want to thank you all for coming tonight, and thank you so much to Square for hosting this event. My name is Fangjin and I am a software engineer at a San Francisco startup called Metamarkets, and tonight I want to talk to you about Druid, an open source distributed data store that I work on.

Here is what we'll cover tonight. I'll show a brief demo of a product that's been built on top of Druid, to motivate some of the problems Druid has been designed to solve. I'll talk about the history and the motivation of why we decided to build our own data store and then subsequently open source it. And finally I'll talk a little bit, at a high level, about the architecture of Druid, where I'll show some pictures with arrows and hopefully you can figure out what's going on.

To really understand the problems Druid is meant to solve, I have a very brief demo of a dashboard that's been built on top of Druid. This dashboard showcases edits as they occur on Wikipedia. Hopefully everyone here is pretty familiar with Wikipedia: there are a lot of pages about a lot of different content, and basically anyone can go on and make an edit. When someone makes an edit, an event gets created, and this event gets sent to an IRC channel that we then scrape and dump into our data store. This event has a couple of different parameters. It has a timestamp indicating when the event occurred. It has a set of attributes, or dimensions, about the edit, for example the name of the page being edited, the language of the page, or information about who is doing the edit, like the user information. And there are also metrics associated with the event, for example the number of characters that were added or the number of characters that were deleted.

What this dashboard is meant to showcase is that you can see edits as they trend across time, and you can filter on different attributes to see how those edits change. For example, I can filter on country (United States) and on city (San Francisco), and we can see that "Georgism" is getting a lot of edits this week, which I have no idea what that is, and also that "California Culinary Academy" has been getting some love this week, which is kind of cool. You can compare how different pages are being edited across time, you can zoom in on different time granularities, and you can really explore your data: you can drag this graph back and forth and see how your data changes at different granularities of time. All the queries you see right now are being issued, effectively live, to our data store called Druid, and if I drag this graph to the current time, the ingestion should be relatively live as well: if people start making edits right now, we should actually be able to see them in this dashboard.

So the dashboard I just demoed is really a way to explore data, specifically time series or transactional data; in this particular case it was data for Wikipedia. I work for a company called Metamarkets, and Metamarkets is not a social media company; we primarily deal with transactional or event streams that are specific to the ad tech space. However, the problems that we mean to solve with this dashboard, and the problems that we mean to solve with Druid, come down to three main things. First, we want to solve the data exploration problem:
we want to be able to arbitrarily slice and dice data and drill into that data, effectively without any restrictions. The second problem we want to solve is around data ingestion: if some event occurs, for example if someone edits something on Wikipedia, we want to be able to almost immediately ingest that event and also make it explorable. Finally, we are a software-as-a-service company, a cloud-hosted solution, so availability is very important to us: our systems need to be constantly up and be able to withstand all sorts of failures without taking any downtime.

Metamarkets was founded around mid-2010, and we knew in the early days that we wanted to build this kind of dashboard, deliver it to our clients, and have them be able to explore data. What we didn't know was the data engine we could use to power the dashboard. So in the early days of the company there were a bunch of different solutions that we tried.

One of the first things we tried was a relational database management system. If you're familiar with relational databases, you know their benefits are extremely well documented; MySQL and Postgres are extremely popular in all sorts of industries. We ended up trying Postgres, and the setup we used is pretty common in the data warehousing space. If you're not familiar with data warehousing, don't worry; the basic setup is a star schema: you have your fact table in the center, your dimension tables on the side, your aggregate tables, and query caches to try to improve query latency. What we found with Postgres was that queries that were cached were generally pretty fast, and queries that went against aggregate tables were fast to acceptable, but the real problem came with queries that hit the raw data. Any time we needed to hit the base fact table and scan the raw data, the query latencies were pretty much unacceptable to us. For our dashboard, interactivity was very important: we want a user to be able to click, get an interactive experience, and see their data updating without having to wait. Benchmarking Postgres, the scan rate was about 5.5 million rows per second per core, and a query over about one week of data took about five seconds, which is okay. The main problem was with queries over time ranges longer than that, and especially with concurrent queries: a page load with 20 queries over about a week of data took on the order of minutes, and that made for a really uncomfortable, not very interactive experience every time you used our dashboard. In the Wikipedia dashboard you saw earlier, every time you click on something there are somewhere between 20 and 30 concurrent queries that go to our data store.

So we tried Postgres, we didn't really like it, and it didn't work out very well for us, so we started looking at other solutions to this problem. The time frame I'm talking about here is roughly early 2011, and in early 2011 NoSQL key-value stores were extremely common. Within that space you have data stores such as Cassandra and HBase, all the BigTable clones and all the Dynamo clones, and they were pretty hot. So we said, hey, let's give HBase a shot.
Because HBase is a key-value store, there were certain limitations on how you could use it for data exploration. Say you had a very simple data set: a timestamp dimension, a gender dimension, an age dimension, and a revenue metric. If I wanted to know how much money I made when the timestamp equals one, the value you end up having to store in a system like HBase has a key of that timestamp and a revenue value that is the sum of the matching rows. Similarly, if you wanted to know how much money you made when the timestamp equals one and gender equals female, the entry you end up storing has the key (timestamp one, gender female) and a revenue that is the sum of the rows matching that combination. So to be able to arbitrarily explore this data set, really without bounds, you end up generating a lot of entries that you have to store in your NoSQL store, and it's easy to see that as your data volume and data complexity increase, the number of entries you end up storing scales roughly exponentially.

What we found with HBase was that queries were pretty fast, because you're basically just doing a lookup into a map, but the solution tended to be somewhat inflexible: if something wasn't precomputed or pre-aggregated, if an entry didn't exist in your HBase data store, you couldn't query it. Another problem was that data was not being continuously updated. Because we deal with event streams and transactional data, we see events constantly coming in, and when those events came in we could not use a system like HBase to explore them. Finally, the biggest problem of all for us was the processing time: the precomputation time to figure out effectively every permutation of dimension combinations you could possibly have in your entire query set. When you have many dimensions in your data, it's very easy for there to be an exponential increase in the number of entries you end up storing.

We saw this problem with HBase, but we didn't give up right away. What we tried to do was limit the possible queries that a user can make, to limit the dimensional permutation expansion set. Within academia there's a concept called iceberg cubing, and it is similar to what it sounds like: with an iceberg, most of the volume is hidden under the water and only a very small amount is the tip. Similarly, when dealing with precomputation of data, you don't try to precompute every single possible query the user could make; you limit the queries they can make and restrict the query set. But even using this method we didn't get very good results. On a data set of half a million records with 11 dimensions, when we limited the dimensional explosion to about five dimensions, the processing time took about 4.5 hours on a 15-node Hadoop cluster. When that data set increased to 14 dimensions, still limiting the dimensional explosion, the processing time took about nine hours on a 25-node Hadoop cluster. This was a huge amount of time to be pre-processing data, and we realized we were going about this problem all wrong: using a key-value store wasn't the correct solution.
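To make that precomputation blow-up concrete before moving on, here is a rough, hypothetical back-of-the-envelope sketch (my own illustration, not anything from the talk or from Metamarkets' actual pipeline) that counts how many aggregate entries a full precomputation over a handful of dimensions would require; the dimension names and cardinalities are made up.

import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Toy illustration of why precomputing every dimension combination explodes.
 * For each dimension, an aggregate entry either fixes it to one of its values
 * or leaves it unconstrained ("all values"), so a full cube needs roughly
 * prod(cardinality_i + 1) entries per time bucket. Names and numbers are hypothetical.
 */
public class CubeSizeEstimate {
    public static void main(String[] args) {
        Map<String, Long> cardinalities = new LinkedHashMap<>();
        cardinalities.put("gender", 2L);
        cardinalities.put("age_bucket", 10L);
        cardinalities.put("country", 200L);
        cardinalities.put("page", 1_000_000L);   // one high-cardinality dimension

        long entriesPerTimeBucket = 1L;
        for (long card : cardinalities.values()) {
            entriesPerTimeBucket *= (card + 1);   // +1 for the "all values" rollup
        }
        // Every time bucket multiplies the total again.
        System.out.println("Aggregate entries per time bucket: " + entriesPerTimeBucket);
    }
}

Even with only four dimensions, one of them high-cardinality, this lands in the billions of entries per time bucket, which is why approaches like iceberg cubing try to cut the set down, and why even the limited Hadoop precomputation jobs above still took hours.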
So around mid-2011 we looked at other solutions in the space to see if anything else could solve our pain, and there was nothing really out there that could. It was around this time that we started talking about building our own thing. We looked at the lessons we had learned with relational databases and with NoSQL key-value stores: the problem with relational databases was that the scans tended to be slow, and the problem with NoSQL key-value stores was that the precomputation time just took too long. We decided that maybe it wasn't too hard to solve the problems we had seen with relational databases ourselves, so around mid-2011 we started building out this system called Druid. Druid is called Druid because the lead architect of Druid played a lot of World of Warcraft, so this is a tribute. This is our logo for Druid; it's somewhat of a work in progress right now.

Druid is a distributed, column-oriented data store. If you're familiar with distributed column-oriented data stores, they are all architecturally somewhat similar, and each one does a few things particularly well. Google's Dremel, for example, handles ingestion of arbitrarily nested data fairly well; Google's PowerDrill has some really cool compression algorithms. Druid is designed to add numbers really, really quickly; that is its main value-add. There are a few other things I think it does pretty well. It allows for arbitrary slicing and dicing of data, so you can filter your data really without any bounds. Druid is designed to be a production-quality system, so it's designed to be highly available, for an environment where failures are an everyday occurrence rather than an anomaly. And finally, Druid has this concept of real time.

Real time is very much a buzzword in the data space nowadays, much like "big data" or "cloud": everyone says they're real time, but it's very unclear what the term actually means. For us, in the Druid world, we consider real time along two facets. One is the rate of query return: when I ask my data store some question, how soon can I get an answer back? We've designed the system such that an answer can return in somewhere between zero and five seconds, and we find that for most of our customers that query latency is enough to provide an interactive experience. The other facet of real time is ingestion latency: when some event occurs, how fast can my data store ingest that event and make it explorable? Typically Druid can do this on the order of about a second, often on the order of hundreds of milliseconds, and we find from interacting with our customers that they have a pretty interactive experience and are pretty happy if they can see an event less than 10 seconds after it has occurred.

So how does the architecture of Druid allow for these two facets of real time? I've been explaining the architecture of Druid in terms of a popular TV show from the 90s called Mighty Morphin Power Rangers.
The idea behind Mighty Morphin Power Rangers is that whenever the good guys have to fight the bad guys, they summon giant robots called Zords, and these Zords come together to form a giant fighting robot called a Megazord, which beats the bad guys and saves the day. Druid is architecturally very similar: a Druid cluster is composed of many different types of nodes, each of which is specialized to do certain things very well, and these different node types come together to form a fully functional system designed to deal with your data problems. There are a couple of node types I want to talk about to explain how Druid works.

The first node type is real-time nodes. Real-time nodes encapsulate the functionality to deal with the ingestion and querying of data streams: they make data available for queries as soon as that data is ingested. They do so by buffering all incoming data in memory, and periodically they take the data they've buffered and hand it off to another node type to take care of. The idea is that real-time nodes always deal with recently arrived data, and once that data becomes somewhat outdated, they hand it off and let something else deal with it.

That something else is historical nodes, and historical nodes do exactly what their name suggests: they deal with more historical data. Data that real-time nodes have handed off is passed down to historical nodes, which are the main workhorses of a Druid cluster. They're kind of dumb, but they do a few things very well: they download data and they serve queries for that data, so they do the parallel processing and distributed computation of queries, and they do the aggregates.

In front of the real-time and historical nodes we have broker nodes. Broker nodes encapsulate the functionality of knowing what data lives on which pieces of the cluster. Queries go through broker nodes, and the broker figures out which real-time or historical nodes hold the pieces of data relevant to the query and forwards the query down to them. The real-time and historical nodes do their computation in parallel and return their results to the broker, which does the final level of merging and returns results to the caller. So broker nodes encapsulate this query scatter/gather functionality, and they also support caching.

Okay, so storage: what does the data actually look like for Druid? Druid is designed to work with transaction streams and time series data, so your events always have a notion of a timestamp associated with them; Druid requires a timestamp with every event. In this particular example, this could be the data we ingest for Wikipedia, which I showed in the demo at the very beginning of this presentation. It has a timestamp column indicating when the event occurred; it has the page being edited, the language, and the city and country of the person doing the editing; and it has things like the number of characters added and the number of characters deleted every time someone edited a page. Druid stores data as columns. It is fundamentally a column store, and that is the reason we can do aggregates and filters so fast.
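Before getting into the column layout, here is a hedged sketch of what one such edit event might look like as a plain record. The field names are assumptions based on the schema just described, not Druid's or the demo's actual schema.

import java.time.Instant;

/**
 * Hypothetical model of a single Wikipedia edit event as described in the talk:
 * a timestamp, some dimensions (page, language, city, country, user), and some
 * metrics (characters added, characters deleted). Field names are illustrative.
 */
public record WikipediaEdit(
        Instant timestamp,     // when the edit occurred (Druid requires a timestamp)
        String page,           // dimension: page being edited
        String language,       // dimension: language of the page
        String city,           // dimension: editor's city
        String country,        // dimension: editor's country
        String user,           // dimension: who made the edit
        long charsAdded,       // metric: characters added
        long charsDeleted      // metric: characters deleted
) {
    public static void main(String[] args) {
        WikipediaEdit edit = new WikipediaEdit(
                Instant.parse("2013-10-01T01:02:33Z"),
                "California Culinary Academy", "English",
                "San Francisco", "United States", "some_user",
                17, 3);
        System.out.println(edit);
    }
}

Each of these fields would become its own column: the string dimensions are dictionary-encoded as described next, and the numeric metrics are what queries aggregate over.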
When we store a column, we take the page column, for example, and apply a method called dictionary encoding, where we convert every value we see in the page column to some integer representation. For example, this page column has two values, Justin Bieber and Kesha: we map Justin Bieber to an ID of 0 and Kesha to an ID of 1, and what we actually end up storing to represent the column is just a series of IDs. Here, ID 0 (Justin Bieber) appears in the first three rows and ID 1 (Kesha) appears in the next three rows. Similarly, with our language column, we map the language English to an ID of 0, and what we end up storing is just an array of zeros. What's great about column stores and dictionary encoding is that this method lends itself very well to compression: when all you're storing is integers, the column becomes very easy to compress.

There is also a secondary index that we build, and this secondary index allows us to do very fast filters on top of the data. For every single value within a column, we build a bitmap index for that value. What do I mean by that? In our page column we have two values, Justin Bieber and Kesha. We want to store a representation of all the rows of our data set that contain the value Justin Bieber. In this case Justin Bieber appears in rows 0, 1, and 2, so we can store a representation that indicates where this value appears: a binary array where 1 indicates the value is there and 0 indicates it isn't. Kesha appears in rows 3, 4, and 5, so what we end up storing to represent the rows Kesha appears in is an array where only the last three rows are marked as true. If you're familiar with search engines, all we're really doing here is building inverted indexes.

Why do we do this? Say I issue some query whose filter says I only want to return the rows that contain Justin Bieber or Kesha. We can look at the bitmap index associated with Justin Bieber, which in this case marks the first three rows, and the bitmap index associated with Kesha, which marks the last three rows, and we just OR these two binary arrays together; what we get is the set of rows that contain one value or the other. So the bitmap indexes, the inverted indexes, are a very fast way for us to narrow down our data set and find only those rows that match some specific criteria.

The other cool thing about building bitmap indexes is that all we have are binary arrays, and binary arrays are really easy to compress. The compression method we use is called CONCISE; it's based on the thesis work of a PhD student in Italy, and CONCISE is really just a more efficient implementation of the Word-Aligned Hybrid scheme, which itself is a variation of run-length encoding. We store our compressed bitmap indexes and never decompress them for any queries, and that means using a lot less memory when queries are issued, and subsequently faster scan rates as well.
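Here is a minimal sketch of the two encodings just described, using java.util.BitSet as a stand-in for the compressed CONCISE bitmaps Druid actually uses; the class and method names are mine, not Druid's.

import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy column: dictionary-encodes one string column and builds a bitmap index per value. */
public class ToyColumn {
    private final Map<String, Integer> dictionary = new HashMap<>(); // value -> integer ID
    private final List<Integer> encodedRows = new ArrayList<>();     // the column stored as IDs
    private final Map<String, BitSet> bitmapIndex = new HashMap<>(); // value -> rows containing it

    public void append(String value) {
        int row = encodedRows.size();
        int id = dictionary.computeIfAbsent(value, v -> dictionary.size());
        encodedRows.add(id);
        bitmapIndex.computeIfAbsent(value, v -> new BitSet()).set(row);
    }

    /** Filter "value IN (...)" by OR-ing the per-value bitmaps, as described above. */
    public BitSet matchingRows(String... values) {
        BitSet result = new BitSet();
        for (String v : values) {
            BitSet bits = bitmapIndex.get(v);
            if (bits != null) result.or(bits);
        }
        return result;
    }

    public static void main(String[] args) {
        ToyColumn page = new ToyColumn();
        for (String v : new String[]{"Justin Bieber", "Justin Bieber", "Justin Bieber",
                                     "Kesha", "Kesha", "Kesha"}) {
            page.append(v);
        }
        // Rows matching "Justin Bieber OR Kesha" -> {0, 1, 2, 3, 4, 5}
        System.out.println(page.matchingRows("Justin Bieber", "Kesha"));
    }
}

In the real system the bitmaps stay CONCISE-compressed and are OR-ed without being decompressed, which is what keeps memory usage down and scan rates up.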
(In response to an audience question:) Yes, we always dictionary-encode; both forms of encoding I've talked about are things we always do. Dictionary encoding is done at the column level, and the bitmap indexes are built for each value of the column.

In terms of availability, Druid is highly available, as I mentioned. Druid supports replication, with configurable levels of replication: if your data is more important, you replicate it more. A cool thing that comes out of replication is that if you lose a node, it's no big deal, because your data is still available, and we use this to our advantage to do software updates. You can take one node down at a time, update it, bring it back up, and do this for every node in your cluster, so you get the idea of a rolling update without any downtime. Druid is designed to run on commodity hardware, so starting a Druid node is just a matter of starting a Java process, and terminating a Druid node is just stopping a Java process. For about two years now we've taken no downtime to do software updates to Druid.

Q: Is that because most of your clusters are okay with multiple versions running at the same time?
A: Yes. We use multi-version concurrency control. Because we deal with event streams, a lot of our data tends to be immutable; all data in a historical cluster is immutable, and if we do updates or deletes, we're rebuilding blocks of data for certain time ranges, and those blocks enter the system and obsolete the old blocks.

Q: What about storage itself? If you change the way you actually store things, would multiple versions of Druid interact well with each other?
A: Right, the question was around storage and what happens with multiple versions of Druid running together. Our releases are designed to be backwards compatible, so it is actually a very frequent case that we'll have multiple versions of Druid floating around in a cluster, and Druid needs to support that in order to do updates without taking any downtime.

Right now we're working on releasing 0.6 and getting a stable version of that out there. The idea behind Druid 0.6 is that we're trying to make Druid more of a platform, and we've been working on the extensibility of Druid: the idea that anyone can build modules for Druid, drop those modules onto the Druid platform, and have things just work. Modules that have been contributed fairly recently are around cardinality estimation; we've seen modules for approximate histograms and quantiles, and we also have modules that we've built ourselves to do approximate top-K calculations. Similarly, we've built data ingestion modules: we do batch processing and batch data ingestion through Hadoop, and we do real-time processing using Storm. With Druid 0.6 it should be pretty easy for anyone to build their own module; if you want to build proprietary modules that you keep internal to your company and don't share with the world, that's possible as well. The final point here is that Druid has this notion of different node types, and we want to make it easier for people to build their own node types from modules that already exist out there, so if you have some problem that we haven't seen before, hopefully it shouldn't be so hard to build your own node type to solve that problem.
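The talk doesn't show the 0.6 module API itself, so purely as a hypothetical illustration of the plug-in structure being described, a module that contributes a custom capability might be registered along these lines. Every name below is made up for illustration and is not Druid's actual API; the only real mechanism used is Java's standard ServiceLoader.

import java.util.List;
import java.util.ServiceLoader;

/** Hypothetical plug-in contract, invented for illustration; NOT Druid's real module API. */
interface HypotheticalDruidModule {
    String name();
    List<String> providedCapabilities(); // e.g. "approximate-histogram", "cardinality-estimator"
}

/** A made-up module along the lines of the approximate top-K extension mentioned in the talk. */
class ApproxTopKModule implements HypotheticalDruidModule {
    @Override public String name() { return "approx-topk"; }
    @Override public List<String> providedCapabilities() { return List.of("approximate-topk"); }
}

/** Toy "platform" that discovers modules dropped onto the classpath via ServiceLoader. */
class ToyPlatform {
    public static void main(String[] args) {
        // Modules would be registered under META-INF/services for ServiceLoader to find them.
        for (HypotheticalDruidModule module : ServiceLoader.load(HypotheticalDruidModule.class)) {
            System.out.println("Loaded module " + module.name()
                    + " providing " + module.providedCapabilities());
        }
    }
}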
Right now Druid is written entirely in Java, and the modules to extend Druid also currently need to be in Java. We've seen contributions of ways of querying Druid written in other languages, because the query interface is really just an HTTP interface: people have written a Python library and an R library (R, if you're familiar with statistics), and there's a little bit of work on a SQL adapter, but that's not very mature yet.

It's really our hope to turn Druid more into a platform. What we actually have in production right now is batch ingestion done through Hadoop, which hooks up with Druid, and streaming ingestion, or streaming ETL, done with Storm, which hooks into Druid as well. Then we have different components that build on top of Druid. We have approximate algorithms that we've been experimenting with: approximate quantiles, approximate histograms, cardinality estimates, approximate top-Ks. We have visualization components, similar to the dashboard you saw a little earlier. And we also have machine learning components; I put a question mark on that because we only have one machine learning component right now, robust PCA for trend detection, but it would be really nice if you want to contribute more machine learning to Druid; that's something I'm always really interested in.

In terms of the Druid community, we were open sourced in October of 2012, so almost exactly one year ago. We have a growing community, with around 30 contributors right now (not everyone is public), and they come from a variety of different companies. Druid is in production at several companies already; there's more information on our website, but it's always our hope to get more people interested and more deployments into production. Our support right now is mainly through the community forums and through IRC, and we really love contributions. Contributions of any kind, whether docs, answering questions, or code itself, make us super happy.

Okay, so finally, some benchmarks. We've benchmarked our real-time ingestion on real-world data at up to about 100,000 records per second per node; ingestion rates vary very much with data complexity, but for the data sets we've seen from people, these are some numbers. One of our partner clusters, actually deployed at Netflix right now, ingests at a rate of about 150,000 events per second, which is about 7 billion events per day, and that amounts to about 500 megabytes a second, or two terabytes of data per hour. (In response to a question: yes, this is Netflix's own internal deployment; I want to say 15 nodes, but I'm not entirely sure.)

Q: Do you know what the cardinality of the data is, and how many dimensions you're talking about?
A: I don't know the exact numbers; the highest cardinalities we've seen are typically on the order of millions.

Q: To tack onto that question of cardinality: do you handle something like floats, for example, as a data type? I can imagine a column of floating-point numbers.
A: Right. Druid has different column types, and float columns are actually one of them. We typically treat float columns as more of a metric, which is something we're going to aggregate over, versus a dimension, which is something we might want to filter on. The code is in place to support different column types, but it's a matter of the use case for those column types. Does that make sense?
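Since the query interface is just HTTP plus JSON, a client in any language only needs to POST a query to the broker. Below is a hedged sketch in Java of a timeseries query that filters on a dimension and sums a metric over Wikipedia-style data like the demo's. The broker address and port, the datasource name, and the column names are all assumptions for illustration, and the JSON fields follow Druid's native query format as I understand it rather than anything shown in the talk.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/**
 * Sketch of issuing a native JSON query to a Druid broker over HTTP.
 * Adjust the broker URL, datasource, and column names to your own cluster and schema.
 */
public class BrokerQueryExample {
    public static void main(String[] args) throws Exception {
        // A timeseries query: hourly sum of characters added, filtered to one country.
        String query = """
            {
              "queryType": "timeseries",
              "dataSource": "wikipedia",
              "granularity": "hour",
              "intervals": ["2013-10-01/2013-10-02"],
              "filter": { "type": "selector", "dimension": "country", "value": "United States" },
              "aggregations": [
                { "type": "longSum", "name": "chars_added", "fieldName": "chars_added" }
              ]
            }
            """;

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8082/druid/v2/"))   // assumed broker address
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(query))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body()); // one aggregated result per hour bucket
    }
}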
Q: When your cardinality has millions of elements, do you build a bitmap index for each one?
A: Yes, but our sharding scheme is that we always shard first on time, and then we usually do secondary shards based on the cardinality of a dimension. So if you have an extremely high cardinality dimension, we would typically build several different shards for that dimension.

Q: And so you have the indices inside each time segment?
A: Yes, each segment will always have a set of inverted indices associated with it. Whether a column gets these indices depends on whether you're treating it as something you want to explore or something you want to aggregate.

Q: Say I have an event that happened just once for a particular user, but I have two years of data, and I want to filter on that user. Do I have to hit every single historical segment, or is there another type of index that says you only need to look at this segment?
A: You're actually querying every single segment, but the merge of the inverted indexes should be very fast, because almost every segment will simply have no information about that user. I actually have a benchmark related to your question: the scan speed, the last time we benchmarked it, was about 33 million rows per second per core. For index compression, we found that using the CONCISE compression method, our CONCISE sets are about 70% smaller than straight-up binary arrays, and we can do an OR operation over about a million dimension values in somewhere around five seconds. So in your particular use case, where you're scanning potentially several thousand segments, an OR operation over several thousand segments should complete relatively fast.

Okay, so some takeaways. Druid is very good for interactive, fast exploration over large amounts of data, and Druid is a good choice if you want analytics and not just a straight-up key-value store. Druid is useful if you want to be able to do analytics on your data as it's occurring, in real time, and if you want your system to be highly available, with no downtime. And Druid is fairly useful if you want extensibility and flexibility in your data store.

Some things Druid may not be good for: if your data is so small that it fits into MySQL, then MySQL is pretty awesome and there may not be a reason to use Druid; it might be overkill. If you're querying for individual entries or doing lookups, searching for a single specific thing, then Druid may not be the ideal choice. Because Druid mainly deals with append-only, event-style data, it's not particularly good at doing updates or deletes, so if you have OLTP-style transactions with a lot of updates and deletes, Druid might not be the ideal solution. And if you don't really care about real-time ingestion, or you don't care about downtime at all, then there may be other solutions out there as well.
So as a final note, as I mentioned, Druid is open source. If you're interested, our website is druid.io. We have a Twitter handle as well (I'm learning a lot about how to use Twitter), and we post a lot of information about events we're attending and recent updates on our Twitter feed. If you have any questions, we have our IRC channel; some of us basically camp out there and answer questions that the community has. Okay, well, thank you all for listening, and I'll be happy to take some questions if anyone has any.

Q: Is the data actually being stored durably during real-time ingestion?
A: The question was around the durability of real-time ingestion, and specifically how durable the real-time nodes are. Durability with real-time ingestion is handled partially by the nodes themselves and partially by a message bus that we use for real-time ingestion; for us, that message bus is Apache Kafka. The idea behind Apache Kafka is that it acts as a buffer for incoming events, and multiple nodes can read from the same buffer. So if you have two real-time nodes reading from the same buffer, reading the same set of events, you get replication of events across two nodes. Apache Kafka also allows you to re-read events from some offset: for example, if a node goes down and comes back a few minutes later, it can check the last offset that it committed and re-read events from that offset. The Druid nodes themselves also have features built in for higher availability, but we can perhaps talk offline about the architecture there, because it's a little bit more complicated.

Q: You said there's no downtime due to software upgrades; what other reasons do you have for downtime?
A: The number one reason we've experienced downtime is that we run on AWS, and if AWS has an outage, we will be affected. We've been working on a feature so that we can replicate data across multiple data centers; we're thinking of having some of our data in AWS and some of our data in our own data center, so that if AWS goes down, or if our own data center goes down, it doesn't impact us. Druid is designed to have no single point of failure, but if half our cluster gets wiped out by an AWS outage, that becomes a problem for us.

Q: You mentioned the caching on the broker node. Is that just caching the result for a whole query, or does it cache the results of the sub-pieces?
A: The question was around how Druid caches queries. Druid actually caches based on the fundamental unit of storage that it understands, which is called a segment; it always caches at a per-segment level. So it's not a query cache; it's a cache for the data pieces that are accessed as part of a query.
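To illustrate the difference between a whole-query cache and the per-segment caching just described, here is a toy sketch (my own simplification, not Druid's actual cache implementation or key format) of a broker that caches each segment's partial result and only fans out to historical nodes for the segments it's missing.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy illustration of per-segment result caching on a broker, as opposed to
 * caching the result of a whole query. Not Druid's implementation; the cache
 * key and "partial result" types are simplified stand-ins.
 */
public class ToyBrokerCache {
    // Cache key: segmentId plus a query fingerprint -> that segment's partial result.
    private final Map<String, Long> segmentCache = new HashMap<>();

    /** Pretend scatter/gather: sum a metric across the segments covering a query. */
    public long query(String queryFingerprint, List<String> segmentIds) {
        long merged = 0;
        List<String> misses = new ArrayList<>();
        for (String segmentId : segmentIds) {
            Long cached = segmentCache.get(segmentId + "|" + queryFingerprint);
            if (cached != null) {
                merged += cached;            // reuse the per-segment partial result
            } else {
                misses.add(segmentId);       // only these segments get forwarded to nodes
            }
        }
        for (String segmentId : misses) {
            long partial = computeOnHistoricalNode(segmentId, queryFingerprint);
            segmentCache.put(segmentId + "|" + queryFingerprint, partial);
            merged += partial;
        }
        return merged;                        // final merge happens at the broker
    }

    /** Stand-in for forwarding the query to whichever node serves this segment. */
    private long computeOnHistoricalNode(String segmentId, String queryFingerprint) {
        return segmentId.hashCode() & 0xff;   // fake partial aggregate for the sketch
    }
}

The point is only the keying granularity: partial results are stored and reused per segment and merged at the broker, rather than caching the final merged answer for an entire query.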
Q: How do you compare with Summingbird?
A: I'm not an expert on Summingbird; I only know it at a very high level, and there are probably people better suited to answer this question than I am, but unfortunately they're not here right now. My understanding of Summingbird is that it's more like data flow control for Storm, whereas Druid is more like the persistent store at the end of processing that's done in Storm. The way we use Storm is for things like lookups and joins, and when those operations are complete, that data gets fed into Druid, and Druid acts as the store for that data. Druid right now has a real-time ingestion component, but all data ingested by Druid has to be denormalized; Druid has no support for joins, so we require something like Storm to do that. So I can see a solution for real-time ingestion being something like Summingbird plus Storm plus Druid.

Q: How do you handle synchronization of the nodes that store data?
A: We use ZooKeeper for all intra-cluster communication. I didn't show it in the diagram, but there's basically another node type called the coordinator node, or master node, and that node is responsible for telling historical nodes what data to load and what data to drop. Because the data we deal with, at least on the historical cluster, is always immutable, we get around consistency problems, since we're always dealing with immutable data. So the idea is that there's an explicit piece that does coordination and an explicit piece that does things like replication and distribution of data, and the other pieces, like the historical nodes, are mainly responsible for dealing with queries and the aggregates for those queries. Does that make sense? Okay, cool. Thanks, guys.
Info
Channel: Metamarkets
Views: 7,095
Keywords: Druid, Big Data, Metamarkets, Analytics
Id: GtHu4TVs0xI
Length: 45min 50sec (2750 seconds)
Published: Fri Jan 24 2014