Advancing Serverless Data Processing in Cloud Dataflow (Cloud Next '18)

Video Statistics and Information

Captions
[MUSIC PLAYING] SERGEI SOKOLENKO: Hello and welcome. My name is Sergei Sokolenko. I am a product manager at Google Cloud. And today with me, I have George Hilios, vice president at Two Sigma, talking to you about advancing serverless data processing in Cloud Dataflow by separating state and compute storage. But before we talk about separating compute and state storage, I wanted to do, very quickly, a historical retrospective of how the industry used to process really large data sets in the past. Imagine it's the year 2000, and you have a one petabyte data set that you want to process. Who am I kidding? In the year 2000, a really large data set was probably something you measured in terabytes. Well, still, you have a terabyte of data. And you want to process it, and it's the year 2000. So you'd probably use an architecture something like this. You would use a large box with lots of CPU. It might be a mainframe or a Unix box. The storage would be provided through a storage area network. Then, come 2004, new distributed processing frameworks were released. And now, you would probably be using a cluster of cheap Linux nodes, with storage actually being part of each processing node, so local storage. Then, as things moved into the cloud, you were still using these clusters of Linux nodes. But now, you would be using network-attached storage provided by the cloud provider. In 2010, Google published a white paper. We called it the Dremel paper, and it defined a new architecture for processing really large, web-scale data sets. That architecture was based on the following idea: you would have an independent, highly available cluster of compute, and you would have highly reliable storage facilities. In 2011, we launched a product that was based on this white paper, on this concept. We called it BigQuery. And probably at this conference, you've heard about BigQuery many times. It became a very successful product. So this is the architecture of BigQuery. It has the concepts described in the Dremel white paper. It has a replicated storage layer. It has a highly available cluster of compute. They're separate. They're working together. And they're connected through a very fast, petabyte-scale network. But there's a component in this architecture that was not part of the original Dremel paper. It's the distributed in-memory Shuffle component. Now you might ask yourself, what is this Shuffle? Why was it added later on, when we actually launched the product into general availability? So let's just quickly cover the concept of Shuffle. In data processing, when you have a big collection of key-value pairs, and you want to do a grouping or a join with another data set, you need to sort these elements. When you have a single-node processing architecture, you can do that sorting more or less easily: either you sort your data set in memory by key, or you use efficient on-disk sorting algorithms. Unfortunately, once your data sets explode, and you have to move to a distributed framework or a distributed architecture where you have lots of nodes, it becomes somewhat more complex. Your end goal is to have everything sorted by key, and all data elements associated with a particular key co-located on a particular worker node. And what you end up doing is shuffling, or physically moving, data elements associated with specific keys from one box to another. And that's the process of shuffling.
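To make the shuffling idea concrete, here is a minimal, self-contained Java sketch (not from the talk; the worker count, keys, and values are invented) of the core trick: routing each record by a hash of its key so that all values for a given key land on the same worker, where they can then be grouped or joined locally.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class PartitionByKeyDemo {
  public static void main(String[] args) {
    int numWorkers = 3;

    // A handful of key-value pairs, arriving in arbitrary order across the cluster.
    List<Map.Entry<String, Integer>> records = List.of(
        new SimpleEntry<>("user-1", 10),
        new SimpleEntry<>("user-2", 7),
        new SimpleEntry<>("user-1", 3),
        new SimpleEntry<>("user-3", 42));

    // One bucket per worker node.
    List<List<Map.Entry<String, Integer>>> workers = new ArrayList<>();
    for (int i = 0; i < numWorkers; i++) {
      workers.add(new ArrayList<>());
    }

    // "Shuffling": every record is routed by a hash of its key, so all values
    // for the same key end up co-located on the same worker and can be
    // grouped or joined locally afterwards.
    for (Map.Entry<String, Integer> r : records) {
      int worker = Math.floorMod(r.getKey().hashCode(), numWorkers);
      workers.get(worker).add(r);
    }

    for (int i = 0; i < numWorkers; i++) {
      System.out.println("worker-" + i + ": " + workers.get(i));
    }
  }
}
```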
Now, as you can imagine, this is a very resource-intensive and time-intensive process. And as your data sets scale, it becomes more and more complex. That's why BigQuery ended up building a separate, dedicated component that was just responsible for shuffling big data sets. And it became a critical component connecting the storage and the compute nodes. Now, I am a product manager of Dataflow, and this is a Dataflow-related session. So you might ask yourself, why am I spending so much time talking about Shuffle? We actually have the same problem in Dataflow. We provide the same data transformations to customers. We allow them to do groupings, and joins, and aggregations, and filtering, and projections. So the problem set is actually quite similar to what BigQuery is dealing with. Here's a screenshot of a real Dataflow pipeline. Let's just follow this pipeline as it lives in our service. Simplistically, it can be represented by this diagram. We have a couple of data sources. We have data transformations; those are the green things over there. Then we have joins and group-bys; those are a special kind of data transformation. And finally, the data ends up in a data sink. So as you submit this Dataflow job to our service, Cloud Dataflow, we are going to create a cluster of nodes to process it. These will be Compute Engine virtual machines, and we'll also use persistent disk storage. They're going to start reading from the sources. Once we hit the green boxes, the data transformations, we're going to run them on compute and use some local storage. And then, when things enter the joins and the group-bys, they're actually going to do a Shuffle. And today we do that Shuffle using persistent disk storage-- either the magnetic type or the SSD type, whatever the customer specifies. Ultimately, the data gets written into the sink, and we shut down the cluster. And that's, in a nutshell, how a Dataflow job works. Now in Dataflow, as I said, we are dealing with the same kinds of volume and scalability issues that BigQuery is dealing with. We have huge volumes of data that customers want to process. And so we ended up implementing a very similar mechanism, a distributed in-memory Shuffle service, in Dataflow. The service works like this. You have your compute cluster running your user code, which you write either in Java or Python. Then we have the Dataflow Shuffle. And the Dataflow Shuffle is deployed, for high-availability reasons, into multiple GCP regions. In each GCP region where Dataflow Shuffle is available, we also have replication and duplication of Shuffle in every zone. We have it deployed in every zone. We have a component we call auto zone placement. And this component decides, based on the available capacity and the job, which zone the job needs to be assigned to. So we take care of deciding which zone to run in. Then we have a tier of components which coordinate the actual Shuffle operation among themselves. We have a Shuffle proxy, which accepts the job, and we have two file systems. One is in-memory, and the other one is on-disk. And so if your job and the available capacity allow us to do a Shuffle entirely in memory, your data will never touch disk, and we will quickly return results to you. But if your job is sized such that we have to cache some of this data on disk, we're going to transparently move some of your data from memory onto disk and shuffle it there. You don't have to worry about anything.
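The batch pipeline shape Sergei walks through (sources, element-wise transforms, a join/group-by that triggers the shuffle, and a sink) corresponds roughly to the following Apache Beam Java sketch. This is an illustrative reconstruction, not the pipeline from the slide: the bucket paths, key extraction, and output formatting are all made up.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptors;

public class BatchJoinSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Two sources: read lines and key them by their first comma-separated field.
    PCollection<KV<String, String>> left = p
        .apply("ReadLeft", TextIO.read().from("gs://my-bucket/left/*"))
        .apply("KeyLeft", MapElements
            .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
            .via((String line) -> KV.of(line.split(",", 2)[0], line)));

    PCollection<KV<String, String>> right = p
        .apply("ReadRight", TextIO.read().from("gs://my-bucket/right/*"))
        .apply("KeyRight", MapElements
            .into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
            .via((String line) -> KV.of(line.split(",", 2)[0], line)));

    // The join / group-by step: this is where the shuffle happens.
    final TupleTag<String> leftTag = new TupleTag<String>() {};
    final TupleTag<String> rightTag = new TupleTag<String>() {};
    PCollection<KV<String, CoGbkResult>> joined = KeyedPCollectionTuple
        .of(leftTag, left)
        .and(rightTag, right)
        .apply("Join", CoGroupByKey.create());

    // The sink.
    joined
        .apply("Format", MapElements.into(TypeDescriptors.strings())
            .via((KV<String, CoGbkResult> kv) -> kv.getKey() + ","
                + kv.getValue().getAll(leftTag) + "," + kv.getValue().getAll(rightTag)))
        .apply("WriteSink", TextIO.write().to("gs://my-bucket/output/joined"));

    p.run();
  }
}
```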
We do transparent shuffling, and we just return results to you. This entire architecture requires no code changes from users. You tell us, just by specifying a parameter, that you want to use the Dataflow Shuffle. And we will switch from the PD-based Shuffle-- the one that we discussed earlier in the talk-- to the service-based Shuffle, transparently to you. Now, today I'm very happy to announce general availability of Dataflow Shuffle in two GCP regions-- us-central1 and us-west1. Dataflow Shuffle has been in beta for a while. Now, it's generally available to all our users. Let's quickly go through the benefits of Dataflow Shuffle. The first benefit is that many of our customers tell us Shuffle is now much faster than it used to be. So if we take the same pipeline and compare its execution times using magnetic disks, magnetic PD, then SSD PD, and then the Dataflow Shuffle-- the Shuffle service-- you're going to see that the magnetic disks give you maybe 55 minutes of pipeline execution. The pipeline using SSD persistent disks will run in approximately 17 minutes. The Dataflow Shuffle one will actually run in 10 minutes. These results are not always applicable to every use case, but many of our customers are telling us that's what they are seeing. The other benefit of Dataflow Shuffle is that we are now able to process and shuffle much larger data sets. And you're going to see in a demo the sizes of shuffles we can now support. If previously the untuned jobs-- the ones that used magnetic disks-- were able to shuffle tens of terabytes of data, and jobs that used SSD PDs were able to shuffle up to 50 terabytes of data, now we can push into hundreds of terabytes. So with this, I wanted to show you a demo that actually runs a Shuffle job. And here's what the job looks like. I have two inputs-- two data sets in GCS buckets. I'm going to read a 50 terabyte data set in each case, for each of the inputs. And I'm going to join them and write the result into a GCS bucket. The code for this pipeline is written in Scala, using a framework that one of our customers developed. It's called Scio. Spotify are the original developers of this framework. And I like Scio, because it allows me to very easily define my pipelines. The entire pipeline is these 10 to 15 lines of code. For those of you who might not be able to see it, let me just quickly explain what it does. In the main function, I define my first input and my second input. Then I do a left outer join. And I write the output to files. That's all this pipeline does. And let me switch to the demonstration. Before we switch to the demonstration, you might ask yourself, what does the input look like? Here's a screenshot of one of the files that I will be joining. It consists of three columns. The first column is the key. That's the key I want to join on. And it's just randomly generated strings. Then I have a record ID. I have billions of records in my data set, and so this record ID is pretty long. It's a very long string. And then I have the payload, the value that I want to associate with the key. And so with this, let's switch to the demonstration, and let me run a few commands. Great. So I'm in bash, in a terminal. The first thing I want to show you is the contents of the bucket from which I will be reading files. I've defined a bash variable, input 50 TB. And this is the bucket that contains 50 terabytes worth of files.
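Sergei says Dataflow Shuffle is enabled just by specifying a parameter. A hedged sketch of how that typically looks when constructing pipeline options in the Beam Java SDK follows; the project, region, and bucket are placeholders, and the exact flag (documented around that time as the shuffle_mode=service experiment) should be checked against the current Dataflow documentation.

```java
import java.util.Arrays;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class ShuffleServiceOptionsSketch {
  public static void main(String[] args) {
    DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(DataflowPipelineOptions.class);

    options.setRunner(DataflowRunner.class);
    options.setProject("my-project");                  // placeholder project id
    options.setRegion("us-central1");                  // one of the Shuffle GA regions named in the talk
    options.setTempLocation("gs://my-bucket/tmp");     // placeholder staging bucket

    // Opt the job into the service-based Shuffle instead of the PD-based Shuffle.
    // At the time of this talk, the documentation described this as an experiment
    // flag; verify the current parameter name before relying on it.
    options.setExperiments(Arrays.asList("shuffle_mode=service"));

    Pipeline p = Pipeline.create(options);
    // ... define sources, transforms, the join, and the sink here ...
    p.run();
  }
}
```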
So I have 50,000 one-gigabyte files containing data with random keys that I will be joining together. And just to show you that this bucket really contains 50 TB, I ran a listing command. And here's my proof that it really does contain 50 terabytes. I'm not going to run it now, because listing 50,000 files would actually take several minutes. That's why I ran it beforehand. Now, my next command is the one that will create the pipeline. I'm going to be using the Scala build tool to quickly execute, from the command line, the command that launches a Dataflow job. And as you can see, what I need to specify here are my inputs-- both of the inputs I will be reading from-- the sink bucket, the output bucket, and the parameter instructing the Dataflow service to use the Shuffle service. So I'm going to run this command. And within a minute or two, the code will be packaged, all of the dependencies will be deployed, and my job will be initiated. I'm going to give it a few seconds. Depending on the Wi-Fi gods, if we're lucky, it will be done quickly. But in any case, I also ran a Shuffle just before the demo, a Shuffle that processed 100 terabytes. So here's that job. Just to give you a few data points about this job: the first input had 500 billion elements. It was about 51 terabytes. In the second input, I also had 500 billion rows, another 51 terabytes. And then I joined them in this operation. In Dataflow, we call joins co-group-by-keys. And so it created, I think, 100 billion combinations of these keys. Some of the keys overlapped. Some of them didn't. And then I wrote them out into files. And I actually created 2.5 trillion lines in my output, 2.5 trillion, almost one petabyte of output. This job took about 7.5 hours. And I was able to process it with 10 lines of code. Back to my slides, please. So hopefully, I was able to show you that it's now very easy to do large-scale Shuffle processing with Cloud Dataflow. In addition to my 100 terabyte run, I also ran a few smaller jobs just to show you the scalability of this process. I did a four terabyte run, a 20 terabyte run, and the 100 terabyte run, with a 5x data increase every time. And the execution time was pretty much linear, which is what you want to have. You don't want a quadratic escalation of your duration. If you can achieve linearity in execution time, that's a very good result. What you don't see on this slide is that the resources consumed during this execution, what we call Shuffle data processed, also scaled linearly. If your data increases 2x, you will only pay 2x more for such a job. That's another good property of the service. You are able to scale both duration and cost linearly as your data grows. Now, we talked about batch pipelines. And hopefully, I was able to show you that in batch processing, being able to do efficient shuffling is important. But what about stream processing? Cloud Dataflow provides both batch and streaming capabilities. And perhaps in streaming you also need Shuffle, you might ask. The answer is, yes, it's also very important to be able to do shuffling in streaming pipelines, because customers want to join and group data elements. But in addition to shuffling, in streaming pipelines the other important thing is that you need to be able to efficiently store state. State, in this context, relates to the time windows that you create when you run windowing aggregations on streaming data.
So let's go through a similar life of a pipeline, but for a streaming use case. This pipeline reads from Pub/Sub, does a windowing operation by event time, does a group by, does a count aggregation, and then writes into BigQuery. Now, the first thing to note here is that when you submit such a job to Dataflow, we're going to divide it into three stages-- everything that comes before the group by, everything that comes after the group by, and the actual Shuffle step. And once we've divided it into three portions, we start thinking, how do we scale and distribute this processing? Our way of distributing the incoming workload to multiple workers is by partitioning your data set by key. If your pipeline already has a key, through maybe a group by operation, we'll use that key. If it doesn't have a group by, then we'll auto-generate a key and partition your data that way. So in this case, for example, we have split the key spaces for the pre-Shuffle and the post-Shuffle operations into ranges. I'm going to show you next what we do with these ranges. Well, we're going to assign them to workers. Each key range will be assigned to an individual worker. Once we've done the assignment of key ranges to workers, we are going to persist the data, making specific PD volumes responsible for storing the data related to these keys and key ranges. When we have to scale a pipeline in the streaming case, we're actually going to move an entire disk from one worker to another. We're not going to try to rearrange key spaces and compress them or reassign them. We're going to take an entire disk, an entire persistent disk, and reassign it to another worker. As you can imagine, that's another time-intensive operation. Streaming autoscaling has been working really well for many customers. But in some cases, it might end up being a little bit sluggish, because we have to reassign disks. Now that we've talked about the group by, let's quickly cover the windowing operation. Here, it's important to remember that in streaming, there are two important time dimensions that you want to think about. The first time dimension is your business time dimension. That's the event time. This is when the sales transaction happened, if you're dealing with sales transactions. Or it might be the time when the user clicked on a link. So it's the business time. In our terminology, we call it event time. The other important time dimension is processing time. That's the time when the business transaction enters the processing pipeline. And as you can imagine, there can be delays between an event being generated by the source system and that same event entering the processing pipeline. Now, many customers don't want to deal with processing time as a unit of analysis or as a dimension of analysis. They actually want to deal with event time. They want to organize the data elements into windows of either fixed duration, or configurable duration, or session-based windows. But they want to group their data elements by event time. So with this in mind, what we have to do on the Dataflow side to allow such analytical processing is buffer data elements on disk, because we have to store these elements until the window closes and we can initiate the processing of the elements in the window. So in addition to Shuffle, we also store data elements related to windows. So we asked ourselves, can we do something similar for streaming pipelines as we did for the batch pipelines with Shuffle?
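The streaming pipeline just described (read from Pub/Sub, window by event time, group and count, write to BigQuery) maps roughly onto the Beam Java sketch below. The subscription, table, field names, and window size are invented for illustration, and the option flags mentioned in the comments should be verified against current Dataflow documentation.

```java
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.joda.time.Duration;

public class StreamingCountSketch {
  public static void main(String[] args) {
    StreamingOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(StreamingOptions.class);
    options.setStreaming(true);
    // The Streaming Engine announced in the next part of the talk is enabled via a
    // pipeline option rather than a code change; at the time it was documented as
    // an experiment along the lines of enable_streaming_engine.
    Pipeline p = Pipeline.create(options);

    TableSchema schema = new TableSchema().setFields(Arrays.asList(
        new TableFieldSchema().setName("msg").setType("STRING"),
        new TableFieldSchema().setName("msg_count").setType("INTEGER")));

    p.apply("ReadPubsub",
            PubsubIO.readStrings().fromSubscription("projects/my-project/subscriptions/my-sub"))
        // Window by event time into fixed one-minute windows; state for open
        // windows must be stored somewhere until each window closes.
        .apply("Window", Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))))
        // Group by and count: how many times each message appeared per window.
        .apply("CountPerWindow", Count.perElement())
        .apply("ToTableRow", MapElements.into(TypeDescriptor.of(TableRow.class))
            .via((KV<String, Long> kv) ->
                new TableRow().set("msg", kv.getKey()).set("msg_count", kv.getValue())))
        .apply("WriteBigQuery", BigQueryIO.writeTableRows()
            .to("my-project:my_dataset.window_counts")
            .withSchema(schema)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    p.run();
  }
}
```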
And the answer is, yes. I'm happy to announce that today we made the Streaming Engine available in beta in two regions-- us-central1 and europe-west1. Quickly, about the benefits of the Streaming Engine and its architecture. Your user code-- the code that you write that represents your business logic-- continues to run on worker nodes. But now, all the streaming plumbing that used to co-exist with your user code on the worker nodes has been moved to a dedicated service in Google's back end. And it is responsible for two things: window state storage, and streaming Shuffle. Your user code communicates with it transparently. You as a developer don't have to think about it or worry about it. Very similarly to Dataflow Shuffle in the batch case, you only have to provide us with a parameter that tells us you want to use the Streaming Engine. So your user code transparently communicates with our back end, and we do all the shuffling and all the state storage for you. The benefits are: we don't have to move disks around anymore when you want to scale, so autoscaling in streaming became much better. We can also do maintenance on our service much more easily. It doesn't interrupt you. You can continue running your pipeline. We can do maintenance and patches on the back end without interrupting you, in most cases. And we also consume fewer resources on the worker nodes. So now, your user code that implements business transactions and processes your data elements has more CPU and memory available to it. It can produce more and crunch more data. Here's an example that shows you how streaming autoscaling works together with the Streaming Engine. What this diagram shows you is an incoming stream of data that ran for about one hour and ranged in throughput between one megabyte per second and 10 megabytes per second. When I didn't use the Streaming Engine, Cloud Dataflow scaled my workers. Initially, it started with three workers. Then, once Dataflow sensed there was an increase in incoming data, it scaled the number of workers to 15. It kept this number for a while and then scaled down, then waited for another spike in inputs and scaled up again. Now, if you compare this graph to the graph with the Streaming Engine, as you can see, we used fewer workers. And we were able to scale down much faster. Here are the two graphs overlaid. And they show quite nicely how the Streaming Engine is able to avoid scaling up to high numbers of resources, uses fewer resources, and is also more responsive to variations in the incoming data rate. Of course, the best stories about Dataflow are told by our customers. And so I'm very happy to invite today George Hilios from Two Sigma, who will be talking to you about how Two Sigma is using Cloud Dataflow. [APPLAUSE]

GEORGE HILIOS: Thanks, Sergei. So my name is George Hilios. I'm a vice president at Two Sigma. Just a quick intro into who Two Sigma is. We are a Manhattan-based hedge fund. We take a scientific approach to investing. We hire scientists, mathematicians, engineers, the works. Our mission is to find value in the world's data. In particular, I head our engineering group that deals with granular economic data. I'm probably getting some squints from most of you, so let me explain what that means. Here's an example. The NOAA publishes a wealth of information publicly. There are weather forecasts for all regions of the United States.
And so we have a big question-- can we correlate weather activity with regional economics to predict financial outcomes? These are the types of questions data scientists and engineers in my group ask. And this data tends to be very large. I actually jumped slides a little too quickly, sorry. The NOAA data compresses to about a terabyte or so. But when you expand it and look at all the rows of data, it runs into the hundreds of terabytes. So there's a lot of data to make sense of. So what do we do with all this data? We can do a lot of things with it. We can do geospatial analysis, aggregate billions of rows of data, terabytes of data, ultimately to build alpha models. This type of work is very lucrative. It's very satisfying. However, there is a weakness there. We get a lot of our data from third-party vendors. And so we're at risk of bad vendor data causing bad predictions. At Two Sigma, we take data quality very seriously. We build anomaly detection into all of our pipelines to guard ourselves against these bad outcomes. Now, when you're dealing with data at this scale, we have to build very complex, high-scale systems to detect these anomalies and protect us and our investors. And this is where Dataflow enters. We really don't want to be in the business of building infrastructure. We'd like to focus our energy on business logic, things nuanced to particular vendors and the types of quirks they throw our way. But also, we're not a particularly large team. We don't have the resources Google tends to throw at these infrastructure-type problems. So can a team of 10 to 20 engineers deal with 100, 200 terabyte data sets without batting an eye? Let me take a quick little sidebar here. How many of you know what RFC 4180 is? I'd be surprised to see any hands out there. I do, and it is the format for CSV files. These are comma-separated values. It's the bread and butter in the industry for how files are distributed. Unfortunately, our vendors do not tend to know what that RFC is. Let me show you an example of what that really entails-- and these are real examples, obviously changed for presentation purposes. So that comma-- is it separating two numbers, or is that 15,600? In this case, it was actually 15,600. Good luck to the pipeline understanding that. Within the same file, dates are represented in different ways. Here is a newline character. Is that a separate row, or is it the same string that happens to have a newline in it? A lot of these vendors get their data from sources that are third parties to them, and they're just passing it along. So if it's unescaped and unquoted, any off-the-shelf CSV parser will choke on this. And here's another one. We've noticed that as the days and months go by, vendors decide to expose new fields to us, so the schema changes. So imagine you're a data engineer. You're writing a pipeline, and you have your code for doing aggregation. Do you have a conditional in there that says, if the date is before January 1, 2016, I only expect four columns? All your pipelines would be littered with this vendor-specific logic, and it gets very unwieldy very quickly. So we wanted to use this opportunity, with a tool like Dataflow, to eliminate this problem entirely and get in front of it. So let me show you an example of what that looks like. We built some tooling, which we called the normalizer-- very creative naming-- on top of Dataflow. And the very first step up here really involves taking the list of files and a JSON file we call the schema as input.
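To illustrate the CSV quirks George describes, here is a small, self-contained Java example (values invented, modeled on the 15,600 case from the slide). It contrasts naive comma-splitting with a minimal quote-aware splitter, and shows why an unquoted, unescaped comma can only be resolved by the kind of vendor-specific normalization logic the normalizer applies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CsvQuirksDemo {

  // Minimal quote-aware splitter for a single CSV line (RFC 4180 style):
  // commas inside double quotes do not start a new field.
  // (Escaped quotes and embedded newlines are deliberately not handled here.)
  static List<String> splitCsvLine(String line) {
    List<String> fields = new ArrayList<>();
    StringBuilder current = new StringBuilder();
    boolean inQuotes = false;
    for (int i = 0; i < line.length(); i++) {
      char c = line.charAt(i);
      if (c == '"') {
        inQuotes = !inQuotes;
      } else if (c == ',' && !inQuotes) {
        fields.add(current.toString());
        current.setLength(0);
      } else {
        current.append(c);
      }
    }
    fields.add(current.toString());
    return fields;
  }

  public static void main(String[] args) {
    // A revenue value with a thousands separator, correctly quoted per RFC 4180...
    String quoted = "ACME,\"15,600\",2016-01-03";
    // ...and the same row as vendors often actually deliver it: unquoted.
    String unquoted = "ACME,15,600,2016-01-03";

    System.out.println(Arrays.asList(quoted.split(",")));  // naive split mangles the quoted field
    System.out.println(splitCsvLine(quoted));              // [ACME, 15,600, 2016-01-03] -- correct
    System.out.println(splitCsvLine(unquoted));            // [ACME, 15, 600, 2016-01-03] -- ambiguous:
    // no generic parser can tell 15,600 from two separate fields here; only
    // vendor-specific normalization logic can decide.
  }
}
```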
I'll go into the details of what that schema file looks like in the next slide. But it contains things like: the first field has this name, and it has this type, like string or integer. It has hooks for Python user-defined functions for doing custom things, et cetera. In the second step, we implemented a custom file source. It takes a file as input from GCS, and it turns it into a PCollection-- which is an Apache Beam slash Dataflow concept-- a PCollection of a generic type, an Avro object that has a schema. Again, we want to use open source where possible, to leverage the innovations and the contributions of the broader community. So we turn everything into an object of a type. And much later in the pipeline, we split. The stuff that goes in one direction is all the rows that we successfully parsed, that we applied our logic to normalize and fit to a schema. And we use the same Spotify library Sergei mentioned earlier, Scio, to save it as a Parquet file. There is another piece of functionality in there that I particularly appreciate: you can say, I want 5 million rows in every single Parquet file-- which, for those of you who have done Spark, is actually kind of a hard thing to do in general. So when we output our Parquet files and process them downstream, with all the files being very evenly sized, we make much better use of our resources. The other output I want to call out, because this is the hard-earned experience that our data engineering team has built up over time: we output an extra file that includes summary statistics. I'll also show an example of this-- things like the total number of rows, the number of error rows that we encountered, and so on. And even more importantly, say you're on support, you get paged in the middle of the night, and there's a failure, there's a spike in errors. You'd like to have a file you can go to that contains all of the rows that were invalid. Maybe the vendor got bored and wanted to throw a new quirk your way. I'm laughing because it's true. And finally, I'm super proud to announce that in a partnership between us and Google, we really wanted to get Parquet loading directly into BigQuery. And Google pulled through big time here. So we can not only produce these Parquet files, but also load them into BigQuery. So imagine having all your raw vendor data in BigQuery to do your research on, to investigate failures, et cetera. And then, if your aggregation is simple or SQL-based, you can just let the Google infrastructure handle it for you. Furthermore, since these are all open source technologies-- Parquet, et cetera-- if the data are small, you can feed them into [INAUDIBLE]. If the data are big and nuanced, you can feed them to Spark. We have one thing up front that handles all of this. It's a big deal for us. So here's an example of the schema file I talked about earlier. There are a few different types here. That first one is a datetime in a very standard format-- an integer containing the number of seconds since the Unix epoch, January 1, 1970, a very standard convention. Then we have a string hash that takes as its input another column. We found that if your primary key or your foreign key is a string, it's very compute-intensive to do joins or group bys on. And so, with our standard upfront processing, we can hash a string as input and add a new column. So when it gets loaded to BigQuery, we have the hash as well as the source. And then, when we process it in Spark, those group bys are very fast. And finally, [INAUDIBLE].
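The split George describes, where successfully parsed rows go one way and bad rows go to a separate errors output, is commonly expressed in Beam as a multi-output ParDo. Below is a hedged sketch with the record type simplified to plain strings and a made-up validity rule; the real normalizer produces schema-driven Avro records and writes Parquet via Scio.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public class NormalizerSplitSketch {

  // Tags for the two outputs: rows that parsed cleanly and rows that did not.
  static final TupleTag<String> VALID = new TupleTag<String>() {};
  static final TupleTag<String> ERRORS = new TupleTag<String>() {};

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollectionTuple outputs = p
        .apply("ReadVendorFiles", TextIO.read().from("gs://my-bucket/vendor/*.csv"))
        .apply("ParseAndSplit", ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void process(ProcessContext c) {
            String line = c.element();
            // Stand-in for the real schema-driven normalization: here a row is
            // "valid" if it has exactly three comma-separated fields.
            if (line.split(",", -1).length == 3) {
              c.output(line);            // main output: normalized rows
            } else {
              c.output(ERRORS, line);    // side output: rows to inspect when paged on support
            }
          }
        }).withOutputTags(VALID, TupleTagList.of(ERRORS)));

    // In the real pipeline the valid rows become Avro records written as Parquet,
    // plus a summary-statistics file; here both branches are written as text.
    outputs.get(VALID).apply("WriteValid", TextIO.write().to("gs://my-bucket/normalized/part"));
    outputs.get(ERRORS).apply("WriteErrors", TextIO.write().to("gs://my-bucket/errors/part"));

    p.run();
  }
}
```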
Some fields are optional, and we want to make it clear which ones should be treated as failures. So here's an example of our output-- the summary file. I'm super proud of the work the team did, because this particular vendor gave us over a trillion rows of data, and this is one day's worth of delivery. We successfully parsed 916 million records. And we did this in a matter of hours on Dataflow. It scaled very linearly. There are 185 Parquet files. You can kind of see behind the circle, they're very evenly sized. This is great for Spark processing. And the error row count is 0, because we had no failures that day-- very powerful. So let's tie this all together. We get the data from the vendor. They give it to us via FTPS, GCS, carrier pigeon, whatever they want to do. We first store it in GCS and then process it in Dataflow. This is where we do our normalization, turn it into Parquet, and output our summary statistics. We then run our ingestion checks. Like I mentioned earlier, we care deeply about data quality at Two Sigma. So we do things like fail if we see a spike or a sharp decline in the number of rows processed or the number of errors encountered. Ideally, we have zero errors. But sometimes, if you're processing a billion rows, you sort of shrug at five failed rows. That's just the reality of the situation. And last but not least, we load into BigQuery for analysis, research, and further processing. So I do want to highlight some of the high-level benefits we get here. A small team like ours can process a ton of data with very little effort. That trillion-row data set example I told you about earlier-- we tried it in Spark previously, and it took 1,000 workers two weeks to crank through everything. And this is a pretty simple algorithm, just parsing strings and turning them into Parquet. In Dataflow, we did the whole data set in one to two days. One other benefit is that we don't have to manage any of the infrastructure. We just write our Beam pipeline and send it to Google. And there's one more thing I want to highlight. Sergei talked earlier about the Dataflow Shuffle. We found we should just turn it on. Either the data set is small enough that Shuffle is barely invoked-- you can see the number of bytes shuffled on the UI, and if this number is small, you don't really pay much of anything for it-- or that number is big, your data set is very large, and we've seen pipelines take much less time to run, which saves you on compute and storage costs. That's it. Thank you. [APPLAUSE]

SERGEI SOKOLENKO: So let's wrap this up. Hopefully, you were able to see how having a separate, distributed, in-memory processing mechanism for Shuffle improves the efficiency of separating compute from storage, and that this matters for both batch and streaming pipelines. We have something new available for you today, Dataflow Shuffle-- a new component, sorry, not a new service; we are not launching a new service. So for batch pipelines, we have a new option for you, Dataflow Shuffle, that makes your pipelines run faster. And you are now able to process and shuffle much larger data sets. For streaming pipelines, we have the Streaming Engine, and it improves the scalability of your streaming pipelines. Both of these features are now available in two GCP regions-- us-central1 and europe-west1. Thank you very much. And we will be happy to take questions either over the microphone or later on, after the session. [MUSIC PLAYING]
Info
Channel: Google Cloud Tech
Views: 3,472
Rating: 4.6363635 out of 5
Keywords: type: Conference Talk (Full production); pr_pr: Google Cloud Next; purpose: Educate
Id: 7Q5VJzJMG3A
Length: 39min 40sec (2380 seconds)
Published: Thu Jul 26 2018