Parallel and Distributed Computing in Python with Dask | SciPy 2020 | Bourbeau, McCarty, Pothina

Captions
Okay, good morning everyone, and welcome to the Parallel and Distributed Computing in Python with Dask tutorial. I'm going to be the host for this session, and presenting are James Bourbeau, Mike McCarty, and Dharhas Pothina. As the host I'll be monitoring the questions section, so if you have a question, we prefer that you ask it using the "ask a question" button. You can also ask in the chat and I can promote it over to the questions section, and I'll mark questions as the presenters answer them and move them into the answered-questions column. We also ask that everybody follow the code of conduct; if anybody doesn't, I have the power to remove them from the room, so please don't make me use that power. With that, I'll introduce James Bourbeau, who will be the first presenter, and I'll mute myself.

Okay, great, I'll go ahead and share my screen. Now everyone should be able to see my screen. Just to confirm — Dharhas, can you see the screen, is everything okay? Yes? Great. I'm going to increase the font size; hopefully this is okay for everyone. Does this look reasonable on your screen? — It looks reasonable on my screen, and in the comments people are saying it looks good. — Awesome, thanks for checking.

All right, welcome everyone, and thanks for attending the Dask tutorial session here at SciPy 2020; we appreciate both your time and your attention. My name is James Bourbeau. I'm one of the maintainers of Dask, and I'm also a software engineer at Coiled Computing. I'm joined by my two co-instructors today, Dharhas and Mike, and to minimize the number of times we invite people on stage, we'll have them introduce themselves when we get to their portions of the tutorial.

From a really high vantage point, Dask is a Python library for parallel and distributed computing that scales the existing Python ecosystem, so a lot of the Dask code you'll end up writing looks very familiar to NumPy or pandas code you're already used to writing. The idea is that the Dask variants will scale to larger datasets that you wouldn't be able to handle using NumPy and pandas. This tutorial is targeted at new users, so no prior Dask knowledge is assumed. We are hoping you're familiar with things like NumPy and pandas, but you don't have to have seen Dask before to get something out of this tutorial.

Broadly speaking, you can think of Dask as having two portions. One is the collections portion, which is for setting up computations as task graphs. There are different interfaces for this: there are Dask arrays and Dask DataFrames, for example — Dask arrays mirror NumPy ndarrays, and Dask DataFrames mirror pandas DataFrames — and there are other interfaces that we'll talk about as well. The first one I'll cover is dask.delayed, which is used for creating custom workflows. These collections are generally for constructing, or setting up, a computation. That's one portion of Dask; the other
portion is the schedulers, which actually execute those computations, potentially in parallel. We'll talk about those later in the tutorial.

If you want some examples of Dask being used in the wild, I recommend checking out the use cases link here; there are several testimonials about Dask being used on real-world problems.

The tutorial materials are available at this GitHub repository, github.com/dask/dask-tutorial. There are a few different ways to set things up for the tutorial: there are three options here — a, b, and c — and you only have to choose one of them, not all three. The preferred method is to use this conda environment file to create a new conda environment, activate it, and then install the JupyterLab extensions. Options b and c are likewise for setting up the tutorial and installing all the packages locally. If you would prefer to avoid any local setup, towards the top of the README there's a "launch binder" button you can click, and — thanks to the Binder team — that will launch a JupyterLab session for you with everything already set up: all the notebooks for the tutorial, and all the necessary packages already installed, ready for you to go. So a big shout-out and thanks to the entire Binder team for providing this infrastructure for us.

That's it for the setup. There are some additional relevant links here: we link to the Dask documentation, to the Dask source code on GitHub, and the Dask community also maintains a blog you can check out to see what the current activities are in the ecosystem. Then here are a couple of links for asking for help. You can use the dask tag on Stack Overflow if you have any usage questions — "how do I do this?" would be an appropriate question for Stack Overflow, for instance. If instead you have a bug report or a feature request, we ask that you open a new issue on the Dask GitHub issue tracker.

Some additional information about the structure of the tutorial: it takes place in a series of Jupyter notebooks. We don't need to go over all of this material, other than, if you're totally new to Jupyter: when you're in a notebook cell, hit Shift+Enter to execute the contents of the cell. One thing I did want to point out is the exercises. Throughout the tutorial there will be a series of exercises. There will be a prompt telling you what the goal of the exercise is, then a code cell that typically has a comment in it, and you're supposed to put your solution into that cell. For instance, here it's saying "print the text hello world", so we would do print("Hello world") and then, again, Shift+Enter to execute the cell — there we go, we see the output. Beneath the cell where you've put your solution, there will be another cell with just three dots, like an ellipsis; if you click that, it will reveal a solution for that exercise, so you can use it
to compare your solution against the one we've already prepared, or, if you get stuck on an exercise and need some help, you can consult the answer.

That is it for the overview. Just to reiterate the administrative point: if you have any questions, be sure to ask them in Crowdcast, and if they require a more detailed answer they'll be moved to the Slack channel for the Dask tutorial — I think it's the #tutorial_dask Slack channel. Okay, any questions at this point? I'll maybe go over to the questions. "Does Dask use MPI internally" — we can answer that in a moment.

Hopefully at this point everyone is set up. If you're not, I recommend using Binder to avoid having to do any local setup: just go to the tutorial repository and click the "launch binder" button.

The first notebook we're going to go through today is the 01_dask_delayed notebook. As you might expect, in this notebook we'll be covering the dask.delayed interface. This is a way to use Dask to delay function evaluation, and it's commonly used to parallelize existing workflows: if you have a code base that is running synchronously with no parallelization, using dask.delayed is a common way to incorporate parallelization. It's also used to build really complex custom workflows that don't fit nicely into array and DataFrame workloads. Here are some relevant links for dask.delayed: there's delayed-specific documentation, there's a nice screencast on YouTube that walks you through some of the delayed basics, and we also link to the delayed API documentation and the delayed examples. In particular, I want to point out this last link, the delayed best practices. It contains a series of best practices for using dask.delayed, as well as solutions for some common problems that users run into — in particular people who are new to dask.delayed — so I highly recommend checking out that page.

Here at the top, we're basically going to ignore this cell for now and come back to it later: Dask has several ways in which you can execute things, and here we're using a distributed cluster. We'll come back to this later in the tutorial, so for now just execute the cell and move on.

So let's set up the basics. We've defined two functions here: an increment function and an add function. The increment function takes an input and returns it plus one; the add function takes two inputs, adds them together, and returns the result. They both sleep for one second — this is to simulate some kind of work happening; in your actual workflow this might be "load data" or "process data", for instance. We'll define those two functions, then run inc(1) and assign that to x, inc(2) and assign that to y, and then add those together to get an output. You'll see I'm using the Jupyter %%time magic — that's what the double percent time does — which spits out this extra output telling us that the cell took 3.10 seconds in total to execute. We see,
however, that there is some opportunity for parallelization here: add takes two inputs, x and y, but if we look at x and y, we can see that they are completely independent — they don't depend on one another — so in theory we can run them in parallel. Let's do that using dask.delayed.

The delayed function lives inside Dask, so you can do from dask import delayed, and the way you use it is to wrap functions with the delayed decorator. If we compare this cell to the one just above, you can see the only difference is that instead of calling inc on 1 directly, I'm first wrapping inc in delayed; I'm doing the same down here with the other inc call, and the same with add. Let's execute that cell — that ran much faster: it took 1.46 milliseconds, whereas before it took three seconds. That's because this didn't actually do the computation; it didn't really run the increment and add functions. What it did instead was set up a computation and build a task graph for what you would want to do to compute these different variables. To actually run the computation, you call compute on these objects. Here's where the actual computation happens, and notice that it took two seconds — faster than before, when it took three.

Let's unpack what just happened. Up here I've defined z to be delayed(add) with x and y passed in, and what that returns is a delayed object. Whenever you wrap functions with the delayed decorator, calling them returns delayed objects, and as I said before, these delayed objects keep track of the function being executed and the inputs to that function. They don't actually evaluate the function with those inputs; they just keep track of them, and you can pass delayed objects to other functions, which builds up the computation further.

Delayed objects, like all Dask collections, have a visualize method that you can use to visualize the underlying computation being built up for that object. For the z delayed object here, we can call visualize and we get this graph. To interpret the graph: the circles represent function calls, with the name of the function inside the circle — these are our two inc calls — and they both produce outputs, represented by these two boxes. We take those outputs, pass them to the add function, and the output of that is the final box. When we call z.compute(), we actually run all these function calls, and what's returned is that final output. What we can see is that the two increment functions could in theory be called in parallel, because they're totally independent.

So, some questions to ask about our workflow. Before, when we called the functions without Dask, it took three seconds to run; with dask.delayed, it took two. Why weren't we able to parallelize down to one second? Each of these function calls — the two incs and the add — sleeps for one second, so the maximum time would be three seconds, which is what we get in the synchronous mode, without Dask.
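For reference, here is a minimal sketch of the inc/add example being walked through on screen; the one-second sleeps stand in for real work:

    import time

    from dask import delayed

    def inc(x):
        time.sleep(1)  # simulate real work
        return x + 1

    def add(x, y):
        time.sleep(1)
        return x + y

    # Wrapping the calls in delayed builds a task graph instead of running anything
    x = delayed(inc)(1)
    y = delayed(inc)(2)
    z = delayed(add)(x, y)

    print(z)            # a Delayed object, not a number
    print(z.compute())  # 5 -- the two incs run in parallel, then add: ~2 s total
    # z.visualize() renders the task graph (requires graphviz)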
However, there is a dependency structure between the add function and the outputs of the two increment functions: we can't run add in parallel with the two incs. We can run the incs separately, but we have to wait for both of them to finish before we can run the add. So the two inc sleeps happen at the same time for one second, and then the add sleeps for one second — that's how we get two seconds instead of one.

That's the basics of dask.delayed: the idea is to wrap function calls in delayed, which turns them into lazily evaluated delayed objects that you can use to build up computations in a very similar way to how you would without dask.delayed. The code should look very similar; you just pepper in a few delayed decorators here and there.

We're now at our first exercise: parallelizing a for loop. This is a very common situation people run into — they have some for loop where each iteration is totally independent of the subsequent iterations, so in theory you can run them in parallel. Here we have some data, a list of 1 up to 8, and here is some sequential code. I'll go ahead and run it: we have an empty results list, we loop through the data, pass each item through the increment function (which sleeps for one second) to get y, append that to the results list, and then sum up all the results at the end. The result we get is 44 — the sum of all the incremented values — and it takes eight seconds to run this code. For the first exercise, we want to use dask.delayed to parallelize this example. We'll pause for a few minutes here to allow time to do that, and then come back and talk about the solution. Do we want to look at some of the questions? It looks like we have a little backlog.

"Does Dask use MPI internally?" You can deploy Dask using MPI, but Dask does not use MPI internally, although it integrates well with MPI; there's a dask-mpi project on GitHub if you're interested in how to run Dask on MPI. Great question.

"What is a task graph?" A task graph is the computation that is built up — maybe I'll share my screen again. This picture here is a visualization of a task graph. A task is a function that you want to call plus the inputs to that function, and a task graph is a collection of those tasks that encodes their dependencies. For instance, here we're saying we want to call this increment function with 1 as the input, and the other one is the inc function with 2 as the input, and then we want to add the outputs of inc(1) and inc(2) together, using them as inputs to the add function. So a task graph is a collection of tasks — functions and their inputs — plus the dependencies between them. You can see here we're saying: call inc, get an output; call inc, get an output; take those outputs and pass them to the add function.
The final output for our computation — the output from this task graph — is the output from that add function. So that's what a task graph is: a structure that sets up a computation without actually performing it. When we later call compute on, in this case, the dask.delayed object (or other Dask collections), the task graph is executed and we get our actual answer out.

Answering the next question: we are not building the task graph ourselves — we're just making different functions delayed and putting them together, and in the background Dask is building the task graph for us. Later we'll get to how Dask decides where to move those computations and how to run them. "Lazily evaluated" means Dask does not actually compute anything until it needs to: it just builds up the task graph, and you don't have the answer until you explicitly tell it to compute, or it hits a point where it can't move forward without computing the answer.

All right, I'm going to share my screen again, and hopefully at this point people have a solution for the exercise. I also saw one other question about decorators, which I'll answer next. Here is a solution for this problem: the original code is up here and the dask.delayed version is down here. You'll notice it looks very similar; the only difference is that here we're delaying the increment function, whereas before we were calling it directly, we're delaying sum here, and at the end we're calling compute. That's really the name of the game with dask.delayed: delay expensive functions by applying the delayed decorator, and then — because, as we've mentioned, Dask is lazily evaluated — call compute at the end to actually get your result. We can run this code (let me actually add the %%time magic)... and it took one second. Keep in mind I am computing the result here, not just setting up the graph and doing nothing else. It took eight seconds before, and I have eight cores on this machine, so it takes one second because I'm able to run eight different things in parallel.

I did see a question about using decorators. In this tutorial we're delaying everything explicitly, replacing inc with delayed(inc) at the call site; that's nice in a tutorial setting because it's very explicit about what is being delayed where. Often you'll see people use the decorator syntax instead: you have a function like inc, and instead of wrapping it where it's used, you apply the delayed decorator above the function definition using Python's @ syntax. That's also totally fine and a very common pattern — I actually personally prefer it — so you're welcome to use whichever one you want; inc then simply is a delayed function. For explicitness here, we're delaying everything inline. Now, because I've redefined my inc function, let me go up to the top and re-execute the cell to make sure my increment function is not delayed.
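As a quick illustration of the two equivalent styles just mentioned — wrapping at the call site versus the decorator — here is a minimal sketch:

    from dask import delayed

    def inc(x):
        return x + 1

    # Style 1: wrap at the call site -- explicit about what is delayed, and where
    x = delayed(inc)(1)

    # Style 2: decorator syntax -- every call to double is now lazy
    @delayed
    def double(n):
        return 2 * n

    y = double(2)

    print((x + y).compute())  # 2 + 4 = 6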
Okay. There is one question here: what happens if you use the built-in sum function directly rather than delaying it? Here I've delayed the sum function; if I didn't do that, and just called sum on results — where, again, results is a list of delayed objects — we'll see that it still finishes, I still get the same result, and it still takes about the same amount of time, even though I used the built-in sum function.

Let's compare the visualizations real quick. Here's what it looks like if I use delayed(sum) for the total and visualize the result: here are my eight different increments, then I'm adding them all together and getting my final result. If I don't delay sum, everything still works and I still get a task graph back, but it's much more complicated and gets a little crazy. That's because the built-in sum's implementation, given an iterable, takes the first two items, sums them, then sums that intermediate result with the next item, and so on, over and over — and that's what we're seeing here. The graph structure is a lot more complicated and more difficult to comprehend, but it will still execute and produce the same result. For my personal taste, though, I would prefer to delay the sum and get a nice clean graph.

Oh, that also reminds me — let me do this real quick. This returns a delayed object, and this returns a delayed object, and we can actually add them together and get back another delayed object. Delayed objects support operations like addition and multiplication, and, as Dharhas said, they construct the task graph for you lazily in the background, so you basically just write normal-looking Python code and Dask will handle constructing the graph. Here you can see us adding these two delayed results together and getting a task graph out.
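A minimal sketch of the comparison being shown here — delayed(sum) versus the built-in sum over a list of delayed objects (both work, per the discussion above, but the graphs differ):

    import time

    from dask import delayed

    def inc(x):
        time.sleep(1)
        return x + 1

    data = [1, 2, 3, 4, 5, 6, 7, 8]
    results = [delayed(inc)(x) for x in data]

    total = delayed(sum)(results)  # one aggregation node: a flat, readable graph
    chained = sum(results)         # built-in sum chains pairwise adds: a deep graph

    print(total.compute())    # 44
    print(chained.compute())  # 44 as well, just via the messier graph
    # total.visualize() / chained.visualize() show the difference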
All right, this brings us to our next exercise. Previously we parallelized a for loop; here we're going to do a similar thing, but with a twist: we're also going to introduce some control flow. We have a double function, which sleeps and returns two times the input, and an is_even function, which does not sleep — it's quite quick — and returns whether or not the input is evenly divisible by two. Then we have some data, one through ten. The sequential code looks similar to the previous example: we have an empty results list and we're looping over the data and appending results to it, although now there's an extra step depending on what the item is. If it's even, we double it and include that in our results; if it's odd, we increment it and include that instead. Again, at the end, we sum all the results together. We'll execute this code and see how long it takes and what the final result is. Okay — that took 10 seconds to execute, and the final answer we got for the total sum was 90. So here we'll go ahead and parallelize this code using dask.delayed.

I see there's a question: "there's also a multiprocessing package in Python that parallelizes across processes — are there any advantages or disadvantages of Dask over that?" Under the hood — and we'll talk about this a little later — Dask can use the Python multiprocessing module to execute tasks: it will execute tasks in process pools or in thread pools. The advantage of using Dask is that the code you write is much more similar to the non-parallelized versions of code you're used to writing. Here, for instance, in the dask.delayed notebook, we're writing code that looks very similar to the non-parallel version, and to add parallelism we simply add this decorator and call compute at the end. It's still going to execute things in a process pool or a thread pool — we'll talk about how to specify which one later — but the syntax is very user-friendly, and, as we'll discuss later on, there are a lot of additional features and scheduling logic in Dask beyond the multiprocessing library.

"Is order maintained in the output for parallelized for loops, if the function runs for a variable time depending on the inputs?" Yes. You're just setting up this graph structure, and Dask will execute the graph respecting all the dependencies — run this before that, then after you're done with that, run this next thing. All of that ordering will be maintained, so you will get the same result at the end as if you were not running in parallel; you'll just get it faster.

There's also a question about how Dask handles parallelization, mentioning processes versus threads. We will discuss that later in the tutorial: you can use processes or threads or both — multiple threads in multiple processes — with Dask, and you'll want to tailor the parallelization resources you use to your particular workflow. We'll talk about the pros and cons of threads versus processes later.

Okay, I'm going to share my screen again and go back to the exercise. Let's look at the solution and copy this timing result over. This is a solution for parallelizing the synchronous code. You'll notice we've decorated a few, but not all, of the functions: we've delayed the double function, we've delayed the increment function, and we've delayed the sum function, but we haven't delayed the is_even function. That's because it's being used in a conditional statement here. What we're saying is "if is_even(x)" — we actually need to know whether x is even, we need to cast it to a boolean, to know which branch of the if/else statement to go into. That can't be lazily evaluated; we have to have the concrete result at that point. Delayed values cannot be used in control flow in that way, so you do need to be a little careful when incorporating control flow into your workflows — be careful not to delay those results. Let's go ahead and run this — that just set up the computation; notice I did not call compute anywhere here — so we'll call compute here and time that
result. That was two seconds, and we get the same result of 90 as we did with the synchronous version. Before, without dask.delayed, it took 10 seconds; with it, two.

Just to reiterate that point: let's see what would happen if I delayed is_even here, if I had tried to apply delayed to every function. I get an error — "Truth of Delayed objects is not supported" — because is_even now returns a delayed object, which we're trying to cast to a bool, and that's not allowed. We can then visualize the result: sometimes we're incrementing, sometimes we're doubling, depending on whether the input is even, and at the end we're aggregating everything into a sum and returning the final result.

So, a few questions. We've already talked about what would happen if we delayed is_even — we'd get an error. Casting things to a boolean in an if/else is one kind of control flow that can't be used with delayed objects; you also can't check, say, whether a delayed value is greater than 5, because we don't know the value yet.
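A minimal sketch of the control-flow solution just described: the slow functions and the aggregation are delayed, while the fast predicate stays eager so the branch can be taken:

    import time

    from dask import delayed

    def inc(x):
        time.sleep(1)
        return x + 1

    def double(x):
        time.sleep(1)
        return 2 * x

    def is_even(x):
        return x % 2 == 0  # fast, and needed eagerly for branching

    data = list(range(1, 11))

    results = []
    for x in data:
        if is_even(x):  # must NOT be delayed: the if needs a concrete bool
            results.append(delayed(double)(x))
        else:
            results.append(delayed(inc)(x))

    total = delayed(sum)(results)
    print(total.compute())  # 90, in ~2 s on 8 cores instead of 10 s sequentially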
That brings us to the last exercise: we're going to parallelize a pandas groupby reduction. This is a little more involved than the previous examples. First we execute this cell, which downloads some data to your machine — if you're running on Binder, it downloads it to the JupyterLab session rather than your local machine — and then we'll look at that data. This data is for flights out of New York City between 1990 and 2000; we link to the original dataset here. Each year has its own CSV file, and we can use pandas to read them in. Here we'll use pandas to read in a single CSV file — the flight data for 1990 — and get the first five rows of the DataFrame. You can see there's lots of different data here: each row is a different flight, and we can see the year, the month, what the arrival time was, what the carrier was, the flight number, and a bunch of other information. You can also see the data types — this is a rich dataset, with object dtypes floating around. We can get the origin for all these flights — which airport each flight departed from — and call unique on that to get the set of unique airports: we have JFK, LaGuardia, and EWR, which I believe is Newark airport. Those are the airports all these flights originated from. We can then group by the origin and get the average departure delay time for each airport — again, this is all just for that one year, 1990.

We can see that from Newark the average departure delay is about nine — I want to say the units on this are minutes; there's an Xarray tutorial later if you want units attached to things — and there are also the numbers for JFK and LaGuardia.

That's the synchronous example for a single CSV file, but now we want to analyze all the CSV files in this directory — we want to do this for every year. We can get all the filenames with glob; it's the same pattern as before, just with a star, so we're matching all the CSV files instead of one. What we're doing in the synchronous version is calculating the mean of these delay times per origin airport. We keep two empty lists, sums and counts — this is a standard way of averaging over multiple datasets: you calculate the sum and the number of entries for each dataset, and then compute an overall mean at the end. We loop through each filename, read in the individual file as before, do a groupby, get the departure delay sum and the departure delay count, and append those to the sums and counts lists. At the end, we add up all the sums, add up all the counts, and take the ratio to get the final overall mean. We can execute this cell — it takes a little time, 4.84 seconds — and this is the final result.

The exercise is to parallelize this code snippet. To do that, we need to know a little more about dask.delayed. In particular, method and attribute access works on delayed objects automatically: if you call a method of a delayed object, that returns another delayed object. For instance, here we're delaying NumPy's arange function, which returns a delayed object x; we can then add one to it, slice the resulting array, and call a method — the sum method — and everything here is lazily evaluated. A task graph is set up for you automatically; you don't have to do anything special beyond delaying the original function.

Also, while we've just been calling .compute() on single delayed objects, Dask has a compute function that you can call on multiple Dask objects at once, and it will share intermediate results: if a particular step is shared between the task graphs of several computations, it will only be computed once, with the result shared between them. If you called compute individually on each object, that intermediate result would be computed twice, so this is a nice way to avoid that. There's a nice example here: we can compute y.min() and y.max() together, and both share the x-squared step, so when we compute both of them, x squared is only computed once.
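A minimal sketch along the lines of that shared-intermediates example (the exact expressions in the notebook may differ; the point is the single compute call):

    import numpy as np

    from dask import compute, delayed

    x = delayed(np.arange)(10)  # lazy call to np.arange
    y = x ** 2                  # operators on Delayed objects stay lazy too

    # A single compute() call evaluates the shared x**2 step once;
    # calling y.min().compute() and y.max().compute() separately would do it twice.
    min_, max_ = compute(y.min(), y.max())
    print(min_, max_)  # 0 81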
So the goal here is to parallelize this code, which has been copied over again for convenience; we'll import compute from dask, and here is where we'll parallelize it. We'll stop for another minute to give some time to go through the exercise, and answer questions in the meantime.

Okay — a bunch of people are getting a NaN on the JFK example I just showed. For the people getting NaNs: are you running on Binder, or running JupyterLab locally? No NaNs locally, so it's looking like the NaNs are on Binder. Binder has limits on the amount of space you can take up, so when you launch on Binder we actually only transfer over a smaller version of the datasets. I suspect what's happening is that we don't have the full datasets there and we're getting NaNs in some of those fields. Everyone who's having the issue is on Binder, so my suspicion is that there was an oversight on our part and the full dataset isn't on Binder. Okay, I see — thank you for posting that link.

Second question: "do I need to think about the available cores on my system — how does it parallelize?" I think we're going to get to this a little later in the talk. The answer is yes and no: a lot of the time Dask will just do the right thing, but there are cases where you do need to think about how you do chunking and things like that, depending on how many cores you have. So yes and no.

"Does Dask have an overhead where it may not be quicker, for a large number of simple computations? I've noticed that sometimes I get better speed with a regular loop for certain calculations." Yes, Dask has an overhead. It's fairly small, but if your data fits in memory and your computation is serial, Dask is not going to speed anything up. If you have a more complicated algorithm where some of the things can be done in parallel, it will speed things up, and if you have something that does not fit in memory, it gives you ways to do those computations and spread them over multiple cores or multiple machines.

"Are there situations where Dask does not do a good job of evaluating whether tasks or chains are independent? How does this happen under the hood?" James, do you want to take that one? — Sure. Dask schedulers compute the order in which they go through graphs, taking into account a lot of additional information. Generally the schedulers are pretty battle-tested, so in general they do a good job. If you go to the source code, you can see the exact ordering code where those decisions are made. It also depends on which scheduler you're using — we'll talk later about the distributed scheduler, which makes a few more advanced decisions, for example around data locality, when scheduling tasks — but in general it does a good job.

Okay, I'll go ahead and share my screen again, and let's look at a solution. Going back up here, this is the synchronous version of the code, and it looks very similar to the Dask version except for one difference: we are delaying pandas' read_csv here. We're not delaying other things, like the method calls, because those are automatically delayed for us: whenever I delay pandas.read_csv, the result is a dask.delayed object, so the subsequent method calls are lazily evaluated — same here, and here. At the end, we call compute on all the sums and all the counts together, to share any intermediate computations and results. We can execute this, and it took two seconds. Notice we are calling compute here, so we're not just setting up the task graph, we're executing it too — and we get the same result.
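A minimal sketch of that solution; the path and column names ("Origin", "DepDelay") follow the tutorial's NYC flights data and may need adjusting for your copy:

    from glob import glob

    import pandas as pd

    from dask import compute, delayed

    filenames = sorted(glob("data/nycflights/*.csv"))

    sums, counts = [], []
    for fn in filenames:
        df = delayed(pd.read_csv)(fn)      # only read_csv needs an explicit delay
        by_origin = df.groupby("Origin")   # method calls on Delayed stay lazy
        sums.append(by_origin.DepDelay.sum())
        counts.append(by_origin.DepDelay.count())

    # One compute() call evaluates everything, sharing common intermediates
    sums, counts = compute(sums, counts)
    print(sum(sums) / sum(counts))  # overall mean departure delay per airport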
Sorry again for the people on Binder — it sounds like there's an issue with the data that's available there, so there will probably be NaNs in your results here as well, but that should just be an artifact of the data available, not the actual code. Let's compare that to the synchronous result, which took almost five seconds.

Okay, I think at this point we're safe to move on to our first ten-minute break. Again, I wanted to point out the documentation links here: the delayed documentation, the delayed screencast, and the delayed best practices document, which I highly recommend checking out. We're now going to loop back to the beginning of this notebook, where we instantiated that client and said to ignore it — we'll execute that cell and kind of ignore it again, and come back to it later in the tutorial. So at this point we'll take a ten-minute break, and then Dharhas will cover the 03_array notebook.

While we're on the break, I guess we'll go ahead and finish answering the questions that are still here. "We defined four workers, but there are eight elements in the loop — how is that shared and computed?" We'll get to that when we talk about the scheduler: there is a scheduler that decides how to send all those tasks to the different cores and processes. By default, when you call compute, Dask will create a thread pool or a process pool of a certain size; you can specify how many threads or processes you actually want tasks submitted to. By default it finds out how many cores you have on your machine and creates that many threads or processes, so on a four-core machine there will be four threads or four processes — but that's just the default behavior; you can absolutely say "I want ten threads" or "I want two threads".

"Why does it take two seconds — four workers, eight tasks?" I don't remember the timing on that one; which exercise? I don't think we have enough context for the for loop example. We've answered the JFK NaN question — that's because of Binder.

"Comparisons between dask.delayed and joblib's delayed?" I think they're quite similar. I don't have a ton of experience with joblib, but it's a very similar concept, and, as Mike may talk about later, there is Dask and joblib interoperability as well, so they play nicely with each other.

"So when calling compute, it will wrap the delayed operations together in a parallel way and give me the results?" It will set up the computation for you lazily, and when you call compute it will actually compute the result. If you have a delayed read_csv, like in the last example, so you get a DataFrame and then call a method on it, it will construct that task graph for you: first call the pandas read_csv function, and then — I'm forgetting which method we called — call that method after the CSV has been read. It sets up that dependency for you.

"Could you just call x + y as well?" I'm not exactly sure which example you're referring to, but yes: if x is a delayed object and y is a delayed object, then that addition will happen seamlessly. Dask will take care of that for
you: it will return a new delayed object whose task graph is "take x, take y, add them together".

"Do we gain significant speedups by using Dask for parallelized work, or does it just aid in using larger-than-memory arrays?" Both; it depends on your particular workflow, your problem, and the datasets involved. Certainly one of the selling points of Dask is that it scales to larger-than-memory arrays: if you wanted to load a one-terabyte array into NumPy, you wouldn't want to do that, so you'd use Dask. It helps you scale to larger-than-memory datasets, but it's also a general dynamic task scheduler, so it also helps you parallelize work to get speedups. For instance, in a couple of our examples we had for loops that weren't being parallelized, and you can use Dask to add parallelism to the same exact for loop.

"Where does the results.append(y) operation happen in the visualized tree? How does dask.delayed know that it's associated with the delayed objects?" The results.append step is just a normal list append; there's no Dask code happening there. The items we're putting into the list are dask.delayed objects, but we're still just building up an ordinary list of delayed values. The thing we actually visualize at the end is the summed result — we take that list of delayed values and call delayed(sum) on it. So the append portion does not involve any Dask code.

"James, did you cover how to choose the number of workers in the first cell?" No, I did not. When you create that client with the local cluster, you can specify the number of workers when you create the client, or you can grab the cluster object out of it and scale, depending on which cluster you're using. In this case we're using the local cluster, so it will add more worker processes when you scale, or when you specify the number of workers. If you've already created the cluster, you scale it rather than creating a new one — and again, that's for the distributed cluster, which we'll talk about in more detail later.
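A minimal sketch of both approaches just described, using dask.distributed's local cluster:

    from dask.distributed import Client, LocalCluster

    # Size the cluster explicitly up front...
    cluster = LocalCluster(n_workers=4, threads_per_worker=1)
    client = Client(cluster)
    print(client)

    # ...or resize an existing local cluster later
    cluster.scale(8)  # ask for 8 worker processes

    client.close()
    cluster.close()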
Okay, we're at 10 o'clock, which should be the end of our break. Dharhas, I believe you're next, with the array notebook. — All right. Hi everyone, my name is Dharhas Pothina. I work at Quansight, and my original background is in computational physics, doing a lot of hurricane simulations on supercomputers. I really like Dask; it's a tool I've used a lot. Let me work out how to share my screen... give me a second. Okay, let me make this a little bigger. In your folder you should see a notebook called 03_array, and that's the one I'd like you to open. How is my screen looking — is that a good size for folks? — Could you increase it just a tiny bit? — Okay, we'll see how that works.

We are now going to talk about Dask arrays. The stuff we did so far with delayed is kind of like the low-level guts of how Dask works: if you have a custom algorithm that you're trying to parallelize, using delayed functions is the way to go. But many times we use the higher-level interfaces — Dask has its base delayed interface and then several higher-level interfaces on top — and typically most people will be using the higher-level interfaces rather than low-level dask.delayed, though you can mix and match.

A lot of you are probably familiar with NumPy arrays, which are basically an n-dimensional block of numbers: however many dimensions you have — one, two, three, four — it's just a block of numbers in memory. Dask arrays provide an abstraction over those NumPy arrays to let you do bigger things than will fit on your own computer. What it gives you is a parallel, larger-than-memory array system using blocked arrays, and one way to think of it is as distributed NumPy: the same API as NumPy, but on larger datasets. This gives you two advantages. One, you can run things in parallel: if you have a large computation on a matrix or an n-dimensional array, you can use all the cores in your computer to run it, or, if you have a cluster, all the cores in your cluster. Two, it lets you work on datasets that are larger than memory. A problem a lot of us have had is that we try to load a really large dataset and our Python kernel crashes because we run out of memory. Dask arrays let us do operations on those larger-than-memory arrays by breaking them into lots of small pieces, doing the computations on those smaller pieces, and then putting the results together. Effectively, if you have a really huge dataset on disk that won't fit into memory, you have a view over that dataset and walk through it piece by piece to do your computation. It does this using blocked algorithms. We'll talk a little about blocked algorithms, and we're going to build some ourselves to get a feel for how you would do this if you didn't have Dask, and then we'll show you what Dask can do — a lot of this tutorial shows you how things work under the hood, but when you actually use Dask, you might not touch some of these under-the-hood pieces.

There are some links in the notebook pointing you to examples, documentation, and some other websites, so you can look at those. Initially we're going to generate some random data, so just run the first cell — it'll take a few seconds to generate a random dataset. Then we'll set up the scheduler and client; this is the same as earlier, we're just setting up a small Dask cluster. This is taking longer than I expected, so give me a second — while we're waiting for that to run, I'll go down and talk about blocked algorithms.

With a blocked algorithm, we take a large dataset and break it into small pieces. An example: if we have a billion numbers and we want to calculate their sum, the way you'd typically do that with a NumPy array is to load the entire dataset into memory — okay, this has finished running — load the entire dataset into memory and then use a sum function on the NumPy array. Instead, we can break it into smaller chunks, say a thousand chunks of a million numbers each, then calculate the sum of each chunk, and then calculate the sum of the sums. We compute intermediate sums, then the big sum, and we get the intended result — the sum of one billion numbers — while reducing memory usage by working on smaller pieces at a time. So let's look at this: the command we ran
earlier created an HDF5 file — let me make this slightly smaller so we can see the whole command. HDF5 is a binary file format that holds data and lets you pull it out piece by piece, so it's a little nicer than a CSV or something like that. Here we are opening this file in read mode and pulling out a dataset, the x value from the file. This is actually just a pointer to the data — it hasn't read anything in; it just points at the data on disk — and if you look, it has one billion points.

So let's work out how we would calculate the sum of this with a blocked algorithm. I'll walk through it: we make a list called sums that holds our intermediate sums, and we loop from zero to a billion, one million at a time. In each iteration of the loop we pull out one chunk of the data — we take dset, which is the x data from the HDF5 file, and slice out zero to one million, one million to two million, two million to three million, and so on — so we're taking a series of chunks, each a million elements in size, computing the sum of each chunk, and appending it to the list. Actually, let me remember how to do this... let me split the cell here. If I run this, it loops through all the chunks, and if we look at sums, it has the intermediate sum of each chunk. Now, if we calculate the total of the sums, we get the overall total. This is randomly generated data, so your value might be slightly different from mine.

As a quick exercise, let's see if you can take this information and do the same thing for a mean. You've got the instructions: compute the sum of each block, compute the length of each block, then compute the sum of the thousand sums and the sum of the thousand lengths, and divide one by the other. I'll give you about a minute to try that — you should be able to copy and paste the code from up here and compute a mean instead of a sum — and then I'll walk through it. Let me look at the questions. Oh yes, the folks on Binder might not get the same answer, because I think you're using a smaller dataset — it looks like it's about half the size or less, so the answer you get is different. Binder has five million elements, so it's a smaller dataset.

Let me walk through this now. Similar to above, we're just adding one extra thing: we're also collecting the lengths. As before, we compute the sum of each chunk and append it, and we also append the length of each chunk. Now, if we total the sums and total the lengths, we can get the mean — this takes a little while because it's walking through everything. This is what you would do manually: if someone gave you a larger-than-memory dataset and you had to work out how to implement these algorithms yourself. Sum and mean are very simple and you can reason through them; other algorithms can get more complicated.
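A minimal sketch of the blocked sum and mean just walked through; the file and dataset names follow the tutorial's prep script (data/random.hdf5 with a "/x" dataset), so adjust for your copy, and the Binder data is smaller:

    import h5py

    f = h5py.File("data/random.hdf5", mode="r")
    dset = f["/x"]  # a pointer to the data on disk; nothing is read yet

    # Blocked sum and mean: slice out one million elements at a time
    sums, lengths = [], []
    for i in range(0, len(dset), 1_000_000):
        chunk = dset[i : i + 1_000_000]  # reads just this slice into memory
        sums.append(chunk.sum())
        lengths.append(len(chunk))

    total = sum(sums)
    print(total, total / sum(lengths))  # blocked sum and blocked mean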
What Dask array does is implement these algorithms for you. It is basically a library that mimics the NumPy API and does these kinds of tricks to perform the same NumPy algorithms on larger datasets that don't fit into memory. It does not implement every single piece of the NumPy API — the NumPy API is quite vast — but it covers a majority of the most-used pieces.

Now let's go back and repeat this using Dask array. We're going to import dask.array as da, and look at this new command: da.from_array. What it does is take any object that is array-like — for example, here dset is a pointer to a file that can give you back a NumPy array — and create a Dask array from it. There's one extra piece: we have to tell Dask how we want to break up the array, into pieces we call chunks. In this case we have a one-dimensional array, so there's only one dimension to the chunk. When we do this we create x, and if you look at it, this is a four-gigabyte array and each chunk is about four megabytes: the shape of the full array is a billion elements, the shape of each chunk is 100,000 elements, and so you have 10,000 chunks. Again — this goes back to delayed — at this point we have not read in any data and we have not done any computations; we have just made a pointer to all the data, in some sense, and now we have a similar API and interface to what you would have in NumPy, but under the hood it's this chunked, distributed array.

So we can do something like — let me see, it's further down, yes — result = x.sum(), and result still has not done anything: it's just a tiny four-kilobyte object, because, remember, Dask is lazy. Things like sum and the other NumPy-style functions don't actually compute anything; they build up that task graph we were talking about earlier. If we actually calculate the result by calling compute, now we see the answer. When we've called result.compute(), it has walked through the dataset on disk, loaded each chunk, computed the sum of each chunk, and then gathered all the chunk sums together. Actually, let me see if I can do this — I'm pretty sure this will work, though it's taking a little longer than I expected — if you do result.visualize() you should see... not sure why it didn't show up. Oh, it's very tiny, I see — the graph is so big that it was rendered as a very small image. It's very faint, and because it has a thousand elements the image is stretched across the screen, and I'm zoomed in, so it looks a little weird; but result.visualize() does show you the actual graph. Let's see... someone says if I double-click it, it should show up — nope — maybe — oh, there it is. There's the big graph: right at the bottom we have all the chunks of the array, and then it's calculating the sums.
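A minimal sketch of the Dask-array version of the same computation (same assumed file as above; the chunk size is a free choice):

    import dask.array as da
    import h5py

    f = h5py.File("data/random.hdf5", mode="r")
    dset = f["/x"]

    # chunks says how to split each dimension; here, one million elements per chunk
    x = da.from_array(dset, chunks=(1_000_000,))

    result = x.sum()           # lazy: builds the task graph, reads nothing
    print(result.compute())    # walks the file chunk by chunk, summing in parallel
    print(x.mean().compute())  # the mean exercise is the same one-liner
    # result.visualize() draws the (very wide) task graph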
If you look in the middle, somewhere here there should be a read step, but I think you have to zoom in more to see it. Then, as we go up, it calculates the sums of the individual chunks, then partial sums for groups of chunks, then the sums of those, and the sums of those, until you finally end up with the final answer. So that is a more complicated task graph. When we did our examples with delayed for loops, it was a very simple task graph, and it made sense to walk through it very straightforwardly; when you have a lot of tasks — a thousand or more — Dask may want to go through the task graph differently. If you look on the right of my screen, there's an image where I think red is when a task is being executed and blue is when it completes, and if you look at it, it's not going in a particular order — it's moving in a slightly interesting manner. What it's doing is trying to use multiple cores in parallel, and trying to chain operations on single blocks before it moves to the next one. Dask array takes your array operations and builds them into this task graph — a directed acyclic graph of interrelated tasks with dependencies between them — works out what those dependencies are, builds the graph, and then, when you run compute, executes the graph with multiple threads. We'll talk more about this a little later. Someone asked about OpenMP or MPI: I would say the big difference is that with those you're explicitly defining what the task graph is — you're telling MPI or OpenMP how to compute things — whereas Dask is doing that for you.

Okay, so, for the mean example: it should be x.mean() — you know, if you do x.mean()... oh, that didn't actually do anything; it just gave you a lazy object. If you do x.mean().compute(), now it actually computes the mean. Any of the NumPy APIs you can use with a NumPy array, you should be able to do something similar with the Dask array: if you type x. and hit Tab, you have the same argmax and so on — you've got a couple of extra things like chunks, which NumPy doesn't have, and compute — but then cumsum, dot, all these different things are available.

Another point, which someone mentioned: Dask can seem to take longer. If your NumPy array fits in memory and you're doing a single serial computation, you won't see much performance benefit over NumPy, because NumPy is already very optimized and very well built. Dask is going to be more useful when you're building computations that can be parallelized, or computations that don't fit in memory.

Now, going down here, let's look at another example. We're going to create a 20,000 by 20,000 array of normally distributed random values. This is two-dimensional now, so when we make it a Dask array we have to say how we want to chunk it in each dimension; for simplicity we're taking a thousand in the x direction and a thousand in the y direction, but depending on your matrix and your block size, we can vary these chunks to be more efficient in the way we read things. Then we're going to take the mean along one of the axes and take every hundredth element of that. If we
If we look at the size of this dataset, it's 3.2 gigabytes, so it's a pretty large dataset. Now let's time the compute; it should take a few seconds, and it should be using four processes on my machine. It took about 18 seconds. Okay, so that's the data. Now I'm going to walk through some performance comparisons; we're not actually going to run these. This section was benchmarked on a fairly strong laptop, and you need about 4 GB of main memory to be able to run it. If you use straight NumPy and do this computation, it takes about 19 seconds, just using numpy.random.normal and taking the mean, and it needs gigabytes of memory, because you have to load the entire dataset into memory, probably at least four gigabytes. If you use dask.array instead, you only use megabytes of memory, and it parallelizes so the computation happens much faster, because you can use all the cores in your machine: this particular benchmark drops to four seconds, with an order of magnitude lower peak memory usage. Also notice that the dask.array computation ran in four seconds of wall time but used 29.4 seconds of user time; that's because it was using all four cores, so the CPU time is summed across them. Okay, let's see if you can answer these questions in the chat. What would happen if we set the chunk size to 20,000 by 20,000? Would the computation be faster or slower, and how much peak memory do you think we would use? Hoping some folks are brave enough to answer. So, the question is: if we specify the chunks as 20,000 by 20,000, the same size as the array, what happens? Yes, someone in the chat has it right: if you make the chunks that large, there will essentially be only one chunk, and Dask will just call NumPy once, so the computation is the exact same computation as NumPy, with the exact same memory use as NumPy, plus a little extra because of the overhead of starting tasks; so it would be a bit slower than plain NumPy. As Grace said, a chunk size equal to the array size is the same thing as just using NumPy directly and loading the whole thing into memory. Now, what if we make the chunks really small, say 25 by 25? Basically, if you make the chunk size too small, the overhead is going to kill you. You will use a lot less memory, but you'll have a much higher CPU overhead: Dask has a small fixed overhead per task, and what Dask is essentially doing is splitting the computation into lots of small function calls, each with that fixed overhead, so the overhead starts piling up. So choosing the chunk size is important, and it doesn't have to be the same in each dimension. There is actually another project called xarray, which I think has another tutorial this week, that uses dask.array under the hood; if you have gridded data like netCDF, xarray is a higher-level interface on top of some of this, but you still have the same chunking considerations.
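If you want to feel that trade-off yourself, here is a rough sketch of an experiment; timings will vary by machine, and the tiny-chunk case may take a very long time simply because of the size of the graph:

    import time
    import dask.array as da

    for chunks in [(20_000, 20_000), (1_000, 1_000), (25, 25)]:
        # one chunk: plain NumPy plus task overhead; 400 chunks: parallel and
        # memory-friendly; 640,000 chunks: per-task overhead dominates
        x = da.random.normal(size=(20_000, 20_000), chunks=chunks)
        start = time.time()
        x.mean().compute()
        print(chunks, f"{time.time() - start:.1f}s")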
How do you find the Goldilocks chunk size? The chunk-size question is difficult to answer with a single one-size-fits-all rule, because it's determined by the kind of algorithms you're trying to write, the compute power you have available, and the kind of cluster you have. There are some rules of thumb you can use, but to some extent it's trial and error. Okay, let's move on. We have another exercise: random data is nice, but it's less intuitive, so we're going to create a new dataset based on some real temperature data, average surface temperatures of the Earth, and if we run this it will pull that data down. While that's running, let me have a quick look at the questions... someone asks whether dask.array can show the output of an array in the notebook; what do you mean, any array? I'm waiting for this to run, so give me a second. Okay, the weather dataset got created; it took about 71 seconds. If we look at these weather files, it's a collection of files that are each about 500 MB, and if I run du -h, the total size is about 17 GB, so this is a pretty big dataset. One of the nice things about dask.array, especially together with HDF5, is that we can open multiple files as a single dataset. Here we use a glob to get all the HDF5 files and make a list of dsets; if we look at dsets, it's basically a pointer to each file, and each file contains a two-dimensional array of a given size, so dsets[0] is one dataset of that size. We're using h5py to read these HDF5 files, and we can use regular NumPy-style (Python-style) slicing to pull out a small piece: here we take the first dataset, which is two-dimensional, and pull out the first five-by-five elements. Now let's plot a small piece: we take the first raster and sub-sample it by every fourth element, just to make it a little smaller so the plot doesn't take too long to load. So here's a plot of the temperature for one of those datasets, for the whole globe: I think that's Australia, there's North America, South America, and it's cold near the North Pole and the South Pole, so it looks reasonable to me. Now let's take all these HDF5 arrays and read them into Dask arrays, looping through each dataset. Remember, len(dsets) is 31, so we have 31 datasets, and for each one we call da.from_array with a chunk size of 500 by 500, converting it into a list of Dask arrays, each of which is two-dimensional.
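Sketched out, the loading pattern so far looks something like this; the directory layout and the dataset key "/t2m" are assumptions about the tutorial data, not confirmed by the recording:

    from glob import glob
    import h5py
    import dask.array as da

    filenames = sorted(glob("data/weather-big/*.hdf5"))
    dsets = [h5py.File(f, mode="r")["/t2m"] for f in filenames]

    print(dsets[0][:5, :5])   # h5py slicing reads only the requested piece

    arrays = [da.from_array(d, chunks=(500, 500)) for d in dsets]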
Now we want to put them all together into one larger dataset, and we want the final version of the data to be a three-dimensional array, shaped 31 by 5760 by 11520. To do that we use da.stack; if you look at its help, it lets you stack a bunch of Dask arrays along a particular axis. So I give it the list of arrays I want to stack and tell it to stack along the zeroth dimension, and if we look at the result, it's now a three-dimensional array, 31 by 5760 by 11520, chunked into blocks where each block is one element deep, 500 wide, and 500 long: little blocks that tile the big array. Now let's see if anyone can get this: how do you complete this code to calculate the mean of this array along time, the zeroth dimension, that is, across the 31 dates? We'll spend a couple of minutes on it; think how you would get a mean along one axis in NumPy, and see if you can do the same thing in Dask. Someone had a suggestion in the chat; let's put it in and see if it works... it's chugging along, and I'm actually going to kill it because it's taking a little extra time, but it was actually working, it was the correct answer; I only killed it because it was slow. Some people asked why we don't use compute. Typically you would use compute, but there are places where it happens implicitly: len(), for example, is required to return a value, so it computes automatically, and imshow, when you put the result in a plot, implicitly does the compute for you because it needs the data to draw the plot. So you could either rely on that or call compute; both do the same thing, and you should get the plot from above. Okay, I'll give a little extra time for the next one: see if you can calculate the difference between the first day and the mean. Meanwhile, let's look at questions. Yes, like Dharhas said, matplotlib will convert the Dask array to a NumPy array under the hood; that's why you can pass a Dask array directly to matplotlib and it will do the computation and plot it for you. It's nice that you don't have to call compute explicitly, but you do need to know that matplotlib is doing that computation for you; just something to be aware of.
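A compact sketch of the stacking step and the exercise answer, reusing the arrays list from the sketch above; the colormap choice is mine, not from the recording:

    import dask.array as da
    import matplotlib.pyplot as plt

    x = da.stack(arrays, axis=0)       # shape (31, 5760, 11520), chunks (1, 500, 500)

    result = x.mean(axis=0)            # lazy mean across the 31 days

    plt.imshow(x[0] - result, cmap="RdBu_r")  # imshow needs the data, so it
                                              # triggers the compute implicitly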
Looking at the questions: at some point, could you elaborate on the limits of dask.array, where do we start getting too big of a dataset for Dask to handle? There are a couple of limitations to keep in mind, and they apply not just to dask.array but to the other collections, like dask.dataframe, as well. One: you want to make sure your dataset can fit in your total distributed cluster memory. Dask will spill to disk whenever memory starts to become an issue: if it starts to run out of memory, it writes intermediate results to disk to save RAM, and pulls those intermediate results back as necessary. As you can imagine, writing to and reading from disk slows things down, so it's not as performant; in general I would try to have the dataset fit into total distributed memory, although it will still work if it's larger. The other thing to think about is the number of tasks in your task graph. There is some overhead associated with processing each task, so a really small chunk size is good from a memory perspective, because you only load a few chunks at a time and stay well below your cluster's memory limits; the downside is that you get lots and lots of tasks, each with some overhead, and that eventually becomes a performance issue. If you have millions and millions of tasks in your graph, it will still process; it'll just take a really long time to get through all of them. Okay, let me get back to the exercise and walk through it. We want to plot the difference between the first day and the mean. We know how to get the data for the first day, and we know how to get the mean, so let's split this cell and see what it looks like. If we look at this, it actually didn't do anything yet; it's just another lazy object. But when we pass it into the figure, it runs all the computations, and you're using the same kind of API you'd use in NumPy. Now, this is a key fact that's important for both dask.array and dask.dataframe, so let me reiterate why we don't need to call compute here: there are certain functions and libraries that know how Dask works, and if you pass them a Dask array or Dask DataFrame and they need the result, they will do the compute automatically for you. But those libraries have to be Dask-aware: if you take a random library and hand it a lazy object when it expects an actual realized object, it will break. Okay, getting back to this: right now result is a lazy object, and it's tiny, only a couple of kilobytes, so it doesn't take up much memory. If I call result.compute(), it does all the chunk calculations and then brings all of that data back into the main memory of your computer. So if your computer does not have enough memory to hold everything, it will crash your kernel; it won't work. You have to be very careful that when you call compute, or do anything else that brings everything back into memory, your data is small enough to fit, for example because you've applied a filter or a reduction first. Alternatively, here is a set of instructions for writing the result to disk instead of pulling it into memory: we use da.to_hdf5 or da.store, and what these do is process the data in parallel and save each chunk to disk as it's processed, so you never pull the whole thing into memory.
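A sketch of both write-to-disk options; the output file names and dataset keys are illustrative:

    import h5py
    import dask.array as da

    # Option 1: write the result chunk by chunk, never materializing it in RAM
    da.to_hdf5("myfile.hdf5", "/result", result)

    # Option 2: da.store with any target that supports NumPy-style slice
    # assignment (an h5py or zarr dataset, for example)
    with h5py.File("out.hdf5", mode="a") as f:
        out = f.create_dataset("/result", shape=result.shape, dtype=result.dtype)
        da.store(result, out)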
I'm not sure we have time for this exercise, so in the interest of time: something else to consider, if you are working with really large datasets, is a project called Zarr, which is a really good format for writing very large gridded datasets. So, James, what do you think, should we stop the array part here and move to DataFrames? How many more exercises are there? I think just the last one, the Lennard-Jones potential one. Okay, I think we're okay then; we have a ten-minute break scheduled, so I'm not going to go through the last exercise, I'm just going to go through some limitations. dask.array does not implement the entire NumPy API; there are bits and pieces that will not work. It does not implement all of numpy.linalg, and one reason is that there are some really excellent low-level libraries, like BLAS and LAPACK, that will do a better job than Dask does for those kinds of computations, so you shouldn't really be using Dask to mimic those. It also does not support operations where the resulting shape depends on the values of the array: like what we did earlier with the if-condition on the delayed object, where we needed the actual value to evaluate the condition, if you have an algorithm where the result's shape depends on the array's values, Dask can't do anything with that, because it doesn't know what the data is until compute time. There are also operations that are inherently difficult to do in parallel, like sort; this is not a problem with Dask, it's an overall issue with how distributed systems work. Some operations are just inherently hard, and we usually have a parallel-friendly alternative, but for some pieces there isn't one. The last point is important: Dask development is basically done by volunteers and staff at different organizations, and development is driven by need. My company needs certain things in Dask, so we fund the development of those features. Some functions are not implemented simply because no one has done it yet, and if there's a function you really want to use, contributing it would be an amazing thing to do; getting involved is definitely a good thing. So I'm going to run client.shutdown(), and it should close my cluster. A couple of notes: folks working on Binder might see warnings, because the CPU time and compute power you get on Binder are a lot smaller. The other thing is that, on a local machine, if you just call Client() instead of passing n_workers=4, it will use all the cores on the computer; so if you want all the cores, just say Client(). Okay, we're going to take a ten-minute break now before we start the dask.dataframe section; we'll start again in ten minutes, roughly 11am CST. In the meantime I'm going to stay on and look through the questions. I might skip some questions from the earlier exercises because I can't find the context, so I'll try to answer the ones where I can figure out the context.
Okay. Someone ran out of memory using imshow: there's actually another project called Datashader, in the HoloViz tool stack, and that's a better way to visualize very large datasets; it's out of scope for today, but Datashader and HoloViews use Dask under the hood, so you can visualize very large datasets without running out of memory. What can I say about using all four workers? At the beginning of this notebook we set up a client with Client(n_workers=4); if we change that to Client(), it sets up a client using all the processors on your computer, so whether you have two, four, or eight cores, it picks the appropriate number using some defaults. Later James will go through the schedulers in more detail and give more options on how those pieces work. If there are particular questions you're interested in, please vote them up; it makes it easier for us to see which ones to answer. Why is the sum from the dask.array implementation different from the chunked implementation up top? I haven't looked at it exactly, but Dask does a lot of tricks to optimize task graphs. When we built things manually, we were writing a very simple parallel algorithm; Dask builds a task graph and then runs an optimization pass that rearranges tasks to be more efficient across cores, so that's probably why it differs. And that relates to Maggie's question as well: in the visualization I showed for the sum, it was summing groups of chunks, then summing those groups, and so on, and that's an optimization to make the compute more efficient. The HoloViz tool suite, Datashader, HoloViews, Bokeh, GeoViews, and hvPlot, is a set of visualization tools that work with Dask under the hood and let you handle very big datasets in your browser very easily. Is there a way to pass a Dask array into an arbitrary NumPy-based function? I think you'd have to use the Dask equivalents of those NumPy functions. There is some ongoing work with the NumPy group where NumPy becomes more of an API and different backends, like Dask or GPU libraries, can plug in underneath, but that's not fully mature yet; it's more of an in-development feature. Can Dask handle communication between chunks? I'm not sure exactly what you mean, because chunks are just pieces of data, but Dask does handle moving data between machines and processes as needed to finish a computation, if that's what you mean. Can Dask be used in a multi-node setup? Yes; we'll get to that in the distributed section. Okay, any other questions... how does Dask compare with Numba? They do two different things, and you can actually use them together; a lot of projects do. Numba speeds up a particular piece of code by doing just-in-time compilation of that code; Dask helps you distribute computation across cores or across nodes.
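For reference, the two client setups being discussed, as a sketch:

    from dask.distributed import Client

    client = Client(n_workers=4)  # explicitly four worker processes
    # client = Client()           # default: sized to your machine's cores

    # ... do work ...

    client.shutdown()             # close the cluster when finished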
Okay: can you convert a CSV file to HDF5 files? That's probably not the best idea. There's another format called Parquet, which is a binary format, and it's very straightforward to convert CSV files to Parquet files; Dask and most tools work a lot better with Parquet files than with CSV files, and there are a lot of good advantages to using Parquet. I've seen several questions of the form "can I do X, Y, Z with Dask", so I'll mention this: Dask provides the fundamental building blocks for a lot of parallel computations. You have the dask.delayed interface, the dask.bag interface, the dask.array interface, and the dask.dataframe interface, and we're going over three of those today. There are also other libraries built on top of Dask for specific things: xarray is a really good library for very big raster or n-dimensional computations, especially in the geosciences, and Dask-ML uses the same scikit-learn API but implements a fair number of those algorithms in a parallel fashion. So there's a whole suite of tools that sit on top of Dask and use it under the hood for specific things; the examples.dask.org website has links to some of those, and you can also ask in the channel and we can point you to specific ones. Okay, what does dset = f["/x"] do? This is an HDF5 thing, so let me walk through it. HDF5 is a binary file format that mimics a directory structure inside a single file. If we run this, f is an h5py file handle that just points to the file, and inside f there's an entry called x that contains the array called x. So dset = f["/x"] gives you a pointer to the x dataset inside this random.hdf5 file, if that makes sense, and then you can load pieces of that data; it's a specific way of loading HDF5 data. Does Dask automatically compute chunk size? It can, but sometimes you need to pick it yourself, depending on the kind of computation you're doing. The high-level point I'll make about dask.array and dask.dataframe is that they do a lot of things very well out of the box, and you can go very far without having to remember that you're on a parallel distributed system; but fundamentally, parallel and distributed algorithms are different from serial algorithms, so at some point you will need to understand some of the complexities of working in a parallel system. Automatic chunking is great and you can use it, but in some cases you may have to pick your own chunking to optimize performance. Okay: why do we assign x.mean() to result, wouldn't it work inline? Yes, it would work perfectly well inline; we just did it to make the tutorial clearer so folks wouldn't get confused. If you're on Binder, you may want to change your dataset to the small weather data, which should avoid some of the memory issues folks are hitting. Dask array returned a value that's rounded differently than the non-Dask version: I think that might just be how it's printing, but we need to check.
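Going back to the CSV-to-Parquet question for a moment, the conversion is a one-liner in Dask; the paths here are illustrative, and to_parquet needs pyarrow or fastparquet installed:

    import dask.dataframe as dd

    # One-time conversion; afterwards, readers can load just the columns they need
    df = dd.read_csv("data/myfiles.*.csv")
    df.to_parquet("data/myfiles.parquet/")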
On the rounding question: James says it's just the way the HTML repr is printed in Jupyter; the actual numbers are the same. James, do you want to take the one about whether we keep the results from x.mean()? There are ways to persist things in memory; there's actually a persist function, which we'll look at later, that keeps things in memory if you expect to use them again. Mike is going to cover persist in one of the later notebooks. When do I need Zarr? Okay, some history. If you're using very, very large datasets, you should not be using text files: not ASCII, not CSV, none of that. It's very inefficient, you have no information about data types, and you can't read bits and pieces of the files. HDF5 is a file format developed quite a while back that fixes some of those things: it makes it very easy to have really large binary n-dimensional datasets and pick little pieces out of them. Zarr is a newer project that takes a similar model to the HDF5 format but makes it more cloud-friendly: with HDF5, if you put files in cloud storage you can essentially only read all or nothing, because of the way HDF5 works, whereas Zarr keeps the metadata in a separate file and stores the binary data in small chunked pieces. So if you put the data in remote storage, like S3 or a remote file system, and you ask for a small piece of it with Dask or something else, it will read the metadata, work out which pieces it needs, and download only those. That's one of the reasons to use Zarr; it's a really great project, and I highly recommend that folks working with very large n-dimensional data look at Zarr as a way to handle their data. Okay, I think that's about the end of the break, so let's switch to the next section. One quick question first: how do you find the solutions to the exercises in the notebook? There are ellipses; those cells are essentially hidden, so if you click on the three dots they expand and you can see the answers. But that's cheating. Okay, so now I'm going to open the DataFrame notebook. Dask DataFrames are the equivalent distributed version of pandas DataFrames, similar to how we built Dask arrays on top of NumPy arrays. In chapter one we did a parallel DataFrame computation over a directory of CSV files using dask.delayed; that is a really annoying way of doing things, and what I'm going to show you now is a much simpler way. Again, as with NumPy, the times you want dask.dataframe are when the computation you're doing could be made parallel, or when your data is bigger than memory. Under the hood, Dask DataFrames use pandas, so if you have a tabular dataset that fits in memory, just use pandas; Dask becomes useful when you have really large datasets that don't fit in your machine's RAM. The demo data we're using is small, so it's not actually going to run out of memory; we're just using it as an example of how to use dask.dataframe, but the technique works on extremely large datasets.
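Back to the Zarr answer for a second, here is a hedged sketch of the round trip with Dask; the file name is illustrative, and the zarr package must be installed:

    import dask.array as da

    x = da.random.normal(size=(20_000, 20_000), chunks=(1_000, 1_000))
    x.to_zarr("example.zarr")          # chunks written in parallel, plus metadata

    y = da.from_zarr("example.zarr")   # lazy; slicing fetches only needed chunks
    y[:100, :100].compute()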
In one of the projects I do at work, we actually have datasets in the terabyte range, around 20 terabytes, and we use dask.dataframe to do processing and analytics on those really large datasets. For a lot of people, dask.dataframe is going to be the main API they use when they use Dask. Again, it mimics a large subset of the pandas DataFrame API, but not everything, and this is for two reasons. One: the pandas API developed over time and it is massive; it's really not possible to have every single part of the pandas API in another project, but a large portion of the main parts is implemented in Dask. Two: some things are inherently hard to do in parallel; for those, there are sometimes workarounds, and sometimes there's just no way to do them. Again, there are some documentation links here. The main takeaways: if you know pandas, you pretty much know dask.dataframe; pandas is an in-memory, serial DataFrame, and Dask is a parallel, distributed version of the same thing. The second takeaway is that the way you partition the data can be very important for efficient execution. Okay, let's create the data; it's downloading some flight data, and then I'm going to get a client set up, so it'll take a few seconds. There should be a diagram here which seems to be missing; let me find it. This is the diagram I wanted to show you. Similar to how we split an array into uniform blocks in x and y with dask.array, with pandas DataFrames we don't have multiple dimensions: a pandas DataFrame is a two-dimensional table, kind of like an Excel spreadsheet, so we split it along only one dimension, and that dimension is the DataFrame index. Here's an example of a Dask DataFrame made up of multiple small pandas DataFrames, each containing data from one particular month, and you can also see that the sizes of these partitions are not uniform. That's just a quick aside; going back, this has finished running, and it also does some data prep. One thing I'll point out is that the file name is a glob: it's a pattern to match something, and we're going to match accounts.*.csv. In pandas you would write import pandas as pd and then df = pd.read_csv(...) to read a single file into a pandas DataFrame. With dask.dataframe, we usually import dask.dataframe as dd, and instead of giving it a single file name, I give it a glob, a pattern of file names. Now if you look at df, it's a lazy structure, like earlier; it doesn't actually hold anything yet. If I list the directory, we have basically three matching files, 0, 1, and 2, so Dask is going to call read_csv three times to read those three files in; that's what this little repr means. Now if I do df.head(), it takes a couple of seconds and reads the first few rows of this combined dataset.
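A sketch of that pandas-versus-Dask comparison; the paths follow the tutorial's accounts data:

    import pandas as pd
    import dask.dataframe as dd

    df_pandas = pd.read_csv("data/accounts.0.csv")  # eager: one file, loaded now

    df = dd.read_csv("data/accounts.*.csv")         # lazy: a glob of files
    df.head()                                       # reads only the first partition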
If I look at len(df), it actually does a couple of things: it took a couple of seconds, and it found that the length of the DataFrame is about 300,000 rows. So what happened? Dask found the three matching files we saw above, loaded each file into memory and calculated the length of each chunk, and then added the subtotals together into a grand total. Now let's try this with some real data. We have another folder, data/nycflights/*.csv, and this is the same information you would give pandas, the only difference being that instead of a single file we're giving it a set of files; this could also be a list, so you can pass a glob or generate and pass a list of files. Dask did just enough work to figure out how many files there are (ten read_csv tasks) and did a little investigation to work out the data types: it looked at the first file and picked some types, because CSV carries no information about data types at all; even pandas, when you call read_csv, looks at the data and tries to guess the types. If we look at df.head(), this worked great: you get the date, the day of week, and all the other fields. Unfortunately, df.tail() broke, and if you scroll down this quite long error, it tells you that there are mismatched dtypes, with two examples: it was expecting an integer and found a float, and it was expecting a float and found an object, which is a string. The explanation: pandas read_csv reads the entire dataset into memory and then works out what the dtypes should be. The dask.dataframe version doesn't do that, because if you read the entire dataset into memory just to infer types, you're defeating the point of doing things in parallel and using Dask to reduce memory usage. So it looks at the beginning of the file, or for a huge file or a set of files, at a couple of the files, and tries to guess what the data types are. In this particular example, the first several rows have no value for CRSElapsedTime; if we go back up and look at that column, there are NaNs, no values at the beginning of the dataset, so Dask assumed the column was a float. Later on, in the last file, there were strings in that column, so it's an object, and Dask gave an error saying: hey, I thought this was a float, but it actually turned out to be a string when I tried to read the last one. When something like this happens, you have a couple of options. One: you explicitly say what the dtypes are, using the dtype keyword, declaring this is a string, this is an integer, and so on. This is the recommended solution if you know your data types, because they're hard-coded and Dask doesn't have to infer anything. If not, you can increase the sample size so it looks at more of the data before doing the inference.
The other thing is that, until recently, NumPy and Dask used np.nan, a float, as the missing value, and integers did not have a missing-value type. So if you had an integer column with a missing value, it would get converted to float; but if the first few rows have no missing values, the inferred type is integer, and then when a later file contains a missing value it gets read as float, and when Dask tries to put the whole thing together it breaks. For that case you can use the assume_missing option, which treats inferred integer columns as floats, or you can just declare the column a float yourself. We're going to use the first option and specify the dtypes of the columns that are breaking: TailNum is a string, CRSElapsedTime is a float, and Cancelled is a boolean. Now Dask knows what the types should be, and df.tail() works. One takeaway of this message: CSV is a really lousy format for data; please stop using it and use something else, like Parquet or HDF5. A lot of the problems you'll have with very large datasets come from the fact that CSV files are messy and annoying. Okay, let's look at an example of how we would handle, as with the earlier array computation, a set of CSV files that would be too big to read into memory all at once: how would we do that while minimizing memory? (Someone commented that CSV is readable: it's readable when you have small CSVs, but when you have terabytes of data, CSVs just become impossible to use; and you can always do df.head() and read it that way.) Here we loop through each file and do the same thing we did before: we want the maximum departure delay across all our data, so we loop through the file names, read each file into a pandas DataFrame with pd.read_csv, calculate the maximum of each DataFrame, append the maxima to a list, and then take a final max. This is exactly what we did with the NumPy arrays. We could also wrap read_csv in a delayed function and then we'd be doing things in parallel; that's how you could build a parallel pandas yourself using dask.delayed plus pandas, and essentially all dask.dataframe is, is all those parallel algorithms already built for you, mimicking the pandas API so you don't have to write them yourself. So let's do this in parallel. Remember, df is a lazy Dask DataFrame and nothing has been computed. Now I'm going to time df.DepDelay.max().compute(): this is the same thing you would write in pandas, but it's running in Dask. It took a certain amount of time and calculated the answer, so this does the delayed computation for us, like dask.delayed did, and as with a lot of the array material, we do need to call compute; everything is done lazily. Dask will also delete intermediate results as soon as possible, which lets us handle datasets larger than memory, but it also means that each new computation has to load the data in again.
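A sketch of the dtype fix and the parallel max; the column names follow the tutorial's flight data:

    import dask.dataframe as dd

    # Declare the problem columns' types up front instead of letting Dask
    # guess them from a sample of the data
    df = dd.read_csv(
        "data/nycflights/*.csv",
        dtype={"TailNum": str, "CRSElapsedTime": float, "Cancelled": bool},
    )

    df.tail()                        # now works: no dtype mismatch
    df.DepDelay.max().compute()      # parallel max across every file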
Okay, let's look at the visualization of this and see what it actually looks like. At the bottom, it's split into the ten CSV files: each file is read, the data comes out, and the max is calculated per file; then all those maxima are aggregated into a max across all the files, and that gives you the result. So it's the same kind of parallelism. Okay, so we have our df: how would we get the length of this DataFrame? Think of how you'd do it in pandas, then do the same thing with this DataFrame, and if you think you know the answer, paste it in the chat and I'll try your code. Should be an easy one... someone got it: len(df) calculates the length of the DataFrame, and we did this earlier, so it's really straightforward. Someone else had another one; let's try it... this works too, and here's why: if you just type df.shape, we don't know the number of rows because we haven't read in the data, which is why they did shape[0]; df.shape[0] is a delayed object, and compute actually does the calculation. Okay, this is a pandas-like DataFrame, so you can do ordinary things with it. These are the columns we have in our DataFrame, and there's one called Cancelled; so let's see, how would we work out how many non-cancelled flights exist? How would you do this in pandas? Someone asked a good question about this: df.shape by itself does not work; it gives an error, and there's a reason. If you look at df.shape, it's a tuple: the first element is a delayed object, and the second is the number of columns, which we already know. A tuple is not a Dask object, so you have to call compute on the delayed element, which is why you write df.shape[0].compute(), if that makes sense. Okay, in the interest of time I'll just show this one, since we need to get to other things: here we use pandas-style boolean indexing. Cancelled is a boolean column, so we can write df[~df.Cancelled], and the length of that gives us the count of non-cancelled flights. Now, a question on why you didn't have to call compute on len(df): len is a special Python function whose specification requires it to return a number, so Dask knows that and does the compute implicitly; if len gave you a delayed object, it would break other things in the Python ecosystem. It's something Dask treats as special; that's just the way we implemented it in Dask. Okay, James, how are we on time? Or Mike, do you know? It's 12:30 now; you've got about ten more minutes. Okay, so we're going to do a groupby example. I'll give everyone about two minutes: see if you can write a groupby to count how many non-cancelled flights were taken from every airport. Think of how you'd do a groupby in pandas, and if you have an idea, paste it in the chat.
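Before the groupby, the answers so far summarized in one sketch, using the df from above:

    n_rows = len(df)           # len() must return an int, so Dask computes eagerly

    df.shape                   # a tuple: (delayed row count, number of columns)
    df.shape[0].compute()      # explicitly compute the delayed row count

    len(df[~df.Cancelled])     # rows where the boolean Cancelled column is False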
While you're working on that: a couple of people were confused by the tilde, so let me show df.Cancelled.head(). It's a boolean column, a bunch of Falses, and if I put a tilde in front of it, it inverts it: Falses become Trues and Trues become Falses. This is true for a lot of things, like NumPy arrays; it's fairly standard. So what df[~df.Cancelled] says is: keep the rows where that inverted value is True, in other words, keep the non-cancelled flights. Okay, Stephen has a groupby we can try: he's filtering out the cancelled flights, grouping by Origin, counting the Origin column, and then computing. Let's see if that works... and it does, so great job, Stephen. If there are any questions about this, please ping; I'm going to move on. Someone asked why mean shouldn't work: it should, it does for me, but I'm not going to go into that right now since I want to finish the tutorial. Okay, now, what's the average departure delay from each airport? In the interest of time I'll just show this one: we have df.groupby on Origin, then we take the DepDelay column, calculate the mean, and compute it. That's the average departure delay; departure delay is the amount of time the departure was delayed, so it's a float, not a boolean. Let me see how much more we have... I'm going to skip this last example (it groups by day of week and takes the mean; you can look at it) and talk about sharing intermediate results. Each of the computations we did earlier does everything from scratch: it reads all the data in each time, and we're not sharing anything, which gets inefficient when you're doing a lot of work. So let's look here: over the non-cancelled flights, we want to calculate two things, the mean and the standard deviation of the departure delay. These are lazy objects; if you look at them, we haven't actually done any computation yet. Now let's compute them one at a time and see how long it takes... okay, that took about six seconds of wall time. Now we'll call it a slightly different way: we use dask.compute, with tuple unpacking, which lets us compute both of them at the same time; it gives the answers back in a tuple and unpacks them into the two variables. If we do this, it takes roughly half the time, about three seconds instead of almost seven, and that's because the first half of the work is the same for both computations: they both have to read all the CSV files and calculate the df[~df.Cancelled] filter.
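Here's that comparison as a sketch, including Stephen's groupby:

    import dask

    not_cancelled = df[~df.Cancelled]

    flights_per_airport = not_cancelled.groupby("Origin").Origin.count()
    mean_delay = not_cancelled.DepDelay.mean()
    std_delay = not_cancelled.DepDelay.std()

    # Computed separately, the CSV reads and the filter are repeated each time
    mean_delay.compute()
    std_delay.compute()

    # Computed together, dask.compute shares that common work
    mean, std = dask.compute(mean_delay, std_delay)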
There are also some sums and counts that are duplicated between the two. If we visualize it, the stuff at the bottom of the graph is common, so Dask does it once, and both results get the advantage of a fair amount of shared work; it's not repeated. And again, Mike will go into the persist logic later: there's a specific command called persist for when you know you're going to use something over and over again, which keeps it loaded in memory. So thinking about the way you structure things can be important for speed. Okay, closing out some final things. Pandas has been around a lot longer than Dask, and it has a vast API that has grown over time; if your data fits in memory, just use pandas. The dask.dataframe module gives you a limited but, I find, very useful subset of the pandas experience, specifically for when you have datasets or computations that don't fit comfortably in memory or that can take advantage of parallelization. In this tutorial we've used a small dataset, about 45 MB on disk, a few hundred MB in memory; in that situation you'd never use Dask, there's no reason to for such a small dataset. dask.dataframe becomes really meaningful once you start hitting memory errors, or once your kernel starts dying when you try to read datasets. In my line of work, we have personally used dask.dataframe with datasets of 20,000 CSV or Parquet files on S3, multiple terabytes, really, really large datasets. I'll throw a few other things out here, getting back to the CSV-versus-Parquet point from earlier, since some people said they love CSVs. One advantage of converting your data to something like Parquet: this is actually the case right now, we have a client with a dataset of 1600 columns who only needs five or ten of those columns for any particular analysis. With CSV you have to load all 1600 columns; with something like Parquet you can pick and choose which columns you load and deal only with those (a sketch of this column pruning follows below). The other thing to remember with Dask DataFrames is that the final compute brings all the data into memory as a pandas DataFrame. So if you create a Dask DataFrame backed by one terabyte of data on disk and just say df.compute(), it is going to try to bring that entire terabyte into memory, and you will crash your kernel. The use case for dask.dataframe is: you load a huge dataset, then you conduct operations that reduce the size of the final result, some filters, some groupbys, some averages, and only then compute. (Can you guys hear me? Sorry, my headphone battery died and it took me a while to work out how to switch to the laptop mic.) Also, pandas does a really good job of releasing the GIL in certain operations, so in certain cases you actually get a speed-up and can do those pandas operations in parallel. I will say there are certain things that are inherently hard to do in parallel: a global sort is one of them, drop_duplicates on a global dataset is another, and so are groupbys and merges involving two really large distributed datasets.
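The column-pruning advantage, sketched; the S3 path and column names here are made up for illustration:

    import dask.dataframe as dd

    # Parquet is columnar, so only the requested columns are read from storage
    df = dd.read_parquet(
        "s3://my-bucket/flights.parquet/",
        columns=["Origin", "DepDelay"],
    )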
Merging and joining two really, really large DataFrames is a problem. There's a best-practices guide that will help with some of those things, but some of them are just fundamentally difficult in parallel. Okay, can you guys hear me? I guess some people are saying I'm still a bit out of sync. One last exercise we'll walk through: we have built-in functions for Dask DataFrames, but what if you want to write your own function? I don't think I'll go through this entire example right now, but this section gives you an example of a custom function that you might write in pandas but that isn't available in Dask. dd.to_timedelta actually does exist in Dask, but suppose it didn't: you could use the function map_partitions, and what map_partitions does is run a pandas function on every partition. So if you wrote this compute_departure_timestamp function that works on a single pandas DataFrame and calculates a timestamp from a time delta, you could use map_partitions to run that function on each individual pandas DataFrame, each individual piece, and then combine the results together. If I run this, it calculates the datetimes. So map_partitions is a way to write your own functions, pass them in, and have them run in parallel across the entire dataset (there's a sketch of this pattern at the end of this Q&A block). I think that's about it for what I'm going to cover on DataFrames. James, I think we take a break now; Mike is next, or James for a little bit, but yes, let's take a break. We'll do a five-minute break instead of ten, so we'll start again in five minutes, and we can try answering some questions during the break. If you can vote on the questions, that'll help a little, since there are quite a few of them. Oh, this is actually a good question: is it possible to apply a lambda expression to Dask DataFrames? Yes, there is a .apply and a .map on the DataFrames, and that's usually how you do it, the same method-chaining logic you'd use in pandas. Andrew's question about setting up Dask on cloud servers and clusters: I'm actually writing a blog post right now on some better ways to deal with Dask workers and environments in the cloud, and I'm hoping to publish it this week before SciPy ends, on some new tools we've built to help with that; there are a lot of different things that come into play there. What happens if I use apply on a delayed chunk? The general rule of thumb is: don't mix Dask DataFrames with delayed functions. Those are two different ways of doing things, and they don't really interact that well. When you use dd.read_csv, can you use multiple patterns? Not directly; what you can do is glob with multiple patterns, assemble a list of files, and pass that in.
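Here's the map_partitions sketch promised above. The column names follow the tutorial's flight data, and treating CRSDepTime as an hhmm-encoded number is an assumption on my part:

    import pandas as pd

    # A function written for one plain pandas DataFrame...
    def compute_departure_timestamp(pdf):
        hours = pdf.CRSDepTime // 100
        minutes = pdf.CRSDepTime % 100
        return (pdf.Date
                + pd.to_timedelta(hours, unit="h")
                + pd.to_timedelta(minutes, unit="m"))

    # ...run on every partition of the Dask DataFrame, then stitched together
    departure_timestamp = df.map_partitions(compute_departure_timestamp)
    departure_timestamp.head()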
There's a question about when you would use dask.dataframe versus xarray. There's an xarray tutorial happening later this week that I recommend attending. xarray uses Dask under the hood, so when you're using xarray you may also be using Dask, sort of without knowing it. xarray gives you nice things like named dimensions and labeled axes, so if you want those additional features, I highly recommend it. dask.dataframe is specifically for tabular data; xarray is for n-dimensional data, so once you go into a third dimension you're not going to be able to use pandas-style DataFrames, and that's when you need xarray. And Rich says the xarray tutorial is on Thursday; thank you, Rich, and definitely a plug for xarray, that's a great project. Okay, next question: if Dask decides there is nothing to be parallelized, it will execute sequentially, right, and in that case will it be slower than a version without any delayed decorators? Yes: if there's nothing to parallelize, meaning every step in your computation depends on the previous step and there are no independent tracks in your task graph, then Dask will just compute everything sequentially, just like you would normally without using Dask. There is a small overhead associated with each Dask task, so in some cases it would be slower than not using Dask and just running things synchronously; however, the overhead is small, so it won't be noticeable unless you have lots and lots of delayed tasks that run strictly one after another with nothing parallelizable. There's a memory warning question related to garbage collection in Dask, asking for recommended ways of tracking that down: I'm going to cover that in a moment in the advanced distributed section; there's a bit about debugging there. Someone's asking about overlapping chunks: yes, we do have overlapping chunks, it's just a different API call; there's a map_overlap function in dask.array that lets you do computations involving overlapping chunks. Okay, I think we're at about five minutes, so let's go ahead and start the next notebook; let me share my screen. All right, the next notebook is 05_distributed; let me increase the font size real quick, hopefully that's big enough. So far today we've basically been constructing Dask graphs using a few different APIs, the dask.delayed API, the dask.array API, and the dask.dataframe API, and then we've just been calling compute on those objects, or maybe dask.compute on several of them, without worrying about anything else: you call compute, it computes the result for you. Now we're going to take a step back and talk a little about what's happening under the hood. What's happening is that a Dask scheduler goes through your task graph and computes the various tasks: back in the delayed notebook, that meant run this increment function, then this other increment function, and after that pass those results to an add function.
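Concretely, the kind of graph being described is the inc/add example from the delayed notebook, reconstructed here from memory as a sketch:

    import dask

    @dask.delayed
    def inc(i):
        return i + 1

    @dask.delayed
    def add(a, b):
        return a + b

    x = inc(1)
    y = inc(2)
    total = add(x, y)   # graph: two independent incs feeding one add
    total.compute()     # a scheduler decides where and when each task runs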
It does that by submitting tasks to different resources. There are four different types of schedulers available in Dask. There is the multi-threading scheduler, which just uses a standard Python thread pool from the standard library. There is the multiprocessing scheduler, which is backed by a Python multiprocessing pool. There is the single-threaded scheduler, which executes tasks in the main Python process, so there is no parallelization at all when you use the single-threaded, or synchronous, scheduler. And then there's the distributed scheduler, which we'll talk about later in the notebook. The first three are generally referred to as the single-machine schedulers: they were the first iteration of schedulers, they require no setup, and their only dependency is the Python standard library, whereas the distributed scheduler, while more advanced, does have additional dependencies, like Tornado. So how do you go about choosing between these schedulers? There are a few ways. The most granular is on individual compute calls: before, we just called compute with empty parentheses and didn't provide any input, but there is a scheduler keyword argument you can pass to select a scheduler. For instance, for some computation you could call compute(scheduler="single-threaded") to use the single-threaded scheduler, and you could change that to "processes" for multiprocessing or "threads" for multi-threading, just for that particular compute call. You might instead want to set the scheduler for a given block of code: I'm going to call compute on a few things, and I want them all to use a certain scheduler. For that, you can use dask.config.set as a context manager, specifying which scheduler you'd like; inside that with block, any compute calls use the specified scheduler. In this case I've set scheduler="processes", so any compute calls in the block use the multiprocessing scheduler (there's only one here, but if we had multiple, they would all use it), and when you exit the with block, the context manager, you go back to the default schedulers for the different collections. Additionally, you may just want to set it globally: I don't care about a particular block of code, I just want everything to use a particular scheduler. Then you still use dask.config.set, but call it directly instead of as a context manager, and any compute calls that happen after that use the specified scheduler; in this case, the multiprocessing scheduler. One thing we're not covering in the notebook that I'll show real quick: say I have my_value.compute() and I want to use the multi-threaded scheduler, which is the default. What this does is create a thread pool with a number of threads equal to the number of cores on your machine; but, as I said earlier, you have the freedom to specify the number of threads you'd like in the thread pool, or the same for processes in a process pool, using the num_workers keyword argument.
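Those three options together in one sketch; the array computation is just a stand-in so the example is self-contained:

    import dask
    import dask.array as da

    my_value = da.ones((1_000, 1_000), chunks=(100, 100)).sum()

    my_value.compute(scheduler="single-threaded")  # per-call: easiest to debug
    my_value.compute(scheduler="processes")        # per-call: multiprocessing pool
    my_value.compute(scheduler="threads")          # per-call: thread pool (default)

    with dask.config.set(scheduler="processes"):   # for one block of code
        my_value.compute()

    dask.config.set(scheduler="processes")         # global default from here on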
Okay, so let's move on. We'll go ahead and load in some of the flight data again; if you're running on binder, you may want to use the smaller version of the dataset. We're going to set up some computation here: dd.read_csv to read in all the flight data CSV files, parsing dates and setting some dtypes, and then we'll find the maximum average non-cancelled delay grouped by airport. That's a mouthful; it's similar to what we were doing earlier in the DataFrame notebook, and the main point is just that we're setting up some computation we'd like to run.

Now we'll run that particular computation using each of the different single-machine schedulers: first the multi-threaded scheduler, then multi-processing, then synchronous. We get the time before and after the computation to figure out how long it takes, and print that along with the result. The multi-threading scheduler took just under four seconds with a result of 10.35; the multi-processing scheduler took 10.6 seconds, same result; the synchronous scheduler, 6.2 seconds, same result. The take-home point is that these different schedulers will all give you the same exact result: each walks the same task graph and submits tasks to various resources, but you always get the same answer, just with slightly different performance characteristics, which could be important depending on your particular application. For instance, here the multi-processing scheduler took considerably longer than the multi-threading one.

So what are some rules of thumb when picking between the threaded, process, and synchronous schedulers? The synchronous scheduler is used primarily for debugging: there is no parallelization associated with it, but you get nice tracebacks and things like that, which Mike will talk about later. When picking between threads and processes, think a little about the actual functions and computations happening in your task graph. When using a thread pool in Python, the global interpreter lock (GIL) means that if you have a thread pool with, say, 10 threads, only one of them can actually execute Python bytecode at any given time. So if you're running a lot of code that holds onto the GIL, you won't get good performance there; you won't be able to parallelize very effectively. However, lots of libraries in the SciPy stack, like NumPy and pandas, release the GIL in most places, so you can generally use the multi-threaded scheduler safely when using things like NumPy and pandas.
If you are running lots of code that holds onto the GIL, for example string manipulation in pure Python, then you might consider using the multi-processing scheduler. That's because separate processes in Python can each run Python code concurrently; that's the advantage of processes. The disadvantage is that there is serialization between processes. With multiple threads, they all share the same memory space, so you can effectively pass around pointers to intermediate results and dependencies needed by other tasks; there's no serialization cost. With multiprocessing, you have to pass dependencies between processes: serialize a piece of data, send it to another process, and deserialize it on the other side, where it can then be used. If you have lots of that communication, it will also slow down your performance. So there's a balance between communication costs and holding onto the GIL that you'll need to put some thought into, and probably experiment with as well. There's additional information in the documentation about choosing different schedulers, again thinking about serialization.

Question: can you run visualize on that computation? Sure, we've got plenty of time. Looking at the graph: these portions here are all independent of one another, so they can all easily run in parallel, but then we start getting intermediate results; you see all these arrows pointing into a circle, and to execute that task we have to wait until all the tasks its incoming arrows come from are done. So this is not embarrassingly parallel, but there are opportunities for parallelization, and there's our little final result at the end.

Mike: what I liked about this graph in this context is that it shows the communication overhead, which explains the speed differences we saw between the synchronous and multi-processing runs. James: exactly. Looking at this one circle, this task needs a bunch of inputs to call its function; each arrow is a different input, and those intermediate results may actually have been computed on different processes in the process pool. Say one of them is a little dataframe: we would need to serialize that dataframe, send it to another process, deserialize it, and only then pass it to this task for execution. That's the communication cost Mike is referring to, and you can kind of see it in all the arrows pointing at that particular task in the graph. So I highly recommend checking out the task visualization, as long as your graph isn't thousands and thousands of tasks, in which case everything renders too tiny to read. Okay, do we have any other pressing questions related to this?
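To tie together the GIL and serialization points above, here's a rough, hedged sketch comparing the two schedulers on GIL-holding work. The pure_python_work function is a made-up stand-in for something like string manipulation, and actual timings depend heavily on your machine.

```python
import dask

@dask.delayed
def pure_python_work(n):
    # Pure-Python string manipulation holds the GIL, so threads cannot
    # run this concurrently, while separate processes can.
    return sum(len(str(i).replace("1", "x")) for i in range(n))

total = dask.delayed(sum)([pure_python_work(500_000) for _ in range(8)])

# Threads: limited by the GIL for this workload.
total.compute(scheduler="threads")

# Processes: sidestep the GIL, but pay serialization costs whenever
# intermediate results move between processes.
total.compute(scheduler="processes")
```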
Okay, let me go ahead and carry on for the time being. Question: for multi-processing, is it single-threaded in each process? Yes: with the multi-threaded scheduler there's one process with multiple threads, and with the multi-processing scheduler there are multiple processes, each single-threaded. With the distributed scheduler, which we'll talk about in a second, you can have multiple processes that are each multi-threaded. Great question; hopefully that answers it.

Okay, so that's it for the single-machine schedulers; we'll now move on to the distributed scheduler. Although it's called the distributed scheduler, and it does scale out to multiple machines, it also works perfectly fine on a single machine, so perhaps a better name for it would have been "advanced scheduler." A distributed cluster consists of, first, one or more worker processes: these workers are where the actual tasks get executed, so going back up to our graph, each individual task is executed on a worker, and there can be one or more workers. Second, there is a single centralized scheduler that maintains the state of the cluster: it keeps track of which tasks you've asked to be computed and the dependencies between them (in our visualization, all the arrows linking tasks together), tracks which tasks are complete, incomplete, or pending, and farms tasks out to the different workers for execution. So the scheduler controls the state of the cluster, while the workers actually do the work of executing the functions. Third, there is a client, which connects to the scheduler and is the user-facing object you use to submit tasks. You create a client, it submits tasks to the scheduler, and the scheduler handles everything else; when your tasks have been computed, you can request the result using the client, and the scheduler takes care of pushing it back to you.

There are lots of ways to deploy a Dask distributed cluster. You can deploy locally on your laptop using local processes and threads; clicking through this deploy link, there are lots of other options: the command line, SSH, a project called dask-jobqueue for HPC systems, Kubernetes, or YARN on Hadoop. We're not going to go into those here, they're out of scope for today. Instead we'll do the simplest thing, which is to run a Dask distributed cluster locally, with the scheduler and workers as local processes on your laptop. To do that: from dask.distributed import Client, then instantiate the Client without any additional input parameters or keyword arguments. That creates a local cluster here on your laptop, and by default it chooses the number of workers to be one per core: here I have four workers. Notice that the workers are multi-threaded, each with two threads, which is why we get eight cores total. The distributed cluster has nice features like scaling the number of workers up or down; I'm not going to do that here, but it's easy. So that's how you instantiate a local cluster on your machine.
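A minimal sketch of spinning up that local cluster; the worker and thread counts here are just illustrative, since with no arguments Dask picks sensible defaults based on your machine.

```python
from dask.distributed import Client

# With no arguments this creates a local cluster, by default one worker
# per core; you can also choose the layout explicitly, as sketched here.
client = Client(n_workers=4, threads_per_worker=2)

# The client repr includes a clickable link to the diagnostic dashboard.
print(client.dashboard_link)

# Scale the local cluster up (or down) to a given number of workers.
client.cluster.scale(8)
```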
The notebook says: if you're in JupyterLab and not using the Dask JupyterLab extension, be sure to click this dashboard link, because one of the big advantages of the distributed scheduler is its advanced diagnostics. Let's click on that. This brings up a separate page; it's actually a Bokeh server we're running, and the scheduler and workers collect state and push it up to these Bokeh plots. This is the status page; there's nothing on it right now because we're not running anything, but we'll see it populate when we start running things. Here's the workers tab: my four workers, each with two threads, and notice it's updating live, so you can see the current CPU and memory usage of the workers. That's one way to view things, in a separate tab in the browser. I also have the Dask JupyterLab extension installed, which gives us this little Dask tab over here and lets us embed these diagnostic plots directly into the JupyterLab session, which is really cool. Totally fine to view them in a separate tab if you prefer; I'm just going to render them here in my notebook using the extension.

Okay, let's go ahead and start using the distributed cluster. What we'll do here should look very familiar; it's almost copy-pasted from the delayed notebook. We have an increment function, a decrement function, and an add function that do what you'd expect, with added sleep statements to simulate some work being done. We define the functions, then do a standard Dask delayed workflow: wrap the increment, decrement, and add functions in delayed, do some computation, and call compute. Over here we can see the computations in the task stream: this is the decrement function happening, this is the increment function happening, and then we have our add function.

Now, notice I didn't tell Dask to use this distributed cluster. That's because when you instantiate a distributed Client, it automatically registers itself as the default scheduler. After that point, unless you specify otherwise (for instance, scheduler="sync" here would use the synchronous scheduler), the distributed scheduler will be used by default. In the task stream, the x-axis is the time axis: here's my decrement function taking three seconds, corresponding to the sleep(3), and here's my increment function taking five seconds, corresponding to the sleep(5). Notice those ran in parallel.
After both of those completed, we were then able to run our add function, which took seven seconds, consistent with the sleep(7). If you look closely, you'll notice a little red sliver between the green and the blue (I can't quite get my cursor on it): that's a transfer. We're transferring a dependency, in this case the result of the decrement, over to the worker that needs it to execute the add function. So with the distributed scheduler we're getting parallelism, plus dependency management, plus a nice way to visualize it all in the task stream.

Let's move on to a slightly more involved example: we'll go back to the dataframe example from the beginning of the notebook and run it on the distributed scheduler. You can see there are many more functions happening here; that went pretty quick, and I can run it again in a second, but here are all the read_csv calls, and the tasks are color-coded by which function is getting called: here's a groupby-sum on a chunk, here's a groupby-count on a chunk, and this red is a transfer (a get-item), meaning we're moving some dependency from one worker to another because it's needed for a computation. Notice there is not a lot of red: Dask distributed is in general quite intelligent about how it schedules computation. It thinks a lot about data locality and tries to minimize the amount of data transferred, so if a particular worker already holds several dependencies needed for another task, the scheduler will run that new task on the worker that already has the data locally, avoiding transfers between workers where possible.

Okay, great. There are some other links here about the architecture of the Dask distributed scheduler, as well as a link to information about the diagnostic dashboards. So that's the introduction to the distributed scheduler. For this exercise, there are a few dataframe computations you can run on the distributed cluster; the point of the exercise is not so much the computations themselves, but exploring the Dask dashboard. If you're using the JupyterLab extension, go check out the different plot types (here's a CPU one); if you're not, totally fine, use the dashboard link in a separate window, maybe put the notebook and dashboard side by side so you can see what's happening. Experiment with the dashboard, and Mike and I will answer your questions in the meantime.

Mike: also, to connect the JupyterLab extension to that dashboard, you've got to copy and paste the URL into the extension; some folks were asking how to do that. James: gotcha, yeah, sorry about that. Someone also mentioned the client repr in Jupyter: if you have a client instantiated and just output it in a cell, it will show a dashboard link which you can click, and it will redirect you. Question: how do you reset the zoom in a Bokeh plot?
There should be a reset button; it looks like a little circle with a couple of arrows in it, like a refresh button. Click that and it should undo the zoom you've done. For the JupyterLab extension, depending on how things are set up (on binder we have it configured to pull the correct address right away), you may need to paste the address of your dashboard into the field in the upper left-hand corner of the extension, and the extension will then be able to connect to your cluster.

Question: is there a rule of thumb when provisioning resources, for estimating scheduler requirements and choosing worker processes versus threads? Mike: to answer that, I'd look at the example James showed in the beginning with the different schedulers and think about the different paradigms there. It comes down to your trade-offs between communication overhead between processes (which in a distributed cluster becomes communication between nodes) versus multi-threading, the GIL, and whether your computations release it. I believe there's also a docs page on this; do you have that handy, James? James: I'll pull it up real quick and throw it in the chat.

There's a question with a lot of upvotes: "Dask looks great in these tutorial examples, but what are the limits of Dask that one should consider when using it for real-world analyses?" Great question. In the tutorial we're going through rather small, illustrative examples, but we're trying to allude to some of the things to be aware of when using Dask; I don't know that I'd call them limitations. Balancing the number of threads and processes has come up a couple of times already: for your particular workload, you'll need to think about whether the things you're running hold the GIL often, and whether there are lots of communications happening between processes. Those are definitely things to consider whenever you're creating your distributed cluster, or using any of the other schedulers. Another thing that comes up: we often see people having issues with the chunk size in Dask arrays or the partition size in Dask dataframes. That's something to tune depending on the particular resources available in your cluster; you'll want to make sure you can fit a few partitions or chunks in memory pretty comfortably. Also, if you have lots of super tiny tasks in your computation, there is some scheduler overhead associated with each task, and that builds up if you have a graph on the order of a million tasks. There are some ways to help alleviate these issues: for instance, with a Dask array you can rechunk, so if you have lots of little tiny chunks it may be advantageous to rechunk to a larger chunk size, leaving fewer tasks in your task graph. Mike, in the next notebook, will talk about persisting Dask collections to distributed memory, which is also incredibly useful for avoiding redoing lots of computations over and over again.
So those are some things users run into commonly, and some potential solutions. Also, I see Rich said he loves the cluster map; totally agree, Rich, it is very cool, it shows all the communication between the workers and the scheduler. Mike: thank you, Jacob Tomlinson... ah shoot, this is the first time I've shared my screen, so I have to change some settings; I'll be right back. James: sounds good, I'll keep monitoring the questions. The next question is about the warning "distributed.worker - WARNING - memory use high but worker has no data to store to disk." Mike will talk a little about debugging warning messages like that in Dask distributed. That said, there is a known memory leak involving NumPy that can result in this type of thing as well; that's not necessarily an issue with Dask, but an upstream memory leak to be aware of.

Mike: I'm back. I'll share my screen now; hopefully you can see it, and I'll increase the text size just a tiny bit. Okay, switching over to advanced distributed. Up until this point we've been submitting computations by calling compute, mostly using either delayed or some of the higher-level APIs implemented on top of Dask, such as Dask Array and Dask DataFrame. In this part of the tutorial we're going to drop down to a more advanced, lower-level interface called Dask futures. Futures are an alternative to compute, derived from concurrent.futures, so if you're familiar with that interface, this is going to look fairly similar.

One of the ways you can create a future in Dask is: if you have a client, you can call submit on it, giving it a function and any arguments that would go to that function, and you get a future back. That's what this example shows; just one moment while I spin up my local cluster again. You can see I have the dashboard over here on the right, so we'll be able to see things as we run computations. This is that familiar increment function we've been dealing with, only instead of using delayed, we use submit to create a future. One of the interesting things here: we pass the client a reference to the function plus its arguments, it creates the future and returns immediately, and we get some additional information: a status of pending, a unique key, and the name of the function. These statuses change over time; looking at the task stream, the task has actually already completed, so if I access this future again, its status is no longer pending, it's finished, and it now has a type, because a value has been returned and Dask knows what that type is: the built-in int.

So that's essentially what we'll be talking about with the futures interface. There are some interesting alternatives to just displaying the future in a cell: you can call wait on it, or progress, which returns a progress bar; I'll show you what that looks like, and we'll see a little more of it in a minute.
Oh, I have to import it first. (James: Mike, could you increase the font size one more click? Mike: sure, hopefully that's better.) And we see a progress bar similar to the one in the dashboard, and it's complete, like it is over there. Alternatively, we can call wait. Notice how, when I submitted this function and got the future, the cell returned immediately; if I call wait on the future, the cell should actually block until the computation finishes, although it all happens very fast here. Hmm, wait is returning immediately; do you know why, James? James: it looks like the task is already finished. Mike: oh, I don't have a sleep in there; we're used to our sleeps, that's what's going on. One second... and now we see it blocks for a second, which is what I was trying to show. So that's the futures interface in a nutshell, with progress and wait.

We've also got this other function on the client called gather. I can call that here, and it returns immediately because the task is complete, but now we actually get the result back as well. It's equivalent to calling result on the future. What you can do with the gather interface is pass multiple futures at once and get all the results back, kind of like calling compute or client.compute on multiple delayed objects.

James: Mike, there are several questions asking you to explain a little more what a future is. Mike: essentially it's the same as the concurrent.futures interface: you can think of a future as a reference to a task, or to an object, that's going to be completed in the future. It's not a value initially; it has a status. There's an asynchronous computation going on on the Dask cluster, and whenever that computation completes, the status is updated and the value is there. Hence the name: a future is a reference to an object that will have a value once the computation is complete. Does that make sense? James: I think so; we can see if other people have follow-up questions. Thanks, Mike.

Okay, cool. Now let's go back to the example we've been working with from the delayed notebook. I'll go ahead and run this; it should be pretty familiar at this point. It uses the delayed interface, and this time when we call compute we keep a reference to the returned future, and we'll actually see the work being done on the dashboard. Notice how it returned immediately. We can call gather to actually get the result, which waits until the result is ready; let me run that again, and while it's running you can see gather waiting... there you go. So that's a way of calling compute on delayed functions and returning the results using the futures interface.
As I said before, we have client.submit. (James: sorry to interrupt; there have been some questions about Dask futures versus Dask delayed, because they look somewhat similar, so I want to quickly state the difference. They are somewhat similar: with futures on a distributed cluster we're submitting a function to be executed, and with Dask delayed we're delaying a function to be executed. The main difference is that when you create a delayed object, nothing has happened other than building the task graph itself; no computation takes place until you call compute. When you create a future, the computation is asynchronous: you submit a function to be executed on the cluster, it returns a future for you to hold in your Python process, and the computation starts right away on your cluster. Whenever it's done, you can call result and get the actual output of the function. Hopefully that helps; sorry to interrupt, Mike. Mike: no, that's great, please do.)

So this is what James was just talking about with client.submit: we're submitting a function, and notice this is the inc function itself, not a delayed object, plus an argument to that function. Now we'll go through an exercise where we take the delayed example from above and implement it using only Dask futures. We'll wait a few minutes for folks to do that; you can essentially grab the code from above (we actually don't need to define the functions again), and the logic will be something similar, just using the client.submit interface rather than delayed. James, let me know if any questions come in.

James: there's a question: what are some examples of cases where you'd want to use a future versus chaining up operations with delayed and then running compute? For very dynamic workloads, you may need to submit new tasks based on the output of earlier tasks. For instance, you have one computation that runs, and based on its output you may or may not submit other tasks dynamically: compute some value, and if it's greater than 100 submit these tasks, otherwise submit a different set of tasks. The futures interface allows these more dynamic workflows; that's one particular use case where you'd want futures over Dask delayed. There was also a comment that delayed can operate on results implicitly while futures require you to wait on results before operating on them. Actually, you can operate on the results of futures too: you can use a future as an input to another future, and that will totally work. It builds up the dependency ("this future requires this one to be done first, and that's the input to it"), so you can build up task graphs using futures as well.

Mike: for the exercise, I'll show the answer in the interest of time. Essentially we replace all the delayed calls with client.submit calls and then call gather at the end. Run that.
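For reference, a minimal sketch of what that exercise answer looks like, converting the earlier inc/dec/add delayed workflow to futures (the sleeps are just the simulated work from the notebook):

```python
import time
from dask.distributed import Client

client = Client()  # local cluster

def inc(x):
    time.sleep(5)
    return x + 1

def dec(x):
    time.sleep(3)
    return x - 1

def add(x, y):
    time.sleep(7)
    return x + y

# submit starts each computation on the cluster right away and returns
# a future; futures can be passed directly as inputs to later submits,
# which builds up the same dependency graph delayed would.
x = client.submit(inc, 1)
y = client.submit(dec, 2)
total = client.submit(add, x, y)

print(client.gather(total))  # blocks until the result is ready -> 3
```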
Okay, so now we're going to go into another example. If you haven't run it yet, go ahead and run prep.py to download the accounts data and prepare it. Here we're actually using Dask Bag, which we didn't talk about: we iterate over a list of all the account files and load them into a Dask bag, which pulls out the lines; we then call map on the bag to parse the lines, in this case with json.loads to return JSON objects; and then we do some filtering, pull data out, flatten, and compute a mean, submitting this all as a Dask future. While that's going, we'll see things get updated on the right; if we call progress, we see all the individual functions being called, and then we call gather to get the actual results. So that's an example of using a future to do something a little more complicated than what we've seen before. And to delete futures, you essentially just use the del keyword in Python, which frees up that memory on the cluster.

James: question: does the client cache previous results? For instance, running client.submit(inc, 1) a second time finishes immediately. Mike: yes, we saw that earlier when I got confused: if I submit a future more than once and the result has already been returned, we just get the result. James: both the function and the inputs to the function are hashed to get a token, the key on the future, and the scheduler checks whether you've already submitted a task with that same function and those same inputs. If you have, it won't resubmit that task; it says "that's already been submitted," and whenever you ask for the result later, it just pulls it out. So the same task won't be rerun as long as the function and the inputs are the same.

All right, moving on; we're going to cover persist here. Go ahead and run prep.py to generate random data for this one. We load that data from HDF5 into a Dask array using the Dask Array interface, then perform a sum, and we actually do it twice, because we want to demonstrate that we're pulling this data into memory each time. Whether it's on a distributed cluster or locally here, each time we call compute it takes around the same amount of time: we got six seconds the first time, and the second run should be around six or seven as well, more or less the same, because each compute pulls the data back off disk as we run through all the computations in the task graph. Instead, if we wanted to, we could persist the data once we've loaded it, onto the cluster or, in this case, into local RAM. I'll do that now: you can see the number of bytes stored on my cluster, and if I call compute, the first time should take about as long, but the second time should be much faster. It's a bit faster; I've noticed that sometimes it's not as fast as I'd like it to be, and if you play around with the chunks you can get it there, but this is faster, so that's good.
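A minimal sketch of that persist pattern, assuming an HDF5 layout like the tutorial's generated data; the filename, dataset key, and chunk size here are illustrative stand-ins.

```python
import h5py
import dask.array as da
from dask.distributed import Client

client = Client()

# Hypothetical file and dataset names standing in for the prep.py output.
f = h5py.File("data/random.hdf5", mode="r")
dset = f["/x"]

x = da.from_array(dset, chunks=(1_000_000,))

x.sum().compute()  # reads chunks from disk
x.sum().compute()  # reads from disk again: roughly the same time

x = x.persist()    # start loading the chunks into (distributed) memory

x.sum().compute()  # may still overlap with the persist finishing
x.sum().compute()  # served from memory now: much faster
```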
All right, a quick exercise: now that I've persisted the data I loaded from the file onto my Dask cluster, how do I release it? I'll give you a few moments to think about that; post your answers in the chat and we can try them out. James: there are some answers being posted; lots of people propose del x, and someone else posted future.release. Mike: it is del x in this case; you can see the bytes stored go down. James: just to piggyback on that, the distributed scheduler implements garbage collection for everything that lives on the cluster: whenever there are no more references to a particular future, it will be garbage collected and cleaned up on the cluster. Here there was an x object in the local Python session that Mike created, with data living on the cluster; when he did del x, there were no longer any references in the current Python session to that variable, which is a future, so the future got garbage collected and deleted from the cluster.

All right, moving on. One of the benefits of the futures interface is that it offers asynchronous computation. Unlike the delayed interface, as James was alluding to, when we submit a function and get a future back from the Dask client, we can poll that future object for the status of the computation as it's being performed on the remote cluster. We can also have results returned as the futures resolve, and actually do things with those results as they arrive. In this case, we're going to submit a Rosenbrock function that returns points and scores. I'm not getting into the details of that function (I'm sure some of you know it), but it's the equivalent of our inc, dec, and add functions from before, used here in conjunction with a Bokeh visualization. I'll load that up: we've got x and y values and a background we'll see in a moment, and then we get into the actual work. Essentially, we start at the point (0, 0) as an initial guess, and as we run the Rosenbrock calculation, we zero in on an optimal value. Here's where we submit the futures using our function on the x's and y's, and then there's a function in dask.distributed that we haven't talked about much called as_completed: as the name says, it returns futures as they complete, and we can iterate over those to do something with the results. We'll submit these futures ten at a time, and as they come back we do some post-analysis: grab the result, which contains the point and the score, check whether the score is better than our previous best, print it out if it is, update the plot, and then submit more work to the cluster, until we reach some threshold, at which point we break and we're done.
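A hedged sketch of that as_completed pattern, with a made-up scoring function standing in for the notebook's Rosenbrock example and a simple cap instead of its convergence threshold:

```python
import random
from dask.distributed import Client, as_completed

client = Client()

def score(x):
    # Hypothetical stand-in for the Rosenbrock evaluation: returns the
    # point tried and how far it is from an "optimum" at x = 0.5.
    return x, (x - 0.5) ** 2

pool = as_completed([client.submit(score, random.random()) for _ in range(10)])

best = float("inf")
submitted = 10
for future in pool:                # yields futures in completion order
    x, s = future.result()
    if s < best:
        best = s
        print(f"new best score {s:.5f} at x = {x:.3f}")
    if submitted < 50:             # keep feeding the cluster more work
        pool.add(client.submit(score, random.random()))
        submitted += 1
```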
As this runs, we should see the visualization appear and update as we compute on the cluster, zeroing in on that optimal value. By the way, if this isn't working for you: it uses one of those Jupyter extensions, the Bokeh extension, that we asked you to install, but we didn't get that into the instructions until later, so hopefully you pulled it down early this morning. There it is zeroing in on the value, and it should stop in a moment; we see our points come through, and we see that around (1, 1) is the optimal spot. So that's an example of using futures for asynchronous computation, processing results asynchronously, and updating a visualization. Any questions on that?

All right, I'll move on to debugging; in the interest of time, we're almost at two o'clock. We had some questions about how to debug, so we've got a toy example here of a division, and what can go wrong when we divide by zero: for that ratio delayed computation, we'll get a ZeroDivisionError. Essentially we zip over those values, compute the ratio, then compute the sum; note that here we're using delayed as a decorator rather than just wrapping the function. This first run should return just fine, and we get a result of 11. But if we have bad input into this calculation, here a zero in our b column that we're dividing by, we get a ZeroDivisionError, and we get a traceback where you can see fairly reasonably where the exception comes from. That's essentially what you'll get by default, and it's what we'll be looking for as we go through the different scenarios below.

You don't always get such a useful error, though. If I run on the distributed scheduler, the error message is harder to work with, whereas if I call compute with the local synchronous scheduler, I get a nice error message that tells us what value failed. So when debugging, you'd want to set scheduler="sync" so you can get at the error message. Another option in that scenario (let me rerun with sync to make sure it works) is to use the %debug magic command to get a debugger, which is really nice: we can actually inspect b here. One of the problems, of course, is that if you're running on a distributed cluster with a large computation, you don't want to rerun the whole thing locally; it probably won't work in the first place. So one thing you can do is use the recreate_error_locally method on the client, which gives us that traceback, and we can use the postmortem debugger there too. And if all else fails, you can use the client to inspect the scheduler and its tasks; look at the docs, there's all kinds of information you can pull out. For instance, here I'm going to loop over my scheduler's tasks, look for any with an exception, and print the state along with the key, which is the name of the task, and we can see that ratio error. So if there's some esoteric error, you can dig down, figure out which task is failing, and start inspecting it. You can do other things as well, like running arbitrary tasks on the workers or on the scheduler to see what's going on there, and there are some really good docs on that.
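A minimal sketch of that debugging workflow, using the same toy division example; recreate_error_locally is the client method mentioned above, shown commented out here since it blocks until the future has actually failed.

```python
import dask
from dask.distributed import Client

client = Client()

@dask.delayed
def ratio(a, b):
    return a / b

a_vals = [1, 2, 3]
b_vals = [2, 1, 0]  # the zero will trigger ZeroDivisionError

total = dask.delayed(sum)([ratio(a, b) for a, b in zip(a_vals, b_vals)])

# Rerunning with the synchronous scheduler gives a plain local
# traceback; in IPython, `%debug` afterwards drops into the frame.
try:
    total.compute(scheduler="sync")
except ZeroDivisionError as exc:
    print("failed locally:", exc)

# On a real cluster, rather than rerunning everything locally,
# recreate just the failing call in this process:
future = client.compute(total)
# client.recreate_error_locally(future)  # re-raises the error here
```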
So that's most of the advanced distributed notebook. Are there any questions before we move on to machine learning? We only have about ten minutes.

James: there was a question, although I wasn't quite sure what was meant: is the memory taken by client.submit held until we call client.gather? Feel free to update the question or respond in the chat if I'm not answering what you actually asked, but I think you're asking whether memory is taken up from when you submit something until you call gather. What does happen is: when you submit a task, it goes to the scheduler, which farms it out to a worker whenever it's ready, and the result stays on that worker until it's garbage collected. Calling client.gather transfers the result (say your computation produced the value five) into your local Python session, but the future still lives on the worker until the references to it go away. So you need to delete all the futures associated with that particular result to free up the worker's space; I think that's what was being asked. Mike: and that can be somewhat tricky, because you may have multiple references to that same future in your notebook, or in your code somewhere. James: yes, it's tricky in the same way that garbage collection in Python can be tricky.
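A tiny sketch of that lifetime rule: gathering copies the value locally, but the worker keeps its copy until the last future reference is dropped.

```python
from dask.distributed import Client

client = Client()

fut = client.submit(sum, range(10))
local = client.gather(fut)  # copies the result (45) into this session...
# ...but the worker still holds its copy while `fut` exists.

del fut  # drop the last reference: the scheduler garbage-collects
         # the result from the worker, freeing its memory
```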
Mike: all right, I'm going to switch to the machine learning notebook; oh nice, I can share a Chrome tab, let's try this. Hopefully you can see it. (James: could you bump up the font one unit? That looks really good, thanks.) Great. So we've talked about Dask delayed, Dask DataFrame, Dask Array, and the futures interface. What we're going to talk about now, real quick, is another high-level library called Dask-ML, for parallel and distributed machine learning. In many cases you can think of it as a scikit-learn for distributed machine learning: there are many estimators and routines from scikit-learn that have been re-implemented for Dask objects, whether as a distributed implementation of the algorithm or as a scikit-learn-like implementation that operates on Dask objects such as Dask arrays.

There are two different types of scaling we want to talk about here, illustrated by a great diagram created by Tom Augspurger. In many computing paradigms, and especially in machine learning, we're bound either by CPU or by RAM. Sometimes our data will not fit into memory; that's the classic case we've been dealing with with Dask. Up until the dotted line in the diagram, as long as our data fits into memory, we can typically use scikit-learn. Then along the y-axis we're bound by the CPU: say all of our data fits into memory but we have lots of computations and are CPU-bound, and we want to do parallel computation like parallel training or hyperparameter optimization; we can use Dask-ML for that. And of course there's the region where you're both CPU-bound and memory-bound, which will definitely require some form of distributed computing.

I'm going to go through scikit-learn really quick just to familiarize ourselves. Scikit-learn has a really nice, consistent API: it has estimators that all implement expected methods like fit, predict, and score, and they have standard attributes as well. We'll use the make_classification data generator inside scikit-learn to generate ten thousand samples with four features (I can't really see that, let me move this over a bit). So what we have is a two-dimensional array with four columns and ten thousand rows, plus some y's, which will be our labels. Now we'll use those X's and y's to fit a support vector classifier: we create the estimator object, initialize it with a random state, and call fit; that returns our fitted estimator, and we can look at the support vectors and check the score, essentially the accuracy of the estimator we've just trained. So that's scikit-learn in a nutshell.

Let's talk a little about hyperparameters. These are variables that are initialized with the estimator; they generally don't change during training on the data. Here we set a couple of hyperparameters on the SVC and call the same functions we did before; we've set a really tiny C value, and we'll see that when we score, our score is much worse because of it. What this illustrates is that setting these hyperparameters affects the accuracy of the model, so these parameters become very important. Often you'll want to do what's called hyperparameter optimization, where you experiment with different values for the hyperparameters, train and score each combination, and figure out which set of hyperparameters gives you the best score. This is an example of hyperparameter optimization using GridSearchCV in scikit-learn; let's run that. Essentially we build up a parameter grid of two-by-two values, so four combinations total, and the cv value sets the number of cross-validation splits, which multiplies the number of estimators to train. You can see that takes some time to compute: 22 seconds. But if we look at the graph for this, it's a pretty parallelizable amount of work, and scikit-learn has an internal parallel training mechanism based on joblib: by specifying the n_jobs here, we can actually train in parallel to do our parameter optimization. We got 13 seconds, a decent speedup over the 22, so by using joblib's parallelism we can train concurrently.
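A minimal sketch of that grid search; the grid values here are illustrative rather than the notebook's exact ones.

```python
from sklearn.datasets import make_classification
from sklearn.model_selection import GridSearchCV
from sklearn.svm import SVC

X, y = make_classification(n_samples=10_000, n_features=4, random_state=0)

# Illustrative 2x2 grid; with cv=3, each combination is fit 3 times.
param_grid = {"C": [0.001, 10.0], "kernel": ["rbf", "poly"]}

grid_search = GridSearchCV(SVC(random_state=0), param_grid, cv=3, n_jobs=-1)
grid_search.fit(X, y)

print(grid_search.best_params_, grid_search.best_score_)
```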
It turns out that joblib supports Dask as a backend for executing that concurrent training. I'll create a client here real quick and let everything connect up. Now what I'm going to do is train on a slightly different grid. (James: just a heads up, we've got a two-minute warning. Mike: yep, this is the last bit; I'm actually not going to run the final cell.) So I run inside the joblib parallel_backend context manager, passing it "dask" to let it know we're going to use Dask. This scatter argument was something that was required at one point; I don't think it's required anymore, though I'm not sure. Now we're training using joblib in parallel, but with Dask as our backend, and this allows us to perform our hyperparameter optimization on a larger grid. I commented some of it out, since I'm just running locally and it would take too long in the interest of time. So that's Dask-ML in a nutshell, real quick, and I'll stop talking.

James: thanks so much, Mike. To wrap up: thanks, everyone, for attending today. We really appreciate, as we said before, both your time and your attention during the tutorial; we know it was a long one. We can move the conversation to Slack if there are any follow-up questions; otherwise, this recording will be available later for review.

Host: a final note: thank you all for a great session. And a note for attendees: the next event is at 1:45 Central time, the welcome session, so be there for that. Thanks very much for attending, and thank you to the presenters. Bye, everyone; so long.
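To round out the Dask-ML demonstration above, a minimal sketch of the joblib Dask backend; the grid is illustrative, and as Mike noted, the scatter argument may no longer be needed in current versions.

```python
import joblib
from dask.distributed import Client
from sklearn.datasets import make_classification
from sklearn.model_selection import GridSearchCV
from sklearn.svm import SVC

client = Client()  # creating a client registers the "dask" joblib backend

X, y = make_classification(n_samples=10_000, n_features=4, random_state=0)

param_grid = {"C": [0.001, 0.1, 1.0, 10.0], "kernel": ["rbf", "poly"]}
grid_search = GridSearchCV(SVC(random_state=0), param_grid, cv=3, n_jobs=-1)

# Inside this context manager, joblib farms the individual fits out
# to the Dask cluster instead of local threads/processes.
with joblib.parallel_backend("dask"):
    grid_search.fit(X, y)

print(grid_search.best_params_)
```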
Info
Channel: Enthought
Views: 14,785
Keywords: python, dask, parallel computing
Id: EybGGLbLipI
Length: 228min 36sec (13716 seconds)
Published: Sat Jul 18 2020