Working with Channels in .NET

Captions
Hey, today on the On .NET show we're going to be talking about System.Threading.Channels with Stephen Toub. Come check it out.

Hey, welcome to another episode of the On .NET show. Today we're going to be talking about System.Threading.Channels with Stephen Toub. Hey Stephen. Hey Rich. It's been a while since you've been on the show. It's been a long time. So we're going to be talking about channels, a feature that you built, probably in concert with some other folks. What did this first ship with? It's available as a NuGet package that shipped around .NET Core 2.1-ish, and it's also in the box in .NET Core. Cool. So at the 10,000-foot level, or one-kilometer level, what would you say this is for? Sure. A channel is actually a very simple mechanism, and its whole purpose is to hand off data from one party to another. If you think about a collection like List<T>, or a queue or a stack or whatever, you put data in and you take data out. The only thing the channel effectively layers on top of that is the synchronization and coordination necessary to hand that data from one party to another; it is essentially a producer-consumer collection. In fact, if you're open to it, the easiest way to demonstrate it might be to just implement a ten-line one quickly. All right, more than open to it; that can set the understanding for what this is, and then we can go from there.

So I've brought in the System.Threading.Channels namespace, but I'm not yet going to use it; we're going to implement our own. Let's say I define a type called MyChannel. We need basically two things: the storage and the coordination. And right now this isn't channelized, it's just a POCO. Yep. So I'm going to add a member here, a ConcurrentQueue; this is a queue that I can access from multiple threads concurrently. So this is basically your backing store. Exactly, a ConcurrentQueue. And then I'm going to add the synchronization to coordinate the interaction between the two players. The ConcurrentQueue is thread safe, but you have no idea when I add something to the queue; you don't know when it's there, so you need to be notified. For that I'm going to add a SemaphoreSlim. Now we just need two methods to operate on this: a method to write and a method to read. I'm going to add a Write method that takes an item, and because I've implemented this in an unbounded fashion, there's no limit to how much I can put in here, so my Write method is just a synchronous method; it's always going to succeed, subject to memory limitations. We're not worried about that. But my Read method is going to return a Task<T>, and it's going to be async, because when we come to read there might not be anything in the collection, so we have to wait, and we want to wait without blocking the current thread. I see, so that's a policy choice: you don't return a bool, for example; you basically say, I will always return something, and in the absence of something I will wait until it's available. That's a choice I'm making right now; it doesn't have to be that way, and in fact with the actual channels we'll see that you have options for your consumption model.
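For reference, here is a minimal sketch of the homegrown channel being described. It's a reconstruction of the idea rather than the exact on-screen code (the field names are my own), with the Write and ReadAsync bodies that get walked through just below.

    using System.Collections.Concurrent;
    using System.Threading;
    using System.Threading.Tasks;

    // A deliberately simple, unbounded, homegrown channel:
    // a thread-safe queue for storage plus a semaphore for coordination.
    public class MyChannel<T>
    {
        private readonly ConcurrentQueue<T> _queue = new ConcurrentQueue<T>();
        private readonly SemaphoreSlim _semaphore = new SemaphoreSlim(0); // zero "keys" to start

        public void Write(T item)
        {
            _queue.Enqueue(item);   // store the data
            _semaphore.Release();   // put a key in the box: something is now available
        }

        public async Task<T> ReadAsync()
        {
            await _semaphore.WaitAsync();     // wait, without blocking a thread, for a key
            _queue.TryDequeue(out T item);    // guaranteed to succeed: one key per enqueued item
            return item;
        }
    }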
And you didn't explicitly say this, but what you're doing right now is the homegrown version, so people at home who want to adopt channels shouldn't actually write this code. Right, we're using this to exemplify the core concept that is a channel. The implementation that we have in .NET effectively does the same thing, but in a more efficient, more robust, more functional manner, and in way more lines of code. And way more lines of code.

So now our implementation is going to require basically four lines of code. For Write, we need to put the data into the queue, so we enqueue it, and then we need to notify anyone who's waiting that there's something here; to do that we'll call Release on the semaphore, which basically says, if anyone's waiting on this, there's something you can go check out. Now help me understand: I've definitely heard about SemaphoreSlim many times, but I haven't actually used it; I've used things like Monitor. Before you hit Release, what state was it in? Is that what the zero is here? So the semaphore has an initial count, and that basically says: think of it as a box of keys. If you want to use the bathroom, that requires a key, so you need to grab a key out of the box; if there are no keys in the box, you have to wait for one before you get to go. By creating the semaphore with zero keys in it, anyone who walks up to that box is going to have to sit there and twiddle their thumbs waiting for a key. And Release is putting a key into the box. Okay. On the read side, to wait for a key, I'm going to say await WaitAsync. The moment I get past here, I have a key in my hand, and I know the only reason a key was put into the box is that something was put into the queue, so I can do queue.TryDequeue(out T item) and return it. Right. So is it fair to say this is almost the opposite of the AddRef/Release kind of release? In that kind of release I'm basically giving up something, and in this wording it sounds like I am giving something too, releasing it to the SemaphoreSlim for it to represent. It's exactly the same concept.

And that's it, we've got our channel, so now we can use this thing. Up here I'm going to create a producer of data: I'm going to spawn a task that just sits in an infinite loop, waits for, say, a second, and then puts some data into my channel. So here I'll say new MyChannel<int>, and I will do c.Write(i); I'm just sitting here in an infinite loop, and once a second it adds an item. Then down here I'm going to sit in a loop and say Console.WriteLine(await c.ReadAsync()): wait for data to be available, and when it is, read it out and write it. Makes sense. And should I think of that Task.Run as running on another thread? You can think of it that way; async makes it a little different. But you've introduced that this mechanism you've just created is thread safe; does this particular code require that thread safety? It does require that thread safety.
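Roughly what the demo's producer/consumer code looks like, as an approximation of what's being typed (it uses the MyChannel<T> sketch above, the variable names c and i follow the conversation, and the usual System and System.Threading.Tasks usings are assumed):

    var c = new MyChannel<int>();

    // Producer: once a second, write the next integer into the channel.
    _ = Task.Run(async () =>
    {
        for (int i = 0; ; i++)
        {
            await Task.Delay(1000);
            c.Write(i);
        }
    });

    // Consumer: wait for each item to arrive and print it.
    while (true)
    {
        Console.WriteLine(await c.ReadAsync());
    }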
Yeah. If you forget the async for a moment, we could do this without async. If we didn't have all this async stuff in here, we'd spawn a task, and the whole body of that Task.Run would be sitting on some thread pool thread, either blocking while waiting for a period of time or accessing the channel to put data into it. Concurrently, we'd be sitting here in our loop pulling data out of the channel, so they would be running concurrently. Exactly. The only hesitancy with your comment about the threads is that, because we are using async, the only time we're actually executing on a thread is the period where we loop around and hit the Task.Delay; once we hit the Task.Delay we get off that thread, and we're in a callback model that the compiler sets up for us. So we can run this, and we'll see that once a second the producer wakes up, spits out a number, the consumer wakes up, reads it, and off we go. Right, and actually I don't remember what the Console.WriteLine was representing. It's just writing out the data it reads. Okay, so this guy is putting data in and this one is writing out whatever it reads. I see, now I understand: you were writing in i, and that's why the number keeps going up. Exactly.

So fundamentally that's what a channel is. Now, System.Threading.Channels means we don't actually have to write this MyChannel class, so I'm just going to go down here and delete it. I've got my namespace here, and instead of saying new MyChannel I'm going to say Channel.CreateUnbounded. We created an unbounded channel, like before, meaning there's no limit on how many things I can put in; it's basically policy free. Exactly. And then there isn't actually a Write method, but there is a Writer.TryWrite, and there isn't a ReadAsync directly, but there is a Reader.ReadAsync. And what is the Try guarding against in this particular case? In this particular case I don't need to do anything with the Try, because the write will always complete and always return true; it's the same API for both bounded and unbounded channels. There are many possible concrete implementations of a channel, so, to be accommodating, there is a TryWrite method. We could also have said await c.Writer.WriteAsync, and now, just like the reader waits until something is available, the writer will wait until there's space available. In the case of the unbounded channel this will complete synchronously; in the case of a bounded channel it will wait until there's space available for the data to be written. I see, is that where we get into ValueTask? It is. One more question: in the Try pattern we often have something that returns a bool and outs a value. That exists on the reader side: you can say TryRead, and it's exactly what you just said, it returns a bool and outs the item; on the writer side there's nothing to out. I see. So why am I confused, wasn't there a Try there before? Oh yeah, I changed it just as part of the examples; sorry, it's TryWrite. Okay, that's why I got confused. TryWrite returns a bool but there's no out; it's the corollary of TryRead. Totally got it.

So this is fundamentally channels. We can run it, and it does the exact same thing we did earlier.
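The same program rewritten on top of System.Threading.Channels, approximately as described in the demo: the homegrown class is deleted and replaced by Channel.CreateUnbounded, with TryWrite on the writer and ReadAsync on the reader.

    using System.Threading.Channels;

    var c = Channel.CreateUnbounded<int>();

    // Producer: TryWrite always succeeds (returns true) on an unbounded channel.
    _ = Task.Run(async () =>
    {
        for (int i = 0; ; i++)
        {
            await Task.Delay(1000);
            c.Writer.TryWrite(i);
        }
    });

    // Consumer: ReadAsync waits until an item is available.
    while (true)
    {
        Console.WriteLine(await c.Reader.ReadAsync());
    }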
And we're getting the exact same output we saw earlier, because it's doing fundamentally the same thing. Now, this might not be a useful question, but the code you showed us before is super simple and straightforward. For this super simple scenario, is there any value in moving to channels, other than not having to write and maintain that code? There is. First, not having to write and maintain the code, which is a big one. Second, the implementation here is much more efficient than the one I showed earlier. In particular, the writing I showed earlier was fine, but the reading was going to allocate a new task for every piece of data handed back, even if the data was already available, because we don't store four billion possible tasks for every possible integer in the system. So even if the data is available synchronously, for the most part you end up creating a new task to hand back that data. This implementation, if you read the fine print, doesn't return a Task, it returns a ValueTask, and that not only avoids allocations when completing synchronously, but a lot of the code that went into channels means it avoids allocations even when completing asynchronously. So you can do this producer-consumer model and get a very efficient, very low-allocation handoff by using this library.

This library also lets you use the same kind of consumption code, the same API, while changing the semantics of the channel employed. So here, instead, I could say I want to create a bounded one, and I want to only allow one item at a time. I'm going to put this back to WriteAsync, and now, when this tries to write, if the consumer hasn't yet picked up the data, it will wait for space to be available. If I wanted to, I could put a Task.Delay here, an even longer one; previously, without this being bounded, the collection would just keep filling up and there was no back pressure. Now there will be back pressure: until the reader picks off the next item, this will end up waiting for space to be available. We can run that as well, and we'll see that every two seconds the next number is printed out, and we're not growing our memory significantly.

Right, so I've definitely heard people talking about back pressure before. Back pressure is just the idea that if you have a producer running full speed ahead, generating a lot of stuff, and you're not consuming it very quickly, it's going to build up a whole lot of stuff, and you want to slow the producer down. Yeah, exactly.
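A sketch of the bounded variant being demonstrated: capacity one, WriteAsync on the producer, and a slower consumer so the writer is visibly throttled by back pressure. The two-second delay is only there to mirror the demo's pacing; the exact numbers are assumptions.

    using System.Threading.Channels;

    var c = Channel.CreateBounded<int>(1);   // at most one item buffered at a time

    // Producer: WriteAsync waits whenever the single slot is still occupied.
    _ = Task.Run(async () =>
    {
        for (int i = 0; ; i++)
        {
            await Task.Delay(1000);
            await c.Writer.WriteAsync(i);    // back pressure happens here
        }
    });

    // Slow consumer: the producer can never get more than one item ahead.
    while (true)
    {
        await Task.Delay(2000);
        Console.WriteLine(await c.Reader.ReadAsync());
    }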
So is it fair to say that this system has some useful back pressure strategies built in, such that you don't necessarily have to implement them yourself? And are there more sophisticated back pressure strategies that people could build on top of this? Sure. This has some fairly simple bounding strategies right out of the box: with the CreateBounded method you can specify the number of items you want to allow to be stored, and with some options you can specify various things, including what you want to have happen when it's full. There are various policies you can imagine being employed. One is: if it's full, I just want you to wait. Another is: if it's full, I want you to drop one of the items that's already in there; and if you drop one of the items, which one do you want to drop? The oldest? The newest? Or do you want to pretend that I took what you gave me and just drop it on the floor immediately? Those sorts of policies are built in, but there are certainly more complicated ones one might like. For example, let's say we did the simple thing: I want to only allow ten items in this collection. We fill it up, now we're waiting. If a space becomes available, we could wake up immediately, and that's what we will do. Or we could say, you know what, that's kind of inefficient, because we're going to be ping-ponging back and forth between awake and asleep. So maybe what we want to do is wait for a sort of low-water mark: if we hit the bound, wait for five items to be taken out and only then wake up, so we can refill all five and go back to sleep. That policy isn't currently built in, but we could add it in the future, or, because this is just returning Channel<T>, which is an abstract class, you can build your own.

It's funny, this reminds me a fair bit of a developer who built these task combinators that offer different policies; you might know this developer. Yeah, it's reminiscent of that, because it's basically this low-level architecture, for lack of a better term, but then there's a way you want to interact with the things in the system, and there's affordance both for policies that come with the system and for ones you can build on top. Exactly. It reminds me of Task.WhenAll or WhenAny. Yeah.

On the various modes of consumption that you mentioned: we tried to design the channel API to be very simple but accommodate the various schemes someone might want. If you look at what APIs are actually available on the writer, there are basically three primary ways of getting data in. You can TryWrite, which we saw earlier; that is synchronous, and it will either complete immediately and return true, or complete immediately and return false without having added the data. You can WriteAsync, which means we're going to wait until the data is accepted by the channel. Or you can tell the channel to notify you when space is available: you can say await WaitToWriteAsync, and you will then be able to proceed when space is available. Now, if you had multiple writers writing to the channel, you might have a race condition: you might all wake up, someone might beat you to it, and then you have to wait again. So you end up with typical loops: while (await c.Writer.WaitToWriteAsync()), then if (c.Writer.TryWrite(...)) do whatever, otherwise loop back around and try to write again. Makes sense. Similarly, on the consumer side there are various modes of interacting with the reader, and there are parallels here: we can TryRead, we can WaitToReadAsync, and we can ReadAsync.
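Two small sketches of what was just described, under the assumption that the bounded-channel options and the multi-writer loop look roughly like this (ProduceAsync and the value 42 are hypothetical names for illustration, not from the video):

    using System.Threading.Channels;
    using System.Threading.Tasks;

    // Bounding policy: choose what happens when the channel is full.
    var bounded = Channel.CreateBounded<int>(new BoundedChannelOptions(capacity: 10)
    {
        FullMode = BoundedChannelFullMode.Wait   // or DropOldest, DropNewest, DropWrite
    });

    await ProduceAsync(bounded.Writer, 42);

    // Typical multi-writer pattern: wait for space, then race to claim it.
    static async ValueTask ProduceAsync(ChannelWriter<int> writer, int item)
    {
        while (await writer.WaitToWriteAsync())
        {
            if (writer.TryWrite(item))
            {
                return;   // we won the race for the free slot
            }
            // Another writer beat us to the space; loop around and wait again.
        }
        // WaitToWriteAsync returned false: the channel has been marked complete.
    }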
Also, with the new IAsyncEnumerable support that we added in .NET Core 3.0, you can just use language syntax and the await foreach construct to loop over all the data items coming into the channel. So rather than this while-true loop here, I can say await foreach (int item in c.Reader.ReadAllAsync()), and now I'm just sitting in an await loop pulling the next item out of the collection. I don't have to write my while loop, I don't have to call ReadAsync; it just gives me the next item each time. That makes sense. Enumerables naturally have an end, right? You get all the data and then there's nothing else, and so we want to enable this loop to wake up when there's no more data, when the writer says, you know what, I'm done. So on the writer side, let's say we had i less than 10, and the writer does await c.Writer.WriteAsync(i), and we'll make this a little bit faster. Then, when it's done, the writer says Writer.Complete(), which tells the channel: no more data is going to be added to you, and if anyone's waiting for data, let them know it's never going to arrive. Right, so then MoveNext returning false is what exits the loop at that point. And here we'll say Console.WriteLine("done"). Got it. When we run this, we quickly end up with the ten items and we're done. Sweet.

And you can do the same thing without the ReadAllAsync. That's a nice system. You can say while (await c.Reader.WaitToReadAsync()) and then Console.WriteLine(await c.Reader.ReadAsync()); we could do that too, and because there's only one consumer right now, I don't have to worry about someone else taking my data. There are various modes of interacting with this, but to your original point, it all comes back to: what interaction models do you expect with the channel, and what kinds of channels do you expect to be using? We try to provide APIs that let you interact with it in a way that meets your needs. In the example we did one producer and one consumer; it sounds like you can have any number of each? You can have any number of each, and the built-in channels are designed by default to accommodate any number of each. However, in order to support any number of each, we have to incur some additional expense. So if you know you don't need to deal with that, say you're only ever going to have one reader or one writer, you can set properties on this, and we'll change the implementation under the covers to provide a more efficient one for that scenario. So it's a little bit like creating a collection and setting the capacity up front, not an exact analog, but you're making promises about how you're going to interact with it, and we can take those promises and do better things as a result. Yeah.
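A sketch pulling together the completion and consumption patterns just discussed: a writer that finishes after ten items, an await foreach consumer over ReadAllAsync, the equivalent WaitToReadAsync loop, and the single-reader/single-writer hint. The item count comes from the conversation; the rest is an approximation, not the exact demo code.

    using System.Threading.Channels;

    var c = Channel.CreateUnbounded<int>();

    // Producer: write ten items, then tell the channel no more data is coming.
    _ = Task.Run(async () =>
    {
        for (int i = 0; i < 10; i++)
        {
            await c.Writer.WriteAsync(i);
        }
        c.Writer.Complete();
    });

    // Consumer: the await foreach ends once the channel is completed and drained.
    await foreach (int item in c.Reader.ReadAllAsync())
    {
        Console.WriteLine(item);
    }
    Console.WriteLine("done");

    // Equivalent manual loop (safe here because there is a single consumer):
    // while (await c.Reader.WaitToReadAsync())
    // {
    //     Console.WriteLine(await c.Reader.ReadAsync());
    // }

    // If you know there will only ever be one reader and one writer, say so up front
    // and the library can pick a cheaper implementation under the covers:
    var single = Channel.CreateUnbounded<int>(new UnboundedChannelOptions
    {
        SingleReader = true,
        SingleWriter = true
    });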
Okay, so next question. There are two features that shipped from our team at approximately the same time, channels and pipelines, and abstractly they seem to describe something very similar. What are the merits of one versus the other? Pipelines is all about I/O; that is its bread and butter, and it's focused specifically on streams of bytes. It's much more aligned with System.IO.Stream, and it's very much focused on scenarios where you read a buffer of data off the network, process that buffer in some way, potentially just parsing off some frame header data, and then hand that buffer along to the next stage in the pipeline. Channels is not focused on bytes, and it's not necessarily limited to I/O; in fact, some of the initial uses of it aren't in the area of I/O at all. It's really about arbitrary handoff of data between two parties in the same process. In Roslyn, for example, they effectively use a channel in their compilation server: a request arrives and they hand it off to the thing that will actually do the compilation. We use it in our HTTP/2 implementation: a request comes in and we hand it off for processing, to say, hey, a request has arrived, add this to your information. It's focused on any arbitrary handoff of data between two parties. Makes sense.

Now, everything you showed us has been in process. Is there any place where you can use channels for cross-process coordination? You absolutely could. The core abstraction, if we just look at the definition here, is the Channel<T> type: an abstract class that exposes a reader and a writer, and the reader and the writer are themselves abstract classes. There's nothing in the abstraction itself that ties it to being in process, so anyone could implement these abstractions to go across any kind of inter-process, cross-machine, or cross-network boundary. In the box, these Create static factory methods are only in process; it's like saying new List<T> or some such collection. Okay.

So one of the models for programming a bit more in the past was pub/sub. You were talking about producer-consumer, which is somewhat similar, but pub/sub is, I think, slightly different. Could you use channels instead of pub/sub, or would you see this as something underneath pub/sub? You could use it as an abstraction on top of another pub/sub implementation, and you could use it inside a pub/sub implementation, but there's a lot of other machinery that goes into pub/sub, typically storing the information for later in some persistent database or whatever it may be. It wouldn't just be "plop a channel in there and be done"; there would be a lot more implementation around it. But you could absolutely use it as part of it, and you could use it on top of it. Okay.

I guess maybe two more questions. One is: is there anything more coming in this domain, or are we pretty happy with where channels, and also pipelines, are at this point? I think at the moment we're pretty happy with the core abstraction. We do welcome feedback on new APIs that would be useful to people; there are a couple of suggestions currently pending in the dotnet/runtime repo. We've added one or two additional members in this release. One of the top pieces of feedback we got was that people really wanted the ability to inspect how many items were in there, generally for telemetry, and so we added a Count property. We're happy to do things like that. There's a pending request for whether we should add a TryPeek, and we can certainly look at adding all kinds of functionality, but we want to be thoughtful about it, in particular because it is an abstraction and we want to make sure it plays nicely with all potential uses.
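For the telemetry point, a small sketch of the Count property mentioned here; CanCount guards the access because a ChannelReader<T> implementation isn't required to support counting (the variable c is assumed to be one of the channels from the earlier snippets):

    // Inspect how many items are currently buffered, e.g. for telemetry.
    if (c.Reader.CanCount)
    {
        Console.WriteLine($"Buffered items: {c.Reader.Count}");
    }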
Makes sense. So where should I look if I actually want to use this? I'm convinced; you said this is in the product now. It's definitely in 3.1. If you want to look at the source, go right to the source: it's in the dotnet/runtime repo. And there is a NuGet package if you want to use it with .NET Framework applications or wherever. If you do File > New Project with .NET Core, you'll have it available immediately; it's just one of the things that's automatically there, because it's in the shared framework. I also wrote a blog post about it in November/December, so if you go to the .NET blog, a lot of the stuff we've just discussed shows up there as well if you want a recap. Awesome. Well, this has been a pleasure as always. Yeah, it's fun doing this when you're in town. Thank you. Okay folks, that was a pretty deep discussion of System.Threading.Channels from the channels expert Stephen Toub. Thank you so much. [Music]
Info
Channel: dotNET
Views: 16,224
Rating: 4.97193 out of 5
Keywords: .NET, channels, threading, .net core, dotnet, dotnetcore, concurrency, backpressure, back pressure, programming, runtime, Microsoft, distributed systems, azure, cloud, queue
Id: gT06qvQLtJ0
Length: 27min 41sec (1661 seconds)
Published: Tue Apr 21 2020