Building Data Product Based on Apache Spark at Airbnb - Liyin Tang & Jingwei Lu

Captions
Okay, let's get started. My name is Liyin and this is Jingwei, and both of us work on the data infra team at Airbnb. Today we're going to talk about how we're building data products on Spark at Airbnb.

Before we dive deep into the data products, let's take one step back and look at an overview of the data infra at Airbnb. As you can see here, there are two major incoming data sources. The first is the analytics events sent from the web servers and all kinds of services. These events are published into Kafka, and we run a batch ingestion job to read the data from Kafka and land it in the warehouse. The second major source of data is MySQL. Airbnb uses MySQL as the primary online storage, so we run a batch job to ingest the daily MySQL snapshot into the warehouse. Based on these two sources, we can derive more datasets in the warehouse.

We have two major data warehouses, which we call gold and silver. These two warehouses serve different purposes. First, they give us some isolation: critical pipelines run on the gold cluster and ad hoc pipelines run on the silver cluster. We also use ReAir to replicate data from the gold cluster to silver, so the critical datasets are on both the gold and silver clusters.

In addition to that, we use Airflow to schedule the batch processing jobs. Users can define their data processing pipelines in Airflow using different engines such as Spark, Hive, or Presto. We also have a data visualization tool, Superset, which gives people a user-friendly interface to slice and dice the data and run interactive queries to get more insight into the data. ReAir, Superset, and Airflow are all open source, and Airflow is now an Apache project.

Last year we started a new initiative to onboard stream processing at Airbnb as well. We take advantage of Spark Streaming jobs to consume events from Kafka, apply different computation logic, and write the data to
different sinks: we write to Datadog for dashboards and alerting, to DynamoDB for online serving, and to Elasticsearch for online indexing.

We often say that building data products requires implementing the lambda architecture. So what is the lambda architecture? It is a data processing architecture for handling large-scale data by taking advantage of both stream and batch processing. It attempts to balance latency, throughput, and fault tolerance by using batch processing to present a comprehensive view of the offline batch data while using stream processing to get a view of the online data. It also needs to merge the offline data with the online data into one comprehensive view before presenting it to the end user. The rise of the lambda architecture is correlated with the growth of big data and stream processing, and with efforts to mitigate the latency of traditional MapReduce-style batch processing.

Since we have both the batch processing infra and the streaming infra, the natural next step is to unify stream processing and batch processing. So we built a new framework called AirStream. The goal of AirStream is to unify streaming and batch and provide a single computation interface, so users can apply the same computation logic in both a streaming pipeline and a batch pipeline. In addition to that, AirStream provides a shared state store. The shared state store shares state across multiple streaming and batch jobs, and it also serves as the final storage where the streaming result is merged with the batch result. We will get into more detail on that.

First, let's take an example to illustrate how AirStream combines stream processing with batch processing. In AirStream, a user job is defined by three different parts. The first is the source, which defines where the data comes from. It could be
a streaming source, or it could be static, like a batch source. The streaming source could be Kafka topics, even multiple Kafka topics, and a batch source could be a Hive table, a Hive partition, or even the result of a Hive query. The second part defines the computation: how we're going to process the incoming data. Users can use Spark SQL or different UDFs to compute on these data frames, for both streaming and batch. In the next stage, users define the sinks, which is where the data is finally written to. These sinks can also be shared between the streaming case and the batch case. In this particular example, the data is written into HBase, a distributed key-value store. In the streaming case, the streaming job writes to HBase directly using the native HBase API; in the batch case, the job generates HBase HFiles and bulk-uploads those files into HBase. So both the sinks and the computation logic can be shared between streaming pipelines and batch pipelines.

In addition to that, a user can specify multiple steps or stages of processing, and the result of each process can be written to different sinks. As you can see, it is a DAG of dependencies, a computation flow. So it's not only the processing logic, the operator itself, that is unified for streaming and batch; the entire pipeline, the entire computation flow, can be shared by both the streaming job and the batch job.

To recap, users only need to write a job configuration. The job configuration defines the source, either the static source for a batch job or the streaming source for a streaming job. The computation logic, the operators, the sinks, and the entire computation dependency flow can be shared between streaming and batch. In addition, AirStream provides a single driver interface: a single AirStream driver can run both a streaming job and a batch job. That covers how we unify stream processing with batch processing.
Next, we're going to talk about how we take advantage of the shared state storage. There are multiple reasons we need shared storage. The first is stateful processing, as in Spark Streaming: for stateful processing, we often need to share state across multiple jobs, so the state is not unique or local to one particular job; it needs to be globally accessible by multiple jobs. Another reason is that we need a shared data store to merge the results of the batch job and the streaming job before they are presented to end users.

In AirStream, we chose HBase as the shared state store. So why HBase? HBase is a distributed, multi-column-family, multi-version key-value store. It sits on top of HDFS and is modeled after Google's Bigtable. We chose HBase because it is well integrated with the Hadoop ecosystem: users can use the native HBase API, write a MapReduce job, or use the Hive HBase connector, the Spark HBase connector, or the Presto connector to access HBase directly. It also provides an efficient API for handling both streaming updates and batch updates, which is a major differentiator compared to other key-value stores. HBase also provides a rich API to support sequential reads and point lookups, such as the multi-get API. And in HBase each row key can have multiple versions; we leverage this multi-version feature to merge the results of streaming and batch.

Let's dig into how we unify the write API and take advantage of both streaming writes and batch writes. When we use HBase in the AirStream case, we always pre-split the table into a fixed number of regions. When we want to load a data frame into HBase, we repartition the data frame based on the region keys, so each partition corresponds to one region in HBase. In the streaming job, each partition writes to HBase using the mutator API directly. Because it is a simple one-to-one mapping, this avoids opening too many connections across different regions. In the batch case, each partition generates its own HFile and uploads that file to HBase. That takes advantage of the LSM, the log-structured merge tree, of HBase, so the HFile can be loaded directly into the database. From the end user's perspective, this provides a unified write API for both the streaming case and the batch case, and internally it chooses the efficient way to handle the bulk upload. That's the major difference compared to other key-value stores.

For reads, there is a rich API to choose from: multi-get for point lookups, prefix scan, range scan, and time-range scan. Users can choose the read API based on the application logic and the table schema. We have a few use cases to share later on how we use these APIs to compute over a long window, like a moving average or a unique count.

In HBase, each row key can have multiple versions, which can be used to merge the results of streaming and batch. Here's one particular example. As the streaming job runs, it keeps updating a particular row key with different values at different versions. As you can see here, r1 has three versions with three different values. When the batch result becomes available, the batch job also updates r1 with a different value as a different version. Now when a user queries r1, the user gets the merged view from both streaming and batch. The actual merge logic can be customized for each application. For instance, if a user wants to query r1 at a particular timestamp, like timestamp 101, they can merge all the values at and before that timestamp.

So to recap, we believe AirStream provides two major foundations. First, it unifies stream processing with batch processing, so users can use a single computation interface and apply the same computation logic to both streaming and batch. It also provides a shared state store to share state across multiple streaming jobs and batch jobs and to merge the final state before it is presented to end users. With that, Jingwei is going to talk about more use cases and how we leverage AirStream to build the lambda architecture for data products.

Okay. Liyin just gave us an introduction to the Airbnb data infrastructure and the two foundations we use when we apply the lambda architecture in our work. Next, I'm going to drill down into a couple of use cases and share some of the lessons we learned, and some tips as well.

The first use case I'm going to talk about is the database snapshot; sometimes we also call it database export. The task is very simple: basically, we want to get the data out of the production MySQL databases and into the Hive data warehouse. There are four major challenges for us. The first is size. When the database is small, this is relatively easy, but all of Airbnb's data is stored in a couple of large MySQL databases, and getting the data out of them becomes quite challenging. The second challenge is the real-time requirement. Most of our analytics pipelines can tolerate a day of delay, but some cases, for example fraud detection and anomaly detection, require a delay of at most an hour, maybe even minutes. The third challenge is how to guarantee the transaction boundary when we export the data, given that the original data modifications are bounded by transactions. In many applications, people modify multiple tables in one transaction to guarantee correctness. When we export the data, those tables still have to honor the transaction semantics once they reach the Hive data warehouse; otherwise the analytics will be wrong. The last
challenge is that with MySQL databases, people often change their schemas. How do we make sure that a schema change is propagated to the data warehouse automatically and correctly?

This diagram illustrates the different versions of our solution over time. Our MySQL databases are AWS RDS instances, so our first solution was to take a snapshot, restore it to a replica, and then use Sqoop to dump the data into Hive, into HDFS. Because the database is really large, it takes over 20 hours just to restore the backup. Sometimes we hit an incompatible restore failure and don't even get a restored replica. So the whole process takes over 24 hours, which became a problem for us.

The first improvement we made was that, instead of restoring the whole database and copying the data out, we developed a tool called SpinalTap, a binlog listener that listens for changes on the MySQL database. We send the changes to Kafka and use a Spark Streaming job to process them. All the changes are stored in HBase and merged with a previously seeded snapshot, and then the final result is delivered to the Hive data warehouse. This solution worked pretty well for some time, but we realized there were certain issues. For example, it is not a lambda architecture: we have different code for the stream processing and the batch seeding, so maintenance becomes a bit of a problem. It doesn't scale that well, and it doesn't give us good disaster recovery.

So the latest solution we have is again based on the binlog, but instead of listening to the binlog directly, we just copy the binlog to S3 buckets. What that gives us is that we not only get the real-time changes from the database, we can also keep a long history of our database changes; we can keep years of transaction logs in an S3 bucket. We have a unified job that can either
run in streaming mode or in batch mode; they are exactly the same. It processes the binlog and applies the changes to HBase, and we can dump the data from HBase to the Hive data warehouse. At the same time, with the same job, we can share some of the computation components and do the seeding from the MySQL database to bootstrap our process.

Let me go into more detail on this job. From this diagram you can see that the input is the binlog; whether it is real-time or historical, it goes through a log parser, and the log stream is separated into three streams. One is the DDL, which we pass to the schema processor; that generates the database schema at any given point in time and stores it in HBase. The DML is the changes to the data, and the change processor writes them into HBase based on the schema. We also keep track of the transactions, which is important: by keeping track of transactions, we can establish the transaction boundary, so that when we dump the data out of HBase we can guarantee that all the tables meet the transaction semantics. At the same time, the schema processor and the change processor are shared; they can be used directly to generate the seed from the MySQL instance.

One important thing is that we use the MySQL binlog position, which is the file number plus the offset, as our timestamp for merging. The second important part is that all our write operations are idempotent, which means we can replay the log for any given time window to do disaster recovery or reprocessing.

The second case I'm going to talk about is real-time indexing. At Airbnb we have many products that need to provide things like full-text indexing and search capability, but we really cannot build this on top of our production MySQL database, basically because the database is already pretty much overloaded. So if you're adding all these
additional features on top of that, it will just melt down, right? So in the solution we provide, we leverage the lambda architecture and use a separate service to do this kind of indexing into Elasticsearch in a near-real-time fashion, while the main production MySQL database doesn't take on much overhead. Again there are two parts. In the streaming part, events or database mutations go from Kafka through the Spark Streaming job, which does all kinds of processing, maybe gathers more data, and sends it to Elasticsearch. In the batch case, we get the data from the Hive data warehouse and apply the same computation. We can run the batch job daily or hourly, or in some cases, if we think the streaming job is good enough, we can just run it once to pre-populate the index. This pattern actually appears a lot in our production systems, and not only to provide search capability. For example, in some mobile growth scenarios, people leverage this pipeline and push the end data to an external HTTP endpoint to do different things.

The third major use case is how we do real-time OLAP analysis by leveraging Druid and Spark Streaming. Again, as in the previous example, this is the lambda architecture. What I show here is a simple streaming version of the pipeline: data goes through Kafka, the streaming job separates the data into dimensions and metrics, and we use the Druid beam sender to send it to Druid for real-time indexing. There is a separate batch pipeline to load the data into Druid in a batch fashion. Right now it uses a MapReduce job, but we are planning to move it to Spark as well so that both can be unified. After the data gets loaded into Druid, we have a tool called Superset, and Superset can connect to Druid and run slice-and-dice queries on different datasets for near-real-time analysis, which is really powerful
if you run an experiment: you can visualize it in near real time.

These are the three main use cases I wanted to talk about today. We have a lot more use cases; they have grown a lot over the last year or so as this architecture has been used by different teams, and they grow organically within the company.

Next, I'm going to share some of the tips and lessons we learned when we applied this architecture. The first thing I'm going to talk about is moving-window computation. Normally when you do streaming computation, you want to do some operation over a certain window. If the window is small, that's not a problem: you can cache the data in memory. But what if your window is a year long? Then there is a lot of data to cache, and if your process crashes and restarts, that history is gone. Luckily, because we use HBase as a shared state store, we can do this kind of long-window computation on top of HBase.

The first example I want to give is how to do a distinct count over a large window. Suppose we want to figure out how many unique users have visited a particular Airbnb listing over the last thirty days. To answer that question, you could build a 30-day window and just do a group-by and a distinct count, but that normally won't work, for the reason I gave before. What we do is store the data in HBase using the listing ID and the visitor ID as the key. By using that composite key, we can do the distinct user count for each listing: when we query, we just do a prefix scan with the given time range, and the time range is 30 days.

The next thing is a counter over a very large window. What we do is store the listing and the count using the HBase counter API, and we do two queries: we query the count at the beginning of the window and the count at the end of
the window. By subtracting those two numbers, we get the count within the window, and it's a sliding window.

Another thing: because we have a shared state store, it's nice that we can provide interactive queries on the real-time data by providing the Presto connector, the Hive connector, and the SQL connector. So we can use SQL Lab to run interactive queries on top of our real-time data. Another thing is that we have Thrift schema objects, so we can transform our data into a data frame. I'm going to skip this.

At the end, I want to emphasize two things. The first lesson we learned is that it's important to have unified batch and streaming computation. Ideally we want just one job where the only difference is the source definition. The reason is that sources normally have different formats and need different kinds of preparation, so it's okay to have a different source definition, but the computation and the sink should be the same. The second lesson we learned is that a global state store gives us a unique advantage: many of our use cases are enabled because we have a shared global state store.

That brings my talk to a conclusion. We have been doing this work for a while, and I think it's important to share the lessons we learned with you. By the way, we have a happy hour going on today at 6 p.m. at the B restaurant bar. If you are interested and want to learn more details, you are welcome to join us. Thank you.

[Applause]
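
The two long-window techniques described near the end of the talk (a distinct count via a composite key plus a time-ranged prefix scan, and a windowed count obtained by subtracting two counter readings) can be sketched in plain Python. This is only an illustrative sketch, not Airbnb's code and not the HBase client API: `TinyVersionedStore`, `record_visit`, `distinct_visitors`, and `visits_in_window` are hypothetical names, the store is an in-memory stand-in for a versioned key-value table, timestamps are plain day numbers, and the running total assumes events arrive in timestamp order.

```python
from bisect import bisect_right
from collections import defaultdict


class TinyVersionedStore:
    """In-memory stand-in for a versioned key-value table (NOT the HBase API)."""

    def __init__(self):
        # row key -> list of (timestamp, value) cells, kept sorted by timestamp
        self._rows = defaultdict(list)

    def put(self, row_key, timestamp, value):
        cells = self._rows[row_key]
        cells.append((timestamp, value))
        cells.sort(key=lambda cell: cell[0])

    def prefix_scan(self, prefix, start_ts, end_ts):
        """Yield row keys with the prefix that have a cell in (start_ts, end_ts]."""
        for row_key in sorted(self._rows):
            if row_key.startswith(prefix) and any(
                start_ts < ts <= end_ts for ts, _ in self._rows[row_key]
            ):
                yield row_key

    def value_at(self, row_key, timestamp, default=0):
        """Return the latest value written at or before the timestamp."""
        cells = self._rows.get(row_key, [])
        idx = bisect_right([ts for ts, _ in cells], timestamp)
        return cells[idx - 1][1] if idx else default


def record_visit(visits, totals, listing_id, visitor_id, ts):
    # Composite key (listing, visitor): one row per distinct pair.
    visits.put(f"{listing_id}|{visitor_id}", ts, 1)
    # Running total per listing, one version per update
    # (assumes events arrive in timestamp order).
    totals.put(listing_id, ts, totals.value_at(listing_id, ts) + 1)


def distinct_visitors(visits, listing_id, start_ts, end_ts):
    # Each matching row is one distinct visitor seen inside the window.
    return sum(1 for _ in visits.prefix_scan(f"{listing_id}|", start_ts, end_ts))


def visits_in_window(totals, listing_id, start_ts, end_ts):
    # Count at the end of the window minus count at its beginning.
    return totals.value_at(listing_id, end_ts) - totals.value_at(listing_id, start_ts)


visits, totals = TinyVersionedStore(), TinyVersionedStore()
events = [("L1", "u1", 1), ("L1", "u2", 5), ("L1", "u1", 20),
          ("L1", "u3", 40), ("L2", "u1", 41)]
for listing, visitor, day in events:
    record_visit(visits, totals, listing, visitor, day)

# A 30-day window ending on day 40 covers days 11 through 40.
print(distinct_visitors(visits, "L1", 10, 40))
print(visits_in_window(totals, "L1", 10, 40))
```

The point of both functions is that no job has to hold the whole window in memory: the window is just a pair of timestamps applied at query time, so it slides for free and survives a process restart.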
Info
Channel: Databricks
Views: 2,214
Rating: 4.8666668 out of 5
Keywords: apache spark, spark summit
Id: 14Db01lrrnA
Length: 29min 57sec (1797 seconds)
Published: Mon Jun 12 2017