Rust Linz, October 2021 - Tokio, Warp, and Websockets by Stefan Baumgartner

Video Statistics and Information

Video
Captions Word Cloud
Reddit Comments
Captions
we are talking today about tokyo warp and websockets so hey um hey yeah that's me hi i'm stefan i have the worst twitter handle uh on the bladder planet it's that parrot so if you are into monty python you maybe know what it means if you aren't don't worry it's 60 years old or something um and today i want to talk to you about tokyo warp and web sockets this is um so i've spent a lot of time trying to learn async trying to teach async in rust um it's interesting it's it's um um very uh different to what i'm used to from other programming languages but also once you get it um it's really one of those mind opening things and i want to talk to that about it some other point now i want to talk about an actual application that i wrote with um with async in rust um and um i use the tokyo framework for that so tokyo tokyo is fun because if you've done anything async or anything um web related in rust chances are that you have stubborn bomb tokyo in one way or the other either by seeing some hash tokyo colon colon main thing uh maybe if you if you deploy the lambda on the aws lambda or something in rust then you use that because the aws lambda runtime uses it or if you've written a web server be it with rocket if you use rocket which was one of those talks that you hear had that we had here in rustling very early on then you've used tokyo uh even if you didn't know it i think also hectic so basically any web framework out there bait is based upon tokyo it's a multi-threaded run-time for executing asynchronous code it has an asynchronous version of the standard library and a large ecosystem of libraries there are some alternatives a glomio is one that i that i just found out about recently async standard is i guess that the other big alternative um to having an asynchronous runtime for rust which is honestly one of the better ones to learn async because the surface is a lot lot easier to understand because tokyo can get very big very early on but if you reach one point where you are working on the the application layers um your stuff is mostly done in tokyo why do we need a multi-threaded runtime for executing asynchronous code in rust um so rust gives you all the primitives for for writing asynchronous code it gives you futures to write something that is executed at some point in time and returns a value it gives you async and the weight to have syntax sugar to work conveniently with asynchronous code but it doesn't give you a runtime to execute it the reason is rust wants you to choose the one that's best for your system you could write asynchronous code for an embedded system and you wouldn't might want not to use tokyo for that because tokyo might not be the right choice for that maybe it's async standard um or maybe you have different concerns uh um that that you need to take into account and for that you have to choose your random and the runtime is the one thing to take that takes all your futures all the asynchronous bits and pieces and executes it on the thread or multiple thread depending um on your flavor so tokyo is not only a runtime but tokyo has also an entire set of tools it includes everything from low level i o up to to type system based service modeling and anything in between so we have mile that's that's short for metal i o and it's some sort of abstraction layer on top of operating system events like oh wow there's a socket done please do something dear the application that's coming from my it normalizes the i o across all operating systems for apps that you build on top of it then we have to tokyo runtime which is the heart of tokyo it's the synchronous runtime it implements work stealing if it's multi-threaded so you have a queue of of tasks task is a unit of process in tokyo land and you have a couple of threads that spawn workers which can work on those tasks and they just pick a task from a queue execute it if there's something to do or put it back on the queue um and produce results so that's that's what um tokyo does it also comes with the rich standard library they have async file as inc networker timer schedules all the chairs that's all there in the tokyo around them and and this is this is already great and um if you're at that level you can you can pick and choose whichever runtime that you like there plenty out there i took it might be one of the most popular but there's maybe others that are better suited for your needs um and then um there's even stuff on top of that and that's a couple of layers first is hyper which is an hdb implementation a correct http implementation for rust which implements http one and two for both client and severance which is basically that the framework basis for anything x and rocket warp actives you name it that's all based on this package we have the same thing for uh for trpc it's called tonic um it's also a layer on top of tokyo which implements the grpc protocol and you can work with that and all those um protocol implementations get around the idea of a service which is done with power so think of it as a middleware layer where you have different middlewares different layers on top of each other you can build a tower and then you run a request through this target down to the bottom you get a response you run it up again and then you can modify requests and responses like oh there's there's an http response which i want to cheat it then you run through the cheesy player maybe you want to have a shot token validation then you're onto the top layer or maybe you don't want to take this to take really long let's have a timer player so you can kill some of those requests if it's not worth working with the nmr so this is this is tower and all those things build up uh all the things that we see then are building up upon tower um we also have a couple of um packages that help us with the whole thing bytes is basically um a synchronous reference count the mutex on u8 that's cool if you want to want to manipulate byte arrays across threads and tracing if you want to monitor if you want to send your data into data talk or dynatrix or whatever so that's that's this part um well and on top of that we also have frameworks and frameworks like exxon which is pretty new which is from the tokyo team themselves it's also in the tokyo organization on github um and warp which is one of the crew one of the tokyo maintainers one of the hype maintainers from sean mcarthur um which i guess that's a pretty wild guess so sean if you if you can see this and if i'm totally wrong i'm sorry um a warp is i guess um inspired from a haskell framework that's that same similarity and you can see that if we if you look into an example um it's very functional and very um compositional uh and filter-based and uh i think it's beautiful so i liked working with that with that a lot um but yeah so those are a couple of frameworks but there are others and and you can see tokyo basically basically anywhere so one thing where i had to work a lot with tokyo is deno um um i'm using denoin in a couple of projects not just as a runtime that i can install as a binary but i use the runtime itself to work on a couple of javascript projects uh more than 5 500 crates depend on tokyo and that's great it's not application so it's really really popular all right but what we're going to see warp just in a bit um what tokyo also gives us um and that's that's very um um inspired i guess by by go and how gold works in terms of communicating with go routines which is their version of executing a synchronous code is channels channels to share state and i want to quote rob pike who said if you run concurrent code if you run things concurrently don't communicate by sharing memory like a mutex like an rw lock or something share memory by communicating so you get the couple of channels where you can send messages through and you can pick it up from your other tasks you have those independent units of executing things and you send data in between them and geography there are a couple of things a couple of implementations integrate there's npsc which stands for multi-producers single consumer so you have a lot of producers many taskers can create value and have one produce one consumer that gets them one shot which is just sending one value once and then it's done which is great for communicating back so you can for example use um this sent the sender pattern where you send um something to your consumer and you get the sender where you can send back to the producer so you can do this back and forth which is great if you have some architecture where you have the web player completely separated from everything else this is where i use that a lot then you can communicate between two parts of your architecture without them even knowing from each other it's just message part watching brokers is multiple reducer multiple consumer with with a bit of different flavors so um there's a lot of stuff stuff in there so the tasks operate independently and you can send messages in between to synchronize um and yeah channel support sending a message from one producer task to one or more consumer tasks one that we are going to see right now in an implementation this is also available in the standard library in standard sync and psc is multi-producer multi-consumer send work to a task and receive the results of many computations multi-producers single consumer channels have a capacity but they're also unbounded variants so this is this is the idea of many producers you send it to one one consumer already and you know those are many producers but this also could be just one producer so we have also one to one but with multiple computations um here's a little example so i have i hope you can see my cursor because i don't have those fancy errors to draw to draw on the screen but here i have an async function main um i create a new tcp stream i connect to some some server and then i oh forget this line this um this is not so important this is more important i create the multi-produce a single consumer channel with a capacity of 100 okay and then i create 10 tokyo tasks with tokyo.spawn and i do that from from 0 to 10 so 10 tasks in total i clone the producer and a clone from a producer just creates another producer it doesn't create another channel it just creates another producer which means i have many producers that talk to one receiver but i need to clone that so i can move that into this async move here so this uh this async block which is a future so if i write async move here it creates a future that's being executed in the tokyo task that took his wonders and once this essence once the future is executed i can send some data to this channel and down there i'm going to receive it so this is also pretty nice this is a tokyo stream so this is some sort of asynchronous iterator and even if you look at the trade definition it's also very very similar to an iterator which is that you have an option that returns an item but it's also something that's packed in the future so you need to wait for it uh so you can use it like an iterator but it's asynchronous so it runs somewhere in this whole task execution um on tokyo um and yes if i get some some data here i write everything out and one thing that is important um this receiver returns none so it's an option either i have a value some value or have no value at all and if i have no value it returns num but it only returns num if all the producers have been dropped and this happens you know if this block here ends from the async move block if this move block ends tx gets dropped because hey freeing memory that's the ownership model of rust but the original producer is still available that's why i need to drop it manually in this case so i need to drop that so the rx.receive returns none at some point and i can stop the execution of my program so this is one of those channel types that are available in tokyo and this is also the one that i want to focus on today so there are a couple of more and there's great documentation on all of them um but but this is one that's interesting for uh what for that what we are doing just in a second all righty and what i want to do is having a simple web social chat and this is a very very stupid architecture diagram i have a client that's a browser up here and i create a web socket connection to some server so this is this is the world simple for server which is this cylinder i i don't know why but that's that's the the world superfood server and why is it for a database i'm sorry i i guess i totally forgot but i'm creating um i'm creating a websocket connection which is i have something to receive that's um rx is for um receiver tx or tx is for transmitter these are terms that come from um from broadcasting actually so from radio broadcasting you have a transmitter you have a receiver and and the the abbreviations for that were tx and rx and we carry uh them until today even if we do either sockets or just channels or whatnot so i have a user rx which is a receiver so i get messages in from the browser this is the one socket connection and i have tx where i can send back to the web server and i do this for every for for every client connection for every browser connection that establishes a websocket to to the server once i get a message what i do is i send this message to um every user and i create an mpsc channel for every user so um every user has the possibility to to send messages two to two one or more receivers to one receiver sorry to one with zero so a broadcast this to every tx that i have for every user that is connected this message goes out through the channel and it comes in on whatever receiver that is and it goes back to to the user tx to the user transmitter so this is this is the sort of round tip that i do this is everything that i do with the websocket so i establish a connection and then using channels so i can communicate between all those connections that i have by keeping track of all the centers and sending out stuff alrighty and this is this is what we're going to see right now let's remove my presentation and give me a second let's go to visual studio code and that's the websocket administration itself that's that's warp um and as you can see it's very busy it's very very busy because the moment you want to abstract a lot of stuff with um for web servers or whatnot you end up with a lot of types and if you end up with a lot of types you get type annotations the size of war and peace or something so so really really huge um nonetheless um for for for right now so that you can scan this and see what's happening there uh let's remove the inlay hints and if you learned one thing today that it's that you can remove the inlay hints it's a little bit less busy less busy to look at okay okay so what i'm doing here i create i create a new work server and what this is funny because you are connecting bits and pieces um to to create routes and to create callbacks for that or stuff that needs to be executed and it's then uh and or or so it's just connecting little filters together and to have a big filter tree and when a request comes in it gets filtered through so it finds the right and the last stuff to execute and this is very functional this is a very heschlish if you will um but it's also it's also a lot of fun fun to use so especially if you if you have have a little love for for functional programming all righty so let's start with something easy um what one did i do which is great for starting a web server is having having a serving a static directory a directory with static files in it and i create a new binding where i say okay this is warped colon colon effects current colon directory server everything from this folder if it's an html if you have an html if it's a css file serving css file um and that's about it so that's that's the easiest that you can do here i have a route that does some some some more interesting stuff so i create a path that's called hello which means i can do slash get get hello so this is a route that i'm creating uh for that and you say okay you have an optional argument an optional parameter and i create this parameter up there and say okay um hey i want to have a string parameter and um if it is some um return it if it isn't some return at least nancy i get an option an option back um an option which i don't have a value i have no video and this is an optional parameter how i declare it in warp and so i can say okay for this hello path have this optional argument have an optional parameter there where you can send you know something like that or or stefan or whatnot and i also say okay after this let the path end so so it's not possible to write slash hello slash verb slash something something but this is not a valid route i let the path end after an optional argument and this is this is one filter that i create for this route and then i say okay everything that comes into that filter it's mapped to something that i want to want to execute and what do i get i write the closure here um we heard a lot about closures today i guess so so you know what what this does i get an optional string in there and let's say okay um say hello to the name um unwrapped or else several or world that should work too um i don't know why i did it that way but but this is how i create a lever out with an optional argument which is either hello world or hello name depending on what you do and this is how you how you plug everything together um here that's also interesting i have a 404 route which means that for any else so this is every other request that probably so this is a filter that accepts it create the new response where okay my status code is not found 404 and the body is just this four or four html file and this is also a string that a function fs read to string which is also something that could error so i expect it to work and if this file isn't here i get a 4 for my 404 and this is a very very very bad error message but i was lazy i guess so result of 404 i have some files i have the hello route and now i combine them so i say okay um wait for the chat i'm going to talk to that in a little bit so i say hello or files or four or four so those are this is that the order um that of priorities i say okay hey the hello route goes first if i have anything there that's my dynamic api execute this if this gets filtered out so it's not part of this select the file if this file isn't available go to the 404 and that's how i concatenate all those to speak to each other but i also have this chat here and this chat is funny um this is the chat route so i say um for the web socket um for the web soccer dot for the path at ws try to create the websocket outfit try to do a web socket upgrade um i don't know if you know how web circuits work but web sockets is basically um talking over http if they speak http and then sending this this upgrade header and say hey do you speak web circuit and then you get either okay back or or another okay back and say yeah i speak websocket then say okay let's upgrade our existing http connection to a web socket connection and now we can send back and forth packages that's what it is basically it is a little protocol on top of tcp then uh the tcp connection stands open you can send packages back and forth and the browser knows how to integrate them to see everyone knows how to interpret them that's what it does what i also doing that's also funny i add some users there and users um is this here is this little data structure that i have here um uses default that's a very simple data structure it's basically an atomic reference count on a read w log on the hashmap where i store multiproducer single consumer senders that's that's a huge type basically it's a hatch map of of channel senders that the store with a user id and they just created the little type so it's easier to manage for me and what i also do through some chatting i say okay if if users um i want to create the filter out of it a filter that i can use with what which means i say hey for anything that bubbles through here mapbit2 um uses.clone since it's an atomic reference counter i get another atomic reference that i can work with and suddenly i have here in my chat users available as an object so i can this is how i map data structures into this filter chain of workbench just bubbles through and then i can use the data in there and this is what i do so what i get is i get first i get a websocket a plugin from warp i get the users to say if this web socket upgrade is successful so on upgrade i have a i have an actual websocket and then i want to call this connect function with the socket and with my users so this is what's going to happen here that's that's a mouthful that's a lot um and this connect function this is what i'm going to implement you with you right now so here again combining everything then i'm serving the routes i bind it on the socket address and i wait for that and here oh there's this to do function so this is now something that i have to do with you um already so let's toggle the inlay hints again because i need them in developing the first thing that i do i create the new user id um that mighty i have one thing up there that's the next user id which is an atomic use size which means it's a thread safe use size that starts at one and i can increment it across threads and across tokyo tasks of course so i say my id is next user id um and then i have something like fetch ad that say okay um um basically um take this one and add one twit and how do you add one to it i don't care about race conditions or whatnot sha um do it in a relaxed way so increment whenever you can because hey everything that i want to get out is the next user id i don't care if somebody is there in front of me absolutely not um something that i i need to do so let's print a print out that i have um welcome user that i have a new user all righty what i do then so so i have an i have a websocket connection established that's great sorry it's my id my id and uh here i say it's bordering sorry better okay so i have a websocket established now i want to get the transmitter and the receiver so i want to get something where i can read from i get onto something that i can write to um and i call it like in my architecture diagram use a tx and i need a mutable user rx which i do from websocket.split and this is splitting is always always pretty funny because um if i go to the definition down there it's split and then i have here split sorry split again split split and again so basically long story short it's still one connection but i get a reading reference and a writing reference to the same socket that i can use differently and i need to use them differently so i can move those parts safely across threads across tasks etc so this is the idea why i want to call split there but i don't want to use the same socket for reading a writing because i want to do different tasks with so that's the connection something to read from something to write from between browser and server so and what i want to store is i want to then use channels so i can communicate between all the existing users that are there so i create something for the users to communicate so that's px and rx and here i create would you produce a single consumer um let's create a number on the channel i don't care about bounce in in that case so if you get um too many too many requests it errors that's that's bad but usually for that what i'm doing here that's okay but you also see here the types are unknown um typescript sorry typescript rust that's a thrill rust can't infer the type yet but it will when we move forward so bear with me and what i also create so so this is a sender and a receiver for the channel so i can sense the file tickets gets received to the other side and this is the way how i want to communicate between between channels um what i do with rx because um i get a couple of convenience functions with it i say um let's create an unbounded receiver stream um out of that receiver stream and now i have an unbounded receiver stream where i can simply um use it to to forward everything that comes in to something else and this is what i'm doing um right right now so i create a new task um i spawn a new task so this is the one thing that needs to be as synchronous this is every every time something happens i want um i want to forward everything that comes in front so these are the messages that come from others and i want to send it out to the websocket to the browser so this is what i'm doing here say forward everything um through user tx so everything that every message that comes in from my internal users um send it out to the web socket to the browser this is what i'm doing here and suddenly i also get type annotations right so now it's a message etc so this is this is what we get here one thing that i need to do as well i want to add the new user so i have created the user and this is why i want to store them and what i'm storing here is say users.right i get right access weight it's in a synchronous operation because i'm using um why is it in a synchronous operation because i'm using the tokusync rw log and not the one from um from the standard library this is because those are w log right guards are not thread safe they can be threaded if you know how tokyo shuffles or async shuffles those tasks in between across the weight blocks but that's a lot of stuff for you to do on your own that's why tokyo provides you with an abstraction layer on top of it so you can use it easily within your tokyo applications and you can have everything like um mutex as well or tcp listeners or file listeners there's always a variant of that that works really well with the runtime also with the same api which is which is not the only thing that you need to do is a wait for it and once you get it let's insert uh based on the idea that you have this one sender so what i'm doing here i store all the senders all the transmitters for all my users in this bookkeeping data structure that's what i do here i'm not using a mutex because mutex means it's it's locked for reading and writing and um rw log means once i i lock it um others can still read which is which is interesting for broadcasting later on so it's a little bit faster in certain scenarios okay let's do that now i have this here as well okey-dokey so now i created that connection and i said okay um everything that um if i have a message internally to broadcast send it out to to the websocket uh store the sender and now i need to use this and then i need to broadcast stuff and this is done by listening to messages from the websocket incoming let's say i have some result um which is user rx dot next so it's an is it stream it's an executive stream i get um little messages in i wait them i wait for them and want to do that um let's broadcast this method so i create a little function broadcast it's an async function broadcast message um the message is of type message that's what comes in from the websocket so you see warp filters wsws for apps okay that's a website message that's great and i want to have a reference on my users and then i say okay for if uh let sorry if let okay um string message or sdr message i don't i don't need that i guess do i need that i don't know why i need that um i guess this is a this is operation that could fail but i want to have it because i want to send strings i want to check if this is a string message i guess this is what i want to do it's been a while so if this is a string message let's get all my users from um the user data structure and it's again it's a alternating stream so i have some sort of iterator no no it's an actual iterator because i'm now accessing accessing the hashnet so i'm reading the rw log i'm accessing the hashtag now i have an actual iterator and it's okay for every sender that i have here send send a clone of the message and failed to send message so this should be it that's the broadcast message message stuff that's great um and here i say okay broadcast message um and the message is result and unwrap it so expect um fail to fetch message that's also a result type already okay okay that looks good i'm already broadcasting oh i have some troubles here i guess you need to have a semicolon in there now we are good um this one okay that's an asynchronous function so i definitely need to wait thanks to robert analyzer for letting me know okay um now for the last part um this is something that i don't need um i want to also write the disconnect message so if something if one of the users is run away i want to disconnect them disconnect the user id from users you can say okay first of all print line by user id there you go and save from users users of blank so you can have it centered in your screen for users write a weight boof id because it's a hashtag i can just call remove there oh that's a that has been something uh where they could be disconnect now so how do i say that that disconnect news and this is this is great this is great because we have here an asynchronous stream and this one returns either result or it returns none and when does it return none it returns none the moment the connection drops the moment the connection drops this next returns a num which means this whole while block um ends i don't get any messages anymore i can safely disconnect that's it i can safely disconnect my id from the users and i of course need to await that too because it is an asynchronous function and what i like especially about this is that first um keeping um bets establishing a connection reading and broadcasting broadcasting messages disconnected that's sequential that's that's a four step solution to having um a web sub gate application and the only thing that they need to do as strengtheners is this particular and let's say okay everything that comes in from rx forwarded to usage i don't want to to have that in my synchronous code right uh particulars in my sequential code i want to have that somewhere else this is a standard task work on that just forward that i don't care the rest is bookkeeping establishing a connection reading and broadcasting messages and disconnect in the end and this is the only thing that i need to put aside this only little um token spawn of course i have lots of awaits in there which means there's lots of little little tasks because basically every await this is some sort of checkpoint for for us to say okay this is one package one part in this future state machine that i can execute independently this is with every await block that there is but everything that i need to shop aside which i need to to not have in my sequential flow of code is this line that's everything this couple of live coding let's see if it works i don't know um the old one worked look at that um there you go let's see cargo run there's the trees okay i have around here building building building in the meantime i'm opening up another connection and what i'm also opening up is for you to look at um that's that's an index html file it does nothing but loading a main charge file the main.js file is is very very simple it creates a new web socket connection that's that um it opens it that's great it's connected on message it says okay um receive the data and put it in some some text content and on send it send some some data that's that's order is it's it's nothing nothing fancy at all just getting messages sending messages um not even jquery in there because i yeah clogged it together really really quick okay back to the terminal okay i have a running server at this world so if i say um girl localhost 8080 hello then i get hello world if i do it with hello rust lintz i get hello wrestling so that's great that already works now let's see what happens if i want to curl localhost aesthetic file that's great you can see that i tried it with hectics first now i switched to a warp that looked a little better so let's do uh even more stuff let's open up a browser let's go to localhost 8080. say hello oh i didn't implement return sits and hello cool oh that's great i just got everything that came up from here down there that's super great um why a server round trip that's easier but it's even better if i create another browser window down there and say how are you if i click here let's see hello it's here as well um uh i'm great i just put it a web server a web socket server live on stage at roslands go off alrighty and that's about it that's the demo so um in the end one one advice that they can give you if you want to develop a websocket server on your own that's a very basic demo of establishing that um tokyo gives you a lot of stuff to wire everything together to plug something in and and send messages via channels and opening connections so this is all part of tokyo everything that happened after ws after i had this websocket connection established is pure tokyo it's not part of the framework and this is also the reason why i was able to do that because those frameworks like like warp and exim they build on top of those tokyo primitives which means everything that they can't provide on their own you can do yourself with those tokyo libraries if you have a web server like acting so if you have a web server like like rocket um a framework like that stuff like that becomes in my experience increasingly harder because they are opinionated they are opinionated which also means that web sockets need to work according to their opinion opinion which is great you know you want to have an opinionated server because it moves you it moves you um fast it it brings you up to speed but also um you need to wait until they decided on their opinion on those uh topics um i guess dev sockets is on the roadmap for rocket which is great i'm looking forward to that i'm really looking forward to that how they how they solve it because it's um i think this is a challenge um i tried to implement it with hectics and needed to work with the actor model which was way too tough for me to understand um but i think it's doable there as well there are some tutorials out there i found this to be the easiest version and i think if you use axon which is um a declarative web framework from the tokyo folks um you end up at the same point where you suddenly get all those primitives in your hand where you can create channels where you can create sockets and you can split them and you can write them and you can read them and then you can do all the message passing around it so that was my talk you
Info
Channel: Rust
Views: 8,517
Rating: undefined out of 5
Keywords: rust-lang, rust, rustlang
Id: fuiFycJpCBw
Channel Id: undefined
Length: 40min 39sec (2439 seconds)
Published: Fri Oct 29 2021
Related Videos
Note
Please note that this website is currently a work in progress! Lots of interesting data and statistics to come.