Designing and Implementing a Real-time Data Lake with Dynamically Changing Schema

Video Statistics and Information

Video
Captions Word Cloud
Reddit Comments
Captions
- So welcome everyone, my name is Mate. And in this presentation we're gonna talk about, how we can design and implement a real-time data Lake, with dynamically changing schema, or when basically we don't have information and incoming change in the schema. At a shorter, longer introduction is that I'm a Practices Lead and Principal Instructor at Databricks, and I'm joined with Shashidhar, who we're presenting together, but I will leave his introduction to him. So basically, what we have here is this project we worked on with SEGA. And SEGA is a worldwide leader in interactive entertainment, and titles like Sonic, Total War and Football Manager, actually huge is probably an understatement. And SEGA is actually currently celebrating the 60th anniversary. And it wasn't always about messages with the changing schemas. And SEGA is pretty, has a lot of things, not just video games but arcade machines and holiday resorts and films and merchandise. And the technical use case that we worked on with, it was basically a real time data lake, where the different titles from SEGA are sending messages, so think about games, and these real time messages are crucial for business users. The SEGA's six studios are sending data to the centralized platform, and new events are frequently added and not just that new events are added, existing events can have new attributes, and these actually you changes, does not have any prime information. So when they are happened, we don't have any information about that. So basically what we have is just receive a new message, and the new message has a new attribute. And, we have over 300 event types, from over 400 titles, but this is constantly growing. These events actually arrive at 8000 messages per second. So actually, it's quite a lot and there's a lot of messages. So, to sum up what we have is a real-time data lake, where we receive messages with sometimes new attributes and sometimes totally new message types that we've never seen. And we have to incorporate this and we have to integrate these changes with no downtime at all. - Thanks Mate for the great introduction, myself Shashidhar, I'm a Resident Solution Engineer at Databricks. I'll be joining Mate in this presentation. So we'll talk through the architecture first in this part. So, let's see what are the key requirements to implement this particular pipeline that they have to consider because there are a lot of information that you have to deal. (clears throat) So coming to the type of data that we're dealing, we are dealing with JSON data, and as Mate already mentioned, we have to handle schema evolution dynamically. And the main goal is we have to serve this unstructured data in a structured format for business users for their consumption. Coming to the architecture, (clears throat) if we look at the data sources that we're dealing, whichever the streaming source. If it is able to ingest JSON data, we should be able to handle it. And we follow the pattern of data architecture, which is pretty standard way to design curated data lakes. Here we are going through two levels of data processing, one we call as Bronze and Silver and as we move to the next layers normally we keep on in improving the quality of the data through this hops. First part is what we call as Bronze, here we are talking about ingestion stream. So, ingestion stream is nothing but a simple stream which takes the incoming JSON messages and it will dump the message as it is into the Delta table. So, we're not doing any transformations or high level processing. We take the stream and we dump the Jason as it is in a Delta format. Along with the dumping the data, we also do tag the schema changes because this is the first point in the pipeline where we are looking at the data, and it makes sense to detect the schema changes as early as possible, because we don't want to lose any messages due to schema inference delays. Then we have Silver layer, here we are actually going to start multiple streams for every event type. We almost have more than 300 events in the system. And as you can see, we are leveraging the (clears throat) idea of stream multiplexing with Delta. Meaning we start multiple event streams from the same Bronze Delta table that we have already populated in the previous step. So these things are very simple, we just read the message, the JSON message. And we fetch the latest event schema from the schema registry table, and we apply it and write it back to a Silver layer. So let's see how the data changes in the different layers. So this is an example for Bronze stable. As you can see, we have two columns here, the payload and name. Payload is like the one which I want to highlight, where we have the exact JSON monthly sale from the input source. And, once it goes through Silver processing, we have two tables now because there are two different event types in the Bronze, and each event type will end up in separate event type tables with the exact schema that we have in for it. With this now I give it back to Mate to go through the Schema Inference in detail. - Thanks Shashi. So, basically the next question is that then how do we detect the changes in this schema when we don't receive any upfront information that they're going to be the change? So the example that we're going to demo this is, a very simple one. Let's say we have event type 1.1, and this one has just two attributes, right? Even type and the user agent, and what the change will be was that we will add a new, we will receive at some point the message, which has a new attribute called has plugins. So, think about like a browser, has plugins, and is true if we have and force if the browser has no plugin, this is just a made up example, but it would be perfect for us to see how we can actually be diagnosed. So basically when we receive the message, what we do is we will take out all the attributes, and put it into a list and we will generate a hash out of this list. So why do we do that? We create this list recursively, because JSON might have nested structures, right? Nested data structures. And then we sort it because we don't really care if one attribute is before or after another, right? Is the same schema of the order is a little bit different. And when we calculate this hash, now these hash, which we'll call the schema of variation hash, this will basically I identify this specific set of attributes for this event type. So basically, whenever we see this schematisch and hash, this contains the even type and the user agent and the order doesn't matter. So what do we do? We put this into a schema repository. The schema repository contains a couple of things, the event type, this schema variation hash, the schema version and a prototype for that schema of variation hash. So for each variation of the schema, we will have one row in the schema repository, and we will not just save the schema, but also this prototype, basically, the first message we've seen with this specific scheme of variation hash. We also store the schema. The schema is just a spark schema and JSON serialized. So in this case, we will have just that one schema for event type 1.1 with the user agent. Then let's see what happens when we receive this new changed schema. We'll do the same thing. We will recursively in a user defined function, when we receive the before writing it to the Bronze layer, we will take a list of attributes and we will sorted recursively. We take the attributes and we calculate a new hash, but this time, this hash is not in the schema repository. So we know that we need to update the schema for 1.1. So the next question, how do we update the schema on the fly? Right? This is a streaming, structure streaming application which runs 24/7. The key is for Foreach Batch. Foreach batch is a structure streaming sync. Basically this enables us to provide a function and this function receives a DataFrame, and in each trigger interval. So basically in every batch, Micro-batch, we receive this DataFrame, and this is a Static DataFrame. So basically the body of the function acts like you would be writing an ordinary batch job, right? So how do we use this? The way we use this is we will take all the ski, all the prototypes. So not just the one that we received, so now this new unseen message, by the way, even this one can be not just one, but multiple new unseen messages. Think about like we trigger an into, so we have a batch, a Micro-batch every two minutes or every minute or every 10 seconds, it doesn't matter. During that time, we might see, not just one new schema change, but multiple. So we will take all the new messages, all the new messages that has a schema variation hash that we've never seen. And next to that, we will put all of the old known prototypes from the schema repository. So remember in the repository, we don't just save the schema of variation hash and the schema it self, but also we will save the prototypes. So what we have now is basically all of the old and the new prototypes in one list. So why did we do this? The reason is because we're going to use that to calculate a Superset schema, a schema that incorporates all the existing schema variations and the new ones. And how do we do that? So how do we do it? Sounds magic but it's actually pretty easy, we'll use Spark. So, the first part of the code is actually just import, super boring, so lets go ahead. And, what we will do after, what love do after is the create DataFrame. So we'll just create a DataFrame out of these prototypes. That's actually a technical necessity, because here is the actual important part. Merely use Spark to read JSON, the same thing you would use to actually just read a bunch of JSON files, but we will use inferSchema true, what it does like you would do with a bunch of JSON files on this. It will create a DataFrame with a schema that incorporates all of the JSON messages. In this case, the JSON messages are all the prototypes from the new and the old schema variations. And then we just return the schema. So this function basically takes all the prototypes and returns the schema that incorporates all of that. So we are ready to update the schema repository. We will add the new schema variation hash with the new prototype, but also with the new schema, but here is the catch, we increment the schema version. The reason why we do that is because from now on, because this is still in the forage batch. We still haven't written into the Bronze layer, but because now we updated the schema repository hat, the schema repository, the max, the biggest, the latest schema version, from 1.1 is not the schema repository and it incorporates all of the messages for 1.1. We can take a look at the schema. This is a JSON serialized version of the Spark schema, which we'll be able to use from JSON build it in function, which we'll use later downstream in the Silver tables. And we can see here that are in inference, in inference was correct. So we have the has plugins and we have the type Boolean. So because we don't want to cheat by just having streams for every data type, right? That would be easier, but we want to be good friends of the data scientists and they need the proper types and the proper flat data structure. So how do we use this downstream? Because now what where we are is in the Bronze layer, the payloads are as strings, but now knowing the schema repository, now we have a new schema that incorporates all the previous prototypes from all known variation from the schema. So let's take a look at the Silver tables. We will use foreach batch again here. So what we do is that in the foreach batch, we will well use these functions as a simplified version, but basically the df variable will contain the DataFrame. And now we are streaming, but not reading from a Delta table. So, when we read from that Delta table, we'll have probably new schemas, new messages with, messages with new schema. But that's not a problem because in every iteration, so basically in every trigger, every two minutes, every one minute, every 10 seconds depending on how often you are triggering the Micro-batch, we will read in the schema repository, and that's fine. That's fine because actually that's a, that's a fairly small table, so we can read it every minute. But what we get is that now we have the latest schema that incorporates all the previous ones. So we can just use that to parse the string serialized a message. So basically the message, JSON message, which we read as a string, we will use the schema to flat it out. If there is a new, a new column, we will have a new column, but Delta supports schema evolution. So with merge scheme or true, we will be able to merge this new schema into the Silver table. So what as a business user we would observe is that every Silver table has the latest the schema. So if you read it, that's the latest schema for that event type. And with that, basically the finished the schema inference, we incorporated that into the Silver tables. And this whole implementation brings a lot of productionizing and interesting questions about deployment, but for that I will pass the ball to Shashi. - [Shashidhar] Thanks Mate, for taking us through the implementation. Now we move on to the productionization, which involves both deployment and monitoring, which are normally the two major things for any pipeline which is running in production. So let us see how we deploy these event streams, by now we already know we have more than 300 plus type of events that we have to deal. And every event is actually, will be running against individual Spark Streaming job. So for running, for deploying this event steams, we have a notion of event groups. What we call as even group table is exactly where we stored this information. And each event group is actually deployed on separate job clusters. Job clusters are meant to be running production pipelines, which gives a resource isolation and other benefits of production pipelines. So, once we deploy these event streams through event group table, and they're up and running, there are mainly two aspects that we have to be worried about from the deployment perspective. So they're like, what happens to the deployment if the schema changes and what happens to the deployment if we detect a new schema, which is also a possibility in the current injection layer. So for schema changes, we don't really do much, we let the Delta handle the schema evolution because Delta supports pretty standard schema evolution feature. So we try to write with the new schema as it is to the Silver layer, but if it is not compatible, we might have a situations where we can see the stream failures. But the good thing with the job clusters is, whenever you have a set of streams running on single job cluster. And if any, one of the stream fails, it will actually cause the whole cluster to fail. So this will give us the ability to fix the schema or make it compatible, and restart this particular set of streams again on a new job cluster. So this gives us the ability to like flexibly handle the income incompatible schema changes without disturbing the other event streams that are running in other job clusters. The next topic is, how do we handle new schemers that we're getting from the input sources. For that we have a notion of management stream. So management stream is a pretty straightforward stream, which actually tracks schema registry table for a couple of things. (clears throat) So from the schema registry table perspective, we have like two main data points. It can be updation of the schema for existing event types, that can be a new role getting added into the schema registry table, when there is a new scheme a detective at the source. So if you see the scenarios, again, we have the similar scenarios with the deployment. As I said earlier, for the changes to the of the existing event types, we don't do anything. We just filter out those events from the schema registry table, before writing it to the event group table. But for the new events that we detect, we try to use some heuristics and determine which event group this particular event should go. And once we decide it, we write it to the event group table. As we saw earlier, we start the event streams from the event group table, whenever there is a new entry getting added into the event group table, automatically, we pick up the new entry and launch a new stream. So this is very key because as you see, we're not causing any downtime in the scenarios, (clears throat) where there is a new event getting added at the source. So this is also like one of the design approach that we took to award the downtime in the whole implementation of the system. So this brings us to the monitoring side of things, for monitoring we are using Structured Streaming listener API, pretty standard API is from the Structured Streaming, where you get lot of streaming metrics (clears throat) and we dump these metrics into a center dashboarding tool. It can be a Grafana or Datadog or whatever the tool prefer. So once we're dump, we have a notion of the whole system, how it is behaving now. So I just wanted to highlight some of the key metrics of which are a pretty important, one is Stream Status, as you can already see we have more than 300 streams running in the whole system, and it might be, it might be complicated when we tried to see the health of the whole system. So stream status is something which shows the liveliness of individual stream. And we also have the Streaming latency, which denotes like how far a stream is lagging behind from the input source. And if we are trying, if you see there is too much latency in the stream, we can use this metric to provision more nodes and that particular group of job cluster, to catch up the stream and catch up with the input speed. And, finally, I also want to highlight this particular property, which normally we try to ignore. You can set this property and also by setting this you will get all the streaming metrics into the default Ganglia UI for all the running clusters inside Databricks, which might be very useful if you just want to see on a running cluster without having to go to the other dashboards that we have already set up. So this brings us to a final section. (clears throat) So before I conclude, I want to just highlight a couple of things, what you can take from the whole session. If you see from the architecture point of view, we extensively leveraged the Schema Evolution of Delta. And also we use Delta as a streaming source, and we create this Multiplex of streams from the Delta and from the implementation point of view, we have this notion of Schema Variation hash, which is very key in detecting the schema changes and applying the schema dynamically. And from the deployment perspective, we are using job clusters to run the stream in production with the built-in monitoring features of what job clusters bring to the table inside Databricks. We also get the chance to fix the stream failures efficiently. I would like to read the code from Felix, who is the data head of SEGA. What it says is, this has revolutionized the flow of analytics from our games, which is the key platform, and it has also enabled the business users to analyze and react to data far more quickly than we have been able to do it previously. And yeah, we'd appreciate the feedback for the session in the summit website that it from our side. Thanks for joining us.
Info
Channel: Databricks
Views: 2,460
Rating: 5 out of 5
Keywords: Databricks, Designing, Dynamically Changing Schema, Real-time, Implementing, Data Lake
Id: No55ImP-Jic
Channel Id: undefined
Length: 24min 54sec (1494 seconds)
Published: Mon Jan 11 2021
Related Videos
Note
Please note that this website is currently a work in progress! Lots of interesting data and statistics to come.