- You're used to doing machine learning and data science in Python. But now you have this giant dataset, about 100 gigabytes,
and it doesn't fit into your memory. You try to load it into
a Pandas DataFrame, and you get an out of memory error. So what can you do? Well, a popular solution today
would be to use Apache Spark. You can have a Python-like
API through its PySpark support. Apache Spark is an all-inclusive ecosystem, which can give you parallel computing, machine learning tools, and SQL database access. It's built to be very inclusive so you can do all your work there, but it's based on Scala. So if things go wrong, if you have bugs, it will be very difficult to debug if you're only used to Python. And it's not the Python environment anymore. What about all your favorite Python tools: scikit-learn, Keras, TensorFlow, PyTorch, everything? You don't have access
to these tools anymore. So what can you do? Well, you do have a solution, and it's called Dask. This is the tool I'm about to show you today with a quick tutorial, and I hope you will enjoy it. Dask is quite a mature project now, with about four to five years under its belt. And in contrast to Apache Spark, Dask doesn't aim to be a complete, all-in-one ecosystem. Its aim is more to work in very good synergy with other Python packages, while giving them parallel computing and scalability. So this is a great project, and it's best to learn from examples, so let's see how we can accelerate our code using Dask.
Okay. So we have these two very simple functions: one takes an input and increments it by one, and one takes two inputs and adds them together. We add a sleep operation to simulate computation, so it won't run too fast and we can see the difference. I define these functions and then call them in this order: increment twice, and addition last. This should take about three seconds, because each of these function calls has a one-second sleep in it. And indeed, we get a computation time of three seconds.
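Here's a minimal sketch of that sequential setup (the function and variable names here are my own, not necessarily the ones shown on screen):

```python
import time

def inc(x):
    time.sleep(1)      # simulate a slow computation
    return x + 1

def add(x, y):
    time.sleep(1)
    return x + y

x = inc(1)
y = inc(2)
z = add(x, y)          # runs one call after another: roughly three seconds in total
```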
So how can we optimize it? Well, if we look closely at this order of calls, Python executes it in sequence, but really x and y don't depend on each other, so they can be created separately: if we sent them to two different machines, each could be computed independently of the other, and then z could add the two together. So let's use Dask to do exactly that.
Dask has the delayed function, which is actually a wrapper that is meant to work as a decorator. I will specifically use delayed as a function call, because I want to keep the original functions so I can still compare them. When we call delayed on a function, it gives us the ability to build a computational graph, so we can see what kinds of operations we can run in parallel.
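Wrapping the same calls with delayed looks roughly like this (again, a sketch using my own names):

```python
from dask import delayed

# Each wrapped call returns immediately with a Delayed object;
# nothing sleeps and nothing is computed yet.
x = delayed(inc)(1)
y = delayed(inc)(2)
z = delayed(add)(x, y)
```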
So I run the same code wrapped by delayed, and let's see what happens. The code executes instantly, so the functions weren't actually called, because each of them has a one-second sleep. So what happened? What was created was not the computation, it was the computational graph. We see this Delayed object that was created, and this Delayed object is actually the schema, as you can see here: the computational graph, which describes the operations in this flow. If any of you are familiar with TensorFlow graphs, this is very similar: you first define the operations, and now we can clearly see that the increment nodes can run in parallel, and then we can add them together.
So now we can get the values by calling compute on the computational graph. And it took two seconds. It took two seconds, not one, because we can't compute everything in parallel: we first compute the two increments together, which takes a second, and once they are created, we can add them together in another second, so two seconds overall.
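Assuming the same `z` as above, inspecting and running the graph looks like this (visualize needs the optional graphviz dependency):

```python
z.visualize()          # draws the graph: two inc nodes in parallel feeding one add node
result = z.compute()   # actually runs it: about two seconds instead of three
```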
Let's try something a bit more complicated. What else is slow in Python code? Well, for loops, of course, so let's optimize them with Dask. We create a list of data points, and we want to increment each and every one of them and sum the results. A standard Python loop would look like this: we iterate over the list, increment each element, save the results in a list, and then sum them.
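Something like this, assuming a list of eight data points (the exact values are my own):

```python
data = [1, 2, 3, 4, 5, 6, 7, 8]

results = []
for x in data:
    results.append(inc(x))   # each call sleeps for a second
total = sum(results)         # about eight seconds in total
```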
This took eight seconds, because each of these increment calls took one second. And we have the same problem: each call is independent of the others, and it's a shame that we had to run them in sequence because of the for loop. Dask can help us here. We call the same delayed wrapper on the increment function, and we can also call delayed on the sum at the end, and when we compute this, it should run faster.
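A sketch of that Dask version of the loop:

```python
from dask import delayed

results = []
for x in data:
    results.append(delayed(inc)(x))   # only builds the graph, nothing runs yet
total = delayed(sum)(results)         # the final aggregation is delayed too
total.compute()                       # the increments run in parallel instead of one by one
```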
And as you can see, it took about half the time, which is a very good improvement. And the computational graph looks like this: we got rid of the sequential computation entirely, we calculate each of the increments independently of one another and in parallel, and then we aggregate all the results together and sum them to get the final result. So we completely got rid of the for loop, and we made all of this computation parallel. Great, right? Okay, up until now we've
optimized very general code. Now I'm going to dive a little bit into optimizing machine
learning pipelines. And I'm gonna start with how
to handle data with Dask. So, Dask has its own DataFrame, just like Pandas, and its API is also very similar - they made it that way. And just like the Dask delayed function, which gives you a reference to the operation instead of computing it, the Dask DataFrame is like delayed data, meaning that it points to where the data is but doesn't load it into memory unless it's necessary. This allows you to handle
data that is larger than your machine memory,
because not all of it needs to be loaded at once. I'm going to work with the NYC Flights dataset. It's not bigger than my machine's memory, but it's enough to see a difference in performance, and I want you to see how easy it is to handle several files with Dask. It's not so easy in Pandas to load all of these CSVs at once, but because Dask partitions the data into chunks anyway, it doesn't matter to it whether it's one file or ten files, so it's easy to load multiple files at once. So we can easily load all ten of these files into our workspace.
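The loading step is a one-liner with a glob pattern; the path below is my assumption, so adjust it to wherever the files live:

```python
import dask.dataframe as dd

# All ten CSVs become partitions of a single lazy DataFrame.
df = dd.read_csv("data/nycflights/*.csv")
df   # shows column names, dtypes and npartitions=10, but no values yet
```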
And you can already see the difference between a Pandas DataFrame and a Dask DataFrame: we don't see any values, we just see the delayed structure, we have ten partitions, and only the first few rows are scanned. So Dask can understand the column names, the structure, and the data types of the DataFrame, but the data types are inferred from the first few samples. This is important to understand, and I'll show you why it's
important to understand. If we call the .head() method, we will actually get the values; they will be computed. So we compute only the first five rows, and we can see them clearly. But now, if we want to see the last five values, you can see we get an error. So what happened? In this column, Dask expected an integer but got a float, and in this column it expected a float and got an object. What happened here? You can see that in this column the first few entries are actually integers, so the integer type was inferred, but later in the data some values turn out to be floats, and this caused the error.
And for TailNum, the first entries didn't have any values in them, so it was assigned float by default, but later on it's actually a string, so we got an error here too. So in this particular case, when the actual data types can't be inferred correctly from the first few samples, we have to specify them manually. Now we specify the data types, and we can access the tail and see that TailNum is indeed a string and the elapsed time is a float.
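In code, that fix amounts to passing an explicit dtype mapping to read_csv; the exact column names and types here are my assumptions based on the errors described above:

```python
df = dd.read_csv(
    "data/nycflights/*.csv",
    dtype={"TailNum": str, "CRSElapsedTime": float},
)
df.tail()   # works now, because the declared dtypes match the later rows
```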
Now let's see how the DataFrame handles operations like finding the maximum value. If we were working with Pandas and had these ten files, we would have to check the maximum value for each and every one of the files, or concatenate all the data together, and that would maybe put us out of memory. But the Dask DataFrame can handle all these partitions efficiently, and the syntax is identical to Pandas: we call the max method on our desired column. In this case the delayed behavior is the default, so this is a delayed operation, and we have to call compute in order to get the value. So we compute it. It should take about four and a half seconds, and it needs to scan almost two million entries. And we got the result.
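That query is just one line; the column name here is my assumption, and any numeric column works the same way:

```python
df["DepDelay"].max().compute()   # scans all ten partitions and returns a single number

# Because the un-computed result is lazy, we can also inspect its task graph:
df["DepDelay"].max().visualize()
```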
And because this is a delayed operation, we can take a look at its graph, and it's really, really beautiful. We can see the ten partitions of our data, the read of each entry, the delayed function, the slicing where we get to the values, the max operation per partition, and then a max over all the partition values into one final check and the last value. So a computational graph was made across all ten partitions in order to query them together and get this result. This is extremely efficient, and it's the default behavior of Dask. Now, there's only one
last thing we have to know to get the full picture, which
is how we pass this data on to train a model. You can handle the data just like in Pandas: most of the queries work. You can ask for the shape, though there is a little bit of nuance here: you don't know exactly how many samples you have unless you compute it, but you do know the structure. You can run these little checks for missing values, just like you would in Pandas, but it's built as a graph, so you need to call compute in order to get the result.
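Roughly like this; the row count in .shape stays lazy until you compute it:

```python
df.shape                      # (lazy row count, number of columns)
len(df)                       # forces a scan of the partitions to count the rows

# Missing-value check: this also builds a graph, so compute() is needed for the numbers.
df.isnull().sum().compute()
```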
And now I want to take advantage of the fact that I'm in the Python environment and use a Keras model. So I define this little, simple neural network, which I want to train, and I want to pass the Dask DataFrame to this model to fit on.
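The exact architecture isn't important here; something along these lines would do (the layer sizes, the loss, and the number of input features are my assumptions, not what's shown on screen):

```python
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense

n_features = 4   # hypothetical: however many feature columns you feed it

model = Sequential([
    Dense(32, activation="relu", input_shape=(n_features,)),
    Dense(32, activation="relu"),
    Dense(1),
])
model.compile(optimizer="adam", loss="mse")
```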
So what we can do is make a generator, like so. This generator will only yield fractions of our Dask DataFrame: we sample just 1% of the Dask DataFrame, and this will be our batch. Only when we actually yield the batch do we call compute on the computational graph to fetch the values for us. And the big difference here is that it doesn't matter how big the actual dataset is: I can have hundreds of files there, hundreds of terabytes, but I only need a small fraction at a time, and only that fraction will be loaded into machine memory in order to train a Keras model.
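A sketch of such a generator, assuming hypothetical feature and label column names:

```python
def batch_generator(ddf, feature_cols, label_col, frac=0.01):
    """Yield small in-memory batches sampled from a Dask DataFrame."""
    while True:
        # Only this ~1% sample is materialized; the rest stays on disk.
        batch = ddf.sample(frac=frac).compute()
        yield batch[feature_cols].values, batch[label_col].values
```

Recent TensorFlow versions let you pass a generator like this straight to model.fit (older Keras versions used fit_generator), together with a steps_per_epoch of your choosing.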
So now I can fit the model on this generator; it yields batches of X and y, features and labels, and I don't have to worry about running out of memory. And it works. There are many more great features in Dask, like task scheduling, which is very important, but this video is already getting a bit long. So to sum things up: if
you're not a big company that works with SQL databases or is already heavily invested in PySpark infrastructure, I really don't see any reason to choose PySpark over Dask. I think this is a great open-source package, which keeps improving and will keep on improving, and many people are contributing to this great project. And I hope I've helped you get into it, because it can be a little bit tricky to get into at first, and that was the point of this video. If you've learned something new, please help me reach more people by liking, commenting
or sharing this video. I do this kind of video every week. So if you like this kind of content, please connect or subscribe and follow me and I'll see you next week.