Designing a Workflow Engine From First Principles | Maxim Fateev

Captions
Hello everyone, my name is Maxim Fateev and I am presenting from Bellevue, Washington. I'm originally from Russia, where I did physics, and later I received my computer science degree in Rio de Janeiro, Brazil. Today I'm going to talk about Temporal. I want to give you some history, to give you an idea that this is not a project we came up with overnight; it took probably at least 15 years to get where we are. I was tech lead of the Amazon messaging platform when Amazon was transitioning into microservices, and the engine built for that messaging platform was later used by Simple Queue Service when AWS appeared. I was also tech lead for the Simple Workflow Service, where a lot of the ideas we are developing right now as part of Temporal originated. My co-founder worked with me on Simple Workflow, but later he went to Microsoft, where he was tech lead of Azure Service Bus. As a side project he wanted to bring the same Simple Workflow ideas to more people, so he developed the Durable Task Framework, which became so popular that the Azure Functions team adopted it; you can now use it as Azure Durable Functions. Later we both joined Uber, where we built two open source projects: Cherami, a messaging system, and later Cadence, an implementation of the Simple Workflow ideas on a completely different software stack, released as open source. When Cadence became very popular, not only inside Uber, where over 100 use cases were running on it within three years, but also outside, we left Uber, started our own company called Temporal, and forked the Cadence project as the Temporal open source project.

I was told that if you present a technology you have to give some social proof, and here is some of it: these are early adopters of our technology. There are many more companies using it in production, but these are the ones which went public about their usage. I have given multiple presentations; if you Google my name you'll see a lot of videos presenting Cadence and Temporal and their programming model. Today, though, I want to talk about the back end: how the cluster is organized and why we have all these roles in our architecture. I don't expect you to understand that picture right now, but I'm pretty sure that by the end of the presentation you will be able to make more sense of it.

Let's start from the beginning: what is a workflow? People have different definitions; there are even standards and so on. My definition is that a workflow is just a resilient program. Resilient means the program continues execution in the presence of failures: new deployments, infrastructure outages, availability zones going down — you want your program to keep executing through all of that. Usually this program is organized as a sequence of tasks, it has to react to external events, and it needs timers and timeouts, because time is an important component of every business-level process. As I said, I'm not going to spend a lot of time on the programming model, because I've presented it multiple times, but if you've never thought about Temporal applications, here is the basic idea. You just write code; there is a Java sample on the slide, we also support Go and PHP SDKs and are working on other SDKs, with at least TypeScript and Python probably coming this year. It looks like normal Java code. What makes it a workflow is that the full state of this program is preserved all the time; it is fully durable. That means that if your program restarts, or even if the back-end service goes down and comes back, your program will be in exactly the same state, with all local variables, threads, and stack traces exactly as they were. That's why you can write code that just calls sleep, blocks for 30 days, and then does something after that; you don't need to care about process restarts or deal with databases to recover the state, because it is all done automatically by Temporal. Again, I'm not going to spend more time on this; if you're interested, watch my past presentations or just visit temporal.io and go through our tutorials. The sketch below gives a rough flavor of what such a workflow looks like.
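To make this concrete, here is a minimal sketch of what such a workflow could look like with the Temporal Java SDK. The `SubscriptionWorkflow` and `SubscriptionActivities` names and the 30-day billing scenario are hypothetical illustrations, not the exact sample from the slides:

```java
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityOptions;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.time.Duration;

// Hypothetical activities; their implementations run in ordinary worker processes.
@ActivityInterface
interface SubscriptionActivities {
  void sendWelcomeEmail(String customerId);
  void chargeCustomer(String customerId);
}

@WorkflowInterface
interface SubscriptionWorkflow {
  @WorkflowMethod
  void execute(String customerId);
}

// Looks like normal Java code, but the position in the code and all local state
// survive process restarts because Temporal durably persists the execution.
class SubscriptionWorkflowImpl implements SubscriptionWorkflow {
  private final SubscriptionActivities activities =
      Workflow.newActivityStub(
          SubscriptionActivities.class,
          ActivityOptions.newBuilder()
              .setStartToCloseTimeout(Duration.ofMinutes(5))
              .build());

  @Override
  public void execute(String customerId) {
    activities.sendWelcomeEmail(customerId);
    // Durable timer: the worker can crash, restart, or be redeployed many times
    // during these 30 days, and the workflow still wakes up on schedule.
    Workflow.sleep(Duration.ofDays(30));
    activities.chargeCustomer(customerId);
  }
}
```

The `Workflow.sleep` call is backed by a durable server-side timer, which is exactly the kind of timer queue discussed below.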
At a high level, any workflow engine — and not only engines that use code for workflows, but also engines that use YAML, JSON, or XML, if you like those languages — is a kind of state machine. Practically, a workflow is a state machine which defines in which order to run tasks and also reacts to external events if necessary. If you represent what the workflow definition does as a sequence diagram, it practically says which tasks to execute in which order. But that is only the logical representation; to implement a workflow engine you need the physical representation: some engine that drives those workflows. The way it's usually done is that the engine goes to the workflow definition — the state machine — gives it the current state of the world, and asks for the next tasks or commands to run. A command can be "run task one", so the engine goes and runs task one, then notifies the definition that task one is completed; the definition executes its state machine logic and says, okay, now run task two.

In practical systems you don't want to call tasks directly, because the workers that run those tasks can be somewhere else, the worker processes can be unavailable for some time or can be slow, and there are flow-control issues, so you want to feed tasks out at an appropriate rate. Using queues to dispatch tasks to the implementations is a very common technique, so practically every real workflow engine uses queues to dispatch tasks to workers — the processes which host those task implementations. In Temporal we also run the workflow code outside of the core engine, so we use tasks and queues to deliver requests to the workflow definition and to get back the commands to be executed. So practically all communication with the workflow definition and with the task implementations goes through queues.

As I mentioned, time is important: every task invocation should have a timeout, the workflow itself can have a timeout, and we need to be able to execute operations like "sleep 30 days". So we need a timer service or timer queue which durably stores those timers and then dispatches them; a workflow engine needs not only task queues but also timer queues, and it adds timers every time some state transition happens. Then we also need to store the state of the workflow itself. So every time you start a workflow — practically on every update — we need to create or update state in the state database, create timers, and put tasks into the task queue for the workflow definition to pick up; when the definition gives back a list of commands, we again need to create tasks and update the state. The sketch below shows roughly what a single state transition has to touch.
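To see why this is tricky, here is a rough sketch of one state transition against three data sources, using hypothetical `StateStore`, `TimerQueue`, and `TaskQueue` interfaces invented purely for illustration (this is not Temporal's internal API):

```java
import java.time.Duration;

// Hypothetical interfaces standing in for the three kinds of storage a
// workflow engine has to update on every state transition.
interface StateStore { void saveWorkflowState(String workflowId, byte[] state); }
interface TimerQueue { void schedule(String workflowId, Duration fireAfter); }
interface TaskQueue  { void enqueue(String queueName, String workflowId); }

class NaiveWorkflowEngine {
  private final StateStore stateStore;
  private final TimerQueue timerQueue;
  private final TaskQueue taskQueue;

  NaiveWorkflowEngine(StateStore s, TimerQueue t, TaskQueue q) {
    this.stateStore = s;
    this.timerQueue = t;
    this.taskQueue = q;
  }

  // One logical state transition touches three data sources. If these three
  // writes are not a single transaction, every partial-failure ordering becomes
  // a race condition, as the next paragraph explains.
  void startWorkflow(String workflowId, byte[] initialState) {
    stateStore.saveWorkflowState(workflowId, initialState); // write #1: workflow state
    timerQueue.schedule(workflowId, Duration.ofHours(1));   // write #2: e.g. a workflow timeout timer
    taskQueue.enqueue("workflow-tasks", workflowId);        // write #3: workflow task for a worker to pick up
  }
}
```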
What's important to understand — and I think this is a very important part of this presentation — is that unless these updates across multiple data sources are transactional, the engine is not very practical. You don't want to run on top of an engine which doesn't have transactions across all these components, because you will run into all sorts of race conditions, and it is almost impossible to get rid of them. If you update the state and then try to put a task into a task queue, and the state update goes through but the task-queue update fails, you end up with state that thinks there is a task outstanding when the task doesn't exist. If you put the task into the queue first and then update the state, and the update is slow, the task can be delivered and processed before the update goes through, which is inconsistent, and you have to deal with all these edge cases. If the updates to all these components are transactional, all these races simply disappear, your system becomes much simpler, the application-level code never sees these edge cases, and the programming model is simplified.

This doesn't apply only to workflow engine implementations. The reality is that these days the majority of engineers don't use workflow engines or orchestrators to write their services. What they do is use queues, databases, and other data sources, building hodgepodge architectures out of different components, and there are usually no transactions across them. What I said about not having transactions — all those race conditions — applies to the majority of such ad hoc systems. I think this is very important to understand: if you build a system from these components, you are guaranteed to have edge cases and race conditions. That's why having a robust engine like Temporal underlying your architecture simplifies your life tremendously. So if you want to remember one slide from this presentation, remember this one: workflow engines are hard mostly because they need to deal with multiple things — queuing, timers, and state — and updates across them should be transactional.

Because of these transactionality requirements, the majority of existing implementations just use one database, and often even one process. And yes, with one database and one process the transactional requirements are easily solvable and you can provide a pretty robust engine. Obviously such an engine won't be scalable, because it is limited to a single process or a single database. So if you want a scalable solution, you first need to decide what the dimensions of scalability are. One dimension would be huge workflows: a single workflow which spans multiple machines. MapReduce, for example, is a technology which lets you write a single pipeline executed by thousands of machines; you can think of that pipeline as a single workflow. For the use cases we are targeting, we decided not to scale up a single workflow instance: you cannot have a single workflow which runs a million tasks; every workflow should be limited in size. But then we can infinitely scale out the number of workflows. If you need to run a million tasks, you will probably have a single workflow instance which creates a thousand children, and each child runs a thousand tasks — you get your million tasks, but each instance stays bounded. The sketch below shows that fan-out pattern.
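Here is a hedged sketch of that fan-out pattern with the Temporal Java SDK; the `BatchParent`/`BatchChild` interfaces and the slice sizes are hypothetical:

```java
import io.temporal.workflow.Async;
import io.temporal.workflow.Promise;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.util.ArrayList;
import java.util.List;

// Hypothetical child workflow: processes one bounded slice of the overall job.
@WorkflowInterface
interface BatchChild {
  @WorkflowMethod
  void processSlice(int sliceNumber, int tasksPerSlice);
}

@WorkflowInterface
interface BatchParent {
  @WorkflowMethod
  void processAll(int totalTasks);
}

// The parent only tracks ~1000 child handles; each child runs ~1000 tasks,
// so no single workflow instance exceeds its bounded size.
class BatchParentImpl implements BatchParent {
  private static final int TASKS_PER_CHILD = 1000;

  @Override
  public void processAll(int totalTasks) {
    int children = (totalTasks + TASKS_PER_CHILD - 1) / TASKS_PER_CHILD;
    List<Promise<?>> results = new ArrayList<>();
    for (int slice = 0; slice < children; slice++) {
      BatchChild child = Workflow.newChildWorkflowStub(BatchChild.class);
      results.add(Async.procedure(child::processSlice, slice, TASKS_PER_CHILD));
    }
    Promise.allOf(results).get(); // wait for all children to complete
  }
}
```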
As soon as each workflow instance is of limited size, you can start distributing instances across multiple machines, and scaling out a fleet of machines becomes practical, because each instance is guaranteed to fit on a single machine. Obviously, if you want a very large system you need to scale out the database as well, because a single database instance will be a bottleneck. So with a partitioned database, and partitioned hosts which maintain the state and perform all the workflow operations, you can get a very scalable solution. But as I said, we need to maintain transactionality, and as soon as you start breaking your persistence into multiple databases you can no longer provide transactional guarantees unless you do complex things like two-phase commits across databases.

The simple way to solve that would be one database per host: with four hosts, we distribute the workflows across them, and every host has its own database containing its own queues, its own timers, and the workflow state. This way we can guarantee transactions within each host and each database, and it is a pretty straightforward system to implement. It wouldn't be very practical, though, because it would be very hard to add and remove hosts. I'm not going to spend much time on that; the standard way to solve it is sharding. Instead of physical hosts we use partitions — shards — and we over-allocate the number of shards, then allocate those shards to specific physical hosts and move them around if necessary. So you hash each workflow to a specific shard ID and then use consistent hashing to allocate each shard to a specific host. To implement that you need to know the membership of your cluster, because to allocate a shard to a host you need to know which hosts are available; so you need some mechanism for membership. Then you need a routing layer, because when a request comes in you usually don't want a fat client-side library that understands the topology of your cluster; you need front ends which know the cluster membership and route requests — for workflow ID 1 or workflow ID 2, the front end knows which shard it belongs to and which host that shard runs on right now. The sketch below shows the general shape of that routing.
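As an illustration of the general technique — not Temporal's actual ring implementation — a routing layer along these lines might map a workflow ID to a shard, and a shard to a live host, with consistent hashing:

```java
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.TreeMap;
import java.util.zip.CRC32;

// Illustration only: a fixed, over-allocated number of shards is hashed onto a
// ring of live hosts; membership changes move only the shards owned by the
// affected hosts.
class ShardRouter {
  private static final int NUM_SHARDS = 16384; // over-allocated shard count
  private final TreeMap<Long, String> ring = new TreeMap<>();

  ShardRouter(Set<String> liveHosts) {
    // Place a few virtual nodes per host on the hash ring.
    for (String host : liveHosts) {
      for (int v = 0; v < 128; v++) {
        ring.put(hash(host + "#" + v), host);
      }
    }
  }

  int shardFor(String workflowId) {
    return (int) (hash(workflowId) % NUM_SHARDS);
  }

  String hostFor(int shardId) {
    // The first ring entry at or after the shard's hash owns the shard.
    Long key = ring.ceilingKey(hash("shard-" + shardId));
    return (key != null) ? ring.get(key) : ring.firstEntry().getValue();
  }

  private static long hash(String s) {
    CRC32 crc = new CRC32();
    crc.update(s.getBytes(StandardCharsets.UTF_8));
    return crc.getValue();
  }
}
```

A front end would call something like `shardFor` to find the shard that owns a workflow and `hostFor` to find the history host that currently owns that shard; when membership changes, only the shards mapped to the affected hosts move.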
The problem is that this doesn't work very well for task queues. For example, if you have activities listening on a task queue named "foo", how do you get the activity tasks waiting to be executed? If we store those activity tasks in every shard, you would have to go to all shards and ask each one: do you have anything for task queue "foo"? Because we want a large number of shards — we are targeting very large scale and we over-allocate them — these kinds of queries become impractical. You cannot even aggregate them efficiently, because each shard practically requires a separate database query; imagine you have 10,000 shards — every poll would fan out into 10,000 database requests. The practical solution is to move the queues into a separate component with its own persistence. That solves the routing problem, because you know which host currently owns a given queue, and that component can have its own database and its own persistence. But it obviously reintroduces the problem I mentioned earlier: we lose transactionality. As soon as queues live outside the core shards that hold workflow state, we no longer have transactions across them, and we get the race conditions I described earlier.

One way to solve that would be some variant of two-phase commit, Paxos, Raft, and so on, but that would introduce significant complexity into the system and require every component of that interaction to implement pretty complex protocols. What we settled on in Temporal is that we don't use any of those distributed transactions. How do we do that? We use so-called transfer queues. The idea is that every shard which stores workflow state also stores a queue; if we have 10,000 shards, we have 10,000 queues. Every time we make an update to the shard, we can also update that queue transactionally, because it lives in the same partition. So when we need to start a workflow, we create the state for the workflow, and since we need a workflow task for the worker to pick up, we add that task to the local queue of the same shard, and both are committed to the database atomically. Then a background thread pulls from that queue and transfers the message to the queuing subsystem. This way we get a transactional commit, and the transfer to the queuing subsystem happens afterwards. Obviously this transfer can fail and will retry, so it can create duplicates, but a separate part of the system does the dedup there. It is a very simple mechanism, but it gives us transactionality without relying on complex two-phase commit protocols. The sketch below illustrates the pattern.
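A minimal sketch of the transfer-queue pattern, assuming hypothetical `ShardStore` and `Matching` interfaces (not Temporal's actual internals): the state mutation and the transfer task commit atomically to the same shard partition, and a background pump later pushes tasks out to the external queuing subsystem, retrying on failure; duplicates are filtered by deduplication downstream.

```java
import java.util.List;

// Hypothetical shard storage: everything passed to commitAtomically is written in
// one atomic commit, because the state and the transfer queue live in the same
// database partition.
interface ShardStore {
  void commitAtomically(byte[] workflowState, TransferTask task);
  List<TransferTask> peekTransferTasks(int maxCount);
  void ackTransferTasks(List<TransferTask> tasks);
}

// Hypothetical external queuing subsystem (the "matching" role).
interface Matching {
  void addTask(String taskQueue, TransferTask task);
}

record TransferTask(String workflowId, String taskQueue) {}

class ShardTransferPump {
  private final ShardStore shard;
  private final Matching matching;

  ShardTransferPump(ShardStore shard, Matching matching) {
    this.shard = shard;
    this.matching = matching;
  }

  // Step 1: a state transition writes the new state AND the transfer task in one
  // atomic commit to the shard's partition. No cross-store transaction is needed.
  void applyTransition(String workflowId, byte[] newState, String taskQueue) {
    shard.commitAtomically(newState, new TransferTask(workflowId, taskQueue));
  }

  // Step 2: a background thread drains the local transfer queue into the external
  // queuing subsystem. The push can fail and be retried, which may produce
  // duplicates; a separate dedup layer filters those out downstream.
  void pumpOnce() {
    List<TransferTask> batch = shard.peekTransferTasks(100);
    for (TransferTask task : batch) {
      matching.addTask(task.taskQueue(), task);
    }
    shard.ackTransferTasks(batch); // acked only after a successful transfer
  }
}
```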
Another requirement that any reasonable workflow system should implement is the ability to list workflows: give me all workflows started by this user in the last 24 hours, all workflows which failed, and so on. Again, going to all 10,000 shards and asking each of them — even if each has its own index for that information — would be impractical. The way to solve that is a separate indexing component. We use Elasticsearch, but any indexing technology could be used instead; there is nothing special about Elasticsearch, it's just something we decided to use because it is open source. We use the same transfer-queue approach: we commit so-called visibility records into each shard and then use the transfer queue to transfer them to Elasticsearch. This mechanism has an inherent delay, so the Elasticsearch index is always a few seconds — at least some time — behind the actual updates; it converges to the correct state, but it is not updated atomically with the commit to the core shard partition. There is, however, a guarantee that if the commit happened, Elasticsearch will eventually be updated; it cannot happen that you update the state and the index is never updated, because of the transfer-queue mechanism.

As soon as you can list workflows, your users will ask you to perform batch operations — for example, "terminate all workflows matching these criteria, started by this user." You could program that from the outside: get the list, save the IDs to a file, and run a script to terminate them. But that actually sounds like a workflow, and that is exactly what Temporal does: it implements this logic as a workflow, but one that is built into the core cluster. It is a kind of system workflow, implemented as a normal workflow. It could be implemented outside, but we decided to implement it inside because we want this kind of functionality to be part of the core cluster. So we have the worker role, which reliably performs system workflows — database scans and other operations that can take a long time — using the standard Temporal workflow abstractions.

I showed this slide at the beginning of the presentation and didn't expect you to make sense of it then, but now I think you can understand why we have all these roles. We have the history component, which is responsible for the state transitions of individual workflows and has transfer queues to transactionally update state and create tasks. We have the matching component, which is responsible for queuing — matching poll requests coming from external workers with add-task requests. We have the front end, because we need routing; Elasticsearch, because we need indexing; the worker component, to implement background jobs; and then the workflows and activities, which are implemented by application developers using the Temporal SDKs. This architecture is pretty scalable and reliable, because every single host in it can fail and the system will keep functioning: shards in the history and matching services are redistributed automatically, we use a clustered database like Cassandra which can sustain node failures, Elasticsearch is also fault tolerant, and front ends are stateless, so you can add and remove them at any time.

But this system still has single points of failure in terms of blast radius: a single bad schema deployment to the database can bring it down, as can a front-end update with a bug, and so on. So if you want very high availability, we have multi-cluster deployment. In this case we use asynchronous replication, and even a total meltdown of a cluster — or the unavailability of, say, an entire Amazon region — will not stop your workflows, because you can fail over execution to another cluster and continue there. That is probably a topic for my next talk; the whole multi-cluster setup is a pretty complex part of the system, and I could give multiple conference talks about it as well.
Just to recap: besides the awesome programming model of writing distributed, reliable code in your programming language, Temporal is highly scalable — the scale dimension is the number of workflow executions — and it is consistent, which is very important to avoid all sorts of race conditions. It lets you run pluggable activities and even workflow implementations outside of the cluster, which gives you very high flexibility: you can practically write your workflows and activities in your language of choice. This concludes my presentation. I want to mention that we are actively hiring, and you can reach our project at temporal.io or directly on GitHub. We also have a community forum for support, and you can contact me or our company on Twitter. Thank you.
Info
Channel: @Scale
Views: 1,698
Id: 7XG6IRkY-Fc
Length: 23min 14sec (1394 seconds)
Published: Fri Oct 27 2023