Big Data Meetup @ LinkedIn--April 26, 2017

Captions
[Music] Check. All right, let's get things started. If folks can slowly settle into their seats... Can you guys hear me? All right, looks like I need to be much louder. So if folks can slowly settle into their seats, we'll get things started. I know there was pizza out there. It's now gone, but more is coming, so don't worry, you won't go hungry. In the meantime, feel free to grab some beer, wine, or water.

Okay, thanks for coming, and welcome to LinkedIn's Big Data Meetup. We are kick-starting with this one for this year, and we will continue to do this at a regular cadence moving forward. Today we have three speakers: Shaun from Uber, Shirshanka from LinkedIn, and Mars from LinkedIn, talking about Pinot, Gobblin, and WhereHows respectively. For future meetups, if any of you are interested in coming to talk about what you are doing, please send me, [inaudible], or any one of us an email, and we'll be happy to host you here. At the same time, if you are hosting meetups in the big data space and you'd like to hear more about what LinkedIn is doing, let us know. We'll be happy to come and have a chat with you as well. So without further ado, I'll hand it off to Shaun from Uber to talk about Pinot for real-time analytics at Uber.

Thanks, everyone. So this is the second time I'm talking about Pinot [inaudible]. It's a really great opportunity to present our work on real-time analytics at Uber using Pinot. A quick introduction of myself: I'm a software engineer on the Uber streaming analytics team. Our team is mainly focused on the stream processing framework, using Samza and Flink, and of course we are also working on the OLAP engine, which is Pinot.

Before going into the main topic, I want to first share some numbers on how our streaming team is doing today. Our messaging infrastructure is built around Apache Kafka, and we are one of the biggest users in the
world. On any given day we receive trillions of messages, which corresponds to a couple of petabytes of data, and this is the raw data, not the backup or aggregated data. Our real-time analytics infrastructure is not that far behind: we process about a billion messages a day, which corresponds to hundreds of terabytes of data processed every day. Pinot is also used here. We are using Pinot as a general serving layer for more than 10 use cases. It holds hundreds of billions of records, which corresponds to more than 10 terabytes of data.

So here's our agenda today. First I'll talk about some use cases of Pinot at Uber. Then I'll talk about the architecture: how we set up Pinot and our workflow at Uber. Then I will give a simple case study of how to use Pinot, and then I will show some Pinot performance numbers from the benchmarks we did at Uber.

In this part I will walk you through Uber's needs for real-time analytics. This is the first use case we took on: Uber's experimentation pipeline. This platform now powers hundreds of new experiments every week, and our team provides the real-time data feedback loop for it. Pinot mainly powers the internal dashboard, giving it sub-second response times, and shows engineers and data scientists how those experiments are going.

Next is restaurant analytics. This is our first user-facing analytics product. By collecting all the order data, we can give restaurant owners real-time feedback on how their business is going. For example, we can show them how many orders they served every day and how much money they have earned so far, and the owners can also slice and dice their menu to see what the most popular dishes are and how people rate their
satisfaction with those dishes. To support this use case, Pinot needs to guarantee extremely low query latency. Our p99 requirement is less than 100 milliseconds.

We also have many more use cases. For example, Pinot also powers analytics that provide insight into how our pickup recommendations perform. When you request an Uber, Uber will often recommend that you move to some other place, saying it is a better pickup location. We measure how long it takes to get from your request location to the pickup location, what our system's estimate was, and what the real value turned out to be, and we show all of those as performance metrics. Our mobile analytics team is also using Pinot to monitor application log data. Pinot helps engineers understand how many unique users, sessions, and devices there are for each application and each application version, so that, along with many other dimensions, they can understand how their releases are going.

Architecture. In this part I will briefly introduce how Pinot is set up at Uber and our entire workflow. Since we are doing real-time data analytics, we have Kafka already, and we have Pinot on the right side. So how do we fill the gap, basically moving the data from Kafka to Pinot? Usually the data in Kafka is raw, not decorated events, so we need a way to manipulate or decorate those data streams. Typically we need to support operations like projection, filtering, window aggregation, and joins. To achieve that, we built our own self-service stream processing framework, AthenaX, based on Samza and Flink. This is our effort to help our users do the stream transformation before dumping data into Pinot. And apart
from that, we also have the offline pipeline to backfill data from HDFS to Pinot. We built the pipeline to run jobs that dump data from Hive to Pinot. So this is our real-time and self-service pipeline.

The self-service stream processing framework basically abstracts the transformation layer as SQL, using Apache Calcite. This is extremely convenient for our users: they can build the pipeline by themselves and then query the data in Pinot directly. It avoids analysts having to come to my team and say, "OK, I am going to write a Samza or Flink job, how can I deploy it?" Now the framework handles all the deployment, monitoring, and alerting. All they need to do is write the SQL for the transformation logic, and the streaming transformation job will be deployed and do the data transformation work. This reduces the work of onboarding new Pinot tables from days to hours for our users.

Here is the case study part. I'll give an example of how I create a Pinot table for our real-time analytics. First of all, we need to gather all the requirements on how people will use the data and convert them into the corresponding Pinot data model. The first thing is to list all the column specs, so that we know whether each column is single-value or multi-value, and whether it is a dimension, metric, or time column. Then we analyze the query pattern to see how people will query the data, so that we can decide whether we should use special compression logic and different indexing strategies to boost query performance.

In this case I will use our trips table as an example. Assume we plan to analyze how many trips a rider or driver takes every day. First we have a column, rider ID, and of course it's a single-value dimension column. And
since we want to understand how many trips a rider has taken, we will for sure be filtering by rider ID. Based on that, we should use a dictionary to compress this column, and if we sort this column and use a sorted index, it can save us space and meanwhile provide faster filtering when querying. The next column is driver ID. This is also a single-value dimension column, and we also need to filter on it, so we compress it with a dictionary and build an inverted index for faster filtering. Then comes the trip ID. This is also a single-value dimension column, but since we don't do per-trip-level analytics, we don't build any dictionary or index on it. Next is pickup points. This is a multi-value column, because it is a series of coordinates, and we don't do any filtering on it, so there is no compression and no indexing. Then comes the trip fare, which is a metric column. Again, we don't filter on it. So we have completed this table with the whole Pinot data model. This is how you can map your data schema to the Pinot data model.

After you have the Pinot data model, you need to ingest your data into Pinot. Pinot currently provides two types of Kafka consumer for real-time ingestion, and I'll talk about them from the scalability and consistency perspectives. The first one is the high-level consumer. It is simple to use and easy to understand, but it's hard to scale [inaudible]. If you have two nodes with the same group ID and one node dies, the high-level consumer will rebalance automatically, and the only node left alive will receive double the traffic, so you lose consistency. Meanwhile, the replica actually cannot serve queries, because it has only partial data. The low-level consumer manages the Kafka offset
by itself and guarantees strong consistency: if one node dies, the other node can still serve queries. So it was kind of a no-brainer for us to pick the low-level consumer for our real-time ingestion. The second topic is persistence. Pinot provides size-based and time-based thresholds as the flush condition. We persist the Pinot segment after either indexing 500,000 messages or every six hours. For the offline pipeline, we use Oozie to schedule a daily incremental data backfill from Hive to Pinot. This helps us fix any data issues from the real-time side, and also handle things like [inaudible] or more decorations.

In this part I will share some Pinot performance numbers. We did performance tests on the real-time ingestion side with 450 boxes. The configuration is 24 cores and 128 GB of RAM. With the high-level consumer, Pinot can consume data as fast as 20,000 messages per second, which is about 4 megabytes per second per box. With the low-level consumer you can get 10x better performance: the peak traffic can go to 40 megabytes per second per box. Meanwhile, on data volume: the data from Kafka is in JSON format, about 500 gigabytes, and once it has gone into Pinot it's about one eighth of the size, around 60 gigabytes.

Now the query performance test of Pinot and Druid. At the beginning we had to choose which one to use, so this is something I needed to do. My sample data is about 500 million trips with 30 or more columns, all raw JSON data, about [inaudible] of raw JSON. We indexed it with both Pinot and Druid. For Pinot we only select a few columns for the inverted index, based on the queries, but Druid basically builds an inverted index for all the dimension columns. So the
data in deep storage is highly compressed: it's only about 10% of the original JSON data volume. For Pinot, because the inverted index is built on demand, the real serving data volume is a lot smaller compared to Druid. Note that both systems memory-map data on disk, so we ran a full table scan to get the in-memory data consumption for both systems.

We also did a benchmark for the queries. The query patterns are simple aggregation and aggregation with group-by, with some filtering, and we also did unique counting using HyperLogLog, with and without group-by. For pure aggregation, as you see in this graph, both systems are almost the same. With more filtering and group-by, Pinot is actually faster, I think because it's more optimized in its indexing techniques and also the data volume is smaller, so the query latency is better. We also tested the queries at different concurrency levels. The query is an aggregation with group-by, and on both systems the CPU is bounded at 60% and the query latency grows linearly as you add more concurrency.

This is my last slide. This is one benchmark I did, counting how many trips a rider has taken within the past seven days. It's used for one of our real-time features. By putting the rider ID into the sorted index, one single node can serve about 800 QPS with a p99 of 28 milliseconds. So this design gives us a good guarantee for the SLA of those kinds of site-facing applications. OK, thank you. Any questions?

[Audience question] Do you know why the query times between Druid and Pinot are different?

OK, so one thing here: for this query performance comparison, if you go back to the previous slides, to compare these queries I built the inverted index on demand in Pinot. So one thing is that the data volume is smaller for Pinot
compared to Druid, and also I think the inverted index and the sorted index do perform better.

[Audience question] There's a constraint: you generally prepare the inverted index based on the query. If there's a new query that hits some column with no inverted index, will the performance be slow?

Yeah, right. Basically, if you don't prepare the inverted index, then the query becomes a full table scan, which means your query performance won't be that good. But we are using Pinot mostly for user-facing applications, which means the query pattern is mostly fixed, so you always know the query pattern up front.

[Audience question] When you build the inverted index on demand, how long does it take to prepare the inverted index? And what happens if you invert all the columns, like Druid does? How does the performance compare?

Yeah, good point. On the Pinot side, because of the layout of the data, even if you build the inverted index for all the columns, the query performance won't change. That's the first part. Second, I think you asked about the time. If you want to build the inverted index on demand, basically you add the column to the configuration, and then you can, for example, reboot the server, and when the server comes up it loads all the segments. Or you can just reload a segment online, and it will build the inverted index on the fly. Usually the time is probably, let's see, [inaudible] for about 1 million records.

[Audience question] How do users interact with Pinot in your environment? Do they write SQL, since you mentioned Apache Calcite, and then you generate a query in Pinot?

OK, so the parser is different. This one is called AthenaX. We use Apache Calcite to convert
the SQL into the streaming application. Think about writing a Samza or Flink streaming application: you need to write Java code for, say, a streaming join. We use Apache Calcite to convert the SQL into the streaming join application and deploy it on YARN. On the Pinot side, Pinot indexes the data produced by this streaming application. So basically you model the table, and then you just query Pinot [inaudible].

[Audience question] OK, and with Calcite, do you convert it into Flink or Samza for the execution?

Correct, correct. Basically you can think of it as us writing a template Samza job or Flink job. We take the SQL as input and use Calcite to convert it.

[Audience question] So do you convert it to Samza SQL or something like Flink SQL?

For Samza, we are actually using Calcite to run this SQL on the fly. Basically we have a buffer: we buffer some messages and then run the SQL on top of them.

[Audience question] So how is the experience with Calcite? Is it generally stable?

It's good.

[Audience question] Hi. You mentioned that you have a separate Oozie workflow which gets all the offline data and puts it into Pinot, right? I wanted to know about the gaps: how do you figure out what the gaps are, or is the entire segment replaced?

OK, so Pinot is a lambda architecture. The offline part is mainly for the daily data backfill, and Pinot itself handles the lambda. Basically it queries the offline data for everything from, say, 90 days ago until yesterday, and from the real-time part it gets the data from yesterday until now, and Pinot merges the query results. So we have the
daily backfill job as kind of the normal flow.

[Audience question] I wonder if you support user-defined functions in the self-service tool. Do you support UDFs in these SQL queries?

Yes. If you think about this as a Samza or Flink job, the UDF needs to be written in Java and injected. For the more common UDFs, you can register them in Calcite and just use them directly.

[Audience question] So you use Calcite to convert the UDFs to Flink?

I think that one is actually just some Java code executed on the data.

[Audience question] Can you elaborate on what kind of features are powered by Pinot for real-time experimentation, on your first or second slide? What are the features?

OK, so basically for A/B testing, when you run an A/B test you have all the metrics of how the experiment is doing, and you want a comparison between your experiment groups. Basically those metrics are shown on the dashboard for the different experiments.

[Audience question] So you're checking your change using real-time metrics?

Yeah.

[Host] OK, we'll take one more and then go to the next one.

[Audience question] A quick question on joins. I think Pinot does not support joining different tables. How do you handle that? Do you need to handle all the joins on the way from Kafka to Pinot, or [inaudible]?

OK, that's a good question. If you look at this graph, I explicitly mentioned joins here. If your query has a join, you have to model that table first: basically you need to do the streaming join before you ingest into Pinot. That's why we have AthenaX to support window functions and streaming joins here. On the Pinot side, joins are not supported right now. At Uber we have Presto,
so we are working on a project to make Pinot a data source for Presto, so that you can actually do joins in Presto.

[Host] Thanks, Shaun. Next up we'll have Shirshanka to talk about Gobblin.

Sorry, this is not going to be as exciting, because there is no SQL and there are no joins. But we're going to talk about bridging batch and streaming data with Gobblin. Quick show of hands: how many of you actually know Gobblin? Cool. So hopefully by the end of it everyone will have their hands up.

We started working on Gobblin a long time ago, and our view of data integration is something that evolved over time. In the earlier part we were just solving data ingestion, but we realized over time that it's a much harder problem than just copying data. So we ended up saying there are three big requirements that we have for data integration. You've got a data set in system A and you want to make it show up in system B in a certain way, and there are different kinds of sources and sinks that you need to connect, so obviously you have to handle that kind of diversity. We have a ton of batch and streaming data sources and sinks, and we needed to handle that. And last but not least, data quality was a big deal. We have been running dozens of different pipelines over the years, and we realized that data quality is the one thing that everyone forgets about until the very end, so we wanted to make sure that data quality was a first-class citizen in the pipeline. These were the three big pillars that we based the Gobblin design on. The name itself comes from gobbling in a ton of data.

We started with Kafka and Salesforce, moving data to Hadoop, because that's where our team's focus was. There was a ton of data that we were ingesting from Salesforce and our analysts wanted it in Hadoop, so we built that. And then of course we are a big Kafka shop, and
so a lot of our data has to come to Hadoop from Kafka. But over the years we started adding more and more sources and sinks. We realized, hey, we could use the same thing to copy Hadoop data to other Hadoop clusters. We have ten Hadoop clusters or more at this point, and we are constantly moving data between these different clusters, so we have Hadoop as a source as well as a sink. And of course if you have Kafka as a source, you can also have it as a sink.

Every time I do this slide it gets harder and harder to put every single logo up on the screen, and I think this is not even accurate anymore. We're open source, on LinkedIn's GitHub still, and there are a bunch of companies that have publicly acknowledged that they're using us, and a lot more that have secretly acknowledged that they're using us but are too big or have other reasons for not acknowledging it publicly. We are also incubating in Apache right now. At LinkedIn, Gobblin is doing about 80 percent or more of our data ingest into and out of Hadoop, and we're running at a pretty large scale. The thing that always amazes me is how many different source systems we integrate with: more than 30. These are not different pipelines; they're actually different systems that we are integrating with, and that I think is a testament to how general the system really is.

Cool. Obviously this is not a new problem. There are a bunch of open-source systems already in this space, and you might have heard of one or more of them. A few of them are Apache Sqoop, Flume, Falcon, NiFi, and Kafka Connect, and then there are lower-level, general data processing systems like Flink, Spark, Samza, and Apex. What we've seen over the years is that certain design principles in each of these are similar to how we decided to do things in Gobblin, but if you combine everything together, none of them is exactly the same as what we ended up with. And it's not just organic
evolution; we actually made a lot of these choices based on what we were seeing. We decided not to tie ourselves to a specific execution model, whether batch or stream. We also decided not to tie ourselves to a specific implementation, so you can run Gobblin on a single node, or on Hadoop, or in AWS: a lot of different modes. We don't need Kafka to run, and we don't need Hadoop to run. There are a lot of these design points that we chose that have ended up making it easier for us to run in different situations. In fact, even at LinkedIn we have deployments where we run production pipelines in different ecosystems or different deployment scenarios.

So let's dive in a little bit and talk about how Gobblin is architected under the hood. The goal is to give you a quick sense of the Gobblin concepts before we talk about how we integrated the batch world and the streaming world with Gobblin. This is what the logical pipeline looks like at a high level. Don't worry, these are all new concepts and I'll dive into them one by one.

The first thing we need to talk about is a work unit. People who are familiar with partitioning will realize that this is essentially a way of saying what the splits, the partitions of work, are. A work unit for Gobblin is a logical unit of work which involves moving some data, typically from one system to another. These units of work are typically bounded for batch ecosystems, but for streaming you can think of them as potentially unbounded. Some examples: Kafka topic LoginEvent, partition 10, offsets 10 to 200. That's a bounded work unit; it represents a piece of work that Gobblin has to do. Similarly, it could be an HDFS folder with a specific file in that folder, or a Hive data set with a partition.

And what's generating these work units? A source. A source is really a provider of work
units; think of it as a system like Kafka or HDFS. Once we generate these work units, they are processed by a task. A task in Gobblin is a unit of execution that operates on a work unit. The job of the task is essentially to extract records from the source and write them to the destination, and a task ends when the work unit is done. A work unit in our batch world is done when all the records for that particular work unit have been consumed. We will talk about different modes of executing Gobblin, but in a developer's way of thinking, a task is typically mapped to a thread. That thread could be running in a thread pool in a local process, or in a pool in a mapper, or in a streaming context.

Let's look a little bit inside one of these tasks. A task consists first of an extractor. The extractor's responsibility is to connect to the data source, so a Kafka extractor will connect to Kafka and start pulling records from it. It typically also includes the deserializer, so if it's an Avro data set it will deserialize the record into an Avro record and pass it down the pipeline. The task is a single-process unit, so all of the next steps that I'll talk about are in memory.

The next step is a converter, and you can have many of them. It is essentially a way for us to do lightweight transforms: a 1-to-n mapper of input records to output records. You can think of doing things like format conversions, going from Avro to JSON or JSON to Avro, or schema projections, or encrypting the entire payload or encrypting fields, and chaining all of these converters together, so you can apply whatever kind of transformation you want from source to destination right here.

Then, I talked about data quality, so we added a specific concept called the quality checker. Quality checkers run at different levels. They run
either at the row level, where we use them for things like checking the value of the time field, because that's a very important field. Oftentimes we have clients filling in incorrect time fields, and we want to make sure that if the time field is off by a lot, we drop those records or quarantine them. Quality checkers that operate at the row level can do things like that. Then there are task-level quality checkers, which run after the task is done and can do things like audit checks: did we get all the records that we were expecting for this partition? Is the schema compatible? Did we find schemas that are not compatible with the previous version of this data set?

Finally you have a writer, obviously, because you want to write to the destination. These writers hold the connections to the destination and serialize records on the wire. They can be synchronous or asynchronous. We have a bunch of them: the FS writer is probably the most commonly used, and we have Couchbase writers, Kafka writers, and a bunch more in the catalog.

After all these writers have run in the tasks, and once the tasks are all done, there's a final stage called the publish stage. This stage is typically invoked only for destinations that support transactions or atomicity. For example, if I'm ingesting a bunch of Kafka data into HDFS, I might run the whole flow with my writers all writing to a staging directory first, and once I'm happy with everything that I got, I move the entire data set into a final output directory. This is often used for getting commit semantics with the destination system. For JDBC, for example, we write to a staging table and then finally do an insert into the final table.

So this is what the logical pipeline looks like. Now you get the picture, right? You've got a source on the left side
partitioning up into work units; those work units become records flowing through individual tasks; and then finally there is a publish step that writes everything. But realize that this pipeline is not something that just runs once. It runs again and again, because the overall purpose is to synchronize a source with a destination, which means you need to remember what you did. So obviously you need state, and the way you add state is by remembering what happens between runs, and the way you do that is by adding a state store. We have a bunch of implementations: HDFS obviously is how we started, but it could be NFS, S3, MySQL, or even ZooKeeper. The way a pipeline runs is that in the beginning it loads config as well as watermarks from previous runs, runs the entire pipeline, and once it's done it saves out all the state for this run, so the next time it remembers what it did and can proceed. So you get semantics like: if a particular task or work unit failed, sure, next time we'll try it again, and we can still move forward. We've had various kinds of distributed commit semantics implemented using the state store.

All right, so I talked very quickly about the logical pipeline and the state and how it's all maintained. But how does one go about writing one of these pipelines? How do you specify a pipeline? I'll walk you through one of them, and hopefully that's easy to understand. A pipeline specification is a long file that starts out looking something like this. The initial section is typically: what's the name of the pipeline, what's the description. Then there's a source class. In this case it's a hello-world example where we pull some data from Wikipedia, so the source class specified there is a Wikipedia source, with some configuration attached to it, like which titles it is going to look for. The source actually pulls in data as
JSON, I think, and so we need to convert it to something that we can then process, and so the next step is a converter. We have a Wikipedia converter, which takes this JSON and converts it into an Avro record, and then finally we have a writer, which writes to HDFS, and you can see that there is some configuration there on the output format. There is a partitioner to organize data on HDFS appropriately, and then finally there's a publisher, which is just the default publisher that we use for a lot of our file-system-based writes. So this is an easy way of looking at, hey, what does a Gobblin pipeline look like. It's not the most concise definition, but this is the physical spec, if you will. The thing I do really like about this pipeline specification, and how Gobblin has gone about addressing this problem, is that a single specification can actually run in multiple environments. So I can take that same pipeline specification and run it on a single box, on a single instance, and that instance could come from AWS or it could be my local laptop or a desktop or a server. Or I could take that same pipeline specification and run it in a medium-scale mode, where I've got a bunch of machines and I run Gobblin in standalone cluster mode, as we call it, and that itself could again be on top of either bare metal or AWS; it doesn't matter where you get those machines from. And then finally you have a large mode, where you're doing a large-scale ingest and you really want that kind of dynamism in how you scale up or scale down your ingest, and for that you use something like Hadoop YARN or MapReduce. We support all of that, as well as AWS. So this is pretty cool, and I don't think many systems actually have this capability. All right, so let's get back to what we started with, which is: hey, how do we take these two worlds, batch and streaming, and make them actually work with a single integration environment? So first let's talk a little bit about batch.
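[Editor's sketch] The logical pipeline and state store described in the talk (source, converter, row-level quality check, writer to staging, publish, then save the watermark) can be illustrated roughly as follows. This is a minimal sketch under stated assumptions, not Gobblin's API: every name here (`run_pipeline`, `load_state`, `save_state`, `state.json`, the `time` field, the skew threshold) is hypothetical, and a local directory stands in for HDFS and for the state store.

```python
import json
import os

# Hypothetical names throughout; none of these are Gobblin classes or config keys.

def load_state(state_path):
    """Return the watermark saved by the previous run (0 if there was none)."""
    if os.path.exists(state_path):
        with open(state_path) as f:
            return json.load(f)["watermark"]
    return 0

def save_state(state_path, watermark):
    """Persist the watermark so the next run knows where to resume."""
    with open(state_path, "w") as f:
        json.dump({"watermark": watermark}, f)

def run_pipeline(source_records, base_dir, now=10_000, max_time_skew=3_600):
    """One run: extract -> convert -> quality check -> write to staging -> publish."""
    state_path = os.path.join(base_dir, "state.json")
    staging_dir = os.path.join(base_dir, "staging")
    final_dir = os.path.join(base_dir, "final")
    os.makedirs(staging_dir, exist_ok=True)
    os.makedirs(final_dir, exist_ok=True)

    # Source: everything past the previous watermark is this run's work unit.
    low = load_state(state_path)
    work_unit = source_records[low:]
    if not work_unit:
        return low  # nothing new since the last run

    staged = os.path.join(staging_dir, f"part-{low}.jsonl")
    with open(staged, "w") as out:
        for raw in work_unit:
            record = json.loads(raw)  # converter: JSON text -> dict
            # Row-level quality check: drop records whose time field is far off.
            if abs(now - record["time"]) > max_time_skew:
                continue
            out.write(json.dumps(record) + "\n")  # writer: staging only

    # Publish: move the staged file into the final directory, then commit state.
    os.replace(staged, os.path.join(final_dir, os.path.basename(staged)))
    high = low + len(work_unit)
    save_state(state_path, high)
    return high
```

Because the watermark is saved only after the publish step, a run that fails before publishing leaves the old watermark in place, so the next run retries the same work unit, which is the retry behavior the speaker describes.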
Batch gets a bad rap these days; everyone's all about streaming. But I think it's useful to understand what a batch execution model even looks like. It's basically: you wake up, you determine what work you need to do, then you acquire some containers or some slots to run, then you run your processing, you checkpoint your state, and then you repeat, right? And this repeat might happen daily, might happen hourly, might happen weekly. And what's good about this? Well, it's generally cost efficient, because before you start running you know what you're processing. In the case of a Kafka work unit, I know exactly which offsets I'm going to consume before I start; in the case of an HDFS dataset, I know exactly the amount of work I'm going to be doing, which means I can do some amount of pre-planning and then I can use my compute efficiently. It also means it's deterministic, it means it's repeatable. There are a lot of good things about the fact that these work units are well understood. Obviously this means there is also higher latency, and sometimes, if you get too crazy and you're doing it too often, so if you're running batch workloads every minute, right, and some people go even lower than that, then you might find that your setup, the acquiring of slots, and the checkpointing cost that you're incurring every time you shut down might actually dominate the overall cost of the pipeline. And so if you're getting to this micro-batching world, and your system is not really optimized for fast writes, then you might end up seeing too much cost from just starting up and then shutting down. Let's look at streaming. Streaming is similar: you determine your work streams, it's no longer a work unit anymore, it's more like a work stream, and then you run continuously, and you're checkpointing periodically as you make progress. What's good about it? Obviously low latency, right, you're processing data as it comes. It could be higher cost because you
don't know ahead of time how much work you have, so it's harder to guess correctly. So when I'm trying to spawn up a few containers, do I need 100, or do I need 10, or do I need a thousand? I don't know, because I don't know whether this Kafka topic, let's say, that I'm consuming is a high-volume topic or a low-volume topic. These are things that are hard to get right. The other thing is these jobs are running for a long time, which means I have to handle change. The Kafka topic might add a few partitions while I was running; now I have to detect that and start ingesting those. And so more sophistication is needed, which generally means you have to write more code. With batch systems, what ends up happening is you can get away with being a little bit more relaxed. So which one wins? I did my own little magic quadrant of the execution model against different sources and sinks. So take a batch-to-batch use case, like JDBC to HDFS, back and forth; or a stream-to-batch use case, where the source is Kafka and the destination is Hadoop, or vice versa, where the source is Hadoop and the destination is Kafka; or a stream-to-stream use case, where you're moving data between Kafka and Kinesis, say. And this is my scorecard. The important ones are, obviously, when you're doing batch to batch, then a batch execution model is generally better, right? If someone is giving you a weekly dump or a nightly dump and you're running in a streaming mode, looking for data, it's probably not a good use of resources. On the other hand, if someone is streaming data to you all the time and your destination is also streaming, then if you're batch you're probably not doing very well, right? So those two are easy, but the other two are tricky. If I'm ingesting data from Kafka to HDFS, should I run every ten minutes in batch mode, is that okay, or should I run continuously and keep writing to HDFS? Well, it depends. If this Kafka topic is really important and your analysts and data
scientists and your production workflows need it right away, then maybe it's good to run streaming. But maybe there's, like, 90% of your data that you actually don't need all the time or in real time, and for those batch is actually more efficient, so maybe you should be saving your company some money and running in batch. Similarly on the other side, right: if you're producing data in Hadoop and then you want to ship it to Kafka, you have a similar challenge. If your data itself is being produced every 15 minutes or every hour, are you going to go the extra mile to stream it out within a millisecond? Probably not. So you have these two places where you're moving data between stream and batch, where either one could actually be the right choice, and that's why we said: hey, can we actually use Gobblin and run in both models? Because we might have cases where we need both batch ingestion as well as streaming. To do that you have to go back to the blueprint, right? So we go back to the logical pipeline and say, okay, let's look at this and see how we're going to make it work for streaming, because everything I described to you was more batch-like, right? So let's start from the beginning, the initial stage. What does the batch pipeline do? It determines work. And what does the streaming pipeline do? It determines work, except that that work might be an unbounded work unit. So not much difference here; we can probably make it work even with the existing setup. Let's look at the next phase. In the run phase, the batch pipeline acquires slots and then runs, and then it gets done at some point, right? Whereas the streaming pipeline is a little bit different: it's going to run continuously, so that's new. And the second thing is you want to actually mark progress as you're making progress, so you want to checkpoint periodically. And then finally, when you want to get done, you don't want to just do a kill switch; you want to shut down gracefully. So these were additional requirements that
we had, and the way we addressed it was by adding a few additional capabilities in the task. So the task now has the ability to know what data is passing through it, including the watermarks, and there's a new concept called a watermark manager, which is observing the task as it is flowing tuples through, and so it's able to keep track of the tuple as it moves past the extract, convert, quality and write phases. And the writer is actually responsible for acknowledging that a tuple has actually made it to the destination system, so this works well with async writers as well as with synchronous writers. And the watermark manager is periodically checkpointing these watermarks to the state store, and the state store could again be ZooKeeper or anything else. We ended up using ZooKeeper because it's a common pattern at LinkedIn to write these kinds of checkpoints to ZooKeeper. So this was the modification we needed, and we added the ability for tasks to shut down gracefully and commit their state. Now let's look at the next stage: publish. For batch you want to checkpoint and commit, and for streaming the stage basically never exists, like, it never shows up, and so we have a no-op publisher, which we had for other reasons anyway, so that just worked. So it's pretty cool. I mean, with very minimal changes we were able to take an architecture that we had initially designed just for batch and add the ability to actually handle true streaming. This is not micro-batching, right, it's true streaming: we're actually taking tuples and moving them through, and checkpoints are being persisted without any kind of barrier. And so to enable the streaming mode, all you need to do is, in your pipeline spec, just set this configuration property, right: task execution mode equals streaming. The default is batch, and the same change works for all of the modes: it works for standalone, works for standalone cluster, it works for AWS, Hadoop, all of the above. Right now on our Hadoop clusters
we actually kill jobs if they run for too long, so we don't really run it in that mode, but it would work, it would actually work. So let's look at our streaming pipeline spec now. This one is a particular favorite of mine, because it takes Kafka data and writes it back to Kafka, and while we do it we also sample 10% of the records. So it looks pretty much the same: you put a pipeline name and a description at the top, then you have a source class, which is the Kafka source; you have a converter, which is basically a sampling converter, it samples the data down to 10%; you have a writer, which is the Kafka writer; and a publisher, which is the no-op publisher. So far it looks like a standard pipeline spec, there is nothing special here, and this is the only section that changed: you add execution mode streaming, and if you want to really configure exactly where the watermarks get stored, you could choose a specific storage type, a specific endpoint, etc. So if you run Gobblin streaming in cluster mode, this is what it looks like at a high level. You've got a cluster of processes that are running, you've got a stream source you're pulling data from, and Helix is being used to manage fault tolerance and failover. So if a worker dies, then Helix will automatically assign its work units to other workers, and because checkpoints are being committed periodically, the new worker will pick up the work unit and start from where it left off. So this is actually committed to trunk right now, but it's not part of an official release yet; that will be coming out pretty soon. But this is not all; there's a lot of other interesting and active work happening in Gobblin. The big one is definitely Gobblin as a service. The current mode of Gobblin that we generally run in is that these pipeline specs get checked in somewhere and then our ops team deploys them, but we want to move to an as-a-service model, where there is a global orchestrator which actually has a view of every
single system, including multiple clusters, and we can submit these high-level flows to a REST API, and these flow specifications are compiled down to physical pipeline specs. I think it's pretty cool. The second thing that we're working on is global throttling. This comes up a lot when you're ingesting a ton of data and making API calls and things like that. People are generally nervous about a large Hadoop application starting up and hitting an endpoint or an API endpoint. We also have to respect API limits: let's say we have a Salesforce account, they give us a certain number of credits, and we might have five or six different pipelines that are running continuously; we want to make sure that we respect global quotas. The nice thing about the global throttling piece is that it's actually generic, and it can be used outside of Gobblin as well. And last but not least, we're moving to being much more metadata driven, so we want to have integration with the metadata service, and you'll hear about WhereHows coming up soon, which can allow us to do policy-driven replication, permissions, as well as encryption, without having to specify all of this in the pipeline spec, having it driven more by global policies, like: hey, if I'm replicating data from [unclear], make sure that all PII data gets scrubbed, things like that. So, the roadmap coming up: we're going to do a final LinkedIn Gobblin 0.10 release. This is before we do the code donation to Apache. We are incubating in Apache, and pretty much all of the paperwork has gone through, so we're ready to donate it, and then we will do a release once we're in Apache again. And there are many more streaming runtimes that we want to integrate with: we've got Apache Samza, and LinkedIn has its own Brooklin project for doing streaming ingest. And then there's compliance work. I don't know how many of you are familiar with GDPR; it's a big regulation that we are trying to stay ahead of before the
deadline, and it implies doing fine-grained data purge for Hadoop and a bunch of other systems. And then there's also a bunch of security work: we want to get better at storing credentials and respecting ACLs, and also having security for our pipeline specs. That's pretty much it. This is the Gobblin team at LinkedIn; there's a much bigger community out there of contributors, I think we have 53 or more at this point. And yeah, Verson got a special slot; just because he's the manager doesn't mean that he is super important. And that's it. Questions? Hi. In the previous version Gobblin used to spawn a MapReduce job for streaming. Is it doing the same or not? How are you handling it? So the question was whether Gobblin is still launching MapReduce jobs for streaming. So in Gobblin, MapReduce was just one of the methods in which you could run Gobblin jobs. We recently added a standalone cluster mode, where you can just start up Gobblin workers and they will automatically discover work and start processing it right away, so in the streaming mode you would generally just use that model, or you would use the AWS model if you're on AWS. So it's basically just Gobblin worker processes that you start up on containers or machines that you get, and they automatically discover each other and form a cluster. So, for streaming ingestion, none of these systems support exactly-once processing, so does Gobblin provide any of these guarantees? That's a great question. So right now what we've implemented is at-least-once, because most of the source and destination systems that we are working with are already giving at-least-once semantics. So we essentially ensure that there is no data loss, but obviously, since the checkpoints are happening asynchronously with the write, you could have duplicates. The way we've generally seen people handle this is by preserving the watermarks: watermarks flow all the way to the writer, and so when you write to the destination, if you
preserve the watermark along with the write, then, let's say you restart and you replay those writes, you can essentially dedupe on the writer side. So that generally ends up being a practical solution to this. All right, I'm going to be around, so we can have more chats; I want to make sure we get going on WhereHows. Cool, thank you. While we set up, people, feel free to grab some pizza, and if you could settle on back that'll be great. I will start the final talk in the next couple of minutes or so, so if you could just start sitting down again, that'll be great. Sorry about the delay in the pizza. All right, let's get started. The final talk of the evening, the last talk, is about WhereHows: taming metadata for 150K datasets over nine data platforms, by Mars Lan. Hi everyone. So today I'm here with the WhereHows team to talk about WhereHows, and you notice how many times I say the word WhereHows, and you might think, which warehouse am I talking about, right? This is WhereHows, not the data warehouse we have. So this is what I'm going to go through with the talk today. I'll start off by talking about the LinkedIn data ecosystem. Some of you obviously are very familiar with it, but this basically sets the tone of why we need WhereHows. And then I'll talk about the metadata problem that we have at LinkedIn, and the architecture and some details about WhereHows, and then finally what we're heading towards in the future. So hopefully everybody knows what LinkedIn is, but essentially to a lot of end users it's really the desktop and the mobile app. And under the hood, if you look at the ecosystem, we have basically LinkedIn desktop, mobile and APIs generating tons of data coming into our system. We also have the internal side of the story, internal apps generating data and analyzing data as well, and all of this data basically goes to a couple of places. It goes to your typical online storage, things like Oracle, believe it or not, we still use that at LinkedIn; Espresso, which is kind of our
own version of a NoSQL data store; and MySQL is also another place that a lot of data goes to. And then we have the streaming story. Obviously, like Shirshanka mentioned before, Kafka is a huge thing in terms of processing the Kafka events, and a lot of that data actually ends up in the offline world, in Hadoop, through Gobblin, and we also do snapshots of the online data directly into Hadoop as well, so that's where a lot of offline analysis happens. We also use Teradata for data warehousing, and that's one of the things that the data analysts use extensively in-house. And eventually we also have things like Pinot for analytics, and read-only key-value stores, Voldemort, and also the graph database and whatnot; data all kind of flows back to these from the offline world. So here's a list of all the data platforms that we use at LinkedIn on the left, and you can see it's a whole bunch of them; I mentioned some of them already. And on the right we have almost as many, if not more, ways that you can transform this data. And so this leads to the problem of diversity. Across platforms, we have metadata, if you will, that is siloed in each data platform, and it is not really in an interoperable format; it's not something that you can just take from one system and then expect to work on another. There are a lot of missing linkages between these platforms as well; this information gets lost, you just don't know how data flows from one to another. And even within a single platform itself we have problems as well, because a lot of these things kind of encourage sprawl. So in Hadoop, people keep producing new data and then lose track of where it is. And with NoSQL data, schema-free systems, trying to infer the structure is also hard. And, you saw the long list of ways we can process this data, with different frameworks,
trying to compute lineage is also very difficult. So it essentially boils down to these questions. For someone starting at LinkedIn, there are a lot of questions that they ask: okay, so now I'm a new data scientist, where do I find the profile data? Okay, someone told me that the profile data is at XYZ, so how did it get created? I have to go ask someone again. And then how does that relate to the skills data, and who owns that data, and so on and so forth. And this is where the genesis of WhereHows came from: a lot of the time the question is where is the specific data, and then how did it get created, hence the name WhereHows. We open-sourced this project in late 2015. And so this is WhereHows from 10,000 feet. If you look at it from a very high-level point of view, what is WhereHows? There are two types of metadata that we care specifically about. One is technical metadata: you can imagine these are your schemas, your data catalogs, where things are defined, and so on and so forth. And there's also the operational metadata: this comes from running certain processes and pipelines and whatnot, and how the data is linked to a job, and so on. And all of this information flows into this logical metadata box, if you will, and then eventually flows into the centralized repository, which is essentially WhereHows. And then on top of that we also have the community aspect of it, where tribal knowledge can finally get a place where we can capture it in a structured form, so we can share knowledge across different teams and different data. And this is the current status of WhereHows at LinkedIn. We started running in production in about 2014, and currently we have coverage for about 150,000 different datasets across nine different data platforms. The main users of WhereHows inside LinkedIn are engineers and analysts; we have weekly active users in the hundreds, or many hundreds, and these users, they just keep
coming back, right? So if you look at the daily active users, it's a similar pattern as well. And what do they use WhereHows for? Most of them use it for search and mining, exploratory needs, discovering datasets. And because we have lineage information as well, a lot of people use WhereHows for that as well, so, like, engineers trying to migrate data, deprecate data, and so on and so forth; they want to know who the users of this data are. And we also have a bit of script-searching functionality, so you can find the specific script that people use to generate a certain workflow and certain data as well. So here are some of the screenshots for WhereHows, and this is actually already outdated, because we're currently working on a UI refresh and an entire UX revamp, if you will. But if you download the open source version of WhereHows, this is what you see today. On the left you have a tree structure where you can go down to different data platforms, and you can drill down to each dataset and then look at different properties: you can look at the schemas, you can get sample data, and lineage, and so on and so forth. And here's a graph of how lineage will look, and you can probably already see that this is another great example, because this specific dataset actually has a huge number of [unclear], so you can go to multiple levels, and trying to find things in this sort of very highly connected graph is actually quite difficult. And this is the new UI we are working on right now. As you can see, previously it was kind of a tree-like sort of fashion, so you had to know roughly what you were looking for before getting in here, whereas in the new UI we focus largely on the search aspect of it. So you can imagine this as a search engine for your datasets: you come in here, you don't know much about your data, but you know a certain aspect of it; you type in the search engine, you get results coming out of it, and
then you drill down to each dataset and find out more. So here are some of the key concepts for WhereHows. Dataset: you can imagine this means a lot of different things across different data platforms. Sometimes it's very obvious: if you have a relational database, normally that means a table; in Hive it could obviously also be a view. HDFS is a little bit trickier, because a logical unit of dataset could mean a directory or could even mean multiple directories, and so on; and in the Kafka world that would correspond to a topic. Processes: these at least map quite nicely to the typical processing frameworks, so these are the processes that would actually take data in and generate more data, and so on. Lineage is something that has a broader meaning; it could be generalized to any relationship between datasets, but in the current WhereHows world, if you will, it's mostly deduced from operational data, so it's mostly about how this entity was generated, and what's the upstream and downstream of a dataset. Metrics: these are your typical business metrics, with a lot of additional information on how the metrics are generated, the dimensions that the metrics contain, and so on and so forth. And finally, ownership is a big part of WhereHows. What we found at LinkedIn is that a lot of the time this is not as simple as who owns the files on HDFS or who granted access to a specific table in your RDBMS. A lot of the time it's a much broader concept, right? You're talking about [unclear] owners, you're talking about producers and consumers, you're talking about delegates, which are your DBAs and so on, and stakeholders, right, PMs who are interested in the data, and so on. So you cannot just have a single definition of owner and say, hey, this is it; you have to have a generic concept for ownership. So here's the architecture of WhereHows. On the top right-hand corner, that's the app, which is
basically what people see, the actual WhereHows app. But what powers that is on the bottom right corner, in that deep blue box: this is the persistence layer for WhereHows. There is a WhereHows MySQL DB that stores analytical information about entities, datasets, metrics and so on. There is also a metadata store, which is a metadata service, if you will, that contains all the core metadata for the datasets. And we also have indexes, which are basically built off searchable information and put into Elasticsearch, and we surface that up to the app itself. And on the left-hand side you have a bunch of components, essentially processes that are generating information and feeding it into this persistence layer, and they can be broken down into three categories. One is the cataloging aspect of it: schemas and basic information about datasets. The second part is lineage; obviously that's a little bit more involved. And ownership: like I mentioned before, ownership is not always a straightforward thing, so you have to crawl this ownership from multiple sources in order to find out who the various kinds of owners of the dataset should be. So in the catalog aspect, there are a couple of challenges that we have here. We have to, first of all, come up with a standardized model for this metadata that will work across different platforms. A lot of the time different platforms have different concepts, so do you go with the least common denominator, or do you come up with a model that, even though it's an abstraction, has a lot of platform-specific specialization? And even the question of what is a dataset, right, I mean, that's something that needs to be defined. Extraction: that generally boils down to differences across different platforms; you need to have ways to be able to extract this data differently
from different platforms. And freshness is also a very important issue. I mean, we want to make sure that the data is as fresh as it can get, because a lot of the time, when users come to a metadata store and they don't see the data being up to date, the trust in the service will erode very quickly. So to tackle these challenges, we came up with URN-based naming for datasets across all platforms. And the metadata itself, we categorize into the generalized metadata, which we know works across all platforms, and specialized metadata, which hopefully addresses each specific need, or the specific metadata that is generated from each platform. And to enable quick authoring of platform-specific ETL jobs, or crawlers, if you will, we use a Jython-style system where we have Java, and Python running within Java, so the scripting part can be done in Python and done quickly. And finally, in order to keep our data fresh, we have both the pull model, which is your standard crawler model, where you go and extract data in batches or incrementally, and the push model, where the source can push metadata changes to us directly through Kafka events or a REST interface. Lineage itself is also a challenging issue, mostly stemming from the fact that we have a huge diversity in the processing frameworks that are available at LinkedIn. Ideally you want to infer lineage directly from code or scripts; unfortunately that's not always trivial. Things like UDFs and external parameters can easily make this analysis difficult. And a lot of the time lineage spans across different platforms; like what Shirshanka mentioned before, we copy data from platform to platform a lot of the time, so it's not always a SQL script that does that. I mean, Gobblin obviously plays a huge part in it. We need to be able to have lineage that spans across different platforms as well. And finally, like I showed you with the screenshot, a lot of the time
people, when they talk about lineage, the first thing that comes to mind is all the pretty graphs and all that, but once they look at the graph: well, maybe not so good, I want to have an API instead, forget about the visualization. So here is an example, right? I mean, it looks very pretty, but can anyone make any sense out of that? Probably not. And so for lineage we break our approach into several aspects. Azkaban is a scheduler that we use quite extensively at LinkedIn, and we were able to parse the execution log and reverse-engineer the lineage through a bunch of hops, and eventually we get lineage from data source to destination. We also use Appworx a little bit inside LinkedIn, and once again we can parse the execution log for that to generate lineage. And Gobblin, which basically is responsible for a lot of into-Hadoop and cross-Hadoop-cluster copying: we can use the events, the Kafka events generated by Gobblin, in order to compute lineage. And finally, there are just certain things that you cannot parse from anywhere; these are tribal-knowledge things that people don't [unclear], so that has to be captured through heuristics. And we provide lineage APIs and a tabular representation of downstream impacts and so on, and eventually, yeah, we also provide a pretty but unreadable lineage graph. So this is the anatomy of your typical metadata ETL job. You start from some sort of data platform; you extract the metadata information you need, and these are very platform specific, unfortunately; hopefully you get a good JSON representation of that and dump that to a file. Your transformer comes in and then converts these things into something that's more database-loading friendly, for example, with MySQL, CSV; and then finally you load that into your metadata database. So, I mentioned before, we currently have this metadata Kafka event in the works, and the idea is we want to push
the responsibility of publishing metadata changes to the data source, so we can get near-real-time updates of data, instead of having the pull model where you go and get stuff on a regular basis and try to keep your database up to date. And so the idea is data platforms and data processors can publish the event, and then WhereHows will just go off and read them and put them into the database. So here is the list of things that we are currently working on at LinkedIn for WhereHows. As I mentioned, we are doing a UX revamp, and we want to have search be a huge part of it, and search relevance obviously is very, very important, and that entails: how do you weight, how do you rank different search results? That's not always trivial, because depending on your use case that could mean different things. So search is obviously something that we're looking into. Compliance is a big part inside LinkedIn where WhereHows plays a big part, so we want to be able to gather this fine-grained metadata in order to support various GDPR requirements, and hopefully we have other processes that rely on the WhereHows database to perform the actual purging. On the metadata side, we want to be able to have column-level lineage, and this is a very hard problem still, but we are making progress. There's a separate team at LinkedIn that's making progress on converting all these common scripting languages that we use into an intermediate representation, actually Calcite, funnily enough, and being able to analyze lineage that way, but it's still a work in progress. And also we are trying to get rid of our tight coupling with MySQL, and instead we want to have an abstraction layer where we can basically write metadata into various storage backends, you know, [unclear], Neo4j, graph DBs and whatnot, and that means, from the open source perspective, you can take this and deploy it at a much larger
skill then you know one box single box my sequel server and then finally we are actively working with Goblin on trying to modernize some of our ETL jobs and hopefully migrated over to goblin base system so you can imagine the ETL actually gets translated into goblins and your extract convert and a writer clearly well so so that will make offering of the ETL job much easier going forward and then in the list of lightly longer-term we are trying to come up with better lineage visualization we want to emphasize more on the the social aspect of it where you know we can provide richer experience where people can collaborate on the data set and for the open source community we also looking into various things that will hopefully make the open source community to be able to pick this up and then build things and then deploy easier and finally this is the team that we have a LinkedIn currently works on warehouse right so any question since I have questions about can go back to the previous slide and you mentioned that with warehouse the developer can next one yeah the developer can replace VM with stocker image can you elaborate more how buy it can do that in how does it how do you do that so this is mostly for kind of testing or you know your auto box experience with warehouse so right now we on the open suit on the github repository we have a VM image that allows people to test it out but the VM image is it hasn't been maintained it up to date and it's a lot harder to maintain a VM image then maintaining docker images so ideally we want to be able to allow developers to download warehouse and then as a darker animation and just try it out on day one along with other docker images for example you know you can try with I don't know you can try along without a how do system write docker images that you could get and then be able to link them together so that's kind of what we're working on right now and that hopefully will make it optional ah easier what kind of lead uses 
leading from it uses information values tool currently and where else and what is the way in which people try to consume that for example does it provide any kind of online hems when somebody is writing a query about the tables that they're using where is it coming from how reliable it is of course the users level of that table things like those sir can you please a question again what kind of uses information that you are also capturing in warehouse also user-generated content ok so right now it's kind of rudimentary I mean it's more like a commenting on the data set level on the field level schema level sort of thing yes so it's kind of freeform right now and and that's actually one of the aspect that we want to improve on so we can have say for example you know query example queries that you can even click and run something like that that will be yeah where we're heading towards does it also capture uses of tables across different queries that people might be running so that's not a high priority right now but it's certainly a good aspect that we can put into the social components if you will and other things system like relations kind of focused heavily on that but I think warehouse we still were you more focusing on the pure metadata side on the story but the social side we recognize this importance they as well yeah I think you go back to a slide you mentioned you're working on quick authoring using China ok yeah I was wondering if you could explain what that authoring experience is for what are you interesting via that like one yeah this one so so like I mentioned before warehouse is kind of a general system that's supposed to support any kind of data platform that you have out there and a LinkedIn we obviously have our internal needs a certain platform that we new support but the idea is to generalize this so that anyone who take our warehouse should be able to easily say ok I want to add informatica to warehouse or I want to add another you know kanawa 
system to WhereHows. I don't need to know everything about WhereHows; I should be able to just write a simple script that scrapes off the information, puts it in a format that WhereHows understands, and then puts it into the database. So that ETL job essentially is what we're talking about here. We want to make it easy for the user, and that's why we chose JSON initially. But I think in the long term, if we move to a Gobblin-based system, that would probably be even easier, because that means the user can leverage a lot of the available sources, sinks, and transformations in Gobblin, and they don't even have to write a lot of code in a lot of cases.

Is this currently something a user does in the WhereHows UI?

No, no. So this is not for an end user per se; it's for, you know, a developer who is taking WhereHows and trying to deploy it in-house.

So, as I understand it, for the pull model you have a bunch of query logs or something, and then you're analyzing that, and push is when you have some metadata change in some system, the system generates an event and puts it on Kafka, and that would be analyzed by your system?

Yes. So for the pull model, yes, you go to the data source. It's not necessarily query logs. I mean, if it's lineage, then it's probably query logs that you're interested in, but if it's just general metadata, for example your schemas [unclear], you go to the data source and pull that information, catalog all the available datasets from the system, and put it in WhereHows. And for the push model, the data platform will generate that information directly through Kafka messages, or call the corresponding REST APIs, and then put that information into WhereHows. But the onus of computing stuff basically falls onto the data platform now, right? I mean, the platform would now basically have to tell WhereHows what the lineage is, rather than saying, hey, go figure it out yourself.

You mentioned that the lineage for Hadoop jobs is generated from Azkaban execution logs. Does it also support other workflow schedulers, like Oozie or Airflow?

Yeah, we get asked this a lot. We have some preliminary support for Oozie, [unclear], not that perfect, because we don't have an Oozie system in-house to test against, [unclear]. I think, which company was this, OpenX, they are actually looking into adding Oozie support to WhereHows.

I was just wondering how big the metadata is. Is the data that you produce of the same order as the sources that you are consuming, like the various metadata stores, or is it much smaller than the amount of metadata and other sources of information that you're consuming?

Right now it's actually pretty small. We host everything on a single box, so to speak. But going forward, if you start looking at fine-grained lineage, column-level lineage if you will, and then if you try to document basically every version of a file, for example every incarnation of a file, then that data grows very, very quickly. And that is one of the driving forces for us to revamp the back end, so we can store it in a more scalable solution, you know, like a NoSQL database, or kind of a graph database if you will, for the lineage stuff. So yes, the anticipation is that it will grow much, much bigger, and probably be of the same order as the source at some point in the future. But for now, because we don't have that yet, it is still manageable on a single box.

OK, thank you so much. That brings us to the end of this meetup. As I said before, we do have this lined up at a regular cadence moving forward, so if you do have some ideas or thoughts on what you want to share with us, please let us know, and we'll be happy to host you over here. There's still some beer and wine left, not much, but there is still some, so feel free to grab some drinks. I don't think there is any more [unclear], but he will answer your questions on Gobblin. So feel free
to catch up with the speakers, socialize, and have a good time. Thank you so much.
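Editor's sketch: the "anatomy of a metadata ETL job" described in the talk (extract platform-specific metadata, dump it as JSON, transform it into a loading-friendly CSV, load it into the metadata database) could look roughly like the Python below. This is only an illustration under stated assumptions, not WhereHows code: the function names, the `dataset_field` table, and the record layout are hypothetical, and the standard-library `sqlite3` module stands in for the MySQL database mentioned by the speaker.

```python
import csv
import io
import json
import sqlite3


def extract(platform_tables):
    """Extract step: serialize platform-specific metadata as JSON lines."""
    return "\n".join(json.dumps(t, sort_keys=True) for t in platform_tables)


def transform(json_lines):
    """Transform step: flatten JSON records into CSV, one row per field."""
    out = io.StringIO()
    writer = csv.writer(out, lineterminator="\n")
    writer.writerow(["dataset", "field", "type"])
    for line in json_lines.splitlines():
        record = json.loads(line)
        for field in record["fields"]:
            writer.writerow([record["name"], field["name"], field["type"]])
    return out.getvalue()


def load(csv_text, conn):
    """Load step: insert the CSV rows into the metadata database."""
    # Hypothetical table; sqlite3 is used here only as a stand-in for MySQL.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS dataset_field "
        "(dataset TEXT, field TEXT, type TEXT)"
    )
    rows = list(csv.reader(io.StringIO(csv_text)))[1:]  # skip the header row
    conn.executemany("INSERT INTO dataset_field VALUES (?, ?, ?)", rows)
    conn.commit()
    return len(rows)


def run_etl(platform_tables, conn):
    """Chain the three steps and return the number of rows loaded."""
    return load(transform(extract(platform_tables)), conn)
```

In the push model discussed in the Q&A, the extract step would be replaced by events published by the data platform itself, while the transform and load steps stay conceptually the same.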
Info
Channel: LinkedIn Engineering
Views: 2,409
Rating: 5 out of 5
Keywords: LinkedIn Engineering, Uber Engineering, Pinot, Big Data, Gobblin, Wherehows
Id: evmXXZ2N1bY
Length: 98min 18sec (5898 seconds)
Published: Thu Jun 08 2017