Airflow Tutorial for Beginners - Full Course in 2 Hours 2022

Captions
Hello everybody, this is code2j. Welcome to the Airflow tutorial for beginners full course. In this two-hour course we combine theory explanations and practical demos. You don't need any prerequisites to start this course, but basic Python knowledge is recommended to help you get started quickly as an absolute beginner. To make the most out of it, you are highly encouraged to follow along and try out the hands-on examples. In the description you will find the link to the course GitHub repository, which contains the source code of all examples. All the hands-on demos are based on the latest Airflow version 2.0.

Throughout the course you will be given an introduction to Apache Airflow and learn how to run it locally in a Python environment and inside a Docker container, with practical examples. You will also learn about the Airflow core concepts, the task lifecycle and the basic architecture. After that we will show you how to build Airflow DAGs with different operators, like the bash and Python operators, and how to share data between tasks using XComs, via demos. The TaskFlow API, a new feature introduced in Airflow 2.0, will also be covered with an example. Then you will learn about DAG catchup and backfill, and how to schedule DAGs with cron expressions. After that we will teach you how to connect to external services like a Postgres database and AWS S3 via Airflow connections, using demos. Then we share tips on how to install Python packages in the Airflow Docker container. In the end we will cover how to use Airflow hooks, with Postgres and S3 examples. Make sure you subscribe and smash the like button. If we hit 1000 likes I will do a video about how to debug an Airflow DAG; if we hit 5000 likes I will do a video about the Airflow Docker operator. Sounds exciting? Let's get started.

So what is Apache Airflow? It is an open-source tool with which we can create, schedule and monitor many kinds of workflows. It is the perfect tool to use when we have a workflow with tasks 1, 2, 3 which should be run periodically in a specific order. Airflow is based on Python, so if you already have some coding experience with Python you are good to go.

OK, let's get started. First of all, let's create a Python project and install Airflow. I'm going to open Visual Studio Code, create a project folder named airflow_tutorial in the Desktop directory and open it. Let's open a terminal and check whether I am in the right directory with the command pwd. Since Airflow 2.1 requires a Python version above 3.6, let's check my Python 3 version with python3 --version. As we can see, my local Python 3 version is 3.6.7, which meets the requirement. That's great; you can also use version 3.7 or 3.8. Let's create a Python virtual environment with python3 -m venv and name the environment folder py_env. Once it is created successfully we can see the py_env folder in the Explorer section. Now let's activate the Python environment with source py_env/bin/activate. When it is activated we can see py_env at the very left of the terminal prompt. Next, I'm going to install Airflow locally. Let's open a browser, search for the official Apache Airflow GitHub repository and navigate to the installation section. I will copy the install command and paste it in the terminal. Before executing it I have to change the constraints file to match my local Python version, from 3.7 to 3.6. Oops, there's an error about a missing gcc. To solve this I have to run xcode-select --install to install the macOS command line tools. The installation takes a couple of minutes.
After that we execute the pip install command again, and this time there is no error, so Airflow is installed successfully. OK, next I'm going to initialize the database for Airflow. Before that we have to set the Airflow home directory: by default Airflow creates an airflow folder in the home directory, but I would like to have everything in my project directory, so I export the AIRFLOW_HOME environment variable pointing to my current directory. Then I initialize the database with airflow db init. This command creates a SQLite database, a logs folder and some configuration files. Next I start the Airflow webserver with airflow webserver -p 8080. By default 8080 is the port to be used; if you want you can also use another port, for example 8088 or 8082. Then let's open the link in the browser. It asks for a username and password to log in. OK, let's go back to the terminal, stop the Airflow webserver and create one user with the command airflow users create, giving values to the parameters as shown in the help output. After setting the password we start the Airflow webserver again, and we are ready to sign in.

It looks great: I can see all the example DAGs, but above them there is a warning that the scheduler is not running. In order to execute the DAGs we have to start the Airflow scheduler. Let's go back to VS Code and open another terminal. First we make sure we export the AIRFLOW_HOME environment variable as we did in the first terminal, then we execute airflow scheduler. Now let's go back to the browser and refresh the page. Boom, the message is gone, which means the scheduler is running properly. Let's turn on one example DAG by clicking the toggle in front of it. From the tree view we can see this DAG has multiple tasks. When we click the refresh button on the right-hand side we can see the tasks have been scheduled and executed, and each run is marked dark green once it has finished successfully.

Now let's run Airflow in Docker. Let's open VS Code, create a project folder named airflow_docker in my Desktop directory and open it. We open the terminal and check whether we are in the right directory with pwd; as we can see, we are inside our project directory, that's great. Then I'm going to open Apache Airflow's official website and look for the documentation. We click the Quick Start section; instead of running Airflow locally, we are going to install it with Docker, so we click "Running Airflow in Docker". Before we do the installation we have to install Docker and Docker Compose on our laptop. No worries, it's just as simple as installing any other software. If you are running a Mac or a Windows laptop, all you need to install is the Docker Desktop application; you can find the download links for both Windows and macOS in the description. Once you've downloaded the file, just double-click it and follow the installation steps. As I already have it installed, I'm going to launch the Docker Desktop application. It might take a minute or more, depending on how powerful your laptop is. Once it has started we can see the Docker icon in the menu bar; when clicked, it indicates the status of Docker. When you see the green text saying "Docker Desktop is running", we can go back to VS Code and check the Docker and Docker Compose versions with docker --version and docker-compose --version. If you see the version output, it means you have a working Docker and Docker Compose. Now we should have all the preparation work done.
Let's go back to the Airflow documentation and download the official docker-compose YAML file by copying the curl command, pasting it in the terminal and executing it. Once it succeeds we can see the docker-compose.yaml file has been downloaded into our project directory. Let's open the YAML file: it defines many services and composes them in a proper way. By default the YAML file configures Airflow with the CeleryExecutor. To keep things simple we are going to use the LocalExecutor instead, so I change the core executor from CeleryExecutor to LocalExecutor. We don't need the Celery result backend and Celery broker URL for the LocalExecutor, so we delete them. Redis is only needed for Celery, so we delete its dependency entries and its service definition as well, and we also remove the Celery worker and Flower services. That's it, we save the YAML file and are ready to go. I suggest you pay attention to these steps and watch back and forth to avoid missing any of them; you can also find the GitHub repository link in the description below, where you can get the final version of the YAML file.

Next we need to create folders for dags, logs and plugins, which are quite self-explanatory: they hold the Airflow DAGs, the logs and your customized plugins. Just copy the command, paste it in the terminal and execute it; we can see all the folders have been created under our project directory. Echoing the Airflow user id and group id into the .env file is only necessary when you are using a Linux OS; I'm using macOS, so I skip this step. Next we initialize the database with docker-compose up airflow-init. We can see that it downloads all the necessary Docker images and sets up an admin user with airflow as both username and password. Once airflow-init exits with code 0, the database initialization is complete. Now for the most exciting step: we run Airflow with docker-compose up -d, where -d means detached mode, running the containers in the background. Let's check which containers are running with docker ps. From the output we can see there is an Airflow webserver, an Airflow scheduler and the Postgres database. Let's open the browser and go to 0.0.0.0:8080 to check our Airflow webserver. In the meantime we can also open the Docker dashboard, where we can see all the running containers of our Airflow project. We type the username and password airflow to log in, and boom, Airflow is running properly in Docker and we can see all the example DAGs. Let's pick the first one and start it. When we click the refresh button we can see the tasks have been scheduled and executed, and in the end the successful DAG and task runs are marked dark green. That's awesome, congratulations, you have got Airflow running successfully in Docker.

To better understand the core concepts, let's recap what Airflow is and where it comes from. Airflow started in 2014 as Airbnb's internal tool to manage increasingly complex workflows. The project was open source from the beginning, becoming an Apache Incubator project in March 2016 and a top-level Apache Software Foundation project in January 2019. Airflow is one of the most popular workflow management platforms, and it is written in Python. Since its goal is to manage workflows, what is a workflow? A workflow is a sequence of tasks, and in Airflow a workflow is defined as a DAG, namely a directed acyclic graph. For instance, we have a workflow which starts with task A; when A finishes it is followed by tasks B and C, and when B and C finish, the final tasks D and E are executed.
However, when task D finishes it is not allowed to run task A again, since that would create a cycle; likewise task C cannot be followed by task A. A DAG is therefore a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies.

What is a task, and what is an operator? A task is defined as a unit of work within a DAG. As shown in the example DAG, it is represented as a node in the DAG graph, and it is written in Python. There are dependencies between tasks: for example, task C is the downstream of task A, and task C is also the upstream of task E. The goal of a task is to achieve a specific thing; the method it uses is called an operator. While DAGs describe how to run a workflow, operators determine what actually gets done by a task. In Airflow there are many kinds of operators, like the bash operator and the Python operator, and you can also write your own customized operator. Each task is an implementation of an operator, for example a Python operator to execute some Python code, or a bash operator to run a bash command. To sum up: the operator determines what is going to be done, the task implements an operator by defining specific values for that operator, and the DAG is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies.

But what are the execution date, the DAG run and the task instance? The execution date is the logical date and time for which the DAG run and its task instances are running. For example, we might currently have three DAG runs in progress, for January 1st 2021 to January 3rd 2021. A task instance is a run of a task at a specific point in time, namely the execution date. A DAG run is an instantiation of a DAG, containing the task instances that run for a specific execution date. That's it, these are the most essential concepts you need to know about Airflow.

When a DAG run is triggered, its tasks are executed one after another according to their dependencies. Each task goes through different stages from start to completion, and every stage indicates a specific status of the task instance. For example, when the task is in progress its status is running; when the task has finished flawlessly it has the success status; and so on. There are in total 11 different kinds of stages, and in the Airflow UI graph and tree views each stage is displayed with its own color. In the early phase a task might go from no status to queued; after that the task is in the execution phase, from running to success. If the task fails, it is directed to up for retry, upstream failed or up for reschedule, and if we manually abort or skip it during its lifecycle, the task gets the shutdown or skipped status accordingly.

Let's have a closer look at each of them in the task lifecycle diagram. As I just said, a task usually starts with no status, which means the scheduler created an empty task instance. After that there are four stages the task can move to: scheduled, removed, upstream failed or skipped. Scheduled means the scheduler determined the task instance needs to run; upstream failed means the task's upstream task failed; skipped means the task is skipped; and removed means the task has been removed. If the task is lucky enough to be scheduled, the executor kicks in, puts the task into the task queue, and the task's status changes to queued. After that a worker will execute the task once it is free, meaning the worker's computation resources are not fully occupied.
At this stage the task's status is running. Based on the execution result there are three possible stages: success, failed or shutdown, which are quite straightforward. Success means the task completed flawlessly, failed means the task failed, and shutdown means the task run was aborted. If the task ends up failed or shutdown and the maximum number of retries is not exceeded, the task is forwarded to the up for retry stage, which means it will be scheduled and rerun after a certain waiting time. In some specific cases a task in the running stage can also be directed to the up for reschedule stage, which means the task will be rescheduled at a certain time interval. A simple example of this use case: you never want to move forward unless a file has been saved in an S3 bucket, so you check the file's existence every 10 seconds using a sensor task. To sum up, a happy workflow execution goes like this: the task starts with no status, the scheduler schedules it, the executor puts it into the task queue, a worker picks it up and executes it flawlessly, and we get a successful task run. OK, we now understand the complete lifecycle of a task in Airflow.

Let's dive into the basic architecture of Airflow. We have mentioned lots of components in the previous sections, and you can find a diagram like this on the Airflow official documentation site. On the diagram you can see the data engineer, the webserver, the scheduler, the workers, the DAGs and so on. What is the responsibility of each one and how do they work together? First of all, we need someone called a data engineer, like you, to build and monitor all the ETL processes. Data engineers configure the Airflow setup, like the type of executor and which database to use, and they create and manage the DAGs they author through the Airflow user interface, which is served by the webserver. In addition, the DAGs are also visible to the scheduler and the workers, which change the task statuses during the whole task lifecycle; apart from that there is a component called the executor. In order to persist, update and retrieve the information about the DAGs, those four components are closely connected to the database. There are a variety of database engines you can choose, like MySQL or Postgres. That's it, this is the basic architecture of Apache Airflow and how the most essential components work together.

First let's open our project folder in VS Code, then launch Airflow with docker-compose up -d. After that we open localhost:8080 in the browser; it might take some seconds, but in the end we can see the Airflow admin login page. After we enter the username and password airflow that we created, we can see all the example DAGs. Since we are going to create our own DAG, let's remove all the example DAGs. We go back to the VS Code terminal and type docker-compose down -v; -v means we are not only shutting down the Airflow containers but also removing the volumes we defined in our docker-compose YAML file. Then we change the value of AIRFLOW__CORE__LOAD_EXAMPLES in the YAML file from true to false, re-initialize Airflow with docker-compose up airflow-init, and launch Airflow again with docker-compose up -d. After we refresh the page and log in, we now see that no example DAGs are loaded. That's great, next we are going to create our first DAG. Let's go back to VS Code.
In Airflow a DAG is defined as a Python file under the dags folder, so let's create one called our_first_dag.py and open it. A DAG implementation is an instantiation of the class DAG, therefore we first have to import DAG from airflow. Then we create an instance of DAG using the with statement; in this way all the following code is under the scope of our DAG. For this instance we have to give values to a couple of parameters: let's type our_first_dag as the unique DAG id and describe our DAG as "this is our first airflow dag that we write". After that we have to decide when we want to start our DAG and how often we want to execute it. Let's first import datetime from the datetime package. Say we want to start our DAG on the 30th of July and run it every day at 2 AM: we set the parameter start_date to datetime(2021, 7, 30, 2) and schedule_interval to @daily. Apart from that, we also define the common parameters which will be used to initialize the operators in default_args: for example we set the owner of the DAG to code2j, the maximum number of retries to 5, and the retry_delay to 5 minutes; for the retry delay we need to import timedelta from the datetime package. Then we set the DAG's default_args to our default_args dictionary variable.

Next let's create a simple task for our DAG. We will use the bash operator to execute some bash commands, so we have to import BashOperator first. A task is an implementation of an operator, so let's create a very simple task that prints the message "hello world, this is the first task". We type task1 = BashOperator, set task_id to first_task, and set bash_command to echo hello world, this is the first task. Let's go back to our browser and refresh. Oops, we have errors in our code; let's click the message and see the details. It says there is no module named airflow.operator. Let's go back to VS Code and change operator to operators. Refresh again: the error message has changed, which means we fixed the bash operator import, but the new message says that "minute" is an invalid argument for timedelta. It should be minutes, not minute; let's fix it and refresh. Now the timedelta error is fixed, but we get another error about an unexpected keyword argument scheduler_interval. Let's fix this by changing the argument to schedule_interval and refresh. Boom, no errors, and finally our first DAG shows up. We can see there is only one task named first_task in the tree view. Let's turn on our first DAG: it is executed from the start date, July 30th, all the way up to yesterday. If we select one of the successful task runs and open the execution log, we can see the "hello world, this is the first task" message has been printed out.

In general an ETL process consists of multiple tasks, so let's create a second task which will be executed after the success of task 1. After the definition of task 1 we type task2 = BashOperator, put second_task as the task_id, and use a simple bash command that says "I am the second task and I will be running after task 1". Now that we have defined task 2, let's build the task dependency: we can either set task 2 as the downstream of task 1, or task 1 as the upstream of task 2. In order not to mix up the versions of the DAG, let's change the DAG id to our first dag version 2.
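Putting the pieces together, here is a minimal sketch of the DAG described so far, assuming Airflow 2.x; the dag_id, dates, messages and retry values follow the walkthrough but are otherwise illustrative.

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    'owner': 'code2j',
    'retries': 5,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='our_first_dag_v2',
    default_args=default_args,
    description='This is our first airflow dag that we write',
    start_date=datetime(2021, 7, 30, 2),
    schedule_interval='@daily',
) as dag:
    task1 = BashOperator(
        task_id='first_task',
        bash_command='echo hello world, this is the first task!',
    )

    task2 = BashOperator(
        task_id='second_task',
        bash_command='echo I am the second task and will run after task1!',
    )

    # build the dependency: task2 runs after task1
    task1.set_downstream(task2)   # equivalently: task1 >> task2
```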
Let's go back to the browser and refresh. You can see there are two DAGs now, our first dag and our first dag version 2. From the graph view we can see task 2 comes after task 1, which is exactly what we wanted. Let's start the DAG; once it completes we can see task 2 runs only after the success of task 1. Checking the execution logs, the execution time of task 2 is later than that of task 1. What if we want to have three tasks, with tasks 2 and 3 running once task 1 finishes? Let's go back to VS Code, add task 3 and build the dependencies. We create our third task with the bash command echo "hey, I am task 3 and I will be running after task 1, at the same time as task 2". There are several ways to build the task dependency. The first method is simply to add task 3 as another downstream of task 1. We change the DAG version, refresh the page, and see that task 1 is followed by tasks 2 and 3 in the graph view. Let's start the DAG; all the tasks are executed following the dependencies. Checking the execution log of task 3, the message has been printed successfully, and the execution times of tasks 3 and 2 are later than that of task 1. Let's go back to VS Code and try the second method, the bitshift operator: task1 >> task2 and task1 >> task3, which means tasks 2 and 3 are downstream of task 1. We change the DAG version and refresh the page again; we can confirm the correct task dependencies in the graph view. Let's go back one more time for the third method, a shortened version of the second: we collapse the two bitshift lines into one, task1 >> [task2, task3]. Go back to the browser and refresh, and we can confirm the task dependencies are configured correctly.

Next, the Python operator. First let's open our project folder in VS Code and check whether Airflow is running with docker ps. The Airflow components are already up; if not, you can launch Airflow with docker-compose up -d and log in. Now let's create a Python file under the dags folder called create_dag_with_python_operator.py and open it. We first import the DAG package and create an instance of DAG using the with statement. We create a default_args dictionary variable, set code2j as the owner, retries to 5, and a 5-minute retry delay; of course we have to import datetime and timedelta. We then set the DAG's default_args to our defined default_args, give our DAG dag_with_python_operator_v01 as the DAG id, describe it as our first DAG using the Python operator, set the start date to yesterday, October 6th 2021, and schedule it daily. Let's define a simple Python function that we want to run as a task: we name it greet, and it prints a "hello world" string when executed. Next we create a task using the Python operator to run our greet function: we import the PythonOperator module first, then set the task_id to greet and pass our greet function name to the python_callable parameter. We are good to go. Let's go back to the browser and refresh our Airflow webserver. There is no error message and our dag with python operator version 1 shows up; in the graph view we can see that it has only one task, named greet. Let's trigger the DAG and check the log of the task run.
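For reference, the DAG just described might look like this minimal sketch; the dag_id, date and default_args values are illustrative and assume Airflow 2.x.

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

default_args = {'owner': 'code2j', 'retries': 5, 'retry_delay': timedelta(minutes=5)}


def greet():
    print('hello world')


with DAG(
    dag_id='dag_with_python_operator_v01',
    default_args=default_args,
    description='Our first dag using the python operator',
    start_date=datetime(2021, 10, 6),
    schedule_interval='@daily',
) as dag:
    task1 = PythonOperator(
        task_id='greet',
        python_callable=greet,   # pass the function itself, not greet()
    )
```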
From the log we can see that "hello world" has been printed out, which means our Python function has indeed been executed. In practice we use more complex Python functions with parameters, so how do we pass a Python function's parameters using the Python operator? Let's go back to VS Code and update our greet function to take some parameters. We add name and age as parameters and print "hello world, my name is {name} and I am {age} years old". The Python operator has a parameter called op_kwargs, a dictionary of keyword arguments that get unpacked into the Python function. Let's set the name to Tom and the age to 20, update the version of our DAG, and go back to the browser to refresh the Airflow webserver. We trigger it and check its log: "hello world, my name is Tom and I am 20 years old" has been printed out. That's great, our parameter values were passed successfully to our greet function. Using op_kwargs you can pass basically any parameter that you define in your Python function.

We now know how to use the Python operator to run a Python function and how to pass parameters to it, but can we share information between different tasks? Yes, using Airflow XComs: we can push information to XComs in one task and pull it in other tasks. By default, every function's return value is automatically pushed into XComs. Let's go back to VS Code and create a new Python function called get_name which simply returns Jerry as the name, then build a new task using the Python operator to run it: we set its task_id to get_name and the python_callable to get_name. We comment out task 1, update the version of our DAG, refresh the Airflow webserver, select the newest DAG and trigger it. In the log we can see a line saying "Done. Returned value was: Jerry", and under Admin, XComs, the return value of our get_name function has been pushed to XComs. Let's go back to VS Code to pull the name Jerry in our greet function. We delete the name parameter and add ti, the task instance, since xcom_pull can only be called on the task instance. We pull the name by typing name = ti.xcom_pull with task_ids set to get_name, basically pulling the return value of the task with id get_name. We also uncomment task 1, remove the name entry from op_kwargs, and build the dependency, namely task 1 is task 2's downstream, since we should first push and then pull the name. Update the DAG version, refresh the page, trigger the DAG and check the log. Oops, we have an issue; the log says the task instance has no attribute xcoms_pull. The pull function should be xcom_pull, so let's go back to VS Code and change it. Refresh again and clear the first try of the greet task; wait a second, since it will run the greet task again. This time it is executed successfully. Checking the log again, we can see "hello world, my name is Jerry and I am 20 years old", so the name Jerry has been pulled successfully. Let's also check the XComs: we can see Jerry as the value and return_value as the key.
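Before moving on, here is a sketch of the single-value version we just got working: get_name's return value is pushed to XCom automatically and greet pulls it via the task instance, while the age still comes in through op_kwargs. Names, values and the dag_id are illustrative.

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

default_args = {'owner': 'code2j', 'retries': 5, 'retry_delay': timedelta(minutes=5)}


def get_name():
    # the return value is pushed to XCom under the key "return_value"
    return 'Jerry'


def greet(age, ti):
    # ti (the task instance) is injected from the Airflow context
    name = ti.xcom_pull(task_ids='get_name')
    print(f'hello world, my name is {name} and I am {age} years old')


with DAG(
    dag_id='dag_with_python_operator_v04',
    default_args=default_args,
    start_date=datetime(2021, 10, 6),
    schedule_interval='@daily',
) as dag:
    task2 = PythonOperator(
        task_id='get_name',
        python_callable=get_name,
    )

    task1 = PythonOperator(
        task_id='greet',
        python_callable=greet,
        op_kwargs={'age': 20},
    )

    task2 >> task1   # push the name first, then pull it in greet
```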
What if we want to push multiple values into XComs from one function, can we distinguish them? Of course. Let's go back to VS Code; this time we need to push a first name and a last name into XComs. We first add the task instance parameter to our get_name function and remove the return statement, then call the ti.xcom_push function, giving it the key first_name and the value Jerry, and call it again with last_name as the key and Friedman as the value. Now let's pull those values in our greet function. We call xcom_pull, and apart from giving get_name as the task_ids we have to tell it which key to pull: the first name with the key first_name and the last name with the key last_name. Let's modify our print message to include the first name and last name, update the version of our DAG and save it. Refresh the browser, trigger it and check the log. Boom, our pushed name Jerry Friedman is included in the printed message. Let's double-check the XComs: the pushed first name and last name are distinguished by their unique keys.

Let's also quickly update our code to get the age via XComs instead of op_kwargs. We create another function called get_age with a task instance parameter; in the function body we just push an age value of 19 with age as the key. We then create a task for our get_age function with the Python operator, with a task_id of get_age. Then we remove the op_kwargs of task 1, and since we want to pull the age, we update the greet function by removing the age parameter and pulling the age value via xcom_pull with the key age and task_ids set to get_age. We update the task dependencies by adding task 3 as another upstream of task 1, update the version of our DAG and refresh the browser. Triggering the DAG, we can see from the log and the XComs that all three values have been pushed and pulled successfully. Although it is very handy to use XComs to share values between tasks, you should be aware that the maximum size of an XCom is only 48 kilobytes. Yes, you heard me right, 48 kilobytes, not megabytes or gigabytes; we can confirm this in the source code of the Airflow XCom. So never use XComs to share large data, for example a pandas DataFrame, otherwise it will crash.

Let's go back to VS Code. The create dag with python operator DAG has around 60 lines of code; let's see how much code we can save by rewriting it with the TaskFlow API. We create a Python file named dag_with_taskflow_api.py under the dags folder and open it. We import dag and task from airflow.decorators, and we also create the default_args variable to define the retries and retry_delay. Next we define our DAG: we create a Python function called hello_world_etl with the dag decorator above it, and in the decorator we assign values to the DAG id, default_args, start date and schedule interval. Inside this DAG function we create three tasks, get_name, get_age and greet; each task is a Python function with the task decorator above it. Our first function get_name returns Jerry; get_age returns 19 as the age, and of course don't forget the task decorator above it; and lastly the greet function takes the parameters name and age and prints "hello world, my name is {name} and I am {age} years old". That's it, we have all the tasks defined, but how do we build their dependencies? The TaskFlow API calculates the dependencies automatically: all we need to do is call the get_name and get_age functions to get the returned name and age, then pass them into the greet function as parameters. And don't forget the final step, creating an instance of our DAG. Save it, then go to the browser, refresh the page and select the DAG we created.
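A sketch of the TaskFlow API version just described (Airflow 2.0+); the dag_id, date and returned values are illustrative.

```python
from datetime import datetime, timedelta

from airflow.decorators import dag, task

default_args = {'owner': 'code2j', 'retries': 5, 'retry_delay': timedelta(minutes=5)}


@dag(dag_id='dag_with_taskflow_api_v01',
     default_args=default_args,
     start_date=datetime(2021, 10, 6),
     schedule_interval='@daily')
def hello_world_etl():

    @task()
    def get_name():
        return 'Jerry'

    @task()
    def get_age():
        return 19

    @task()
    def greet(name, age):
        print(f'hello world, my name is {name} and I am {age} years old')

    # TaskFlow infers the dependencies from these calls:
    # get_name and get_age run first, greet runs after them
    name = get_name()
    age = get_age()
    greet(name=name, age=age)


greet_dag = hello_world_etl()
```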
From the graph view we can see the dependencies are correctly defined. Let's turn on the DAG and wait for its execution. Once it finishes we can check the log of the greet task, which has printed the message with the desired name Jerry and age 19. Let's also check the XComs: the returned name and age have been pushed into XComs, and the TaskFlow API takes care of the XCom push and pull operations.

But what if we want to return a first name and a last name instead of a single name in our get_name task? We first change the greet function to take first_name, last_name and age parameters and update the greeting message. Then we set the parameter multiple_outputs=True in the get_name task decorator and return a dictionary which includes the first name Jerry and the last name Friedman. We get the name dictionary from the get_name function, unpack it, and pass the values into the greet function's first_name and last_name parameters. Let's update the DAG version, refresh the page in the browser, turn on the newest DAG and wait for it to finish. Once it's done, open the log: boom, the first name Jerry and the last name Friedman are in the message. Let's also check the XComs: not only has the name dictionary been pushed to XComs, the first name and last name also exist as separate key/value pairs. Now we have a rewritten version of the Python operator DAG using the TaskFlow API; comparing them in VS Code, the TaskFlow version only needs around 40 lines of code, nearly a third less than the previous version.

Next, catchup and backfill. Let's go back to VS Code, create a new DAG file called dag_with_catchup_and_backfill.py and open it. We first import every package we need, then create a very simple DAG with a single task using the bash operator. The current date is November 10th 2021; we set the start date of the DAG to the past, November 1st, and schedule it daily. An Airflow DAG has a parameter called catchup, which is set to true by default; let's set it explicitly anyway. Save the file, then go to the browser and refresh the page. We pick our DAG, show it in the tree view, start it and click the refresh button: many DAG runs have been executed. Once it finishes, hovering the mouse over the DAG runs from left to right, we can see the DAG's scheduled execution dates are November 1st, November 2nd, all the way up to November 9th. Since November 10th has not fully passed, the latest DAG run is for November 9th; once the time passes midnight on November 11th, the November 10th DAG run will be scheduled and executed. Great, the catchup feature ran the DAG for every interval since the start date, November 1st. But how can we achieve the same thing using backfill? Let's go back to VS Code and disable catchup, basically changing the catchup parameter to false, then update the DAG version. We go back to the browser, refresh the page and select our newest DAG. Checking the code, we verify that catchup is turned off, then start the DAG. Once it finishes we see that it only executed the DAG run for November 9th, which is correct since we set catchup to false. But we can still run the DAG for the past using the backfill command. Let's go back to the VS Code terminal, find the Airflow scheduler container with docker ps, then open a bash shell inside it interactively with docker exec -it <container id> bash. From the prompt we can see that we are logged in as the airflow user.
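A sketch of the catchup setting discussed above: with catchup=True Airflow schedules every missed interval since start_date, with catchup=False only the latest interval runs. The backfill command we are about to run is included as a comment. The dag_id and dates are illustrative.

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {'owner': 'code2j', 'retries': 5, 'retry_delay': timedelta(minutes=5)}

with DAG(
    dag_id='dag_with_catchup_backfill_v02',
    default_args=default_args,
    start_date=datetime(2021, 11, 1),
    schedule_interval='@daily',
    catchup=False,               # set to True to let Airflow catch up automatically
) as dag:
    task1 = BashOperator(
        task_id='task1',
        bash_command='echo This is a simple bash command!',
    )

# With catchup disabled, past intervals can still be run from inside the
# scheduler container with the CLI, for example:
#   airflow dags backfill -s 2021-11-01 -e 2021-11-08 dag_with_catchup_backfill_v02
```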
The current working directory is /opt/airflow. To backfill the DAG runs we execute the command airflow dags backfill with a start date, an end date and the DAG id. Let's backfill from November 1st 2021 to November 8th 2021 with our DAG id and hit enter to execute the command. Once it finishes we can see the log saying the backfill is done. We exit the container with the exit command, then go back to the browser and refresh the page. We can see DAG runs from November 1st to November 9th 2021: the backfill runs from November 1st to November 8th, plus the scheduled run for November 9th.

In Airflow, creating a DAG takes a schedule_interval parameter, which accepts a cron expression as a string or a datetime timedelta object. So what is a cron expression? A cron expression is a string comprising five fields separated by white space that represents a set of times, normally used as a schedule to execute some routine. Airflow already provides some presets for the schedule interval, like @daily and @hourly, each of which maps to a cron expression string, so if you know how to use cron expressions you can schedule your DAG any way you want. Let's go back to VS Code and create a new DAG file called dag_with_cron_expression.py and open it. We first import the packages we need, then define the DAG with a simple task using the bash operator. We want to start our DAG on November 1st and schedule it daily with the Airflow cron preset @daily. Save the file, then go to the browser, refresh the page, select the DAG and start it. Once it finishes we can see that the DAG has been executed from November 1st up to yesterday, November 16th. Let's go back to VS Code and schedule our DAG daily using a cron expression string instead: we change the schedule_interval parameter from @daily to the cron string "0 0 * * *" (zero, zero and three stars, separated by spaces), update the DAG version and save the file. We refresh the browser, select the newest DAG and start it; once it finishes, the DAG execution history is exactly the same as with the @daily preset.

But how do we generate a customized schedule interval with a cron expression? Luckily there is a website called crontab.guru which gives us a visual way to generate and verify cron expressions. Let's check it in the browser. In the text field we can type our cron expression, and every time we give an input it automatically verifies whether the input is valid. If we enter wrong syntax the text field turns red, otherwise it interprets our cron expression and shows its meaning in human language above the text field. Now let's generate a cron expression which runs tasks weekly on Tuesday at 3 AM; we see "At 03:00 on Tuesday" above. Let's go back to VS Code, change the schedule_interval parameter to our customized cron expression, update the DAG version and save it. We refresh the Airflow webserver UI, pick the newest DAG and start it; once it finishes we can see the DAG has been executed at 3 AM from the first Tuesday, November 2nd, to the latest Tuesday, November 9th. What if we want to run the DAG weekly but on multiple weekdays? For example, to schedule our DAG weekly on Tuesday and Friday at 3 AM, we just add a comma and FRI to our previous cron expression. Let's verify it with crontab.guru: the interpretation above says "At 03:00 on Tuesday and Friday". And what if we want to run our DAG weekly from Tuesday to Friday? We can simply add WED and THU to it with commas in between, or we can type TUE-FRI; both ways work.
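Here is a sketch of how such cron strings plug into schedule_interval; the preset @daily is equivalent to "0 0 * * *", and the variants from the discussion above are shown as comments. The dag_id and dates are illustrative.

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {'owner': 'code2j', 'retries': 5, 'retry_delay': timedelta(minutes=5)}

with DAG(
    dag_id='dag_with_cron_expression_v03',
    default_args=default_args,
    start_date=datetime(2021, 11, 1),
    # weekly on Tuesday at 03:00; other variants discussed above:
    #   '0 3 * * Tue,Fri'  -> Tuesday and Friday
    #   '0 3 * * Tue-Fri'  -> Tuesday through Friday
    schedule_interval='0 3 * * Tue',
) as dag:
    task1 = BashOperator(
        task_id='task1',
        bash_command='echo This dag runs on a cron schedule!',
    )
```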
Let's copy the Tuesday-to-Friday expression, go back to VS Code to change our DAG's schedule interval, update the DAG version, then go back to the browser, refresh the Airflow webserver, pick the newest DAG, start it and wait for its execution to finish. Once it is done we can hover the mouse over the DAG runs and see that the DAG has been executed every Tuesday through Friday since November 2nd.

Normally when we build an ETL DAG we need to connect to external services, for example database servers like MySQL and Postgres, cloud services like Azure and AWS, and many other types of servers. To connect to them we need credentials like host, username, password, port and so on. You can create and manage these easily with Airflow connections, which can then be used by the corresponding operators. In the Airflow webserver UI, if we hover over Admin we can see the Connections menu; let's click it. Here you can see all the connections that have been created. Let's look at the add connection page by clicking the plus button. Here you can define the name of the connection and create whatever type of connection you want; if a connection type is missing, you can install the corresponding provider package or create your own customized connection type. Just fill in all the necessary fields and click save, and we have a connection which is ready to be used.

OK, now that you know what a connection is, let's learn how to use it with the Postgres operator. To demonstrate the Postgres operator we need a Postgres database, so let's expose the Postgres database we already use in the Airflow project by adding ports 5432:5432 to the postgres service in the docker-compose YAML file, then recreate the Postgres container. Let's connect to it using DBeaver, an open-source cross-platform database management tool. If you don't have it, search for DBeaver in the browser, click download and pick the build that matches your operating system. I already have it, so let's launch it and create a new connection. We select PostgreSQL, input localhost as the host and airflow as the username and password, then click Test Connection to verify it. It may ask you to install the PostgreSQL JDBC driver; if you don't have it, just install it and try the test connection again. Once it says connected, we click finish. Then we can right-click Databases, click Create New Database and name it test. If we expand it we can see it is empty. Great, we have a brand new PostgreSQL database; let's use an Airflow DAG to create a table and insert some values.

Let's go back to VS Code, create a DAG file called dag_with_postgres_operator.py and open it. First we import the packages we need, then define the default_args, and after that we initialize a DAG by setting its DAG id, start date and schedule interval. Let's create our first task using the Postgres operator. It mainly requires three parameters: a task id, a connection id to tell the operator which Postgres database to connect to, and a SQL query statement to execute. Let's go to the Airflow webserver UI to create the connection. We click the plus button and give postgres_localhost as the connection id; we select Postgres as the connection type; the schema is the database name, in our case test; username and password are airflow; and the port is 5432, as we set in the docker-compose YAML file. For the host we can either give the Postgres service name defined in the docker-compose YAML file, which is postgres in our case, or the host machine itself if we are using the Docker Desktop application on macOS or Windows.
To reach localhost from inside a container, using localhost itself will not work; instead we need to use host.docker.internal. Now click save and we can see the postgres connection in the overview. Let's copy the connection id, go back to VS Code and set it as the postgres_conn_id parameter. In the SQL statement we create a table if not exists named dag_runs, with a column dt of type date and a column dag_id of type character varying, where the primary key is the combination of the dt and dag_id columns. Save the file, go back to the browser, refresh the page, select the DAG and start it. Oops, the task failed; let's check the log. It says it cannot translate the host name host.docker.local: we had a typo, it should be host.docker.internal. Let's fix it, clear the task run and let it run again. Oops, it fails again. The log says we have a syntax error in our SQL query: we missed a letter in the character varying data type. Let's go to VS Code to change it, clear the task runs and wait for the execution. Now it finally succeeds, and we can see from the log that our create table SQL statement has been executed. Let's verify it in DBeaver: when we refresh, we can see the dag_runs table was created exactly as we defined it.

Let's create another task which inserts the DAG id and execution date into the dag_runs table, again using the Postgres operator. We set the task id, keep the same postgres connection id, and for the SQL we type INSERT INTO dag_runs (dt, dag_id) VALUES, with ds in double curly brackets and dag.dag_id also in double curly brackets. ds is the DAG run's execution date and dag.dag_id is the DAG id; both are template variables provided by the Airflow engine and can be accessed by putting the variable name inside double curly brackets. Let's build the task dependencies and update the DAG version, then go back to the browser, refresh the page, select the newest DAG and start it. Oops, it fails; the log says the template variable dt is undefined. Let's search the Airflow macros documentation: the execution date template variable is ds, not dt. We go back to VS Code to fix it, then clear the current task runs in the browser and let them run again. Now it works: from the log we can see one row was inserted into the dag_runs table with the execution date December 19th 2021 and the DAG id dag_with_postgres_operator_v02. Let's also clear and rerun the latest DAG run; once it finishes, it also succeeds in inserting the correct execution date and DAG id, as shown in the log. Let's verify it in DBeaver by selecting all rows in the dag_runs table: the number of rows matches the number of DAG runs in Airflow.

It is recommended to delete data before inserting, to avoid data duplication or primary key violations. For example, if we clear one of the successful insert tasks, it tries to insert data which already exists in the table, and in the end it fails because it violates the primary key constraint. Let's fix this by adding a delete task before the insert. We copy the insert task, rename its task id, and change the SQL from the insert into DELETE FROM dag_runs WHERE dt equals ds and dag_id equals dag.dag_id, again with both template variables in double curly brackets. Then we rebuild the task dependencies, with task 3 as the upstream of task 2 and the downstream of task 1.
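Putting the three tasks together, a sketch of the resulting DAG might look like this; it assumes a Postgres connection with id postgres_localhost exists in Airflow, and the dag_id and dates are illustrative.

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

default_args = {'owner': 'code2j', 'retries': 5, 'retry_delay': timedelta(minutes=5)}

with DAG(
    dag_id='dag_with_postgres_operator_v03',
    default_args=default_args,
    start_date=datetime(2021, 12, 19),
    schedule_interval='@daily',
) as dag:
    task1 = PostgresOperator(
        task_id='create_postgres_table',
        postgres_conn_id='postgres_localhost',
        sql="""
            CREATE TABLE IF NOT EXISTS dag_runs (
                dt date,
                dag_id character varying,
                PRIMARY KEY (dt, dag_id)
            )
        """,
    )

    task3 = PostgresOperator(
        task_id='delete_data_from_table',
        postgres_conn_id='postgres_localhost',
        sql="""
            DELETE FROM dag_runs WHERE dt = '{{ ds }}' AND dag_id = '{{ dag.dag_id }}'
        """,
    )

    task2 = PostgresOperator(
        task_id='insert_into_table',
        postgres_conn_id='postgres_localhost',
        sql="""
            INSERT INTO dag_runs (dt, dag_id) VALUES ('{{ ds }}', '{{ dag.dag_id }}')
        """,
    )

    # delete runs after create and before insert, to avoid primary key violations
    task1 >> task3 >> task2
```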
Save the file, update the DAG version and refresh the page. Oops, we have an error: it says task 3 is not defined. Let's go back to VS Code and change our delete operator's task variable name to task3, save the file, then refresh the page, select the newest DAG and start it. Once it finishes successfully we can check in DBeaver that there are two rows for the newest DAG id, with the dates December 19th and 20th. Let's clear the December 19th DAG run and wait for its execution: boom, it succeeds without violating the primary key constraint, since we delete before inserting the data. Double-checking in DBeaver, we have exactly two records for the newest DAG id.

Hello everybody, this is code2j. As we all know, the beauty of Python is its tons of excellent third-party packages, and in our Airflow projects we definitely need them. But how can we install them in our Airflow Docker containers? Today I'm going to show you two ways to do that. Sounds exciting? Let's get started. In general you can either extend or customize the Airflow Docker image to install your Python dependencies, and both approaches have pros and cons. Extending the image only requires basic knowledge of Docker images, it doesn't need the Airflow source code, and it builds really fast. However, if you need to build from the Airflow sources or want to heavily optimize the image size, you definitely need to customize.

Let's first try extending the Airflow Docker image. We go back to VS Code, open our Airflow project folder and create a requirements.txt file in which we define the Python dependencies. Let's assume we need the scikit-learn package to do some machine learning model training, so we add the line scikit-learn==0.24.2 to the requirements file. Next we install all the dependencies defined in the requirements file by extending the Airflow image. To do that we create a Dockerfile in our project root folder and open it. We write FROM apache/airflow:2.0.1, which tells Docker that we want to extend the official Airflow Docker image with version 2.0.1. Next we copy the requirements.txt file from our project root folder into the Docker image using the COPY command, then we run the pip upgrade command to get the latest pip version, and run pip install to install all the Python dependencies. Save it and we have a Dockerfile which is ready to be built. Let's build the extended image with the command docker build . --tag extending_airflow:latest; basically we tell Docker to build an image using the Dockerfile in the current directory, name the image extending_airflow and tag the version as latest. It may take some minutes to finish, and in the meantime we can see from the log that it runs exactly the steps we defined in the Dockerfile. Once it finishes we open our docker-compose YAML file and change the image name from apache/airflow to extending_airflow:latest.

Let's create a DAG file to verify whether the scikit-learn package has been installed. We initialize the DAG instance, then create a function called get_sklearn in which we print out the scikit-learn package version, and build a task using the Python operator to run this function. Save it. Since we modified the Airflow image name in the docker-compose YAML file, we need to rebuild the Airflow webserver and scheduler services; let's do it with the command docker-compose up -d --no-deps --build airflow-webserver airflow-scheduler.
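A sketch of the extension approach just described, assuming a requirements.txt sits next to the Dockerfile in the project root and contains a pinned line such as scikit-learn==0.24.2; the exact pip flags follow the walkthrough and can be adapted.

```dockerfile
# extend the official Airflow image and bake our Python dependencies into it
FROM apache/airflow:2.0.1

COPY requirements.txt /requirements.txt

RUN pip install --user --upgrade pip
RUN pip install --no-cache-dir --user -r /requirements.txt

# then build and tag the image, and rebuild the webserver and scheduler services:
#   docker build . --tag extending_airflow:latest
#   docker-compose up -d --no-deps --build airflow-webserver airflow-scheduler
```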
Then we go back to the browser, log in, refresh the page, trigger our newly created DAG and check the log: the scikit-learn version has been printed out. What if we want to add more Python dependencies, say matplotlib? We go back to VS Code, add matplotlib with a version to the requirements.txt file, and in the DAG we create another Python function called get_matplotlib to print out the matplotlib version. Update the DAG version, go to the browser, refresh and trigger the newest DAG. Oops, our get_matplotlib task failed; the log shows an error, no matplotlib package. Why does this happen? Because we changed our requirements.txt file locally, but not in our Docker image and containers. Whenever we change the Python dependencies file we have to rebuild the image. Let's do it by running the docker build command again, then rebuild the Airflow webserver and scheduler containers with docker-compose up -d --no-deps --build airflow-webserver airflow-scheduler. Clear the previous task runs and wait for the retry to finish: from the log we can see the matplotlib version has been printed out successfully this time. That's it, we managed to install Python dependencies by extending the Airflow image.

How can we achieve the same by customizing the Airflow image? Basically, we have to build it from the Airflow source code. Let's open a second terminal in VS Code and cd to the Desktop directory; then we need to clone the Airflow source code. Let's google the official Airflow GitHub repository and clone it; make sure you have set up your SSH key properly. It may take some minutes, but finally we have it, so let's open it in a new VS Code window. First we have to find the folder docker-context-files, since every Python dependency defined there is installed automatically when building the image. Let's create a requirements.txt file in it and put the scikit-learn and matplotlib packages with their version info. Then we build the Docker image with the command docker build . --build-arg AIRFLOW_VERSION=2.0.1 --tag customizing_airflow:latest; basically we tell Docker to build the image for Airflow version 2.0.1 using the Dockerfile in the repository root and name the image customizing_airflow with the latest tag. It might take five minutes or more, since it builds the image from the sources. If you pay attention to the steps of the Airflow image build, you can see it installs all the Python dependencies defined in the requirements.txt inside the docker-context-files folder. Next we need to replace the Airflow image name in the docker-compose YAML file: let's go back to our Airflow project VS Code window and update it. Save the YAML file and recreate the Docker containers for the Airflow webserver and scheduler with docker-compose up --build airflow-webserver airflow-scheduler. Let's pick the same DAG, clear the previous DAG run and wait for it to be rescheduled and executed. When we check the log we can see both Python packages and their versions have been printed out, which means they have been installed successfully.

OK, now you might wonder which way you should choose. In my opinion you can go with the first method, extending the image, in 99 percent of the cases, because it is super fast and easy. However, if you want more things to be customized and really care about optimizing the image size, you need to customize the image.

A sensor is a special type of operator which waits for something to occur.
It is the perfect tool for use cases in which you don't know exactly when the data will be available. For example, your client uploads a CSV file to an AWS S3 bucket daily, but it can arrive at any time during the day; you need to check at a certain time interval whether it has been uploaded before starting other tasks like downloading or cleaning. For simplicity, in this tutorial we will use an S3-compatible open-source solution called MinIO instead of AWS S3. MinIO is API-compatible with the AWS S3 cloud storage service, so if you know how to build an ETL with MinIO you can easily apply it to AWS S3; moreover, we can easily set up a MinIO service in a Docker container. Let's go to the browser and search for its official website, then go to the docs and click legacy documentation, where we can find the MinIO Docker quickstart guide. Clicking it, we can see the example docker command to launch a MinIO Docker container; let's copy it. We go back to VS Code, open a new terminal and paste the command. The docker command runs a MinIO Docker image, exposes the two ports 9000 and 9001, and sets up the root username and password. Let's hit enter to execute it. Once it is done we can see that port 9000 is for the API and 9001 is for the console. Let's copy the console localhost URL and paste it into the browser; we see a login page which asks for the root username and password. We copy them from the terminal and log in. Once logged in we can see we have no S3 bucket, so let's create one by clicking the Create Bucket button. We name our bucket airflow, click Create Bucket, and we can see the airflow bucket we created; make sure it has read and write permissions. Then we click the browse button, where we can either create a path or upload a file.

Let's go back to VS Code and generate a CSV file which we will upload later to our airflow bucket. We first create a folder called data in our project root directory, then create a CSV file called data.csv with two columns, product_id and delivery_dt, and a few rows just for demonstration. Save it, go back to the browser and drag the file from the folder into our airflow bucket. Once it is uploaded successfully we see the data.csv file in our airflow bucket. Now that we have a proper S3 bucket set up, let's build an Airflow DAG to connect to our S3 bucket and sense the file's existence. We first create a Python file called dag_with_minio_s3.py and open it. We import the necessary Python packages, create the default_args variable, and then initialize the DAG with a proper DAG id, start date and an @daily schedule interval. Next we need a task using the S3 sensor operator. Since MinIO is S3-API compatible, we can connect to our MinIO bucket using the AWS S3 API, which is included in Airflow's Amazon provider package. Let's first check which version of the Amazon provider package we have installed. We open our docker-compose YAML file and change the Airflow image back to the latest extending_airflow image, or the official apache/airflow:2.0.1; if you don't know how to extend the Airflow Docker image yet, check out my last video about that. Don't forget to recreate the Airflow webserver and scheduler with docker-compose up -d --no-deps --build airflow-webserver airflow-scheduler. Then we find and copy the container id of the Airflow scheduler with docker ps, and enter the Airflow scheduler container with docker exec -it <container id> bash.
In the prompt we see airflow@<container id>, which means we are inside the container. Let's run pip list | grep amazon: the output shows we have the Airflow Amazon provider package version 1.1.0 installed. That's great. Let's go to the browser and open the Apache Airflow official documentation site. Under the providers section we click Amazon, and on the top left we change the version to 1.1.0 to match our local installation. Then we check the Python API and look for the S3 sensor; we find airflow.providers.amazon.aws.sensors.s3_key and click it. Here we find the S3KeySensor, which waits for a key (a file-like instance on S3) to be present in an S3 bucket. That's exactly what we need. To use it we need a couple of parameters, like bucket_name, bucket_key, aws_conn_id and so on. Let's copy the package path and go back to VS Code to paste it; we need to import the S3KeySensor from that package path. Let's build a task using the S3KeySensor: we set the task_id to sensor_minio_s3, the bucket_name is airflow, the bucket_key is the file we want to sense, which is data.csv, and then we need an AWS S3 connection id. Let's go to the Airflow UI, open Admin, Connections and click the plus button to add one. We set minio_conn as the connection id and select S3 as the connection type; then we only need to fill the Extra field with a dictionary consisting of aws_access_key_id, aws_secret_access_key and host. The access key is the MinIO root username, the secret key is the MinIO root password, and the host is http://host.docker.internal:9000; host.docker.internal points to the local host since we are using Docker Desktop on macOS, and 9000 is the port MinIO exposes for the API. Save it, then go back to VS Code and set our newly created S3 connection id as the aws_conn_id parameter. Save the DAG file, go to the Airflow webserver, pick our newly created DAG and start it. Oops, the task fails; the log says "expecting property name enclosed in double quotes". OK, let's update our connection's Extra field from single quotes to double quotes, clear the task and refresh. Now the DAG run succeeded; let's check the log. From the log we can see it was checking for the data.csv file's existence in our airflow S3 bucket using the poke mode. Poke is the default mode for sensor operators: it checks for the file's existence at every poke interval, as long as it is within the timeout limit. Since data.csv already exists in the airflow bucket, the sensor finished immediately after the first poke and was marked as success. Let's go back to VS Code, change the poke_interval to 5 seconds and the timeout to 30 seconds, update the DAG version and save it. Then we delete data.csv from our airflow bucket, pick the newest DAG and start the sensor task again. Opening the log and refreshing a couple of times, we can see that it pokes for the file every 5 seconds until it times out and fails, because it didn't find the data.csv file in the airflow bucket within the 30-second limit. What if data.csv is uploaded while the sensor is poking? Let's clear and rerun the sensor task; this time we wait a few seconds and upload the file to our airflow bucket before the timeout. We open the log and see it is already poking for the file; let's open the MinIO console and upload the CSV file. OK, it is uploaded; let's check the sensor task run. Boom, it poked the file's existence right after we uploaded it, and the task run was marked successful.
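A sketch of the sensor DAG just described; it assumes an S3-type connection with id minio_conn pointing at the MinIO endpoint, and the import path matches the Amazon provider 1.x used here. The dag_id, dates and timings are illustrative.

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.amazon.aws.sensors.s3_key import S3KeySensor

default_args = {'owner': 'code2j', 'retries': 5, 'retry_delay': timedelta(minutes=10)}

with DAG(
    dag_id='dag_with_minio_s3_v02',
    default_args=default_args,
    start_date=datetime(2022, 5, 1),
    schedule_interval='@daily',
) as dag:
    task1 = S3KeySensor(
        task_id='sensor_minio_s3',
        bucket_name='airflow',
        bucket_key='data.csv',
        aws_conn_id='minio_conn',
        mode='poke',
        poke_interval=5,     # check for the file every 5 seconds...
        timeout=30,          # ...and fail if it is not found within 30 seconds
    )
```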
I have a CSV file called orders which contains artificial sales orders from March to June 2022; you can download it in the description below. It has four columns: order id, date, product name, and quantity. We need to first import it into our Postgres database, then write a DAG to query data from it and upload the result to our S3 bucket.

Let's open DBeaver and connect to our Postgres database; if you don't know how to connect, check out this video. We are going to create a table called orders in the database test. Right-click test and set it as the default database, and check the toolbar to make sure the database test is selected. Then create a table called orders using the statement create table if not exists public.orders, define the four columns with the correct types, and in the end set order_id as the primary key. Execute it without error, refresh the test database, and we can see the table orders has been created. Then we right-click the table orders, select Import Data, choose Import from CSV, select our orders CSV file, and match the column names. In the final step we click Proceed, and we can see the import-successful message. Let's query the first 100 rows to double-check. Great, we have set up our orders table properly.

Next, let's create a DAG to query data from it. Let's go back to VS Code, create a Python file called dag_with_postgres_hook.py, and open it. We first import the required packages, then create the default_args. After that we initialize the DAG with a proper start_date, schedule_interval, etc. Let's create a Python function called postgres_to_s3, in which we will do two steps: step one is to query the orders data from the Postgres database and save it as a text file; step two is uploading the text file into the S3 bucket. We need the PythonOperator to run our Python function, so let's import the PythonOperator package.

To query data from the Postgres database we need the PostgresHook. Let's check the version of the installed Postgres provider package: type docker ps to find the container id of the airflow scheduler, then use docker exec -it <container id> bash to enter the Airflow Docker container, then run pip list | grep postgres. We can see we have the Postgres provider package version 1.0.1 installed. Let's go to the Airflow documentation site, search for and click PostgreSQL in the provider packages section, select the version matching our local installation, which is 1.0.1, then click the Python API. Here we see two sub-packages, hooks and operators; let's click hooks. The documentation shows the hook is used to interact with a Postgres database, and we can establish a connection to the database via its get_conn function. To initialize the PostgresHook we need to provide the Postgres connection id. Let's copy the whole class path, paste it into our VS Code, and update it to: from airflow.providers.postgres.hooks.postgres import PostgresHook.

Next, let's go to our Python function to initialize the PostgresHook with the Postgres connection id, which is postgres_localhost; we can get it from the Admin, Connections page. Then we get the connection via the hook's get_conn function, and after that we get a cursor from the connection. What we need to do is ask the cursor to execute our SQL query statement. Let's say we want to get all the order data up to May 1st, 2022: we just need to put the query select * from orders where date <= '2022-05-01' into the cursor's execute function.
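Here is a minimal sketch of this first version of the query step, assuming the postgres_localhost connection id described above; the file-writing part follows in the next step.

from airflow.providers.postgres.hooks.postgres import PostgresHook

def postgres_to_s3():
    # step 1: query the orders data from the Postgres database
    hook = PostgresHook(postgres_conn_id="postgres_localhost")
    conn = hook.get_conn()
    cursor = conn.cursor()
    cursor.execute("select * from orders where date <= '2022-05-01'")
    # the query results are now available via the cursor,
    # ready to be written to a text file in the next step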
Then we write all the returned data into a text file using the csv module. Let's first import csv, then open a text file in write mode named get_orders.txt in the dags folder. We get the csv writer first, write the column names as the first row, and after that write all the remaining data rows. In the end we need to close the cursor and the connection. Then we import the logging module and log that the orders were saved in the text file get_orders.txt. Now we have step one implemented.

Let's create a task using the PythonOperator to run our Python function. We call it task1: task_id is postgres_to_s3, python_callable is our function postgres_to_s3. Let's go to the browser and refresh our Airflow webserver. Oops, the error says we need to set the task id. Let's check it in VS Code. Ah, we mistyped the task_id parameter name; let's fix it and go back to the browser to refresh again. Pick our newest DAG, start it, and wait for its execution results. Oops, it fails. Let's check the log: it reports a SQL syntax error near "order". Let's go back to VS Code; we need to correct the table name from order to orders and put the date inside a pair of single quotes. Let's rerun the DAG again. Great, we see it is successful. Now let's open the log; it says the orders have been saved in the get_orders.txt file. Let's also rerun the latest DAG run and check its log. Great.

Let's check our dags folder in VS Code. We find only one get_orders.txt file, which contains orders up to May 1st, 2022. The reason is that the second DAG run overwrites the text file generated by the first one. Let's update our query to make sure we only get the orders data within a specific DAG run's execution interval, and save it with a dynamic text file name so that it will not be overwritten. To achieve this we have to give the current and next execution date to our Python function. Let's open the Airflow documentation, select the installed Airflow version, which is 2.0.1, search for macros, and click it. Here we find all the predefined macros. Let's find the "nodash" variants of the current and next execution date, ds_nodash and next_ds_nodash, copy them, and add them directly as parameters of our Python function, since Airflow will render those macro values during execution. Next we need to update the query statement so that the date conditions are greater than or equal to the current execution date and smaller than the next execution date, and we pass the two macro parameters as the query params. After that we need to make our text file name dynamic, with the execution date as the suffix, and change the file name in the log message.

Let's save it, update our DAG version, then go back to the browser to refresh the page, select our newest DAG, start it, and wait for its execution. We see two successful DAG runs, for April 30th and May 1st. Let's check the latest DAG run's log: the orders have been saved in a text file with 20220501 as the suffix. Going back to VS Code, we see two text files in our dags folder, each with the execution date as the suffix. Opening a text file, we see it only contains the order data between the current and next execution date. That's exactly what we want.
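For reference, here is a sketch of the updated step-one implementation with the dynamic execution-date window, the dynamic file name, and the PythonOperator task. The variable name task1 and the exact log wording are illustrative; the ds_nodash and next_ds_nodash parameters are filled in automatically by Airflow because they match context variable names.

import csv
import logging

from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

def postgres_to_s3(ds_nodash, next_ds_nodash):
    # Airflow passes the rendered macro values ds_nodash / next_ds_nodash
    # (current and next execution date without dashes) into these parameters
    hook = PostgresHook(postgres_conn_id="postgres_localhost")
    conn = hook.get_conn()
    cursor = conn.cursor()
    cursor.execute(
        "select * from orders where date >= %s and date < %s",
        (ds_nodash, next_ds_nodash),
    )
    # written into the mounted dags folder, as in the walkthrough
    with open(f"dags/get_orders_{ds_nodash}.txt", "w") as f:
        csv_writer = csv.writer(f)
        csv_writer.writerow([i[0] for i in cursor.description])  # column names as the first row
        csv_writer.writerows(cursor)                              # all remaining data rows
    cursor.close()
    conn.close()
    logging.info("Saved orders data in text file: get_orders_%s.txt", ds_nodash)

task1 = PythonOperator(
    task_id="postgres_to_s3",
    python_callable=postgres_to_s3,
    dag=dag,  # or declare the task inside the "with DAG(...)" block
)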
Now let's tackle step two, which is uploading the orders text file into the S3 bucket. Obviously we need the S3Hook to achieve this. Let's first check which version of the Amazon provider package is installed. First we make sure we are inside the airflow scheduler Docker container, then we run pip list | grep amazon and see the version is 1.1.0. Let's go to the Airflow documentation site, select Amazon, change the version to 1.1.0 on the top left, then click the Python API. Here we look for the hook for S3 and click it. The doc says it is used to interact with AWS S3. To initialize the S3Hook we need to set the AWS connection id. Let's copy the class path and go back to VS Code to import the S3Hook module. Then we create an S3Hook instance with the S3 connection id we created earlier, which can easily be found on the Airflow webserver Connections page.

Then we need a function which can upload our local text file into the S3 bucket. Let's go back to the S3Hook documentation and search for the keyword "load"; we find the function load_file, which does exactly what we need: it loads a local file to S3. We need to indicate which file to upload, which bucket and key to use, and whether to replace the file if it already exists. Let's go back to VS Code to implement it. We call the load_file function on the S3Hook instance: filename is the file we just wrote, get_orders with the execution date as the suffix in the dags folder, bucket_name is airflow, and key is orders/<execution date>.txt. Basically we upload all the orders text files under the orders path, named by their execution date, and we set replace to true so that the file is replaced if it already exists. Update the DAG version, save it, and go back to the browser to refresh. Pick the newest DAG and start it. Once it is executed, we see there are three successful DAG runs, since the date is already May 4th. Let's check the latest run's log; here we can see the orders data has been saved as a text file. Let's go back to VS Code: there are four orders text files. Let's go to our airflow bucket: there are also four text files under the orders path. We can download one of them and open it. Great, the file is exactly the same as the one in the VS Code project folder, which means we have achieved the second step successfully.

This solution works perfectly, except that I don't like having a lot of text files saved in my dags folder. Is there any way to keep my workspace clean? Yes, Python provides the tempfile module, which enables us to create files in the system's temporary directory. Let's first import NamedTemporaryFile from the tempfile module. Then, instead of creating the file in our dags folder, we use NamedTemporaryFile to create the file object in write mode, and we can give the execution date as the file suffix. Before uploading the file to S3 we need to flush the file object so that the data is written to disk. Our S3 load_file call has to stay inside the NamedTemporaryFile with statement, because the temporary file is deleted once we exit the context manager. We can log the temporary file name via the name attribute of the file object. In the S3 load_file call, we just need to replace the file name with the temporary file name, which is f.name.

Update the DAG version and save it. We go back to the browser, refresh the page, and pick the newest DAG. Before we start it, let's first delete all the text files in our local dags folder and in the S3 bucket. Then we start the DAG run and wait for its executions. Once we see the successful DAG runs, let's check one of the logs. From the log we can see the orders data was queried from the database, saved as a temporary file with the execution date as the suffix, and then uploaded to our S3 bucket. Let's go back to VS Code: our project folder is clean, there is no text file saved in our dags folder. Let's check our S3 bucket, download one of the files, and open it in a text editor. Here we can see that each text file only contains the orders records that happened on its execution date. Great.
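To wrap up, here is a sketch of the final version of the Python callable described in this section, combining the PostgresHook query, the temporary file, and the S3Hook upload. The connection ids and the orders/<execution date>.txt key naming follow the earlier examples and are assumptions of this sketch rather than verbatim course code.

import csv
import logging
from tempfile import NamedTemporaryFile

from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.postgres.hooks.postgres import PostgresHook

def postgres_to_s3(ds_nodash, next_ds_nodash):
    # step 1: query the orders within the DAG run's execution window
    hook = PostgresHook(postgres_conn_id="postgres_localhost")
    conn = hook.get_conn()
    cursor = conn.cursor()
    cursor.execute(
        "select * from orders where date >= %s and date < %s",
        (ds_nodash, next_ds_nodash),
    )
    # write into a temporary file instead of the dags folder;
    # the file is deleted automatically when the with block exits
    with NamedTemporaryFile(mode="w", suffix=ds_nodash) as f:
        csv_writer = csv.writer(f)
        csv_writer.writerow([i[0] for i in cursor.description])
        csv_writer.writerows(cursor)
        f.flush()  # make sure the data is on disk before uploading
        cursor.close()
        conn.close()
        logging.info("Saved orders data in temporary file: %s", f.name)
        # step 2: upload the temporary file to the MinIO S3 bucket
        s3_hook = S3Hook(aws_conn_id="minio_conn")
        s3_hook.load_file(
            filename=f.name,
            key=f"orders/{ds_nodash}.txt",
            bucket_name="airflow",
            replace=True,
        )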
Congratulations, you finished the course! I hope you enjoyed the videos. If so, don't forget to subscribe, turn on the notification bell, and smash the like button. I will make new Airflow tutorial videos if we achieve 1,000 or 5,000 likes. Thanks for watching, I will talk to you in the next one. Bye bye.
Info
Channel: coder2j
Views: 413,601
Keywords: python, etl, datapipeline, airflow tutorial, airflow tutorial for beginners, apache airflow, airflow tutorial python, airflow for beginners, airflow 101, apache airflow use cases, airflow docker, airflow explained, airflow example, apache airflow tutorial, apache airflow tutorial for beginners, airflow introduction, apache airflow intro, data science, data engineer, data engineer tutorials, data engineer projects, airflow
Id: K9AnJ9_ZAXE
Length: 121min 12sec (7272 seconds)
Published: Sun Jun 05 2022