- So welcome everyone, my name is Mate. In this presentation we're going to talk about how we can design and implement a real-time data lake with a dynamically changing schema, or basically when we have no upfront information about incoming changes in the schema. As a short introduction, I'm a Practices Lead and Principal Instructor at Databricks, and I'm joined by Shashidhar, who is presenting together with me, but I'll leave his introduction to him. So what we have here is a project we worked on with SEGA. SEGA is a worldwide leader in interactive entertainment, with titles like Sonic, Total War and Football Manager; "huge" is probably an understatement. SEGA is currently celebrating its 60th anniversary, and it wasn't always about messages with changing schemas: SEGA does a lot of things, not just video games but also arcade machines, holiday resorts, films and merchandise.
The technical use case we worked on was basically a real-time data lake, where the different titles from SEGA, so think about games, are sending messages, and these real-time messages are crucial for business users. SEGA's six studios are sending data to a centralized platform, and new events are frequently added. Not only are new events added, existing events can also get new attributes, and these changes come with no prior information. When they happen, we don't know about them in advance; all we get is a new message, and that message has a new attribute. We have over 300 event types from over 400 titles, and this is constantly growing. These events arrive at around 8,000 messages per second, so it's quite a lot of messages. To sum up, what we have is a real-time data lake where we receive messages with sometimes new attributes and sometimes totally new message types that we've never seen, and we have to integrate these changes with no downtime at all.
- Thanks Mate for the great introduction. I'm Shashidhar, a Resident Solution Engineer at Databricks, and I'll be joining Mate in this presentation. We'll talk through the architecture first in this part. So let's see what key requirements we had to consider to implement this particular pipeline, because there is a lot of information to deal with.
Coming to the type of data we're dealing with: it's JSON data, and as Mate already mentioned, we have to handle schema evolution dynamically. The main goal is to serve this unstructured data in a structured format to business users for their consumption. Coming to the architecture, if we look at the data sources we're dealing with: whatever the streaming source is, as long as it can deliver JSON data, we should be able to handle it. And we follow the multi-hop data architecture pattern, which is a pretty standard way to design curated data lakes. Here we go through two levels of data processing, which we call Bronze and Silver, and as we move to the next layer we keep improving the quality of the data through these hops.
The first part is what we call Bronze; here we're talking about the ingestion stream. The ingestion stream is nothing but a simple stream that takes the incoming JSON messages and dumps them as-is into a Delta table. We're not doing any transformations or higher-level processing; we take the stream and dump the JSON as-is in Delta format. Along with dumping the data, we also tag the schema changes, because this is the first point in the pipeline where we look at the data, and it makes sense to detect schema changes as early as possible, because we don't want to lose any messages due to schema inference delays.
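To make that concrete, here is a minimal sketch of such an ingestion stream in PySpark. This is an illustration only: the source, topic, table and checkpoint names are assumptions, not SEGA's actual code, and the schema-change tagging mentioned above is covered in detail later.

```python
# Minimal Bronze ingestion sketch: read a JSON stream and append the raw payload
# as-is to a Delta table (all names here are illustrative assumptions).
from pyspark.sql.functions import col, get_json_object

raw = (spark.readStream
       .format("kafka")                                   # any JSON-capable streaming source works
       .option("kafka.bootstrap.servers", "broker:9092")  # hypothetical brokers
       .option("subscribe", "game-events")                # hypothetical topic
       .load()
       .select(col("value").cast("string").alias("payload"))
       .withColumn("name", get_json_object("payload", "$.eventType")))  # keep the event name next to the raw payload

(raw.writeStream
    .format("delta")
    .option("checkpointLocation", "/checkpoints/bronze")  # illustrative path
    .toTable("bronze_events"))
```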
Then we have the Silver layer. Here we actually start multiple streams, one for every event type, and we have more than 300 event types in the system. As you can see, we're leveraging the idea of stream multiplexing with Delta, meaning we start multiple event streams from the same Bronze Delta table that we already populated in the previous step. These streams are very simple: we just read the JSON message, fetch the latest event schema from the schema registry table, apply it, and write the result back to the Silver layer.
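As a rough illustration of that multiplexing idea, each event type can get its own stream reading from the one Bronze Delta table; the table and column names below are assumptions, and the schema-application step is shown in a later sketch.

```python
# Stream multiplexing sketch: many independent streams, all reading the same
# Bronze Delta table, each writing its own Silver table (names are illustrative).
from pyspark.sql.functions import col

event_types = ["event_1_1", "event_2_0"]      # in the real system, 300+ event types

for event_type in event_types:
    (spark.readStream.format("delta").table("bronze_events")
        .filter(col("name") == event_type)     # keep only this event type
        .writeStream
        .queryName(f"silver_{event_type}")
        .option("checkpointLocation", f"/checkpoints/silver/{event_type}")
        .toTable(f"silver_{event_type}"))
```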
So let's see how the data changes in the different layers. This is an example of the Bronze table. As you can see, we have two columns here, payload and name. Payload is the one I want to highlight: it holds the exact JSON message from the input source. Once it goes through Silver processing, we have two tables, because there are two different event types in the Bronze table, and each event type ends up in its own event-type table with the exact schema we inferred for it. With this, I hand it back to Mate to go through the schema inference in detail.
- Thanks Shashi. So the next question is: how do we detect the changes in the schema when we don't receive any upfront information that there's going to be a change? The example we're going to demo this with is a very simple one. Let's say we have event type 1.1, and it has just two attributes: the event type and the user agent. The change will be that at some point we receive a message which has a new attribute called hasPlugins. Think about a browser: hasPlugins is true if the browser has plugins and false if it has none. This is just a made-up example, but it's perfect for seeing how we can actually detect the change. So when we receive a message, what we do is take out all the attributes, put them into a list, and generate a hash from that list. Why do we do that? We build this list recursively, because JSON might have nested data structures. Then we sort it, because we don't really care whether one attribute comes before or after another; it's the same schema even if the order is a little bit different. When we calculate this hash, which we call the schema variation hash, it identifies this specific set of attributes for this event type. So whenever we see this schema variation hash, we know the message contains the event type and the user agent, and the order doesn't matter.
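Here is a minimal sketch of that idea in plain Python; the function names and the choice of SHA-256 are illustrative assumptions, not necessarily what the actual pipeline uses.

```python
# Schema variation hash sketch: recursively collect attribute paths, sort them
# (so attribute order never matters), and hash the result.
import hashlib
import json

def collect_attribute_paths(obj, prefix=""):
    """Recursively collect attribute paths from a (possibly nested) JSON object."""
    paths = []
    if isinstance(obj, dict):
        for key, value in obj.items():
            path = f"{prefix}.{key}" if prefix else key
            paths.append(path)
            paths.extend(collect_attribute_paths(value, path))
    return paths

def schema_variation_hash(payload_json: str) -> str:
    attributes = sorted(collect_attribute_paths(json.loads(payload_json)))
    return hashlib.sha256(json.dumps(attributes).encode("utf-8")).hexdigest()

# Both orderings give the same hash; the extra attribute gives a different one.
m1 = '{"eventType": "1.1", "userAgent": "Mozilla/5.0"}'
m2 = '{"userAgent": "Mozilla/5.0", "eventType": "1.1"}'
m3 = '{"eventType": "1.1", "userAgent": "Mozilla/5.0", "hasPlugins": true}'
assert schema_variation_hash(m1) == schema_variation_hash(m2)
assert schema_variation_hash(m1) != schema_variation_hash(m3)
```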
So what do we do with it? We put it into a schema repository. The schema repository contains a couple of things: the event type, the schema variation hash, the schema version, and a prototype for that schema variation hash. For each variation of the schema we have one row in the schema repository, and we don't just save the schema but also this prototype, which is basically the first message we've seen with this specific schema variation hash. We also store the schema itself, which is just a Spark schema serialized as JSON. In this case, we'll have just that one schema for event type 1.1, with the event type and the user agent.
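For reference, the schema repository could be a small Delta table shaped roughly like this; the table and column names are illustrative assumptions.

```python
# Illustrative shape of the schema repository table (names are assumptions).
spark.sql("""
  CREATE TABLE IF NOT EXISTS schema_repository (
    event_type            STRING,
    schema_variation_hash STRING,
    schema_version        INT,
    prototype             STRING,  -- first raw message seen with this variation
    schema_json           STRING   -- Spark schema serialized as JSON
  ) USING DELTA
""")
```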
Now let's see what happens when we receive this new, changed schema. We do the same thing: in a user-defined function, before writing to the Bronze layer, we recursively take the list of attributes, sort it, and calculate a new hash. But this time the hash is not in the schema repository, so we know we need to update the schema for event type 1.1. The next question is: how do we update the schema on the fly? This is a Structured Streaming application which runs 24/7. The key is foreachBatch. foreachBatch is a Structured Streaming sink; basically it lets us provide a function, and this function receives a DataFrame in each trigger interval. So in every micro-batch we receive this DataFrame, and it's a static DataFrame, which means the body of the function acts as if you were writing an ordinary batch job.
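A bare-bones illustration of the foreachBatch pattern, assuming the raw ingestion DataFrame from the earlier sketch; the function and table names are mine, and the real batch function would also do the schema-hash bookkeeping described next before appending to Bronze.

```python
# foreachBatch sketch: the function receives a *static* DataFrame per micro-batch,
# so its body can be written like an ordinary batch job (names are illustrative).
def process_bronze_batch(batch_df, batch_id):
    # Here we would detect unseen schema variation hashes, update the schema
    # repository if needed, and then append the raw payloads to the Bronze table.
    batch_df.write.format("delta").mode("append").saveAsTable("bronze_events")

(raw.writeStream
    .foreachBatch(process_bronze_batch)
    .option("checkpointLocation", "/checkpoints/bronze")
    .start())
```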
we take all the prototypes, not just the new, unseen message we received. By the way, there can be more than one new unseen message: we trigger a micro-batch every two minutes, every minute, or every ten seconds, it doesn't matter, and during that time we might see not just one schema change but multiple. So we take all the new messages, all the messages with a schema variation hash we've never seen, and next to those we put all the old, known prototypes from the schema repository. Remember, in the repository we don't just save the schema variation hash and the schema itself, we also save the prototypes. So what we have now is all of the old and the new prototypes in one list. Why did we do this? Because we're going to use that list to calculate a superset schema: a schema that incorporates all the existing schema variations and the new ones. And how do we do that? It sounds like magic, but it's actually pretty easy: we use Spark. The first part of the code is just imports, super boring, so let's move on. What we do after that is createDataFrame: we create a DataFrame out of these prototypes. That's really just a technical necessity, because here is the actually important part: we simply use Spark to read JSON, the same thing you would use to read a bunch of JSON files, with schema inference turned on. Just like it would with a bunch of JSON files on disk, it creates a DataFrame with a schema that incorporates all of the JSON messages; in this case, the JSON messages are all the prototypes from the new and the old schema variations. And then we just return the schema. So this function takes all the prototypes and returns a schema that incorporates all of them.
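A compact sketch of that superset-schema step, assuming prototypes is a Python list of raw JSON strings (all old prototypes plus the new unseen messages); the real code may build the DataFrame differently.

```python
# Superset-schema sketch: let Spark's JSON reader infer one schema that covers
# every prototype, old and new, at once (names are illustrative).
def infer_superset_schema(spark, prototypes):
    prototype_rdd = spark.sparkContext.parallelize(prototypes)
    inferred_df = spark.read.json(prototype_rdd)   # schema inference over all prototypes
    return inferred_df.schema                      # a StructType covering all variations

# The schema can then be serialized and stored in the repository:
# schema_json = infer_superset_schema(spark, all_prototypes).json()
```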
So we are ready to update the schema repository. We add the new schema variation hash with the new prototype and also with the new schema, but here is the catch: we increment the schema version. The reason is that from now on, and remember we are still inside the foreachBatch and still haven't written to the Bronze layer, the latest, highest schema version for event type 1.1 is in the schema repository, and it incorporates all of the messages for 1.1. We can take a look at the schema. This is a JSON-serialized version of the Spark schema, which we'll be able to use with the from_json built-in function later downstream in the Silver tables. And we can see here that the inference was correct: we have hasPlugins with the type Boolean. We don't want to cheat by just keeping strings for every data type; that would be easier, but we want to be good friends with the data scientists, and they need the proper types and a proper flat data structure.
So how do we use this downstream? Where we are now is the Bronze layer, where the payloads are strings, but thanks to the schema repository we now have a schema that incorporates all the previous prototypes from all known variations of the schema. So let's take a look at the Silver tables. We use foreachBatch again here. In the foreachBatch, and this is a simplified version, the df variable contains the DataFrame, and now we're streaming, this time reading from a Delta table. When we read from that Delta table, we'll probably see new messages with new schemas. But that's not a problem, because in every iteration, so in every trigger, every two minutes, every minute, every ten seconds, depending on how often you trigger the micro-batch, we read the schema repository again, and that's fine because it's a fairly small table, so we can afford to read it every minute. What we get is the latest schema, the one that incorporates all the previous ones. We can use it to parse the string-serialized message: the JSON message, which we read as a string, gets flattened out with that schema. If there is a new column, we'll have a new column, but Delta supports schema evolution, so with mergeSchema set to true we can merge this new schema into the Silver table.
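Putting those pieces together, a simplified Silver stream for one event type could look like the following; the table, column and checkpoint names are assumptions, not the production code.

```python
# Silver sketch for one event type: every micro-batch, fetch the latest schema
# from the repository, parse the raw payload with it, and merge-write to Silver.
import json
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType

def write_silver_1_1(batch_df, batch_id):
    latest = (spark.table("schema_repository")
              .filter(col("event_type") == "1.1")
              .orderBy(col("schema_version").desc())
              .first())
    schema = StructType.fromJson(json.loads(latest["schema_json"]))
    (batch_df
        .withColumn("data", from_json(col("payload"), schema))
        .select("data.*")                        # flatten into properly typed columns
        .write.format("delta")
        .option("mergeSchema", "true")           # let Delta evolve the Silver table
        .mode("append")
        .saveAsTable("silver_event_1_1"))

(spark.readStream.format("delta").table("bronze_events")
    .filter(col("name") == "1.1")
    .writeStream
    .foreachBatch(write_silver_1_1)
    .option("checkpointLocation", "/checkpoints/silver/event_1_1")
    .start())
```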
What a business user observes is that every Silver table has the latest schema; if you read it, you get the latest schema for that event type. And with that, we've basically finished the schema inference and incorporated it into the Silver tables. This whole implementation brings up a lot of interesting questions about productionizing and deployment, but for that I will pass the ball to Shashi.
- [Shashidhar] Thanks Mate, for taking us through the implementation. Now we move on to productionization, which involves both deployment and monitoring, normally the two major things for any pipeline running in production.
So let's see how we deploy these event streams. By now we know we have more than 300 event types to deal with, and every event type runs as its own individual Spark Structured Streaming job. For deploying these event streams we have the notion of event groups; what we call the event group table is exactly where we store this information, and each event group is deployed on a separate job cluster. Job clusters are meant for running production pipelines, and they give us resource isolation and the other benefits of production pipelines. Once we deploy these event streams through the event group table and they're up and running, there are mainly two aspects we have to worry about from the deployment perspective: what happens to the deployment if a schema changes, and what happens if we detect a brand-new schema, which is also a possibility in the current ingestion layer. For schema changes we don't really do much: we let Delta handle the schema evolution, because Delta has a pretty solid schema evolution feature. We try to write with the new schema as it is to the Silver layer, but if it's not compatible, we may see stream failures. The good thing with job clusters is that when a set of streams runs on a single job cluster and any one of those streams fails, the whole cluster fails. That gives us the ability to fix the schema or make it compatible, and restart that particular set of streams on a new job cluster. So we can handle incompatible schema changes flexibly without disturbing the other event streams running on other job clusters.
The next topic is how we handle new schemas coming from the input sources. For that we have the notion of a management stream. The management stream is a pretty straightforward stream which tracks the schema registry table for a couple of things. From the schema registry table's perspective there are two main data points: an update of the schema for an existing event type, or a new row being added to the schema registry table when a new schema is detected at the source. If you look at the scenarios, they're similar to the deployment scenarios. As I said earlier, for changes to existing event types we don't do anything; we just filter those events out of the schema registry table before writing to the event group table. But for the new events we detect, we use some heuristics to determine which event group this particular event should go to, and once we've decided, we write it to the event group table. As we saw earlier, we start the event streams from the event group table, so whenever a new entry gets added to the event group table, we automatically pick it up and launch a new stream.
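A rough sketch of what such a management stream might look like; the table names, columns, and the grouping heuristic are all illustrative assumptions rather than the production logic.

```python
# Management stream sketch: watch the schema repository table, keep only
# first-time event types, assign each to an event group, and append them to
# the event group table so a new stream gets launched for them.
from pyspark.sql import functions as F

def route_new_event_types(batch_df, batch_id):
    known = spark.table("event_group_table").select("event_type")
    new_types = (batch_df.select("event_type").distinct()
                 .join(known, on="event_type", how="left_anti"))   # drop already-known types
    routed = new_types.withColumn(
        "event_group", F.abs(F.hash("event_type")) % 10)           # toy placement heuristic
    routed.write.format("delta").mode("append").saveAsTable("event_group_table")

(spark.readStream.format("delta").table("schema_repository")
    .writeStream
    .foreachBatch(route_new_event_types)
    .option("checkpointLocation", "/checkpoints/management")
    .start())
```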
downtime in the scenarios, (clears throat) where there is a new event
getting added at the source. So this is also like one of the design approach that we took to award the downtime in the whole
implementation of the system. So this brings us to the
monitoring side of things, for monitoring we are using Structured
Streaming listener API, pretty standard API is from
the Structured Streaming, where you get lot of streaming metrics (clears throat) and we dump these metrics into a center dashboarding tool. It can be a Grafana or Datadog or whatever the tool prefer. So once we're dump, we have a notion of the whole system, how it is behaving now. So I just wanted to highlight
I just want to highlight some of the key metrics which are pretty important. One is stream status: as you've already seen, we have more than 300 streams running in the whole system, and it can get complicated when we try to see the health of the whole system, so stream status shows the liveness of each individual stream. We also have streaming latency, which denotes how far a stream is lagging behind the input source. If we see there's too much latency on a stream, we can use this metric to provision more nodes for that particular event group's job cluster, so the stream can catch up with the input speed. And finally, I also want to highlight this particular property, which we normally tend to ignore. By setting it, you get all the streaming metrics into the default Ganglia UI for all the running clusters inside Databricks, which can be very useful if you just want to look at a running cluster without having to go to the other dashboards that we have already set up.
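To make the two monitoring hooks mentioned above a bit more concrete, here are two small sketches. The listener uses the Structured Streaming listener API (note that the Python version of this listener only exists in recent Spark releases; in older releases it is a Scala/JVM API), and the print call simply stands in for whatever forwards metrics to Grafana, Datadog, or another dashboard. The property in the second snippet is my assumption about the setting being referred to: spark.sql.streaming.metricsEnabled publishes Structured Streaming metrics through Spark's metrics system, which is what surfaces them in the cluster's Ganglia UI on Databricks.

```python
# StreamingQueryListener sketch: receive progress events for every running query
# and forward the metrics we care about (stream name, throughput, batch id).
from pyspark.sql.streaming import StreamingQueryListener

class MetricsForwarder(StreamingQueryListener):
    def onQueryStarted(self, event):
        pass

    def onQueryProgress(self, event):
        p = event.progress
        print(p.name, p.inputRowsPerSecond, p.processedRowsPerSecond, p.batchId)

    def onQueryTerminated(self, event):
        pass

spark.streams.addListener(MetricsForwarder())

# Presumably the property in question: report streaming metrics through Spark's
# metrics system so they show up in the default Ganglia UI on the cluster.
spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
```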
So this brings us to the final section. Before I conclude, I want to highlight a couple of things you can take away from the whole session. From the architecture point of view, we extensively leveraged the schema evolution capabilities of Delta; we also used Delta as a streaming source and created this multiplex of streams from the Delta table. From the implementation point of view, we have the notion of the schema variation hash, which is key to detecting schema changes and applying schemas dynamically. And from the deployment perspective, we're using job clusters to run the streams in production, with the built-in monitoring features that job clusters bring to the table inside Databricks, which also gives us the chance to fix stream failures efficiently. I would like to read a quote from Felix, who heads up data at SEGA: "This has revolutionized the flow of analytics from our games, which is the key platform, and it has also enabled the business users to analyze and react to data far more quickly than we have been able to previously." And with that, we'd appreciate your feedback on the session on the summit website. That's it from our side. Thanks for joining us.