Real-Time Data Pipelines Made Easy with Structured Streaming in Apache Spark | Databricks

Captions
[Music] Let me start with a brief introduction about myself. First of all, I'm more conveniently referred to as TD, for obvious reasons. I used to be in the UC Berkeley AMPLab that Joey referred to earlier, and that's where I started building Spark Streaming with Matei. It was our grad school project, which turned out to be something much larger than what we imagined. It's been almost four or five years since then, and right now I'm focused on building the next generation beyond Spark Streaming, which is Structured Streaming. I'm an engineer at Databricks, I'm part of the stream team, and our team motto is "we make all your streams come true." I'm also a PMC member of Apache Spark.

Okay, so here is your one slide about Spark for those three folks. Spark is a unified processing engine that allows you to build all sorts of cool big data applications, be it streaming applications, machine learning applications, or graph processing applications, in Java, Scala, Python, R, whatever language you prefer. It lets you build all sorts of cool applications that read data from all different kinds of storage, because Spark can talk to a lot of them, and it allows you to run those applications in a lot of different environments, be it EC2, any cloud, Hadoop YARN, or Mesos, and we recently added Kubernetes in the latest Apache Spark 2.3 release. So hopefully you're all up to speed and everyone knows exactly what Apache Spark is.

Okay, so I'm here to talk about data pipelines, and this is something that pretty much every company dealing with data has to build. You guys know about that, but here is where I would like to start: our perspective on it. This is data pipelines at a 10,000-foot view, and many data pipelines, at a high level, fit into this model. You have unstructured streams of data coming in. The first step is to dump it as fast as possible, in a possibly unstructured form, with maybe a little bit
of cleanup and stuff, but then there's usually a separate process that is going to ETL the data from your data lake, where the data is dumped, into a more structured form in some sort of data warehouse. Then the data is ready for further analytics, whatever sort of analytics you want to do.

Now, at a high level this looks clean and understandable, but the implementation of this is rarely clean. So here is a data pipeline of a Fortune 100 company that Databricks is currently working with, and this is how it looked before we came in. They had trillions of records coming in from all their network infrastructure and their server infrastructure in their own data centers. They have Bro logs, a lot of low-level logs, application-level logs, all coming in and being dumped into multiple data lakes spread across multiple clouds. Then there's usually a separate complex ETL process to read all those out, ETL them, and put them into multiple dedicated data warehouses. There are multiple because each data warehouse is designed and tuned for one particular end use case: for example, alerting requires a certain kind of database, reporting requires a different kind of database, and searching would require a different kind of data warehouse. So they essentially had this messy architecture of multiple data lakes and multiple data warehouses, and things don't integrate very well across each other, et cetera.

Besides the general messiness of the whole thing, the major problems they had, which is why they came to us, were these. You have hours of delay in accessing the data: from the time the data is generated at their servers or network equipment to the time it reaches the data warehouse where they can do useful stuff with it, it is hours, really hours, and that was not acceptable when they have to do critical stuff like intrusion detection as fast as possible, things like that. And each of these systems is very expensive to scale, because data warehouses often are not
built to really scale horizontally. Their data is locked in very proprietary formats, so pulling the structured data out from those data warehouses can often be clunky, slow, et cetera. And they did not have support for any sort of advanced analytics like machine learning, because those data warehouses were not designed specifically for those purposes.

So when we came in, we worked with them, figured out a cleaner approach, and that led to something like this, and that's what I'm going to talk about today. We replaced their complex ETL logic with Spark Structured Streaming; that's going to be my main focus. In terms of storage, rather than use their variety of data lakes and data warehouses, we went ahead and designed and built Databricks Delta for them; I'm going to talk a little bit about that towards the end. Databricks Delta solves their storage problems in such a nice way that now you can use Spark for any kind of operation: all the downstream requirements like alerting, monitoring, et cetera can be built on top of just these two systems. That essentially allowed the data to be usable within minutes, if not seconds. It was easy to horizontally scale all of this. All the formats are essentially open, so you're not locked into a particular data warehouse solution. And of course it was built on Spark, so it automatically enables advanced analytics like machine learning; you can integrate all of that in the same open ecosystem.

All right, let's talk about Structured Streaming first. The way I usually like to present Structured Streaming is to start with the philosophy. The realization we had when we designed Structured Streaming is that the end user, that is, you, should not have to reason about the complexities of streaming: late data, out-of-order data, all of these complexities,
figuring out what to do, when to drop data, et cetera. You should not have to reason about all of these. You should simply have to write simple queries as if you're processing batch data, which is usually simpler, and it's Spark's job to automatically figure out how to convert that simple query into an incremental streaming version that can run on streams.

To do that, we realized that we need one simple unifying abstraction, and we realized that tables are essentially that one unifying abstraction. You can treat your streams as tables: essentially, every new record that arrives in a stream is like a new row being appended to a table. So we have tables as a common abstraction that can represent both unbounded data, which are streams, and bounded data, which are batch tables.

To understand how this unifying abstraction helps, let's walk through a simple example. It may be simple, but building something this simple in other systems often becomes very clunky and complicated, so let me demonstrate how easy it becomes in this system. Let's say you have JSON data coming in through Kafka. What you want to do is parse the nested JSON into a more structured form and write it out as Parquet, so that you can now do interactive queries on that Parquet table. And obviously we want to ensure end-to-end fault-tolerance guarantees, so that you don't drop data or have data duplicated.

To start with, you need to define what your source of data is, and that's these four lines of code, where you are specifying that I want to read data from Kafka, which is the format "kafka"; where I want to read from, which is which Kafka servers; and what topic I want to read; and then load it. Kafka is among the few built-in sources; we support files, and we also have Kinesis in our Databricks offering. You can also combine data from multiple different sources using union, join, et cetera. What this returns is
essentially what we call a DataFrame, and the DataFrame is the main programming abstraction for Apache Spark. The simplest way to think of DataFrames is that they are semantically equivalent to a table: a bunch of rows which have a well-defined schema. And going back to the unifying model, since tables can be both bounded and unbounded, DataFrames can represent both static data as well as streaming data. So it gives you this single API for representing both kinds of data.

The cool thing about DataFrames is that there are multiple different variations of the API that you can use to program, depending on whatever you are comfortable with. For example, Spark DataFrames are very similar to Python pandas or R data frames, for those who are familiar with them; typically data scientists are more familiar with those. Or you can express your business logic in pure SQL and get DataFrames out of it as well; typically BI analysts are more familiar with SQL. And if you want to write something a lot more complex, like very custom processing in a MapReduce-style fashion, you have this variation of DataFrame called Dataset, which are essentially statically typed DataFrames. There you can write your function literals and lambdas in a fully statically typed language like Java or Scala, and you get full compile-time type safety for complicated stuff. This is not to pigeonhole that a data scientist should use this and a data engineer should use that. The nice thing is that you can pick whatever hammer you want to use for whatever nail you have, and you get the same semantics across all these different variations and the same performance guarantees. You're not restricted to using one or the other.

So, going back to the query: what this returns is essentially a Kafka DataFrame, where each row is like a record inside Kafka, but it expresses each row in
terms of this clear schema with these columns, where we have the binary key, the binary value, the topic and partition the record came from, the offset of the record, and the Kafka timestamp. Now, since it looks like a table and behaves like a table, you can do all sorts of operations on it like a table.

For example, let's do some transformations. Remember what I want to do: I want to treat the data inside Kafka as JSON and parse it out. So that's what I do: I cast the binary value to a string and then apply this function called from_json, with the specific schema, to parse the data. from_json is one of the hundreds of built-in SQL functions, which are super optimized because we have been optimizing them over the last two or three years, that you can use to make all your complex processing much easier. You can obviously write your own user-defined functions, you can write your own lambdas, you can write your own MapReduce-style functions, and you can invoke machine learning models through that. All of that fits into this DataFrame and Dataset programming interface.

Finally, we want to write the parsed data out into Parquet, so that's what we specify as the sink. In this case we are specifying that we want to write it out in Parquet format into a particular path, which is our Parquet table. While we have native support for files and Kafka, there's also a method called foreach where you can specify your arbitrary code to write out to arbitrary systems. Depending on your sink implementation or foreach implementation, if you write it out transactionally, then you get end-to-end fault-tolerance and exactly-once guarantees.

But to get that, you also need to specify two more things. One is the checkpoint location, which is where Spark will save all the necessary data to figure out how much it has processed and therefore what it needs to process
for fault tolerance, and that is the one that is crucial for getting the exactly-once guarantees. You also need to specify more processing-level details like the trigger, which is how you want to actually execute it: do you want to emit data every one minute, or every 10 seconds, or continuously, which is a new execution mode in Spark 2.3 that I'll talk about later. So you specify these processing details and then call start.

When you call start, what happens is that the core Spark SQL engine takes this query, written in DataFrames, Datasets, or SQL, and converts it into a standardized logical plan. Then it figures out the optimal way of executing this logical plan and converts it into a more optimized plan. From that optimized plan it figures out how to run this code incrementally, automatically, and generates a series of incremental execution plans which process new batches of data as new data arrives on the streams. So Spark takes care of automatically "streamifying" your query from the very batch-like, table-like query that you've written.

It also automatically takes care of the checkpointing, based on the checkpoint location you have provided. It's usually HDFS, S3, or some sort of fault-tolerant storage, where Spark will save all the necessary offset information, like what Kafka offsets have been processed. It saves it using JSON so that it is always forward compatible. With that you get the end-to-end exactly-once guarantee, and you also get the guarantee that even if your code changes, within certain constraints, you can always recover and start where the previous version of the code left off. This is often very useful where, let's say, you receive some corrupt data and your query failed because it couldn't handle it: you can add an additional filter to filter that corrupt data
out and start where it left off.

Going back to the query: this is what we set out to build, and the end result is that you have this query that can convert unstructured data from Kafka into the structured format of Parquet, making it available for further downstream processing within seconds. This is great, and when we did this a couple of years ago it was by far very different from, and a leap forward over, what was possible before Structured Streaming.

And this, by the way, is really fast. For example, we built the Yahoo benchmark to run on both Spark and Flink to compare the raw throughput performance, and in a 40-core, 10-node cluster setting we were almost 3x faster than the nearest competitor, Apache Flink. For you guys, that essentially means that for your requirements it will be 3x cheaper to run on Apache Spark than anywhere else.

Let me focus on another aspect of this query. Note that the core business logic of how you're transforming data is in just these couple of lines. Whatever is around it is essentially peripheral code that specifies where to read from, where to write to, and how to process. In this case, if you just flip that peripheral code to use read and write instead of readStream and writeStream, then your query automatically becomes a batch query, not a streaming query. The cool thing is that your business logic remains completely unchanged. Whatever complicated map function or reduce function you have applied, none of that needs to change at all, because it's the same API, the same statically typed API. So you can build libraries of these transformations and use them in whatever context you want. You write your business logic independent of what your future execution mode is going to be.

In those terms, you therefore have three choices. You can run it as a batch job or an interactive job, in which case, if you do it periodically in
your data pipeline, you will have hours, or at best minutes, of latency, but you can execute on demand and optimize your execution costs very well, schedule it on spot nodes and stuff, and you are going to get really, really high throughput. You can choose to put it on our default streaming execution engine, the micro-batch engine, and you get low latency of the order of seconds, but you still get the same high throughput, because it's reusing all the same machinery that the Spark batch engine is using: the core Spark SQL optimized engine. And recently, as of last month in Spark 2.3, we also have this continuous streaming mode, which gives you ultra-low, millisecond-level latencies at the cost of slightly higher resource usage, because unlike the micro-batch model, here you are statically allocating resources in the cluster to continuously process data, as opposed to micro-batch, where you are gathering resources as and when data arrives. So one is more efficient and the other gives you lower latency, and you have the ability to choose what execution mode you want without your query changing. 2.3 has an experimental release which works on non-aggregation queries, just simple ETL kind of stuff. I highly encourage you to try it out.

The next step that people usually take beyond these simple ETL pipelines is event-time aggregations. With event-time aggregations people usually think of windowing, windowing based on the event time, and in our API windowing is just another form of grouping. Essentially, you are taking every record and putting it into multiple groups of windows, whichever time windows that record falls in based on the event time of that record. To specify this windowing, you have to specify what the timestamp column is and what sort of windowing you want, in terms of the window length and how the window moves. You can specify only time-based
windows, and you can combine that with other kinds of grouping as well. This is very SQL-ish, and of course we support user-defined aggregate functions as well.

Under the hood, what happens is that to continuously compute these aggregations, the engine needs to remember what the partial aggregate was every time new data is processed, so that it can continue where it left off. The way it is done is by maintaining distributed state in the cluster. In the Spark cluster, on all the worker nodes, the state is maintained as in-memory state, although any state changes are backed up as a write-ahead log in HDFS. So in every micro-batch it reads the previous state, updates the aggregates, and writes them out as the new state, which is automatically synced to the fault-tolerant storage, S3, HDFS, whatever. This all happens seamlessly under the hood. You don't need to worry about it; all you need to do is specify the checkpoint location and it takes care of everything, and you get the same exactly-once fault-tolerance guarantees that you expect out of Spark.

This also automatically handles late data, and this is why the user doesn't really need to worry about it too much. If the engine happens to receive some record that is out of order and late, all it will do is figure out that it needs to update an older bucket rather than the newest bucket it was updating, and since all of that information is already present in the state, it just goes ahead and updates the right bucket. Therefore late data gets taken care of automatically. But the side effect is that the engine really doesn't know when to stop updating old buckets, when it can close a bucket and say, "Nope, this is not going to be updated, so I'm just going to throw it out of state." And because it doesn't know, it can never throw out anything, so the state will keep
growing indefinitely. You usually don't want that to happen, because more state means more memory and more resource requirements in the cluster. To avoid that, what Structured Streaming supports is watermarking. For those who are not familiar with what watermarking is, it's essentially a moving threshold that trails behind the maximum event time the system has seen. For example, if at any point in time, among all the records that it has processed, the maximum event time is 12:30 p.m., and you have specified this trailing gap, which is called the watermark delay, to be ten minutes, then at that point in time your watermark is at 12:20. What the engine can automatically do is that any record that is older than the watermark, that is, one that has been delayed by more than 10 minutes, it can automatically start dropping. So, for example, if you are aggregating and your data is, let's say, five minutes late, then it is late but it will still be allowed to aggregate and update the counts, and the engine figures out how much of the state to maintain to get the correctness guarantees within those bounds. But if any record is more than 10 minutes late, then it will automatically be dropped, and its corresponding buckets in the state will be automatically cleared out.

It's very easy to specify this 10-minute trailing gap, this watermark delay, by just calling this method withWatermark with what the event timestamp column is and what the trailing delay is. The system takes care of automatically figuring out at run time what the current max event time is, so it will actively track it as data comes in. The x-axis here is the processing time, which is the real wall-clock time, and as data comes in, the y-axis, which is the event time, can move up and down. The system will automatically track what the max event time is, and based
on that it will keep updating what the current watermark is, which is the red line here. Based on that watermark value, it will figure out whether a record is late but still within the watermark, or actually older than the watermark, and based on that it will decide whether to consider it or not. For a lot of these I have more detailed explanations in my blog post, so please take a look at that.

I am not going to be able to cover a lot of the more advanced operations in Structured Streaming. We have blog posts and multiple talks about them, and I highly encourage you to take a look at those. But there are things like automatic streaming deduplication; we recently added support for stream-stream joins (we have always supported stream-batch joins, but we also have stream-stream joins); and we have the ability to do arbitrary stateful processing, where you want to do things like sessionization. In all of these cases you can use watermarks to automatically clean up state, or do custom state cleanup yourself in the case of arbitrary stateful processing.

Anyway, going back to the original picture of how we built the new data pipeline using Apache Spark: the second part of building this end-to-end pipeline was Databricks Delta. Since I really can't go into too much detail about one of our Fortune 100 customers and how things look there, let me use our own example of how we do ETL at Databricks to explain what problems we actually faced while using Structured Streaming alone, how we came up with the idea of Databricks Delta, and how that helped us immensely, and therefore we made a product out of it so others can use it as well.

We had essentially the same problem as most of you face. We have events coming in from Kafka; these are our users' logs. We run a managed service in the cloud, running on various cloud environments, AWS, Azure, in different locations, and we have all the events coming in
through Kinesis and Kafka. Now we want to do streaming analytics on it, and we want to build long-term reports on it. So how do you build this? When we started building this, the first thing we did was build a streaming analytics dashboard using Structured Streaming. Where it failed is that Kafka can store only maybe seven days or maybe a few weeks of data, and anything more than that is probably really expensive. So what do we do about reporting and historical queries? We had to build another pipeline that takes the same data and puts it in your data lake. So we have your typical lambda architecture: we have two different pipelines. Now, with the data lake, we can do reporting and stuff, but then we ran into other problems from having two different pipelines.

Even though these two pipelines use the same business logic, thanks to Spark Structured Streaming, Spark SQL, and DataFrames, there were still certain kinds of drift. The streaming pipeline had to ignore a certain amount of late data, but the batch pipeline had all the data, so sometimes you need to cross-validate to figure out what the difference between them is, and that cross-validation often shows that we need to reprocess certain parts of the data because things were wrong. That led to another challenge: how do you validate across these two different kinds of pipelines, and how do you reprocess the data in the data lake such that it is always kept correct and is the golden source of truth for any kind of monitoring?

The other set of problems that we ran into is query performance. When you're dumping data into a data lake from a streaming pipeline, a common problem is that you end up with really tiny files, because you're dumping data, let's say, every one minute. You end up with very, very tiny files,
and that leads to a huge blow-up in the query processing times, because of more file listing, et cetera. It just gets complicated, so we need to compact the small files as well. But that, along with the reprocessing we need to do, leads to these sorts of complexities: "Oh, I want to replace this Parquet file with that Parquet file," while in the meantime the reporting job is actually trying to read those Parquet files. What you end up with is a lot of FileNotFoundExceptions, because things are moving around and it's inconsistent. So you need to schedule the reporting to not run at the time when the compaction is running, and things got a little complicated. I see some of you smiling. Yes, a lot of our customers also had the same problem, which is what led us to the drawing board to ask what the right way of doing this is, and that's where we built Databricks Delta.

Databricks Delta is a storage solution that has the scale of data lakes, the reliability and performance of data warehouses, and the low latency that streaming pipelines require. So what do I mean by that? The greatness of data warehouses is that you have data of pristine quality in purely structured form, you get transactional reliability, and you obviously get fast queries, because databases are super optimized to do that. Whereas with data lakes you have massive scale on cloud storage; you can just dump files onto it, usually in really open formats like Parquet and ORC, and because of its open nature you can do all sorts of complex analytics on top of it, machine learning, streaming, et cetera.

Now, to combine the best of both of these worlds, we built Databricks Delta. It's naturally built on Spark and a decoupled compute and storage architecture, so it can horizontally scale out really well. It provides the reliability guarantees of data warehouses because it natively supports ACID
transactions when updating data, data validation, et cetera, things that you expect out of a data warehouse. You get the performance of data warehouses because it also supports caching and indexing, things that you typically find only in data warehouses and not in data lakes. The output format is still open, so you can do all other kinds of stuff on it, so you get the benefit of data lakes there. And all of this can be done at low latency, so it integrates very well with streaming.

So this is how our new pipeline looks, and it is very similar to that of the Fortune 100 company I was talking about. We have data from Kafka, with Structured Streaming writing into a single massive Databricks Delta table which has all the data in it. So we don't have two different pipelines; there's only one pipeline. From that massive Databricks Delta table we spawn more Structured Streaming pipelines to create more final summary tables that are specifically designed and optimized for specific workloads like streaming analytics or reporting.

With this pipeline, going back to the challenges we faced: we don't have the lambda architecture anymore; it's one single pipeline. We really don't need to do validation, and even if we do need to validate between old and new data, it's all in the same location, so it's much easier. And all the reprocessing- and compaction-related problems, the FileNotFoundExceptions, go out of the picture, because Databricks Delta provides transactional guarantees when updating data. Therefore you will never see a FileNotFoundException and all the little nasty stuff like that. You don't need to worry about any of that; feel free to schedule your reporting whenever and your compaction job whenever.

Anyway, to summarize: Databricks Delta is part of the Databricks offering. We have a unified analytics platform to solve all your data processing needs, be it data
engineering stuff or data science stuff. We really built this platform because we wanted to help others build the same kind of data pipelines that we had to build ourselves. Structured Streaming is a fast, scalable, fault-tolerant stream processing engine with high-level, user-friendly APIs, whereas Delta is the other part of the end-to-end pipeline solution, where we solve all your storage problems by giving you the reliability of data warehouses and the scalability of data lakes. You can read more on our blogs, in the Structured Streaming programming guide, and on our Databricks Delta website. And if you want to learn more, the best place to learn about the latest and greatest is the Spark Summit, called Spark + AI Summit now, and it is from June 4 to 6. There are still tickets available. I highly encourage you to come to the Spark Summit and talk to us. Thank you very much.

[Audience member] Thank you. I'm wondering whether Databricks Delta is open source, and if it's not, what's the kind of architecture and what does it look like internally? Is it something a little bit like Kudu?

So, Databricks Delta is not open source. It's part of our Databricks Runtime offering; Databricks Runtime is the more highly optimized version of Apache Spark that we offer. Internally it essentially looks like a bunch of Parquet files, similar to a Parquet table, except that the consistency guarantees about what Parquet files are present, et cetera, are managed by a log that is updated transactionally. So at a very high level it looks pretty much like a database: there is a log which is updated transactionally, and there is a bunch of files, which in the case of databases used to be super optimized and coupled with the engine. Here we just use open formats to save those files, so if anyone wants to access those files from outside, they are not blocked from doing that.
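
The watermark behavior described in the talk (maximum event time 12:30, a 10-minute delay, so a watermark of 12:20) can be sketched in plain Python. This is only a toy, single-process model and not how Spark implements it: the names `WindowedCounter` and `window_start`, the 5-minute tumbling window, and the per-record watermark update are all assumptions made for illustration. The real engine advances the watermark between micro-batches and keeps its state distributed across the cluster.

```python
from collections import defaultdict
from datetime import datetime, timedelta

WINDOW = timedelta(minutes=5)            # assumed tumbling window length
WATERMARK_DELAY = timedelta(minutes=10)  # the 10-minute trailing gap from the talk
EPOCH = datetime(1970, 1, 1)


def window_start(event_time):
    """Return the start of the tumbling window that contains event_time."""
    return EPOCH + ((event_time - EPOCH) // WINDOW) * WINDOW


class WindowedCounter:
    """Toy model of event-time windowed counts with a watermark (hypothetical)."""

    def __init__(self):
        self.max_event_time = None
        self.state = defaultdict(int)  # window start -> running count
        self.dropped = 0               # records rejected as too late

    @property
    def watermark(self):
        """Watermark = max event time seen so far minus the delay."""
        if self.max_event_time is None:
            return None
        return self.max_event_time - WATERMARK_DELAY

    def process(self, event_time):
        # Records older than the current watermark are dropped.
        if self.watermark is not None and event_time < self.watermark:
            self.dropped += 1
            return
        # Late-but-allowed records simply update their (older) bucket.
        self.state[window_start(event_time)] += 1
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        # Buckets that ended at or before the watermark can no longer
        # change, so their state is cleared.
        for start in [s for s in self.state if s + WINDOW <= self.watermark]:
            del self.state[start]


day = datetime(2018, 5, 30)
counter = WindowedCounter()
counter.process(day.replace(hour=12, minute=30))  # max event time 12:30, watermark 12:20
counter.process(day.replace(hour=12, minute=25))  # 5 minutes late: still counted
counter.process(day.replace(hour=12, minute=15))  # 15 minutes late: dropped
counter.process(day.replace(hour=12, minute=41))  # watermark moves to 12:31
print(counter.watermark, counter.dropped, dict(counter.state))
```

In this run the 12:25 record still updates its bucket, the 12:15 record is dropped, and once the watermark reaches 12:31 the 12:25 to 12:30 bucket is evicted from state, which is the bounded-state effect the talk attributes to watermarking.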
Info
Channel: Data Council
Views: 43,976
Rating: 4.8202996 out of 5
Keywords: apache spark, spark streaming, Structured Streaming, structured streaming spark, streaming etl, structured streaming with kafka, structured streaming in apache spark, structured streaming spark kafka example, tathagata das databricks, spark 2.0 structured streaming, structured streaming spark 2.0, spark structured streaming, spark streaming machine learning, spark streaming architecture, learning spark streaming, how does spark streaming work, how spark streaming works
Id: wQfm4P23Hew
Length: 35min 22sec (2122 seconds)
Published: Wed May 30 2018