PostgreSQL as a distributed computing platform

I guess we can start. All right, hello everyone, thanks for coming to this talk. By the end of this you're probably going to think I am slightly crazy. I'm going to talk about how Postgres is not just a great database, but potentially also a great distributed computing platform.

Why would that be useful? Well, a lot of companies, when they start out, start out with this being their data architecture: you put your data in Postgres, and you actually have a lot of flexibility. If you need to build new features or new applications, you can build them by making some new queries, new tables, new indexes on Postgres. So this gives you the flexibility that you need as a young, starting company.

But then, as things grow bigger, this tends to happen: you deploy system on system on system to deal with large flows of data and process them. So you might end up with HDFS and Kafka and Spark and Redshift and S3, all combined into one slightly messy architecture. The operational cost of such an architecture is quite high. You have to deploy and maintain all these different systems, but you also might not know all of them very intimately, so tuning them becomes quite difficult. You're probably going to spend more on hardware than you would like to, and you're going to spend a lot on operations. And you don't really have a lot of flexibility: every time you want to do a new thing, you might have to deploy a new system.

So what I'm kind of crazily proposing is that your data architecture could also look something like this, and there are companies out there that do this, where all your systems are built on top of Postgres. (Did the sound cut? Hello? Yeah, OK, thanks.) If you know Postgres very well, then you can make sure you tune it well, and if all your systems are based on the same infrastructure, it becomes much cheaper to operate. Moreover,
because it is Postgres, you actually keep a lot of the flexibility that you had when you were just a small company with a single-node database.

I would argue that Postgres is actually a perfect building block for creating distributed systems. Whether you're considering writing the software for a new distributed system or just need to solve a scalability problem, I think Postgres is a great way to get started, because Postgres already has a lot of the important features that you need when building a distributed system, which often take like 10 years to implement. For a lot of systems, it takes many years before they have a well-defined, extensible, efficient protocol that supports batching and asynchronous execution, and Postgres has it. You can create new types that can get serialized and deserialized. Crash safety is one of the most important things in distributed systems, and a thing that most systems get wrong for the first five to ten years: if a node crashes and comes back, you don't want to suffer from your files getting truncated, and it's almost Postgres's core business to solve this problem. Concurrent execution is another thing that even a lot of NoSQL databases still don't really get right, and Postgres is arguably one of the best databases for concurrent execution.

Then transactions. Last year at PGConf SV I gave a talk about pg_paxos, and I argued that implementing Paxos on top of Postgres is actually relatively easy, even though Paxos is known as quite difficult to implement. The reason is that on Postgres you have transactions. The difficult bit about implementing Paxos is that you kind of have to build a database: you have to make sure that whatever promise you make in the protocol is actually kept even if you crash, and you need to deal with concurrency, and you need to deal with other things that Postgres solves for you. So Postgres has a lot of features that help you set
up a distributed system, with also 2PC and replication, and custom functions, so you can just program custom logic into Postgres.

The other thing is extensions. Some of these extensions usually ship with Postgres, or come as contrib modules. postgres_fdw lets you query tables that are on a different server. dblink lets you send raw commands to other servers from a SQL query. And PL/pgSQL and the other procedural languages let you tie everything together. Then there are third-party open source extensions, such as pglogical for logical replication, pg_cron, kind of a pet project I have, and Citus, and if you start combining them all you can do very interesting things. And of course there's your own extension. It's actually not so hard to write a Postgres extension: I'd advise you to just take one of the existing extensions, rip out the code, and use that. So you can combine that with all the existing extensions into one system.

One of my favorite tools for doing this is dblink, especially for prototyping. It's so useful: you can just connect to another server, send a query to it, get the result, and disconnect. It can also send queries asynchronously, so you can open a couple of connections, send a couple of queries asynchronously, and get the results, which means you can actually do parallel execution just using a couple of SQL queries. And, perhaps more importantly, you can do remote procedure calls: you can create a function on one node, and then create a client-side stub using dblink on another node to call that remote function. RPC is usually the building block for a distributed system.

PL/pgSQL is then super useful for tying these kinds of things together. Say you have a lot of databases and you want to get the size: you can write a simple function that queries all these databases in parallel
and sums up all the sizes. So these things become quite interesting when you combine them.

Just with these extensions, we can already put together quite an interesting distributed system. Maybe I have nodes into which I continuously shove some data, and there's a central authority which keeps track of which nodes are in the system. Then maybe I use postgres_fdw to be able to read that nodes table on all the other nodes, and maybe I have a function that transforms the data, and I could use dblink to call that function in parallel. So I can build out a parallel batch processing system by setting up a few Postgres nodes and using the extensions.

Then I can also think about trade-offs. Here, whenever I do a postgres_fdw query on the nodes table, it actually needs to go to the central coordinator node, so if my coordinator node is not available, that's not so nice. But perhaps I could use another extension for that, like pglogical: I could continuously replicate the changes from the coordinator to all the nodes, and even if the coordinator is temporarily down, I still have a copy on all the nodes, so I don't depend on it. Or, an extension I discussed last year, it could be pg_paxos: I could replicate the nodes table in a consistent way among all the servers.

What I also find useful in setting up a distributed systems architecture is a way for nodes to act autonomously. So I wrote this extension, pg_cron, which is literally just cron in your database. I took the source code written by Paul Vixie and copy-pasted it into a Postgres extension. It runs as a background worker alongside Postgres, and internally it uses libpq, so it can actually run many jobs at the same time. This is useful if, let's say, you want all your nodes to do something every 10 minutes or every minute, or clean up old data, or apply some
transformation. So it's also a tool you could use.

And there's Citus, my day job, which gives you a way to set up a distributed system out of the box. In Citus you have a coordinator and then what we call worker nodes. You can call create_distributed_table on a table in Citus, and that creates shards on the worker nodes, which are just regular Postgres tables, and queries are delegated to those shards. You can also use Citus in a way where all the nodes have the distributed table, so then you can just connect to any node, and any insert, let's say, that you do on that distributed table will get routed to the right shard. Then Citus also starts acting as a kind of middleware: I now have a shared data structure across all my nodes, which I could even use for inter-node communication. One node could insert something into the events table, and another node could select from that table in a consistent way. So it's another tool I can use for building out a distributed architecture.

OK, so we know our tools, or have at least gone over them briefly. Could we use them to build an advanced distributed system using just Postgres and Postgres extensions, without even external code? I was thinking of doing MapReduce or Kafka. I realized Kafka was going to be really difficult and MapReduce a lot simpler, so I decided to do Kafka.

So how do you build a streaming publish/subscribe system, which is what Kafka essentially is, on top of Postgres? You have producers, web servers maybe, that generate events, like someone clicked this button or someone looked at this page. That goes into a scalable layer of Postgres nodes. Then on the other side there are consumers that want to get a stream of events. Maybe they want just the clicks, maybe they want all the events, and they ideally want exactly-once delivery, or at least at-least-once delivery, but
ideally exactly-once delivery.

So how do we get started? Postgres actually offers you many ways you could go about doing this, so I'm just going to describe one which kind of works, just to get, within one talk, to an entire Kafka replacement.

The first thing I need is a way to store the incoming events that the producers are putting into my system, and I also want to be able to scale this out: if I have many producers generating many events, I need a lot of bandwidth. I could use Citus for that. I can create a distributed table across all the nodes, such that producers can just connect to any node and do an insert on that table, and the data will get stored in shards. What's nice about using Citus here is that if I later want to do something with this events table, I can add columns, and it can also manage rebalance operations for me. So that's a good start, but it's not Kafka yet.

To set this up, I would create an events table. I'm going to have a serial, which is to generate the event ID; I'll add a timestamp, which is often useful; and a topic name, and just a generic JSONB payload, and that's where I put my events, so all my events should be in that form. Then I create the distributed table on the coordinator, and then I can psql into any node and insert into the events table, or copy into the events table, if I'm a producer.

I do need to think a bit about some trade-offs here. If I use the event ID, which is a bigserial, as the partition column, then when I do a copy command every event might need to go to a different shard. This is very beneficial for data distribution, because my data will get evenly distributed across all the shards. Alternatively, I could decide that applications should determine which shard the data goes into: there's some extra token, like a stream ID, and that determines that these events should go to this particular shard. Or I
could do something that's a bit hacky, but could also be beneficial, where I make Citus pick a value for the partition column itself, and it should be a value that's on the local host. The advantage of that is that if I'm a producer and I connect to a host, I know I'm going to be able to copy all my events into it, because I don't need to connect to another node to be able to write them. For the remainder I'll go with event ID, for the even data distribution, but you know, there are some trade-offs there.

So now, on the producer side, I connect to a random node. I can probably put a load balancer in front of my Postgres nodes to make it easier. I do maybe a copy or an insert, and that copy will get fanned out to the shards, which are Postgres tables. So that's the producer side: I have a way for events to come into my system, and I can scale this out.

Now how do I actually go about consuming those events, reading a stream of events? In Kafka you have this notion of consumer groups. A consumer group is a group of nodes that together want to get the stream of events exactly once. You can have multiple consumer groups, which all independently consume a stream of events, but within a consumer group I can have multiple nodes, and I want to load balance the streams of events across the nodes.

I'm just taking heavy inspiration from Kafka here. The way it works in Kafka is that the consumer nodes lease a partition, or in this case we call it a shard, but it's really the same model, with multiple shards per node. When a consumer starts reading from Kafka, it tries to get leases on a shard, and that makes it the only node that can consume events from that shard at that time. So I will need a bit of infrastructure for managing that in my Postgres system. In Kafka, the way this is done is with ZooKeeper, which is a kind of highly available
consensus store. Now, I could have set up maybe pg_paxos for this, as a highly available way to replicate a table, but I decided it was simpler to do something a bit more probabilistic. So I created a separate table on every node: every Postgres node independently keeps track of the leases of the shards it contains. It keeps track of: this is the consumer group, this is the shard, this is the current owner. It also has a new owner column, which we'll use for handoffs. If we're going to switch a stream, where one node is consuming a stream and now a new node joins the system and wants to consume that stream, we don't really want to consume the events twice, and we certainly don't want to consume them less than once. A way to organize that is to say: OK, the current owner can finish its current batch, but once it comes back, we switch to the new owner. So I have to keep track of both of these pieces of information. I also keep track of the last time the consumer reported in: in case the consumer goes down at some point, I need to make sure another consumer picks up the shard.

So I need a way to get a lease. If I'm a consumer node and I connect to the Postgres cluster, I need to be able to say I want to get a lease on a shard. For that I can use PL/pgSQL to create a function that claims a lease, which essentially just boils down to doing an upsert into the leases table. If there's no lease yet, great, I get it. If there is a lease, then I set myself as the new owner. And here's another case where Postgres actually solves a tricky concurrency problem for us, because what if two consumer nodes are trying to grab the lease at the same time? Well, I can just add a filter where the lease's new owner is null, so this will only do anything if there isn't already someone else who's trying to claim the lease, and Postgres takes care of the blocking that's required for this. But actually
it's a bit inconvenient if all the consumers need to figure out themselves which shards to claim. So I also added a function for automatically obtaining leases. Using dblink, this gets the leases tables from all the available nodes, and by available I mean that if a node is down, it just ignores it. If there are any shards which don't currently have a consumer node in the consumer group, then it claims those shards. If it then still has fewer than number of shards divided by number of consumers, it starts claiming, or stealing, random shards from other nodes.

So this is a very probabilistic distributed algorithm. The idea is that eventually this will converge to all the consumer nodes having either number of shards divided by number of consumers leases, or that number plus one, because once you go over it, you stop claiming new leases. So the idea is that you keep calling this function, and that's how it converges. It's a little tricky, but it tends to work. The only edge case that I didn't cover is if there are more consumers than shards; then this breaks a bit.

So say you're a consumer node. This could be any kind of application that wants to do something with the data; it could just consume the data and put it in some other data store. What should it do? Well, first it needs to obtain leases. This function is on the Postgres nodes, so it connects to a random Postgres node and calls obtain leases, and that will use dblink to get the lease tables from all the other nodes, and then it will call the claim lease function. After that operation, this one consumer node has the permission to get the events from all the shards. But one consumer node might not be enough if we have a large stream of events. One of the most important functions that Kafka really fulfills is helping you scale out other parts
of your system: not only does it scale out itself, it also helps you scale out your consumer layer and your producer layer. So we also want to make sure that, if we have multiple consumer nodes, the other consumer node also gets to read from some shards. It does the same thing: it calls obtain leases, and it sees: well, all shards already have an owner, but I'm going to steal some random leases until I have number of shards divided by number of consumers, which in this case is three. So I'm going to steal three leases from the other node. So now we've divided the leases among the consumer nodes this way. And because they should keep calling this function periodically, if, let's say, one of the nodes goes down at some point, the remaining node will claim all the leases, or if we add a new node, it will start stealing leases from the others.

But at this point, a lease is still a bit of an abstract concept. OK, so a shard has an owner; what does that actually mean? What does the owner get to do? We have to decide on a way for the consumer nodes, which are just some random client, to actually get the information from the shards. We want to see all the events that the producers are putting into the system, and ideally we want to see them exactly once.

One thing I could do: I have serial numbers, so maybe I could keep track of up to which serial number I've consumed the stream of events. But then you run into some concurrency issues. If you see serial number ten, there might not be a serial number nine yet, and that could mean that the transaction that generated serial number nine aborted, or it could mean it's still in progress, and it's a bit hard to know which it is without adding more locking. So doing it at the SQL layer is actually a bit tricky, but again, Postgres actually provides
exactly the feature we need, namely logical decoding. With logical decoding I can just see a stream of all the incoming events and gradually move forward in that stream. I have a few different options for using it. There are some utility SQL functions inside Postgres that read from the logical decoding stream. I could use a replication connection, which is probably the best way to do it, but it takes a bit more effort. And once I have Postgres 10, I could also use logical replication; the limitation there is that the consumer then also needs to be Postgres, so if I want, I don't know, some random Ruby application to read from my system, I have to use one of the other options. So for simplicity I'll go with the logical decoding utility functions, namely, I believe they're called pg_logical_slot_get_changes and pg_logical_slot_peek_changes. They're not super efficient, but for prototyping they're actually great.

Then I can define a function, and you can define this whole function in PL/pgSQL if you want. This is the function for getting events from the stream of events. First it checks if the node still has a lease, so its lease hasn't been stolen. If it has been stolen, it just doesn't return any events, and it sets the owner to complete the handoff. Otherwise it gets all the pending events using pg_logical_slot_peek_changes, and then it moves forward in the replication stream. It uses replication slots for this, and they help you make sure that you can see all the recent changes to the database, and if for some reason you weren't able to process those changes, a slot allows you to keep them until you actually do. So it's a two-step: get the pending events, move forward past the changes that we've processed, and then return the remaining events. And the way we keep track of progress is that the poll events function
actually takes as a parameter the event ID of the last event in the previous batch. That's my way of telling the system: hey, I'm done processing this batch, I've seen this event, so give me the next batch, don't give me the same batch again. This is something I can also implement in SQL.

And the consumer, and if you go to, say, the Kafka docs you'll see pretty much the same description, always needs to implement a loop which first calls poll events. Actually, in the client library it polls all the shards. So if we're building a client for this system, the client library should make connections to all the shards on which the consumer node has a lease and then call poll events, and as it processes events it keeps updating the parameter to move forward. And it just keeps doing this forever.

So, failure handling. If you fail to connect to one of the nodes, if you're a producer for example, you could just connect to a different node; you could have a load balancer in front to manage all that. A storage node failing is a bit tricky. That's one of the areas where Postgres lacks a bit as a distributed system: auto-failover isn't built into the database. You can't afford to lose events, so you ultimately need some sort of failover mechanism for your storage nodes. But at least for the purpose of write availability, if you use the always-copy-into-the-local-node trick, then you can always find some node that's available for your writes.

If a consumer connects to a Postgres node, tries to poll the shard, but then fails, that's not a big deal. It can just connect again and get the same batch, because the set of events that it gets will not change until it has actually confirmed the last batch. If a consumer fails and does not come back, well, the idea is that the consumers periodically keep calling this obtain leases
function, and then we also need a way of expiring the old leases. So if a consumer hasn't reported to the system for a couple of minutes, we need to throw it out, and that's something we can do with pg_cron. We can create a function that flushes out all the consumers that are no longer responding, and just run that every minute. And we could also do other things using pg_cron, like deleting old events after a while, like after a day.

OK, so this was a complex system, right? But actually implementing this on Postgres, or at least prototyping it, is really not very hard. I wouldn't recommend writing it in PL/pgSQL, but if you do, it takes maybe 300 lines of code, because a lot of the functionality that you would normally have to write to get a fault-tolerant distributed system is already there.

So I have a little demo of the system I described. I set up some little clusters on EC2 here. There are four nodes, there's a Citus coordinator, and I also have two consumer nodes that are going to consume from the event streams. So, are my connections still alive? Yes... not super alive. That is a pity. Come on, Wi-Fi. Maybe if I put it here, it helps. So I'm typing, but it's not showing up.

Basically, the nodes have a load balancer in front of them, so I want to try and connect to the load balancer, which is slow. And here you can see I'm connected to one of the Postgres nodes. I can see the shards that Citus created; these are the tables that the events will be stored in. I have the main events table, so that's the one that the producers will write to. And I have a little script that will connect to the load balancer and then just start inserting. It's also suffering a bit from latency. OK, so now some events are going into my system.

Now I'm going to act as a consumer. So I'm going to do obtain leases. I'm
just going to be the only one; I'm the client now, and I'm going to be the only consumer. I obtained 32 leases across the whole system, so this obtain leases function called all the other nodes to get leases on them. Now that I've done that, I can start polling for events from this node. I can call the poll events function to get all the new events from a particular shard. Yes, OK, cool.

The way this function works is: if I'm programming a client, I take the last event in the batch and pass that as the parameter, and that's how I move forward. So now I get the new events after this batch. But if something fails and I reuse this value, I get the same events again, plus any new events, and it goes up to batches of 100 rows.

On the consumer side, the consumer could be anything; this is the logic that the consumer library needs to implement. But I could also just implement that on Postgres, so I did that here, where I set up two consumer nodes. (By the way, I kind of want to show the code, but the latency isn't really letting me.) OK, so this is a consumer node, which is a Postgres node. This node wants to get a stream of events, process those events, and then move forward in the stream to process the next batch of events. The first thing I need to do is obtain the leases, and I can do that by using a dblink stub to call the obtain leases function in the system. If I call this function, it tells me that I got 32 leases, so this consumer is now consuming all the events. And I have a consume events function that will call the poll events function on all the shards of which it is the owner, so all the new events in the system will now be returned by this function. Slowly. OK, so I had about 121 events so far in the system. This function also keeps track of how far it has progressed in each stream in its own local leases table, which will show up
soon, hopefully. Yeah, so it keeps track of: OK, from this shard I've consumed up to this event, and that's how it moves forward in the stream.

I could also add another consumer. That's surprising. So this is node 2; it's now also going to obtain leases, and it has actually obtained 16 leases, so it's going to poll 16 shards. So now the events are getting distributed across the two nodes. There's no really nice way of showing that, but basically they are now getting distinct events from the 16 different shards of which they are the owner. You should be able to see different events on both sides, but it's all random data, so it's a bit hard to show.

What's nice is that in this architecture I can actually guarantee exactly-once delivery, because I could even do something like BEGIN, then consume events, and, well, now my top event is 4 minutes past 10 and 28 seconds. Now if I abort, I kind of ignore all the events, and if I consume events again, I still get the same batch. And if I call it again after a commit, I get a different batch. So I can actually guarantee exactly-once delivery, because my consumers are transactional.

So it's a bit of a crazy way of implementing Kafka, but I'm using it as an example to show how you could implement a fairly complex distributed system on top of Postgres. And the code for it is, well, there's this function, which consists of about seven SQL queries; this is the obtain leases function. Then there is the poll events function down here, which is also about seven SQL queries, and that's pretty much all the logic that I showed. The consumer is pretty simple; it mostly does a couple of dblink calls. So it takes relatively little effort. You can find a link to these files in the slides that I'll put online (my presentation also has latency), so in the slides you will find the link where you can see it.

My main point is that this is a real possibility, so you don't
necessarily always have to deploy a new or different system for every problem. You could actually build advanced, fault-tolerant, and scalable systems on top of Postgres using all the tools that are available.

Any questions? I'll take the question "Are you crazy?" Yep.

[Audience question inaudible]

It's convenient because it doesn't require creating the schemas, so it's a bit quicker in that sense. And sometimes you can express slightly more complex things with it. I mean, I'm calling functions, for example, like this RPC, where you're doing a SELECT with a function call, and you can only do that using dblink. You can't do that using postgres_fdw, unless you do really crazy hacks where you use a default column and do an insert, but don't do that.

Any other questions? All right, thank you very much. [Applause]
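The lease-claiming upsert described in the talk can be sketched roughly as follows. This is not the speaker's code: it uses Python's built-in sqlite3 module as a stand-in for a single Postgres storage node so that the example runs anywhere, and the table layout, the column names, and the `claim_lease` and `complete_handoff` helpers are all assumptions made for illustration. On Postgres the same idea would presumably be an `INSERT ... ON CONFLICT ... DO UPDATE ... WHERE` inside a PL/pgSQL function, with row locking taking care of concurrent claims.

```python
import sqlite3

# Assumed schema: one row per (consumer group, shard) on each storage node.
SCHEMA = """
CREATE TABLE leases (
    consumer_group TEXT NOT NULL,
    shard          TEXT NOT NULL,
    owner          TEXT NOT NULL,
    new_owner      TEXT,
    PRIMARY KEY (consumer_group, shard)
)
"""

# Upsert: take the lease if nobody has it; otherwise register as the new owner,
# but only if no other consumer is already waiting for the handoff.
CLAIM = """
INSERT INTO leases (consumer_group, shard, owner)
VALUES (?, ?, ?)
ON CONFLICT (consumer_group, shard) DO UPDATE
    SET new_owner = excluded.owner
    WHERE leases.new_owner IS NULL
      AND leases.owner <> excluded.owner
"""

HANDOFF = """
UPDATE leases
   SET owner = new_owner, new_owner = NULL
 WHERE consumer_group = ? AND shard = ? AND new_owner IS NOT NULL
"""

LOOKUP = """
SELECT owner, new_owner FROM leases
 WHERE consumer_group = ? AND shard = ?
"""


def claim_lease(conn, group, shard, consumer):
    """Try to claim a lease and return the resulting (owner, new_owner) pair."""
    with conn:  # commits on success, rolls back on error
        conn.execute(CLAIM, (group, shard, consumer))
    return conn.execute(LOOKUP, (group, shard)).fetchone()


def complete_handoff(conn, group, shard):
    """Switch to the waiting owner once the current owner has finished its batch."""
    with conn:
        conn.execute(HANDOFF, (group, shard))
    return conn.execute(LOOKUP, (group, shard)).fetchone()


conn = sqlite3.connect(":memory:")
conn.execute(SCHEMA)
print(claim_lease(conn, "analytics", "events_102008", "consumer-1"))
print(claim_lease(conn, "analytics", "events_102008", "consumer-2"))
print(claim_lease(conn, "analytics", "events_102008", "consumer-3"))
print(complete_handoff(conn, "analytics", "events_102008"))
```

The first caller becomes the owner, the second is recorded as the new owner, and the third claim changes nothing because of the `new_owner IS NULL` filter. The shard and group names are made up for the example.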
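The probabilistic lease balancing behind the obtain leases function can be illustrated with a small in-memory simulation. It is a sketch under simplifying assumptions, not the PL/pgSQL from the talk: leases live in one Python dict instead of per-node tables read through dblink, a stolen lease changes hands immediately instead of going through the new-owner handoff, and expiry of dead consumers is modelled only by passing in the set of live consumers. The function and variable names are hypothetical.

```python
import random


def obtain_leases(leases, shards, consumer, live_consumers, rng):
    """Run one balancing pass for `consumer` and return how many shards it owns.

    leases maps shard -> owning consumer; live_consumers is the set of consumers
    in the group that have reported in recently.
    """
    # Step 1: claim every shard that has no live owner in this consumer group.
    for shard in shards:
        if leases.get(shard) not in live_consumers:
            leases[shard] = consumer

    # Step 2: while below the fair share, steal random shards from other owners.
    fair_share = len(shards) // len(live_consumers)
    owned = sum(1 for shard in shards if leases[shard] == consumer)
    candidates = [shard for shard in shards if leases[shard] != consumer]
    rng.shuffle(candidates)
    while owned < fair_share and candidates:
        leases[candidates.pop()] = consumer
        owned += 1
    return owned


rng = random.Random(0)
shards = [f"shard_{i}" for i in range(32)]
leases = {}

# A lone consumer claims every shard; a second one then steals half of them.
print(obtain_leases(leases, shards, "consumer-1", {"consumer-1"}, rng))
print(obtain_leases(leases, shards, "consumer-2", {"consumer-1", "consumer-2"}, rng))

# If consumer-1 stops reporting in, the next pass by consumer-2 reclaims its shards.
print(obtain_leases(leases, shards, "consumer-2", {"consumer-2"}, rng))
```

With 32 shards this reproduces the counts from the demo: 32 leases for the first consumer and 16 for the second. With three or more consumers, a single pass only guarantees that the caller reaches its fair share; the others have to keep calling the function, which is the repeated-call convergence the talk describes.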
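The poll events contract (peek at pending events, and only move forward when the consumer passes back the last event ID it processed) can be sketched with a toy in-memory stream. This is an assumption-laden model rather than the real function: the `ShardStream` class and its method names are invented, a Python list plays the role of the replication slot, and the exact ordering of the lease check and the handoff is a guess at the behaviour described in the talk.

```python
class ShardStream:
    """Toy model of one shard's event stream with a lease and a peek/confirm cursor."""

    def __init__(self, owner, batch_size=100):
        self.pending = []        # unconfirmed (event_id, payload) pairs in commit order
        self.owner = owner
        self.new_owner = None    # set when another consumer has claimed the lease
        self.batch_size = batch_size

    def append(self, event_id, payload):
        """Record a committed event; commit order may differ from ID order."""
        self.pending.append((event_id, payload))

    def poll_events(self, consumer, last_event_id=None):
        # Only the current lease owner may read from the shard.
        if consumer != self.owner:
            return []
        # Confirm the previous batch: drop everything up to and including
        # last_event_id. The position in the stream is used, not ID comparison.
        if last_event_id is not None:
            ids = [event_id for event_id, _ in self.pending]
            if last_event_id in ids:
                del self.pending[: ids.index(last_event_id) + 1]
        # If the lease was claimed by someone else, finish the handoff and
        # return nothing, so no event is delivered to both consumers.
        if self.new_owner is not None:
            self.owner, self.new_owner = self.new_owner, None
            return []
        # Peek: the same events are returned again until they are confirmed.
        return list(self.pending[: self.batch_size])


stream = ShardStream(owner="consumer-1", batch_size=2)
stream.append(10, "click")   # event 10 committed before event 9
stream.append(9, "view")
stream.append(11, "click")

batch = stream.poll_events("consumer-1")
print(batch)
print(stream.poll_events("consumer-1") == batch)      # retry returns the same batch
print(stream.poll_events("consumer-1", batch[-1][0])) # confirm and move forward
```

Because the cursor is the position in the stream, event 9 committing after event 10 causes no gap or duplicate, which is the problem with tracking serial numbers that the talk points out.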
Info
Channel: Postgres Open
Views: 3,553
Rating: 4.6923075 out of 5
Id: y9dSACedK44
Length: 38min 11sec (2291 seconds)
Published: Wed Sep 27 2017