DjangoCon US 2018 - Building Workflows With Celery by Josue Balandrano Coronel

Captions
(whimsical ukulele music)
- [Man] Buenas tardes [Good afternoon]. I promise you this talk will be in English. (laughter)
- Introduciendo el señor Josue Coronel. El título de su presentación [Introducing Mr. Josue Coronel. The title of his presentation]: Building Workflows with Celery. (laughter) (applause)
- [Woman] Sí, I will welcome Josue Coronel for his talk in English. (laughter)
- How's it going everybody? So let's start. Well, first, yeah, my name is Josue Balandrano Coronel. I work for the Texas Advanced Computing Center at Austin. And I'm also a small part of the core team for Celery. So let's start with this, hopefully, very interesting presentation.
And first let's start off with a very quick overview of what Celery is, for anybody who doesn't know out there. Celery is basically a task queue, which, what it allows us to do, is to run things asynchronously. Now the only thing is that you might be thinking, well, nowadays we have all these different projects like AsyncIO and the whole asynchronous world out there. But the very interesting thing about Celery is that it allows us to distribute things. So, run things remotely on other computers. Also, it gives us a lot of different things so that we can, for instance, apply a retry policy on tasks that don't run, or that error out, or something like that. And we can also store results in different ways.
This is just a very quick way to view how the Celery architecture actually works. Everything starts with our main application, which is usually what we call our producer. And we call it like that because this is actually what creates, or produces, all the different messages that are going to be sent to the workers. And the workers are the ones who are actually going to run these different tasks that we're queueing, right? The cool thing about this is that, if you think about it, you can run different tasks on different machines that have different resources or have access to other different resources.
Inside each one of these workers what we have is usually a main process, and then a number of other worker processes. And this is how we can also run things concurrently. And the way that all of these communicate between themselves is using something that's called a transport, which is usually something like RabbitMQ, or we can also use Redis. And there is support for other types of transports, even a simple file transport. And we can also use a backend, which is the name for what we use to save the different task results.
So, how can we start building these different workflows with Celery? Because we have talked about how Celery basically just runs different code on remote machines, right? Distributed machines, if you can call it that. So everything in Celery actually starts with what we call signatures. And signatures are very closely related to what Python partials are. They're implemented in a different way, but the idea is basically the same. So, the way that they work is that, if we have a task, for instance, here the task that we're going to use is project.tasks.my_task, we can create a signature from this task. And this signature is basically going to be a representation of what we have here at the end of the slide, which is just a way to actually execute that function itself. Which is: call my_task with some specific arguments. They could be just, like, normal parameters or also keyword parameters.
The other cool thing about this type of signature is that they are easily serializable. Sorry, that's really, really hard to pronounce. (laughs) - Serializable, yeah, is that okay? Okay, let's just continue with that, yeah. Okay so, for instance, the serialization of this task, or of this signature, as we can see here, is actually just a very, very simple dictionary.
And this is what allows us to actually send this message to our workers, through something like RabbitMQ or Redis or any other type of broker, and have that worker know what it needs to execute. So we can see that signatures are where we start creating the message that we're gonna send. The message itself doesn't contain any code. And that's one of the things that we have to kinda get a grasp on, because we have to have the code that we need to run on each one of the workers. That way the producer only tells the worker what it needs to run, with which parameters, and then the worker does its job.
The other cool thing about signatures is that we're able to define different options that Celery actually uses. So, for instance, we can specify different queues where these tasks are going to run. And we have this example here, where we're actually setting a custom queue whenever we're creating our signature. Then the other good thing is that we can create a signature and then, after creating that signature, we can set different arguments for it or we can merge different options. So, for instance, here on the first line we're using a signature that we have already created. And we are adding another parameter, which is going to be the string with the value new model id. Or we can also set another queue that that task is gonna go to.
Now, this is the very beginning of how to start creating workflows. The other idea that we need to realize here is that we can create signatures in different ways, right? And one of the other ways is to create it directly from the task object. So we can see here there are some shortcuts, which are .s and .si. So .s creates a regular signature, as we just saw. And .si, what it does is that it creates an immutable signature. And that signature will not accept any new parameters, or anything like that.
The other thing to see here is callbacks, because now we know how we can send these tasks to different workers. Now how can we start creating these workflows? So the very first step is to create a callback. After one task gets run, or one function runs, and it's successful, then we can run something else. And this is the way that we can actually start setting different callbacks, it's very simple. One of the main ways to do it is, whenever we queue one task, we can tell Celery to actually run something after it has been successful using the link parameter. The link_error, that's going to be the error callback whenever that task fails. Or, if we're creating a signature, then we can actually set these different link or link_error callbacks on it.
Now, these are the very basic ideas on how everything that has to do with workflows works in Celery. And it's a little bit hard to just kinda start building workflows with just these two different objects. So Celery gives us these different primitives which we can use to start creating better, and just a little bit more complicated, workflows. The first one is a chain, which is basically what we just saw. It's running first one function and then, once that function executes, we can run something else. So first this one, and then we send the result to whatever the callback is going to be. And this is the way that we can actually define a chain. As we can see here, it might be a little bit difficult to see at first, but what we're using here is the pipe operator. Or, we can also just use the actual chain function that we can import from Celery.
The other thing that we can use is a group. Basically, what we do with a group is that we queue multiple tasks with different arguments. But they are going to run in parallel, as we can see here in the image.
So for instance, here, what we're going to do is that we're going to run an array of tasks, each one with different parameters, and they're going to run in parallel. And we're creating this array of tasks here in these four statements.
The other primitives that we can use are chords. And these, as we can see from the image, are just calling one or more tasks and running those in parallel, and then running a callback after every one of those has run successfully. And this is basically what we can also call a group with a callback. So there's two ways to create a chord. One is to create a header, which is an array of one or more tasks, and then a callback, and execute that. But we can also chain a group with a callback and that's basically the same thing. Celery will know that this is going to be a chord.
The other primitive is map, which is something that we can use whenever we have an array of parameters and we need to run the same task on each one of them. The only difference is that it's not going to be in parallel as with a group, it's going to be sequential. And we also have starmap, which is basically the same thing as map, but it's for whenever each one of the elements of the array is another iterable.
So let's see which kind of workflows we can create from this. Let's think about whenever somebody logs into an account. And you wanna check if that user is logging in from any other device. So there's different things that we can do in the background. Let's go through, kind of, just the requirements that we need for it. First we need to check if the device has been used before. Then, if the device has [not] been used before, we need to notify the user that this is a new device and we need to save that event into a database. These two can probably run in parallel, so we're already kinda building this workflow in our own mind.
Then we can send whatever we get from all these different events to an external API. Let's say that it's not really external to the company, but it's just an API that another group is developing. So, for us, it's going to be external. And let's say that that API, I don't know, does some machine learning stuff like outlier analysis or something like that. And then, finally, we get whatever response we get from this API, and we post-process it in different ways. We could probably, again, save it into a database or maybe send another notification, or maybe just flip a switch somewhere.
So let's start. How can we start drawing our workflow? The first step is going to be just to check if the device is a new device, right? And then we have these two tasks which we already said that we can run in parallel. So we start kinda drawing them here, so we have an idea of the different tools that we can use. Then we're gonna do a call to a third-party API, which is this node over here. And then we're finally going to post-process whatever response we're gonna get.
So, the first one is going to be a task like the ones we were talking about. And then after this, we can use a group in order to run these two tasks in parallel. Then, the result of this group is going to be an array, because the group is going to wait until every one of these tasks has finished before going to the next step. Then this array of results is going to be sent to this other task, which is going to be our step three. And this one, which is basically a callback, is going to post all of this data to this third-party API. And then this response is going to be fed into this last step, which is our last callback. And all of these we have to put into a chain. That way, we can go from step to step. So let's take a look at it in code. This is basically how it looks. And, as we can see, Celery gives us different tools.
All the different tools that we're talking about in order to do this, and it's very easy to read and it's very easy to write, too. So, for instance, we can see here we have our step one. Then we have step two, which is a group that we're running in parallel. Then we have our step three, which is where we're posting to our third-party API. And then we're post-processing the result from this external API.
Now this is only a very simple example with no errors, and we're assuming that everything is perfect in the world, which it's not. So let's try and figure out how we can handle errors. There are different ways, and we can handle them at different levels within the workflow. One thing that we can do is that, for every error, we can just retry the one task. Or the other thing is that we can set different error callbacks and, instead of retrying that task, we probably flip another switch or maybe do something else, right? It depends on what kind of an error it is.
So, if we wanna retry the one task, then the only thing is that we can only do it at the task level. We cannot do it at the level of the entire group, we will have to do it on one task. But within one group, if we start retrying one of the tasks that are running in parallel, the entire chain is not gonna move to the next step until every single one of the tasks in the group finishes. That means even if one of the tasks is retrying and retrying. This is one of the ways that we can retry within a task. It is a little bit manual. There is another way that we can do it, which is setting an auto-retry and then specifying the different exceptions that we want to catch. But historically, in the different things that we have implemented, we've seen that it's easier to put it this way because it's more explicit about what we're actually trying to do.
The other thing that we can do is that we can set different callbacks.
We can set our callback at the task level so that, whenever this step fails, we're going to fire off this callback. This is one of the ways that we can do it. As we can see, it's the same code that we had before, but we're adding an .on_error, and then the handler. Then, we can also set a callback on just one of the tasks in a group. Or we can even do it on the entire group itself. So, for instance, if we wanna do it on an entire group we can also use this link_error, which is what we were looking at before. Or, the other thing is that we can also add an error callback for the entire chain itself. It really depends on a lot of different things, and it's a decision for each team based on whatever it is that they are implementing. This is a way to add an error callback for if anything goes wrong in a chain.
The other thing to take a look at here is how we can handle different results. So we have our entire workflow, but we wanna go back and check what the results of each one of these tasks were. So the good thing is that Celery gives a specific task ID, which is unique, to each one of the tasks. And we can grab that task ID, and use this class, which is called AsyncResult, in order to grab the result of that specific task. And there's two ways to do it. One way to do it is to use the AsyncResult class: we have to give it a task ID, then we have to specify which backend we are using. Usually it's going to be Redis, or it could also be any other database, or it could also be RabbitMQ. But that depends also on your needs. And you also have to specify which Celery app you are using, because of the different configurations that you can set on each one of the apps. The other way to do it is to use the AsyncResult class that comes from the task object.
And that way we only have to give it a task ID, and we don't have to specify the backend or the app, because the task object is already bound to a specific app which has the backend configured correctly. So one way to do it is to grab that and then to use the specific task ID in order to get the results.
Now, these are different things that we can use from the result object, different attributes, in order to analyze it. So the first one is how we can get the value of the actual result. Then, if we're within a workflow like this one, or within a chain, we can go up and down in the chain or in the workflow by using .children and .parent. Now, the only thing is that this workflow is very interesting because the second step is actually a group result, it's not a single task. So the class that we're gonna have to use here is called GroupResult, which is basically an array of AsyncResults. And here it works a little bit differently, but most of it is basically the same thing.
So for both classes we're gonna have these different methods. The first one is to check if the result is ready, because all of this is running asynchronously and we don't know when it's gonna end. So we can continuously check if a result is ready. We can also check if it has been successful. It's either going to be one task or, if it's a group result, then it's gonna check if every one of the results within that group was successful. Then, whenever we have an AsyncResult, we can use get, which is a method that we can use to get the same thing as we were getting when we were accessing result.result. Then, whenever we have a GroupResult, we can use .join which, what it does is that it loops through each one of the AsyncResults that are inside one group, and it's going to wait for each one of those AsyncResults to be ready, and then it's going to return an array of each one of those results.
And that was pretty much all the different ways that we can create and manage different workflows in Celery. This is where you can contact me if you have any questions, or outside in the halls. Also, Señor Pared is going to be at the Sprints, so drop by to pick up some stickers and/or pins, and we also have some stickers and pins right here in the front if you guys wanna pick up some.
- [Woman] Thank you, Josue. (applause)
- [Audience Member 1] Hey, in your experience, which processing backend do you prefer? Like RabbitMQ versus Redis, or whatever?
- Well, in my experience it's usually better to use RabbitMQ because it supports more things and is more robust. And the only thing is that whenever we're doing, for instance, this type of workflow, Celery has to send different messages. That is, you're not gonna see those, right? But your backend is gonna see them. So if it's an actual implementation of AMQP, then it's going to be more robust and it's going to handle these multiple messages better.
- [Audience Member 2] Thanks for the talk. We use some workflows in our work, and we had some bugs. Like things weren't being called even though we were calling retry or, like, messages being lost even though the configuration kind of handled that. So, do you know if this is well-tested inside of Celery, or did you have any problems like that? Because we had many bugs on workflows, and right now we are trying not to build such complex workflows, for better debugging in production. Do you have any problems like that?
- Well, yeah, it's always better to keep your workflows as simple as possible. And we do test a lot of these different things within the Celery project. But there are some bugs around it. I mean, these are very complicated things to implement. So, yeah, if you have some bugs I do recommend you submit some issues. We're always taking a look at those, even though it may take time.
Also, if you wanna drop by the Sprints and help out, that would be awesome.
- [Audience Member 3] Can I invoke coroutines from within Celery, is that possible?
- Can you what, sorry?
- [Audience Member 3] Coroutines, like AsyncIO things.
- Oh, well yes, there is a way to do it but it is not fully supported by Celery right now because it doesn't support all the way to Python 3.7. We're working on that and, hopefully, soon we're gonna do the release that's going to fully support 3.7. Okay, just one more thing. If you guys are gonna drop by the Sprints, we have a lot of different beginner tasks and also advanced tasks, so feel free to drop by, and to grab some swag. (applause) (whooshing tones)
Info
Channel: DjangoCon US
Views: 7,111
Keywords: django, djangocon, djangocon us, python, Celery, building, workflows
Id: 8YLeWxLtVgo
Length: 23min 41sec (1421 seconds)
Published: Thu Nov 08 2018