.NET System.Threading.Channels Primer / Walkthrough

Video Statistics and Information

Reddit Comments

Awesome voice. Awesome explanation.

TL;DR:

OP explains, with the smoothest voice to come along in tech in ages, the System.Threading.Channels.Channel<T> class. It is an asynchronous collection that supports asynchronous production and consumption (producer/consumer, pub/sub-like). It is available as the System.Threading.Channels NuGet package and is included in .NET Core 3.0 and later. This is the simplest copy-paste example in the video (screenshot). The second part is dedicated to the awesome Open.ChannelExtensions, which provide a fluent, LINQ-like interface over asynchronous collections.

👍︎︎ 5 👤︎︎ u/keesbeemsterkaas 📅︎︎ Dec 25 2019 🗫︎ replies

I thought I understood async / await fully but I'm missing something at 16:40

The code has

    await writing;
    await reading;

I thought writing had to finish before it would go on to the next line, so how is it that both reading and writing are running at the same time?

👍︎︎ 5 👤︎︎ u/[deleted] 📅︎︎ Dec 25 2019 🗫︎ replies
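
To illustrate the question above, here is a minimal sketch, assuming a current .NET SDK console project with top-level statements. Task.Run starts its delegate right away, so both tasks are already running before the first await is reached; await only waits for a task to finish, it does not start it. The delays and messages are illustrative stand-ins and are not from the video.

```csharp
using System;
using System.Diagnostics;
using System.Threading.Tasks;

var stopwatch = Stopwatch.StartNew();

// Task.Run schedules each delegate immediately; the returned Task is only a handle.
Task writing = Task.Run(async () =>
{
    await Task.Delay(300); // stand-in for the write loop
    Console.WriteLine("writing finished");
});

Task reading = Task.Run(async () =>
{
    await Task.Delay(300); // stand-in for the read loop
    Console.WriteLine("reading finished");
});

// Both tasks are already in flight here. Each await just waits for completion.
await writing;
await reading;

// The two delays overlap, so the total is close to one delay, not two.
Console.WriteLine($"elapsed: {stopwatch.ElapsedMilliseconds} ms");
```

If the two lines had instead been `await Task.Run(...)` followed by `await Task.Run(...)`, the second task would not start until the first had finished.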

This is a great video. I didn't know about this. I'll need to upgrade my stuff to .NET Core 3 soon.

👍︎︎ 2 👤︎︎ u/paul_mendoza 📅︎︎ Dec 25 2019 🗫︎ replies

A previous video I did related to Channels: https://youtu.be/_P-ikfXiIsU

👍︎︎ 1 👤︎︎ u/electricessence 📅︎︎ Dec 25 2019 🗫︎ replies

A great combo: channel.Reader.ReadAllAsync() (.NET Core 3.x only) and https://www.nuget.org/packages/System.Linq.Async/

If you're using .NET Standard 2.1, Open.ChannelExtensions exposes: channel.Reader.AsAsyncEnumerable()

👍︎︎ 1 👤︎︎ u/electricessence 📅︎︎ Dec 25 2019 🗫︎ replies
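
As a rough sketch of the ReadAllAsync approach mentioned above, using only the built-in channel API (no System.Linq.Async or Open.ChannelExtensions), and assuming a current .NET SDK console project with top-level statements. The capacity of 1000 and the one million items mirror the numbers used in the video; the variable names are assumptions.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

const int max = 1_000_000;
var channel = Channel.CreateBounded<int>(1000);

// Producer: WriteAsync waits whenever the bounded channel is full.
Task writing = Task.Run(async () =>
{
    for (var i = 0; i < max; i++)
        await channel.Writer.WriteAsync(i);
    channel.Writer.Complete(); // signals that no more items are coming
});

// Consumer: the loop ends only once the channel is completed and drained.
var count = 0;
await foreach (var item in channel.Reader.ReadAllAsync())
    count++;

await writing; // surfaces any exception thrown by the producer
Console.WriteLine($"total: {count}"); // prints "total: 1000000"
```

Without the call to Complete, the await foreach loop would never exit, because the reader cannot know that the writer is done.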
Captions
It is Tuesday, December 24th, 2019, otherwise known as Christmas Eve, and welcome to this episode of Code Evolution. I wanted to do a walkthrough and a demonstration for people who aren't familiar with channels, to give better exposure to something that is extremely useful when it comes to pipelining and asynchronous programming, and that's a lot of what happens these days. As you progress in your career, you're probably going to get to a point where you're doing more and more async operations, and you might hit a point where you're pipelining data. So we're going to do a full walkthrough from scratch to demonstrate the pattern that's required for channels.

We could also talk a little bit about Dataflow, which is very similar in a lot of ways and is a very easy way to set up a pipeline. What we're going to do instead is, after reviewing the pattern, I'm going to show you how easy it is to do basic Dataflow-style operations, as long as you're not trying to do anything too fancy that only Dataflow can do, which is pretty powerful. We're going to do it with channels, and I'm going to use channel extensions to demonstrate that.

First off, we're going to start by creating a new console project, so we run dotnet new console and get it all set up. Here we go, real basic stuff. Then we add using System.Threading.Channels to get started. (Yes, I would like to add them, yes please.) Right out of the gate, we open the console and say dotnet run. Yeah, everything's good.

All right, so channels. A channel is basically an asynchronous producer-consumer queue that was developed originally in the .NET Core FX Lab, and actually even
previously to that, I believe it was pioneered by Google. A channel is a great way to manage this because its API is very straightforward. If you want to create a channel, something that essentially is an asynchronous buffer, we say var channel equals Channel dot, and for now we'll start by creating an unbounded channel. Then you need to set the type, so in this case let's use numbers, integers. Now we have this channel that is of type int.

A channel has a reader and a writer. Let's say we want to write a million numbers to the channel. We'll start with a constant, const int max = 1 million, to predefine that. Then we create a for loop and say channel.Writer, and the options we're provided are TryWrite and WriteAsync. First of all, we're going to want to make this async to start. Then the thing you have to consider when you're writing is: why do you have a TryWrite? Let's talk about that. You could easily do TryWrite and pass i, and that is a synchronous method that returns a boolean. For the most part, if we run this, it's going to work just fine and there aren't going to be any issues with it.

But let's continue the pattern for a second and talk about reading. What we would expect after this is that we should be able to count the number of entries and have them add up to a million, if this works correctly. So after it's written, doing this in a very synchronous fashion, we say while channel.Reader.TryRead(out var i), and then what we're gonna do here is
we're gonna create a var count = 0, real simple stuff, and increment count. Let's format everything. Then we should be able to say with some level of confidence that, after everything is said and done, Console.WriteLine with the total count should work, so we're gonna go ahead and try it out. As is, we would expect this producer-consumer queue to show us the number 1 million.

OK, what happened here? Well, we need to import the Tasks namespace. I'm going to close the terminal here and see what we've got happening. There we go, and it should compile just fine. Is it giving us any grief here? Well, we're not awaiting anything yet, but that's fine. Let's go ahead and see if we get this thing to run. The Task type or namespace... maybe I needed to save it or something. There we go.

So yeah, the total came out to 1 million, which is exactly what we'd expect from this queue. With this PCQ (producer-consumer queue), we added a million items to it, we took a million items out, and we got what we wanted. All good. It seems like we could exchange this whole thing with, say, a Queue or any other collection, and that would appear to work just fine.

But here's where it starts to get a little bit interesting. Again, we've done this in such a synchronous fashion that we stopped after adding stuff in, continued only after there was a full number, and then exhausted it with TryRead. Notice that TryRead bailed out when there was nothing left in the reader, when it was empty, and that's the whole point behind these Try operations. Now this TryWrite, because it's a TryWrite, again it's a synchronous method that will
essentially bail out of what's going on as soon as it can no longer write to the channel. But what we have here is an unbounded channel, so TryWrite is never going to return false, or at least it shouldn't.

So let's do a little bit of an experiment. What if I do CreateBounded and pass it a number, let's say 1000? What's gonna happen now? Well, to spoil the operation, it's going to write up to a thousand and hit the maximum, which is 1000, and then it's going to continue to try to write and keep failing. To demonstrate a little bit better, we're gonna say that if it's not able to write, we break, so we don't waste any time, because we're assuming that if we can no longer write to the channel, that means the channel is full. Again we go ahead and run, and look what we got: we got a thousand. We got a thousand because the maximum number that the channel could hold is a thousand.

So now what do we do? How do we make this work so that it's going to continue to read everything from the channel, and so that it's a pipeline that has a bounded amount? Because that's what you want: you want to hit a maximum so you're not holding a lot of things in memory, and then you want to continue after the fact. Again, this is getting more and more complicated because I actually want you to see how complex this pattern can get, and then I'll show you how to make it simple.

To make this more effective, we're going to take our channel and set up the read operation first. So we say, just for example, Task.Run, and we do a setup here that is going to be reading, and we'll say var reading. Then we can go down here and we can await reading. Fair
enough, right? When reading is complete, then we can check the count. So what we're going to do here is leave the count as a value and put this loop in here. Now we're setting up the read flow early on, so it's already ready to read. We don't know when the channel is going to receive data. That's the point: it could happen at some other time.

With that same thing in mind, we say var writing = Task.Run. So I set up a whole other situation where we're doing this completely asynchronously, outside of what we would expect, and now we have these two things happening. So what's gonna happen here? Well, first of all, it has to complete this before it actually is going to complete. But what's potentially going to happen is that this is going to fire off, and this is going to fire off after it, but when this starts running there's not going to be anything in the channel, so it's going to return false and immediately exit. This await will complete, and our total could literally be zero. In fact, just for fun, let's go ahead and see what happens. It's very possible it'll just be zero. And there we go.

So what happened here? Well, in reality it did try to write to the channel, up to a thousand, at some point. And in fact, if we wanted to be a little bit more appropriate, we could say await writing, await reading. That doesn't mean that it's not going to be a total of zero, because the order of events, essentially a race condition, is going to end up being that it totals zero, because this one's gonna run first.

So how do we fix this problem? Well, with the channel we can now say: OK, I need to wait for new results, I'm not really done yet, this hasn't finished writing. So the first step you want to do is, after the writing is complete, we want to
call Complete. Complete is gonna signal to the channel: hey, I've got nothing more coming, I'm done, I'm not gonna add any more to the list. So if you get to the end of your rope, there's no more rope.

So then how do we process through that? Again, if we run it, you're going to see it's probably still going to be zero. What we need to do now is tell this while loop to smartly try again when results are available. The trick behind that, and I think generally the correct pattern, is to use a do around the while. So we'll take this here, and I'll explain in just a second, and then you do, while. Again we're going to need an async/await here, so we go async to make the lambda an async method. Then in this while condition we say channel.Reader.WaitToReadAsync, and we await it.

So what happens here is, while this is reading, the first thing that's gonna happen is it comes in and says, "Hey, I'm started. Do you have anything for me?" and the channel says, "No, I don't have anything." "OK, I'm out." Then it hits the while condition and says, "Hey, do you have anything for me?" and it says no, or actually it just waits until something is provided. If the channel at some point was closed or completed, then this is going to return false. So it's setting up the read loop and the writing loop separately, but this is the general pattern that works.

As you see here, we're going to see probably a million... we see a thousand. OK, why did we do that? Let's figure that out. Again, the intention is that the channel will continue to be written to until it's done, but this is why we're leaving at a thousand: we reach our limit, and then we have to try again. So what we're gonna do is, when this reaches the max limit, we're gonna say while
that's false, then we say await channel.Writer.WaitToWriteAsync. That's one option for us here. Sorry, this has to be async. But there's an even easier way to do this as well: we could say channel.Writer.WriteAsync. WriteAsync is always going to wait there until it can write. Then we go back to where we checked the result and just say break, so it's at the end of the rope. Now keep in mind that this should never happen, because we're in control of the writer. What's going on here? Sorry, let's do this properly. Oh, excuse me, this is actually the whole point: WriteAsync does not return a boolean, it's going to throw if the channel gets closed. But since we're in control of the channel, we should never be concerned about this. So we're gonna simplify this down, and I'm just gonna keep it in this style.

Now if we run it, we got a million. So what just happened here? Well, remember these could fire in any order. Typically speaking, this one is going to fire before this one. It's going to wait to read, but there's not going to be anything in the channel, and then it's gonna yield to the next task, which is now going to say, "Cool, I want to fill up the channel." Because this is a nearly synchronous operation (these ValueTasks actually can continue synchronously, and typically they will), you could expect the default behavior of this channel is to fill up to 1000, and then it'll pause because it's still waiting for room in this bounded channel. Then, while this yields back because there's nothing to await, this is gonna start to read off the channel, taking maybe up to a thousand off, and allowing it to flow back and forth. That doesn't
mean that in these situations it's going to behave in this kind of pulsing style of operation, but it also doesn't mean that that's bad either, because the channel is built to be very synchronously performant and friendly, to take advantage of the calling thread as much as possible. There are some configuration settings that we could get into; we're not going to do that for this purpose.

But here's the general pattern that you should be comfortable seeing. First of all, you have to wait for the results. Oh, and actually, to be correct: see, I'm saying await reading, and this is OK too, because this is basically signaling outward that, hey, once I'm done, go ahead and complete. We know for a fact that this is true, but the important thing to make a note of is that, in most cases, we should also be awaiting the channel.Reader.Completion task. This is so that when you expose a channel reader, you don't have to think about the inner workings, and something else can wait around for this task to complete. You could do that here, or you could do that here, it doesn't really matter.

In fact, another way to consider this: let's do this, and we're gonna get rid of our variable because we don't need it. Let's get rid of this warning here. All right, cool, and run it again, and we should get the same result. We're at a million, because that Completion task is the same thing as saying: hey, I've got nothing left in the channel, the channel is empty, the channel is closed, and there's nothing left in it.

So that's cool, but that's not a trivial amount of code that we just wrote. It is, though, the general pattern that you should be aware of, and how channels work,
or, to be correct, producer-consumer queues, and more specifically asynchronous producer-consumer queues, are going to have this completion task that you're waiting on.

If you're using .NET Core 3, and this is a 3.1 app here, there is another way to do this as well, so I'll review that with you. We say await foreach (var i in ...), and here we go, channel.Reader.ReadAllAsync is the method, and that returns an IAsyncEnumerable. It's not part of .NET Standard 2.1 yet, as far as I know, but it is part of .NET Core 3 and later. So, yeah, that's it. We reduced the code down a lot; it's just a loop. Oh, sorry, we need to add the count, it still needs to stay in here. So we'll take this and replace this, and take this out, because we want to keep the count in there. Then do the format. There we go. Now if you run it: cool, same thing.

So async streams are available in .NET Core 3 with channels, and that definitely makes for a lot simpler code. You get rid of all that, so that's a big deal. It definitely makes it much easier to progress.

In fact, simplifying this down: I know that we're writing to the channel, but here's something to consider, because it's a bounded channel. In reality we can even take all this out. This is how much simpler things can get with async streams. We're setting up this writing code, but remember that this foreach is designed not to exit until there's nothing left. That completion event has to be triggered before it's going to complete, so this has to reach this point before this actually completes. Again, we'll run this and you'll see. So again, much cleaner. We know that
this task is set up to run initially. It is a bounded channel, so it will stop at 1000. This is going to hit this foreach, and there's going to be nothing in the channel reader, so it's going to wait here, allowing the task to yield to this code. Then this code is going to run and keep adding stuff to the channel until it's completed, until it has put up to a million in it. Then this will eventually get the complete signal, consider that it's done, and exit out. So here's an even simpler, more straightforward pattern that works really well for using channels at this stage in their development.

Now let's get into a further kind of simplification. Depending on how you code, you might have something even more straightforward that you want to do. This requires that we install a package. If I get this right, it's dotnet add package, I think it is, Open.ChannelExtensions. Great. Then we close up and say using Open.ChannelExtensions.

The first thing I want to demonstrate is just a different way of doing things. With channels we're essentially talking about pipelines. If we want to pipeline something through a process, we don't want to have lengthy code like this. We want really simple code, and we want it to be easy to read. So let's say we have this channel, and we're also going to have a source, which is going to be Enumerable (oops, I think we need to import the Linq namespace, yep) dot Range, and I'm going to say zero to max. So now we have this enumerable that's our source. Then we're going to kill all this off, and we don't need the count, which I'll explain in a second. We're gonna take our channel and we're gonna say
that the source, this is using the extensions, is the source. Now we have a starting point for our pipeline. Then, simply put, we can say ReadAllAsync, or actually we can just say ReadAll, and let's use a blank method for now because we don't need to do anything with i. Then we say var count = await. The good news is that ReadAll actually does the count for you: it takes however many values have gone through and counts them up, so whatever we do in here doesn't matter. Oops, that was the wrong command. Run. OK, same thing.

So what happened here? Well, that pattern which you just saw is done under the hood for you. There are a number of different methods available, and I'll post a link to the documentation with the video, but Open.ChannelExtensions is basically a way of reducing the overhead and the complexity of doing all this stuff.

Let's do something else too. Let's say we want to output some fancy number; I'll call it result. So var result = 0, and we'll output result. Then within our lambda we say what the result is, and we're gonna change the maximum now to just be, say, 1000. Then we do some math: result equals result plus i. Reformat (come on, trying to do a nice little reformat there). Cool, and what do we get? OK, so that's kind of what we expected. It's basically doing a sum of all the numbers up to a thousand, and we end up with this result here.

So that's cool. What if we wanted to continue doing math in stages, and what if we wanted to do something asynchronously? Let's say that we had an API that we needed to call for each entry;
let's say that the source was actually a set of IDs and, for each of them, you need to call some sort of API. Well, that's pretty straightforward: we just do ReadAllAsync. This is an extension that, although it has the same name as the ReadAllAsync that returns an IAsyncEnumerable, does not have the same signature. So we go async, and it takes a lambda. To simulate the calling of an API, we say await Task.Yield. That's all: hey, give it a second. Again, we get this to return the same results. So this is another option as well. You can do a lot of really cool asynchronous stuff, and it's working as fast as possible through the scheduler, with very little code that we're writing here. This is pretty straightforward stuff within this class.

Now what if we want to do more? Let's say I wanted to do something a little bit more magical. We're gonna say Transform, and we say that i is gonna be result plus i. So it's really getting to even simpler code. At the end of this, as the numbers come through, we want the last result, so var final = 0, and then final = result. Whatever the last one was, we want to know what that final result is.

No. OK, well, that was not what I expected it to do. We're going a little bit different, because again we started with zero and we're just adding the results up to that point. It's not the same math, but what I wanted to show you is that we're looping through this source, taking the i, and we're
getting a result; you know, we're doing something to it. You could even do something a little more interesting. Let's say every time we get i we're doubling it. Say it's a thousand, or let's say we'll do 128, or let's do 32, for example. The result is gonna start with one, and then the final is gonna be whatever that is. Let's see here... yeah, we don't even need this now. And then the i is gonna be a funny number, because it's gonna go through for every number in the range. Let's see what we get. It's not gonna be what I think it is. So, cool, we've got to redo some more math.

But again, there's a lot of usefulness in here, very much in the sense of pipelining, or something like LINQ. You could very well come up with some sort of process where you say, hey, I want to do Join. Or let's say Batch, as an option: if you want to batch in 10, and if you want to join, then you have this option as well. There are some parameters for single reader, which you'd kind of expect, so singleReader, or actually just say true. There are a lot of simple options for how you want to set up your pipeline.

One last thing I want to share here. You may be going, "OK, that's cool, I mean, sort of." You could use IAsyncEnumerable and streams to do the majority of this stuff, and for the most part that's a good idea. I do believe there's already an async LINQ library, so that's a big advantage, if you were actually able to go and say channel.Reader.ReadAllAsync and then, I don't know what it's gonna be, let's say SelectAsync. Maybe it'll give us a hint on that. Let's see, is it coming in? Well, what do you know, it looks like it does not contain it. Let's just see if we have anything like it. Hmm, OK,
probably have to import it. That's what I've been really interested in: can we import the IAsyncEnumerable LINQ namespace, which I think already exists. So you could do a lot of this kind of fun stuff with that as well. But this gives you a lot more flexibility, again with a nice expressive syntax.

For example, one of the things that you could do here that is a little bit more aggressive is say, "OK, I actually want to read all concurrently." That's another option, so you can do this instead. ReadAllConcurrently expects you to give it a max concurrency, so let's say three, and that's going to partition out all the tasks for you. It works very well. It's essentially the equivalent of Parallel.For or Parallel.ForEach, and you basically get that same kind of result. But again, this is a pipeline, so you put all your stuff in here, and what you get back is a count, and that's a total of however many went through, whatever you need to do in order to, say, sum up these numbers.

Now the other thing to keep in mind is that, by default, as you saw, I wasn't doing any interlocking or any type of synchronization when doing all these numbers, and that's because I didn't need to when I was using something that was not concurrent. The channels are really just looping through one at a time within the calling thread, and you're never going to have any kind of concurrent collisions. But as soon as you switch to a concurrent method, it actually is running concurrently, and you have to be careful about those kinds of things.

But if you don't need to worry about concurrency, let's say that each one of these IDs is some sort of process that you have to go through, and again we can just demonstrate this here. Oh, we came up with a little exception: task was cancelled. After
looking at what happened there, I must not have something set up right. Why would it be cancelled? Well, hold on, let's do something real quick here. Oh, it's because I can't do this here, that's why. Let's see if this works. Yeah. The whole point behind that Join with true, by the way, is that it's a single reader. But as soon as you add the concurrent call, that's the whole reason you have this as a parameter: you don't want this to be a single reader when you're calling concurrently. So that was actually a good reminder and discovery about why that exists.

But this did run concurrently, in three different threads. Again, that's not that many items, so if we were to do, let's say, a million, I might be able to hear my computer fan kick on for a second. Oh, what happened here? Save. There we go. So it definitely fired off, and that's what you got.

So that's the advantage of using this kind of thing. You've got a really clean syntax. Even if you're just trying to do some sort of processing and you just want to run something through a pipe, there it is, the whole nine. Again, I'm using this source just as a demo, but you can do whatever you want. You could say channel.Reader, same thing; the source is just setting things up and pipelining it in for you. We could go back here and say channel.Writer and get the same results off of this as well.

So, probably a little bit long. What are we at, how many minutes? Looks like we're at over half an hour. But the point of this was really to be an instructional video for those who wanted to get exposure to channels, and to demonstrate the patterns that you'd be used to seeing. So I hope this really helps. If you're
doing any kind of asynchronous producer-consumer queues, channels are awesome. They're super fast, about as fast as you're gonna get in the async world. Dataflow has more options, and there are much more complicated things that you can do with Dataflow in a producer-consumer queue scenario, but it's very uncommon to need to use Dataflow to its full potential. It's much more common to have some sort of pipeline that you're setting up, where you're directing results through the pipeline and doing some sort of transformation.

Or, even further, you might have something where you do a Pipe operation, async i, and that Pipe operation is actually an entirely separate channel, and you could do that with multiple threads, again. Let's see where we get here. We'll await Task.Yield, just to... what's going on here, what am I missing? Let's see, max concurrency. It says the type arguments can't be inferred, try specifying. Oh right, in this case, in Pipe, I need to return i, that's why. Sorry. So that's another example. Piping is another thing that you can do that basically sets up an entirely separate channel within the pipeline to process through these results as well. We would get the same thing here if we ran it. We're not gonna get any totals anymore, but it'll run and it'll complete just fine.

So there are a lot of options here. Channels are great for this kind of thing. It's very much a linear pipe; you're typically not gonna fork the results into multiple places, but you can, there's nothing saying you can't. I've even done things where I've had channels running within channels, so I'd have some other channel inside this Pipe operation doing something that has to complete per result. That's some of the stuff that you can do, and again it's very easy to do when you have a nice clean syntax and a, you know,
structure and API to support it.

So I'll go ahead and finish this up now. If you have any questions, don't hesitate to ask; I'd love to help on this one. I think this is really a big part of our future when it comes to things running and data being processed. It can be a little bit hard to understand, but hopefully having these extensions in place will help make it feel like it's not that crazy, that everything just works as expected, and that you're only awaiting, in this case, one operation instead of two, three, or four.

All right, well, that's it for Christmas Eve. You guys have a great set of holidays, and happy coding.
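
The manual pattern described in the walkthrough can be sketched roughly as follows. This is a reconstruction from the narration rather than the exact code shown on screen, so the variable names and layout are assumptions, and it assumes a current .NET SDK console project with top-level statements. It uses only the built-in System.Threading.Channels API and does not attempt to reproduce the Open.ChannelExtensions calls.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// Part 1: TryWrite is synchronous and returns false once a bounded channel is full.
var small = Channel.CreateBounded<int>(1000);
var accepted = 0;
while (small.Writer.TryWrite(accepted))
    accepted++;
Console.WriteLine($"accepted before full: {accepted}"); // prints 1000

// Part 2: separate read and write loops over a bounded channel.
const int max = 1_000_000;
var channel = Channel.CreateBounded<int>(1000);
var count = 0;

// Reader: drain what is available, then wait for more.
// WaitToReadAsync returns false only after Complete() and a full drain.
Task reading = Task.Run(async () =>
{
    do
    {
        while (channel.Reader.TryRead(out _))
            count++;
    }
    while (await channel.Reader.WaitToReadAsync());
});

// Writer: WriteAsync waits for room instead of failing when the channel is full.
Task writing = Task.Run(async () =>
{
    for (var i = 0; i < max; i++)
        await channel.Writer.WriteAsync(i);
    channel.Writer.Complete(); // nothing more is coming
});

await writing;
await reading;
await channel.Reader.Completion; // completed and empty
Console.WriteLine($"total: {count}"); // prints "total: 1000000"
```

Dropping the Complete call makes the reader wait forever, and replacing WriteAsync with a bare TryWrite brings back the total of 1000 seen in the video.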
Info
Channel: Code Evolution
Views: 3,581
Keywords: dotnet, dotnetcore, channels, async, IAsyncEnumerable, producer-consumer, producer consumer
Id: ZPZTa3iLXNY
Length: 39min 38sec (2378 seconds)
Published: Tue Dec 24 2019