Easy, Scalable, Fault Tolerant Stream Processing with Structured Streaming in Apache Spark

Captions
Thank you very much, Miles, for the flattering introduction. Just as a reminder, this is a technical deep dive that spans two sessions: my talk is roughly an hour long, with a ten-minute break in the middle, so I will cut off at some point and continue in the next session. I'm hoping any follow-up questions can wait until the end of the second session. With that, let's start.

You heard in the morning keynote, when Matei, Ali, and everyone talked about Structured Streaming and how it interacts with Delta. In this talk I'm going to give an overview of what Structured Streaming is all about, go a little deeper into some of its most important aspects, and end with an overview of how we use Structured Streaming at Databricks itself, and especially how our customers use it.

Miles already gave an introduction, so briefly: I have been involved with Spark since its beginning. I was part of the AMPLab, where Spark started, Matei and I started working on Spark Streaming together, and currently I'm a software engineer at Databricks.

To start with, Matei already talked about this: building stream processing systems in a robust way is hard. The problem lies in the complexities associated with stream processing. You have to deal with complex data: diverse formats such as JSON, Avro, and custom binary formats, data coming from many different sources in widely varying shapes, and dirty data that may be corrupted, late, or out of order. The workloads you want to run on that data can be complex too: event-time processing, aggregations on event time, machine learning on the data, interactive querying of the data that has arrived. And you have to interact with a lot of complex systems: storage systems like file systems and S3, databases both SQL and NoSQL, and streaming systems like Kafka and Kinesis. You have to reason about the semantics of each of these, every one of them can be distributed and therefore complex by itself, you have to deal with system failures, and you still want to reason about what fault-tolerance guarantees you are getting despite all of these complexities.

The whole point of Structured Streaming was to make stream processing on the Spark SQL engine really smooth and fast. You don't have to think about scalability or fault tolerance; it's fast out of the box. You have a rich, unified, high-level API to deal with complex data and complex workloads, and a rich, continuously growing ecosystem of data sources and sinks, so you can work with complex storage systems while the engine takes care of the lower-level complexities instead of you reasoning about each system individually.

The key idea that forced us to go back to the drawing board, after learning from working with customers using DStreams over the last two or three years, is that you, as a user, should not have to reason about streaming.
Spark itself should handle all the complexities and corner cases. You should be writing simple, SQL-like queries, and Spark should take care of all the complexities, seamlessly updating the answer and always giving you the correct answer despite them. To do this we built Structured Streaming.

To start with an overview, let's look at what a simple Structured Streaming query looks like, using the canonical example of a streaming word count. Say you are reading records from Kafka, where each record happens to be a string, and you want a running count over them.

The first step is to define the source of your stream. You can specify one source or multiple sources and union them together; there are a number of built-in sources like files, Kafka, and sockets, and you just specify the format and where to read from. The second step is that, now that you have a DataFrame created from the source, you can apply the usual DataFrame operations: grouping, mapping, reducing, aggregations, and so on. In this case we convert the records to strings, treat them as keys, and count the number of values for each key.

Underneath, as you write this query, the system generates an internal logical plan that expresses, at a high level, the computation you want to do. Spark then converts that logical plan into a highly optimized physical plan, and here all the optimizations we have built into Spark SQL over the last three or four years come into play. What it finally generates is a series of incremental physical execution plans that keep executing on new data as it arrives from Kafka. So Spark SQL takes this very simple, batch-like query and converts it into a series of incremental execution plans, while taking care of all the complexities like late data; you don't have to worry about any of that.

The code is not complete yet, because you have to specify what to do with the final output, the word counts. You have to specify a sink; say you want to write the counts back to Kafka on a different topic. Then come the details of exactly how you want this to be executed. For example, if you want to process in batches of one minute, you specify that trigger interval. You also specify the output mode: whether to output only the word counts that were updated since the last trigger, only the new records, or all the records every time. Finally, for fault-tolerance purposes, you specify a checkpoint location where the Spark SQL engine writes the data necessary to recover seamlessly from any kind of failure. With that, you finally call start.
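Put together, a minimal sketch of that word count query might look like the following; the broker addresses, topic names, and checkpoint path here are placeholders, not values from the talk:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder.appName("StreamingWordCount").getOrCreate()

// Source: read records from a Kafka topic as an unbounded DataFrame
val records = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092,host2:9092") // placeholder brokers
  .option("subscribe", "input-topic")                         // placeholder topic
  .load()

// Treat each record's value as a string and count occurrences of each one
val counts = records
  .selectExpr("CAST(value AS STRING) AS word")
  .groupBy("word")
  .count()

// Sink: write the updated counts back to a different Kafka topic every minute,
// with a checkpoint location so the query can recover from failures
val query = counts
  .selectExpr("word AS key", "CAST(count AS STRING) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092,host2:9092")
  .option("topic", "wordcount-output")                        // placeholder topic
  .option("checkpointLocation", "/tmp/wordcount-checkpoint")  // placeholder path
  .outputMode("update")
  .trigger(Trigger.ProcessingTime("1 minute"))
  .start()
```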
What the system does underneath is that, across the series of incremental execution plans it generates, it keeps track of the exact set of Kafka offsets each micro-batch processes and records them in a write-ahead log. Whenever there is a failure, it can recover from those checkpointed offsets seamlessly, which in total gives you an exactly-once guarantee for both stateless and stateful stream processing.

For those who have used the previous version of Spark Streaming, DStreams, there are big advantages to the checkpointing we introduced in Structured Streaming. One of the main pain points we learned from DStreams was dealing with checkpoints and recovering from them in a seamless manner. We learned from those mistakes and designed checkpoints to always be forward compatible: internally we save the offsets as JSON rather than the binary formats that were not compatible across versions in the past. So you can always recover from previous checkpoints even if you change your query a little, for example by adding filters. This is designed in a much better way than the earlier DStreams, for those who have worked with DStreams and faced those pain points.

Regarding performance, Matei already talked about this as well: because of all the Spark SQL and Tungsten engine optimizations we have worked on, and continue to work on, over the last three or four years, we can get more than 4x higher throughput than Apache Flink on the standardized Yahoo streaming benchmark. 4x faster performance means 4x lower cost, and lower chances of being hit by node failures, spot-price issues, and so on. We have published a blog post on this; I highly recommend taking a look to see exactly how we did the benchmark, and you can also reproduce it yourself by running our published notebooks on our platform.

That was the high-level overview; let's start digging deeper, layer by layer. One of the most compelling use cases that motivated us to build Structured Streaming and design it this way is the complex streaming ETL pipeline. Roughly 60-70% of the streaming workloads we have seen among our customers and in the wider open source community are basically ETL, and trying to do it the right way. The way it has traditionally been done is this: you have streams of data coming in, and you dump the raw streaming data to files as soon as possible, within seconds. Then a periodic job, every six or twelve hours, or in the best case every hour, takes the raw files and converts them into a more structured form such as tables, so the data can be consumed downstream by more structured workloads: SQL, data analytics tools, and so on.

The problem with this approach is that it takes hours before the data is actually usable. That is fine for some workloads, but for a very large variety of workloads it is not. For example, if you are doing some kind of fraud detection and it takes hours before you can analyze the data, the cause is already lost. When time is of the essence, it is often unacceptable to wait hours before you can start processing the data. So one of our main motivating use cases was to bypass this two-step process and make the raw data available in a structured format within seconds.
Let's see how you do that with Structured Streaming, using a more concrete example. Suppose you have JSON data coming in from Kafka. You want to parse the JSON, flatten it (it may be deeply nested), and store the individual fields in a structured Parquet table so that you can query it much more easily, and obviously you want an exactly-once guarantee. The code you need to write is just a few lines; apart from the imports, this is pretty much the entirety of the streaming query. Let's walk through it step by step; there are three parts.

The first step digs deeper into how you actually read from Kafka. You specify the Kafka format and exactly where and how to read from, as options in the code. You set kafka.bootstrap.servers to the comma-separated list of brokers, and you can subscribe to a single topic, subscribe to a pattern of topics, or assign specific topic-partitions; these map closely to the subscribe, subscribe-pattern, and assign modes of the Kafka client APIs. You also specify where you want to start reading from: the latest offsets, the earliest available offsets in Kafka, or something specific.

What you get out of this is a raw-data DataFrame. For those not familiar, the DataFrame is Spark SQL's basic API: a collection of row objects defined by a common schema, which essentially looks like a table with specific columns and well-defined column types. The DataFrame generated by the Kafka source has the key and the value as binary columns, the topic and partition each key-value pair came from as additional columns, and a timestamp recording when Kafka received the record.

Now that you have all this information as DataFrame columns, you can start applying arbitrary DataFrame operations. In this case, I first take the binary value from Kafka and cast it to a string so that I can parse it as JSON. The second step is actually parsing it as JSON; for that there is a built-in Spark SQL function called from_json, where you specify the column you want to parse as a JSON string and the schema you expect, and what comes out is a nested column containing all the fields inside the JSON. So we are converting a string column into a nested column using from_json. Finally, you want to flatten this out, because querying flat columns downstream is easier both for efficiency and for readability, and that is also easy: just select "data.*", which flattens out the nested columns.
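As a sketch of the read-and-parse part of that query, assuming a hypothetical topic and a made-up record schema with device, signal, and timestamp fields (none of these names come from the talk):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

val spark = SparkSession.builder.appName("KafkaJsonETL").getOrCreate()
import spark.implicits._

// Hypothetical schema of the JSON payload inside each Kafka record
val jsonSchema = new StructType()
  .add("device", StringType)
  .add("signal", DoubleType)
  .add("timestamp", TimestampType)

// Step 1: the Kafka source exposes key/value as binary columns, plus the
// topic, partition, offset, and the timestamp Kafka recorded for each record
val rawData = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092,host2:9092") // placeholder brokers
  .option("subscribe", "events")                              // placeholder topic
  .option("startingOffsets", "latest")                        // or "earliest"
  .load()

// Step 2: cast the binary value to a string, parse it as JSON against the
// expected schema, then flatten the resulting nested column with "data.*"
val parsedData = rawData
  .select(from_json($"value".cast("string"), jsonSchema).as("data"))
  .select("data.*")
```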
Before I get to the final step, I want to highlight that it is not just from_json: there is a huge number of built-in functions in Spark SQL that let you do very complex transformations out of the box, in a very efficient manner, because we have gone ahead and built those functions and taken the pains to make them extremely fast, to the point of using whole-stage code generation, low-level bit manipulation, and all sorts of techniques that have traditionally been done inside databases. We have done similar things inside the Spark SQL engine, the Tungsten engine, so you have a plethora of complex built-in functions you can leverage while getting the best performance Spark SQL can offer.

The final step is writing the result out as Parquet. In this case I know that later I will probably want to query the last day or the last twelve hours of data if something goes wrong, and I want to be able to start those queries quickly. So besides saving as Parquet, I also want to partition the data by date, so I specify partitioning by date, specify the checkpoint location, and finally call start. It is only when you call start that the processing actually begins: until then, these lines were only setting up the query, and start forces the system to begin processing the data in the background on the cluster. Underneath, it goes through the whole logical plan, optimized plan, and physical planning using the Spark planner, and starts generating the continuous sequence of incremental plans that keeps processing new data from Kafka. What it returns is a handle to the streaming query being executed, and you can use that handle to manage the query: get statistics out of it, monitor it, stop it, restart it, and so on.

So what this does is read directly from Kafka and make the data available in Parquet format within seconds, which means you can start running complex ad-hoc queries on the latest data within seconds, because it is already in a structured format. This is a game changer, and we have written a blog post that goes into more detail. The way we do it, both for Parquet and for Databricks Delta (which was inspired by our earlier work on Parquet), ensures that if there is any kind of failure, either all the data in a particular batch is visible in the Parquet table or none of it is, so you can reason about the atomicity of what is available in the table. The open source version of this works on Parquet or any file format, and it formed part of the inspiration that led to Databricks Delta, which Michael and Ali were talking about today.

We have also added more Kafka support over time. You can write out to Kafka, as you saw in my earlier example, or you can query Kafka directly, not as a stream but as an interactive or batch query. You can essentially treat Kafka like a file-system-like storage system and say: I want to query from this offset to that offset, directly from Kafka, without having to first write the data out to a file system or other storage.
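Continuing the sketch above, the Parquet sink and a batch query over an explicit Kafka offset range could look like this; the paths, topic name, and offsets are placeholders:

```scala
import org.apache.spark.sql.functions.{col, to_date}

// Sink: continuously write the flattened records as date-partitioned Parquet,
// with a checkpoint location so the query can recover exactly-once
val query = parsedData
  .withColumn("date", to_date(col("timestamp")))
  .writeStream
  .format("parquet")
  .option("path", "/data/events")                    // placeholder output path
  .option("checkpointLocation", "/data/events-ckpt") // placeholder checkpoint path
  .partitionBy("date")
  .start()

// Separately, Kafka can be queried as a batch source over an explicit range of
// offsets, without writing the data anywhere first
val slice = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092,host2:9092")
  .option("subscribe", "events")
  .option("startingOffsets", """{"events":{"0":1000}}""") // per-partition start offsets
  .option("endingOffsets",   """{"events":{"0":2000}}""") // per-partition end offsets
  .load()
```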
That is often very powerful, because you may want to go back to the raw data inside Kafka to do a different kind of analysis. For example, maybe some of the data was corrupted and got filtered out when writing the Parquet files; you may want to go back, pull the data directly out of Kafka, understand why it was corrupted, and try to recover it. This makes that much easier, and very few other systems allow you to do it.

We also have support for Amazon Kinesis as part of the Databricks Runtime, and it is pretty similar to Kafka: you specify how to authenticate, with an AWS access key and secret key or IAM roles, where to read from (latest or earliest), the stream name to read from, and so on. It is pretty much the same as Kafka.

So that was a high-level overview of something that is still reasonably simple, at least conceptually: the query is map-only, because it does no aggregation. You just take the data, clean it up, put it into a well-known structure, and write it out. The next step is usually processing the data while being aware of the time inside the data, in other words working with event time. This is one of the biggest challenges that the earlier Spark Streaming DStream API did not address well, because those APIs were not designed to expose event-time data in the right way. We learned from those past mistakes and designed the APIs in Structured Streaming so that processing with event time becomes very natural.

Some of the challenges anyone processing with event time has to consider: data can be late and out of order, and you have to reason about how late the data you consider can be, what happens when late data arrives, whether you update the aggregate or not, and what the right way to update it is.

So let's see how we do event-time aggregations in Structured Streaming. The way we formulated the API, windowing is basically just another type of grouping. If you think about it, windowing is essentially taking every record and, based on the timestamp inside the record, putting it in the right window bucket. If the windows are non-overlapping, every record goes into a single window bucket or group; if they are overlapping, every record goes into multiple buckets. Thinking about it this way makes the API very easy to follow, because all we are saying is that instead of grouping by a specific key, we group by a window on a key. Here we specify a group-by over a window on the timestamp using one-hour windows; in this case they are one-hour, non-overlapping windows, but you can also specify overlapping windows, like a one-hour window every 30 minutes, and all those variations are possible. You can also combine the window with other grouping keys. For example, if you want the average signal strength of every IoT device over ten-minute periods, you specify both the device and the window as grouping keys, and average is supported right out of the box, so this becomes pretty easy.
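As a sketch of that windowed grouping, assuming a streaming DataFrame named events with the hypothetical device, signal, and timestamp columns parsed earlier:

```scala
import org.apache.spark.sql.functions.{avg, col, window}

// Average signal strength per device over non-overlapping 10-minute windows:
// the window derived from each record's event-time column is just another grouping key
val avgSignal = events
  .groupBy(
    col("device"),
    window(col("timestamp"), "10 minutes"))
  .agg(avg("signal").as("avg_signal"))

// Overlapping (sliding) windows are a small variation: a 1-hour window sliding
// every 30 minutes, so each record falls into two window buckets
val hourlyCounts = events
  .groupBy(window(col("timestamp"), "1 hour", "30 minutes"))
  .count()
```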
You can also write your own custom aggregations: we have long had support for user-defined aggregate functions in batch, and those work seamlessly in streaming as well.

What happens underneath is that we have these running aggregations going on for every window, and to keep these partial aggregates alive across micro-batches, across each of these incremental executions, we keep them around as distributed state. In every trigger, every incremental execution reads the previous state, updates it, and writes out a new version of the state for the next trigger to consume. This state is stored in the executors' memory, but it is also backed up to a fault-tolerant file system like HDFS or S3 by saving all the changes to that in-memory state into a write-ahead log, inside the checkpoint location you specified. This happens completely behind the scenes; all you need to specify is the checkpoint location, and you are done. This ensures that even stateful processing is fault-tolerant and gives an exactly-once guarantee, without you having to reason about what happens in the case of a failure.

This also handles late data pretty much automatically. If you think of it in terms of window buckets, we can have a number of buckets open at the same time, one for every hour's count. The figure shows multiple one-hour windows open, and if there is any late data, the counts of the older windows can get incremented, because we carry those older windows around as part of the state. So whenever there is late data, it gets taken care of and counted properly.

But now the problem is that we do not know exactly how long to keep each window open so it can keep receiving updates, and if we keep all the windows around, the size of the state grows indefinitely. To limit the size of the state, we added something called watermarking. A watermark is essentially a moving threshold of how late data is allowed to be. The way it is calculated is that the engine keeps track of the maximum event time it has seen in the data, and the watermark is a trailing threshold behind that maximum event time. For example, if the maximum event time seen by the system is 12:30 p.m. and we tell the query that the watermark is defined as ten minutes behind the max event time, then the watermark at that point in time is 12:20; that trailing gap is what we call the watermark delay. The semantics are that data which is late, but less than ten minutes late, is still newer than the watermark, and we keep the windows corresponding to times newer than the watermark open, so that data is still allowed to aggregate. Windows older than the watermark are cleared off, and any data that arrives too late is ignored. In this way, you can specify in the query itself how late the data you want to consider is allowed to be.
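In code, declaring that threshold is a single clause before the aggregation; a sketch reusing the hypothetical events stream, with the ten-minute delay as an example value:

```scala
import org.apache.spark.sql.functions.{col, window}

// Declare which column carries event time and how far behind the max event time
// the watermark trails; windows older than the watermark are dropped from state
val lateTolerantCounts = events
  .withWatermark("timestamp", "10 minutes")
  .groupBy(window(col("timestamp"), "1 hour"))
  .count()
```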
Specifying the watermark is pretty easy: it is just that one clause, where you give the column from which to calculate event time and the delay the engine should keep track of. With this you specify exactly how much state you are willing to keep in order to handle a given amount of late data. If you want to handle more late data, you will have to keep more state in the cluster, and therefore maybe run a larger cluster or one with more memory. If you do not care much about very late data, because data that is too late is meaningless for you anyway, you can keep the delay very short and you will require much less state.

The watermark is only useful for stateful operations: if you specify it but do not do any stateful operations, it is simply ignored. Likewise, if you run the query on a batch DataFrame rather than a streaming one, the watermark is completely ignored, because in batch there is no late data by definition.

When you use a watermark, the engine keeps track of the maximum event time as data comes in. In the chart, the y-axis is event time and the x-axis is processing time, that is, wall-clock time as the streaming query runs. As data arrives with varying event times, the system tracks the max event time and accordingly calculates what the watermark should be. Any data that arrives out of order and late, but still newer than the watermark, is considered and the counts are updated, while any data older than the watermark is ignored. This is explained in more elaborate detail in my blog post; take a look at that.

We are very close to the end of this session, so this is a good point to cut. I will start off exactly where we left off, right here, in ten minutes. Thank you very much. There will be time for questions at the end of the next session, so please hold all your questions and do remember them, because I would love to hear them. Thank you very much; see you in ten minutes.
Info
Channel: Databricks
Views: 8,045
Rating: 4.8651686 out of 5
Keywords: #EUdd1, #ApacheSpark, #DataScience
Id: _jPKqJ-gaIY
Length: 30min 41sec (1841 seconds)
Published: Thu Oct 26 2017