Exceptions are the Norm: Dealing with Bad Actors in ETL: Spark Summit East talk by Sameer Agarwal

Captions
[Applause] Thanks a lot. Good morning, everyone. Today I'm going to talk about building ETL pipelines with Apache Spark, and more specifically I'll focus on two aspects of ETL in this talk. First, I'll start by talking about what exactly an ETL pipeline is and how it differs from a normal query execution pipeline. Second, I'm going to talk about some of the existing and upcoming features in Apache Spark that are really useful for building these end-to-end ETL pipelines.

Let me start off with a brief introduction about me. I'm a software engineer at Databricks, where I focus on Spark Core and Spark SQL. I received my PhD in databases from the UC Berkeley AMPLab, where my research primarily focused on BlinkDB, an approximate query processing engine built on Spark.

Here is an overview of the talk. As I mentioned, I'll start by talking about what exactly an ETL pipeline is and how it differs from a normal query execution pipeline. Second, we'll see how we can leverage Spark SQL for doing ETL, and in this context we'll focus on two things: how Spark SQL helps you deal with dirty data, which might be data with bad records, missing files, and so on; and how we can build performant ETL pipelines using some of the state-of-the-art features in Apache Spark, more specifically Project Tungsten and whole-stage code generation. Finally, I'll conclude with some new features that are going to make their way into Apache Spark 2.2 and 2.3 that focus on building these ETL-friendly pipelines.

Let me start with a brief overview of what exactly a data pipeline is. From a high level, a data pipeline is nothing but a sequence of transformations on data. The source data is typically semi-structured or even unstructured; examples might include JSON, CSV, or a text log that you're trying to process. The output data, on the other hand, is structured in some way and is ready to be used by analysts and data scientists. And last but not least, the source and the destination of these pipelines are often on different storage systems.

Here is an example of a data pipeline. In this specific case we have data coming from some logs or maybe a Kafka stream, and it's being ingested into a cloud warehouse. From there, this data might be fed into a database, which might then be accessed by a wide range of users. We can also run interactive queries directly on the cloud warehouse. Similarly, we can precompute some aggregates, and those aggregates can then be served as metrics by a reporting application. And last but not least, we can train a machine learning model, serve it, and apply it back to the cloud warehouse. This is just one example of the many, many data pipelines we can build.

Now, ETL is essentially the first step in building these data pipelines. It stands for extract, transform, and load, and the goal of the ETL process is essentially to clean and curate this data. It has three steps: first, we retrieve the data from the source, which roughly maps to the extract phase; we then transform the data into a more consumable format, which is the transform phase; and we finally deliver this data to downstream consumers, which is the load phase of the ETL pipeline.
So here is an example of an ETL query in Spark. In this specific case, Spark is trying to read CSV data: you can do something like spark.read.csv and give it a path that contains a list of CSV files, which roughly maps to the extract phase of the ETL. You can then filter and aggregate this data in Spark, which is essentially the transform phase. And finally, you can write this data out to a file format like Parquet, which roughly maps to the load phase of the ETL.
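To make that concrete, here is a minimal sketch of a query with that extract-transform-load shape, assuming a SparkSession named spark; the paths and column names are hypothetical.

```scala
import org.apache.spark.sql.functions._

// Extract: read a directory of CSV files, using the header row for column names.
val events = spark.read
  .option("header", "true")
  .csv("/path/to/source/csv")

// Transform: filter out bad rows and compute a simple aggregate.
val dailyCounts = events
  .filter(col("status") === "ok")
  .groupBy(col("date"))
  .agg(count("*").as("num_events"))

// Load: write the curated result out as Parquet.
dailyCounts.write
  .mode("overwrite")
  .parquet("/path/to/warehouse/daily_counts")
```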
Now, while this might look like just any other query in Spark, there are a number of reasons why queries like this deserve special attention, and why they are really hard to implement and execute in practice.

First of all, the source data, in this case a bunch of CSV files, generally comes from a wide variety of disparate sources, so the underlying data can be quite messy in practice. The source data might contain a lot of incomplete information, and to make matters worse, different sources might have different notions of how they represent incomplete information: a missing field might be represented as an empty string in one source, tagged as "none" in another, tagged as "missing" in yet another, and so on.

Second, since this data is coming from a wide variety of sources, it can also be quite inconsistent across them, which means that data conversion and type validation are in many cases very error-prone while building these pipelines. For example, you might be expecting an integer but get a string wrapping that integer; you might be expecting a date in one format, say dd/mm/yy, but get a date in a different format; or you might be expecting CSV records with five fields and the underlying record does not actually have five fields. All in all, the data can be pretty messy and inconsistent when you're dealing with these queries.

Third, if you're dealing with streaming data, data that's constantly arriving, you're facing a whole new class of problems: you have to care about at-least-once or exactly-once semantics, about fault tolerance of the underlying system, and about scalability, how the system scales up and down depending on the ingress rate of the data.

And finally, to make matters really worse, the data itself that you're trying to read and extract can be quite complex in practice. For example, it's not uncommon to have deeply nested data, in the case of JSON, that needs to be extracted and flattened out, and together with all the problems we just talked about, the inconsistencies and the missing data, complex data makes these problems even worse.

Not surprisingly, the things that make ETL hard are exactly what make ETL important. In a data pipeline you want ETL to be the first step, so that you can clean this data and remove all the messiness, and the various consumers of the pipeline don't have to deal with that messiness and complexity.

So on one hand, executing these ETL queries is extremely hard, and on the other hand, many general-purpose query execution engines offer very little support for executing them. A general-purpose query execution engine is designed in such a way that the moment it encounters a bad record or corrupt data, it just fails the query. These are not transient errors: the next time you execute the job it will encounter the same record and simply fail again, so there is no real recovery in such cases. Second, a general-purpose execution engine does not offer a lot of support for common ETL features, things like support for a wide variety of file formats, converting from one file format to another, multi-line JSON support, the date conversions I mentioned, and so on. And finally, since an arbitrary query processing engine knows very little about the data you are trying to process and the extract, transform, and load operations you are trying to perform on it, it becomes very difficult to build a performant end-to-end pipeline.

So in the rest of my talk, I'll try to convince you that Spark's flexible APIs, support for a wide variety of data sources, and state-of-the-art Tungsten execution engine make it a great framework for building your end-to-end ETL pipelines. Let's see how we can leverage Spark SQL for building these pipelines.

Let's go back to the original query: Spark reads some CSV data, runs some filters or aggregates on the underlying data, and writes it out as Parquet. When you submit this query, the driver distributes it to a set of workers, and each worker is responsible for reading one or more of the files in that path. Now, one of the biggest reasons why queries like this fail in practice during ETL is that many of these files might just be missing or corrupt. To get around this problem, Spark allows you to explicitly ignore such corrupt files by setting the flag spark.sql.files.ignoreCorruptFiles to true. If this flag is set, the Spark job will continue to run even when it encounters a corrupt or non-existent file, the output of the job will still be returned from whatever files were valid and could be read, and the corrupt files will simply be ignored.
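As a rough sketch of that flag in use, again assuming a SparkSession named spark and a hypothetical input path:

```scala
// Skip unreadable files instead of failing the whole job.
spark.conf.set("spark.sql.files.ignoreCorruptFiles", "true")

// The read now returns whatever could be parsed from the valid files;
// corrupt files in the directory are skipped.
val df = spark.read.parquet("/path/to/possibly/corrupt/files")
println(df.count())
```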
At a more fine-grained level, the other problem in reading ETL data, as we talked about, is when parts of the source data are missing or corrupt. In this case your entire file might be valid, but certain records might be malformed or corrupt. To get around this problem, Spark explicitly supports three different parse modes for reading JSON and CSV data. The PERMISSIVE mode, as the name implies, tries to read as much data as possible and informs the user when it cannot read a record. The DROPMALFORMED mode, on the other hand, silently drops all the corrupt and malformed records from your query, so that you only deal with the data that is well formed. And finally, the FAILFAST mode, as the name implies, fails the query as soon as it encounters a bad record, and the exception tells you exactly which record is bad and what you can do about it.

Let's look at each of these modes in the context of JSON and CSV data. In this example I have a JSON file with three rows and three attributes: a, b, and c. The first and third rows are valid JSON, whereas the second row is malformed. Now, if we try to read this data using the PERMISSIVE mode, Spark creates a table with not three but four columns. The fourth column, the corrupt record column, contains all the non-parsable records that were read. In this specific case, Spark would fail to parse the second record as valid JSON, so it would store it in the corrupt record column, and then it's up to the user to either fix the row, fix the input data, or filter it out, depending on the use case, by simply querying this table and the corrupt record column just like any other table. By the way, you can also configure the name of this column using a Spark SQL flag called spark.sql.columnNameOfCorruptRecord. The DROPMALFORMED mode, on the other hand, would drop the second row from the output, so you would get an output with only the first and third rows. And finally, if you try to read this JSON data using the FAILFAST mode, Spark would throw an exception telling you which line it failed to parse: in this specific case it tries to read the second line, fails to parse it, and throws a malformed-record exception. So depending on the use case, you might use any of these parse modes to read and extract the ETL data.

I'll quickly walk through the CSV equivalent as well. In this specific case we have CSV data with five columns: year, make, model, comment, and blank. It has three rows; the first and second rows are valid, whereas the third row has only three values instead of five. If you read this with the PERMISSIVE mode, Spark tries to read as many rows as possible, so it would automatically put nulls in the comment and blank columns for the third row. The DROPMALFORMED mode would just drop the last row. And finally, the FAILFAST mode would fail with an exception explaining which row it failed on; in this case it would fail with an exception saying that it cannot parse the third row.
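Here is a minimal sketch of the three parse modes on a hypothetical dirty JSON file; the path is illustrative, and the corrupt record column defaults to _corrupt_record.

```scala
import org.apache.spark.sql.functions.col

// PERMISSIVE (the default): keep every row, routing unparsable records
// into a special column that can be inspected or filtered out later.
val permissive = spark.read
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .json("/path/to/dirty.json")
permissive.filter(col("_corrupt_record").isNotNull).show()

// DROPMALFORMED: silently drop records that cannot be parsed.
val dropped = spark.read
  .option("mode", "DROPMALFORMED")
  .json("/path/to/dirty.json")

// FAILFAST: throw an exception on the first malformed record.
val strict = spark.read
  .option("mode", "FAILFAST")
  .json("/path/to/dirty.json")
```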
So that's what Spark supports right now for dealing with dirty data, missing files, and corrupt files. The other aspect of building end-to-end ETL pipelines is getting the maximum performance out of them. To that end, the last three releases of Spark have focused on what we call Project Tungsten, whose goal is to substantially improve the memory and CPU efficiency of Spark's back-end execution and push performance closer to the limits of modern hardware. If you look at the architecture diagram, the Tungsten execution engine lies right at the bottom of the Spark stack: any query written in SQL, Python, or Streaming, including any of these ETL queries, gets converted into a DataFrame, which in turn gets executed on the Tungsten execution engine.

As part of the Tungsten project we made a number of performance improvements across two phases. In phase one, which roughly encompasses the Spark 1.5 and 1.6 releases, we laid the foundation of the project by working on better memory management algorithms, laid the groundwork for runtime code generation, and implemented cache-aware algorithms that can take advantage of modern hardware properties. In phase two, we really worked on two key things, whole-stage code generation and vectorization, which improved Spark's performance by almost an order of magnitude for a number of operators. I won't go too much into the details of Spark's performance and the execution engine here, but I'll be talking about this in a talk later today, at the end of the day, on Spark SQL and how it compiles queries all the way from SQL down to RDDs; that talk will be at 5:40 p.m.
In that talk I'll cover Catalyst and the Tungsten execution engine in much more technical detail, but let me give a brief overview of what this performance work has given us in Spark. Here I have some per-operator benchmarks: the cost of processing each row, in nanoseconds, for various operators that are pretty common in ETL, such as filters, aggregates, joins, and sorts. It compares Spark 1.6, an older release, to Spark 2.0, a considerably newer release that contains all our improvements around whole-stage code generation and vectorization. As you can see from the slide, for a lot of these operators, filters, aggregates, hash joins, we were able to obtain roughly five to thirty times speedups between Spark 1.6 and 2.0. For sorts, we implemented radix sort algorithms that give us around 10 to 100 times speedups between these two versions. For things like sort-merge join, shuffle still remains the bottleneck, and we will continue to work on mitigating it, but for now we don't see a massive speedup there. And finally, for Parquet decoding, essentially reading Parquet data, we again got speedups of about ten times thanks to a custom vectorized Parquet reader implementation in Spark. The high-level takeaway from this slide is that, due to the ongoing work on the Tungsten execution engine, Spark is a really good execution framework on which to build your ETL pipelines.

This next slide shows a real-world benchmark, in this case the TPC-DS benchmark at a scale factor of 1500, and it again shows that Spark 2.0 is roughly two to three times faster than Spark 1.6 and previous releases. The x-axis here is the set of TPC-DS queries that we can execute in Spark 1.6, and the y-axis is the query time. Each pair of bars in this graph represents a query's execution time in 1.6 and 2.0, and as you can see, the blue bars, which are the Spark 2.0 times, are considerably lower than the 1.6 times. So we have not only improved per-operator costs; those improvements translate very well into full-fledged queries, which include these ETL pipelines.

Next, as was mentioned in the keynote yesterday, let's look at some of the upcoming features in Spark SQL, especially in the 2.2 and 2.3 releases, that focus on building these ETL-friendly pipelines. Let me give a sneak preview of a number of features I'm most excited about for building these pipelines. From a high level, these features can be divided into three categories. First, we would like to provide better functionality when you build these pipelines, which includes things like better JSON and CSV support when reading data. Second, there is a whole lot of work being done around usability, which includes tweaking a large number of exception traces so that we can generate better error messages, so you know exactly which column or which record is at fault and can do something about it. And finally, we are always focused on better performance: a more performant SQL execution engine, and better support for Python UDF processing.
Let me spend a little bit of time on each of these three bullet points.

In terms of functionality, Spark currently can only read JSON data one line at a time, which means that every time you want to read some kind of multi-line JSON, it requires custom ETL. As part of Spark 2.2 we are working on SPARK-18352, which adds native multi-line JSON support to Spark: you would just point Spark at any JSON path and set the wholeFile option to true.

Next, we are also investing a lot of time and energy in supporting higher-order functions in Spark SQL for manipulating deeply nested data. Here you can have operations like map, filter, and reduce on complex data types, think arrays and maps, that can be manipulated directly in SQL or DataFrames. As an example, consider a table with a key of type long and values that are an array of longs. If you want to manipulate this data, say, hypothetically, apply the key to the list of values, it's really hard today to write such a query in a performant fashion. In Spark 2.3 and beyond, once we add these higher-order functions to SQL, we will be able to write transformations like this directly: select the key, and apply a transform, essentially a map function, that takes the values column and applies the key to each of its elements. As I mentioned, these operations can include transformations, filters, reduce operations, and so on.

Next, we are also working on a much more improved and performant CSV data source. Improvements in this area include multi-line CSV support, a wider range of options for CSV parsing, and a wholetext feature for DataFrames. Again, this would be released in Spark 2.2, which is scheduled for code freeze later this month.

And finally, from a functionality standpoint there is always a need for better ETL support, and there is a wide range of JIRAs and PRs going into this effort. The goal here is to have more fine-grained, record-level tolerance to errors. We saw some examples of this with the parse modes, but we are trying to extend it further to give users control over how to handle these errors. As you already saw with the corrupt record column, you can ignore and report these errors: Spark first scans your data, puts all the non-parsable records in that column, and those errors can then be reported back to the user. But we can also add more support; for example, we are thinking of adding a way to ignore bad rows up to a certain number or percentage while reading the data. This again would be available in Spark 2.2.
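As a rough sketch of the two functionality items above: the multi-line JSON read uses the wholeFile option name mentioned in the talk (it ultimately shipped as multiLine), and the SQL follows the general TRANSFORM shape described here, which landed in later releases; the table, path, and column names are hypothetical.

```scala
// Multi-line JSON (SPARK-18352): read a JSON document that spans many lines.
val nested = spark.read
  .option("wholeFile", "true")   // spelled "multiLine" in released versions
  .json("/path/to/multiline.json")

// Higher-order functions on nested data: apply the key to every element of
// the values array directly in SQL, with no UDF or explode/re-group dance.
nested.createOrReplaceTempView("nested_data")
spark.sql("""
  SELECT key,
         TRANSFORM(values, v -> v + key) AS values_plus_key
  FROM nested_data
""").show()
```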
Finally, from a usability perspective, as I mentioned before, we are continuously tweaking a large number of exception traces to generate better error messages. The key idea behind this effort is to make sure that Spark does not just throw an exception at you when something goes wrong, but also tries to explain why the underlying data is bad. This is especially true for data conversions, where many of you, when trying to read some data, might have seen an exception like a scala.MatchError when a value does not match the schema type. This exception by itself is pretty useless, because it does not tell you which row or which column of your data is failing, or how the exception maps to your underlying data. So as part of this, and again this is an ongoing effort because there are a lot of exception call traces that need to be cleaned up, we are trying to augment these exceptions with better failure messages about which rows and which columns may be at fault.

And finally, on performance, there are two major things I'm excited about. The SQL execution engine itself has three really exciting initiatives that are going to make their way into Spark 2.2 and 2.3. First of all, as you might have heard in the talk yesterday, SPARK-16026 adds support for a cost-based optimizer in Catalyst, which will be able to leverage table- and column-level statistics to optimize joins and aggregates. Examples might include picking the right join type, or join reordering, by figuring out the cardinalities of many of these operators. As part of this, the community has already integrated the statistics collection framework in Spark 2.1, and there have been considerable efforts toward integrating the cost-based optimizer itself in Spark 2.2.
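As a rough sketch of feeding statistics to that cost-based optimizer; the table and column names are hypothetical, and the column-level syntax and the spark.sql.cbo.enabled flag land with the CBO work in Spark 2.2:

```scala
// Table-level statistics: row count and size in bytes.
spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS")

// Column-level statistics used for join and aggregate cardinality estimates.
spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS FOR COLUMNS customer_id, price")

// Enable the cost-based optimizer so it can use these statistics,
// for example when reordering joins.
spark.conf.set("spark.sql.cbo.enabled", "true")
```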
Second, we have also been working on boosting Spark's performance on many-core, single-node machines, and as part of that we are working on an in-memory single-node shuffle. And last but not least, there is also a massive effort to improve the quality of the generated code in Spark, which includes better integration with Spark's in-memory columnar format. So all in all, a wide variety of improvements around the SQL execution engine are going to make their way into Spark 2.2 and 2.3.

And finally, Python UDFs. Python, as you know, is the most popular language for doing ETL, and Python UDFs are often used to express elaborate data conversions and transformations in ETL pipelines, which means that any improvement to Python UDF processing will ultimately improve ETL. As part of this, in Spark 2.0 and 2.1 we have been adding a lot of support around Python UDF chaining, spilling, and things like that, but I'm not going to talk a lot about it here because the next talk in this session will focus precisely on that: how to improve Python and Spark performance and interoperability, including these Python UDFs and their performance implications.

So, to recap, in this talk I focused mostly on three things. First, we talked about what exactly an ETL pipeline is and how it differs from a regular query execution pipeline: unlike a regular query execution pipeline, which is designed to run from start to end, an ETL pipeline is designed to work well with exceptions and corrupted data, so it is very different from the point of view of the execution engine. Then we talked about how Spark SQL can be leveraged to build these ETL pipelines, and in this context we talked about two things: the support in Spark SQL for dealing with dirty data such as missing files and bad records, and how Spark 2.0 and beyond is devoting massive effort to Project Tungsten, which is helping us build the next generation of performant execution engines. And finally, we had a sneak preview of many upcoming features in Spark 2.2 and 2.3 that focus on building these end-to-end, ETL-friendly pipelines. With that, I will conclude my talk, and I'll be very happy to take any questions.

Thank you, Sameer, for the wonderful insights; we'll all think about how to get rid of our ETL exceptions. Do we have any questions? We have two mics on either side of the room; please proceed to a mic to ask your question. We have about a couple of minutes, so let's keep the questions short.

Great talk. I would like you to elaborate more on boosting performance on single-node machines.

Sure. There are a bunch of bottlenecks around single nodes. One bottleneck we identified in the short term shows up when you do a single-node shuffle; this was the slide I was referring to. If you're doing a shuffle on a single node, we saw that in-memory access between the shuffle stages quickly becomes a bottleneck, so we are working on designing a shuffle operator that works well on single nodes. For the technical details of what is being done, I would refer you to a talk at Spark Summit Europe last year by an engineer who worked with us at Databricks last summer on optimizing these shuffle-based operators. So there are a number of efforts going on around shuffle. The other thing going on is that we are exploring data structures that can be multi-threaded and take advantage of these multi-core processors in a much more performant way, but that is still in progress. Anybody else? Yeah, go ahead.

This is a great talk, so I have a quick question. In our ETL pipeline, we have a lot of data sources where we pull data from REST APIs, FTPs, and other places. Right now we pull all the data using a single-threaded application and then read it as a text file or a DataFrame. Is anything coming in the future, like operators where we can directly talk to REST APIs, FTP, or SOAP?

So the question is whether Spark can support directly talking to or pulling data from sources like FTP. I think there are some third-party connectors for Spark that can pull this data, but I'm not completely aware of the state of the art there right now. Essentially, there are definitely connectors that can pull data from these kinds of third-party data sources.

Hello, great talk. Have you ever thought of using fine-grained validation on specific attributes? I know it's a bit difficult to do on DataFrames, but what about RDDs?

Can you give me an idea of what you mean by fine-grained validation?

The types of the attributes. For example, let's say I have a CSV with three attributes, and I want the first attribute to be a string, the second numeric, and the third a date, something like that.
Yeah, so in many of these cases you can provide a schema that you want to use to parse this data, and in that case Spark will automatically validate it and throw an error if the data does not match the schema. Unfortunately, the state of the art for such errors is not very good: the Scala MatchError that I talked about is one such exception that Spark will throw if an attribute's type does not match the underlying data. As part of the usability effort, we are trying to improve error messages like this so that Spark can tell you exactly which record and which column is at fault.

Okay, we're out of time, so please give a big hand to Sameer for his wonderful talk. Thank you.
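As a footnote to that last answer, here is a minimal sketch of supplying an explicit schema so that Spark validates attribute types while reading rather than inferring them; the field names and path are hypothetical.

```scala
import org.apache.spark.sql.types._

// Declare the expected types up front: string, integer, date.
val schema = StructType(Seq(
  StructField("name",   StringType,  nullable = true),
  StructField("amount", IntegerType, nullable = true),
  StructField("day",    DateType,    nullable = true)
))

val validated = spark.read
  .option("header", "true")
  .option("mode", "FAILFAST")   // fail loudly when a row does not match the schema
  .schema(schema)
  .csv("/path/to/data.csv")
```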
Info
Channel: Spark Summit
Views: 7,972
Rating: 4.9039998 out of 5
Keywords: apache spark, spark summit east
Id: kweGNpPl_G0
Length: 31min 27sec (1887 seconds)
Published: Mon Feb 13 2017