How to build a Distributed Task Scheduler with Go, Postgres, and gRPC

Captions
Hey everyone! In this video we'll be implementing a distributed task scheduler. I recently started writing a Substack, and in the last newsletter post I discussed how you can design a distributed task scheduler; in this video we'll go over the implementation details. I highly recommend that you go through the newsletter post before you watch this video so you can better understand what we're doing here.

The task scheduler that we designed consists of a few components. There is a scheduler that receives task scheduling requests from the user and persists them into a database. We then have a coordinator that constantly scans the database looking for new tasks that need to be executed at the current time; if it finds any, it hands them off to the workers that are registered with it. These workers use heartbeats to register with the coordinator and communicate their liveness; if any worker node goes down, the coordinator automatically removes it from the worker pool. Once the workers are done executing their tasks, they communicate the status back to the coordinator. It's going to be a lot of fun to implement this system, so let's get on with it.

So here we are inside VS Code. We won't be writing the code line by line from scratch; instead, I've already written the code and we'll try to understand what it's doing. This is much closer to how you'll encounter a codebase in real life: you'll probably have some code already written, and you'll want to understand what's happening there.

First of all, let's start with the component that most of our other components depend on: the database. In this implementation we're talking about Postgres. We'll be using the Postgres Docker image to set up the database, along with a database initialization script that the Postgres image picks up and runs. Here is the Postgres Dockerfile: you can see that we pull the official Postgres image from Docker Hub, set up a working directory, and finally copy our initialization script into the docker entrypoint folder (docker-entrypoint-initdb.d) inside the container. I've added a comment here noting that the Postgres image runs all the .sql or script files found inside that entrypoint folder upon initialization, provided the data directory is empty. The data directory we're talking about is this directory, which is being used as the Docker volume; this means that across restarts of our cluster we'll still retain our data.

So let's see what's happening inside this folder. We'll navigate into the SQL file, where we have two main statements: a CREATE TABLE statement and a CREATE INDEX statement. The first creates a tasks table to store all the tasks that users schedule; the second creates an index on that table. The table is similar to the one we discussed in the newsletter. We have an id column that is generated as a UUID by the database itself, automatically. The second column is the command that the user wants to execute at the scheduled time. Third is the scheduled_at time, which is a timestamp and cannot be null. The rest are timestamps for the task's phases, populated as each phase completes: whenever a task is picked for execution, the picked_at field is populated; when a worker starts executing the task, it populates the started_at timestamp; when it completes the task, the completed_at timestamp is populated; and if the task fails, the failed_at timestamp is populated.
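Based on that description, the init script's DDL would look roughly like the following. This is a sketch for illustration, not the repository's actual file: the table and column names (tasks, id, command, scheduled_at, picked_at, started_at, completed_at, failed_at) follow the walkthrough, while the exact types and the UUID default are assumptions. It's shown as a Go constant so it can also be reused directly from Go code.

```go
package db

// initSchema approximates the SQL the Postgres init script runs on first
// startup: a tasks table keyed by a database-generated UUID, plus an index on
// scheduled_at for the coordinator's scans. The exact DDL is an assumption.
const initSchema = `
CREATE TABLE IF NOT EXISTS tasks (
    id           UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    command      TEXT NOT NULL,
    scheduled_at TIMESTAMP NOT NULL,
    picked_at    TIMESTAMP,
    started_at   TIMESTAMP,
    completed_at TIMESTAMP,
    failed_at    TIMESTAMP
);

CREATE INDEX IF NOT EXISTS idx_tasks_scheduled_at ON tasks (scheduled_at);
`
```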
Why we keep track of all these timestamps is explained in a lot more detail in the newsletter. Finally, we create an index on the scheduled_at column, since this is the column the coordinator will later use to pick the tasks that are scheduled to run at the current time.

Going back to the Postgres Dockerfile, we see that we pull the Postgres image and populate the entrypoint folder so that our database gets initialized; we don't need to do anything else. We do need to pass in a few environment variables, which are already set inside the Docker Compose file. To see those, we can go to the docker-compose.yaml file: inside the postgres service you can see the environment variables we're setting, POSTGRES_DB, POSTGRES_USER, and POSTGRES_PASSWORD. These come from our .env file, which is available here; we're mostly using the default Postgres user and password (postgres/postgres), and we name the database scheduler. So what we're doing in the Docker Compose file is: we set the build context to this entire directory, with the Postgres Dockerfile we just looked at as the Dockerfile; we pass in the environment variables picked up from the .env file; we mount a volume, the ./data folder you see right here, which we've also added to .gitignore so that we don't commit it; and finally we expose port 5432 externally, which allows our other services to access it. And that's all there is to our database service, which is a Postgres database.

Let's move on to the scheduler, which is the first part of the system we should discuss, since it's the entry point for our tasks and the service a user will interact with. Let's navigate to package scheduler, scheduler.go. This file defines our scheduler server and some utility types. The first thing we see is a struct called CommandRequest. It has two fields, Command and ScheduledAt, and it represents a task request that the user sends to the HTTP server running inside the scheduler: the command is what the user wants to run, and scheduled_at is the time at which the user wants the task to execute. The user enters the time in ISO 8601 format, mainly because it's easier to read and write by hand if you're testing the program in Postman or some other API testing tool. The second struct is the Task struct; this is the struct we use to interact with the database, and it closely resembles the database model for our tasks table: it has an ID field, a Command field, and all the timestamp fields we already discussed.

Then we have our scheduler server. It has a port, which defines which port the server will run on, and a DB connection string, which we use to connect to the Postgres database we just talked about. Then we have a DB pool; you can see that this comes from the pgxpool library. pgxpool is a Postgres library that lets you set up a connection pool with your Postgres database. The reason for using a pool here is that we get a lot of benefit from shared connections: we don't have to set up a new connection whenever a request comes our way, we simply use one from the pool. The library itself dynamically manages the number of connections it holds, how the load is spread across those connections, and the resource cleanup.
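As a rough sketch (not the repository's exact code), the types just described might look something like this; the field names, JSON tags, and the choice of pgx v4's pgxpool are assumptions inferred from the walkthrough:

```go
package scheduler

import (
	"context"
	"net/http"
	"time"

	"github.com/jackc/pgx/v4/pgxpool"
)

// CommandRequest is the JSON body a user POSTs to the /schedule endpoint.
type CommandRequest struct {
	Command     string `json:"command"`
	ScheduledAt string `json:"scheduled_at"` // ISO 8601, e.g. "2024-01-07T15:04:05Z"
}

// Task mirrors a row of the tasks table; nil pointers mean that phase
// timestamp has not been populated yet.
type Task struct {
	ID          string
	Command     string
	ScheduledAt time.Time
	PickedAt    *time.Time
	StartedAt   *time.Time
	CompletedAt *time.Time
	FailedAt    *time.Time
}

// SchedulerServer bundles everything the HTTP front end needs.
type SchedulerServer struct {
	serverPort         string
	dbConnectionString string
	dbPool             *pgxpool.Pool
	ctx                context.Context
	cancel             context.CancelFunc
	httpServer         *http.Server
}
```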
Then we have our context object; we use it to manage our goroutines as well as other asynchronous pieces of the system, such as the database connection pool. Then we have a cancel function, which is tied to the context: whenever you call the cancel function, the context receives a done signal, and whenever that happens we can capture the signal and react, for example by terminating a goroutine. Finally, we have the actual HTTP server object, which comes from the built-in net/http package.

You can see there is a function called NewServer, which simply creates a new scheduler server instance. It takes the port the server should run on as well as the DB connection string it should use to connect to the database. Note that this just creates the scheduler server object; it does not start any services. That is the job of the Start function, a method on the scheduler server struct. The first thing it does is connect to the database and set up our database pool, using one of the utility methods defined inside the common package. Let's see what that does. It's a simple implementation with a retry mechanism for connecting to the database: we try to connect using pgxpool's Connect method, and if there is an issue we retry up to five times with a sleep interval of five seconds. The reason we added this retry is that when we're bringing up the cluster there's a chance it takes a little while for the database to come up, and we shouldn't immediately fail just because we can't connect to it yet; so we allow five retries before returning an error. You could use a more sophisticated approach here, such as exponential backoff, but this is a demo project, so a simple five-second sleep is enough.
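A minimal sketch of that retry helper, assuming pgx v4's pgxpool package; the function name and log messages are placeholders, but the shape (five attempts with a fixed five-second sleep) follows the walkthrough:

```go
package common

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/jackc/pgx/v4/pgxpool"
)

// ConnectToDatabase retries the pool setup a few times so a service does not
// crash just because Postgres is still starting up.
func ConnectToDatabase(ctx context.Context, connString string) (*pgxpool.Pool, error) {
	const maxRetries = 5
	var lastErr error
	for attempt := 1; attempt <= maxRetries; attempt++ {
		pool, err := pgxpool.Connect(ctx, connString)
		if err == nil {
			return pool, nil
		}
		lastErr = err
		log.Printf("database not reachable (attempt %d/%d): %v", attempt, maxRetries, err)
		time.Sleep(5 * time.Second) // simple fixed backoff; exponential backoff would also work
	}
	return nil, fmt.Errorf("could not connect to database after %d attempts: %w", maxRetries, lastErr)
}
```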
Let's go back. Once we've connected to the database, we register handlers for our HTTP routes, /schedule and /status. The schedule route is used to schedule new task requests, while the status route is used to get the status of an already scheduled task. We then set up the HTTP server and run it in a new goroutine, so that it runs asynchronously while we await the server shutdown. The awaitShutdown method is quite simple: it makes a signal channel using the os/signal package, blocks until it receives an interrupt or a SIGTERM signal on that channel, and once it receives one of those signals it returns.

Going back, let's discuss how we handle the /schedule and /status endpoints. Let's jump into the handleScheduleTask method of our scheduler server. The first thing we do is run a validation to make sure the user has sent us a POST request on this route. Then we take the JSON body from the request and decode it into our CommandRequest struct. We receive the time in ISO 8601 format, and we parse it into a Unix timestamp, since that's what we store in the database. Now that we've parsed all the information the user sent us, we build a Task object and pass it to the insertTaskIntoDB method on the scheduler server. This method inserts the task into the database and returns the task ID that Postgres generated as a UUID. With the task ID in hand, we build a response and send it back to the user; that's all there is to the handleScheduleTask method.

The status endpoint handler is also quite simple. We first run a simple validation to ensure the user is sending a GET request on this route, then read the task ID from the query parameter and validate that it is present. With the task ID in hand, we query the database using the connection pool to fetch everything we know about the requested task, and we send all of that information back as a JSON response. We build the response with a couple of if statements to make sure we aren't sending any invalid or null values, write the response back to the user, and that's mostly how the scheduler works.
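Continuing the hypothetical scheduler sketch from earlier, the schedule handler might look roughly like this; the INSERT statement and the JSON response shape are assumptions, not the repository's exact code:

```go
package scheduler

import (
	"encoding/json"
	"net/http"
	"time"
)

// handleScheduleTask accepts a POST with a JSON body, parses the ISO 8601
// timestamp, inserts the task, and returns the UUID Postgres generated.
// It builds on the SchedulerServer sketch above; the SQL text is an assumption.
func (s *SchedulerServer) handleScheduleTask(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "only POST is allowed", http.StatusMethodNotAllowed)
		return
	}

	var req CommandRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		http.Error(w, "invalid JSON body", http.StatusBadRequest)
		return
	}

	// Go's RFC 3339 layout covers the ISO 8601 strings used in the video.
	scheduledAt, err := time.Parse(time.RFC3339, req.ScheduledAt)
	if err != nil {
		http.Error(w, "scheduled_at must be an ISO 8601 / RFC 3339 timestamp", http.StatusBadRequest)
		return
	}

	var taskID string
	err = s.dbPool.QueryRow(r.Context(),
		`INSERT INTO tasks (command, scheduled_at) VALUES ($1, $2) RETURNING id`,
		req.Command, scheduledAt).Scan(&taskID)
	if err != nil {
		http.Error(w, "failed to persist task", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(map[string]string{"task_id": taskID})
}
```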
And now we're ready to move on to the next component of our system, the coordinator. For that, let's head over to package coordinator, coordinator.go. The coordinator and the workers use gRPC for communication instead of HTTP, and there are a couple of reasons to prefer gRPC for this kind of inter-process communication. gRPC offers lower latency and higher throughput than regular HTTP because it uses protocol buffers for serialization, and protobufs are more efficient and faster to serialize than JSON. Moreover, it uses HTTP/2 out of the box, which reduces latency and improves communication speed thanks to smaller header sizes and multiplexing. gRPC also supports strongly typed contracts: the gRPC services we define in our protobuf file form a strong contract between the client and the server, which reduces the likelihood of miscommunication as the contract changes over time.

Since both the coordinator and the worker use gRPC to communicate with each other, it makes sense to understand the gRPC contracts we're going to implement in both services before we dive into the code. To check out the gRPC services, head over to package grpcapi, api.proto. This file defines a couple of things, the first being the TaskStatus enum. This enum defines the status of a given task, which can be queued, started, complete, or failed. We then define the WorkerService; this service is implemented by all our worker nodes, and it defines only a single RPC, SubmitTask. The coordinator uses this RPC to send tasks for execution to the workers registered with it. We then have the message definitions for the task request and the task response; they're quite simple; the task request just contains the ID of the task we want to execute as well as the command, which is stored as the data field.

Then we have the CoordinatorService. It contains one RPC called SubmitTask; we don't need to worry about this one, I just added it for testing purposes while developing the system. This is followed by the SendHeartbeat RPC. This RPC is quite important: the workers use it to register themselves with the coordinator, and the coordinator then uses it to keep track of which workers are alive. If a worker does not send heartbeats for a given amount of time, the coordinator removes that worker from the worker pool; if the worker recovers later, it can start sending heartbeats again and the coordinator will automatically register it in the worker pool again. This is followed by the UpdateTaskStatus RPC, which the workers use to update the status of the tasks they are executing. After that come the request and response definitions for all these RPCs; they're quite simple for the most part. The one worth looking at is the heartbeat request, which includes the worker ID as well as the network address of the worker that wants to register itself.

Now that we've looked at the proto definitions, we're ready to go back to the coordinator code. But you might be wondering: since this is a protobuf file, how do we access it from Go? For this we use the files generated from the protobuf; gRPC provides tooling to generate code for whatever language you want to target. I have a script that generates the Go code for this protobuf, which produces these two generated files. You don't need to edit them, nor do you need to commit them to git, since they will be regenerated every time you build the Docker images.
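For orientation, the generated Go bindings essentially boil down to message structs plus server interfaces that our own types must satisfy. The sketch below is an illustration inferred from the RPCs just described, not the real protoc output (which also generates client types, Unimplemented*Server embeds, and registration helpers):

```go
package grpcapi

import "context"

// Placeholder message types; the real ones are generated by protoc from api.proto.
type (
	TaskRequest              struct{ TaskId, Data string }
	TaskResponse             struct{ Message string }
	HeartbeatRequest         struct{ WorkerId uint32; Address string }
	HeartbeatResponse        struct{ Acknowledged bool }
	UpdateTaskStatusRequest  struct{ TaskId string; Status int32 }
	UpdateTaskStatusResponse struct{ Success bool }
)

// WorkerServiceServer is roughly the interface every worker must implement.
type WorkerServiceServer interface {
	SubmitTask(context.Context, *TaskRequest) (*TaskResponse, error)
}

// CoordinatorServiceServer is roughly the interface the coordinator must implement.
type CoordinatorServiceServer interface {
	SubmitTask(context.Context, *TaskRequest) (*TaskResponse, error)
	SendHeartbeat(context.Context, *HeartbeatRequest) (*HeartbeatResponse, error)
	UpdateTaskStatus(context.Context, *UpdateTaskStatusRequest) (*UpdateTaskStatusResponse, error)
}
```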
Now we can go back to the coordinator code. The first thing we see here is the CoordinatorServer struct. Its first field is an embedded UnimplementedCoordinatorServiceServer, which comes from the file generated from our gRPC service; we embed it to indicate that our coordinator server implements the CoordinatorService interface from the gRPC API. If you hover over it, you can see that it includes all the RPCs defined in api.proto, and we'll need to implement these in our Go code. This is followed by the server port, the port at which our gRPC server will run, then the listener for the gRPC server, and the gRPC server object itself. Then we have a map which is the worker pool. This map is keyed by the worker ID and contains the worker info: the number of heartbeat misses a worker has had, the address of the worker, the gRPC connection object, and a client to talk to that worker; we maintain one separate client for each worker registered with us. Moving on, we have the worker pool mutex, which we use to lock the worker pool whenever we access it. This is followed by a helper data structure called workerPoolKeys, which helps optimize our round-robin algorithm by letting us choose the next worker to send tasks to; since multiple goroutines might access it, we have a mutex over it as well. Next is the maximum number of heartbeat misses we want to allow a worker before it gets deregistered, and then the heartbeat interval, the interval at which each worker should send its heartbeat. Then we have the round-robin index, a helper member that tracks which worker is next in line in our worker pool. We then have the DB connection string we use to connect to Postgres, a DB pool for the Postgres connection, a context and a cancel function which serve the same purpose as in the scheduler, and finally a wait group to manage our goroutines.
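A hedged sketch of what the coordinator's types might look like; the field names, key types, and the import path for the generated package (pb) are assumptions based on the description, not the actual repository code:

```go
package coordinator

import (
	"context"
	"net"
	"sync"
	"time"

	"github.com/jackc/pgx/v4/pgxpool"
	"google.golang.org/grpc"

	pb "example.com/task-scheduler/pkg/grpcapi" // hypothetical path to the protoc-generated package
)

// workerInfo is everything the coordinator tracks per registered worker.
type workerInfo struct {
	heartbeatMisses     uint8
	address             string
	grpcConnection      *grpc.ClientConn
	workerServiceClient pb.WorkerServiceClient // client type assumed to come from the generated code
}

// CoordinatorServer implements the CoordinatorService gRPC API.
type CoordinatorServer struct {
	pb.UnimplementedCoordinatorServiceServer // generated embed that satisfies the service interface

	serverPort          string
	listener            net.Listener
	grpcServer          *grpc.Server
	WorkerPool          map[uint32]*workerInfo
	WorkerPoolMutex     sync.Mutex
	WorkerPoolKeys      []uint32 // worker IDs in a stable order for round-robin selection
	WorkerPoolKeysMutex sync.RWMutex
	maxHeartbeatMisses  uint8
	heartbeatInterval   time.Duration
	roundRobinIndex     uint32
	dbConnectionString  string
	dbPool              *pgxpool.Pool
	ctx                 context.Context
	cancel              context.CancelFunc
	wg                  sync.WaitGroup
}
```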
Moving forward, we have a NewServer function that instantiates the coordinator server, and a Start function that starts it. The Start function is a little more involved, so let's take some time to understand what it's doing. The first thing it does is launch the manageWorkerPool function in a goroutine; we'll look at that function in detail in a bit, but as a high-level overview, this goroutine is responsible for keeping track of which workers are still active and which need to be removed from the worker pool. Next we start the gRPC server, followed by connecting to the database using our database pool. Then we launch another goroutine that scans the database for any tasks that need to be executed in the next 30 seconds; it keeps firing queries against the database to check which tasks are due. We use some clever SQL in that method, and we'll go over it in a bit. Finally, we return awaitShutdown, which blocks until we shut down the server; this is very similar to what happens in the scheduler server.

Before we look at how the manageWorkerPool method works, it's better to understand how the SendHeartbeat RPC works, how the different workers register with the coordinator, and how the coordinator uses these RPCs to keep track of active workers; then it will make more sense to see how we deregister workers that appear inactive. So let's head over to the SendHeartbeat method, which we implement from the CoordinatorService interface. SendHeartbeat takes a context and a heartbeat request, and returns a heartbeat response or an error. As soon as we enter it, we take a lock on the worker pool mutex, because we do one of two things when we receive a heartbeat: if this is the first heartbeat the worker has sent us, we register it, which requires changing the worker pool; or, if the worker is already registered, we reset the number of heartbeat misses it has had. So once we've taken the lock on the worker pool mutex, we get the worker ID from the heartbeat request and check whether this worker already exists in our worker pool. If it does, it means we have already registered it, so we just reset its heartbeat misses. Otherwise, we register the worker into our worker pool: we first set up a connection with the worker, using the gRPC Dial method, and once that's done we save the worker's details in the worker pool using the worker info object: the address (its network address), the gRPC connection we just created, and a client to communicate with this worker. Once we've added the worker to the worker pool, we take a lock on the workerPoolKeys mutex, the mutex that guards the workerPoolKeys data structure. workerPoolKeys is nothing but a list of the worker IDs. We use it to help us select the next worker to send tasks to in our round-robin scheduling; we can't do that directly over the worker pool map, since the keys of a map have no definite order, so we keep those keys in this slice. As you can see, we need to regenerate this slice every time a new worker registers with us, but that's all right, since we don't expect new workers to be registering every other second. Once that's done, we print the registered-worker message and finally return a heartbeat response.

Now that we know how worker registration and heartbeats work, we can go ahead and understand how the manageWorkerPool method operates. Heading back to the Start method, we go into the manageWorkerPool method, which we launch in a separate goroutine. The first thing we do in this method is add an entry to the wait group; this lets us gracefully wait for the goroutine to finish when we're shutting down the server. Next we set up a new ticker, which ticks every time we want to check for inactive workers; this duration is the maximum number of heartbeat misses we allow multiplied by the heartbeat interval. We defer the ticker's Stop so it stops once we return from this goroutine. We then use a for-select statement to concurrently wait for signals on either the ticker or the context; we receive a done signal on the context when we shut down the server and call the cancel function we discussed earlier. Every time the ticker ticks, we call the removeInactiveWorkers method. Let's see how that works. In this method we again take a lock on the worker pool mutex, since we may be removing some workers from the pool and will therefore need to modify it. We iterate through the worker pool and check the heartbeat misses for each worker; if any worker's miss count is greater than the maximum we allow, we close the gRPC connection with that worker, delete its entry from the worker pool, regenerate the workerPoolKeys data structure, and finally return; otherwise, if it isn't yet at the maximum allowed misses, we just increase its heartbeat-miss count. And that's how this method works.
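A hedged sketch of that cleanup pass, using the hypothetical CoordinatorServer fields from the earlier sketch (plus a log import); the real code's bookkeeping may differ slightly, for example in how it rebuilds the key slice:

```go
// removeInactiveWorkers drops any worker that has missed too many heartbeats
// and bumps the miss counter for everyone else.
func (c *CoordinatorServer) removeInactiveWorkers() {
	c.WorkerPoolMutex.Lock()
	defer c.WorkerPoolMutex.Unlock()

	for workerID, worker := range c.WorkerPool {
		if worker.heartbeatMisses > c.maxHeartbeatMisses {
			log.Printf("removing inactive worker %d", workerID)
			worker.grpcConnection.Close() // stop talking to the dead worker
			delete(c.WorkerPool, workerID)

			// Rebuild the ordered key slice used by the round-robin picker.
			c.WorkerPoolKeysMutex.Lock()
			c.WorkerPoolKeys = make([]uint32, 0, len(c.WorkerPool))
			for id := range c.WorkerPool {
				c.WorkerPoolKeys = append(c.WorkerPoolKeys, id)
			}
			c.WorkerPoolKeysMutex.Unlock()
		} else {
			worker.heartbeatMisses++
		}
	}
}
```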
So let's go back to the Start method; we can now discuss the scanDatabase method, which we also launch in a separate goroutine. This method is probably the most interesting one in the coordinator. We first set up a ticker that ticks at every scan interval; in our example the scan interval is set to 10 seconds. We then defer the ticker's Stop so it stops once we return from this goroutine. Next we have a for-select loop that waits on two channels: the first is the ticker, which ticks at every scan interval and, on each tick, runs the executeAllScheduledTasks method in a separate goroutine; the second is the context's done channel, which receives a signal whenever the cancel function for this context is called, which happens when we're shutting down the server.

Now let's focus on how the executeAllScheduledTasks method works. First we set up a context with a 30-second timeout, store its cancel function, and defer its execution to the end of the function. Next we begin a transaction on the database; this transaction will contain the statements that select and update rows in our tasks table. Before we get to that, we set up some cleanup: we defer a rollback so the transaction gets closed if we don't complete it successfully. Next we run a SELECT query against the database. Let's see what it does: it selects the id and the command from the tasks table where the scheduled_at time falls within the next 30 seconds (or earlier), and where the picked_at column is null, so we don't pick tasks that have already been picked by the coordinator. Finally, we have the FOR UPDATE SKIP LOCKED clause; the reason for this clause is explained in detail in the newsletter post, so again I highly recommend reading it to understand why we're doing this. Once we've executed the query, we read the rows that were returned, scanning the id and the command from each row and appending them to the tasks slice. Next we send each task in the slice to the submitTaskToWorker method, which is responsible for sending the task to one of the workers registered with the coordinator. If we're able to submit the task successfully, we update the task by setting its picked_at column to the current time. Once we've submitted or gone through all the tasks, we finally commit the transaction so that the changes are persisted and visible to other transactions.

Finally, let's take a look at the submitTaskToWorker method. The first thing we see is that we get a worker from the getNextWorker method. That method takes a lock on the workerPoolKeys mutex, because we're about to pick the next worker from the workerPoolKeys slice: we get the worker ID from the keys data structure, use that ID to look up the worker info in the worker pool, and then increment the round-robin index so that the next worker in the slice gets selected the next time someone calls getNextWorker; finally, we return this worker's info to the caller. Back in submitTaskToWorker, we have a sanity check that a worker was actually returned, and then we use that worker's WorkerService client to submit the task, returning whatever error came back, which will be nil on a successful submission.
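To make that flow concrete, here is a hedged sketch of the scan-and-dispatch step, again assuming pgx v4 and the hypothetical types from the earlier sketches (with context, log, and time imported); the query mirrors the description (tasks due within 30 seconds, not yet picked, locked with FOR UPDATE SKIP LOCKED), while the helper and field names are assumptions:

```go
// executeAllScheduledTasks selects due, unpicked tasks inside a transaction,
// hands each one to a worker, and stamps picked_at before committing.
func (c *CoordinatorServer) executeAllScheduledTasks() {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	tx, err := c.dbPool.Begin(ctx)
	if err != nil {
		log.Printf("failed to begin transaction: %v", err)
		return
	}
	// Roll back if we bail out early; harmless after a successful commit.
	defer tx.Rollback(ctx)

	rows, err := tx.Query(ctx, `
		SELECT id, command
		FROM tasks
		WHERE scheduled_at < (NOW() + INTERVAL '30 seconds')
		  AND picked_at IS NULL
		FOR UPDATE SKIP LOCKED`)
	if err != nil {
		log.Printf("failed to query tasks: %v", err)
		return
	}

	var tasks []*pb.TaskRequest
	for rows.Next() {
		var id, command string
		if err := rows.Scan(&id, &command); err != nil {
			log.Printf("failed to scan row: %v", err)
			continue
		}
		tasks = append(tasks, &pb.TaskRequest{TaskId: id, Data: command})
	}
	rows.Close()

	for _, task := range tasks {
		if err := c.submitTaskToWorker(task); err != nil {
			log.Printf("failed to submit task %s: %v", task.TaskId, err)
			continue
		}
		if _, err := tx.Exec(ctx, `UPDATE tasks SET picked_at = NOW() WHERE id = $1`, task.TaskId); err != nil {
			log.Printf("failed to mark task %s as picked: %v", task.TaskId, err)
		}
	}

	if err := tx.Commit(ctx); err != nil {
		log.Printf("failed to commit transaction: %v", err)
	}
}
```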
With that, we've covered all the logic inside the coordinator, and we're finally ready to move on to the last component of our distributed system: the worker. So head over to package worker, worker.go. The first thing we see here is the worker server. It's quite similar to the coordinator server in a couple of ways: like the coordinator, it embeds the server interface for the service it implements, which for the worker is the WorkerService server, containing the RPCs it needs to support, in this case the SubmitTask RPC. Then we have a worker ID; the port on which it runs its gRPC server; the address of the coordinator it needs to register with; the listener for the gRPC server and the gRPC server itself; an object holding the coordinator connection and a client to talk to the coordinator over gRPC; a variable storing the heartbeat interval; a task queue to hold the task requests that come its way; a simple map of the tasks it has received, purely for bookkeeping, along with a mutex for it; and finally a context, a cancel function, and a wait group. The context and cancel function serve the same purpose they did in the coordinator, managing the cleanup of goroutines, and the wait group is used to manage the worker threads in this worker's pool. These worker threads are responsible for picking up tasks from the task queue whenever they are free, that is, when they're done processing their previous task.

Moving on, we again have a NewServer function, very similar to the other NewServer functions we've seen so far: it instantiates a worker server but doesn't start it. For that, we go to the Start method. The first thing it does is start a worker pool; then it connects to the coordinator; then it starts another goroutine to send periodic heartbeats to the coordinator; it then starts the gRPC server and finally blocks on awaitShutdown. The connectToCoordinator function is trivial: it simply dials a gRPC connection and sets up a service client. What we're really interested in here are the worker pool and the periodic heartbeat methods.

Let's go to the worker pool method to see how it works. It takes one argument, the worker pool size, which defines the number of worker threads we want to run inside this worker node. Inside, we have a for loop that runs numWorkers times; each iteration adds an entry to the wait group, to mark that one worker goroutine has been launched, and then launches a new goroutine running the worker method. What the worker method does is quite simple: it runs a for-select loop that waits on the task queue for new tasks from the coordinator, and whenever it's done processing a task, it updates the status for it; it also listens on the context's done channel, in case we've called the cancel function to shut this worker down. The updateTaskStatus method is simple too: all it does is call the UpdateTaskStatus RPC on the coordinator. The processTask method also doesn't do much; since this is a demo, we're not really processing any tasks, all we do is sleep for some time to simulate processing and add an artificial delay.
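Stripped of the gRPC plumbing, the worker-pool pattern itself is a small amount of Go: a buffered task channel, a fixed number of goroutines draining it, and a context to shut everything down. The sketch below is self-contained and runnable, with processing simulated by a sleep as in the video; the names and sizes are illustrative, not the repository's:

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type task struct {
	id      string
	command string
}

type workerServer struct {
	taskQueue chan task
	ctx       context.Context
	cancel    context.CancelFunc
	wg        sync.WaitGroup
}

// startWorkerPool launches numWorkers goroutines that drain the task queue.
func (w *workerServer) startWorkerPool(numWorkers int) {
	for i := 0; i < numWorkers; i++ {
		w.wg.Add(1)
		go w.worker(i)
	}
}

// worker picks tasks off the queue until the context is cancelled.
func (w *workerServer) worker(id int) {
	defer w.wg.Done()
	for {
		select {
		case t := <-w.taskQueue:
			fmt.Printf("worker %d started task %s: %q\n", id, t.id, t.command)
			time.Sleep(2 * time.Second) // simulate doing the work
			fmt.Printf("worker %d completed task %s\n", id, t.id)
			// In the real system this is where the UpdateTaskStatus RPCs would go.
		case <-w.ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	w := &workerServer{taskQueue: make(chan task, 100), ctx: ctx, cancel: cancel}
	w.startWorkerPool(3)

	for i := 1; i <= 5; i++ {
		w.taskQueue <- task{id: fmt.Sprint(i), command: "echo hello"}
	}

	time.Sleep(6 * time.Second) // give the pool time to drain the queue
	cancel()
	w.wg.Wait()
}
```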
Going back to the Start method, the last thing we need to discuss is the periodicHeartbeat method, and it's quite simple: it adds an entry to the wait group, sets up a new ticker with the heartbeat interval, and finally runs a for-select loop that waits for ticks on the ticker channel; every time the ticker ticks, we send a heartbeat to the coordinator. The sendHeartbeat method is also pretty simple to understand: we build the worker's address from its network address and port number and send that, along with the worker ID, to the coordinator. And that's all there is to building the worker for our distributed system.

The last thing we need to see is how we launch all of these different services and watch our distributed system in action. The launch scripts for each service live inside the cmd package, so head over to cmd/coordinator/main.go to see how the coordinator service is launched. It's quite a small script: it reads a flag to get the port the coordinator should run on, which defaults to 8080, then gets the DB connection string using a utility function from the common package; all that function does is read a few environment variables, build a Postgres connection string, and return it to the caller. Once we have that, we pass it to the NewServer function in the coordinator package to get a coordinator server, and then we start the server. A very similar thing happens in the other packages, for the scheduler as well as for the worker.
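For illustration, such an entry point might look roughly like this; the flag name, the default port, the import paths, and helpers like common.GetDBConnectionString are assumptions modeled on the description, not the repository's exact code:

```go
package main

import (
	"flag"
	"log"

	"example.com/task-scheduler/pkg/common"      // hypothetical import paths
	"example.com/task-scheduler/pkg/coordinator"
)

var coordinatorPort = flag.String("coordinator_port", ":8080", "Port on which the coordinator serves gRPC requests")

func main() {
	flag.Parse()

	// Build the Postgres connection string from environment variables.
	dbConnString := common.GetDBConnectionString()

	server := coordinator.NewServer(*coordinatorPort, dbConnString)
	if err := server.Start(); err != nil {
		log.Fatalf("coordinator failed: %v", err)
	}
}
```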
Now let's see how we've set this up in Docker Compose, which lets us test this distributed system quite easily. Closing all these files, we go into docker-compose.yaml, where we define a handful of services. We already went through the postgres service; similarly, we've also defined the scheduler, the coordinator, and the worker services. We define the environment variables we want to supply, which include the Postgres credentials, the ports we want to expose, and the dependent services: the scheduler and the coordinator both depend on the postgres service, and the worker depends on the coordinator, which ensures that these services are launched in order. You can look at the Dockerfiles for each of these services to see how they're set up; they're quite simple, mostly installing a few dependencies, copying the required files, and finally launching the binary built from the main.go inside the cmd package.

And now it's time to launch this entire distributed system using Docker Compose. We open up the terminal and run the docker compose command: you can run docker compose up with the --build flag if you want to build everything from source (if you've already built it, you don't need that flag), along with the --scale flag to define the number of worker services you want to launch; in my case I'll be launching three workers. Press enter to run the distributed system. If you're running this for the first time, it will build all the Docker images, so it might take a little while. You can see the logs from the different services; in the case of the postgres service, the log says that a database already exists, since I have the data directory here containing my Postgres files. After that you can see the workers connect to the coordinator successfully, and once they've done that and sent their first heartbeat, the coordinator registers all of them.

The last thing we need to do is send a test scheduling request, so let's open Postman. Inside Postman we have a sample request ready: a POST request to the scheduler's /schedule endpoint with a JSON body containing a command and a scheduled_at timestamp. The scheduled_at timestamp is in ISO 8601 format, which lets us define the time at which we want this task to run; here it's today's date and roughly the current time. When we send this request, we should be able to schedule a task on our distributed system. So we send the request, and we receive back a task ID generated by our Postgres database. We can now use this task ID to query the status of the task, but before we do that, let's check the logs of our distributed system. We see that the scheduler logged the schedule request, printing the command as well as the time we sent from Postman. Since the scheduled time was close to the current time, the task was immediately picked up by a worker, which processed it and then updated the task's status in the Postgres database. To check the status, we use the GET /status endpoint: copy the task ID we received from the scheduling request, go to the get-status tab, paste the task ID in, and send the request. And there you see all of your execution-phase timestamps, so we can verify that our distributed system is working as expected. In case you want to dig deeper, we also have some integration tests set up here, and you can run through those to see how each component of the system works and how we test each of these services.

And that's it for this video. I hope you enjoyed it and learned something new. Go ahead and subscribe to my Substack for more interesting content on software and computer science, and I'll see you in the next video!
Info
Channel: Jyotinder Singh
Views: 4,823
Keywords: leetcode, 30 day challenge, algorithms, problem solving, interview prep, interview, interview question, dynamic programming, dp, two pointer, coding, tutorial, memoization, how to, solution, c++, cpp, java, python, count unique bst, count, unique, binary search tree, binary search trees, bsts, bst, june challenge, dynamic programming explanation
Id: bBp5AOTkJcg
Length: 34min 19sec (2059 seconds)
Published: Sun Jan 07 2024