Spark And Cassandra: 2 Fast, 2 Furious

Video Statistics and Information

Captions
engineer at DataStax. I did a PhD at some point, so I know a bit about bioinformatics. I spent a lot of time with automation, and now I work for DataStax on the analytics team. One of my focuses is the DataStax open source Spark Cassandra Connector. Open source software is awesome, and the DataStax Spark Cassandra Connector is as well. It lets you interact with your Cassandra data from Spark with a minimal amount of fuss. It gives you the ability to do bulk writing to Cassandra, distributed access to your Cassandra tables, optimized direct joins with your data, pushdowns on your secondary indexes and your clustering keys, and all kinds of other great things.

Before I get into any of the details of how that works, I just want to make sure everyone knows that you can get it right now at Spark Packages, which is pretty much the easiest way to get access to it. Our GitHub is right below there if you're interested in looking at the source code and getting the very latest release. Below that, our friends at TargetHolding have actually made a complete Python wrapper for the RDD API, if you're interested in some of the lower-level APIs but want to access them from Python.

So all of this great stuff is super cool, but I decided that since this is a Spark Summit, I was going to talk about Cassandra first. This is the explanation of what Cassandra is according to Wikipedia: it is a hybrid between a key-value and a column-oriented (or tabular) database management system, with a data model that is a partitioned row store with tunable consistency. I know that's really clear, so I thought we might want to go over what exactly that means and why it's important to know when you're connecting to it from Spark. We're going to focus on two things. First, what is a Cassandra partition? Here at Spark Summit we're obviously talking a lot about Spark partitions, and the two are unfortunately not the same. And then we'll also talk
about how Cassandra actually puts Cassandra partitions into your Cassandra cluster: where the data actually lives in the cluster, and how the Spark Cassandra Connector uses that information to read data locally from your cluster.

To start with, just to show what a table looks like, here's an example schema. It looks a lot like SQL: you define a couple of fields and their types. Underneath that, you'll see we have a primary key declaration, and this is the only part where we get into the Cassandra world. The first part of that primary key is what we call the partition key, and the second part is a clustering key. These two fields in the primary key declaration describe how the data is going to be laid out inside of Cassandra.

So if I do an insert into this table (and an insert looks a lot like SQL as well), and I insert vehicle ID 1 with a clustering column value of 0, so timestamp 0, I'll actually end up making a header for a long partition in Cassandra, which will basically just be the value 1. It will then be followed by a group of the fields, here x and y, ordered by the clustering column, which I've put on top: timestamp 0. If I keep adding more and more inserts to this partition, you'll see they get ordered like this: it doesn't matter what the x and y values are, but the timestamp values will continue in ascending order. This means that for every given partition ID in Cassandra, I end up with this long sequence of rows. This whole thing is what we call the Cassandra partition, and this entire long piece of data is going to exist on a single node. Within that, we can define a row as just one of these groupings. So I have many rows contained within a single partition, and all the rows in that partition share the same partition key.

Now, what's important to know about this is that because our data is ordered by
clustering key, Cassandra is very good at answering this: given a partition key, tell me all of the values that are within some range of clustering key values. Since the data is laid out this way, all we have to do is go to the right node that holds that partition key, take that row out, and cut it at the beginning and the end of our clustering range.

So that's the most common thing that happens in Cassandra, but obviously it's very limited. It means that we can't really do broad things across partitions. We can't really do queries on x or y, because x and y would require us to look at every single cell within that partition to do any work. But for that we have Spark, and that's one of the main reasons we made the connector for Spark.

The next thing I want to quickly talk about (oh yeah, slices are very easy) is that it is a distributed database. We talked a little bit about how a partition is laid out, but a partition is going to be on only one machine. So let's imagine we have three different machines: a San Jose machine, an Oakland machine, and a San Francisco machine. The mapping of our data to these machines is done based on something called a token range. You can imagine the token range as a very long circle of values, where we go from one value at the top around to a max value right next to it. We cycle around and assign ranges of those values to each of these machines. In this case, we would say that the San Jose machine is responsible for all of the values between around 900 and around 50, so any partition with an address in that range will end up on that machine. And we have the same thing for our San Francisco node and our Oakland node.

How do we actually know what the token of a particular row is? We do that by looking at the partition key value that we talked about before. If we do that same insert, we'll get that vehicle ID, which will have a
particular value. We'll take that value and hash it using a special internal function. That hashed value turns into a value somewhere on this token range and tells us exactly what node it belongs on. In this case, if we hash vehicle ID where the value is 1 and we get a value of a thousand or so, that tells us that this particular partition is going to be on the San Jose node. This also means that when we're reading data, it's really easy to find that particular partition: if we ask where vehicle ID 1 is, we can simply hash that key again, go to that node, and pull it back off.

Now, given all that information (I know that was kind of a lightning run-through of how Cassandra works), this is how the Spark Cassandra Connector uses it to pull data into Spark with data locality. The first thing we do in the Spark Cassandra Connector is realize that that long range gives us sets of tokens that all correspond to a single machine, and we then break that up into Spark partitions. Here's where we're going to start confusing the terminology of a Cassandra partition and a Spark partition. A Spark partition is going to contain all of the Cassandra partitions for a token range between two values. For example, we might break up all of the values owned by San Jose into four different partitions, where each Spark partition contains 100 Cassandra partitions. In the real Cassandra world, our token range is actually, you know, two to the 63rd values, so it's much larger than this, but this is basically what's going to happen under the hood.

Since we know that all of those tokens should exist on that particular machine, we know that when we run a read command, we can pull all of that data from that machine without incurring any network traffic, if we have Cassandra and a Spark executor running on that same machine. So we know that as long as the Spark executor on San Jose asks for
data between 500 and 600, it doesn't have to ask any of the other machines. Because of that, we don't ever have to leave the node for data; we're always going to be node-local when reading.

Now, to actually read that data, we're going to use the DataStax Java Driver. The same Java driver connection that you might be familiar with if you've worked with Cassandra before is being used by the Spark Cassandra Connector under the hood. What happens is that the Spark executor will start up a connection to Cassandra, and it will receive one of these Spark partitions that says which tokens it needs to pull. It's going to take that information and prepare a select statement, and the select statement in general is going to look like this: we select all of our values from this table where the token is greater than the beginning of our range and the token is less than the end of our range. So we're just going to say: give us all of the partitions that should be within this Spark partition, and pull them into this executor. You'll see that those values just get popped in there, and then we start pulling in data. The Cassandra Java driver actually pages in data, so it's not going to pull all of this at the same time. It's going to pull it in in blocks and continuously pull those in until the entire query has been satisfied. That lets us still work in the iterator framework of Spark and not actually pull everything before it's needed.

Now, when we write to Cassandra, we're going to use that same kind of idea, but in reverse. We're going to use the DataStax Java Driver, we're going to group things based on partition key, and then we're going to write them back to the database. For those of you who are a little more familiar with Spark and Cassandra, you've probably heard that it's really not a good idea to batch a lot of stuff together. Well, there's a caveat to that, and the caveat is that if you are
grouping everything based on your partition key, it's a very good idea. Like we showed with the storage engine at the beginning of this talk, if everything has the same partition key, it belongs on the same machine, and when the Cassandra process is actually writing that data, it can group it into a single mutation, so it becomes extremely efficient. When the Spark Cassandra Connector does this, it'll have an executor which makes a connection to Cassandra, and then the RDDs being saved to Cassandra will serve up blocks of data, and those blocks of data will be grouped based on the partition key of the Cassandra table that's being written to. The groups start filling up with data, and once they've reached a threshold, or you have a certain number of groups, we'll take the full group and write it over to Cassandra. That continues until all of the blocks have been taken and written to Cassandra. So we'll try to do this efficient grouping as we're writing, to give you the best performance we can on writes.

So that's the basics of how it works. I wanted to give a more high-level view of how you would actually use the API, because knowing how it works in the fine-grained details doesn't really help you a lot when you're trying to actually write your application. We have two big sets of APIs: we have a set of RDD APIs, and of course we're also DataFrame compatible. If you're familiar with DataFrames and Data Sources, we provide a data source for Cassandra that you are also free to use.

Within the RDD APIs, that full table scan, the read command that I was talking about, is accomplished by something called sc.cassandraTable. All you have to do is import our implicits from the com.datastax.spark.connector package, and you'll be able to call sc.cassandraTable with a keyspace and table name, and that
will automatically create for you an RDD that represents all of the information in that particular table from Cassandra. We also provide a full set of type conversion tools, so that if you want to read all of your data from your table directly into a case class from Scala, you can do that just by specifying it as a type parameter, and you'll get an RDD of your case class type instead of an RDD of our generic CassandraRow objects.

This is all really good, but one of the most important things to know when you're using the Spark Cassandra Connector is that reading all of the data out of Cassandra is almost always not the right thing to do. You really want to limit the amount of data that you're pulling out of the database as much as you can to answer the problem you're currently trying to solve. For that, we offer two little functions that allow you to push down CQL to Cassandra. Like I showed before, we're actually executing CQL queries under the hood on our executors, and because of that, anything you can do in CQL can be done in bulk through this API. We do that here first using a select statement and then using a where statement.

The select statement is the same as doing a projection. It tells us to change our query at the bottom there: instead of having a select star, we'll have a select column name. That means that when Cassandra receives this query, it's not going to serialize the entire row; it's only going to serialize the fields required by this query. So in this case, only vehicle ID will be pulled out of the database.

The second one, which is a where clause on that timestamp, is a slice. If you remember what I talked about before, Cassandra is very good at slicing on that clustering key value. This is taking that great ability to slice on clustering key values and distributing it across all of your partitions at the same time. So with this particular clause, we will not have
to read all the data in the database, pull it into Spark, and then do a filter. We'll only pull the data out of Cassandra that is expressly required and fits this particular predicate. That's how you're going to be able to pull in huge amounts of data, but not pull in any more data than you actually need.

If you want to do distributed key retrieval, that's a slightly different use case. This is where you have a whole lot of different partition keys, and you want to use all of them to pull data out of Cassandra in a distributed way. For that we have a joinWithCassandraTable method, which is basically a direct join. You can imagine that you have an RDD that's filled with all sorts of different partition key values, and you want to pull just those rows out of Cassandra. It will map those keys directly into the executors and pull those values from whatever node they happen to be on.

You might notice that since your RDD might not have your data sorted the same way that Cassandra does, we won't be pulling directly from the local node. Well, we thought that if this is going to be a problem, we should make a way to reshuffle that data to align it with your Cassandra cluster before pulling. For that we added an API called repartitionByCassandraReplica. It lets you specify a table, and it will then take all of the keys in your RDD and organize them so that they will all be drawn locally from a Cassandra node they happen to reside on. That's why I've switched back from my tie-dye to completely solid colors; it illustrates that we've taken all of that and done a bulk shuffle in Spark, so that now when we do our reads, they're all going to be local. This combination is pretty powerful for those folks who are getting an incoming stream of data and have a whole bunch of partition keys they want to do work on, but don't want to do a full table scan over all the
data that's in their database.

In addition, in case all of those APIs are not enough to do the specific things in Spark that you want, we provide a distributed pool for prepared statements and sessions. If you happen to have some operation you want to do that isn't well defined by just select statements or inserts, we provide this CassandraConnector object, which is fully serializable. When it runs on an executor, it will create its own session pool automatically for you under the hood, which you are free to use with whatever queries you want. In addition, this overrides the prepare method of the session: instead of using the normal prepare, it will check an executor-local prepared statement cache. That means you can prepare things over and over using this kind of methodology, and it will pull out a live prepared statement if one exists on that machine. So if you have to do something more complicated with check-and-sets, or you want to do a bunch of inserts, or you want to do a select and then an insert based on your select, this is how you can use these objects and these pools to really simplify your code. You don't have to worry about session management or your prepared statements; it'll all just be done for you.

In case all of that was, you know, a little unfriendly, and you're really more of a fan of Spark SQL and want to use DataFrames, because DataFrames are a lot simpler to reason about and they do a lot of cool optimizations under the hood, we still have your back. We have a Data Sources API, and all you have to do is specify a format and then, within the options, specify the keyspace and the table. When you do this, it will return a DataFrame, backed by an RDD, that represents all of the data in your Cassandra table. If you look at our example underneath there, we actually put in a little
shortcut, just in case you don't want to have to write maps and formats all the time: you can use the cassandraFormat function that we added implicitly to just specify the table and keyspace. There's a bunch of other helper functions as well. I suggest you check out the DataFrames documentation I have linked below if you want to simplify the creation of maps or specify other parameters.

The cool thing about working with DataFrames is that when you do a select or a filter or anything like that, which ends up making a query plan that has projections or filters, those filters and projections are automatically pushed down to Cassandra if they can be. You don't even have to worry about figuring out which predicates work and which don't, or whether or not you can do certain things, because the data source will do that for you automatically. The one thing to note is that currently the data source only implements what's called PrunedFilteredScan. PrunedFilteredScan, as it says, really only lets you do two things: it lets you prune and it lets you filter. If we get projections, which means we only want certain columns, we can do that, and if you have a predicate that we can possibly push down, we can do that. But if there's something like a join or a repartition, that isn't exposed to the data source right now. It's something we're looking at trying to fix later, but currently it can't be optimized that way.

Now for recent features in the connector. We support Cassandra 3.0, which is one of the most recent major releases, and all of its subsequent minor releases. We support materialized views and SASI (which might be pronounced "sassy"; I believe I typed it in wrong). We have some new advanced Spark partitioner support, and of course we have a great performance boost on joinWithCassandraTable that was recently added. And
just to talk really quickly about our Spark partitioner support: this is a rather advanced feature, but I'd really like to go over it. For those folks who are aware, whenever you have a keyed RDD within Spark, you actually have a partitioner that that RDD owns. Knowing that, we realized there are some really obvious things here. Cassandra data is partitioned with this hashing function that we went over. Spark data can have a partitioner, which tells you which data is in which partition. We should really bring the Cassandra partitioner into Spark; that way, when you do shuffles or joins, you don't actually have to shuffle everything.

To take advantage of that, we took the keyBy function, which is normally available on any RDD, and made it so that if you key on the partition key of your Cassandra table, we will take that RDD and automatically assign it a partitioner that emulates Cassandra doing that exact partitioning. This means that if you take this RDD at the bottom, which now has this partitioner, and you do anything that requires a cogroup on this particular key (for example, if I wanted to group on zip code), you'd be able to do that without a shuffle, because it's already organized by key and we were able to express that information to Spark. Or you could imagine that if we had another table which also had user ID as its key, we would be able to join those two tables together without any kind of shuffling, because we already know which values are in which partition based on this Cassandra partitioning. And this is a quick little example of that: we take the partitioner from the first RDD, apply it to the second RDD, and now we end up with two RDDs that have the exact same partitioning, and I can join them together without any shuffles. This is a new capability, and I encourage you to try it out, because it has a lot of interesting implications and speedups
that you can use. The kinds of things you can do with this: you can start doing self joins on your partition key, you can do all kinds of groups within Cassandra partitions, and you can do joins with other RDDs. Another really interesting thing is that if you join against any other RDD, that other RDD will now partition itself using this Cassandra partitioner, so you don't have to shuffle both sides of the join like you would in most joins.

Next, the enhanced parallelism in joinWithCassandraTable. If you've ever used the joinWithCassandraTable method before and wished it was a lot faster, we just recently increased its speed by increasing concurrency and parallelism within the method. We've seen up to a 5x speed increase. So if you used this before and weren't satisfied with how fast it was, you should definitely check it out now. This is thanks to Jarek, one of the new members of the DataStax analytics team, who added a lot of interesting asynchronous execution code to this.

As I'm wrapping up, I just want to note that the open source Spark Cassandra Connector wants you. We want you to be a part of our project. We are a full open source project with a mailing list and IRC and everything, and we love community involvement. If you find a bug, if there's a feature you really think we're missing, or if you just see the docs and say the docs are not good for this particular section, please let us know. Please join us, help us out, and figure out how to make this an even better project.

Now, in case I didn't mention this, I do work for DataStax, so I'm also going to push some of our awesome DataStax Academy stuff. If you check out our blog here, academy.datastax.com, we have a full series of videos that describe how to use Spark, all the intricacies of Spark, and using Spark with Cassandra. The best part, of course, is that they are all free, so you'll
be free to go through this whole course of instructor-led videos that'll teach you how to work with Spark. And of course I'd love to see everyone at Cassandra Summit, where we'll probably talk a little bit more about Spark and Cassandra; it will be September 7th through 9th this year. So thanks, everyone, for coming, and if anyone has any questions, I'd love to hear them.
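
Editor's sketch: the token-ring read path described in the talk can be modeled in a few lines of plain Python. This is a toy illustration under stated assumptions, not the connector's code. The ring size of 1000, the node ranges, the MD5-based `toy_token` hash (a stand-in for Cassandra's internal hash function), and the `demo.vehicle_positions` table with its `vehicle_id` column are all hypothetical names chosen to mirror the slides as narrated.

```python
import hashlib

RING_SIZE = 1000  # toy ring with tokens 0..999; real token ranges are far larger

# Assumed ownership, loosely following the talk: each node owns the tokens in
# (start, end], and San Jose's range wraps past the top of the ring.
NODE_RANGES = {
    "san_jose": (900, 50),
    "oakland": (50, 500),
    "san_francisco": (500, 900),
}


def toy_token(partition_key):
    """Hash a partition key onto the toy ring (MD5 stand-in, not Cassandra's hash)."""
    digest = hashlib.md5(str(partition_key).encode("utf-8")).hexdigest()
    return int(digest, 16) % RING_SIZE


def in_range(token, start, end):
    """True if token lies in (start, end], allowing for wrap-around."""
    if start < end:
        return start < token <= end
    return token > start or token <= end


def owner_of(token):
    """Return the node whose token range contains the token."""
    for node, (start, end) in NODE_RANGES.items():
        if in_range(token, start, end):
            return node
    raise ValueError(f"token {token} is outside the ring")


def split_range(start, end, pieces):
    """Split a non-wrapping (start, end] range into contiguous sub-ranges,
    one per Spark partition."""
    width = end - start
    bounds = [start + width * i // pieces for i in range(pieces + 1)]
    return list(zip(bounds[:-1], bounds[1:]))


def token_range_query(keyspace, table, partition_key_column, start, end):
    """Build the CQL text an executor could run for one Spark partition."""
    return (
        f"SELECT * FROM {keyspace}.{table} "
        f"WHERE token({partition_key_column}) > {start} "
        f"AND token({partition_key_column}) <= {end}"
    )


if __name__ == "__main__":
    token = toy_token(1)
    print(f"vehicle_id=1 hashes to token {token}, owned by {owner_of(token)}")
    start, end = NODE_RANGES["oakland"]
    for sub_start, sub_end in split_range(start, end, 4):
        print(token_range_query("demo", "vehicle_positions", "vehicle_id",
                                sub_start, sub_end))
```

Running the module prints which toy node owns vehicle ID 1 and then one token-range query per Spark partition for the Oakland node. The design point it illustrates is the one made in the talk: because every token in a sub-range belongs to a single node, an executor on that node can satisfy its query without asking any other machine.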
Info
Channel: Spark Summit
Views: 10,374
Rating: 4.9245281 out of 5
Keywords:
Id: a84-UOGZiEg
Length: 24min 34sec (1474 seconds)
Published: Thu Jun 16 2016