Analyzing Real-time Order Deliveries using CDC with Debezium and Pinot

Video Statistics and Information

Video
Captions Word Cloud
Reddit Comments
Captions
hey everyone uh thank you all for joining today um i'm karine wallick i am the community manager at star tree um we have a very special presentation for you guys today um the presentation is on analyzing real-time order deliveries using cdc with dabisium and apache pino and our two amazing presenters are gunner morling and kenny bastani thanks guys for joining thank you friend so i'm gonna let you guys do your own intros but just for all the attendees just a really quick lay of the land thing um if you have any questions feel free to ask them in the q a um we will be doing a q a after both presentations um like both after the full presentation we'll be doing the q a live if you are watching this on demand afterwards um or you have questions after the presentation is over and the q a has ended um feel free to either jump into the apache pinot slack uh kenny is pretty active there so you can find america just put the url in the chat um or you can also tweet at either gunner mourling or at kenny vastani their names are their twitter handles so it's pretty easy um i would suggest a tweet at them just because it would be fun and even if you like the presentation please give it a like a thumbs up on youtube or you could also just tell gunner and kenny directly on twitter how much you loved it or what you loved about it or if it sucked right you also didn't tell us you could critical feedback yes maybe in a nice way or not in a nice way whatever your best way of saying it's the internet so everything goes i like that kenny um all right so without further ado i'll let you guys take it take it over okay thanks green so welcome if you're joining along live with us today i really appreciate it i think we we put these out uh recorded afterwards so if you're joining live and spending your time with us today really appreciate you um so the name of the presentation is analyzing real-time order deliveries using cdc and uh that cdc is not we're not affiliated with the centers of disease control i know how that could get kind of confusing in these uh this day and age um but it is uh it means change data capture so uh gunner's gonna go more into that um it's very exciting to be able to bring this presentation to you today after uh several months of getting together yeah getting together a microservice an open source example that bridges oltp and olap um a little bit about myself i'm kenny bustani i'm an engineer and developer advocate at startree star tree is a startup that focuses on delivering a cloud platform for you to run piano in the best way possible um there is a lot of info on that we just came out of stealth a couple weeks ago and had a product launch event if you go to startree.ai you'll get everything that you need to know about our platform as well as we recently had a one day virtual conference with a bunch of videos of our founder kishore talking about uh the product as well as uh people from around the community um also weighing in so i highly recommend going to starttree.yeah dot ai um so i've been doing software stuff for about 20 years um i uh before coming to pino about a year and a half ago i was part of the spring team at pivotal which is now vmware tanzu i wrote a book called cloud native java which is all about building cloud-native applications with spring boot spring cloud and cloud foundry the other publication to my name is two pages and the 97 things every java programmer should know you should definitely buy that book it's a great book it has basically 97 of the i guess the biggest names or faces or experienced people in the java community weighing in on on some patterns i talk about microservices and trade-offs um and that's about it for me go ahead and pass it to gunner hey there so wow that's really tough going after you with all these accomplishments uh so hey people i'm gonna i work as a software engineer at red hat and i'm the lead of the dibysium project which is an open source platform for change data capture so by the way i have to mention that cdc another thing it does not stand from this context is the caribbean developers conference so i really like that at one point i would like to talk about cdc and cdc you get it um right so i've been doing a few things in working on division quakers which is a stack for building cloud native microservices and just a minute before we started kenny it wasn't nice to put up this little book there 97 things every data engineer should know because i just recently had the pleasure to contribute one chapter about ctc to that book um and if you would like to know what i'm up to new things i'm doing then i would suggest you follow me on figuring with that's kenny take it away cool thank you gunner all right so uh i'd like to start with an icebreaker here so the format of this presentation is i'm going to go ahead and go first um here for the first 30 minutes and then pass it to gunner for the next 30 minutes and then we'll take questions um but to start it out let's let's uh start it out with an icebreaker here about uh about uh databases some database related humor so the joke i'm going to tell without any of you being able to me being able to hear any of you laugh which on stage would be something that tells me that the the joke uh landed um but uh i'm gonna go ahead and try it anyways so why did the oltp database have trouble ingesting data anyone well is this what i'm supposed to fake laughing yes awesome it had acid indigestion and i like just in case just in case gunner didn't laugh i have my little laughing guy here and just in case um those following along at home did not uh get the the joke it's uh it stands for atomicity consistency isolation and durability which basically sets all of the trade-offs that we have for every single difference kind of database that we use today so acid indigestion is pretty funny okay so i want to start out now talking about uh open source database adoption in the last 10 years because things have changed so back in 2011 about a decade ago um it was around the time i started using a database a nosql database called neo4j and i went on to work for neo4j in 2013 as a developer advocate and so uh in the last 10 years i followed closely with open source database adoption in that space as well as being a big part of my career so in 2011 open source database adoption really kind of looked like this of course mongodb was you know the first of the nosql databases to come along and really take take people um down at different roads as far as moving them from relational databases to no sql so this is the the developer advocate on the street of hacker news with mongodb article and of course we had in 2011 our full stack developer who was like take my money please give me that mongodb and that was pretty much how developer advocacy worked or marketing databases no sql databases and so this started the revolution things have changed over 10 years and so in 2021 things kind of look more like this uh where we have uh that same developer from 2011 he's moved to antarctica and he's he's created a um a cloud platform startup for selling ice cubes on kubernetes and this is the real-time olab data store this could be any data store today there is there's as many uh open source databases today as there are front-end javascript frameworks i'd like to think and so what this really represents is that even though there are some amazing technologies that have matured open source technologies and specifically apache pinot which we're going to talk about in a little bit which was built at linkedin and is used by some of the biggest software companies in the world today including uber and this really represents this this fatigue from developers who really aren't uh aren't looking at everything today that's out there because there's so many things out there but apache pino is one of those things that is really worth looking into and so i had to come up with a different idea since i can't go on to the street of hacker news and and sell matcha piano to people um i had to do something a little bit different so i decided to build an example because if you build an open source example of course developers will come and there's more to that there's really i'm an engineer you know i'm not a marketer so i do developer advocacy but i also have spent a lot of time in code i spend time in code every single day um which is one of the privileges that i have in my role um getting to write the 600 pages of cloud native java you know it was very technical um and so i'm lucky enough to be able to build open source examples i call them highly avail was it highly available hello world applications yes um and so i decided to build one here for apache piano plus division and what kind of example of course order delivery and why is that well it is the era of the pandemic and when i started uh working with apache pino was right around the time the pandemic and lockdown started and i was ordering ubereats like every day i still order it maybe once a week i try to cook more but i was very inspired by being able to order food on ubereats which is a real-time uh delivery service for for food using the uber's platform because uber a lot of the stuff that ubereats um provides you with as a service is powered by apache pino so a lot of the analytics behind what's going on there in ubereats is is done using apache pino and now getting to learn more about why apache pino is used at companies like uber at linkedin stripe we pay and not widely used by by every other company in the world so far was uh was interesting to me and so as as the adoption for apache pino uh increases there's definitely this this signal there that tells you that it's it's really for the kind of real-time analytics that um that you need for use cases like uber so i built a real-time delivery tracking microservice architecture so this is an open source microservice example and it focuses on a lot of things so if i had maybe three hours i would like go through all of it but i only have like 20 more minutes here to talk about it so we're going to focus really on what we can but i highly recommend heading over to the open source github repository which we will link in the chat as soon as i pass it over back to gunner i will link this in the chat but you can get all the details here on github and it walks through what what we're talking about here with event sourcing event stream processing change data capture stuff like that um it also walks through the uh initial requirements which i'm going to go over now so i wanted to build a service that tracks new order deliveries i wanted to get a load simulator that realistically uh simulates a fleet of drivers delivering restaurant orders to customers i used a list of starbucks restaurants as locations so there's well over i think 10 000 worldwide but i've limited it today to san francisco because if i were to run the simulation at a worldwide scale it would surely burn down my laptop and it generates realistic delivery scenarios and simulates supply demands using appreciated variables so this is kind of like a almost like a game engine on top of a real um back end architecture that is simulating these orders so the directions here to build and run are available i suggest running it in light mode which is what we're going to show you today and so let's get back to the slides if my screen goes dark gunner just let me know um okay cool all right so change data capture is uh what gunner is going to be talking about later uh why we're talking about change data capture is there's a really interesting pattern with microservices especially event driven microservices being able to use change data capture as a log of what's really changed inside your domain so if i have a driver domain aggregate or an order domain advocate all i have to do for my microservice instead of having a kafka producer is just update the aggregate itself which generates an event in a event table so we have a event table per aggregate and those events then are picked up by debesium's connector and it is and put into a kafka topic which then i will ingest from apache pino and be able to query it from the application itself so i'm going to do a quick delivery demo from the dashboard perspective so kepler gl is a very started at uber and it's open source today and this is the demo so if you go to kepler gl they provide you a demo which allows you to load in a csv file i have one that i've already exported from pino and here you'll be able to see that data that i've exported from pino and i'm going to go ahead and animate it which shows you the drivers so these are the actual drivers and their routes over a period of something like um 30 minutes and i sped it up so that they i'd like to think that they're driving um a tesla plaid plus edition with spacex rocket boosters because it would be really boring to watch this not uh not in real time like this so here's our drivers going around from there uh going to pick up their orders um from a location and then delivering it there's 50 uh or so drivers here on something like uh 100 or something restaurant locations um i think there's 50 restaurants locations so i can go ahead and also add in the restaurants locations here oops by adding another data layer and i think that's this one and uh so those green dots are going to be the restaurant locations and if i slow this down a bit let's try 0.4 and let's go ahead and change this to be a heat map as well as this so now you can see when i zoom in here that's uh these these drivers are going to and from these different restaurant locations later on i'm going to show you from the perspective of a single driver because it's kind of tough to see but this is the basic idea of what i wanted to create and this is that kind of real-time dashboard that gives you a view into what's happening in a certain area or just an area around the world that you want to zoom into using apache pino and then being able to see the behavior now each one of these um dots that you're seeing is actually an event that was generated by dibizium exported to um to uh apache kafka and then ingested to apache pino so we're actually looking at things at the event level every this animation wasn't you know created using um anything other than the raw data that was really captured by uh change it capture so that's uh that's a very cool thing to be able to see so i'm going to jump back to slides here really cool i got to say it looks amazing yeah yeah i was impressed but when i put it in there they're just events and i didn't intend to see it like that so um what i'm going to do here now is i'm going to explore the microservice architecture to give you kind of a view into all of the containers that have application components in running and then i'm going to show you a demo of apache pino i'm going to show you how these real-time tables were created as well as the schema configuration and the table configuration and then i'm going to hand it off to gunner to do the dibyzium demo as well as he has slides as well that show you the architecture um okay so this is what the microservice architecture looks like for the most part um now there's more containers here but this is kind of a good view of what it looks like from a high level and so here we have the order delivery service so this is the rest api that has the driver api and an order api so primarily right now we have two parts of domain that's the driver and we have an order for an order uh we have each restaurant's going to have uh an order generated every 30 or 35 seconds or so and it's gonna go through a variety of state transitions up until the order being ready for pickup and when that order is ready for pickup we have drivers that are just you know driving around um the the location nearby maybe a restaurant and on their application um on their phone they might get a notification that says new order requests nearby and so i had to simulate this now to be able to do that i needed to be able to do a geospatial query that would look for nearby orders and that's not something that i could do easily with with an oltp database or a relational database like mysql and so what i needed was something like apache pino which um can go ahead and do geospatial indexing with an indexing technique technology from uber called h3 um so a lot of this stuff really came from uber and uber actually being one of the committers to apache pino was able to bring back in some of the technology that they use for their application as features for apache pino our open source project h3 geospatial indexing being one of those things as well as upserts and so for the order delivery service itself we have kind of this true cqrs pattern which is that we have a query side and we have a command slider right side so i'm separating my rights and reads i'm writing to my mysql order database here over jdbc and those events are being exported using kaffir connect the museum for my sequel which will go ahead and consume the log of these changes and push them into a two different apache kafka topics we have one for driver and we have one for order and those get consumed by apache pino which does some transformation um does some aggregation it does some indexing as they're coming in in real time making those records then those events are able to be queried from the order delivery service so here's my read side um and my right side here i do have this is a cloud native application architecture so i do have netflix eureka as my discovery service and my load simulator here um which is uh sending http requests to the order delivery service and we'll look at eureka here in a little bit uh to see more about that um i also have an api gateway slapped on top of it because this could look pretty scary from an architecture perspective for a single microservice but we have an api gateway on top of all of this which exposes the order api and the driver api from the order delivery service so everything looks quite nice now let's talk more about apache pino i'm going to show you some more stuff first before getting into patchy pinot with cloud native architecture so if i go to i think i have eureka up here somewhere so oops you still see my screen counter right yeah all good okay so here's uh spring eureka um so this is originally netflix eureka um this is the discovery service that uh netflix originally created and still uses today um and you can see that i have my order delivery service connected to it um as well as my api gateway and it connects to itself as well um for my api gateway um maybe you can make that a bit bigger oh yeah so for my api gateway we can go ahead and see the routes um so if i go to actuator gateway routes we can see all the routes through the api gateway um so true cloud native architecture i can go ahead and scale up my uh order delivery instances depending on the amount of demand demand that it's that it's getting in in terms of http requests and um so that's more of that is available in documentation on the github repository if you're interested in that so back to keynote okay so let's talk about apache pino so what is apache pino i don't have a lot of time today to do a full introduction to it the great thing is is that we have tons of intro content available um through our website and i think grim will paste in some resources there so tons of great intro content so i'm just going to quickly tell you what it is um so apache pino is a distributed real-time olap data store so it's purpose-built to provide ultra-low latency analytics even at extremely high throughput it can ingest directly from streaming data sources such as apache kafka which is what we're doing in this example as well as amazon kinesis and it makes the events available for querying instantly or near instantly it also can ingest from batch data sources um so it does bash offline data sources as well from hadoop hdfs amazon s3 azure adls and google cloud storage as well as csv files from wherever you want um so that's the that's the high level um and i won't jump too much into the architecture but it is a distributed data source so there it's distributed components here um that we're running so there's a controller there's uh there's servers as well as brokers and again this can this is all available in our documentation but i did want to oops i did want to talk about a little bit about the the architecture relies on zookeeper um we have apache kafka in there as a streaming data source as well all right so let's go ahead and look at apachepino um do a quick demo so i'm going to go ahead and pull up a new window here okay so this is the cluster manager ui so when you start apache pino up at localhost 9000 we have our cluster manager ui and this is going to give you a high level view of what's going on in your cluster now we have tenants controllers brokers and servers right so we talked about earlier distributed components our controllers our brokers which we query through and our servers which are segmenting the data that's being ingested um so our controllers are going to be our main interface to communicate with pino and then our brokerage are going to be the way that we query um and our servers are really going to be doing the heavy lifting in terms of returning back the results through query execution that's managed by the brokers now i have two tables in here i have one for drivers and one for orders and i'm just going to take a look here at the schema configuration let's go ahead and start with orders now the first thing you want to do when you create a table and apache pino is to create a schema now you all know what schemas are there's not much more in our configuration file than that but we're going to go ahead and look at the order schema so you can see the table schema columns here i basically have an account id the status that the order is in so it can transition from order created to order assigned to a restaurant all the way to order delivered now when the restaurant is ready for pickup so when the order is ready for pickup it'll go to the status will go to order ready for pickup and then the driver api takes over and the driver will then go and pick up that order and then they drive the state of that order all the way to deliver we also have an order location with our latin law launch longitude here as well as uh the restaurant uh information um on the order itself so that's just for the analytics um so this is the event that really gets exported i also have two fields here for the h3 indexing so these are used for geospatial spatial queries um and those essentially get transformed from so we have a transform function here that's going to go ahead and transform the order location as well as the restaurant location now i have a primary key column here order id since we have something called upsert enabled and so upsert's essentially going to give me the illusion that i'm updating a particular record in a table in pino but really what it does is it allows you to have two views of your individual records one is the log of all of the events that have ever been ingested and then the other one is the most current event that's been ingested so this gives you the ability to view a replicated part of your domain from mysql let's say it's the order aggregate or the order table and you can just see the current state but if you disable upsert you can see a log so i'm going to show you that in a little bit now the next thing that you need to do is to create your table configuration so in the table configuration i'm going to inherit my schema so you can see that i have my table schema assigned to it here and the schema name order and uh here's the connection that i have uh for apache kafka now this is the same apache cathode cluster that is uh being used by dibysium when it's exporting its uh it's change events to [Music] two topics so here i'm i'm specifying that i want to ingest from dabesium.org so gunner's going to look more into that in a little bit and that's pretty much it there's a lot of other configurations here this is for the h3 indexing so right we're doing geospatial indexing on all of those those coordinates those geocoordinates that are part of the uh the driver api and the order api and i specify here that i want to enable upsert as full also i have some transformation configurations here this is basically just taking a json map that is coming in through the message from the order topic and just turning them into actual columns so i can do some indexing on them and that's pretty much it um so now let's go ahead and take a look at uh some queries that we can run on this so if i go to my query console here we can see um some information get returned back from orders but i have some pre-selected queries here that i wanted to show you and we'll start out with the driver so as these events from a driver are getting exported um from uh dybysium to apache kafka and then ingested into apache pino i can go ahead and run a query here to get information related to an individual driver um so we looked at the driver events earlier in kepler um the visualization tool so here i want to go ahead and just see for an individual driver so i'm going to go ahead and get driver with id number 76 and his geospatial updates as well as as the the last modified time and uh we get back uh there's 1387 events so we have 1387 frames of information from this driver that we can go ahead and visualize and i'll go ahead and export this to csv and i can go ahead and open up a new kepler gl demo and i can load that data in here and this is going to give me all of the all of the geospatial updates for this particular driver now i can go ahead and animate this by selecting a start time here on a filter and go ahead and hit play and we can see this driver as he's going about his day delivering orders i can also add in another dimension here which will show the restaurants locations and so you can see him popping around it from restaurants so the nearby he's looking for a nearby order requests that are available so this is why he's kind of traveling um in in these uh in this vicinity and not kind of going outside the boundary sometimes he does go out there because i think all of the orders are are there's no orders in this particular location available um and so that's pretty much it for the visualization so since i'm about out of time here let's see lessons learned right so um as i built this microservice architecture i really started with those initial requirements and i started out using apache kaska as a producer from my application and as a part of that i ran into some issues with implementing upserts now upstart earlier we talked about was the ability to kind of see a log of events versus a um current state of a record so if i disable upsert here i'm only going to get back one result so you can see here the one result this is the most recent event that was ingested for this driver and if i disable it i can see a log of all of them so this is really useful to be able to see that history events especially for event sourcing so um some of the lessons learned was that after talking to gunner uh i can completely replace my kafka producer inside my application which eliminated a lot of overhead a lot of a lot of code when you have a large microservice architecture with a lot of different teams and you have a lot of messaging going back and forth between these applications which is pretty common the kafka producer can be it can be a limitation in the long term for developers having to first learn to be experts at kafka which is not a bad thing to to be but unfortunately sometimes you hire developers that don't have that expertise and them having to implement a capital producer and a microservice they might implement it a different way than other microservice developers do on other teams and so there's a lot to be worried about there um so just replacing that entirely with the bzmcdc was a wonderful thing to be able to do this allowed me to really get to see the event table outbox pattern um working for the first time which was an amazing insight thanks to gunners very quick implementation it didn't take long either it took me it took me more time to remove the kafka producer than it took to implement the cec you don't know how long it took me to build it on myself yeah yeah that's right um so this gave me some some insights as well as to the nature of how we should develop our micro-service architectures i don't think there's really a need to have a microsurface ever be a kafka producer um i could be wrong there but i thought about this quite a bit um so microservices really just become consumers right and that makes sense from the perspective of the rest api right because rest api what do we do we provide a rest api for other applications to consume from um and so we really are just a consumer on the microservice side which is really interesting insight so it doesn't have to be a catholic producer and also the ability to have this real-time oltp to olap replication and then being able to make reads back from apache pino that was uh that was very natural for me to have that ability to do these geospatial analytical queries directly on pino um and not have to worry about doing anything with my sql database itself for geospatial geospatial so that was great and so with that i'm going to hand it over to gunner um and he's already introduced himself but i put this at the end for the transition uh take it away gunner all right so people should see my stream let me go to the right one all right so let me arrange some things here just to bring my second screen in order all right kenny thank you so much this was an amazing presentation i mean it was the first time i've fully seen it right and i'm really impressed what you put together so that's really cool thank you thank you um and i mean i like this point you made right so kafka of course still is part of the overall solution right and we benefit from its scalability its reliability and so on but then the developers of the microservices of those producing microservices then you don't have to think about this uh producer complexity right so i feel that's a that's a good trade-off um and now well uh so we have seen all these amazing things we can do with pinot and kepler and of course we should uh also now answer this question how do we actually get our data from the operational database over into pinot and uh which so we can do all those nice things over there right and this is where uh division comes into the picture and i would like to explain a little bit to you what it does how it works and then i also will you know show it a little bit in the demo uh how you would uh go about implementing that all right so let's get started um well what is it in a nutshell it is a very basic principle you could say it's like the observer pattern for your database so whenever there's a change in your database something gets created something gets updated or something gets deleted dybasium will react to this change and it will capture it from the database and it will send it to consumers um now you might wonder how does it compare to other approaches maybe you have been using batch driven approaches in the past or some polling based approach in the past where essentially you have some process which goes to your database every now and then maybe every minute every hour every day gets the data from database and sends it to some downstream system like pinot but really well there you have lots of latency right and you don't you can't really increase this interval um so often you cannot pull on a database every second with uh try film debate probably crazy um and this is where this lock based change date capture approach comes into picture so the transaction log in the database is the canonical source of the changes so whenever a transaction gets executed in a database the database will append um entries to its transaction mark and the b goes to the transaction log and extracts the changes from there so this means we can get all the changes in the right order we also can get deletes for instance and we can capture them and can send them to downstream consumers you can use the bees in different ways most of the times people use it with patchy kafka if you have been using kafka there's a part of the kafka project which is called um connect which essentially is a runtime and the framework for building connectors which either take data into kafka so it would be source connectors like division or sync connectors which take data out of kafka and send them to some external system but then there's other ways you can use division there's a standalone mode which is called the design server you can use it as library in your java application and and things like that to tell a little bit more about it so there's this log based uh change they capture implementation of course so that's the main thing why people are using the business but there's more to mention about it so for instance if you don't have the transaction box let's say from one year ago and you set up division now you would like to start with what's called a snapshot so we take essentially the current data we scan the tables you're interested in and we send uh essentially insert events for all the existing data and then it would go over to the log reading mode so you have the snapshotting option you have things like transaction markers so people often are interested in transaction boundaries so you would like to know what changes in my database are originating from one particular transaction and they're supporting the museum which allows you to essentially aggregate those events originating from one transaction and for instance um emit them as a single event if you wanted to it's all open source it's all actually licensed um so you can use it any way you want and there's a large community of users around it so i believe by now it's more than 250 people who have contributed but also there's a large community of users so we are aware of or i'm aware of large production environments where people use the visa to capture changes out of thousands of databases with you know tens of thousands of tables it's used companies like shopify repay vimeo and lots of others so really um probably um you are using dbz without even knowing it because where lots of big companies are using it behind the scenes in terms of the connectors so that's a little bit the challenge of this log based approach so there isn't one generic single api which we could implement and then have log based change the capture for all databases instead we need to have a bespoke a dedicated connector for each particular database we want to support so we have currently support for mysql postgres all the ones you can see here under this table there's the connect for db for instance oracle and we've also got a few ones which are currently in incubating state which are for free tests and cassandra has two nosql databases and very exciting for me those two connectors are led by members from the community so it's not even reddit engineers working on those connectors mainly but it's driven by uh people from bolt in this case or um we pay who are working on those connectors so it's really it's a very diverse community which has formed around it and now why i find this exciting is well it enables lots of interesting use cases right so a while ago i did this tweet um i hope you don't find it too cheesy if i put my own tweet here um but really i think it's uh and it's a it's a it's a large enabler right so it enables use cases like the ones we've uh seen today or with the ones we're talking about today and like replication in the widest sensor taking data from a relational database or to pino you could consider it it's a use case for replication but also it's a use case where we actually enable this sort of cqs architecture or where we enable main to keep de-normalized views of your data so you can access your data in a very efficient way but then there's more you can use it to stream changes to your search index or to update your cache you might for instance think about having caches close to a user with de-normalized use of your data you can use it for syncing data between microservices and there's tons of use cases more and really i would say it's a large enabler so it's liberation for your data right so you don't you have it there in your database you don't have to go there and call for it but instead your data comes to you if it changes isn't that nice i really like it now i would like to talk about one particular pattern uh which we often see being used together with and that's called the outbox pattern and well why are people using this the question there is well people have different microservices or in this case maybe they would like to take to take data from one microservice as our auditors over to pino and now this data exchange um this should of course happen in a safe and consistent way right and typically what it means is we need to do two things so let's say this all the servers it receives a new purchase order well it needs to update its own database but then it also would like to notify this other microservices would like to notify pino about this new purchase order and those two things updating the database and sending out this message this needs to happen in a consistent way so if either both things should happen or none of them should happen right what we don't want to have is uh updating our database but then forgetting or failing to send this message to other consumers so that's what we want to resolve and this is where this outbox pattern comes into the picture now of course if you think about change the capture well you could use cdc for implementing that right so you could just go to your database and capture the changes from the table let's say with the purchase orders but now if you think about it well you would kind of expose your internal model so if you do a change the schema of this auras table for instance this would impact your downstream consumers and now sometimes people find this concerning other people are fine with that but it's something to think about and the outbox pattern helps us to resolve it so how does it work the idea there is well you have your transactions so your database um your service updates database and the idea is well if you can't do two things at once updating a database and sending a message via kafka for instance to a consumer you always can do a thing a single thing consistently right and this is updating our database so we can update our tables let's say our orders table our drivers table and then within the same transaction so this happens consistently and atomically coming back to kenny's asset job uh we also do an insert into another table which is called the outbox table in the outbox table well it's already in the name really it's an outbox from messages which we would like to send to other consumers and now what we do is um we use division to capture the changes and really it's only ever inserts so you would never modify events so we captured the inserts from this outbox table and we sent those events over to our external consumers speed other micro services be the search index pino or whatever now this outbox table how would it be structured the idea there is to base it a little bit on the ideas of domain driven design and this is how this table schema looks like here so there you have things like an aggregate id so the in case of our orders well the aggregate id that would be the id of our purchase orders and the idea is to use this aggregate id as the message key in kafka if you have been using kafka before you will know that the ordering of messages per is based on the message key now what we would like to have is all the messages which pertain to our to wind the same purchase order purchase order one two three or which pertain to the same driver instance they should be ordered correctly and this is why we use this aggregate id so it would be the order id would be the driver id and this is what we use as the message id in kafka then we have things like the aggregate type so in our outbox table we would like to distinguish between events of different types um which you for instance can use to send those events out to different topics and then consumers can just subscribe to the topics they're interested in and lastly there's this payload column and this one is anything in this case it's a json structure which is the message that you would like to send out to our consumers and this json message or this json structure this is the contract or it has the contract which we want to you know obey if we talk to our consumers so here we would be very careful about evolving this contract you wouldn't simply you know change the types of fields and so on so next step then would be to use consumers like pinot to subscribe to those topics and they would take the data and just ingest them into their own local data store and now in our particular use case here well we would have this loop back to the other servers and this is where this notion of cqrs comes into the picture so the auto service would do read requests to pino and it would use the occurring capabilities which we have in pino and which we don't have in our relational database to set us or to implement those advanced use cases which we have seen so this is how we bridge the otp system in this case micro uh mysql with the olep system pinot and kind of blur the boundaries and really when i was working with kenya on that i found this idea that you bridge those boundaries between otp and olap i find this very very fascinating so that's the outbox pattern and let me show you some of those things a little bit more in the actual code and for that i need to do so i have i got the same demo running which kenny had uh has running before and the thing is this takes lots of cpu really so there's this load simulator which produces all the orders and i didn't want to keep this running all the time because it just you know it would uh blow up my laptop so that's why i'm just starting this now and in the meantime i can talk a little bit about some other things while this is starting up so what i want to show you is the uh configuration um of the division connectors and for that let me take a look there um this connector here the using my sql connector json and if you have been using path to connect before this will look familiar with you to you so you specify your connectors in this uh json format so you say what connector in this case i would like to deploy an instance of the mysql connector what is the database um you can specify some filters like you would like just to capture a subset of tables and columns for instance and you have things like converter which tells okay the message format for this particular message or topic this should be json in this case and now usually i would use kafka connect and the rest api to deploy this but what i'm doing instead is i'm using a new tool which i've been working on just only last week and you're the first people to ever see this in live demo so i'm really excited it's called kc cuddle and crazy covers command line client for interacting with kafka connect which makes it just a bit easier for working with kafka connect deploying those things and so on and if you have been using cube cuddle before then this will be very familiar with um to use so i'm doing casey cuddle apply and why would i thought completion um oh i do so let me see and i need to specify the file i see i i got my my screen okay file i need to specify this name now which i'm of the file i would like to deploy and i also specify a name for this connector and in this case i just in my sql cc and now this registers the connector with uh kafka connect and i also can do okay uh casey cuddle get a neck curse and then i see okay this is running and i can also describe it and i will see some more information i always get this nice completion here so if you have been using the rest api you will realize this or i i suppose you would see it's very easy to use now i see okay my connector is running this is its configuration uh it's tasks are running and so on okay so now this connector is uh right and i should uh the next thing i would like to do is i would like to take a look into this database so let me use this tool my and here i would like to select the data from this others table and i hope if i execute this again and again yes the ids change so you see new orders are coming in so this load simulator is working and you see it has the same columns which you just have seen on the slides like event id aggregate id aggregate type payload and so on by the way the event id that's a unique message id which a consumer could use for de-duplicating messages uh in case it would see it duplicate so everything here in dibysium and kafka has at least once semantics so this means in case of a failure let's say the beast or kafka connect crashes it doesn't uh properly shut down it could happen it goes back a little bit further in time to read the transaction from an earlier offset and you would see some duplicates and uh a consumer could use this unique message id for deduplicating messages provided it keeps track of the messages which it has received so now let's take a look into the kafka topic so um this is the data in my sql it's changing i'm i'm getting new purchase orders and i take a look at this kafka topic there and for this i'm using another nice tool called kafka cat and i would like to take a look into this topic here so let's see what we got there and i'm piping the output to jq so it's nicely formatted and now let's talk just briefly about the decent change format so i have those uh blocks here before and after they describe the structure of my data in this case it's always insert events that's why this before part is now where if this would be an update we'll see both before and after now in this case i just have the after part and i see the um columns of my table so there are the fields here right event id and so on or the payload and i got this source structure here so this is some metadata which tells me what database is this versus coming from which table some timestamps position in the log file things like that i see this is a create event um and i would i could get some transaction metadata here so this is how it reason change when it typically looks like and now in case of pinot well we are only interested really in this particular payload here so if you remember we have this outbox tables we don't want this before state we just and we don't even want all the things from this after state in our pineal system you just would like to have the uh information from the payload here so uh and now i guess i could implement this somehow in pino to extract this but we also can do this in division and kafka connect and this is where the notion of message transformations comes in so i can modify messages as i send them into kafka or as i take them out of coffee and for that i'm registering another connector using my secret connector order outbox so let's take a look at that one and you see it's uh again it's an instance of the mysql connector it's going to the same database but then um it captures the same table it's almost even order event table but then i do some transformations here and essentially this is a sequence of transformations which get executed and let's just focus on this outbox one this is a message transformation in smt a single message transformation which comes comes with division and it essentially takes care of implementing those outbox semantics classes so it picks essentially this payload um of the message so i can configure what column is my payload column or the payload in this case it just will propagate that part and it will do things like making sure we use the right message key so i use the event sorry the aggregate id i use this as the event key so i have to proper kafka message key and then for instance it does propagate this event id propagates it to a header property in the kafka message so that's the outbox event router it comes with the museum and now i just have those other transformations here essentially to work around some shortcoming which i'm uh which is the following so this outbox um this payload right now it would be a stringified json structure so a single string field and uh which encodes adjacent structures so for instance the colons or the quotes i should say they would be escaped and this is not what you want instead you would like to have a json structure as the entire payload of the message and just in a nutshell those three smt's they make sure that i have this expansion of the single string into a proper json structure and i don't go into details it's not too exciting um but really this is what this is about so let's take a look at uh that i should deploy this connector now so use casey uh cutter again i say apply and now in this case i used this to be my secret order outbox connector and i should name it um or out box and i'm going to draw another one which is for the drivers uh so as i mentioned i mean usually i could have a single outbox table and it would have different messages of different kinds different aggregate types and they would be routed to different topics based on the value in this aggregate type column in this case the demo is implemented in the way that we have multiple outbox tables which works equally well so i just deploy another connector actually i could also have used the same connector or maybe not because the filter configuration was definitely good so i'm going to deploy this a separate one they call this driver outbox and now again i can take a look at my connectors it should all be running and indeed so that's that's looking good so now i can take again a look into kafka kit or into the kafka topic i should say i'm going to copy this here so here i'm taking a look into my other topic and now i just should see this um payload from my other message and started at the beginning let me from the end otherwise we just have to re-read the entire topic um let's take a look there and now as this load simulator produces more orders we see those events come in and now you see the value of the messages it's just this payload structure right and uh it would also be interesting to take a look at the key so let me do this as the last thing really so i'm just executing kafka cat again and now i just format the output of kafka cat a little bit different i also show the contents of the key with the message key and i also showed you message headers so you can see that i cannot pipe it now to jq any longer unfortunately so it won't be colored but you see the full messages let me stop it again and what we see now here is i see this key so that's the aggregate id like the order id i and i have those headers here or the single header in this case and this is the event id and now a consumer could use this to de-duplicate or to recognize events and the kafka message value well that's again this json structure as we've seen it before and this is what we can use in in pino and i could show you the same thing for the driver topic but well it's the same things it's not not too exciting so let me just show you in the data and pinot so you can believe me it actually works i need to find the right window there you go and i have this query which kenny gave me so i'm just going to execute it i hope it does the right thing let's see it runs and well it fetches the orders here right with the pickup point and the status and so on and now if you take a look at the number of stocks here this should increase as i execute this again again if you can see that so now it's running a few times see more orders are coming in and this is the nice thing now right so our data in the production database changes in mysql changes changes division picks it up uh with a very low latency we have the data here in pino and then again our auto service can come and can query pino and do queries which you couldn't do in my sequel and this is what i tried to describe as tearing down the walls tear down the swamps to present or who wasn't i don't know tearing down the ball between um all to pee and all that right i i think that's a great sign mr gorbachev tear down that oil oh yeah [Laughter] all right um and actually i believe that we've got some more time let's i want to show one last thing and this is how we can actually go about producing the events from our application this is uh kenny made that point before right this outbox pattern it really allows us to what frees us from having to think about producing kafka messages because in the end of the day it's just inserts into this outbox table in our database um and this makes it a bit easier for us and now i could just implement this and i could you know just use uh plain jdbc or something like juke or hibernate or whatever i like to use and persist those events um what i'm showing here is uh from the quakers project so that's a stack for implementing cloud native microsoft is super interesting and what it does oh one of the things it has is it has an extension for implementing the outputs pattern with uh dibysium and now the way this works is um so that's from a different example i should say i'm just showing it here to tell to make one particular point so here i use jpa for you know doing some change in my business model so now this is named your event but it you know think about like order purchase order or driver or whatever and so i just do some change to my database i persist this and at the same time i use here the cdi event api to fire such an event for instance in spring it could be spring application events um for emitting those events and now there's an implementation backing this which takes those events and which persists them in the outbox table now you might wonder why is this uh better than just persisting an event my uh myself and our application code i would say there's some reasons for doing that one of them is uh housekeeping of this outbox table so well if you only ever do inserts um at some point we would like to remove those events and actually what this focus extension does is it inserts the outbox events but then at the same time at the same transaction even it also removes those events so what me this means when you query this outbox table uh in your database you would always see it as being empty and still for the beans you may change the capture it's enough to pick up the change from the transaction box so we don't have to go to the table we will just see we will get it from transaction lock and that way this table it never grows and this is for instance one nice side effect of having this sort of abstraction here instead of doing all this by myself and i believe really this is what i wanted to show um kenny i would send it back to you uh for some wrap-up excellent thank you gunner um i i think that that's all we have for today i think we're we're right on uh 59 minutes so we have some time now to um get some questions uh if there are any questions from the audience usually corrin yeah hi dude this is amazing this is so good thank you yeah thanks it's great it was a really really good presentation like wow awesome thanks kenny i mean you know it was so many moving parts right yeah yeah it all came together in the end um well it does seem like people are grateful for you guys taking the time to put it all together and like working on building all this stuff it definitely looks like a lot of work but i have to say it was like one of the best presentations i've seen in a long time so it's it's really cool you guys are a good duo together too oh yeah thanks you should do it again for sure yeah um so uh for the people watching if anybody has any questions there's a lot of questions that were kind of floating around which we can kind of go through it but if anybody has any questions that haven't been answered please ask it again now um and just post it in the chat um there's there is like a 10 to 15 second delay sometimes so it might take us a minute to for the questions to start coming in um but yeah so i see a one from kim y maybe i can answer that so k y shane he's actually he's a member of the division community he was one of the main uh uh contributors working on the test connector so hey ky nice to have you here and he's asking can the outbox panel replace stream processing with flink k64 or stream processing joints and i would say yes it can right because uh let's say in in the relational data model we have this purchase order and typically this would be spread across multiple tables right so maybe we would have a table with the other headers and then we would have another table with the order lines or all the line items and uh the full the entire purchase order this would be like a join of those two tables right now of course you can do this join after the fact using flink using kafka streams or spark whatever but it actually gets quite tricky if you want to do this and well if you do have control over this application and you can modify it so it produces the event into this outbox table and then you don't have to do it right because you already have this pre-joined structure in the outbox table and you have for instance transactional consistency and it definitely simplifies things but then of course there's other cases where you would like to do this post uh let's say after the fact joining for instance if you cannot modify the source application maybe it's a legacy application you don't maybe you don't even have the source code i had once a war in production i didn't we didn't even have the source code any longer so maybe you have this sort of situation right and then you would use this uh you know string processing just based on the different table strings that's a good answer yeah um i guess i would add to that there are probably going to be some examples where you might want to use flink for doing doing some joints but as far as the pattern that we see here and the implementation with the example application i haven't had not found a need to use flink originally we were going to have another presenter join us for this presentation so we were going to have a third person um and it was going to it was going to show a data processing pipeline with link as well and so there are some use cases that we identified for that in terms of doing some kind of aggregate aggregation um for some analytics uh to do like sentiment analysis stuff like that or pulling data from another system right so let's say you have some sort of um master data from another system or you know iot data which you would like to feed into this then flink or kafka streams would be great right right yeah um so we do have i mean we can go through these i don't know if there's i have i don't see any new questions coming in but there was a there was a lot of activity in the chat if you want we can kind of run through it i know i know gunner was answering a bunch of questions at the beginning right yes um do you have the ability to see these chats gunner or i see it here on stream yard yeah uh not sure how it works but i do see it and i mean there's a there's a lot of it's just regular conversation it's just one big stream so kenny there's one for you can pinot ingest the schema from the everest chemo registry or does it need to be manually created it needs to be manually created yeah um and you'll want to manually create it as far as the schema itself in the schema configuration you you'll want to make some modifications to it you'll likely have uh additional generated fields that you'll want to create for your real-time table there's a lot of transformation that you can do at that level and i would take a look at the source code um or just kind of rewind this video later and look at the the schema itself and you'll see in there that uh i'm i'm loading a i'm doing a udf function essentially do indexing h3 indexing geospatial indexing on the latitude and longitude which converts it to a bytes field so there's there's a lot going on in that schema configuration itself we will we're also going to put on timestamps on the video so people will be able to like jump around if they want to like see the live demos or whatever um okay so there are some new questions that just came in um i could read about you if you guys want um yeah so uh ali says what criteria do you have um while selecting single outbox pattern versus multiple outbox tables oh that's a good question so to be honest i think i always would go for a single one really um and then you know this outbox event router it has the ability to route outbox events of different types so let's say all those drivers i don't know whatever your domain is to different topics so people consumers could subscribe to just those topics they're interested in so i think i would go for a single um table but i don't know in this case kenny went for what once it's equally fine i think they really have the choice yes so the reason why i decided to go with two different table uh outboxes was because i'm using at the rest api level i'm using hypermedia so when you go to a hypermedia api location such as order or a driver um you have a link to be able to see the log of events which are domain specific to each one of those objects so you'll have a domain event for driver and a domain event for um your your order itself and that was the reason why i kind of got locked into that and i was cheating a little bit actually in my part because i didn't show those domain specific columns from the outbox table i was skipping them um but yes they were indeed specific to the type of of the aggregate in this case so yeah that makes sense i'm just going to continue reading some of ali's messages because it still talks about the same thing so in keynote i have seen you assign multiple events into one single outbox table but in live demo we prefer creating new outbox table for each event can aggregating them into one outbox table create bottlenecks like pk locking etc i don't think you would have to be concerned for that so i you know unless you have this notion where you would like to add some domain specific columns i would recommend go for a single outbox table by default and take it from there okay um our vindan uh asked could you talk about altern alternate solutions to the bzm in cdc space and how the museum fares amongst people love these them that's my favorite question of course uh no aaron are we um so i'm familiar with the museum that's a project i work with of course there's other options you could explore um but really to make a meaningful comparison it would have been you to do this because only you know your particular requirements you know which databases you would like to use uh what other non-functional requirements you might have so really you should take a look at the different options which exist and you know you should make a decision based on your particular situation context i mean whatever i would say it would be biased right so um i would recommend check it out for yourself it's good it's a great answer um how uh saijin asked how reliable the aggregations of pinot in real time especially if we have negative values as well um it's getting tougher yeah so the questions are getting harder i'm not sure i i understand the question fully with the negative values but i can tell you that the aggregation in pino is very efficient very fast and uh there a lot of that has to do with indexing so being able to use the right indexes for your data is going to optimize pino's ability to select that from certain segments that are distributed across the different servers and being able to aggregate that more quickly so the key idea here is that if you're scanning um you're spending less time scanning for the data that you want to aggregate on the aggregations are much quicker but it's it's pretty heavy duty in terms of being able to aggregate in real time nice um i just posted for a final final call for questions um there was so much conversation it's like really tough to work to go through all this stuff let's see um there's one from norton does anyone have any experience with protocol buffers and outputs parents so i don't uh i haven't seen that but i have seen people who use avro with the outbox pattern so essentially they just put an avro blob into this payload column as i would expect you can do this equally with protobuf or other binary formats um so i think it's something you can do if you are in the product of ecosystem let's see um the jokes are really uh great by the way thank you [Laughter] um let's see so there's uh there was something else i saw in here that was like um okay all right so it looks like gunner i'm like i'm going from the top looks like gunner answered a lot of these questions um somebody was asking about java 16 kenny because i was tricking you into using java 16. that's right and people couldn't use it because they didn't interesting yet you can keep them on their toes there's a question uh do we have apache pino as a completely managed service on cloud providers i would check out startree.ai and see the managed service that we're we're providing with an early rollout to certain users that would be a good place to start this one from keyshore how often does the visual checkpoint where is the checkpoint saved so that's a very good question so in the terms of kafka connect it's uh what would call the source offset and this is uh stored in intervals um i believe you can configure this i'm not even sure what's default like every 36 seconds or something like that but you can configure it and then this is uh stored within within a particular kafka topic so you even could use the kafka consumer to take a look into this offsets topic and you would see the source offsets of the different connectors and of course now let's say it happens every 30 seconds and now after 25 seconds the visible kafka crashes well then it would continue from this offset from 25 seconds ago right and this is where this idea of at least once semantics comes into play because we don't commit offsets after each particular message so after an unclean shutdown you would have to be prepared to see some duplicated events uh josh ironberg had asked this was like earlier in the session said uh can pinot ingest the schema from avro schema registry or does it need to be manually created we are we already took that question okay just kidding thank you uh there's a lot of comments there's a lot of conversation here it's like kind of hard to go through and sometimes people answer them and like um yeah so i think that might be it unless anybody else has any other questions i think we need to go ahead and call it we're we're available on on twitter so well [Laughter] i have to create a load simulator for myself to be able to answer questions no but also uh seriously right so people definitely please feel free to reach out to us on twitter uh for division we also have a mailing list i believe for pino you have uh the slack channel right so there's different places where you can find us and we will be very happy to answer all the questions as good as we can this presentation was awesome thank you guys so so so much for taking the time thank you thanks for having us um and then for all those who are watching um we have a meetup tomorrow as well um so it's uh operating um apache pino at uber's scale um with one of the senior software engineers at uber so that should be a good one um so please join us for that and also like comment subscribe tweet at gunner and kenny give them good and bad feedback and if you have it you know whatever feedback you have i love getting tweeted at and thank you guys all for joining thank you guys very much thank you so much thank you take care bye bye
Info
Channel: StarTree
Views: 803
Rating: 5 out of 5
Keywords:
Id: yWFFOkWlLoY
Channel Id: undefined
Length: 73min 23sec (4403 seconds)
Published: Wed Jul 14 2021
Related Videos
Note
Please note that this website is currently a work in progress! Lots of interesting data and statistics to come.