- So now that we have that kind of high-level conceptual
introduction to the Ray framework, let's dive into some code so we actually understand
what Ray can do for us and how it can do that. And if we go to the GitHub repo here, we're at psychothan/scaling-data-science. Now if you're familiar with Git, you can actually clone the repo. And if you click on Code here,
you can see the clone URL. So when I do update the resources, you can just simply pull down the changes, but if you don't have Git set up, if you don't have a GitHub account, you can just download
the files as a plain zip. So if you click Download ZIP, it just downloads all the
files locally on your machine. The disadvantage to this being
that if I do update anything, if I do fix anything with
the code or the materials, you'll have to re-download the zip. Whereas with the git method, you can just git pull the
updates or the changes. So if you wanna just actually
run the example and the code, I package everything up
into a Docker container so that it runs now and
it runs in the future, whenever you're watching this video, across different operating systems, whether it's Mac, Windows, or Linux, and hopefully to make it a
little bit more reproducible. If you're not familiar with Docker, it's similar to other
virtualization software, like Virtual Machines or Vagrant. And the really nice thing about Docker is that it's an open source project and has wide support
across operating systems and environments. So to get started, we're
just at the Docker site here, Get Started, and the easiest way is to
just download Docker Desktop. I'm on a Mac, so it tells me to download it for Mac, but if you're on Windows or Linux, it'll offer the download for your
respective operating system. But if you don't actually wanna deal with all of the Docker
business, Binder's a project that lets you launch a Jupyter notebook right in the browser directly from something
like a GitHub repo. And it does this by spinning
up a Docker container in the background and
connecting you to it. And the other really nice thing is that the Binder service is free to use. Sometimes if a lot of people
are using the service, it might take a little
while to actually spin up and a little while, in this
case, might be a minute or two, but once it's running, everything runs pretty
effortlessly in the browser. So this is what you should see once the notebook gets started. And for this example, we're gonna be using the
intro-to-ray.ipynb notebook in the notebooks folder. Here I am in JupyterLab on my laptop, and this might look a little bit different than the interface that Binder gives you, but all of the code in the notebook should function exactly the same. So to start, we're gonna
import a bunch of libraries. All of these are just in the
standard library in Python and the external libraries
we're gonna be using are ray, pandas, and requests.
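So the import cell looks roughly like this; exactly which standard-library modules the notebook pulls in is an assumption on my part:

```python
# Standard-library modules (which ones exactly is an assumption)
import io
import time

# External dependencies used in this tutorial
import ray
import pandas as pd
import requests
```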
So to start, to give an example of something Ray can do (it's not necessarily something you would use Ray for, but bear with me), let's say we're trying
to index the internet or do some computation across all the websites on the internet. So here I'm actually just
getting the top 500 domains, according to moz.com. And this is basically
just a little example. We're obviously not gonna
be able to index all of the internet in this tutorial, but for the sake of argument, let's say this 500
represents the 500 sites we need to run this computation on. And in pandas we can
actually do something fun and interesting here. So since this file that
we're downloading is a CSV, we can get the text of the response, turn it into basically
a file with string IO and then feed it into
pandas and parse this CSV. So here we can see we have
the rank, the root domain, and how many sites link to that domain.
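Roughly, that request-to-DataFrame pattern looks like this; the exact download URL is an assumption here, so check the notebook for the real one:

```python
import io

import pandas as pd
import requests

# Hypothetical URL for the Moz top-500 CSV; the notebook has the real link.
TOP_SITES_URL = "https://moz.com/top-500/download/?table=top500Domains"

response = requests.get(TOP_SITES_URL)
# response.text is one big CSV string; StringIO wraps it in a file-like
# object so pandas can parse it as if it were a file on disk.
top_sites = pd.read_csv(io.StringIO(response.text))
top_sites.head()
```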
So let's get into Ray and all of its goodness. To start the Ray processes, or the Ray framework essentially, you're going to call ray.init only once, typically at the start of your driver program.
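As a minimal sketch, that first step is just:

```python
import ray

# Start the Ray processes; call this exactly once per driver program.
# With no arguments, Ray uses the local machine's cores as its workers.
ray.init()
```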
This Jupyter notebook is my Python driver program: the program that's gonna shove a lot of commands to the Ray cluster itself. Some interesting things you can see in the output are the IP addresses for the nodes and for the raylet (you'll see this in just a bit). It also has a really nice web UI that gives you some
information and statistics about the machines in your
cluster or the server, as well as a bunch of other information, such as where it's storing logs.
So the first abstraction that Ray gives us is the ability to run functions remotely. Let's say we want to index these pages, or do something with all of these webpages. We'll define an index function
and it takes as input a URL. Now for this simple example,
let's just return the URL, split into its parts.
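Something like the following; the argument to split is my guess at what the notebook uses:

```python
# A stand-in for real indexing work: just split the URL on dots.
def index(url):
    return url.split(".")
```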
trying to index the Internet, you might do something where you parse all the text of the webpage. You store it in a database,
and build some sort of index. But in this case, for the sake of example, we're just interested in the fun things Ray can give us, not in actually indexing
these things for real. Let's just say this url.split call is a proxy for all the computation we would wanna do. So if we were doing this
in just standard Python, just locally on a single machine, we might do something like a for loop or iterate through all of our URLs. So if we just want to
iterate through this, we can get the root domain column, and here if we just
actually print them out, we can see that it correctly gets them. But instead of just printing them, let's say we wanna index them. Note that index only returns a value; it doesn't print anything. So for the sake of demonstration, now we can basically,
now we can basically, if we do wanna do some further processing, we can count up how many
are .coms versus .orgs.
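A plain-Python version of that might look like the sketch below; the "Root Domain" column name is a guess at what the Moz CSV uses:

```python
from collections import Counter

# Run the local index function over every root domain and tally the TLDs.
tld_counts = Counter()
for url in top_sites["Root Domain"]:
    parts = index(url)
    tld_counts[parts[-1]] += 1  # the last piece is the TLD, e.g. "com"

print(tld_counts.most_common(5))
```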
But again, this is just a contrived example to showcase some functions in Ray. So now let's say we had millions of URLs that we need to process. We can't just run it on our single laptop or our single machine. The really magical thing
is that you can run this in a distributed fashion with little to no changes in your code. So the main thing that
you have to do in Ray is add these decorators to the functions you wanna run remotely. And it's as simple as
just saying @ray.remote, decorating the function you wanna run in a distributed, remote fashion. And the only change when we execute it is that instead of just calling
the function index, you call the remote method on index. So in this case, index is a
special Ray remote object. And if we wanna run index
remotely on our Ray cluster, we simply call it with .remote and pass the same argument.
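Sketched out, the whole change is just:

```python
# The decorator is the only code change needed to distribute index.
@ray.remote
def index(url):
    return url.split(".")

# .remote() schedules the call on the cluster and returns immediately,
# handing back an ObjectRef instead of the actual result.
ref = index.remote("www.apple.com")
```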
So if we run this, we see something interesting: instead of actually printing out the results of running this, we get a bunch of these ObjectRefs. And this is kind of one
of the core things in Ray is that it runs all of its
code in asynchronous fashion. So if you're familiar
with things like node.js and JavaScript, all of
that kind of programming is done asynchronously where you run some computation and you eventually get some result back once it's done running. So in this case, since this index function could take a while to run, Ray says, I've scheduled this function
to run on my cluster, and I'm giving you this reference back to get the result when it's ready. So instead of actually just
running this in a for loop and essentially forgetting
all these references, let's instead run this
in a list comprehension so that we can collect all
of these object references in a list at the end. So this is the same for loop, just written as a list comprehension.
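Roughly like this, again assuming the "Root Domain" column name:

```python
# Schedule index for every root domain. This returns immediately with a
# list of ObjectRefs (futures), one per URL, not the actual results.
futures = [index.remote(url) for url in top_sites["Root Domain"]]
len(futures)  # 500
```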
We can see we get back a list of object references, which Ray often refers to as futures. Each one is essentially a promise that the work will run at some point
in the future on the cluster, and when you do want the result, you basically resolve the future. So here we have all of our future objects; there should be 500 of them, one for each of our URLs. To actually resolve these, we can use the Ray function called get. So when you call
ray.get, we can pass in one future or a whole list of them, and it resolves the
result of that function. So since our function is just
returning this url.split, we actually just get a list
of all the resolved values when you call ray.get.
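Which, in code, is just:

```python
# ray.get blocks until the scheduled work finishes and resolves the
# futures into their actual return values.
results = ray.get(futures)
results[0]  # e.g. ['www', 'apple', 'com'], depending on the data
```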
So if we were actually doing something where we needed to really distribute computation, and not just do this trivial
kind of string parsing, this would be very powerful. We can debug our code
locally, testing that a single function works on a single input. And then, when we're ready to scale it to our entire data set or entire algorithm, we simply annotate it with @ray.remote. And that same function is
just magically gonna run distributed across our Ray cluster. So if you remember that
diagram I showed you, this notebook is running on the head node; this is the Python driver program. Whenever we attach Ray
remote to a function, and whenever we call that
function with .remote, it essentially sends this
function to the Ray workers. The workers execute that code and then ship the return values back, so you get the results locally in your driver program. The one thing to keep in mind, though, is that all of these functions execute asynchronously, in whatever order the cluster runs them. So you don't have any guarantees when you're using ray.remote that the first URL is gonna
run before the second, or the second before the third. To demonstrate this, we can add some prints inside the function: we print which URL we're starting on ("sleeping on" that URL), sleep for half a second, and then print that we're returning that URL's value.
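A sketch of that instrumented version, with the print messages paraphrased:

```python
import time

@ray.remote
def index(url):
    print(f"sleeping on {url}")   # logged when a worker starts the task
    time.sleep(0.5)               # stand-in for real indexing work
    print(f"returning {url}")     # logged just before the result ships back
    return url.split(".")
```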
So if we run our remote functions again, we can see it goes through the first four URLs, since in this case I think
I have four worker nodes in my cluster, so it says we have four nodes. Let's run one of these
functions on each of them. And they each return in a different order. So once one of them returns,
once apple.com returns, it then schedules a new one,
in this case, Microsoft. Once support.google returns,
it then schedules LinkedIn and so forth. But the thing to keep in mind is that these may not
necessarily be returned in the same order that
they were scheduled. So in this case, they actually
did return in the same order. So apple, support.google, google.com. But if we rerun this again, we can see that the
order they get scheduled might actually be different, and this isn't necessarily the
order in which .remote is called; it's the order in which the index
function begins to execute. So depending on the communication
in the cluster itself, even though .remote was called on these in one order, the actual functions might
begin executing on the cluster at a different time, in a different order. And here we can see that
the first return result is actually from support.google, which when we were scheduling
them, was the last to start. So this asynchronous nature is something that you need to account for
when dealing with Ray. This is one of the trade-offs: Ray gives up strict ordering of when code runs in exchange for the benefit of running things remotely on a cluster. And the last thing
before I end this video that I wanted to introduce is the Ray dashboard, which is this
really nice user experience. So if you go to that URL
that ray.init spits back, so in this case it's on
localhost, port 8265, we can see here that we
have all the machines and kind of what's
executing on each of them. So here I rerun that same code that basically does the
index function remotely. We can see here that we have
four workers on four cores. Since again, all of this is
running locally on my laptop, I didn't connect to a
server in the cloud yet. And the nice thing about
Ray is it doesn't matter if the code's running in
separate processes on your laptop or on separate machines in the
cloud, the API is identical, and here we can see how much RAM we have, how much disk we have. We can look at all the logs
and look at the errors. So this is a really good way to stay updated on how
the cluster's running. And if there are any errors or things going wrong
with individual machines, or if there's anything going wrong with the cluster as a whole. And here it's running the
index function remotely. This is how much CPU it's using. This is how much memory it's using. And in this case, since the
logs are gonna print out as we're actually running these, the number of lines in the logs should give you a sense of how
much progress has been made, but you can do better, smarter things to kind of track the progress of all these functions
across all of your machines. (calm music)