Ray: A Framework for Scaling and Distributing Python & ML Applications

Captions
Thanks for that kind introduction and the kind invitation to be the first speaker at your first meetup of the new year. My name is Jules Damji; I'm a lead developer advocate at Anyscale. Before Anyscale I worked very closely with both Karen and Dorothy at Databricks for about six years, so this is like coming home for me, and I'm delighted and honored to be back in the Data + AI community sharing some of what's happening in the industry.

Today's talk is Ray: a framework for scaling and distributing Python and machine learning applications. Since this is the first talk on Ray here and some of you may not have heard of it, we'll start at a very high level, the 20,000-foot view: the motivation behind Ray, what Ray is all about, why Ray, and the ecosystem that lets people write distributed applications at massive scale. Then we'll drop down to about 10,000 feet and look at the Ray architecture and components: when you peel back the curtain, what do the internals look like and how do things work behind the scenes? Then we'll come down to sea level and talk about the Ray core libraries, the native libraries, the APIs, and the design patterns that let people write scalable applications. After that we'll look at the related libraries built on top of Ray; in particular I'll give a quick overview of Ray Tune, a very common library that data scientists and ML engineers use for hyperparameter tuning, which is a very common workload. If we have time we'll touch on XGBoost-Ray, but mostly through a demo rather than slides. The demo will walk through the common workloads you actually scale in a development cycle: training, inference, and hyperparameter tuning. Then I'll try to answer your questions.

So, why Ray? Before that, it's fitting to talk about some of the industry trends. The original creators of Ray were at RISELab, the same crucible of innovation as its predecessor AMPLab, where Spark and Mesos were developed and Tachyon was another of the distributed systems built. When RISELab came into being, the researchers and PhD students there, along with Ion Stoica, who directed the lab and co-founded Databricks, thought about what was happening in the industry, and there were three fundamental factors. The first is that machine learning is pervasive in almost every domain today; I would contend we have entered the zeitgeist of ML. More and more applications are being written and deployed across every vertical, from healthcare to the financial industry to automotive and beyond.
ML is all over the place. These sophisticated applications deployed across industries require a fair amount of compute, and the only way to deliver it is to distribute and scale. That was the first factor.

The second is that, because of this, distributed computing is going to be a necessity and a norm, not an exception. People are going to write distributed applications as if they were writing Python functions and classes on a laptop or in a Jupyter notebook, and all of a sudden the same code runs seamlessly on a cluster. That has to become the norm for us to meet these demands. To give you an idea of why this matters: if you look at how deep learning training has evolved over the last ten years, the petaflops of compute demanded roughly doubles every 18 months, while CPU performance is simply not keeping up. There is an enormous gap between demand and supply. You might say, what about accelerators, GPUs and TPUs? They do make a difference, but it's incremental; the gap between what today's sophisticated deep learning and ML applications require and what we can supply remains. And it's not only the FAANGs of the world; ordinary startups are building sophisticated applications and need that compute power. So we contend that the only way to address this demand and minimize the gap is to go fully distributed. Distributed computing is a necessity, it's going to be the norm, and we had better be ready for it, with frameworks, programming languages, and compute substrates that make it easy for data scientists and machine learning engineers to build, rather than painful.

The third trend: those of you who are on call, or who have written and operated distributed applications, know it's not easy. The programming is hard because you have to understand a lot of concepts, and it is notoriously difficult. If you look at the existing solutions, you can place them along two axes: ease of development and generality. In the middle are systems stitched together by systems experts and SREs, people who really understand the nuances of distributed computing: MPI, Kafka, Spark, Flink, Horovod, TensorFlow. They stitch things together and it works, but these are trade-offs; they can be hard to develop, deploy, and manage.
The reason is that these are all bespoke solutions with different language semantics and different communication protocols, so making them work harmoniously can be cumbersome. But they do work and they exist; that's the middle ground most of us occupy today. When these systems work, they work effortlessly; when they break, they break notoriously.

On the far right is the not-invented-here, do-it-yourself approach: start from a clean slate, put a Docker container on Kubernetes, organize your applications, and handle the communication yourself with something like gRPC. Nothing wrong with that; it's very general and it scales enormously, but it's expensive to develop because you need real systems experts who can glue everything together. On the far left is the easiest path for people like you and me: serverless. You write lambda functions, or you fire off a BigQuery or Databricks SQL query and let the lakehouse take care of it; it scales automatically and figures out how much compute you need. The problem is that it's very cloud specific, so if something doesn't exist on that cloud you're beholden to its limitations; it's largely stateless, so unless you persist state to a third-party service you have to write more code; and you may not get the GPUs, TPUs, and accelerators you need. So the existing solutions all live somewhere inside this trade-off.

How does Ray fit in to address these demands? The ethos behind Ray's vision is to make distributed computing simple and accessible to every developer. In other words, you shouldn't have to be a systems expert; you should be able to write Python functions and classes as if you were writing ordinary Python and, using the design patterns I'll talk about momentarily, turn that code into a distributed application. So how does Ray do that? Starting at a very high level, think of Ray as a layered cake of functionality and capability. At the bottom is whatever Ray runs on: AWS or any other cloud provider, your laptop (which is brilliant, because you can develop things locally), on-prem, or any hardware. Going up the stack you have the essential universal framework, Ray Core, what we call general-purpose distributed computing. The philosophical tenet, the axiom behind it, is that Ray is general-purpose.
It doesn't target a specific workload and it doesn't impose an abstraction on which the APIs are built. Instead it gives you low-level primitives, tasks and actors, which I'll talk about, and you use them to build whatever workload you want. That's the essence of Ray: it takes care of elasticity, fault tolerance, and the compute substrate, and gives you the primitives to build on top of. Think of it like the Unix operating system: the folks at Bell Labs decades ago gave you basic APIs, and on top of them you write device drivers, window managers, fork and exec, and build everything else. Ray has a very similar notion: you're given certain APIs and you build things on top of them.

One layer up are the native libraries, which use these primitives to handle specific workloads. If you want to do distributed training, you use Ray Train; for reinforcement learning, RLlib; to serve models at scale, Ray Serve; to tune hyperparameters, Ray Tune. Along the side are the third-party integrations, which integrate at different levels (we call them levels one, two, and three), all of them using the same primitives to plug into Ray and scale. And finally, if you're a distributed systems programmer who wants to build your own distributed framework that has nothing to do with machine learning, you just want to write a distributed Python application, you can use the core APIs directly. That's the layered view of Ray from 20,000 feet.

The ecosystem is where the power of Ray lies, because it handles the specific deep learning and ML workloads you need to build models at massive scale; the batteries are included. For data processing you can use Ray Datasets; you can use Modin for distributed pandas; a lot of people run Dask on Ray; and for large tensor computation you can use Mars on Ray. For training, PyTorch and TensorFlow plug in alongside the Ray libraries. Serving is another one: once your models are trained, Ray Serve deploys them and lets you build the pipelines and deployment patterns you see for ML models in production. And finally there's the very common workload of hyperparameter tuning: Ray Tune integrates with HyperOpt, Optuna, and whatever search algorithm or scheduler you want to use.
You use Ray Tune to distribute your trials. And if you're doing games, simulations, or recommender systems, you can use RLlib to build and train those models. So it's a fairly rich ecosystem, and more and more people are building on these primitives and adding their own integrations.

At a very high level, who's using Ray? I have to include the obligatory slide: some notable companies use Ray at massive scale. Amazon uses Dask on Ray for massive data-parallel processing to ingest data into their models; other companies use it for hyperparameter tuning; McKinsey and Alibaba are among the users, and Alibaba runs Ray on over 200,000 cores.

Now I'm going to switch gears and raise the curtain a little to look behind the scenes: what does Ray look like when you pop the hood? The architecture is familiar if you come from Spark, Dask, Hadoop, or HPC; the idea is not dissimilar. You have worker nodes that run processes. The only difference between the head node and a worker node is an additional process called the driver, which can run anywhere; it just happens to be running on the head node here. Then there's the Global Control Store (GCS), a global metadata directory that keeps track of the resources available and who is doing what; that's the brain of it. Then you have the raylets, which provide distributed scheduling: raylets communicate peer-to-peer about what each node is doing and what resources it has, and they're responsible for creating the worker processes; each worker process is tied to a core. Finally there's a distributed object store that runs on shared memory; objects are shared across the cluster and passed over when a task needs them. This is quite unique to Ray, and because of the distributed scheduling and the object store, Ray scales very easily: enormous numbers of tasks can run asynchronously against these stores. If a worker process needs something on a particular node and another process there has already created that data, it reads it from shared memory with a zero-copy read, which is quite unique.

Let me go one level below and share what happens on each node. Every node has worker processes attached to cores; if you have 12 cores, you have 12 worker processes. The raylet is the one that communicates: whenever you want to schedule a job, the raylet asks, do I have the resources this task needs, say four GPUs and this much memory? If yes, it accepts the task and runs it.
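To make this concrete, here is a minimal sketch (not shown in the talk) of peeking at the resource bookkeeping the cluster maintains; ray.init() starts or connects to Ray, and ray.cluster_resources() / ray.available_resources() report the totals and what is currently free.

```python
import ray

# Start Ray locally (or connect to an existing cluster, e.g. ray.init(address="auto")).
ray.init()

# Total resources the cluster reports: CPUs, GPUs, memory, object store memory, ...
print(ray.cluster_resources())

# Resources currently free, i.e. not leased out to running tasks or actors.
print(ray.available_resources())
```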
The raylet reports the resources it has to the global control store, and the global metastore broadcasts, every 10 milliseconds or so, a picture of what the cluster looks like. There's one raylet per node, responsible for coordinating all of this; that's the distributed nature of it. The global store is a key-value store; currently we use Redis, but it's pluggable, and a process running on top of it manages and communicates with it. So that, roughly, is what happens behind the scenes.

Now let me go all the way down to a lower level: the distributed design patterns. Design patterns are not new in our consciousness; most software developers have read, or at least heard of, the Gang of Four book, the design patterns introduced when object-oriented programming came into being that let us reuse objects, with concepts like singletons, decorators, iterators, coroutines, and so on. Ray builds on that idea to introduce its own patterns, and there are three associated with Ray (there are many more, but these three introduce the core concepts). First, your functions can be converted into parallel Ray tasks, units of execution that can run anywhere in the cluster, on any worker or the driver; any worker can act as a driver that submits tasks, so there's no central place where one driver dictates everything. Second, Ray objects are futures: an object reference to something a task will return when it finishes. If a function creates and returns an object, you don't get the object right away; you get a handle, a future (like Scala or Python futures), a reference you can ask for later, and the values are immutable and stored in the object store, the shared memory. Third, actors. If you're not familiar with actors, you may have heard of them in Akka or Erlang; the difference is that Ray actors are stateful and use the object store to maintain their state. Actors are the fundamental design pattern you see used in all the native libraries, and we'll see in a bit what I mean by that: stateful services that maintain the state of, say, the machine learning algorithm you're running. Especially when you're tuning or training and running trials, the actors run the trials and keep their state, so if something goes wrong they can resume from where they left off. There are links here to patterns of parallel programming and Ray's design patterns; these are just three, but there are six or seven more advanced ones for people who want to write distributed applications at massive scale.
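The slides here don't show actor code, but a minimal sketch of the pattern being described might look like the following; the TrialTracker class and its methods are hypothetical, invented only to show a stateful actor whose state survives across remote calls.

```python
import ray

ray.init()

@ray.remote
class TrialTracker:
    """A tiny stateful actor: its instance state persists across remote method calls."""

    def __init__(self):
        self.completed = 0

    def record_trial(self, result):
        self.completed += 1
        return self.completed

    def num_completed(self):
        return self.completed

tracker = TrialTracker.remote()               # the actor process is created somewhere in the cluster
ray.get(tracker.record_trial.remote(0.91))    # remote method calls return futures
ray.get(tracker.record_trial.remote(0.87))
print(ray.get(tracker.num_completed.remote()))  # -> 2
```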
There's also a design pattern for how libraries are integrated. I mentioned that third-party libraries integrate at different levels, one, two, and three; if you want to integrate your library with Ray, you follow those patterns and can plug in seamlessly at any of the three levels, each with its own degree of difficulty or ease.

So let's look at the design patterns. A function, as I said, can be converted into a task that can run on any node. A class can be converted into a stateful actor, whose methods then run on a node. And with the distributed object store, an object becomes a reference that can reside on any node. There's also a notion of primary ownership: if a function scheduled on node A creates and returns a value, node A owns the metadata for that object, the primary copy. If another task is scheduled somewhere else and needs that value, it gets copied over via gRPC. Those are the three design patterns.

So let's look at some code. I have a function that I want to convert into a task; all you do is annotate it with @ray.remote, and it becomes a distributed task. I have two functions that each read a NumPy array, and a third that adds them together. To invoke them, you call the function with .remote() and pass the arguments. At that point the task gets scheduled somewhere on a node and returns right away, asynchronously, which is the power of this model, because it lets you launch a lot of tasks. It returns an object reference. You call the function again for the second file, pass both references to the add function with .remote(), and then do a ray.get(). All of these are non-blocking calls; the only call that blocks is the last one. Here the first task gets file one, a path, and reads it, whether from cloud storage or from the local file system if you've already populated it; and remember, the graph is created dynamically. The second read gets scheduled on, say, node 2.
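A sketch of the task example being described (the file paths and the array-loading details are placeholders): annotate the functions with @ray.remote, invoke them with .remote(), and only the final ray.get() blocks.

```python
import numpy as np
import ray

ray.init()

@ray.remote
def read_array(path: str) -> np.ndarray:
    # Placeholder for reading a NumPy array from local or cloud storage.
    return np.load(path)

@ray.remote
def add(a: np.ndarray, b: np.ndarray) -> np.ndarray:
    return a + b

# Non-blocking: each call returns an object reference (a future) immediately.
ref1 = read_array.remote("file1.npy")
ref2 = read_array.remote("file2.npy")
sum_ref = add.remote(ref1, ref2)   # references can be passed directly to other tasks

# The only blocking call: wait for the result and fetch it.
result = ray.get(sum_ref)
```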
That task reads its file. Now you pass both references to add.remote(), which extends the graph and returns a reference to the result right away, and when you finally do a ray.get(), if the tasks have already finished in the background the call barely blocks, but if they're long-running it waits until they finish and then returns. That's essentially what happens behind the scenes with the task API. The point is that we want to run things dynamically and asynchronously; we don't wait, as some other distributed systems do, until an action is invoked or a certain call is executed. Things here execute eagerly rather than lazily, and that matters because in certain machine learning algorithms, deciding how to create the next task dynamically requires knowing the result of the previous task.

The next notion I want to talk about is the distributed object store, which as I said is quite unique to Ray. There's shared memory that spans all the workers; think of it as the Plasma store from Apache Arrow, shared across all the nodes. If worker processes on a node need data that another process there created, they do a zero-copy read: for NumPy arrays or tensors saved in the store, a process just reads a pointer, so there's no IPC and performance is very high. If your data doesn't fit in the shared-memory store, it can be spilled to disk, and all of that tracking is done in the metadata: this object was here, but it has been spilled to this partition and this sector, so if you need it, load it back in; and memory that's no longer referenced gets garbage-collected. This is all done behind the scenes and you don't have to worry about it, but I wanted to share it because it's an important aspect.

Here's an example of how data locality works with distributed immutable objects. Node 1 runs a task f that returns a value x; another function g takes a value and returns y. When I call f.remote(), say it gets scheduled on node 2. I then pass g.remote() the object reference, which again returns to me right away. Because of data locality, g gets scheduled on node 2 as well, reads the data straight out of shared memory, and returns the value of g; then I do a get on that and I have the value. So you can see how the object store comes into play, and how the asynchronicity of tasks executing in parallel lets you run things at massive scale.
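A sketch of the data-locality example just described: f produces a value that lives in the object store of whichever node ran it, and g, which takes that reference, tends to be scheduled near the data and reads it through shared memory. The array size is arbitrary.

```python
import numpy as np
import ray

ray.init()

@ray.remote
def f() -> np.ndarray:
    # x is stored in the object store of the node that executes f.
    return np.ones(4_000_000)

@ray.remote
def g(x: np.ndarray) -> float:
    # Receives the materialized array; on the same node this is a zero-copy read.
    return float(x.sum())

x_ref = f.remote()        # returns immediately with a reference to x
y_ref = g.remote(x_ref)   # Ray prefers to schedule g where x already lives
print(ray.get(y_ref))
```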
To give you an idea of how scheduling works: say I call a ray.remote task that needs to be scheduled somewhere. I'm showing the driver here, but any worker can do this; it just happens that my code is running on the driver. The driver asks its local raylet: I need to schedule this task, and it needs, say, one CPU (or five); do you have the resources for that? The raylet grants it a lease, says go ahead and execute, and the driver serializes the code so a worker can run it. If the code inside that task calls another function, the driver is no longer involved; the worker itself submits the new task. Say that task needs a GPU this node doesn't have: the worker asks its raylet, which replies that it doesn't have one but node 3 does, and the worker gets a lease there and executes in the same way. That's how it scales. There's more detail in how the leases are scheduled so that we don't do constant gRPC round trips, but I won't belabor it: once a lease is granted, thousands of tasks of a similar type that need similar resources can be scheduled on that machine. If it has 16 cores they run on those; 32 cores, they run on those; if they run out of room they get scheduled on the next node. All of these scheduling policies happen dynamically, and you don't have to worry about them.
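Resource requests like the ones the raylet negotiates are declared on the decorator. A hedged sketch (the numbers and function bodies are arbitrary):

```python
import ray

ray.init()

@ray.remote(num_cpus=1)
def preprocess(batch):
    # Runs on any node whose raylet can grant a one-CPU lease.
    return [x * 2 for x in batch]

@ray.remote(num_gpus=1)
def train_step(batch):
    # Only scheduled on a node whose raylet can grant a GPU lease.
    return sum(batch)

# Many similar tasks can be in flight at once against the same leases.
futures = [preprocess.remote(list(range(10))) for _ in range(100)]
results = ray.get(futures)

# Only attempt the GPU task if the cluster actually reports a GPU.
if ray.cluster_resources().get("GPU", 0) >= 1:
    print(ray.get(train_step.remote(results[0])))
```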
So now we're down at sea level. Actually, before I go on, that's a good point to check whether there are any questions. OK, maybe we'll hold them until the end. Brilliant.

What I'm going to talk about now is the Ray ecosystem, and as I said, I can't cover all the libraries in the time we have, so I'll focus on a couple. I might skip over XGBoost-Ray, because when I do the demo I'll explain how it works under the hood, but I do want to talk about Ray Tune, because if you're a data scientist or ML engineer, tuning is an important part of the machine learning workload: you build a baseline and then you use hyperparameter tuning to get the best model. Tune is a flexible, powerful library in Ray that lets you do that at massive scale across a cluster.

So what is Ray Tune? It's a simple library that supports state-of-the-art algorithms from the recent literature and lets you run trials in parallel. Because it runs on Ray, all the orchestration and distribution of your trials over the config space happens in parallel, which relieves you of the burden of doing it sequentially and manually: each Ray worker, each actor, gets a slice of the config space, runs a trial, and goes through the model training to do your hyperparameter tuning. It has an easy-to-use API: you take your training function, hand it to Tune, and run it, whether on a single process using all your cores, on multiple GPUs on the same machine, or multi-node across a cluster. It integrates easily with the machine learning libraries that matter for hyperparameter tuning: XGBoost, scikit-learn, TensorFlow, PyTorch, Keras, you name it. A nice property of the Ray libraries is that other libraries can run as a subroutine: my training function could be a PyTorch function, another library entirely, but I'm running it within Ray; that's a powerful notion to grasp. It's also interoperable with the other Ray libraries: alongside Ray Tune I can use Ray Train for distributed training, or Ray Datasets to shard the data across the workers my tuning needs. It's quite powerful.

A quick rundown of the state-of-the-art search algorithms Ray Tune supports today: by default you get random and grid search, and there's Bayesian and bandit optimization; these are built in, and when you set up your Tune run you just create an instance of the algorithm you need, whether it's HyperOpt or Optuna (both are integrated with Ray), and Tune uses it to search your hyperparameter space. You can also use schedulers, which schedule trials according to a policy: ASHA, a halving algorithm that keeps only the trials that are doing well and drops the rest; Hyperband; or population-based training schedulers. All of them work with Ray Tune so you can tune at massive scale.

And if you're new to hyperparameters and HPO, here's the quick version (and a refresher for those who aren't). There are two kinds of parameters. Model parameters are learned while the trials run: the weights and biases, whether in a linear regression model or the weights passed between the layers of a neural network. Hyperparameters are set before training and determine how the learning fares: the learning rate, your pipeline configuration, and, say in scikit-learn, the number of trees for a tree-based algorithm, their depth, and how you randomize. Here's another example: if you're tuning a neural network, what kinds of layers, what filters, what max pooling, how many dense layers. Those are the parameters you ask your tuning algorithm to figure out over the space you define, and it comes back with the best model according to the loss you want to minimize or the metric you want to maximize. That's the gist of hyperparameters.
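As an illustration of what such a search space looks like in Ray Tune, here is a small sketch; the parameter names and ranges are invented, loosely following the tree-model hyperparameters just mentioned.

```python
from ray import tune

# A hypothetical search space for a tree-based model.
search_space = {
    "eta": tune.loguniform(1e-4, 3e-1),            # learning rate
    "max_depth": tune.randint(1, 9),               # tree depth
    "n_estimators": tune.choice([50, 100, 200]),   # number of trees
    "subsample": tune.uniform(0.5, 1.0),           # row sampling rate
}
```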
Now, there are some challenges. Hyperparameter tuning at massive scale can take anywhere from minutes to hours to days, so be cognizant that it's time-consuming, because you have a large hyperparameter space and you want the best model, and it can be costly too. The challenges are to use the resources in an optimal way and to be elastic and fault-tolerant: you might be five hours into an HPO run when one of the servers goes down. Do you have to start the job all over again? No, you don't.

Ray Tune tries to address these challenges in three ways. You can do an exhaustive search and keep going until you find the best configuration. You can use Bayesian optimization, which works sequentially off the results of previous trials, saying in effect, I'll drop this configuration but keep that one because the previous trial did better. Or you can do advanced scheduling, as ASHA and other state-of-the-art schedulers do, with policies like early stopping or halving that only continue with promising trials.

Looking at each of these: exhaustive search is easy to parallelize and easy to implement; you have a grid and you sweep it exhaustively, and random search is the same idea, sampling uniformly across the parameter space. But it's inefficient, because it doesn't use the results from previous grid points to decide which combination to try next. The literature is now shifting toward more efficient algorithms that still parallelize, using the results of previous runs to decide what to do next. Bayesian optimization is one: if you use Bayesian optimization with Tune, it tries to find the best minimum loss and builds a region around it; configurations that fall inside that region are kept and the rest are tossed out. It's inherently sequential, because it depends on the previous trial's result to decide whether to schedule a similar configuration for the next trial or discard it; in the chart, the points in the blue zone are the ones that fall inside and survive. Ray Tune is integrated with HyperOpt, so if you're familiar with HyperOpt you just create your hyperparameter search with it and Ray takes care of the rest.
Optuna and Scikit-Optimize have been integrated as well, and Nevergrad is another. That's the second way to reduce cost. The third approach Ray Tune uses is early stopping: you fan out the initial exploration of your config space, use the intermediate results from the trials, and prune dynamically as you go. You start with a large number of trials, and the ones that aren't faring well are stopped early by the halving algorithm. This reduces cost because it doesn't wait for a trial to finish exhaustively: as soon as it sees, after a few runs, that the loss isn't improving, it drops that trial and picks up the next, and as time progresses it continues only with the trials that are doing better. Those are some of the strategies and policies Ray Tune uses.

And just to give you a sample of how easy the code is: very simple steps. You define your training function, say a PyTorch-style function that creates a model and loops over epochs. You hand that function to Tune: tune.run() takes it, and you provide the config space you want to sample, the number of samples (trials) you want, the number of epochs, the scheduler to use (say ASHA for early stopping, so the halving algorithm can make better decisions for the next trial), and the search algorithm, say Optuna. When you run it, the driver process creates the Tune run, the main tracker, and launches actors on the worker processes; each actor gets a copy of your serialized training function and a slice of the configuration space, runs in parallel, and reports metrics back as trials finish. The tracker then decides what to orchestrate next: launch another trial, drop one that isn't doing well, stop one early, or halve. If a trial stops early, that worker is stopped and a new trial is launched with a new configuration, and this goes on. If something goes wrong, trials are checkpointed periodically, either to a local directory or to a central place in the cloud, so if a worker goes down we just load the last checkpoint and resume training from there instead of training on everything again. That, in essence, is how Ray Tune works.
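Putting those pieces together, here is a hedged sketch of the flow just described, using the tune.run / tune.report functional API from the era of this talk. The training function and its fake loss are placeholders; a search algorithm such as OptunaSearch could also be passed via search_alg.

```python
from ray import tune
from ray.tune.schedulers import ASHAScheduler

def train_model(config):
    # Placeholder training loop; a real one would build a model (e.g. PyTorch)
    # and report a genuine validation metric each epoch.
    for epoch in range(10):
        loss = (config["lr"] * 1000 - 1) ** 2 / (epoch + 1)
        tune.report(loss=loss)   # metrics go back to the Tune tracker

analysis = tune.run(
    train_model,
    config={"lr": tune.loguniform(1e-4, 1e-1)},           # the search space
    num_samples=20,                                        # number of trials
    scheduler=ASHAScheduler(metric="loss", mode="min"),    # early stopping / halving
)

print(analysis.get_best_config(metric="loss", mode="min"))
```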
Let me pause here and see if there are any questions. Excellent, OK.

What I'm going to do now is skip the XGBoost-Ray slides, because I'll talk through them while I give the demo of some of its design features. Just at a very high level: XGBoost-Ray is distributed training. Plain XGBoost can parallelize across the cores of a single machine; XGBoost-Ray lets you take that training and spread it across multiple nodes and cores. It's data-parallel: each worker gets a copy of the model and trains it on its section of the data, and it's fault-tolerant. I've left links here, and I won't go deeper into the slides because I'd rather show you how you scale these things on an actual cluster. So let's go to the demo. Can everyone see it? Good, it looks good. OK.

What I'm going to show is that in your everyday journey, when you build a model on your laptop and eventually move to a cluster, there are three very common workloads you want to scale. One is training: training at scale with large amounts of data. The second, very common in your daily iterative model-building process, is hyperparameter tuning: how do you get the best model? You create a baseline, and then you have a config space over which you tune. And the third is inference: once you have the best model, how do you run inference on it at scale? The idea here is to do each of these on a local machine, which has limited capacity and a limited number of cores, see how long it takes, then scale it across a Ray cluster and compare the timings. Those are the three things I want to do.

What I have here is a regular classification problem that I'm going to train with XGBoost; the imports are the usual ones, nothing different. I have about 10 million rows, roughly 40 features, and two classes. This code just reads the files and shards them across the cluster using Ray Datasets, and I have three sets of files because I want to test three sizes: a 300 megabyte, a 3 gigabyte, and an 11 gigabyte file. Let's look at each of these.

First we'll train using regular XGBoost. This is the training function you'd normally use: you define the XGBoost parameters you'll hand to the trainer (tree method approx, a binary logistic objective, and evaluation metrics of log loss and error), and you import xgboost, the DMatrix, and the function called train.
It's a very generic XGBoost train function; all of this is standard XGBoost. You take your labels, load the Parquet files, split them 75/25, convert them into the DMatrix, which is the efficient data structure XGBoost uses, set the parameters, the evaluation results you want back, whether you want verbose output, and the ten boosting rounds you want to train, plus some standard callbacks.

Let's make sure the function works on the small dataset, about 300 megabytes. I train the model; this is running in Jupyter on a local node with about 16 cores, and you can see it works, no problem. Next I'd try it on the larger dataset, the 3 gigabyte one. I'm not going to run it live because it takes a few minutes, so I ran it just before this: on the local machine, the 3 gigabyte file took roughly two and a half to three minutes.

What if I use XGBoost-Ray instead? The way XGBoost-Ray works, remember how the actors work, is that a tracker creates the number of actors you ask for; each worker gets an actor, which holds the shard of data it will train on, and the actors communicate with each other using Rabit, the tree-based all-reduce, to synchronize gradients, then report back to the tracker. That's how XGBoost on Ray works: it's a drop-in replacement for normal XGBoost, except now you're running distributed across the cluster rather than on a single node. Everything else is the same. I provide the same XGBoost parameters; the only changes are the imports from xgboost_ray: the RayDMatrix (the Ray version of the DMatrix, which stores the data efficiently and distributes it across my workers), the train function, and the RayParams, which I'll show in a minute. The training function is the same code on the same data, except I use the RayDMatrix and pass one additional argument, ray_params, which defines the parallelism I want. So it's exactly the code I executed above, on the same 3 gigabyte file, except I'm telling Ray to use eight actors, that is eight workers, with 16 CPUs each.
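A hedged sketch of the drop-in change being described, based on the xgboost_ray package; the data path, label column, and actor counts are placeholders matching the demo's description.

```python
from xgboost_ray import RayDMatrix, RayParams, train

# RayDMatrix shards the data across the remote actors.
dtrain = RayDMatrix("s3://my-bucket/train/*.parquet", label="label")  # placeholder path

params = {
    "tree_method": "approx",
    "objective": "binary:logistic",
    "eval_metric": ["logloss", "error"],
}

evals_result = {}
booster = train(
    params,
    dtrain,
    num_boost_round=10,
    evals=[(dtrain, "train")],
    evals_result=evals_result,
    ray_params=RayParams(num_actors=8, cpus_per_actor=16),  # 8 workers, 16 CPUs each
)
booster.save_model("model.xgb")
```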
So let's go ahead and train that and see if it works. This is now running on the Ray cluster I created on Anyscale, and you can see it sets up the Rabit rings to do the all-reduce and communicate the gradients; it's running across all eight nodes, using all the CPUs provided. It finished in about 13 seconds, far faster than running on my local node. So that's training with XGBoost-Ray: you run on a local node with a small dataset to check that it works, then train on the large dataset across the cluster, and you can see the difference once you parallelize with Ray.

The second workload people run repeatedly when building models at scale is hyperparameter tuning, and it's built right in: you can use your XGBoost-Ray training function inside Tune to scale it out. Here I have pretty much the same XGBoost config with a binary logistic loss, and I define my hyperparameter space: eta (the learning rate) sampled from a range, the subsample rate, and the max depth from 1 to 9. Then I specify how I want to parallelize: I don't have GPUs here, so I configure the number of actors and CPUs per actor, eight actors running across my nodes, and hand everything to tune.run() along with my train_xgboost function, the 3 gigabyte files, the Ray parameters, and a progress reporter, and the trials run in parallel. What happens now is that it goes out, creates eight actors, and starts training; you can see it's using most of the 144 CPUs I have, training across my hyperparameter space in parallel. Each of the eight actors gets a slice of the config space and keeps training, and trials start to terminate. Right now I'm running eight trials in parallel; these are the best parameters from the latest trial; one has terminated and seven are running; CPU usage drops from 119 to around 85 as more trials terminate (terminated doesn't mean killed, it means the trial finished and reported its results); now five are still running. What's happening is exactly what I described earlier: trials run in parallel inside the actors, report metrics back, and the next trial is launched based on those metrics. Eventually seven have terminated and one is running, using the remaining CPUs to finish, and the whole run terminated in about 82 seconds.
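A sketch of combining Ray Tune with XGBoost-Ray as described. It assumes the documented xgboost_ray pattern where RayParams.get_tune_resources() tells Tune how many resources each trial needs; the trial here reports its final log loss explicitly via tune.report, and the paths, ranges, and actor counts are placeholders.

```python
from ray import tune
from xgboost_ray import RayDMatrix, RayParams, train

ray_params = RayParams(num_actors=8, cpus_per_actor=2)

def train_xgboost(config):
    dtrain = RayDMatrix("s3://my-bucket/train/*.parquet", label="label")  # placeholder
    evals_result = {}
    train(
        {"objective": "binary:logistic", "eval_metric": ["logloss", "error"], **config},
        dtrain,
        num_boost_round=10,
        evals=[(dtrain, "train")],
        evals_result=evals_result,
        ray_params=ray_params,
    )
    # Report the last training log loss back to the Tune tracker.
    tune.report(logloss=evals_result["train"]["logloss"][-1])

analysis = tune.run(
    train_xgboost,
    config={
        "eta": tune.loguniform(1e-4, 3e-1),
        "subsample": tune.uniform(0.5, 1.0),
        "max_depth": tune.randint(1, 9),
    },
    num_samples=8,
    metric="logloss",
    mode="min",
    resources_per_trial=ray_params.get_tune_resources(),
)
print(analysis.best_config)
```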
If I had run this on my laptop or on the single head node, it would have taken considerably longer. So hyperparameter tuning is something you do quite often, and to scale it you can use Ray Tune on a Ray cluster to get your best parameters; here my best ones were roughly an eta of 0.15, a subsample of 0.5, and a max depth of 3.

The third and final workload most people run is inference; you want to do inference at massive scale. You've trained the model with the best parameters; now you want to see how it fares. First let's do it on a single machine, using essentially the same model with predict(), except that here nothing is distributed; we're just using a plain DMatrix. This takes about 90 seconds: it walks through the 11 gigabytes of files and does prediction on each, sequentially, on a single node. Let me check the progress: we've gone through about seven gigabytes, so roughly three more to go. It's churning hard, and it isn't capitalizing on the fact that I have a cluster and could do inference in parallel rather than sequentially. We're almost there... we went through all of it, and it took about 81 seconds, a minute and a half or so; here are the results of the inference, what our classification model returned.

Now we do the same thing with XGBoost-Ray, and the only differences are that I use the RayDMatrix for the inference data and I tell it to use eight actors to do inference in parallel. It creates the eight remote actors, runs the inference in parallel, and returns almost immediately: about nine seconds, roughly ten times faster. So you can see the difference between scaling locally and scaling on the cluster. One of the great things about Ray is that you can use your laptop for experimentation, and when you want something running on the cluster you just connect to it, usually with ray.init, and it connects very smoothly. And the results are identical. I have notebooks here that go deeper into this, but these are very common workloads, and Ray Tune is one of the libraries that lets you do hyperparameter tuning at scale. It works very well with other libraries too; I don't have to use XGBoost.
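A sketch of the distributed inference step with xgboost_ray.predict; the model filename and data path are placeholders.

```python
import xgboost as xgb
from xgboost_ray import RayDMatrix, RayParams, predict

booster = xgb.Booster()
booster.load_model("model.xgb")   # the model trained earlier (placeholder filename)

dpred = RayDMatrix("s3://my-bucket/inference/*.parquet")   # no label needed for scoring
scores = predict(
    booster,
    dpred,
    ray_params=RayParams(num_actors=8),   # eight actors score their shards in parallel
)
print(scores[:10])
```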
If I wanted to use PyTorch, my training function here would be a PyTorch function, my model would be a convolutional net, and I'd hand that training function to Ray to go ahead and run it; and if I provided a hyperparameter space, say my dense layers within a certain range, filters of a certain kind, and a fully connected net of a certain size, it would figure out the best parameters over that space. So it's quite powerful, and enormous in terms of the benefits you get and the time and cost it saves. That's essentially what I had for the demo; let me go back to my slides quickly before I wrap up.

Here's the example you just saw, where I took the simple XGBoost API example without Ray, and the only things I changed were importing the RayDMatrix, the RayParams for the parallelism I want, and the train function from xgboost_ray, and then passing those parameters; now my training function runs in a distributed manner. That's only three or four lines of code to change to take your existing XGBoost code and make it parallel.

Takeaways: distributed computing is going to be a necessity; we have to accept that today it is becoming the norm. Ray's reason for being is to make it simpler and to make the APIs simpler; that's the ethos behind it. I remember when I joined Databricks six years ago, very early on, the motto was to make big data simple; here the motto is to make distributed computing simple. You write your Python functions as if you were writing them in a Python file, use the remote decorators to do the distributed computing, and let Ray take care of the hard work; you don't have to be a systems expert, just use ray.remote, and scale your workloads with the Ray native libraries and the ecosystem that provides the purpose-specific pieces.

For those of you interested in reinforcement learning, we have a conference coming up in about a month; it's free and virtual, and the URL is here and will be in the slides when I send them out. There's also a really interesting tutorial on contextual bandits and offline learning with RLlib, from the creator and lead maintainer of RLlib; there's a discount for you, just use the meetup code. It's well worthwhile, because RLlib is something people are using quite a bit now. If you want to start learning Ray, just install it on your local machine and start using it; we have copious documentation, and I've been putting up a lot of code examples and tutorials as well. We have a meetup that we've revived after two years of Covid; we had our first in January, and another is coming on March 2nd covering Ray Train, the new library for deep learning that was recently released.
And then on March 30th I'm hosting the Ray Serve meetup; if you're interested in productionizing models with Ray Serve, attend that one. We have a vibrant community with a lot of discussion going on, so come join us and discuss Ray. There's a Slack for the community as well; follow us on social media, visit us on GitHub, and give us a star if you like it. So thank you very much. If you want to get in touch with me, it's jules@anyscale.com. Follow me on Twitter and I'll follow you back (I know it sounds dodgy, but I'm on there all the time), and connect with me on LinkedIn and tell me you attended this presentation, so I know who you are. That, in essence, was a lot of material covered. Let me see if there are any questions in the Q&A.

And there's some chat; if anyone has questions, Jules will answer them right now.

Oh, there's one question: "If I heard you correctly, internode communication happens every 10 milliseconds; what if I want something shorter?" I could have misquoted that; I think it's actually shorter than that. There is a configurable parameter, but I'll check with the team. It might be 100 milliseconds for the interval that checks the object store cache for eviction, but the heartbeat itself is much smaller than that. I'll verify it; 10 milliseconds doesn't sound right, it should be much smaller.

Okay, let's see if we have anything in the chat. Anyone else have a question for Jules?

You're going to ask me "why?", I heard that. This is a former Spark audience, so you're probably going to ask: why Spark, why Ray, what's the difference?

Well, here I have a question for you, Jules: can I integrate Ray with Spark?

Good question, a very good question. But before I answer it, let me share this slide, because I made it especially for you, Dorothy: when I proposed the topic, you said you wanted to find out the difference between the two. I think they're very complementary; Spark and Ray work extremely well together, and where Spark finishes at a certain point, Ray picks things up. You can, for example, run Spark on Ray if you want to do ETL or data processing. But just to give you a very high-level view of the differences: Ray is a very general distributed computing system. It isn't built to address one specific workload; it gives you lower-level, finer-grained primitives for writing any distributed application you want. Think of Ray as a framework for writing other frameworks. Spark, on the right-hand side of the slide, is very powerful and very performant for a specific purpose: distributed data workloads. It's built on top of the DataFrame abstraction, and because of that you get more coarse-grained APIs, a DSL. With the DataFrame abstraction and the DSL you tell Spark what to do; in other words, it's coarse-grained.
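To make the "finer-grained primitives" point concrete, here is a minimal sketch of Ray's two core building blocks, a remote task and a remote actor. The square function and Counter class are invented for illustration; only ray.init, the @ray.remote decorator, the .remote() calls, and ray.get are Ray's actual API.

    import ray

    ray.init()  # start a local Ray instance (or connect to an existing cluster)

    @ray.remote
    def square(x):
        # An ordinary Python function, turned into a distributed task by the decorator.
        return x * x

    @ray.remote
    class Counter:
        # An ordinary Python class, turned into a stateful remote actor.
        def __init__(self):
            self.total = 0

        def add(self, value):
            self.total += value
            return self.total

    # Scheduling tasks returns object references (futures) immediately.
    refs = [square.remote(i) for i in range(4)]
    print(ray.get(refs))  # [0, 1, 4, 9]

    counter = Counter.remote()              # the actor runs in its own worker process
    print(ray.get(counter.add.remote(10)))  # 10

Libraries such as Tune, Train, Serve, and RLlib are built out of exactly these primitives, which is what "a framework for writing other frameworks" means in practice.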
The other difference is that Ray's scheduler is fully distributed; there is no single driver node that controls everything, and the task graphs are computed dynamically. The reason for dynamic, eager execution is that with machine learning workloads like training and tuning, you often have to decide what to schedule next based on something you just computed. So Ray executes eagerly and creates the graph as it goes, whereas Spark has a static scheduler: you build up your DataFrame and its chain of methods, and when you invoke an action the entire DAG gets executed; it goes through logical optimization, then physical optimization, produces the final RDD plan, and runs. Not that one is better than the other; it's just the nature of Spark being built on top of a DataFrame abstraction, the same way Dask was built on DataFrame, bag, and task abstractions. Ray doesn't have that notion of a single abstraction; it gives you the primitives to build other things. Ray uses a distributed object store, so objects are shared across the cluster, and it's very asynchronous; you don't wait synchronously. In Spark, your transformations build up a chain of methods on a DataFrame that are executed when an action runs; in Ray, you execute something, get a reference back, and at some later point you ask whether the task is done.

Ray is not there to displace other libraries; we're really here to integrate and interoperate openly, so the third-party libraries you see are integrations, not replacements. If you want to do data processing, for example, which is not really Ray's strength today (Ray is not a data processing framework), you can write your own, or you can use Dask on Ray or Spark on Ray, and the native libraries are there to address those workloads. Ray is also very Pythonic in nature: Ray code, and the way other libraries integrate with it, looks like ordinary Python, whereas PySpark obviously has some Scala underneath. Spark supports deep learning integration through Horovod, PyTorch, and TensorFlow, whereas on the Ray side you can use those frameworks at different levels of integration.

Another question was: how do you use Spark with Ray? It's very simple. There's a project called Spark on Ray, and here's a very simple example of Spark running on Ray. You wrap your driver in a class that becomes a remote actor, and when you initialize it, what you're really creating is a SparkContext and a SparkSession, while telling Ray to launch the Java executors (two of them in this example). Ray doesn't do the communication itself; all the shuffle and executor-to-executor communication is handled by the underlying JVMs. All Ray does is launch those JVM processes on actors, and the actors just manage them. Then you call your driver function with .remote() to do your partitioning or whatever transformation you want. It's as simple as that: you create a SparkSession, use Spark methods to do the transformations you want, and fetch the result with ray.get. So there's your answer, Dorothy, the million-dollar answer to your question.
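The actual Spark on Ray project launches Spark's executor JVMs as Ray actors, as described above. Purely to illustrate the "wrap the driver in a remote class" idea, here is a much rougher sketch that runs a local-mode SparkSession inside a Python Ray actor; the class and method names are made up, and this is not the project's real mechanism.

    import ray
    from pyspark.sql import SparkSession

    ray.init()

    @ray.remote
    class SparkDriver:
        """Toy 'driver' actor: owns a SparkSession and runs transformations on request."""

        def __init__(self):
            # Local-mode Spark inside the actor process, purely for illustration.
            self.spark = (
                SparkSession.builder.master("local[2]").appName("spark-in-ray").getOrCreate()
            )

        def count_even(self, n):
            df = self.spark.range(n)                  # DataFrame with ids 0..n-1
            return df.filter(df.id % 2 == 0).count()  # run a Spark job, return a plain int

    driver = SparkDriver.remote()
    result_ref = driver.count_even.remote(1_000)  # returns an object reference immediately
    print(ray.get(result_ref))                    # block only when the value is needed -> 500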
Jules, there's another question here about Ray Datasets and sharding: "Curious about Ray Datasets and sharding. For example, I currently shard using the rank and world size as introduced in Horovod; how would I do the same in Ray?"

Good question. Ray Datasets is the new library, still experimental, that was recently released; we've had a couple of meetups on it, and we have a webinar coming up on the 23rd. The whole idea behind Ray Datasets is not to be a replacement for DataFrames, please don't get that wrong. Ray Datasets is there to provide the last-mile transformation from your Parquet files into your training function. When you want to shard your data across all the workers, you take the Ray Dataset you read, say from Parquet files in an S3 bucket, and you repartition it based on the number of cores or Ray actors you're going to use; that's how it gets sharded across them. That's also what RayDMatrix did in my demo: when I created the RayDMatrix, it read that file, put the blocks into shared memory in the object store, and my Ray workers running the training could then access them. As for the Horovod aspect, the way Horovod, TensorFlow, and PyTorch are integrated with Ray Train is what we call a level-one integration, meaning Ray does not handle the low-level communication. Ray only handles launching the worker processes; the collective communication, whether it's Horovod, PyTorch DDP, or TensorFlow, is done by that backend, and we just use the backend for it. The Ray Datasets integration is then simply a way to take your Parquet files and convert them into, say, TFRecords, a Dask DataFrame, a Spark DataFrame, or a pandas DataFrame if you want to do some transformation. So Ray Datasets is the last bit of glue. Another way I think of it: where your ETL and SQL end, the Ray processing begins. Think of using Delta Lake and Apache Spark for all your ETL; if I were doing ETL for Spark streaming or SQL workloads, I wouldn't consider anything other than Spark on Databricks with Delta. Once I've built my lakehouse and have all my data in there, I just pick up the Parquet files from it and ingest them into my distributed deep learning training.
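A minimal sketch of that sharding flow, assuming the ray.data API roughly as it existed around the time of this talk; the bucket path and the worker count are placeholders, not values from the talk.

    import ray

    ray.init()

    # Read the Parquet files produced by the upstream ETL (the path is a placeholder).
    ds = ray.data.read_parquet("s3://my-bucket/training-data/")

    num_workers = 4                             # e.g. one shard per training worker
    ds = ds.repartition(num_workers)            # align the blocks with the worker count
    shards = ds.split(num_workers, equal=True)  # one Dataset shard per worker

    # Each training worker would then consume only its own shard, for example:
    first_shard_df = shards[0].to_pandas()
    print(len(shards), len(first_shard_df))

Instead of computing offsets from a Horovod rank and world size yourself, each worker is simply handed its own shard.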
Let's see, I think there's something else in the chat. Did that answer the question? I hope so. Does anyone else have questions?

Okay, if no one else has questions for Jules, then I think we're pretty much done here. Thank you very much, Jules, for a wonderful talk; this is a very slick tool you're working on, and do stay in touch.

All right, thank you very much for the invite. If you have any questions, you have my contact information, so feel free to drop me a line, I'm more than happy to answer, or join the Slack channel and I'll try to answer there as well. Again, Karen and Dorothy, thanks a lot for the invite; it's good to see you both again, I feel like I'm home. Stay healthy, and I hope to see you both sometime in the near future, maybe at Data + AI Summit in a few months. Okay, yep, bye.

All right, thanks, Jules. Thanks, everyone. Bye, cheers.
Info
Channel: Databricks
Views: 28,403
Keywords: Databricks
Id: LmROEotKhJA
Length: 70min 42sec (4242 seconds)
Published: Sat Feb 19 2022