Hey guys, this is Sig from Eldr.io, and I'm back
again with more Golang tutorials for you. I had a request to do a video about concurrency in Golang,
specifically things like goroutines, channels, mutexes, general multi-threaded programming in
Go, and so I figured that would be a fun little video to do. So what I want to do is give you a
practical example of how you can write concurrent programs in Go, and tell you a little bit
about some of the pitfalls and some of the things you might run into. So for this example,
I prepared a little API that we're going to hit, and I found this Exchange API on GitHub. I'll
put a link to it in the description below. It's a fun little project. It's basically, I think,
an API whose data gets updated daily. And what it does is it allows you to fetch currencies,
so the world's currencies, including sort of real money and cryptocurrencies, if that's your
thing. And it just gives you a list of them, and then for each currency you can use this unique
code to go in and fetch the rate that the currency has in relation to all the other currencies.
So you could write a little currency exchange using this API. So that's what we're going to do
today, and then I'll show you some ways we can make it concurrent. So if we go into
our editor here, I'll just walk you through. So I've implemented some base stuff for us to get
started, because we want to focus on concurrency, not on the business logic here. So what I've done
is I've written this little currency exchange. So my currency exchange is a struct, and it has
a map of string to currency. So it has a bunch of currencies, kind of like we just saw. And then
for each currency, the currency has a unique code, like we just saw in the API as well. It has
a name, and it has some rates. And the rates are basically this currency's exchange rates
relative to the code of the other currencies that the exchange rate pertains to. So basically
what we just saw in the API. Then I have two functions in here, fetchAllCurrencies, which just
hits that currencies endpoint and populates the currencies map. And then there's fetchCurrencyRates.
You basically give it a currency code, and then it goes out and fetches the currency
rates. So hitting that other API endpoint that we just saw to fetch a bunch of rates for each
currency. And so our goal is to populate the currency exchange to have all the currencies,
and for each currency, all the rates to the other currencies in our currency exchange. So
let's have a look at how we can do that. So I've actually written a little bit of boilerplate
code to get us started. So I've got here a pretty naive implementation that just instantiates a new
currency exchange, creates the currencies map, and then it calls fetchAllCurrencies that we
just saw. Now I've limited this to just fetch 10, because I don't want to fetch all of them for
this demo. We don't want to overload the API. I'm just fetching 10 at a time. And then it actually
goes through. So once it's fetched those, it goes through and loops through all the currencies. And
for each currency, it calls fetchCurrencyRates, right? And then it goes in and it stores the rates
for each currency. And then at the bottom here, it takes a benchmark of how long that took. And at
the bottom, it prints out the results. So we can actually run this. Let's have a look and see what
it looks like. So if we run this, there we go. So we got 10 currencies, and for each currency, we've
got 755 rates. And if we run it again, let's see what happens. It took a little bit longer there.
We've got some different currencies. You can see not all currencies have a name, but it works,
right? It kind of does what it needs to do. It takes roughly around a second and it fetches all
these currencies. So now the question becomes, what if we wanted to speed this up a bit, right?
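As a reference point for everything that follows, the sequential version described above can be sketched roughly like this. This is a minimal sketch, not the code from the video: the field names and the populateSequentially helper are assumptions, and fetchCurrencyRates is a stub that sleeps instead of calling the real API.

```go
package main

import (
	"fmt"
	"time"
)

// Currency mirrors the struct described in the video: a code, a name,
// and a map of rates keyed by the other currency's code.
// The exact field names are assumptions.
type Currency struct {
	Code  string
	Name  string
	Rates map[string]float64
}

// MyCurrencyExchange holds every known currency keyed by its code.
type MyCurrencyExchange struct {
	Currencies map[string]Currency
}

// fetchCurrencyRates is a hypothetical stand-in for the real HTTP call.
// It sleeps briefly to simulate network latency and returns fake rates.
func fetchCurrencyRates(code string) (map[string]float64, error) {
	time.Sleep(20 * time.Millisecond)
	return map[string]float64{"usd": 1.0, "eur": 0.9}, nil
}

// populateSequentially fetches the rates one currency at a time, so the
// total time grows with the number of currencies.
func populateSequentially(ce *MyCurrencyExchange) error {
	for code, c := range ce.Currencies {
		rates, err := fetchCurrencyRates(code)
		if err != nil {
			return err
		}
		c.Rates = rates
		ce.Currencies[code] = c // updating an existing key while ranging is fine
	}
	return nil
}

func main() {
	ce := &MyCurrencyExchange{Currencies: map[string]Currency{
		"usd": {Code: "usd", Name: "US Dollar"},
		"eur": {Code: "eur", Name: "Euro"},
		"gbp": {Code: "gbp", Name: "Pound Sterling"},
	}}
	start := time.Now()
	if err := populateSequentially(ce); err != nil {
		panic(err)
	}
	for code, c := range ce.Currencies {
		fmt.Printf("%s: %d rates\n", code, len(c.Rates))
	}
	fmt.Println("took", time.Since(start))
}
```

Each simulated fetch waits for the previous one to finish, which is exactly the part that concurrency can improve.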
How could we speed this up? Well, one thing you might say is, sure, we fetch all the currencies,
but here we go through and we one by one fetch the rates for each currency. What if we were able
to do that concurrently? What if we're able to do that in parallel, so to speak? Now, Go has several
ways of implementing concurrency, but by far the simplest is using goroutines.
So goroutines, if you look at the, here we go, goroutine anonymous function, it's basically
you create an anonymous function or a lambda function and you say, run this as a goroutine.
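Here is a small sketch of that syntax on its own. The runInBackground helper is hypothetical, and it uses a channel purely so the example has a well-defined result to print; the point is the go func(...) { ... }(args) form.

```go
package main

import "fmt"

// runInBackground starts work in a new goroutine and returns immediately.
// The returned channel receives the result once the goroutine has finished.
func runInBackground(code string) <-chan string {
	out := make(chan string, 1)
	// "go" followed by a function call runs that call as a goroutine.
	go func(c string) {
		out <- "fetched rates for " + c
	}(code) // the argument is evaluated here, in the calling goroutine
	return out
}

func main() {
	result := runInBackground("usd")
	fmt.Println("main keeps running while the goroutine works")
	fmt.Println(<-result) // receiving blocks until the goroutine has sent
}
```

Passing the code in as an argument, rather than capturing the loop variable, gives each goroutine its own copy of the value.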
And you literally use the go keyword to say, go func and then do something. In this case, let's
say we wanted to fetch the currency exchange rates concurrently. So we can create this concurrent
function and we'll pass in the code and we can just stick all of this inside a concurrent
function like this, right? So now what we're actually telling Go is, when you
reach this for loop, go through all the currencies and for each one spin up a goroutine, which is a
lightweight thread managed by the Go runtime, and do this work that's inside this function
inside the goroutine. So in here we'll go and we'll fetch, we'll make the API call and we'll
fetch the currency. And then just like before, we should be able to print out the things
at the end. So let's see if that works. I have a feeling we might run into some trouble. So this
is kind of nonsense. We didn't get any rates. You see how quickly it finished. You know, this is way
too fast. And the problem is of course that now we are dealing with a concurrent program. So
when we go out and we iterate through all these currencies and we say go func, so basically go
do this separately in a goroutine, the main thread, the one that our application's main
function is running in, doesn't wait for this to finish. The main
thread just carries on. It takes the end time, it prints things out, and then it exits down
here, and when main exits, the whole program exits, goroutines and all. It doesn't care what's going on inside these go func goroutines. It just carries on and
does its thing, right? And so this is the problem we're running into. See, we were executing
really quickly. So our goroutines haven't finished doing their work yet. Okay, so this is
a common problem. So how can we solve this? Well, Go actually has a nice way of solving this, which
is a concept called wait groups. So WaitGroup is in the sync package. And we can go in and
we can actually create a new wait group like so. And a wait group, kind of like the
name implies, is a counter that you increase for each thing you want to wait for and then decrease
as things complete. And then you can basically block until the wait group's counter is back at zero.
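The pattern can be sketched like this: Add before launching each goroutine, Done inside it, and Wait in the caller. This is a minimal sketch with assumed names (fetchAll, simulateFetch), and the fetch is simulated with a short sleep. The atomic counter is only there so the goroutines can safely report completion.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// simulateFetch is a hypothetical stand-in for the real API call.
func simulateFetch(code string) {
	time.Sleep(10 * time.Millisecond)
}

// fetchAll launches one goroutine per code and blocks until all of them
// have finished. It returns how many simulated fetches completed.
func fetchAll(codes []string) int64 {
	var wg sync.WaitGroup
	var completed int64
	for _, code := range codes {
		wg.Add(1) // one more unit of work to wait for
		go func(c string) {
			defer wg.Done() // mark this unit as finished, even on early return
			simulateFetch(c)
			atomic.AddInt64(&completed, 1) // safe to call from many goroutines
		}(code)
	}
	wg.Wait() // blocks until the counter is back at zero
	return atomic.LoadInt64(&completed)
}

func main() {
	n := fetchAll([]string{"usd", "eur", "gbp"})
	fmt.Println("completed fetches:", n)
}
```

Calling Add in the loop, before the go statement, matters: if Add ran inside the goroutine, Wait could return before any goroutine had registered itself.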
So if you think of it a little bit like a shared pointer, or a reference-counted pointer as it's called in
some languages, it's basically just something that keeps a count. So you can increment it by
some amount and you can decrement it whenever things complete. And then you can wait for it to reach
zero again. So let's try doing that. So what we're waiting for to complete here is all our different
goroutines, right? They're going out one by one. So what we need to do is we can go in here and we
can say for each time we iterate, let's add one to the wait group. So we're saying here's some work
that we want to wait for. This is one unit of work, going out and fetching the currency rates for
one currency. So we can go ahead and we can add one and we can wait for this. So then what do we
do at the end? We need to do a couple of things. First of all, inside our goroutine, we can now go
in and say I've done a unit of work, right? So out here, outside of our
goroutine, we say we add a bit of work to be done. And then inside the goroutine, we do the work and
then we say, great, the work's done. So the Add method adds some number of units to
do and Done basically decrements the count by one. So this is a common pattern that we use. And
then in our main thread, what we can do is we can simply call Wait on the wait group. So what this
will do is it will block the thread that's calling Wait until the wait group's counter
reaches zero again. So let's try this out now. So I'll clear this and run it again. Boom, and it
works, right? So now we're getting full currency rate exchange again. And in fact, if we run this,
you'll also notice that it's a lot faster than it was before. Before we were hitting around 800, 900
milliseconds. Now we're doing all these API calls concurrently. So in parallel, although concurrent
and parallel are a little bit different, but we're doing them concurrently. And so we've sped up our
program by a lot, right? It's a lot faster now. However, there are also some problems with this.
One of the problems that we have right now is that we have all these goroutines. They're going
out and they're fetching stuff. That's fine. Then they go in here and they change, they mutate the
map, which is our currency exchange. If we look at currency again, right? We have these two maps.
We have a currencies map and we have a rates map. So our goroutines are changing the data inside
these maps from different threads, potentially at the same time, depending on how long this takes.
You can imagine that multiple threads might start trying to write to this map at the same time. This
is a real problem because Go maps by default are actually not safe for concurrent reads and writes.
This is a bit of terminology, right? Safe for concurrent writes or safe for concurrent reads. What that means is
that with a structure like a map in Go, if one goroutine writes to it while another one reads or writes
at the exact same time, the runtime can detect that and crash the whole program with a fatal error. We can actually try this out. Let me just grab
some code here just to give you an example. What we want to do now is we have this code. Let's go
in and add a little bit of extra code. Here we go. Get rid of that. What we're doing now is we're
spinning up another goroutine. Again, you can spin these up as many as you want, as many as your
hardware can handle. This goroutine, what it will do is it will go in and it will just keep hitting
the currencies map and look if we've got any value for US dollar or for USD. So this is like a little
extra loop that goes in and it actually just keeps on, you know, it does this forever, it just keeps
on trying to read from our map and say do you have anything for USD? Do you have anything for USD?
And if it does, great, then it prints out USD. So let's try running this now. Click clear and we'll
run and now we get a fatal error and a whole bunch of stack traces. The whole thing has blown up basically. The
Go runtime has detected the concurrent map access and killed the program, and this is because, again, we are accessing a
shared resource from multiple threads at the same time. We've got a thread here in our goroutine
trying to read from it, we've got multiple threads down here, one for each currency and we've got 10
currencies being fetched. So up to 10 threads at any given time trying to write to it at the same
time. This is a really bad situation that we don't ever want to be in. So how can we resolve this?
Well, the easiest way to resolve this is using a so-called mutex. So what is a mutex? A mutex,
short for mutual exclusion, is essentially a lock that can only
ever be held by one entity at a time. So basically, you can lock the mutex and if you lock the mutex,
then you are the only one who has ownership of it and if you unlock the mutex, you leave it up
for other people to grab as well. So think of it as a resource that can only be grabbed by one
thread at a time. That's kind of where it comes in here and Go has a great way of implementing this
using, again, the sync package that we saw WaitGroup from, and you can simply add to your struct
something called sync.Mutex. So this just adds a mutex to the MyCurrencyExchange struct. We could
have given this mutex a field name if we had multiple, but this just embeds a single unnamed one in the struct. And
now what we can do is, since the currency exchange now has a sync.Mutex embedded in it, whenever we access
the shared resource from multiple threads, for example, down here, we can call Lock
and when we're done accessing the resource, we can call Unlock. So now when we're writing,
we're saying when this goroutine comes in, it says I'm about to change some shared state
between multiple goroutines. That's a big no-no. I can't do that. I'm not allowed to do that unless
I hold the mutex. So it goes in and calls ce.Lock. When it calls ce.Lock, the Go runtime will
basically figure out who's in charge, who's next in line to pick up the mutex. So it
will block, our goroutine will block and wait for the mutex to become available. So if another
thread currently has the mutex and is changing the state, it can't, we can't, or this thread that
we're coming from here cannot obtain the lock, cannot obtain the mutex lock. And so it sits
there blocked, waiting for it to become its turn. And then eventually the other thing
will unlock the mutex. We can then pick it up and do our work and so on and so forth. And this
makes sure the shared resource is only ever accessed by one thing at a time. So likewise,
we can do the same thing up here because this is another thing that we are accessing, a shared
resource that we're accessing in a multi-threaded scenario. So now we've got mutex locks around both
places where we access the resource, and now if we run it, we're back to things working just fine. So
now we've got multiple threads mutating the state of a shared resource and doing so in a
concurrent safe way, or in a thread safe way, it's also called. But there's one more thing I
want to show you, because this is all good and well, this is a very well established pattern.
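The mutex version described above might look something like the following. This is a sketch under assumptions: the setRates, hasCurrency and populate helpers are invented for illustration, and the rates are hard-coded rather than fetched. What it shows is the embedded sync.Mutex and the Lock and Unlock calls wrapped around every read and write of the shared map.

```go
package main

import (
	"fmt"
	"sync"
)

type Currency struct {
	Code  string
	Rates map[string]float64
}

// MyCurrencyExchange embeds sync.Mutex, so Lock and Unlock can be
// called directly on the exchange value.
type MyCurrencyExchange struct {
	sync.Mutex
	Currencies map[string]Currency
}

// setRates is a hypothetical helper that writes to the map under the lock.
func (ce *MyCurrencyExchange) setRates(code string, rates map[string]float64) {
	ce.Lock()
	defer ce.Unlock()
	c := ce.Currencies[code]
	c.Code = code
	c.Rates = rates
	ce.Currencies[code] = c
}

// hasCurrency reads the map under the same lock, so it never overlaps a write.
func (ce *MyCurrencyExchange) hasCurrency(code string) bool {
	ce.Lock()
	defer ce.Unlock()
	_, ok := ce.Currencies[code]
	return ok
}

// populate writes to the exchange from one goroutine per code.
func populate(codes []string) *MyCurrencyExchange {
	ce := &MyCurrencyExchange{Currencies: make(map[string]Currency)}
	var wg sync.WaitGroup
	for _, code := range codes {
		wg.Add(1)
		go func(c string) {
			defer wg.Done()
			ce.setRates(c, map[string]float64{"usd": 1.0})
		}(code)
	}
	wg.Wait()
	return ce
}

func main() {
	ce := populate([]string{"usd", "eur", "gbp", "jpy"})
	fmt.Println(len(ce.Currencies), ce.hasCurrency("usd")) // prints "4 true"
}
```

Using defer for the Unlock keeps the lock from being left held if the function returns early or panics.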
You can definitely use this, using mutexes like this to protect resources is a great way of
doing things. The thing is though, you can imagine, unless you group things together,
you kind of need one for each resource that you're trying to protect. And so
sometimes it's actually easier to have a dedicated goroutine or dedicated thread, and it can be the
main thread, that is sort of the thread in charge of gathering work that's coming in from a bunch
of other threads, and actually persisting those results into a shared resource. So that sounds
complicated, but let's look at how we can do that with channels. Because if you think about
it, there's another problem with this example. So right now, we're going through all of our
currencies, we're spinning up a goroutine, and we're fetching currency rates. Great. This
is kind of fine, because I limited it to be 10 currencies that we're pulling in. But what if we
remove that limit, and suddenly we're pulling in hundreds or thousands of currencies? With this
code, we'd go through all of them, and we'd spin up a goroutine to fire off API requests for each
currency. What if there's like 100,000, you know, with all the amount of Bitcoin or whatever crypto
coming in these days, there might be loads and loads, right? We don't really want to spin up
thousands of goroutines all firing API calls at once. What we want instead is we want a worker pool of threads
that can basically, whenever they're available, pick up work, and then they can do the work and
then return a result, and then they can pick up the next work until there's no more work to be
done. And this is a great pattern called a worker pattern or worker pool pattern. It really depends
how you look at it. But Go has a great way of doing this by using so-called channels. So let's
have a look at what that might look like. So let's implement a function called runCurrencyWorker. So
this will be a worker that runs and Copilot's already having some great ideas, but let's
just leave it a little bit. Let's just do it ourselves. So what the runCurrencyWorker will
do is it will go in and whenever we run it, it will create a threaded worker that will sit
there and pick up work to be done, do the work, and return a result. And let's show you how
we can do that with channels. So I'll talk you through this channel stuff in just a second.
So what am I doing here? So Golang has this concept called channels. Channels are essentially
another data structure, but they have some really, really interesting characteristics. So channels
are a way of either sending or receiving data through a buffer essentially, but that buffer
is implemented in a way that makes it thread safe in really interesting ways. For example, a
thread can write to a channel, which can be a blocking operation. So a thread will sit there
and say, I'm trying to write to this channel. Is anyone on the other side ready to read from the
channel? And that's what these arrows actually indicate. So you can see we've got the channel
type. So we're saying chan. So this one, the currency channel, the arrow to the left of chan indicates
that we can only read from the channel, and the arrow on the other side, to the right of chan, means that we can only write to the
channel. So when we're passing it into a function like this, we're saying here's a channel, which is
a buffer implementation. And the currency channel for this currency worker is one that the currency
worker can read from. And the result channel is one that the currency worker can write to. And
our channels are also typed. So this channel has a type of currency.currency, right? So currency is,
as we see here, our custom struct representing a currency in our currency exchange. So what we're
saying here is this worker is gonna take an ID so we can identify it, but also two
channels, with each channel being a channel that carries currencies. And one of
them being for reading and one of them being for writing. So this is a little bit strange maybe,
but bear with me and I'll show you, it'll make sense when we actually implement it. So let's go
ahead. Yeah, we'll take, that's fine, Copilot. That one looks good. So we'll say, okay, we've
started a worker. So how does this work? Well, channels are great because they're implemented
in a way that allows you to treat them kind of like other data structures that you're used to.
So in this case, we can actually iterate through a channel using the for and range keywords
that we know from things like lists, slices, maps. And we can actually go in and we can,
well, Copilot's kind of done it for us already, but we can walk through it. So we can go in and we
can say, this worker will sit here and it will try to read from the currency channel. This is the
channel that the worker is trying to read from, looking at the arrow here. It will go in and
it'll try to read from the currency channel. Whenever it reads from it, it will print out
processing currency. I actually don't want to print that out. We don't need that. Then
it will go in and do the work, right? So it will fetch currency rates using the code from the
currency. It can panic, sure, if it fails. Then it will update the rates and then it will write the
resulting currency back to the result channel. So the arrow here is saying write into the result
channel the currency that you just populated, right? And then another thing is that at some
point, we probably want to break out of this for loop. And the way channels are implemented in Go means
that the loop will end when this channel is closed and everything already sent on it has been read. So you can call close on a channel and that will
cause all these things that are listening, trying to loop through it, to actually break out of their
for loop. So in this case, we can say the worker has stopped. So now we've implemented our worker.
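A worker along the lines just described could be sketched as follows. The function name runCurrencyWorker and the two channels come from the walkthrough, while the Currency fields and the stubbed fetchCurrencyRates are assumptions made so the example runs on its own.

```go
package main

import "fmt"

type Currency struct {
	Code  string
	Rates map[string]float64
}

// fetchCurrencyRates is a hypothetical stub for the real API call.
func fetchCurrencyRates(code string) (map[string]float64, error) {
	return map[string]float64{"usd": 1.0, "eur": 0.9}, nil
}

// runCurrencyWorker reads work from currencyChan (receive-only, <-chan)
// and writes finished currencies to resultChan (send-only, chan<-).
func runCurrencyWorker(id int, currencyChan <-chan Currency, resultChan chan<- Currency) {
	fmt.Printf("worker %d started\n", id)
	// The range loop ends once currencyChan is closed and drained.
	for c := range currencyChan {
		rates, err := fetchCurrencyRates(c.Code)
		if err != nil {
			panic(err)
		}
		c.Rates = rates
		resultChan <- c
	}
	fmt.Printf("worker %d stopped\n", id)
}

func main() {
	currencyChan := make(chan Currency, 2)
	resultChan := make(chan Currency, 2)
	go runCurrencyWorker(1, currencyChan, resultChan)

	currencyChan <- Currency{Code: "usd"}
	currencyChan <- Currency{Code: "eur"}
	close(currencyChan) // no more work: the worker's loop will finish

	for i := 0; i < 2; i++ {
		c := <-resultChan
		fmt.Printf("%s: %d rates\n", c.Code, len(c.Rates))
	}
}
```

The directional channel types are checked by the compiler, so a worker that tried to send on currencyChan or receive from resultChan would not build.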
So let's go ahead and create some. So imagine, or remember what we're trying to do. What we're
trying to do is we're trying to avoid having to spin up all these goroutines all the time for all of
our currencies. We just want to have a set pool of workers. So let's create a set pool of workers,
right? Let's say, yeah, five is fine. Let's do this. So first, before we can do that, we need
to actually create the channels. So we can say, just like we create maps, this is exactly how
we create channels too. So we can use the make function and make a channel of the desired type.
And bear in mind, this is just the type. Channels can carry anything. They can be chan string, they
can be chan bool, whatever you need, right? But also a channel of your own type. So in this case,
our currency struct. And another thing is that make can actually take a second parameter, because
channels can either be buffered or unbuffered. So unbuffered means the channel has no storage at all:
every send blocks until a receiver is ready to take the value. A buffered channel can hold up to a fixed number of values before a send blocks. And if you know how much you want to send at
any given time on the channel, or how much the maximum is you're ever going to send, and
we kind of do, right? Because we know there's only so many currencies in our currency exchange.
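To make the buffered versus unbuffered difference concrete, here is a small sketch. The trySend helper is hypothetical and uses a select with a default case purely as a way to probe whether a send would block right now.

```go
package main

import "fmt"

// trySend attempts a send without blocking and reports whether it succeeded.
func trySend(ch chan string, v string) bool {
	select {
	case ch <- v:
		return true
	default:
		// The send would have blocked: no receiver is waiting
		// and there is no free buffer slot.
		return false
	}
}

func main() {
	unbuffered := make(chan string)  // capacity 0
	buffered := make(chan string, 2) // capacity 2

	fmt.Println(trySend(unbuffered, "usd"))   // false: nobody is receiving
	fmt.Println(trySend(buffered, "usd"))     // true: the buffer has room
	fmt.Println(trySend(buffered, "eur"))     // true: the buffer has room
	fmt.Println(trySend(buffered, "gbp"))     // false: the buffer is full
	fmt.Println(len(buffered), cap(buffered)) // 2 2
}
```

Sizing the buffer to the number of items you will ever send means the sender never has to wait for a receiver.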
So we can actually just say, the channels need a capacity no bigger than the number of currencies that
we have, right? We're never ever going to send more than the total count of currencies that
we have. So now we've created the channels, and here we are firing off currency
workers. So we're creating five workers, and they're all going to fire up and listen on
these channels. Let's get rid of this, right? We're going to get rid of all this stuff we don't
need anymore. We're going to re-implement our for loop as well. So now we fired off goroutines for
these workers. So how do we actually interact with what's coming back from these workers? Well, the
way you can do that in Go, or there's actually multiple ways, but one way I like to do it is
we can say, we'll just go inside a for loop, and then we'll actually keep track of how many
results we've processed. So we've got result count, right? We'll set it to zero first. At this
stage, we haven't gotten any results back from any of our workers. And then we have a desired finish
state. So desired finish state is exactly this. We want to keep going until we've got results for all
of our currencies, right? So then we can say, if we've got all the results, then we can say, we've
got them all, so let's close the channel, right? So we'll say closing, and then we can actually, as
I said earlier, we can close a channel, and then we can break out of this for loop. So this is kind of
our termination sequence. So if this isn't true, we want to do one of two things. We can
actually read from the result channel, like so. And this is a kind of special syntax that Go has
for this. And then we can store the results like this. So what this means is, let's just walk
through this. What this means is select, and we say the case where we're reading from the result
channel, again, look at the arrow, right? So this is us saying, we're gonna read a value from the
result channel. The value is gonna be a currency, right? So we'll set that, we'll bind this currency
to what's coming out of the result channel, and then we'll store that currency in our
currencies. That's what we want to do, right? The other thing we'll do is, since we've
just processed the result, we'll increment result count, like so. And this is actually the most
interesting case, but you might imagine also, we've got multiple threads here that could be
blocking on multiple things. So what if we want some kind of timeout as well, right? So another
thing we can do is we can actually use this shorthand to say, let's have a timeout on here and
say, if it's taking too long for our workers to do stuff, then we're gonna get rid of them, right?
We're gonna timeout and we're gonna break and say, tough luck, you took way too long. I was getting
impatient. And so I've called timeout on you. Let's just go ahead and format this. So again,
let's walk through this one more time to make it clear. So we've got a result count that we set to
zero. Then we start iterating indefinitely. We go and we say, if the result count is equal to the
length of currencies, then we've processed all our results and it's happy days, right? We close
the channel, we break. Closing the channel will cause our workers to break out of their for loop
up here. So they should print out worker stopped. And if that's not the case, we go into the select
statement and we try to read a value from the result channel. So this will block as well. But
the select statement is special because it waits on all of its cases at once and runs whichever one becomes ready first.
So the read from the result channel will block, but if the timeout finishes first, the timeout case will trigger. This is why this
timeout thing works. So let's set the timeout a bit higher, but we'll say if it's taking more than
three seconds, then tough luck, we time out and we break out and we didn't succeed in our operation.
But if the result comes in first, we read a currency from the result channel, we store that inside our
currency exchange map, and we increment the result count, and then we loop, right? And we go again and
we go again and again until we hit the timeout or the result count is equal to the number of currencies.
Now, if we run this, you might notice that it's probably not going to work, because we're missing a
fundamental thing, but let's just run it and see what happens. First of all, it fails because I have an
import that I didn't use. Let's run it. Okay, so the workers started, but you can see we time out.
There's no values coming in, right? So timeout again, timeout. It's not happening, right? Nothing's
happening here. It's not a good time. So what's going wrong here? Well, there's a couple
of things going wrong here, and, well, actually, you know, we can make it more dramatic like
this and say, if we return, then we'll exit out of the whole program. But the crucial thing that we're
missing here is we're not sending any work on the currency channel for the workers to pick up. If
you remember, the workers sit here trying to read a value from the currency channel, which is their
sort of give-me-some-work-to-do channel, and so we actually need to send all the work to do on
that channel. So let's go ahead and do that. So let's iterate through our currencies like so, and
what do we want to do with each currency? All we want to do is we want to send it on the currency
channel. So again, look at the arrow, right? So we're just iterating through all of this and
we're sending all these currencies to be processed on the currency channel. Because the channel is
buffered to the number of currencies, these sends won't actually block, and our workers will start picking the work up, and so our main thread will exit out of this loop, it'll go
down here, and it will start our result-gathering loop. And meanwhile all the
workers should start picking up this work, doing the work, fetching the result, putting it back on
the result channel, and our main thread will pick that up. Our main thread will then bring it all
together and hopefully print out the results. So here goes nothing, let's try it out, and there we go,
we've now got results coming in again, right? So now things seem to be working. You can see though that
since we are working concurrently, the printing order is kind of weird, right? So even though
each print call writes its whole line out in one go, so at least we don't get characters all
mixed up, we do get lines mixed up. So our workers are kind of saying, you know, they're actually
stopping, obviously, before we print the results, because here we're closing the result channel. But
the crucial thing is we are getting our values in, and this worker pool actually doesn't grow even if
our currencies grow. So as a final thing, let's try that, right? I've got this thing in here. Let's just
say, let's do 100 currencies instead of 10, right? So our workers, we still only have five workers,
so it'll probably take longer, right? But let's see this work. So here the workers are starting. Boom. So
it took one and a half seconds, but we now have a lot more things, and we still only use
the same number of goroutines, right? We only used five extra goroutines for the five workers,
but they are doing all this extra work without spinning up all these extra goroutines. So I
hope that was an interesting introduction to concurrent programming in Go using channels,
mutexes and some of the other things you can use, and I hope this was a practical example. As always, I'll
leave a link to the GitHub repository for this code in the pinned comment below and thank you so
much for watching. I'll see you in the next one.