[MUSIC PLAYING] SAMI ZUHURUDDIN:
Before we get going, just a quick show of hands. How many people
have used Bigtable in some form or fashion? Oh, cool. Cool, all right. Some of you. Not a whole lot. Let me ask you in
a different way. How many folks have used
YouTube, Gmail, or Maps in the last 24 hours, the last hour-- or it's what you're doing right now? [LAUGHTER] All right. It's OK, I attend a lot of
talks, I do the same thing. So yeah, Google Cloud
Bigtable is essentially the power-- or
Bigtable, I should say, and be specific here. Bigtable as a technology
is behind pretty much all of Google's
multibillion user services. So there are multiple
services at Google that exceed a billion
user accounts, or in terms of active users. And to support that, Bigtable
is behind the scenes. So if that sounds
interesting to you and you want to do
something similar, or want to know how it's done,
you're in the right place. My name is Sami Zuhuruddin. I'm a senior solutions architect
on the Google Cloud Platform team. I'm joined today shortly
onstage by David Mytton, who's founder and CEO of Server
Density and Anthony Fox who is a lead developer
and [INAUDIBLE] director of data science at CCRI. All right, just a
quick agenda today. I'll go through an introduction. I know it's a
300-level course, so we will definitely talk about some
refined patterns for how to use Bigtable effectively. But it always helps
to sort of baseline get everybody on the same
page in terms of terminology and descriptions of the service
and how you interact with it. So we'll go quickly
through that and then start getting some of the more
complex use cases and patterns you should look for. So Google had to deal
with a lot of data, or continuously has to
deal with a lot of data, given its mission. So what that means is if you
look at different data types, you have so many different
kinds of semi-structured or unstructured data types. And they're not exactly
conforming to a certain schema, a certain rigidity. Things like this,
geographic information, per user information, click
information, URL contents, all of these are essentially
semi-structured information that sort of has an
infinite tail to it. I think that's kind of the
key thing to think about. They have an unknown
dimensionality ahead of time, and they have sort of like a
huge long tail in terms of how much data could be coming in. So you might be dealing with
arbitrarily huge amounts of data, in excess of a petabyte. And if you think about
the use cases of things like Gmail or Maps
or whatnot, you need to have pinpoint
accuracy in terms of speed to be able to keep up with that. So Google had to deal
with this challenge if it was going to solve a
lot of the product challenges that it needed to deal with
in its consumer products. Google released Bigtable as a design and implementation early in 2004, then talked about it in 2005, and the paper finally came out in 2006. And at that time, there were applications like web crawling-- essentially the search engine-- Google Earth, Google Analytics, and other products that were using it. But it quickly became sort of
a necessary staple of the diet within many of
Google's services. So what is Bigtable? Essentially it's
NoSQL, so you're not dealing with pre-defined data
types along well-known column definitions. You're dealing with essentially,
effectively a key-value store. Large data sets,
and like I said, we're talking about things
that are potentially thousands of attributes or hundreds
of attributes, unknown ahead of time, and
then potentially just have a long tail, things that
can grow to [INAUDIBLE] speed. And high throughput. And why that's important, we'll
talk about that in a second. The Bigtable family tree-- this is typical Google fashion. I just showed the paper that
was released around 2006. And so that's just a
paper that's put out, much like the GFS paper
or MapReduce paper. And that turns around and
influences the community. So one of the things that it
influenced was Apache HBase. But then if you study Apache
Cassandra and Accumulo and Hypertable,
all these projects, either directly or indirectly
inherited from what was described in
the Bigtable paper. And the Bigtable paper
is just one of many. So this kind of gives you
a feel for, this is all publicly available at
research.google.com in our publications. And we're generally
pretty discoverable, in terms of Google does
a good job of putting out the things that we are
working on internally, at least in some
sort of academic form that can be turned around
and adjusted and turned into a project like Hadoop. And in some cases, they
become first-party offerings. And so this isn't really correct on the time horizon; this is just an example of the fact that in some cases now, especially under the Google Cloud umbrella, it's not enough to simply talk about certain technologies. There is a certain expectation that there's an operational heritage behind running these services, which becomes necessary if you're going to bring these products to some sort of useful bearing for customers. And so the papers that were
shown on the previous slide, essentially influence, if not
affect, the Google products you see in front of you. The difference here is that we've been running these services. Google has been running the services as described, as written in those papers, internally for 10 years. So there's a lot of
operational know-how that goes into knowing how
to run essentially a worldwide database or a
worldwide storage system that operates at a very fast clip. OK, so we talked about
Bigtable as an academic paper, and Bigtable in its
usage internally. And now Cloud Bigtable. So Cloud Bigtable is the
same Bigtable service that Google uses
internally behind, like I said, pretty
much all its apps, all its major apps have
Bigtable in their pedigree. But Cloud Bigtable is the
externalization of that, so that it produces a
consumable endpoint that our customers can use. Same SREs behind it, same
higher level of service. But essentially you can
use and build services to the scale and sophistication
of, for example, the services you see there. So why would you use it? You need something that's
consistently fast, fully managed and infinitely scalable. So consistently fast,
what does that mean? Essentially, single-digit millisecond retrievals that stay essentially consistent no matter how big the data size is. Your P99 is always
around six milliseconds, pretty much always
under 10 milliseconds, and petabyte scale. And so the other
implication there is that it's also for things
that are sparsely populated. So if you're collecting
attributes or dimensions or measures along certain
keys or a certain element, you don't necessarily
have all of them or they're not all
known ahead of time. This helps facilitate
that kind of model. These are the use cases that
are becoming really popular. Either they're currently
popular in HBase or they're now getting popular
on top of Cloud Bigtable. But anything where
you're taking in millions upon millions of signals,
perhaps refining those signals, updating them or expanding them, or taking continuous measures of things. Some of these cases we're going to talk about shortly. But keeping track of clicks: for a particular user, what was the property that they clicked on? What internet property did they click on, and when did they click on it? All these are
essentially dimensions that keep increasing and
keep having some measure that you need to stick with. So here's the question. So if you have a
user database keeping track of things like
somebody's language that they want to see on
their website and things that are in their shopping cart,
and your entire user database fits in about 50 gigs,
that is a great use case for a traditional SQL system. That's not a great candidate
for Cloud Bigtable, or Bigtable in general. So it's important
to sort of make sure that you're matching the right backing technology to your size. So I kind of liken
it to like I said, if somebody's in
a user experience and their preferences are not going to change-- oh, I'm going to go from
like, German to English as a user preference. And you take that for your
entire user base and all your user preferences, and
it amounts to 50 gigabytes, or pick a number. That's a great use case for
Cloud SQL or for Cloud Spanner or something like that. That's not probably the
right fit for Cloud Bigtable. Cloud Bigtable is
specifically for when you want that pinpoint precision and accuracy at scale. It's pretty simple to get
started with the service. It's not too encumbered. This is the UI example, but the
CLI example is equally trivial. But essentially you
just pick a name. So once you pick a
name of your cluster, or once you create an instance,
you have a thing to address. And that essentially gives
the connection library a thing to connect to,
whatever you've named it. You pick a zone. Cloud Bigtable is
a zonal service, you pick a zone that it runs in. You need to pick
a number of nodes, a minimum number
of nodes is three. And so that's
again why, there is a certain sort of
minimum size that makes sense for data
that you're going to put into a Cloud Bigtable. And then you'd pick
basically SSD or HDD. SSD is more expensive,
but it's more common, and it's a lower latency service. HDD would be a great choice for things where you're going to have a much larger data set with sparser access. So you can kind of decide. And if you really have a true
use case where you kind of have both kinds of data, it
might be efficient to create two clusters in that case. Clients, so I described,
you create a cluster, clients are going to connect in. They connect in really
to sort of a load balance tier, essentially an
endpoint that we give you as part of the service. And that routes to any
number of Bigtable nodes. The nodes themselves, I'm
going to just abstract that away for a second. But you connect to a node. The load balancing
there helps you. And the nodes themselves
then retrieve and read and write their
data from Colossus, from a storage system. So it's very important
to understand that the storage within Bigtable
is separate from the nodes. The nodes, you can think of
it as dials of throughput and how much
throughput you have. And the storage is always in
a heavily, heavily replicated, superdurable file system
that sits underneath. I call nodes essentially
custodians of keys. So then when you want to
access a particular key, you get routed to the node
that's handling that key, and then that key essentially
goes down to storage and gets it. And essentially, data
access looks like that. If you had five keys
here, some nodes are handling any given
division or segmentation of the population of keys. And what that does is that
as nodes become hot and begin transacting on keys, if
there are hotspot issues with a key-- like a key is getting accessed too much-- and it makes more
sense to distribute, which nodes are dealing
with which keys, the service takes care of that. It's constantly
learning access patterns and rebalancing as it needs to. And similarly, since data is
not actually stored on nodes, it's stored in a storage
system, it's stored in Colossus. If you wanted to increase
and go from three to four, that's a second operation. It only takes a few
seconds to do that. Because we're not
really moving any data, we're simply bringing
up a new node which comes online within seconds. In other words,
[INAUDIBLE] adjusting metadata of which nodes handle
which segmentation of the data. It's important to understand,
that in the example I just talked about, adding
nodes, that's a completely linear upward scale operation. And so every node can
roughly take 10,000 QPS, or that's what it's quoted at. And so it's very linear. So if you have
three nodes, which is a minimum of cluster size,
you're going to get 30,000 QPS. But that just keeps on going. So if you had essentially
30 nodes, that's 300,000 QPS. And 300 nodes, that's 3 million. Hopefully I get the math
right because it's on YouTube. [LAUGHTER] But it is. We had a great talk last
year we were hitting, I think 25 million QPS. It was in last year's Next. So these are real numbers,
and essentially we have real customers doing
these things at scale. Again, with the spirit of
just how do you connect to it. So we have a cluster,
we have a name, we have a name of the thing
that we can connect to. And it has sides, and you know
what you're reading and writing there. You can connect
with an existing client-- so if you have an
HBase client, essentially a Java client, you can
continue to use that. We put out a Go client,
a Python client. And there are some community
clients for things like Scala and .NET. But these are the clients that
Google specifically puts out. Once you have a client,
once you have a cluster and you have say, for
example, an HBase client, or the CLI we provide, you can get in. And this is sort of like being
in your administrative session inside of the databases
where you can make tables inside your clusters. And then that CF stands
for column family, and I'll talk about
that in a second. But you have a cluster,
you can connect to it, and then you can start to
take on admin operations, or admin plane
operations inside of it.
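To make that concrete, here's a minimal sketch of creating a table with a column family through the HBase Admin API against Cloud Bigtable-- the project, instance, table, and column family names are placeholders, and the exact descriptor classes vary a bit by HBase client version:

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import com.google.cloud.bigtable.hbase.BigtableConfiguration;

public class CreateTableSketch {
  public static void main(String[] args) throws Exception {
    // "my-project", "my-instance", "my-table", and "cf1" are placeholder names.
    try (Connection connection = BigtableConfiguration.connect("my-project", "my-instance");
         Admin admin = connection.getAdmin()) {
      HTableDescriptor table = new HTableDescriptor(TableName.valueOf("my-table"));
      table.addFamily(new HColumnDescriptor("cf1")); // one column family, "cf1"
      admin.createTable(table);
    }
  }
}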
That's all stuff you can kind of take a look at. So if you have an existing
HBase deployment, things you're already doing
in code in HBase. And you say, hey, I want to
take this HBase application, and how do I use it with
Bigtable, it's really simple. You simply change the connection from the canonical HBase connection factory to a Bigtable connection object, plus an additional import that just says that you're going to use a Bigtable configuration. That's approach one, and it's the simplest.
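As a rough sketch of that one-line change-- assuming the Cloud Bigtable HBase client artifact is on the classpath, and with placeholder project and instance IDs-- it looks something like this:

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Table;
import com.google.cloud.bigtable.hbase.BigtableConfiguration;

public class ConnectSketch {
  public static void main(String[] args) throws Exception {
    // Before: Connection connection = ConnectionFactory.createConnection(conf);
    // After: the same HBase Connection interface, now backed by Cloud Bigtable.
    try (Connection connection = BigtableConfiguration.connect("my-project", "my-instance")) {
      Table table = connection.getTable(TableName.valueOf("my-table"));
      // ...the rest of your existing HBase code keeps working against this Table.
    }
  }
}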
There's another
approach which says, I don't want to touch my code. My code is running. Or maybe I want to leave
my code as agnostic, which just means outside
of your application code. You can essentially introduce
a small hbase-site.xml, which basically populates your
instance, your project ID. Hopefully everybody knows
what a project ID is now. And then you can choose
to include that in your build file. So bottom line,
it's very, very easy to go make the switch from
HBase to Cloud Bigtable. It's pretty trivial. Moving up, we're moving up
from inception to what it is, to how to use it, to what does
it look like internally here. The data model. So data model, it's
NoSQL, no-join, distributed key-value store. So essentially you
have mechanisms to retrieve keys
very, very fast. We'll talk about some other
operations you can do on keys. But it's only one index. It's like having a table with
one index, which is essentially the row key, as it's called. And you can do atomic
operations inside of a particular row key, which
means that basically if you decide to update
something or decrement and increase something in the
context of a particular row key, then that works. But you can't do anything
like for example, join information that's in
one key with another key. And nor can you say, let me for
example, decrement from key A and plenish it into Key B.
That would be essentially a cross-key transaction. So again, you can
do things with a key and know that that's
consistent within that key, but not across keys. And then it's sparse. So if you haven't written
a particular column, then it's not taking up space. You don't have to
worry about that. In fact, that's encouraged. If you haven't taken a
particular reading type, or you don't have a particular
dimension on what you're reading, then you don't have to
worry about that being wasted space, much like
you would perhaps see that as being wasted space
in a traditional RDBMS system. So when you have a row key,
there's a couple of things. Actually, let me go back. I skipped a section, sorry. So after the row key, you have
this notion of column families that was in that previous slide, when I showed the HBase shell. So column families are sort
of things that are roughly grouped together,
which is just sort of like a macro qualifier
on individual smaller elements, which are
called column qualifiers. And so if you wanted to refer
to any given cell in this table, you would specify the row key,
and then which column family and which qualifier
you are interested in. And essentially that zeros
you in on the exact element that you want to retrieve. The cells themselves, these
individual cells, [INAUDIBLE] you zeroed in on a cell. The cells themselves-- say
that fast everybody, together. So the cells themselves
are versioned. So what that means
is as you write data, the data has this sort
of implied version, which is an int64, which is a flavor of timestamp. You can technically make
it anything you want. But the default is, it's
the current timestamp. And so if you were to write
the same cell again, or do the same put again, or do a put
in the same qualifier again, it would simply layer on at the
latest timestamp the new value. And the old ones
wouldn't be there. And by default, the old
ones will get garbage collected at some point. And so that's why you can have
a very high throughput that's very consistent. You can choose to have
a retention policy that doesn't expire, or keeps a
number of copies or something like that. But the default is that
you just keep layering up, and at some point the old versions get wiped away.
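Here's a small sketch of that layering with the HBase client-- the family, qualifier, and row key are placeholders; each put on the same cell lands at a newer timestamp, and a get can ask for more than just the latest version:

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.cloud.bigtable.hbase.BigtableConfiguration;

public class CellVersionsSketch {
  public static void main(String[] args) throws Exception {
    byte[] cf = Bytes.toBytes("cf1");         // placeholder column family
    byte[] qual = Bytes.toBytes("memusage");  // placeholder column qualifier
    byte[] row = Bytes.toBytes("server-42");  // placeholder row key
    try (Connection connection = BigtableConfiguration.connect("my-project", "my-instance")) {
      Table table = connection.getTable(TableName.valueOf("my-table"));
      // Two puts to the same row/family/qualifier: the second layers a new
      // version at a later timestamp; the old value stays until garbage collection.
      table.put(new Put(row).addColumn(cf, qual, Bytes.toBytes("71")));
      table.put(new Put(row).addColumn(cf, qual, Bytes.toBytes("74")));
      // Read back up to three versions of that one cell.
      Get get = new Get(row).addColumn(cf, qual).setMaxVersions(3);
      Result result = table.get(get);
      result.getColumnCells(cf, qual)
            .forEach(cell -> System.out.println(cell.getTimestamp()));
    }
  }
}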
So what are the operations? So in terms of getting
data in, you can do put. It's a canonical put. You can put data in,
which is putting one. You can increment, so
whatever the value is, then that happens on the server
side where it can increment. Append, if you just want to canonically append something. And you can do conditional updates. So if the value is x, then
only if the value is x, I want to do y. And then bulk import
by using some sort of MapReduce process or Dataflow. Getting data out: you can do a get. So basically it's
just like, this is the row key, get me that. Range scan, that
says I'm basically going to start at this pattern and end at this pattern-- the range of the scan, this start and this end. We're going to hear some very interesting things you can do with a range scan towards the end of the talk. Filter, which basically
means after I get this, I want to filter what I'm using. Full scan, literally full
scan is always the worst thing you can do in a database. But it's from top to
bottom, essentially get me the whole thing. And then an export, again
using some bulk tools like a MapReduce job or Dataflow.
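A hedged sketch of a few of those operations through the HBase client API-- table and key names are placeholders, and the exact scan method names differ a little between HBase client versions:

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.cloud.bigtable.hbase.BigtableConfiguration;

public class OperationsSketch {
  public static void main(String[] args) throws Exception {
    byte[] cf = Bytes.toBytes("cf1");
    byte[] row = Bytes.toBytes("user1234#page9");
    try (Connection connection = BigtableConfiguration.connect("my-project", "my-instance")) {
      Table table = connection.getTable(TableName.valueOf("my-table"));

      // Increment: the counter math happens server-side.
      table.incrementColumnValue(row, cf, Bytes.toBytes("clicks"), 1L);

      // Conditional update: only apply the put if the "status" cell is currently "x".
      Put put = new Put(row).addColumn(cf, Bytes.toBytes("status"), Bytes.toBytes("y"));
      table.checkAndPut(row, cf, Bytes.toBytes("status"), Bytes.toBytes("x"), put);

      // Range scan: start at one row key prefix and stop just past it.
      Scan scan = new Scan();
      scan.setStartRow(Bytes.toBytes("user1234#"));
      scan.setStopRow(Bytes.toBytes("user1234$")); // '$' sorts immediately after '#'
      try (ResultScanner scanner = table.getScanner(scan)) {
        for (Result r : scanner) {
          System.out.println(Bytes.toString(r.getRow()));
        }
      }
    }
  }
}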
All right, so schema design. Basics: you have rows, you have row keys. So you would store some
of the big things here-- I'll let you kind of glance
through this, some of this has already been
talked about before. But highlighting things like
you want to avoid on your keys, you want to avoid things that
are monotonically increasing. You want to avoid choosing,
for example, a timestamp or a sequence number or
something that's always increasing, because
that will potentially create some hotspots
in your architecture. There's some strategies
we'll talk about in a second to help avoid that. The other is, if you're going
to have something that's-- store things that are related
to a particular entity in the same row. If you can't do that
because of some size or for some other
restrictions, then you can essentially think
about splitting them up and using your key as a way
to group like items together. Store related entities together. We're going to talk about that
in a couple slides as well. But just keep in
mind, essentially when you don't
have a lot of knobs to turn in a NoSQL
system, what can you turn? Those things that you
can turn are essentially the row key and how
you pick your row key, and then what you're going
to keep in your rows, and how that information
can be stored and retrieved. And so what do I mean by that? So you have to start
getting creative. So if I just say, memory
usage as a row key, it's going to be
essentially an element that's always receiving an update. And so it's going to
create a certain hotness for that particular row key,
because it's pretty simplistic. So we do something here we
call field promotion, which is we take something that
was originally a column, and we shove it over to
be essentially combined with a row key. So in this case, if
I take timestamp out of being simply column
data, and move it and combine it with what was
the original name, which is memory usage, that helps a lot. Because now it's sort
of added some variation to something that was a
static element before. But the problem is, even that
timestamp then within that may start to get hot. And so you could think of
hey, if I take another field and promote that,
which is user, and put that in, that
essentially is basically creating shards of the way
you can query your data. It doesn't effectively
create shards, but it basically helps
distribute that data into discrete different
locations within your system.
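A tiny sketch of what field promotion looks like when you build the key-- the metric, user, timestamp, and delimiter here are just illustrative:

import org.apache.hadoop.hbase.util.Bytes;

public class FieldPromotionSketch {
  public static void main(String[] args) {
    // Instead of one hot row keyed only by the metric name, promote the user
    // and the timestamp into the row key so the writes spread out.
    String metric = "memusage";
    String user = "user1234";         // placeholder user ID
    long timestamp = 1489500000000L;  // placeholder epoch milliseconds
    byte[] rowKey = Bytes.toBytes(metric + "#" + user + "#" + timestamp);
    System.out.println(Bytes.toString(rowKey)); // memusage#user1234#1489500000000
  }
}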
There's other techniques too. This is called field promotion, and it's the most popular one. We'll talk about that some more. You could use randomization,
you could use hashing, there's other things you can do. But if, for example, if you
choose to hash something, then just remember that
that's always going to be a client-side operation. You could choose
to salt your keys. There's different
techniques here, this is the most
prevalent one, which is to simply field promote. And field promotion
is interesting because of what you can do with it-- it sort of gives rise to just how creative you can be. So there's a design element here
called tall versus wide tables. And so in a wide
table, essentially you have an element. This is an example that we use of a social network
involving presidents, which is kind of ironic nowadays. But if you had essentially
hey, for this given key, tell me everything
about it, that's an example of a wide table. The width of your table should
generally have a horizon. If it has no horizon,
like keeping track, like hey, I'm going
to make a column for every single
timestamp and it just goes on and on and on,
literally into infinity, that's not a good design choice. Because, in general, any given row size should be kept under 100 megabytes. So if you think that's the
case, then you probably want to change your
schema a little bit, maybe break that up instead
of it being off into infinity. Maybe you say well, my row
key is some element and then the day, so at least
that row has some horizon that it's going to finish out. But having a row design that
makes it so that you're always forever going to have ever
expanding columns is a problem. So that's wide. If you've got a query pattern,
I'm going to ask this question, I want to know
everything about it. There's a different kind
of question, which is, does this relationship exist? And the way you could
kind of mimic this is to say, well, let me
concatenate-- for example, two elements. Let me see if they're related. Like in this example, where
the names have been hashed and concatenated together. And so if you were
looking for that, you could just say like,
OK, get me this key. The absence of
that key tells you that the relationship
doesn't exist. And if you in fact,
get that key, then the elements you'd retrieve
would give you essentially the dictionary of all the
items that you would see, that would tell you that not
only the relationship exists, but everything coming
back is what's relevant.
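As a hedged sketch of that existence check-- the hash function, table, and key layout here are only illustrative, not a prescribed scheme:

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class RelationshipSketch {
  // Placeholder hash; in practice you would use a real digest of the name.
  static String hash(String name) {
    return Integer.toHexString(name.hashCode());
  }

  // The row key is the two hashed names concatenated, so a point lookup
  // tells you whether the relationship row exists at all.
  static boolean areRelated(Table follows, String a, String b) throws Exception {
    byte[] rowKey = Bytes.toBytes(hash(a) + hash(b));
    return follows.exists(new Get(rowKey));
  }
}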
After everything I just told you, I will say: don't do that. So if you want to get that
low level in terms of the data model and how to use it
and how to incorporate it in your applications,
by all means do so. And there are a number of
projects that come out. The best place to start
is from the top down. If you're going to start
using Cloud Bigtable, the best place to
start is to say, is there a project framework
piece of open source software that I can leverage to help
solve this problem that's already figured out how to do
this effectively with Bigtable, or a NoSQL database? These are examples of them: a graph database, JanusGraph; OpenTSDB, which we're about to hear about. And there's other time
series monitoring databases. GeoMesa we're going to
hear about in a second too. And then on top of
that, Google itself has deep integrations
with some of our products. So with that, I would like to
go into one of those use cases and ask David Mytton, this
is the co-founder and CEO of Server Density, to
talk about time series. DAVID MYTTON: Thank you. Afternoon, everybody. So my name's David Mytton. I'm the co-founder and
CEO at Server Density. And we do systems availability
and performance monitoring. So we have an agent that
exists on your servers, sends data into our
system, and we also do availability monitoring from
locations around the world, so you get that internal
and external view. And everything you can
see on this dashboard is essentially now powered
by Google Cloud Bigtable. So the graphs pulling
out the latest values, generating calculations
on uptime, this is all powered by our new
integration with Bigtable. So what I'm going
to talk you through is a quick run-through
of the old system. We've been in business
for seven years. And the old system
lasted about six years. The challenges that
we faced last year, and why we decided to
pick OpenTSDB and Bigtable as the time series
storage engine. We're going to talk a little
bit about the migration, which we did without
any customer downtime, how the new system looks, and
then two lessons that we learnt whilst we were
doing the migration and the implementation. So the first language
that I learnt was PHP. And back in 2009, there were
no time series databases that would work for us. And there was nothing
available, there were no products,
no services. And MongoDB was very new at
the time, it was pre version 1. And it worked
really well at doing really fast writes,
had some pretty good drivers for the languages. And we decided to use it as the time series data storage system. So the way that this
works is that data comes into us on a push basis. You install the agent
into your system. It sends data to us, it's just
the JSON payload over HTTPS. It comes into our system, and
then it goes into the queue. We played around with
RabbitMQ and couldn't get it to scale over
multiple data centers, so we implemented our
own basic queuing system on top of MongoDB. That then passes it through
into a rules engine. So when you configure
your alerts, it looks at whether a
threshold has been met, and whether we need
to trigger an alert, or maybe close an
already open alert. It then sends it off to
a notification system. And I started
[INAUDIBLE] as a project to learn Python, which is what
our monitoring agent is written in. And the notification system
was written in Python, so that's sending
emails and SMSes. And then the second
route is to send off into the time series data
storage, which was MongoDB. So when you load a
dashboard, it would query an internal API, which
would then query Mongo and pull the data out. Well, this system's scaled for
us pretty well over six years. Rewrote it once, and again
evaluated the options, and there were no real good
solutions that were good enough to convince us to migrate. But early last year, we started
hitting a couple of problems. So one of the problems that
we've had over the years is that there's been no
service available for a challenge or
a problem that we've been hitting at scale. And so we've had to
build a lot of custom implementations of things. And so we were spending a lot
of time maintaining that system, not only on the
infrastructure, so it was running on
bare metal servers, it was running at SoftLayer. And not only on the infrastructure
itself in terms of the servers, but also all of the
custom code that we had to write to do things like
pulling data and displaying graphs and calculating maths
functions on the data, sums, averages, that kind of stuff. We had to build
it all ourselves. And that's not a
good use of our time when we're a relatively small
company of 20 people now, but it's grown
gradually, organically as the company has scaled. And it's not a good
use of our time to be building
what is essentially undifferentiated heavy lifting. We also had problems
with scaling it. MongoDB does scale,
contrary to popular belief. But you have to put a lot of
time into managing the shards and managing the replication. And in particular, when
you add a new replica set, you've got to migrate that
data over to it, which does have an impact. And then keeping all
of this data in memory, so that when you're
loading your dashboard, the graphs load
within the second. And the typical use
case is loading data from the last day or so. And so we have to keep that
data in memory to make it fast. And then slightly older
data goes onto SSDs, because hard disks
are just too slow. And this is actually
really expensive. Even with modern pricing,
having really high spec servers is not cheap. And we had many terabytes
of RAM and many hundreds of terabytes of SSDs
just to store this data. So we did an evaluation. We looked at different options. And there are a couple
of different things that we could have used. But we ultimately selected
OpenTSDB and Bigtable as the storage engine
for a couple of reasons. So the first of these
is that it is a managed service with a small team. And we don't want to be spending
time managing the underlying infrastructure. So we don't want to
be upgrading RAM, we don't want to have to put
in tickets to get RAM upgraded, we don't have to deal with
disk failures, all of the stuff that it's just not a
good use of our time. Bigtable is a completely
managed service, and so this is one of
the selling points. Now, OpenTSDB is an
actively developed project, it's open source. And there's a lot
of functionality built into the product,
that meant that we wouldn't have to build it. And there's a few things
that we've contributed upstream, a couple of fixes. But for the most part, it's
feature-complete from what we need. We also get linear scalability. And this is a really
important point for planning, because we can
just add new nodes, and we know we're going to
get an additional 10,000 QPS. So this makes it really easy
to understand how much more capacity we need and
when, and exactly how much it's going to cost, because
each node is a fixed cost. And it's a specialist
data store. Bigtable is specifically
designed for time series data. And so all of these
combined means it can be significantly
more efficient than moving MongoDB over to
Compute Engine, for example, or using a generic SQL database. The whole point of
Bigtable is to store this exact kind of data. So the migration. Now, as a monitoring
product, it needs to be up, it needs to be available. And we run on the principle
that your monitoring should be more reliable than the
infrastructure it is actually monitoring. So this meant we couldn't
have a period of downtime whilst we moved the data over. Also, we're migrating across
two different environments, between softlayer
over to Google. And we looked at a couple
of different options, including VPNs and
DirectConnect, which are good options, but ultimately
settled on just sending it over the internet. And this is because all
the internal systems within [INAUDIBLE]
are microservices, so we could just extract it,
put it into Google Cloud, and expose it over
HTTPS, with of course, the appropriate
whitelisting for our IPs, and have the data going over the
internet with minimal latency. We were able to pick a zone
very close to the softlayer data center. With two systems
running, we needed to write the data from
the production environment on softlayer into Google. And this has to happen
at the exact same time. We picked a date, we
went live with Bigtable, but behind the scenes,
so customers were still using the old system. But data was being
written into both. And from that point,
all the historical data, so going backwards,
could then be migrated in our own timeframe. And so we picked the historical
data, sent it over to Bigtable, and then flipped a
feature flag, which meant the reads went over. So the writes were always
going, but then the reads were going on a
per-customer basis. So as we completed
each migration, we flipped that flag,
and customers were then having their reads
go over to Bigtable. And despite this connectivity
over the internet, we've gotten significantly
better performance. And the graphs,
whatever time range you might pick, from 30
minutes to 30 months, we get pretty much
the exact same loading time despite the latency of
going over the internet. And of course, we're
a monitoring company. So we had detailed
metrics so that we could compare the
performance before and after. And this allowed us to
verify that the migration from our testing
over into production was working as expected, but
also allowed us to pick out an interesting
design flaw, which I'll show you in a moment. So this is the new system. We also took some time
about two years ago to get rid of that MongoDB
queuing system, get rid of PHP, and replace it with
a proper system. So that's Kafka and Storm. Today we might look at
Google Cloud Pub/Sub, but it wasn't available at the
time we started that project. Kafka is a great
queuing system, and we were able to just port
in our Python code. And we routed it from PHP
into Python into Storm, because it has a wrapper, so
it doesn't have to be in Java. The flow is then the same, so it
goes through the rules engine, which was also rewritten into
Python, so it's done properly. And notifications,
very similar as well. And that remains on
the SoftLayer side, and it's still there today. Over the next couple
of months, we're moving everything
over to Google Cloud, but the Bigtable project
was the first one that allowed us to
move over this really core part of the product. And then on the
Google Cloud side of things, because
it's a microservice, there's the API itself for
querying the graphs. So if you load up
our interface and you inspect the code in
the console, you'll be able to see the internal API
requests that are happening. And this hits the
Google load balancer, then connects into OpenTSDB. And this runs on
Container Engine because it is itself
stateless, and allows us to scale it as we need to. It runs on containers. And that has an
official connector, which allows OpenTSDB to use
Bigtable as a storage engine. And then we deploy this
across multiple zones, so that we get
the redundancy. Now, two interesting
lessons that we'd learnt. Now during our testing
and the development phase, we spun up the Bigtable
cluster and started sending in some
writes so that we could test to see what
the performance was and how it would work. And we were very
quickly dismayed to see that our estimations
were blown completely out of the water in terms
of the number of requests that we would need, and more
or less every single metric that we were writing was making a single API call on Bigtable. This is just a subset,
it's not our full workload. So we were seeing very quickly,
60,000 requests per second into Bigtable. And because you pay
on a per node basis, and each node gives
you 10,000 requests, our estimations of what we
would actually need were completely incorrect. After talking with the
Google support team, we discovered that OpenTSDB shipped with an outdated version of the driver. And we were able to update to
a development version, which significantly reduced
the number of API calls because it had introduced
a new feature that allows you to batch those calls. So we're still writing
the same amount of data, but we were able to go from
60,000 API calls per second down to three. And that allowed us to half
the node count and the cost of this test, and meant that
our estimations were indeed correct, and we could
continue with the project. So the lesson learned here
is, stay with the latest version of the driver and also
test with development versions, because the official supported
latest version might actually be out of date, and Google's
technical support are really good at helping diagnose
these kinds of problems. The second one is a
little more complicated. We found this after we'd
completed the migration, because we'd tested
with small accounts. And we have some customers
who are monitoring huge numbers of servers. And we've found that the
row keys that you set, it's really important
to ensure that you have the correct cardinality
when scanning rows. And the first
approach that we took resulted in too many scans. And every account has a
unique ID and metric name like load average or disk usage. And that was the
original key design. But by querying
on that, it meant that we had to not only
scan, but filter to match that metric for all devices. So a query on a particular
device for say, system load, had to scan and filter every
single device on the account. And this meant that
the number of scan rows was the number of devices
multiplied by the time range. So for big accounts, all
the devices for that account would get queried and scanned. And this reduced the
performance of the system as the customer got bigger. And we discovered this
with our monitoring once we started to get to
production to test that. So we refactored the key so
that only very specific devices are queried. So we went from the account
ID and the metric name to include the item ID. So an item is a server or a URL. And we included that
in the key itself.
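As a rough illustration of the idea behind that refactor-- this is not OpenTSDB's actual on-disk key format or Server Density's exact schema, just the shape of the change:

public class KeyRefactorSketch {
  public static void main(String[] args) {
    String accountId = "account-123";  // placeholder account ID
    String metric = "system.load";
    String itemId = "server-42";       // placeholder server or URL ID

    // Before: every device on the account shares the key prefix, so one
    // device's query still scans and filters across all of them.
    String before = accountId + "#" + metric;

    // After: the item ID is part of the key, so a scan is bounded to one
    // device and only the time range determines how many rows are read.
    String after = accountId + "#" + metric + "#" + itemId;

    System.out.println(before + " -> " + after);
  }
}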
Now, tag filtering in OpenTSDB can still reduce
performance because it increases the amount
of filtering that needs to be done. So you have to be
careful with that. But it meant that
instead of querying every single
row, the scanner only has to do it based
on the time range. So this is what it looks like
with the refactored key design. And if we compare it to
the original key design, you can see the
amount of data that has to be scanned is
significantly reduced. So in the end, we were
able to migrate everything over-- we've just
completed the migration-- and have a linearly
scalable system, which has high availability
across multiple zones; allows us to build
new product features, because we don't
have to deal with any of the operational overhead of
running servers and building features that just
don't differentiate on the standard level. And we get that
with lower costs. And we're able to reduce
our cost of running bare metal along with MongoDB by
three times by migrating over. Now the original code
was written by me. And I had real
engineers to rewrite it. And these are the guys on
the team who did that work, and worked with our operations
team and our engineers, and also the Bigtable
engineering team, and of course, OpenTSDB. So I wanted to say
thanks to those people, and just show exactly
what this powers. It powers our Apple TV
app, powers our mobile app, and of course, the graphing
within the product itself. All right, so I'm going to hand
you over to Anthony Fox now. Thanks very much. [APPLAUSE] ANTHONY FOX: Thank you. Thank you. So my name's Anthony Fox. I'm from Virginia, where I work
for a small analytics-focused company called CCRI. And I'm going to be
talking today about GeoMesa and how you can use open source
to manage spatiotemporal data on top of Bigtable, and also on
top of some other Google Cloud architectural components. So just to level set,
traditional GIS spatial data is things like points,
lines, and polygons-- so natural features, geographic
features, political boundaries, those things that are fairly
static and not very big, not necessarily the best
use case for Bigtable, though you could use it for that. There's also a huge amount
of satellite imagery that is being produced
every single day. As part of the keynote, we heard that Planet Labs
was imaging the entire Earth every three or four
hours, I think, which is producing an
enormous amount of data. But then there's also
the data that we're going to focus on today,
which is the kind of data that companies like
exactEarth collect. And this is a growing area of
data, of spatiotemporal data. So this is a
visualization of the data that exactEarth collects
from their constellation of satellites. This is AIS data. AIS stands for the automatic
identification system. It's a protocol by which all marine vessels broadcast their location every six seconds. So every vessel in the world
broadcasts their location every six seconds. And exactEarth's
satellites pick that up as they spin around the
globe and downlink that to navigation systems,
safety systems, routing systems, all
kinds of other systems. So there's the satellite
collected data, but then there's data collected
from connected devices. So the data that an
organization like Strava collects, so Strava is a social
media application for fitness activities. So exactEarth collects maritime
data every six seconds. It produces, I think right now
about 8 million records a day. And they're launching a
whole suite of new satellites where they're going to
collect orders of magnitude more than that. Strava collects every
second, a measurement is taken for each activity. And that measurement has things
like heart rate, elevation, time, location. So they produce a lot
of data per activity, and then across all
of the activities. These are some
density maps that we produced from ingesting
Strava data into GeoMesa. This is commutes in downtown
San Francisco, which pretty much covers all of
the roads in downtown San Francisco. So spatiotemporal
data is growing. Volumes of it are growing,
it's very high velocity. In order to manage it, I'm
one of the original founders of the open source
GeoMesa Project, which is an Eclipse Foundation
LocationTech working group open source project. It's a toolkit for managing and
analyzing spatiotemporal data, both at rest and in stream. For managing and analyzing
the data at rest, it has integrations with
Cassandra, HBase, Accumulo, and the original NoSQL
database, Bigtable, which we're going to
talk in detail about. For streaming data, it has
integration with Apache Kafka. It's actually the same API that
it exposes on top of Bigtable, it exposes on top of Kafka,
which is kind of interesting. And for analyzing the data,
it has fairly deep integration with Spark SQL for doing
spatiotemporal operations. OK, so now we're
going to deep dive into how you might index
data, spatiotemporal data in Bigtable. So remember, Bigtable
has one index. It's a lexicographically
sorted key. So as an example of
what you shouldn't do, consider if you concatenated
latitude and longitude. So your row key becomes
what you see there. This is the Moscone Center's
latitude and longitude, 37.7839, negative 122.4012. The typical query that
you're going to make is going to bound
those dimensions. So you're going to look for
things around the Moscone Center. And if you do that and
you scan Bigtable's lexicographically
sorted index, you're going to get data points that
are near Fukushima, Japan, because they happen to
share the same latitude, and they sort lexicographically
near each other. So data that is very far
apart in physical space is close in Bigtable index
space, and that's not good. So we don't want to do that. So we have to be a little
bit more clever than that. And so we use something
called space-filling curves. So there's lots of different
kinds of space-filling curves. The whole point of a
space-filling curve is to encode multiple dimensions
into a single dimension so that nearness is maintained. So things that are near
each other in physical space are near each other
in index space. So here's a couple of examples. On the far right you
have the Hilbert curve, and you have multiple
iterations of the Hilbert curve. The Hilbert curve
has great locality. On the left you've
got the Z-order curve, which is incredibly simple. And so we're going to
walk through a deeper dive of the Z-order curve. There's also an
interesting library that Google has put
out called S2, which treats the spherical
earth as a cube, and then overlays a
space-filling curve on each one of those
faces, and in doing so minimizes the distortion that
is typical of map projections. All right, so let's walk
through the simplest space-filling curve
and how it gets used so you get some intuition
for how you produce these keys. So we're going to encode the
coordinates of the Moscone Center into a 32-bit
integer Z index. So we have 32 bits. We have two dimensions,
so 16 bits per dimension. We're going to
scale the latitude and longitude coordinates. They're bounded dimensions. So for instance, longitude is
bounded negative 180 to 180. We're going to map negative 180
to 0, and 180 to 2 to the 16th. We'll do the same
scaling for the latitude. And we have integer
representations of our x and y-coordinates. Then we take the
binary representation. So there's 16-bit
strings, essentially. Interleave those, convert
back to an integer, and that's our Bigtable key.
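Here's a minimal sketch of that scale-and-interleave step in code-- this is just the intuition, not GeoMesa's actual implementation, which handles precision, dimensions, and non-point geometries much more carefully:

public class ZIndexSketch {
  // Scale a coordinate in [min, max] onto a 16-bit integer in [0, 65535].
  static int scaleTo16Bits(double value, double min, double max) {
    return (int) Math.floor((value - min) / (max - min) * ((1 << 16) - 1));
  }

  // Interleave the bits of two 16-bit values (x in the even bit positions,
  // y in the odd ones) to form a single 32-bit Z-order index.
  static long interleave(int x, int y) {
    long z = 0;
    for (int i = 0; i < 16; i++) {
      z |= ((long) ((x >> i) & 1)) << (2 * i);
      z |= ((long) ((y >> i) & 1)) << (2 * i + 1);
    }
    return z;
  }

  public static void main(String[] args) {
    // Moscone Center, roughly longitude -122.4012 and latitude 37.7839.
    int x = scaleTo16Bits(-122.4012, -180.0, 180.0);
    int y = scaleTo16Bits(37.7839, -90.0, 90.0);
    System.out.println("z-index: " + interleave(x, y));
  }
}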
And so what we've effectively done is we've linearized this
multi-dimensional space. So in this chart
on the right, you can see a kind of conceptual
notion of a Bigtable index going from 0 to 2 to
the 32, and the point on that index where the Moscone
Center lands in index space. What's really nice about
this is that things that are near each other sort
near each other in Bigtable. And region queries, which
is your 95% use case spatiotemporal query,
turns into a range scan. So the range that you're
looking at there for that box is just a scan from a starting
integer to an ending integer. And we get the magic of Bigtable
handling of partitioning and load balancing
for free, because we have this nice machinery
behind this structure that we've set up. So if we jump back, there are
some additional properties to wrap up on
space-filling curve. They have this
recursive structure. So if you look at
the Z order curve, the first picture
is two bits, right? So it has this
quadtree decomposition. And you start adding
more bits, but you can do these common
prefix scans, which is one of the things
that Bigtable is best at. And those prefix scans map
to regions on the earth. They naturally extend to
higher order dimensions with just a few tricks here and
there so you can encode time into the Z order curve, and
you get a Z3 curve or three dimensional Hilbert curve. And there are some
extensions to Z order curves that allow you to index
nonpoint geometry, so geometries with spatial
extent like line strings and polygons. So that covers the
basics of how you use a space-filling
curve in conjunction with Bigtable's model. So another thing that I'd
like to talk about, something that we've spent a lot of
time in GeoMesa, but I think is a natural evolution
of a developer working with Bigtable. In Bigtable, you
have this one index. So you design your
table structure so that it maps to the query
patterns that you expect. And you make those query
patterns really, really fast. That's really good, you
feel good about that. And then suddenly
your users start doing different query patterns. And you realize that
your index structure is good for some queries, but
terrible for other queries. So much as you'd add an index
in a relational database, in Bigtable, adding
an index amounts to redundantly storing
your data in another table with a different
index structure. So in this query
example that I'm showing here from a Strava table, there is a spatial predicate-- the ST_Contains-- and an athlete ID predicate. You don't want to scan
all of the data that matches that spatial predicate
and filter down to the athlete ID, you want an index on athlete
IDs, so those are really fast. So in GeoMesa, we
have a query planner that will inspect the query
and find the predicate that has the least cost
in query execution, and dispatch the
query to that index-- in this case, the Strava
activities index-- and then apply spatial
predicate filtering in a post-processing phase. This is a typical pattern. So either you use this through
a library or it's something that you can adopt as you
begin to engage with Bigtable. So we've covered
spatial indexing, we've covered redundantly
storing your data with multiple index structures. What about analytical workloads? So we heard that there's no
query time joins in Bigtable, but you still want
to answer questions like in the exactEarth use case,
what is the optimal time of day to traverse the Panama Canal. It takes, I think on the
order of seven to eight hours to traverse the Panama Canal. So picking the right time
can be a big cost savings. So that's the kind
of analytical query that you might ask
of exactEarth's data. An analytical query you
might ask of Strava's data is, what are the typical
commuter routes of workplaces in downtown San Francisco? These are joins and
aggregations that Bigtable just doesn't provide at query time. But Bigtable does support
loading your data out of Bigtable and into
something like Spark. So you can use this
Spark SQL in conjunction with Google Cloud Dataproc
to recover the ability to do joins in aggregations and
execute analytical workloads on top of your Bigtable data. And GeoMesa adds a bunch
of extensions to Spark SQL, including geometry types,
spatial predicates, and geometry
processing functions. So let's dissect
an analytical query and see how the whole suite
of Google Cloud products allows you to execute
this particular workload. So this is the
Panama Canal query. We've got aggregations,
averaging counts, group bys. We've got geospatial predicates. In this case, it's ST contains. We want to filter
it down to things that are around Panama Canal. We've got geometry types. We're creating a polygon
predicate on a geometry column within Bigtable, grouped into one-hour buckets. So what happens is, you spin up
a Dataproc cluster of whatever size elastically
scaled to the workload that you expect for that
particular use case. You submit your Spark SQL
query and GeoMesa's extensions intercept the query, discover
the spatial predicate, and use the spatial predicate
to compute the ranges out of Bigtable that they'll
use to provision the RDD. So only the data that
matches that predicate will come into the Spark
in memory run time. And at that point, Spark then
spins and does its aggregations and GroupBy operations.
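As a hedged sketch of what such a query can look like-- this assumes a GeoMesa-backed DataFrame has already been registered as a temp view named ais, that GeoMesa's spatial UDFs (st_contains, st_geomFromWKT, and so on) are registered on the session, and that the column names and polygon below are placeholders:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PanamaCanalSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().appName("ais-analysis").getOrCreate();
    // Count vessel positions around the canal by hour of day, least trafficked first.
    Dataset<Row> byHour = spark.sql(
        "SELECT hour(dtg) AS hour_of_day, count(*) AS vessel_count "
      + "FROM ais "
      + "WHERE st_contains(st_geomFromWKT('POLYGON((-80 8, -79 8, -79 10, -80 10, -80 8))'), geom) "
      + "GROUP BY hour(dtg) "
      + "ORDER BY vessel_count");
    byHour.show(24);
  }
}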
So we've been able to recover this analytical workload capability by leveraging
additional components of the Google suite. And you'll discover that you
should time your traversal of the Panama Canal
to about 0600 GMT, which is the least
trafficked hour of the day. So that covers what I was
going to talk about today. As a note, this particular
slide is out of date. It says 1.3.1-SNAPSHOT. But the release was cut-- 1.3.1 was cut maybe an hour ago. So it's just in time. These are my coordinates
for contacting me. This is available now. Yeah, thank you. [APPLAUSE] [MUSIC PLAYING]