Dask in 15 Minutes | Machine Learning & Data Science Open-source Spotlight #5

Video Statistics and Information

Video
Captions Word Cloud
Reddit Comments
Captions
- You're used to doing Machine Learning and Data Science in Python. But now you have this giant data set, it's about 100 GigaBytes, and it doesn't fit into your memory. You try to load it into a Pandas DataFrame, and you get an out of memory error. So what can you do? Well, a popular solution today would be to use Apache Spark. You can have a Python-like API with the PySpark support. So Apache Spark is an all inclusive ecosystem, which can give you parallel computing, which we can give you machine learning tools, SQL database access. So it's built to be very inclusive so you can do all your work there but it's based on Scala. So if things go wrong, if you have bugs, it will be very difficult to debug if you're used only to Python. And it's not the Python environment anymore. What about all your favorite tools from Python, Sci-kit Learn, Keras, TensorFlow, PyTorch everything. So you don't have access to these tools anymore. So what can you do? Well, you do have a solution. And it's called Dask. So this is the tool I'm about to show you today with a quick tutorial and I hope you will enjoy. Dask is quite a mature project now. So it has about 4 to 5 years under its belt. And in contrast to Apache Spark, Dask doesn't aim to be a complete full ecosystem. So its aim is more to be in very good synergy with other Python packages, while giving them more abilities with parallel computing and scalability. So this is a great project. And let's see how we can use Dask in order to parallelize and execute familiar code a little bit better. It's best to learn from examples, let's see how we can accelerate our code using Dask. Okay. So we have these two very simple functions, one, which takes an input and increments it by one, and one which takes two inputs and adds them together. We add this sleep operation in order to simulate computation, so it won't run too fast so we can see the difference. I define these functions and then I call them in this order. So we call Increment twice, and Addition last. So this should take about three seconds, because each one of these function calls has the sleep operation in it, which takes a second. And indeed, we got the computation time of three seconds. So how can we optimize it? Well, if we look closely at this order of calls, Python execute it in sequence, but really X and Y they don't depend on each other so they can be created separately, if we can send them to two different machines, they can create them independently of one another. And then Z can add these two together. So let's use Dask to use it. So Dask has the Delayed function, which is actually a Wrapper, which meant is meant to work as a decorator. I will specifically use delayed as a function call because I want to keep the original functions in order to still compare them together. So when we call delayed on the function, it grants us the ability to make a computational graph so we can see what kind of operations we can run in parallel. So I run the same code wrapped by delayed and let's see what happens. The code executed instantly so the functions weren't called because we have one second sleep there. So, what happened? What was created was not the computation it was the computational graph. So, we see this delayed object which was created. And this delayed object is actually the Schema, as you can see here, the computational graph, which describes the operations in this flow. And so, if any of you are familiar with TensorFlow graph, this is very similar, you first define the operations and now we can clearly (see the) increment nodes can run in parallel, and then we can add them together. So, now we can get the values by defining compute on the computational graph. And it took 2 seconds. So it took 2 seconds, not 1 second, because can't parallel compute everything, we first compute these two together, it takes a second and once they are created, we can add them together in another second, overall 2 seconds. Let's try something a bit more complicated. What else is slow in Python code? Well, for loops, of course, let's optimize them with Dask. So we create a list of data points and we want to increment each and every one of them and sum the results. So a standard Python loop would look like this: we iterate over the list, we increment each element save the list of results and then sum them. So this took 8 seconds because each one of these increment calls took 1 second. And we have the same problem, each one of these calls is independent of one another and it's a shame that we had to do this in sequence because of this for loop, so. Dask can help us here. We call the same delayed decorator on the increment function and we can also call delayed on the sum function in the end and if we can compute this, it should run faster. And as you can see, it took about half the time. So this is great, it's a very good improvement. And the computational graph looks like this. This is great. So we can see that we got rid of the sequential computation entirely and we calculate each one of the increments independent of one another and in parallel, and then we aggregate all these results together, sum them to get the final result. So we completely got rid of the for loop, and we made all this computation in parallel. Great, right? Okay, up until now, we've optimized very general code. Now I'm going to dive a little bit into optimizing machine learning pipelines. And I'm gonna start with how to handle data with Dask. So, Dask has its DataFrame, just like Pandas. And it's also very similar in API - they made it that way. And just like the Dask delayed function, which gives you a reference to the operation instead of computing it, DataFrame is also like delayed data, meaning that it points to where the data is, but it doesn't load it into memory if it isn't necessary. This allows you to handle data that is larger than your machine memory, because not all of it needs to be uploaded at once. So I'm gonna work with the NYC Flights Datasets and it's not bigger than my machine memory but it's enough to see an improvement in performance. And I want you to see how easy it is to handle several files with Dask. This is not so easy in Pandas to load all of these CSVs at once but because Dask already partitions the data into chunks so it doesn't matter to him (it) if it's one file or 10 files, it's already partitioned in chunks, so it's easy to load multiple files at once. So we can see that we can easily load all of these 10 files into our workspace. And you can already see the difference between the Pandas DataFrame and the Dask DataFrames that we don't see any values, so we just see the delayed structure and we have 10 partitions and only the first few rows are scanned. So Dask can understand what are the column names, what is the structure and what are the data types in the DataFrame and the data types are inferred from the first few samples. So this is important to understand. And I'll show you why it's important to understand. If we type the .head() method, we will actually get the values they will be computed. so we compute only the first five rows, and we can see it clearly. But now if we want to see the five last values, you can see we got an error. So what happened? In this column, they expected an integer but got a float. And in this column, they expected float and got an object. So what happened here? So you can see this column the first few entries are actually integers. So the integer type was inferred but actually, later in this data, they appear some values are floats and this got an error. And the TailNum, the first entries didn't have any values in them so they default assigned as float, but later on it's actually a string. So we also got an error here. So in this particular case, when the actual data types can't be inferred correctly from the first few examples, we have to specify them manually. So now we specify the data types and now we can access the tail and we actually see that the TailNum is in string, and it is that the elapsed time is in float. Now let's see how the data frame handles operations like finding the maximum value. So if we were working with Pandas, and we had this 10 files, so we would have to check the maximum value for each and every one of these files, or concatenate all the data together and that would maybe put us out of memory. But in Dask DataFrame, we can handle all these partitions efficiently. And the Syntax is identical to Pandas, we can call on the max method on our desired column. And in this case the delayed behavior is a default. So this is a delayed operation, and we have to type in compute in order to get the value. So we compute it. It should take about four and a half seconds. And it needs to scan about almost 2M entries. And we got the result. And we can also because this is a delayed operations, we can take a look at this graph. And this is really, really beautiful. We can see the 10 partitions of our data and the reading order of the entry, the delayed function, the slicing where we get to the value, the max operation and then max operation over all the values from the partitions into one last max check and the last value, so. A computational graph was made for all these 10 partitions in order to query all of them together to get this result. This is extremely efficient and this is the default behavior of Dask. Now, there's only one last thing we have to know to get the full picture, which is how we pass on this data to train a model, so. You can handle the data and just like in Pandas, most of the queries work, you can ask for the shape here is a little bit of nuance. You don't know exactly how many samples you have unless you compute it, but you know the structure. You can make these little checks for missing values, just like you would in Pandas, but it's designed as a graph, so you need to compute in order to get it. And now I want to take advantage of that I am in the Python environment to use a Keras model. So I define this little simple neural network, which I want to use in order to train and I'm gonna, I want to pass the Dask DataFrame to this model to fit. So what we can do is we can make a generator, like so. And this generator will only yield fractions of our Dask DataFrame. So we take only 1% and we sample the Dask DataFrame and this will be our batch. And only when we will actually yield the batch, then we will compute, we'll ask compute of the computational graph to actually fetch the values for us. And this will only yield fractions of the Dask generator. And the big difference here is that it doesn't matter how big the actual dataset is, I can have hundreds of files there, hundreds of TeraBytes there, but I will only need to fraction a small portion and only this fraction will be loaded into machine memory in order to train a Keras model. So now I can fit this generator, the generator will yield its batches of X and Y of its features and labels and I don't have to worry about running out of memory. And it works. There are many more great features to Dask like Task Scheduling, which is very important, but this video is already getting a bit long. So to sum things up, if you're not a big company, which works with SQL databases or already heavily invested in PySpark infrastructure, I really don't see any reason to choose PySpark over Dask. I think this is a great open-source package, which keeps improving and will keep on improving and many people are contributing to this great project. And I hope that I helped you get into this because it can be a little bit tricky to get into at first but this was the point of this video. If you've learned something new, please help me reach more people by liking, commenting or sharing this video. I do this kind of video every week. So if you like this kind of content, please connect or subscribe and follow me and I'll see you next week.
Info
Channel: Dan Bochman
Views: 21,204
Rating: undefined out of 5
Keywords: dask, pyspark, spark, big data, python, tutorial, machine learning, ai, data science, data preprocessing, parallel computing, computational graph, data analysis
Id: Alwgx_1qsj4
Channel Id: undefined
Length: 15min 18sec (918 seconds)
Published: Fri Feb 21 2020
Related Videos
Note
Please note that this website is currently a work in progress! Lots of interesting data and statistics to come.