(whimsical ukulele music) - [Man] Good afternoon. I promise you this talk will be in English. (laughter) - Introducing Mr. Josue Coronel. The title of his presentation: Building Workflows with Celery. (laughter) (applause) - [Woman] Yes, I will welcome Josue Coronel for his talk in English. (laughter) - How's it going, everybody? So let's start. First, my name is Josue Balandrano Coronel. I work for the Texas Advanced Computing Center in Austin, and I'm also a small part of the core team for Celery. So let's start with this, hopefully, very interesting presentation. First, a very quick overview of what Celery is, for anybody out there who doesn't know. Celery is basically a task queue, and what it allows us to do is run things asynchronously. Now, you might be thinking: nowadays we have all these different projects like AsyncIO and the whole async ecosystem out there. But the very interesting thing about Celery is that it also allows us to distribute work, running things remotely on other computers. It also gives us a lot of other features; for instance, we can apply a retry policy to tasks that don't run or that error out, and we can store results in different ways.
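For reference, here's a minimal sketch of a Celery app with a single task; the module name and the broker/backend URLs are illustrative:

```python
from celery import Celery

# Illustrative broker/backend URLs; any supported transport works here.
app = Celery('project',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/0')

@app.task
def my_task(x, y):
    return x + y

# Queue the task; a worker picks it up and runs it asynchronously.
# (A worker is typically started with: celery -A project worker --loglevel=INFO)
result = my_task.delay(2, 3)
```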
This is also a very quick way to view how the Celery architecture actually works. Everything starts with our main application, which is usually what we call the producer. We call it that because it's what creates, or produces, all the different messages that are going to be sent to the workers. And the workers are the ones that are actually going to run the tasks we're queueing, right? The cool thing about this is that, if you think about it, you can run different tasks on different machines that have different resources, or that have access to different resources. Inside each one of these workers there is usually a main process and then a number of other worker processes, and this is how we can also run things concurrently. The way all of these pieces communicate with each other is through something called a transport, which is usually something like RabbitMQ, though we can also use Redis. There is support for other types of transports too, even a simple filesystem transport. And we can also use a backend, which is the name for whatever we use to save task results.
So, how can we start building workflows with Celery? So far we've only talked about how Celery runs code on remote machines, distributed machines if you can call them that. Everything in Celery starts with what we call signatures. Signatures are closely related to Python partials; they're implemented in a different way, but the idea is basically the same. The way they work is this: if we have a task, for instance project.tasks.my_task, we can create a signature from it. That signature is basically a representation of what we have at the end of the slide, which is just a way to execute that function itself: calling my_task with some specific arguments. Those can be normal positional parameters or keyword parameters.
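As a sketch, with purely illustrative arguments, creating that signature looks like this:

```python
from celery import signature

# A signature wraps the task name plus the arguments to call it with.
sig = signature('project.tasks.my_task', args=(2, 3), kwargs={'debug': True})
```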
The other cool thing about these signatures is that they are easily serializable. Sorry, that's really, really hard to pronounce. (laughs) - Serializable, yeah, is that okay? Okay, let's just continue with that, yeah. Okay so, for instance, the serialization of this task, or of this signature, as we can take a look at here: it's actually just a very, very simple dictionary.
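Roughly, the serialized form is something like the following; the exact fields can vary between Celery versions, and the values here are illustrative:

```python
{
    'task': 'project.tasks.my_task',
    'args': (2, 3),
    'kwargs': {'debug': True},
    'options': {},
    'subtask_type': None,
    'immutable': False,
}
```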
And this is what allows us to actually send this message to our workers, through something like RabbitMQ or Redis or any other type of broker, and have that worker know what it needs to execute. So we can see that a signature is really the start of the message we're going to send. The message itself doesn't contain any code, and that's one of the things we have to get a grasp on: the code we want to run has to be present on each one of the workers. That way the producer only tells the worker what it needs to run and with which parameters, and then the worker does its job.
define different options that Celery actually uses. So, for instance, we can
specify different queues on where these tasks are going to run. And we have this example here, where we're actually
setting a custom queue whenever we're creating our signature. Then the other good thing is
that we can create a signature and then, after creating that signature, we can set different arguments for it or we can merge different options. So, for instance, here on the first line we're creating a signature. Or, well, we're using a signature that we have already created here. And we are actually
using another parameter, which is going to be the string called, well, with the value new model id. Or we can also set another queue that that task is gonna go to. Now, this is the very beginning of how to start creating workflows. The other idea that we
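Here's a sketch of both ideas; the queue names and the 'new_model_id' argument are illustrative:

```python
# Set a custom queue while creating the signature.
sig = signature('project.tasks.my_task', args=('model_id',), queue='models')

# Later, prepend another argument and merge in new options.
sig = sig.clone(args=('new_model_id',))
sig.set(queue='priority')
```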
Now, this is the very beginning of how to start creating workflows. The other idea to realize here is that we can create signatures in different ways, right? One of those ways is directly from the task object. You can see there are some shortcuts for this: .s and .si. .s creates a regular signature, as we just saw, and .si creates an immutable signature, one that will not accept any new parameters or anything like that.
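In code, with illustrative arguments:

```python
sig = my_task.s(2, 3)     # regular signature, same as signature(...) above
sig_i = my_task.si(2, 3)  # immutable signature: won't accept new arguments,
                          # such as a result passed along from a previous task
```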
The other thing to look at here is callbacks, because now we know how to send these tasks to different workers, but how can we start creating workflows out of them? The very first step is to create a callback: after one task runs, or one function runs, and it's successful, we can run something else. The way we can start setting these callbacks is very simple. One of the main ways is that whenever we queue a task, we can tell Celery to run something else after it has been successful, using the link parameter. link_error is the error callback for whenever that task fails. Or, if we're creating a signature, we can set these links or error links as callbacks there.
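For example, assuming hypothetical log_success and log_error tasks:

```python
# Queue my_task and tell Celery what to run after success or failure.
my_task.apply_async((2, 3),
                    link=log_success.s(),
                    link_error=log_error.s())
```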
Now, those are the very basic ideas behind everything that has to do with workflows in Celery. But it's a little bit hard to start building workflows with just these two objects, so Celery gives us different primitives which we can use to start creating better, and a little more complicated, workflows. The first one is a chain, which is basically what we just saw: run one function first and then, once it executes, run something else. So first one task, then we send the result to whatever the callback is going to be. And this is how we can define a chain. It might be a little difficult to see at first, but what we're using here is the pipe operator. Or we can just use the actual chain function that we can import from Celery.
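Both forms, sketched with illustrative task names:

```python
from celery import chain

# These two lines are equivalent; task_b receives task_a's result.
workflow = task_a.s(1) | task_b.s()
workflow = chain(task_a.s(1), task_b.s())
workflow.delay()
```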
The other thing we can use is a group. Basically, with a group we queue multiple tasks of the same kind, well, with different arguments, and they are going to run in parallel, as we can see in the image. So for instance, here, we're going to run an array of tasks, each one with different parameters, and they're going to run in parallel. We're creating this array of tasks in these four statements.
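A sketch of such a group:

```python
from celery import group

# The same task queued four times with different arguments, run in parallel.
job = group(my_task.s(i) for i in range(4))
result = job.delay()
```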
The other primitive we can use is a chord. As we can see from the image, a chord is just calling one or more tasks, running those in parallel, and then running a callback after each one of them has finished successfully. It's basically what we could also call a group with a callback. There are two ways to create a chord. One is to create a header, which is an array of one or more tasks, plus a callback, and execute that. But we can also chain a group with a callback, and that's basically the same thing; Celery will know that this is going to be a chord.
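Both spellings, sketched with a hypothetical summarize callback:

```python
from celery import chord, group

# Explicit chord: a header of parallel tasks, then a callback on all results.
chord([my_task.s(i) for i in range(4)])(summarize.s())

# Chaining a group into a callback produces a chord automatically.
(group(my_task.s(i) for i in range(4)) | summarize.s()).delay()
```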
The other primitive is map, which is something we can use whenever we have an array of parameters and we need to run each of them through the same task. The only difference is that it's not going to be parallel like a group; it's going to be sequential. And we also have starmap, which is basically the same thing as map, except each element of the array is itself an iterable of arguments.
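Sketched below; the add task is hypothetical, just to show how starmap unpacks each element:

```python
# map: each element becomes one call, executed sequentially in a single task.
my_task.map([1, 2, 3]).delay()         # my_task(1), my_task(2), my_task(3)

# starmap: each element is unpacked into positional arguments.
add.starmap([(1, 2), (3, 4)]).delay()  # add(1, 2), add(3, 4)
```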
So let's see what kind of workflows we can create from this. Let's think about when somebody logs into an account, and you want to check whether that user is logging in from a new device. There are different things we can do in the background, so let's go through the requirements. First, we need to check if the device has been used before. Then, if it hasn't been used before, we need to notify the user that this is a new device, and we need to save that event into a database. These two can probably run in parallel, so we're already kind of building this workflow in our minds. Then we take whatever we get from all these events and send it to an external API. Let's say it's not really external to the company; it's just an API that another group is developing, so for us it's external. And let's say that API does some machine-learning stuff, like outlier detection and RSE or something like that. Finally, we take whatever response we get from this API and post-process it in different ways. We could, again, save it into a database, or maybe send another notification, or maybe just flip a switch somewhere.
So let's start. How can we draw our workflow? The first step is just to check whether the device is a new device, right? Then we have these two tasks which, as we already said, can run in parallel, so we start drawing them here to get an idea of the different tools we can use. Then we're going to make a call to a third-party API, which is this cloud over here. And finally we're going to post-process whatever response we get. So, the first step is going to be a single task, like the ones we were talking about. After this, we can use a group to run these two tasks in parallel. The result of this group is going to be an array, because the group is going to wait until each one of its tasks finishes before moving to the next step. That array of results is then sent to the next task, which is our step three. That one, which is basically a callback, posts all of this data to the third-party API. And its response is fed into the last step, which is our last callback. And all of these we have to put into a chain; that way we can go from step to step. So let's take a look at it in code.
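A sketch of that workflow; the task names stand in for what's on the slide:

```python
from celery import chain, group

workflow = chain(
    check_device.s(user_id, device_id),   # step 1: is this a new device?
    group(notify_user.s(),                # step 2: these two run in parallel
          save_event.s()),
    post_to_external_api.s(),             # step 3: callback on the group's results
    post_process.s(),                     # step 4: final callback
)
workflow.delay()
```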
This is basically how it looks. As we can see, Celery gives us all the different tools we've been talking about in order to do this, and it's very easy to read, and very easy to write too. First we can see our step one. Then we have step two, which is a group that we're running in parallel. Then we have our step three, which is where we're posting to our third-party API. And then we're post-processing the result from this external API. Now, this is only a very simple example with no errors; we're assuming that everything is perfect in the world, which it's not.
So let's try and figure out how we can handle errors. There are different ways, and we can handle them at different levels within the workflow. One thing we can do is, for every error, just retry that one task. The other thing is to set error callbacks and, instead of retrying the task, maybe flip another switch or do something else, right? It depends on what kind of error it is. Now, if we want to retry a task, we can only do it at the task level. We cannot do it at the entire group level; we have to do it in a single task. But within a group, if one of the tasks running in parallel starts retrying, the group, well, the entire chain, is not going to move to the next step until every single one of the tasks in the group finishes, even if one of them keeps retrying over and over. This is one of the ways we can retry within a task; it is a little bit manual. There is another way to do it, which is setting auto-retry and then specifying the different exceptions to catch.
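Both styles, sketched; the exception type, helper, and retry options are illustrative:

```python
# Manual retry: the task catches an exception and asks Celery to re-queue it.
@app.task(bind=True, max_retries=3)
def check_device(self, device_id):
    try:
        return lookup_device(device_id)  # hypothetical helper
    except TimeoutError as exc:
        raise self.retry(exc=exc, countdown=5)

# Declarative retry: Celery retries automatically on the listed exceptions.
@app.task(autoretry_for=(TimeoutError,), retry_backoff=True)
def check_device_auto(device_id):
    return lookup_device(device_id)
```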
But historically, in the different things we have implemented, we've seen that it's easier to write it the manual way, because it's more explicit about what we're actually trying to do. The other thing we can do is set different callbacks. We can set a callback at the task level so that whenever this step fails, we fire off this callback. This is one of the ways to do it: as we can see, it's the same code we had before, but we're adding an .on_error and then the handler.
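For instance, a sketch with a hypothetical handle_error task:

```python
# The same chain as before, with an error callback attached to one step.
workflow = chain(
    check_device.s(user_id, device_id).on_error(handle_error.s()),
    post_process.s(),
)
workflow.delay()
```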
Then, we can also add a callback to just one of the tasks in a group, or even to the entire group itself. For instance, if we want to do it on an entire group we can use link_error, which is what we were looking at before. Or we can also add an error callback to the entire chain itself. It really depends on a lot of different things, and it's a decision for whatever each team is implementing. This is a way to add an error callback for when anything goes wrong in a chain.
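Sketches of both levels, reusing the illustrative names from before:

```python
# Error callback for a whole group.
step_two = group(notify_user.s(), save_event.s())
step_two.link_error(handle_error.s())

# Error callback for the entire chain.
workflow = chain(check_device.s(user_id, device_id), step_two, post_process.s())
workflow.link_error(handle_error.s())
workflow.delay()
```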
The other thing to take a look at here is: how can we handle the different results? We have our entire workflow, but we want to go back and check what the result of each one of these tasks was. The good thing is that Celery gives each task a specific task ID, which is unique, and we can take that ID and use a class called AsyncResult to grab the result of that specific task. There are a couple of ways to do it. One way is to use the AsyncResult class directly: we have to give it a task ID, and we have to specify which backend we're using. Usually that's going to be Redis, or it could be some other database, or it could also be RabbitMQ; that depends on your needs. And we also have to specify which Celery app we're using, because of the different configurations you can set on each app.
The other way is to use the AsyncResult class that comes from the task object. That way we only have to give it a task ID, and we don't have to specify the backend or the app, because the task object is already bound to a specific app which has the backend configured correctly. So one way to do it is to grab the task ID and use it to get the results.
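Both ways, sketched with a placeholder task ID:

```python
from celery.result import AsyncResult

# Directly: we have to say which app (and therefore which backend) to use.
res = AsyncResult('some-task-id', app=app)

# Via the task object, which is already bound to the app and its backend.
res = my_task.AsyncResult('some-task-id')
```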
Now, these are different attributes we can use from the result object in order to analyze it. The first one is how we get the value of the actual result. Then, if we're within a workflow like this one, or within a chain, we can go up and down the chain or the workflow using .children and .parent. Now, the only thing is that this workflow is interesting because the second step is actually a group result, not a single task. So the class we're going to have to use there is called GroupResult, which is basically an array of AsyncResults. It works a little differently, but most of it is basically the same thing. For both classes we have these different methods.
The first one checks whether the result is ready, because all of this is running asynchronously and we don't know when it's going to end, so we can continuously check whether a result is ready yet. We can also check whether it has been successful; for a single task that's just that task, and for a group result it checks whether every one of the results within the group was successful. Then, on an AsyncResult, we can call get, a method that gives us the same thing as accessing result.result. And on a GroupResult we can use .join, which loops through each one of the AsyncResults inside the group, waits for each of them to be ready, and then returns an array of each one of those results.
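Putting those together, a sketch; assume res is an AsyncResult and group_res is the GroupResult for the parallel step:

```python
res.ready()             # has the task finished yet?
res.successful()        # did it finish without raising?
res.result              # the return value (or the exception, if it failed)
res.parent              # one step up the chain; res.children goes down

group_res.successful()  # True only if every task in the group succeeded
group_res.join()        # waits for all of them, returns a list of results
```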
And that was pretty much all the different ways we can create and manage workflows in Celery. This is where you can contact me if you have any questions, or find me out in the halls. Also, Señor Pared is going to be at the Sprints, so drop by to pick up some stickers and/or pins, and we also have some stickers and pins right here in the front if you guys want to pick some up. - [Woman] Thank you, Josue. (applause)
- [Audience Member 1] Hey, in your experience, which broker do you prefer? Like RabbitMQ versus Redis, or whatever? - Well, in my experience it's usually better to use RabbitMQ, because it supports more things and it's more robust. The thing is that whenever we're doing these types of workflows, Celery has to send different messages of its own. You're not going to see those, right? But your broker is going to see them. So if it's an actual implementation of AMQP, it's going to be more robust and it's going to handle these multiple messages better. - [Audience Member 2] Thanks for the talk.
We use some workflows in our work, and we had some bugs, like things not being called even though we were calling retry, or messages being lost even though the configuration should have handled that. So, do you know if this is well-tested inside Celery, or have you had problems like that? Because we had many bugs with workflows, and right now we're trying not to build such complex workflows, for better debugging in production. Have you had any problems like that? - Well, yeah, it's always better to keep your workflows as simple as possible. And we do test a lot of these different things within the Celery project, but there are some bugs around it. I mean, these are very complicated things to implement. So, yeah, if you have some bugs I do recommend you submit some issues. We're always taking a look at those, even though it may take time. Also, if you want to drop by the Sprints and help out, that would be awesome.
- [Audience Member 3] Can I invoke coroutines from within Celery, is that possible? - Can you what, sorry? - [Audience Member 3] Coroutines, like AsyncIO things. - Oh, well, yes, there is a way to do it, but it is not fully supported by Celery right now, because Celery doesn't yet support Python all the way to 3.7. We're working on that and, hopefully, soon we're going to do the release that fully supports 3.7. Okay, just one more thing: if you're going to drop by the Sprints, we have a lot of different beginner tasks and also advanced tasks, so feel free to drop by and grab some swag. (applause) (whooshing tones)