Best Practices for running PySpark

Video Statistics and Information

Captions
Hello everyone, please join me in welcoming Juliet, a data scientist from Cloudera who is also a maintainer of Sparkling Pandas.

Can you hear me all right? Good level? Not very responsive. I'm not actually going to poll you too much, but I really like a little bit of audience interaction, so I have a few questions. Who has a right hand? Yeah. Who has a left hand? Who uses Spark? Who uses Scala Spark? Who uses PySpark? Who wants to learn how to use PySpark better? Awesome, great — this gives me lots of good information, and it's kind of what I expected.

So today, obviously, I'm talking about PySpark best practices. I'm a data scientist at Cloudera, and part of my job is to work with customers and make sure that they are successful with whatever tools they choose. When I go to them, they are jazzed about Spark: they want to use it, and they think it's going to be a wonderful, useful API that will really let them get at huge amounts of data. When I talk to their teams, I would say that in a team of ten, the manager will say "we're going to use PySpark," seven out of ten will be very into that, and there are three that want to learn Scala — and for those, I set them up and help them all the way. From my perspective, I was used to using Scala Spark, at least initially when I joined Cloudera, so seeing this pattern over and over again — the demand for Python and PySpark — I realized that I needed to start learning and using PySpark a lot more with customers and try to help them be as successful as possible.

Usually what I do is something simple with their data to give them an example of how to use PySpark. I was working with a retail customer that had a huge number of stores and a huge number of items, and they wanted to understand, on a per-store, per-item basis, when there are outliers in their sales. All of the examples you'll see running through this talk are based on that particular use case.

What I'm going to talk about first is how PySpark works and how it all fits together, so that I can motivate what I think the best practices for PySpark are. So allow me a mild digression to explain how Spark works and how PySpark works with it, and then I'll move on to best practices, which I think is what you're all here for.

So we're using Spark: we get a SparkContext and we want to do something like read in data from a file and do some processing on it. In this case I had a file with keys that were (store ID, item ID) and values that were the number of items sold on that day. I created a method that takes all of these — in this example I had already aggregated all the key-value pairs together — maps over them, and creates a Series, a pandas Series, which Python users are used to working with. It then does an operation on that pandas Series: I have a model that declares whether or not the series has an outlier in it, and then I count how many outliers exist.

What Spark is doing is reading in a file, and each of its partitions gets read in as a section of your RDD. We have a resilient distributed dataset, which is our initially read-in text file; we map over it, we apply another function with filter to remove all the non-outlier time series, and then we count them.
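A minimal sketch of that pipeline, assuming a hypothetical comma-separated input layout and placeholder to_series and has_outliers helpers in place of the real featurization and model from the talk:

```python
import pandas as pd
from pyspark import SparkContext

def to_series(values):
    # Featurization: the aggregated daily sales for one (store, item) pair -> pandas Series.
    return pd.Series(list(values))

def has_outliers(series):
    # Placeholder model: flag any day more than 3 standard deviations from the mean.
    return ((series - series.mean()).abs() > 3 * series.std()).any()

if __name__ == "__main__":
    sc = SparkContext(appName="sales-outliers")
    outlier_count = (
        sc.textFile("hdfs:///sales/daily")                # assumed layout: store_id,item_id,date,count
          .map(lambda line: line.split(","))
          .map(lambda f: ((f[0], f[1]), float(f[3])))     # key by (store_id, item_id)
          .groupByKey()                                   # gather each key's daily values
          .mapValues(to_series)                           # build a pandas Series per key
          .filter(lambda kv: has_outliers(kv[1]))         # keep only series with outliers
          .count()
    )
    print(outlier_count)
```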
To begin to really get at why PySpark is different, we need to understand the general operational model of Spark and then how Python gets attached to it. Overall, I think we're probably all familiar with this diagram — I've seen it over and over again at this conference — but one more time: inside of our driver node we have our driver program, and we've got a SparkContext. That SparkContext allows us to describe operations we intend to do on RDDs; these get compiled into a DAG, which gets translated into stages, which then get executed on the worker nodes. The worker nodes have executors in them, and each of these executors is processing multiple tasks, which tend to correspond to partitions of RDDs. All of the blue boxes here I like to imagine as JVMs — this is where a lot of the work in Spark actually gets executed.

Now we add PySpark into the mix: we're not writing any code in Java, and of course we're executing Python on the cluster, so how does that change things? On the local side, in the driver node, we have a Python SparkContext. We get the same thing: inside of our Python script or Python class we have the SparkContext that we can create our RDDs from, and all of the operations that we describe as happening on those RDDs in our Python code get translated into operations on an actual Java SparkContext. The communication between the Python process that's running your Python driver program and the JVM that's executing the driver part of the Spark program goes through a library called Py4J. Py4J allows you to operate on Java objects from Python — you can say org.apache.spark.SparkContext — so under the hood, when we create a SparkContext in Python, it creates a SparkContext in an associated JVM.
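A small illustration of that bridge — not something you normally need to touch, and `_jsc` and `_jvm` are internal attributes of the Python SparkContext, shown here only to make the architecture concrete:

```python
from pyspark import SparkContext

sc = SparkContext(appName="py4j-peek")

# sc._jsc is the Py4J proxy for the JavaSparkContext living in the driver JVM;
# sc._jvm is a Py4J view of that JVM, so Java classes are reachable by dotted name.
print(type(sc._jsc))
print(sc._jvm.java.lang.System.getProperty("java.version"))  # calling into the JVM via Py4J
```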
That's what's happening on your local driver node; on the cluster it's a little less transparent. You still have Java executors, and these Java executors are still operating on tasks, but the RDD is represented to the Java executors in the most generic way possible: an RDD of bytes. So somewhere along the line we need to know how to translate the Python objects that we actually write our code around into bytes.

Looking at my example again, I'm mapping over my RDD and applying this Python function called to_series, and applying another filter function, has_outliers. In order to actually apply these on the cluster, we need to know both how to translate the data in our Python description into bytes, and how to transform the functions that we need to apply into something that's serializable and appliable. And not only the functions — we actually need the total function closures. The way that I prefer to explain function closures is that a function closure is a description of what you're going to do to that data, plus all of the context around it that you need in order to apply it. This ends up being important because, when functions get serialized, we — meaning Spark, under the hood — need to figure out automatically what that total function closure actually is, so that it can be serialized, shipped off, and applied.

So how do we ship around these functions? We do something called pickling. PySpark actually uses a library called cloudpickle, which was developed by people who had previously worked at Berkeley. A fun side note: cloudpickle is GPL whereas Spark is Apache, but if you go and look at the source code you can see that they received a license that allowed them to use cloudpickle inside of Spark — open source working together, moving forward. So we need to be able to pickle our functions. I don't mean for this diagram to be misleading — there's no pickling happening over the network here; the pickling happens in this process — but it is something we need to be conscientious about when we're writing PySpark code: we need to make sure that the code we write is serializable.

We now have a pretty general idea of how PySpark works and how it relates to the general functioning of Spark, so I can get into being very opinionated about what I think the best practices are.

First of all, REPLs and notebooks. I like REPLs and notebooks quite a lot, and I think they're a really beautiful way to present information and begin to investigate. But I like to use standard tools for source control and for sharing information across an organization, and a lot of the organizations I talk to actually have large teams that want to use PySpark and do somewhat complex machine learning and analytics tasks. So my recommendation is: use REPLs and notebooks for exploration and for presentation — for when you need to describe the mathematical analysis that you did and the results that you got. But when you need to be able to share functions or featurization steps across an organization, packaging those up ends up being very important. You need to be able to share your code, kind of like a delicious pizza.

So what does a standard Python project look like? There's my_awesome_module with its source in it — I only have one module defined here, but you could have multiple — some sort of bin directory for scripts, a docs directory, a setup.py file, which allows the Python project to be installed using standard tools like easy_install or pip, and tests. This is another reason I find this layout very useful, and I'm going to hammer on it again in this talk: testing your PySpark code, with both unit tests and integration tests, is much easier than debugging distributed PySpark code.

To understand how we might want to adapt this standard Python project into a PySpark project, I think we need to talk about the shape of a PySpark job — the repeated patterns that we see when people want to do complex machine learning or fit complex statistical models to the data that they have. For the problems that I work on with customers I've found repeated sections, and these are fairly generic sections that you see whenever you're doing machine learning modeling. The general PySpark job structure: you parse some CLI args, so that at runtime you can configure where your input files are, how many threads you're using for local mode, and so on; you read in the data, actually moving from some persistent data source into an RDD; you turn the raw data into a numerical representation — for me, that was the to_series function; and then you do fancy maths with Spark and write out data. (Shout out to my colleagues in the UK for the "maths.")

What this translates into for me is that inside of my Python project, inside of my module, I will have a component for reading in the data, which I call something like data_io; something that turns input data into a numerical representation, so featurization; some sort of modeling; and then writing out. Thinking about this structure makes it easier to think about how you're going to test your PySpark code, so keep it in mind when I get to the testing section — I'm going to refer back to it as the reason you might want to organize things this way.
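A minimal sketch of what that layout might look like on disk (the file names are illustrative, not taken from the talk's slides):

```text
my_awesome_project/
├── bin/
│   └── run_outlier_job.py   # thin entry point, suitable for spark-submit
├── docs/
├── my_awesome_module/
│   ├── __init__.py
│   ├── data_io.py           # read from persistent storage into an RDD, write results back out
│   ├── featurize.py         # raw records -> numerical representation (e.g. pandas Series)
│   └── model.py             # the fancy maths
├── tests/
│   ├── resources/           # small sample of real data used by the tests
│   ├── test_data_io.py
│   ├── test_featurize.py
│   └── test_model.py
└── setup.py                 # lets the package be built/installed with pip or easy_install
```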
The way that I know I've written something that's actually testable and usable in PySpark is that my main method looks really, really simple — if you have a lot of logic in your main method, you're going to have trouble testing it. So here, like I said, I parse some arguments, I read in data, and then I analyze it. An "analyze" method is not particularly descriptive, but it's at least one function that I call that is written somewhere else, that I can go back and test.

Writing testable code: first of all, make as many functions as possible — as many small, simple functions as possible. This ends up being very useful both for unit testing and for making sure that your code is actually serializable. Like I mentioned, you need the function closures, as determined by the cloudpickle library, to be serializable. The way that I tend to do this is to be incredibly pedantic. I have a class, and it would be possible for me to write a function that references something else inside of that class almost unconsciously, because usually, if you don't write a static method, the first argument to your function is going to be self — and what if self has some other object inside of it that isn't serializable? One way to make sure that you always know what the inputs to your function are is to make your function static. So, like I said, I'm super pedantic: all the functions that I write that I apply to my RDDs, I declare static. If I run into a case where something isn't serializable, I know that I can look at the objects that I create inside of that function and the objects that are passed into it, and use that to determine which one of them was not serializable in the first place.

Writing testable code, for me, also means separating data input, featurization, and modeling. There are two types of testing that I think you really need to be able to do to make sure, ahead of time, that your code is pretty likely to work. You obviously need unit testing, and if you're writing many small functions that you understand, your unit tests can be relatively straightforward. And if you've separated out your data I/O from your feature generation, you can change your inputs at testing time: when you go and run on a cluster you use your real file, which is huge and distributed out there, and at testing time you use a small sample of real data — which I highly encourage doing. Usually in my tests I'll have a resources directory where I keep a sample to use as somewhat realistic data, and I run as much of my pipeline as I can over it. Then, if you run into problems later on — say, at the stage that does the parsing of your data — and you log the records that you had trouble parsing, you can take those records, add them back to your sample, and make sure the pipeline runs on that locally again. So you can fix your code iteratively and keep improving your sample, testing with real data.

Writing serializable code: like I said, keep your methods static and make sure the functions and their closures are serializable — that's generally my technique.
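A minimal sketch of that shape, reusing the earlier outlier example — the class, the analyze method, and the argument handling are illustrative, not the code from the talk's slides:

```python
import argparse
import pandas as pd
from pyspark import SparkContext


class OutlierJob(object):

    @staticmethod
    def to_series(values):
        # Featurization: daily sales for one (store, item) key -> pandas Series.
        return pd.Series(list(values))

    @staticmethod
    def has_outliers(series):
        # Placeholder model: any point more than 3 standard deviations from the mean.
        return ((series - series.mean()).abs() > 3 * series.std()).any()

    @classmethod
    def analyze(cls, keyed_rdd):
        # All of the RDD logic lives here, so a test can hand in a small local RDD.
        return (keyed_rdd.mapValues(cls.to_series)
                         .filter(lambda kv: cls.has_outliers(kv[1]))
                         .count())


def main():
    # Keep main() thin: parse args, read data, call one testable function.
    parser = argparse.ArgumentParser()
    parser.add_argument("--input", required=True)
    args = parser.parse_args()

    sc = SparkContext(appName="sales-outliers")
    keyed = (sc.textFile(args.input)                       # data I/O, swappable in tests
               .map(lambda line: line.split(","))
               .map(lambda f: ((f[0], f[1]), float(f[3])))
               .groupByKey())
    print(OutlierJob.analyze(keyed))


if __name__ == "__main__":
    main()
```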
Some things are just not serializable. Take a database connection: you can't serialize a database connection and ship it to another machine — I don't even think you can use one from a separate thread. So suppose you need to do something like this: you have a bunch of input records, you have some ID that you know you can join to another dataset on, and you want to use a database connection to do that join. You know you need the connection, but it's not serializable — and I keep repeating that you need to make your functions serializable. So how are you going to do that? It isn't possible to serialize it, but there are other patterns that you can use, and this is actually a really common one that comes up: "I want to be able to use a database connection — how do I do that?" The short answer is that there's an operation called mapPartitions, which lets you define a setup phase before you begin to apply whatever function you're applying in your map. That function can be a join that actually uses a database connection, and you set the database connection up at the very beginning of the function that you pass to mapPartitions, before you actually apply it to the records.
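A minimal sketch of that pattern; lookup_table_connect() and its lookup() method stand in for whatever database client is actually being used, and keyed_rdd is assumed to already exist:

```python
def join_with_lookup_table(records):
    # The connection is created here, on the executor, once per partition --
    # nothing non-serializable ever has to cross the wire from the driver.
    conn = lookup_table_connect()           # hypothetical client for the external database
    try:
        for record_id, value in records:
            extra = conn.lookup(record_id)  # hypothetical per-record (or batched) query
            yield record_id, (value, extra)
    finally:
        conn.close()

enriched = keyed_rdd.mapPartitions(join_with_lookup_table)
```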
More on testing: whether you use Scala or Python, I highly recommend using spark-testing-base. This was factored out of the internal Spark testing facilities — there's a GitHub URL for it here — and essentially it means you don't have to worry about setting up a SparkContext inside of your unit tests or integration tests. If you extend the spark-testing-base class, each of your test functions, following normal nose testing patterns, has a SparkContext that you can get at and build RDDs on. It also quiets down Py4J. I mentioned Py4J earlier: when running tests, I have found that if there's an exception or a failure you also get the full debug output of Py4J, which is incredibly not useful. The debug output for Py4J is along the lines of "I got a message C," and then the next line is "I got a message E." If you're a Py4J developer that might be useful; for me it means nothing, and it usually clutters up all of the output logs that you have to go through when you try to understand why your tests failed.

Here's a simple example of a hello-world test. Like I said, unit test as much as possible, and integration test as much of your pipeline as you can — if you have a really simple main method, you'll be able to integration test all of it. Things that you should test for: deviations of your data from the standard format. If an exception is thrown when you try to parse some record, do you handle it correctly? Do you handle it well? What sort of deviations do you expect? This is an example of why having that data I/O factored out and swappable for your local sample is so useful: if, when you actually deploy your application on the cluster, you log the record that went wrong, you can add it back to your sample, carry on, and make sure your tests still pass. Test your RDDs with at least two partitions if you want to be very meticulous — sometimes you can run into trouble there. And last — but the thing that people usually want to test first — is correctness, the semantic correctness of what you intended. I strongly encourage first emphasizing testing for unexpected inputs: if your data is big, you'll have many unexpected inputs.
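A hand-rolled sketch of that kind of test — spark-testing-base provides a base class that does the SparkContext setup and teardown (and the log quieting) for you, but the plain-unittest version below shows the moving parts. The parse_line helper and the sample file path are hypothetical:

```python
import unittest
from pyspark import SparkContext


def parse_line(line):
    # Hypothetical parser: "store_id,item_id,date,count" -> ((store_id, item_id), count).
    fields = line.split(",")
    return (fields[0], fields[1]), float(fields[3])


class ParsingTest(unittest.TestCase):

    def setUp(self):
        # spark-testing-base handles this for you if you extend its base class.
        self.sc = SparkContext("local[2]", "parsing-test")

    def tearDown(self):
        self.sc.stop()

    def test_parses_sample_of_real_data(self):
        # A small sample of realistic records checked into tests/resources,
        # read with at least two partitions, as recommended above.
        rdd = self.sc.textFile("tests/resources/sales_sample.csv", minPartitions=2)
        self.assertTrue(rdd.map(parse_line).count() > 0)

    def test_malformed_record_is_detected(self):
        # A record previously logged as unparseable on the cluster, added back to the tests.
        with self.assertRaises(IndexError):
            parse_line("not,enough,fields")
```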
So writing PySpark is one thing; running it is a completely different story, and it takes even more knowledge of how Spark works and how PySpark works together with it. Writing it is easy; running it is hard. When you go and run on some cluster, there are two things you want to look at. Here I emphasize logs — mostly so I could put the Twin Peaks Log Lady on the slide — but I also strongly encourage you to look at the Spark web UI. You need to know how to access and dig into the results of what happens when you run on a cluster. I always use YARN clusters, so I have an example of that up here, but whatever your cluster manager is, figure out how to get at the logs.

I recommend quieting down Py4J; you can do this in a log4j conf, and I believe you can also do it in your code using normal Python logging facilities. And, like I said, if there's a part of your code where you might hit a parse exception, I recommend capturing the record that you hit the parse exception on and logging that record, so you can add it back into your tests. When you look at the errors and exceptions that you see, recognize that the first few exceptions are probably more important than the later ones. In big, complex distributed systems you can have cascading failures: something fails here, which causes something else to fail, which down the line causes something else to fail. I'll get questions from customers who have only the last exception that was seen, and when I go and look at their full logs and scroll all the way back, it's actually much easier to identify what the initial cause was and how to fix it. And of course you're going to get both the Python and the Java stack traces — so although you may have been thinking to yourself, "I will never have to learn Java, I can just use Python forever," you will definitely need to learn how to read Java stack traces and understand some of what they're saying.

Like I said, I put logs up here, but the web UI is also incredibly important. The Spark web UI allows you to see the size of your input data, the size of the shuffle — both in memory and spilled to disk — and how much time is spent on garbage collection. I recommend that every time you run something, you go and look at the web UI and see what those statistics look like. The reason that ends up being really useful is that you get a sense of what the numbers should look like, so if you see later on that, say, garbage collection is taking up a huge amount of time, that may be a good indication that something is wrong.

When using Python, you really need to know the environment that you're running in, and here we're working in a distributed system: there are many computers, so there are actually many Python installations getting used — a different Python installation on each node in your cluster, as well as on the driver node. So you need to think about how you manage your Python dependencies, and you need to actively manage them on your cluster. Using the default Python that's installed can really get you into trouble if you end up wanting to change the Python version or install more libraries into it; if you're using something like CentOS, you can really bork the operating system by messing with the Python that it wants to use. So I really recommend finding a small, isolated place to put either a virtualenv or Anaconda. In older versions of Spark it was required that the driver and the worker nodes had the same Python versions; if you're working with newer versions of Spark, it's actually much easier, and it's clearer when something like this goes wrong.

The example that I had — finding outliers by creating pandas Series — of course required pandas to actually be installed on the cluster, as well as statsmodels. So I started scratching my head and thinking about the best way to manage Python dependencies. It's a somewhat complex topic, and I've written a blog post about it that you can find on the Cloudera engineering blog, but the short answer is that there's an environment variable, PYSPARK_PYTHON, that lets you set the path to the Python installation that you want to use on your worker nodes. So if you go and set up a Python installation that has all the libraries you need inside of it, all you need to do when you run your job is make sure that environment variable is set correctly.

Thank you very much, I appreciate your attention. I think I have some time for questions.

Q: In general, how do you deploy your Python code to the workers? Do you zip it into eggs or something like that, or just copy it by SSH?

A: I'm sorry, I couldn't hear you — is your question how do you deploy a PySpark job, or how do you install Python libraries onto the cluster?

Q: No, just the code — you have your modules in a package, featurize and so on, and now you have to deploy it so the workers know about it.

A: Right. There are two ways you can do this. If there are dependencies that need to exist on the cluster for a large group of users, you can install them there ahead of time. But if you're using spark-submit, there is a command line flag that allows you to add either a Python file or an egg file, so if you yourself have built some sort of complex package and it's easy to distribute as a single file or an egg, you can pass it on the command line, or specify it inside of your PySpark job on the SparkContext.

Q: Thanks for the talk, it was very good. A question about Python 3.4: it's officially not supported, from what I read recently. Did you ever try running it on Spark and PySpark, and if yes, do you have any suggestions on how to make it work?

A: I think I've only ever used Python 2.7. I believe that Python 3 is now supported in PySpark, but I have not personally run it, so if you run it I would really love to hear how it works out for you — I'm very interested in hearing about it. Any more questions?

Q: Do you have any recommendations for testing Spark Streaming applications?

A: Recommendations for testing Spark Streaming... with Python? I have never tested Spark Streaming with Python.

Okay, I believe that's the end of the time, so please join me in giving another round of applause to our speaker.
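Pulling together the deployment advice from the talk and the Q&A, a minimal sketch of what submitting such a job might look like — the environment path, egg name, and entry-point script are hypothetical:

```bash
# Point the workers at a Python installation that already has pandas/statsmodels
# (e.g. an Anaconda environment or virtualenv present on every node).
export PYSPARK_PYTHON=/opt/envs/outliers/bin/python

# Ship your own package with the job; alternatively call sc.addPyFile(...) from the job itself.
spark-submit \
  --master yarn \
  --py-files dist/my_awesome_module-0.1-py2.7.egg \
  bin/run_outlier_job.py --input hdfs:///sales/daily
```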
Info
Channel: Spark Summit
Views: 27,723
Rating: 4.8562093 out of 5
Id: cpEOV0GhiHU
Length: 29min 41sec (1781 seconds)
Published: Thu Oct 29 2015