My team had a big problem. Well, five problems, actually. Picture this. Every week on Monday morning, one of my team's very talented engineers would come into the office, sit down at her desk, and run a Python script. Every single week. And while this already sounds like a tedious waste of her time, it gets worse. The script barely worked. Sure, it downloaded the latest data and ran all of our team's models. Problem is, the script wasn't parallelized, so it was really slow. And if even just one of our models had a bug in it, it could bring the whole script crashing down. And when that happened, we would just have an incomplete progress bar and a stack trace to try to help us pick up the pieces and figure out what went wrong, starting over from scratch. In this video, I'm going to tell you the story of a very important script that overstayed its welcome. You'll hear about five problems that nearly drove one of my team's very talented engineers to the brink of insanity. And I'll show you how I use just one library, Prefect, to fix all of them. By the end of this video, you'll know how to supercharge your own outdated scripts by modernizing them into production workflow applications with Prefect. I've used Prefect since 2019, and this is just the latest time that it saved the day for me. That's why I was so excited when Prefect reached out, asking to sponsor a video. This is a story that I already planned on telling because it made such a big impact on the way that me and my team work. So thank you to Prefect for helping me bring it to you all just a little bit sooner. Let's get into the video. But first, let me ask you a question. If you've already done all of the hard work of scripting an entire workflow from scratch, like downloading the latest data, backtesting a ton of models, computing metrics so that you can choose the best model, and then using that model to make an actionable prediction that your entire team relies on, why would you stop one step short so that if you're not at your desk Monday morning to run your script, the entire world stops? My guess is is that when I put it that way, you'd rather take things across the finish line and make things run automatically. In the realm of workflow orchestration, this is called scheduling. So let's schedule it. To do that, we're going to start by turning our main function into a Prefect flow. It's really simple. Just pip install Prefect, import the flow decorator, and apply it to our function. This turns our basic Python function into a Prefect flow and unlocks a ton of powerful features. Because I'm on a mission to take next Monday off, let's schedule our flow. To do that, we're going to use our flow's serve method. It supports a bunch of different ways for specifying when our flow should run, like providing a cron string, specifying the time interval between runs, or by defining a recurrence rule string. If we run our flow.serve, give the deployed flow a name, and provide this cron string, then our flow will be scheduled to run every Monday morning at midnight UTC. And now with our flow scheduled, I mean, I guess I could just sit back, kick my feet up, and wait till Monday morning when... S**t. (Sigh...) So it's great that we scheduled our code, but that was really only our first problem. I kind of forgot that our script was basically broken to begin with. So we're going to need to make our flow more robust to failures before we could expect it to succeed. Take a look at this part of the code here. Sometimes when we try to get data, it'll just fail, and we'll need to keep retrying until it succeeds. It's outside of our team's control, so that's the best we can do. Oh, and I almost forgot. One of our models is still a bit experimental. The team working on it is doing their best, but sometimes they introduce a bug. And when that happens, the whole pipeline crashes. So what we normally do is we'll comment out that model, rerun the pipeline, and then we could uncomment it to re-add it to the pipeline when they fix the bug. Yeah. Although we can't fix the underlying crashes, we can make our flow more resilient to them. To do that, we're going to turn our functions into tasks using the task decorator. Cool, but what does that do? Like flows, tasks give us a bunch of powerful functionality, and a lot of it is specifically designed for improving scaling and error handling. Like this... For our get data task, we can add retries equal 42. Now, Prefect will keep retrying our task up to 42 times until it succeeds. It never fails that much, so this solves our first problem. The second problem is a bit trickier, so stick with me here. Our goal is to throw out any model that fails. So if we had something that vaguely looked like this... Well, then that would work, but where would we get the result from? Well, using the walrus operator, we can evaluate our backtest expression and immediately assign it to our result variable. That lets us do something like this... but this still doesn't solve our problem because our backtest function doesn't return an error when it fails, it raises it. So this would still crash. Thankfully, that's not a problem. Here's how we can fix it. Instead of calling our task directly, we can use its submit method. Here, submit immediately returns a Prefect future, which is basically an IOU. We'll need to come back later to get the finished result. But in the meantime, we can do other work in our main function. To get the result, we can call the result method on our future. This will wait for our task to finish and then return its result. Now, you're probably thinking, this is basically where we started off, except with extra steps. And yeah, I got that. But the result method actually has a really helpful argument. If we set raise_on_failure to false, then Prefect will return the error as a value rather than raising it is an exception. With all these changes in place, our flow will now always succeed, even if there are some failures that we had to handle. So if you could give me one minute here, let me just type something up quick. Dear boss, I'll be out of the office today, but you could find the results from this week's pipeline on our Prefect server here. Thanks, Doug. One of my favorite features of Prefect is its web-based UI. And the good news is, I don't need to write any more code to use it. We could spin up a local Prefect server by running prefect server start, or we can use the free tier of Prefect Cloud for even more cool features. All we need to do is create an account on Prefect's website, create a workspace, and then run prefect cloud login. Here, I use the login with browser option because it's just really easy. Let's start on the flows tab. Clicking on our flow, we see some upcoming scheduled flows, as well as all of our past flow runs from our testing. Let's click on the latest run. We can actually see a real-time view of all of our tasks, including when they start, when they end, and what their status is. And this is all within a graphical view to show us which tasks depend on each other. This is the first place that I come to when I'm trying to debug an error. It just gives you so much information at a glance. Next, if we look at the logs tab, we can actually see that Prefect automatically added logging to our application. We got this for free because we made our tasks, well, Prefect tasks. This is already super helpful, but Prefect Cloud takes it one step further. When something goes wrong, you don't necessarily want to go spelunking through log messages to figure out what went wrong. That's why Prefect uses AI summarization to automatically extract the failure and put it on your dashboard. That way, you can discover and address your failures faster. If you're managing multiple flows, this might even help you prioritize which failure you should address first. But if you do want to drill down into the details, we can click on the task run tab. There, we can get information on the status, runtime, and logs for each of our task runs. But I just noticed something. For tasks that we run multiple times, like when we're running backtests on multiple models, it can get a little confusing which backtest corresponds to which model. You probably want a bit more information than task name zero, one, and two. That's what the task run name argument is for. This allows us to give our task run a custom name based on its arguments. If we use the name backtest and then curly braces, model name, kind of like an fstring, except without the F, then the next time we run our flow, our models will be backtest_slow model, backtest_buggy model, backtest et cetera. Now it's really obvious that our slow model is slow. Ah, great. I just got myself signed up for another weekly task. Thankfully, Prefect Cloud has a really powerful event-driven automation system. We can use it to automate that email. Go to the automations tab and click add a new automation. We want our automation to trigger when our flow enters the completed state. Now we can configure the action that happens when our event is triggered. I want to send a notification. After clicking the add button, we can select the email block. Let's call this block notify-stakeholders and give it my boss's email address. We can configure the email subject and email contents. We can use Jinja templating to include Prefect specific information, but I'm just going to keep the default message for now. Then we just give our automation a name and we're done. Now when our flow succeeds, Prefect will automatically send that email. If I want to take it one step further, I could even include information like what was this week's prediction so that our stakeholders would have even more information in that email. This is all awesome. And there's even more stuff that we can do within Prefect UI. But yeah, wait a minute. Check this out. If we go back to our graphical view, we're spending so much time processing our back tests and metrics one by one. We don't need to do this. Each time we submit a task, we're immediately waiting for its result. But we don't need to do that. If there aren't any dependencies between our tasks, in principle, they could all just run at the same time. Like this, if we submit a task and then another task and then another task, well, in principle they could all process at the same time. We're actually using the same task and just passing a bunch of different inputs in it from an iterable container. This is such a common problem that Prefect actually has a special method for it, task.map. So let's use it. For back tests, we can map over models.keys to dynamically create a task run for each model. But we also need to pass in our entire dataset and we don't want to accidentally iterate over that. We can use the unmapped annotation to just pass in the entire dataset. That way we don't iterate over it. Task.map returns a list of Prefect futures and they're in the same order as the inputs that we passed in. We still need to eventually retrieve our results and filter out any failures. We could do that using a dictionary comprehension similar to how we did previously. We can do the same for our metrics. Run metrics.map, pass in our entire dataset unmapped, and then map over our predictions. After, we can create our metrics dictionary as we did before. And now, if we run our flow again, we could see that our back tests and metrics ran concurrently, which is awesome. By default, Prefect uses the concurrent task runner. It works well for our toy example, and it's really good if you have a lot of IO or network calls. But if you want to take things to the next level and achieve true CPU-level parallelism for compute-bound task, then I really recommend the PrefectDask or PrefectRay libraries. They integrate directly with Prefect and you could use the PrefectDask or PrefectRay task runners. These runners can automatically stand up a local cluster if you just need basic CPU-level parallelism, or you can connect them to existing dedicated clusters if you need more power than your local device can provide. But for now, I'm just going to stick to the concurrent task runner because I don't think a cluster can make this any faster. Our code is really shaping up, but there's still a lot of to-dos. What even are these? Oh, I mean, okay. These are just complaining that every time we run our flow, we actually just overwrite last week's results. That's not good, but there's an easy fix. We can use Prefect's artifact system to create table or markdown artifacts. Artifacts in Prefect are just stored pieces of data that are generated within flow or task runs. And best of all, they're not overwritten between runs. Here we can use the create_markdown_artifact function to replace this line where we're writing our report to file. Or here we can use the create_table_artifact to store our dictionary of metrics or our final prediction. We can find our artifacts within the flow run or task run pages within Prefect UI. Or better yet, since we named our artifacts, we can find them on the Artifacts tab. Let's click on the next week prediction artifact. We can easily look back through previous week's predictions to figure out which models we've used and what they predicted. Let's recap the five problems that Prefect helped us solve. Number one, we saved someone's Monday morning by scheduling our Python code with Prefect. Number two, we added retries and error handling to gracefully resolve and recover from unforeseen failures. Number three, we got a super useful web-based UI that gives us all the information that we could possibly need to debug and monitor our data pipelines and react to unexpected situations. Number four, we massively sped up our pipeline by running a bunch of our tasks concurrently. And if we need to scale our compute capability beyond what our local device could provide, Prefect makes it really easy to run our task on external infrastructure or on-prem compute clusters. And number five, we got an easy way to maintain a history of all the data and reports that our workflow generates through Prefect's artifact system. Prefect lets you build, observe, and react to all sorts of data pipelines. And I'm just scratching the surface of all the awesome things that Prefect can do. Prefect's cloud product levels up your workflow orchestration with true end-to-end observability. So you never have to wonder what's happening with your workflows. To get access to Prefect's powerful automation system, AI-powered log summarization, or role-based access controls, you should sign up for the totally free tier of Prefect Cloud by scanning the QR code on screen or clicking the link at the top of today's video description. Using that link really helps the channel. If you made it to the end of the video, please like the video. It helps this video reach more awesome people like you. And if you really liked the video, then you should watch this video next to learn why Compiled Python is incredibly fast and shockingly easy to use. Thanks for watching.