Practical Concurrency in Go - GoRoutines, Mutexes, Channels and more

Hey guys, this is Sig from Eldr.io, and I'm back again with more Golang tutorials for you. I had a request to do a video about concurrency in Golang, specifically things like goroutines, channels, mutexes and general multi-threaded programming in Go, so I figured that would be a fun little video to do. What I want to do is give you a practical example of how you can write concurrent programs in Go, and tell you a little bit about some of the pitfalls you might run into. For this example, I prepared a little API that we're going to hit: an Exchange API I found on GitHub. I'll put a link to it in the description below. It's a fun little project; I think it's basically a set of API endpoints whose data gets uploaded daily. It lets you fetch currencies, meaning the world's currencies, including real money and cryptocurrencies if that's your thing. It gives you a list of them, and for each currency you can use its unique code to fetch the rates that currency has in relation to all the other currencies. So you could write a little currency exchange using this API. That's what we're going to do today, and then I'll show you how we can make it concurrent. If we go into our editor here, I'll walk you through it. I've implemented some base stuff to get us started, because we want to focus on concurrency, not on the business logic. What I've written is a little currency exchange. My currency exchange is a struct with a map of string to currency, so it holds a bunch of currencies, like we just saw. Each currency has a unique code, like we saw in the API, a name, and some rates. The rates are this currency's exchange rates relative to the other currencies, keyed by the code of the currency each rate pertains to, which is basically what we just saw in the API.
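As a rough sketch of the data model just described, the two types might look like this in Go. The field names, the float64 rate type and the NewExchange constructor are assumptions, since the video's actual source isn't reproduced here, and the rate value is a placeholder, not real data.

```go
package main

import "fmt"

// Currency mirrors the fields described in the video: a unique code,
// a display name, and rates keyed by the other currency's code.
// Field names and the float64 rate type are assumptions.
type Currency struct {
	Code  string
	Name  string
	Rates map[string]float64
}

// MyCurrencyExchange holds every known currency, keyed by its code.
type MyCurrencyExchange struct {
	Currencies map[string]Currency
}

// NewExchange is a hypothetical constructor that initialises the map,
// because writing to a nil map panics.
func NewExchange() *MyCurrencyExchange {
	return &MyCurrencyExchange{Currencies: make(map[string]Currency)}
}

func main() {
	ce := NewExchange()
	ce.Currencies["usd"] = Currency{
		Code:  "usd",
		Name:  "US Dollar",
		Rates: map[string]float64{"eur": 1.5}, // placeholder value, not real data
	}
	fmt.Println(len(ce.Currencies), ce.Currencies["usd"].Name) // prints 1 US Dollar
}
```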
Then I have two functions in here: fetchAllCurrencies, which hits the currencies endpoint and populates the currencies map, and fetchCurrencyRates, which takes a currency code and fetches that currency's rates by hitting the other API endpoint we just saw. Our goal is to populate the currency exchange with all the currencies and, for each currency, all its rates to the other currencies in the exchange. So let's have a look at how we can do that. I've written a little boilerplate code to get us started: a pretty naive implementation that instantiates a new currency exchange, creates the currencies map, and then calls fetchAllCurrencies, which we just saw. I've limited this to fetch just 10 currencies, because we don't want to overload the API for this demo. Once it has fetched those, it loops through all the currencies, and for each one it calls fetchCurrencyRates and stores the rates on that currency. Around that, it measures how long the whole thing took, and at the bottom it prints out the results. Let's run it and see what it looks like. There we go: we got 10 currencies, and for each currency we've got 755 rates. If we run it again, it took a little longer, and we got some different currencies. You can see not all currencies have a name, but it works; it does what it needs to do. It takes roughly a second to fetch all these currencies. So now the question becomes: what if we wanted to speed this up a bit?
Well, one thing you might notice is that we fetch all the currencies, but then we go through and fetch the rates for each currency one by one. What if we were able to do that concurrently, or "in parallel", so to speak? Go has several ways of implementing concurrency, but by far the simplest is using goroutines. If you look at a goroutine with an anonymous function, you basically create an anonymous function, or lambda, and tell Go to run it as a goroutine, literally using the go keyword: go func() { ... }(). In this case, we want to fetch the currency exchange rates concurrently, so we create this function, pass in the currency code, and put the whole body of the loop inside it. What we're telling the Go runtime now is: when you reach this for loop, go through all the currencies and, for each one, spin up a goroutine (a lightweight concurrent function that the runtime schedules onto operating system threads) and do the work inside this function in that goroutine. So inside it we make the API call and fetch the rates, and then, just like before, we should be able to print everything out at the end. Let's see if that works. I have a feeling we might run into some trouble. This output is nonsense: we didn't get any rates, and you can see how quickly it finished. It's way too fast. The problem, of course, is that we're now dealing with a concurrent program. When we iterate through all these currencies and say go func, meaning go do this in a separate goroutine, the main goroutine, the one our application is running in, doesn't wait for those to finish. It just carries on.
It takes the end time, prints things out, and then exits down here, and when main returns, the whole program exits whether or not the other goroutines are done. It doesn't care what's going on inside these go func goroutines. That's the problem we're running into: the program finished really quickly because our goroutines hadn't finished their work yet. This is a common problem, and Go has a nice way of solving it with a concept called wait groups. sync.WaitGroup lives in the sync package, and we can create a new one like so. A WaitGroup, as the name suggests, is a counter you increase for each thing you want to wait for and decrease as those things complete, and then you can wait until everything has completed. If you think of it a bit like a reference-counted pointer, as it's called in some languages, it's basically just something that counts. You can increment it by some amount, decrement it whenever things complete, and wait for it to reach zero again. So let's try that. What we're waiting for here is all our different goroutines. Each time we iterate, we add one to the WaitGroup with wg.Add(1). We're saying: here's one unit of work we want to wait for, namely fetching the rates for one currency. Note that this happens outside the goroutine, before we start it: we add a bit of work to be done.
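A minimal, self-contained sketch of the Add/Done/Wait pattern being described. The real API call is replaced by a hypothetical placeholder: an atomic counter increment, used only to prove that every goroutine ran before Wait returned.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// runAll starts one goroutine per job and blocks until every one of them
// has called Done. It returns how many jobs actually completed.
func runAll(jobs int) int64 {
	var wg sync.WaitGroup
	var completed int64

	for i := 0; i < jobs; i++ {
		wg.Add(1) // register one unit of work before starting the goroutine
		go func() {
			defer wg.Done() // decrement the counter when this goroutine returns
			// Real work (e.g. fetching one currency's rates) would go here.
			atomic.AddInt64(&completed, 1)
		}()
	}

	wg.Wait() // block until the counter drops back to zero
	return atomic.LoadInt64(&completed)
}

func main() {
	fmt.Println(runAll(10)) // prints 10
}
```

Calling wg.Add(1) before the go statement, rather than inside the goroutine, matters: if Add ran inside the goroutine, Wait could see a zero counter before some goroutines had registered themselves.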
Then, inside the goroutine, we do the work and say, great, the work's done, by calling wg.Done(). The Add method adds a given number of units of work, and Done decrements the counter by one. This is a common pattern. Then, in our main goroutine, we simply call wg.Wait(), which blocks the calling goroutine until the counter reaches zero again. Let's try this out. I'll clear this and run it again, and boom, it works: we're getting the full set of currency rates again. And if you run it a few times, you'll notice it's a lot faster than before. Before, we were at around 800 to 900 milliseconds; now we're doing all these API calls concurrently. (Concurrent and parallel are a little bit different, but we're doing them concurrently.) So we've sped up our program by a lot. However, there are also some problems with this. We have all these goroutines going out and fetching stuff, which is fine, but then they mutate our currency exchange. If we look at the currency types again, we have two maps: the currencies map and the rates map. Our goroutines are changing the data inside these maps from different goroutines, potentially at the same time, depending on how long each call takes. This is a real problem, because Go maps are not safe for concurrent use: many goroutines may read a map at once, but not while any goroutine is writing to it. That's the terminology to look for: whether a data structure is safe for concurrent writes, or for concurrent reads.
What that means is that if one goroutine writes to a Go map while another goroutine reads from or writes to it at the same time, the behavior is undefined, and the Go runtime will often detect it and crash the program with a fatal error such as "concurrent map read and map write". We can actually try this out. Let me grab some code to give you an example. We're spinning up another goroutine; again, you can spin up as many of these as you want, within what your hardware can handle. This goroutine loops forever, repeatedly checking the currencies map for a value for the US dollar, USD: do you have anything for USD? Do you have anything for USD? If it finds one, it prints out USD. Let's clear and run this, and now we get a whole bunch of errors. The whole thing has basically blown up, because, again, we are accessing a shared resource from multiple goroutines at the same time. We've got one goroutine trying to read from the map, and multiple goroutines down here, one per currency, so with 10 currencies being fetched, up to 10 goroutines may try to write to it at any given moment. This is a situation we never want to be in. So how can we resolve it? The easiest way is a so-called mutex, short for "mutual exclusion". A mutex is a lock that can only ever be held by one goroutine at a time.
You lock the mutex, and while you hold it, you're the only one who owns it; when you unlock it, it's free for others to grab. So think of it as something that only one goroutine can hold at a time. Go implements this in the sync package, the same one we got WaitGroup from: you add a field of type sync.Mutex to your struct. Here I've embedded it in the MyCurrencyExchange struct. We could have given it a name if we had multiple mutexes, but this just embeds a single general-purpose one, which means its Lock and Unlock methods are available directly on the struct. So now, whenever we access the shared resource from multiple goroutines, for example down here, we call ce.Lock() before touching it and ce.Unlock() when we're done. When this goroutine comes in, it's about to change state shared between multiple goroutines, which it's not allowed to do unless it holds the mutex. So it calls ce.Lock(). If another goroutine currently holds the mutex and is changing the state, our goroutine can't obtain the lock, so it blocks and waits for its turn; the Go runtime takes care of parking it and waking it up when the mutex becomes available. Eventually the other goroutine unlocks the mutex, we pick it up, do our work, and so on. This ensures the shared resource is only ever accessed by one goroutine at a time.
Likewise, we do the same thing up here, in the loop that reads from the map, because that's another place where we access the shared resource from a different goroutine. Now we've got the mutex locked around both places where we access the resource, and if we run it, things work just fine again. We've got multiple goroutines mutating shared state, and doing so in a concurrency-safe, or thread-safe, way. This is a very well-established pattern, and using mutexes like this to protect resources is a great way of doing things. But there's one more thing I want to show you. Unless you group things together, you kind of need a mutex for each resource you're protecting. Sometimes it's easier to have a dedicated goroutine, which can be the main one, that is in charge of gathering work coming in from a bunch of other goroutines and persisting those results into the shared resource. That sounds complicated, but let's look at how we can do it with channels. Because if you think about it, there's another problem with this example. Right now, we're going through all of our currencies, spinning up a goroutine for each one, and fetching its rates. That's fine because I limited it to 10 currencies. But what if we remove that limit and suddenly we're pulling in hundreds or thousands of currencies? With this code, we'd spin up a goroutine to fire off an API request for every single currency, and with all the Bitcoin and other crypto coming out these days, there might be loads and loads of them.
We don't really want to spin up thousands of goroutines just to make API calls, all hitting the API at once. What we want instead is a pool of workers that, whenever they're available, pick up a piece of work, do it, return a result, and then pick up the next piece, until there's no more work to be done. This is a well-known pattern called the worker pattern, or worker pool pattern, depending on how you look at it, and Go has a great way of doing it using channels. Let's implement a function called runCurrencyWorker. Copilot's already having some great ideas, but let's do it ourselves for now. Whenever we run runCurrencyWorker, it starts a worker in its own goroutine that sits there picking up work, doing it, and returning results. I'll talk you through the channel stuff in a second. Channels are another data structure in Go, but they have some really interesting characteristics. A channel is a way of sending or receiving data through a queue, and that queue is implemented so that it's safe to use from many goroutines at once. For example, sending on an unbuffered channel is a blocking operation: the sending goroutine sits there saying, I'm trying to send on this channel; is anyone on the other side ready to receive? The arrows in the channel types indicate direction. For the currency channel, the arrow on the left, <-chan, means the worker can only receive from it, and for the result channel, the arrow on the right, chan<-, means the worker can only send to it.
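A small sketch of directional channel types, independent of the currency code (produce and sum are hypothetical names). A chan<- parameter can only be sent on, a <-chan parameter can only be received from, and the compiler enforces both; a plain bidirectional channel converts implicitly to either direction.

```go
package main

import "fmt"

// produce may only send on out (chan<- int); receiving from it here
// would be a compile-time error.
func produce(out chan<- int, n int) {
	for i := 1; i <= n; i++ {
		out <- i
	}
	close(out) // the sender closes the channel to signal "no more values"
}

// sum may only receive from in (<-chan int); its range loop ends once
// the channel is closed and every value has been received.
func sum(in <-chan int) int {
	total := 0
	for v := range in {
		total += v
	}
	return total
}

func main() {
	ch := make(chan int) // unbuffered and bidirectional
	go produce(ch, 4)
	fmt.Println(sum(ch)) // prints 10
}
```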
So when we pass the channels into the function like this, we're saying: the currency channel is one the currency worker can receive from, and the result channel is one it can send to. Channels are also typed. Both of these carry values of type currency.Currency, which, as we see here, is our custom struct representing a currency in our currency exchange. So this worker takes an ID so we can identify it, plus two channels that carry currencies, one for receiving and one for sending. This might seem a little strange, but bear with me; it'll make sense when we implement it. Yeah, we'll take that one, Copilot; it looks good. So we print that the worker has started. How does the rest work? Channels are great because they let you treat them like other data structures you're used to. We can iterate over a channel using the for and range keywords we know from slices and maps. Copilot has already filled this in, but let's walk through it. The worker sits here receiving from the currency channel, the one it's allowed to receive from, per the arrow. Whenever it receives a currency, it would print "processing currency", which we don't really need, so I'll remove it. Then it does the work: it fetches the currency rates using the currency's code, and it panics if that fails, which is fine for this demo.
Then it updates the rates and sends the resulting currency on the result channel; the arrow here means: send the currency you just populated into the result channel. At some point, we also want to break out of this for loop. A for range loop over a channel keeps going until the channel is closed and every value already in it has been received, so when you call close on a channel, all the loops ranging over it finish and exit. After the loop, we print that the worker has stopped. So now we've implemented our worker; let's create some. Remember what we're trying to do: avoid spinning up a goroutine for every currency and instead have a fixed pool of workers. Let's say five workers is fine. Before we can start them, we need to create the channels. Just like maps, we create channels with the make function, giving it the channel type. Bear in mind this is just a type: a channel can carry anything, a chan string, a chan bool, whatever you need, including your own types, like our currency struct here. make can also take a second parameter, because channels can be either buffered or unbuffered. An unbuffered channel has no capacity: a send blocks until another goroutine is ready to receive the value. A buffered channel has a fixed capacity, so sends only block once the buffer is full. If you know the maximum number of values that will ever be waiting in the channel at once, you can use that as the capacity, and we do know it, because there are only so many currencies in our currency exchange.
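Putting the worker and the buffered channels together, a sketch might look like the following. The Currency fields, the stubbed fetchCurrencyRates and the processAll driver are assumptions, and the collection loop simply receives one result per currency, without any timeout.

```go
package main

import "fmt"

// Currency is a simplified version of the video's struct (fields assumed).
type Currency struct {
	Code  string
	Rates map[string]float64
}

// fetchCurrencyRates is a hypothetical stub standing in for the API call.
func fetchCurrencyRates(code string) map[string]float64 {
	return map[string]float64{code: 1}
}

// runCurrencyWorker receives work from currencyCh until it is closed and
// drained, fills in the rates, and sends each finished currency on resultCh.
// id identifies the worker; the video logs it when the worker starts and stops.
func runCurrencyWorker(id int, currencyCh <-chan Currency, resultCh chan<- Currency) {
	for c := range currencyCh {
		c.Rates = fetchCurrencyRates(c.Code)
		resultCh <- c
	}
}

// processAll is a hypothetical driver: both channels get a capacity equal
// to the number of currencies, so no send in this sketch ever blocks.
func processAll(codes []string, workers int) map[string]Currency {
	currencyCh := make(chan Currency, len(codes))
	resultCh := make(chan Currency, len(codes))

	for w := 1; w <= workers; w++ {
		go runCurrencyWorker(w, currencyCh, resultCh) // fixed-size pool
	}
	for _, code := range codes {
		currencyCh <- Currency{Code: code}
	}
	close(currencyCh) // lets each worker's range loop end once the queue is empty

	out := make(map[string]Currency, len(codes))
	for i := 0; i < len(codes); i++ {
		c := <-resultCh // exactly one result per currency
		out[c.Code] = c
	}
	return out
}

func main() {
	out := processAll([]string{"usd", "eur", "gbp"}, 5)
	fmt.Println(len(out)) // prints 3
}
```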
So we can make the capacity of each channel the number of currencies we have, since we're never going to send more than the total count of currencies. Now we've created the channels, and here we start the currency workers: five of them, each running in its own goroutine and listening on these channels. Let's get rid of the old code we don't need anymore; we're going to re-implement our for loop as well. So how do we actually handle what comes back from these workers? There are multiple ways to do this in Go, but one way I like is to use a for loop and keep track of how many results we've processed. We've got resultCount, set to zero at first, since at this stage we haven't gotten any results back from our workers. Then we have a desired finish state: we want to keep going until we've got a result for every currency. So if we've got all the results, we close the currency channel, since there's no more work to hand out, and break out of this for loop. That's our termination sequence. If we're not done yet, we read from the result channel using a select statement, a special Go construct for waiting on several channel operations at once, and store the result. Let's walk through it: select, with a case that receives from the result channel (again, look at the arrow). This is us saying we're going to read a value from the result channel.
That value is a currency, so we bind it to a variable, store it in our currencies map, and, since we've just processed a result, increment resultCount. That's the most interesting case. But we've got multiple goroutines here that could be blocking on multiple things, so what if we also want a timeout? Another case we can add uses time.After to say: if our workers are taking too long, we give up. We time out and break, saying, tough luck, you took way too long, I got impatient. Let's format this and walk through it one more time to make it clear. We set resultCount to zero and start looping indefinitely. If resultCount equals the number of currencies, we've processed all our results and it's happy days: we close the channel and break. Closing the channel causes our workers to break out of their for loops up here, so they should print "worker stopped". If that's not the case, we go into the select statement and try to read a value from the result channel. That receive blocks, but select is special: it waits on all of its cases at once and runs whichever one becomes ready first. That's why the timeout works. Because time.After is called each time around the loop, the timeout applies to each wait for a result, not to the operation as a whole. Let's set it a bit higher, so that if we wait more than three seconds without a result, tough luck: we time out, break, and the operation fails.
But if a result comes in first, we read a currency from the result channel, store it in our currency exchange map, increment resultCount, and loop again, until we either time out or resultCount equals the number of currencies. Now, if we run this, you might notice it probably won't work, because we're missing something fundamental, but let's run it and see what happens. First, it fails to compile because I have an import I didn't use. Let's run it again. The workers started, but we time out: no values are coming in. Timeout again, and again; nothing's happening. The repetition is likely because, inside a select, a bare break only exits the select statement, not the surrounding for loop, so the loop just goes around again. We can make it more dramatic by returning instead, which exits the whole program. But the crucial thing we're missing is that we're not sending any work on the currency channel for the workers to pick up. Remember, the workers sit here trying to receive from the currency channel, which is their "give me some work to do" channel, so we actually need to send all the work on it. Let's iterate through our currencies, and for each one, all we want to do is send it on the currency channel. Again, look at the arrow: we're iterating through all of them and sending each currency on the currency channel to be processed.
With channels sized to the number of currencies, these sends won't block, so the main goroutine gets through this loop quickly and moves on to the result-gathering loop. Meanwhile, the workers pick up the work, fetch the rates, and put each result on the result channel, and the main goroutine picks those up, brings it all together and, hopefully, prints out the results. Here goes nothing; let's try it out. There we go: results are coming in again, so things seem to be working. You can see, though, that because we're working concurrently, the printing order is a bit odd. Each print call writes its whole line at once, so we don't get characters mixed up, but lines from different goroutines come out in an unpredictable order. The workers report that they've stopped before we print the results, because we close the currency channel as soon as all the results are in. The crucial thing is that we're getting our values, and this worker pool doesn't grow even if the number of currencies grows. As a final test, let's change this to 100 currencies instead of 10. We still only have five workers, so it'll probably take longer, but let's see it work. The workers are starting, and boom: it took about one and a half seconds, but we now have a lot more results, and we only used five extra goroutines, the five workers, to do all this extra work instead of spinning up a goroutine per currency. So I hope that was an interesting and practical introduction to concurrent programming in Go using goroutines, wait groups, mutexes and channels.
As always, I'll leave a link to the GitHub repository for this code in the pinned comment below. Thank you so much for watching, and I'll see you in the next one.
Info
Channel: eldr-io
Views: 4,130
Keywords: golang, coding, programming, concurrency, go, neovim, teaching, computer science
Id: X24qXb4uWms
Length: 28min 53sec (1733 seconds)
Published: Fri May 03 2024