How to build and automate your Python ETL pipeline with Airflow | Data pipeline | Python

Video Statistics and Information

Captions
Hello and welcome, one and all. Today we will automate our Python ETL pipeline. We developed an ETL pipeline in the previous session, and we will build on it today to give you a complete overview of extract, transform, and load. The link to the previous video is in the description below.

We will use Apache Airflow to automate our ETL pipeline. Apache Airflow is a widely used, open-source workflow management system. It provides data engineers with an intuitive platform to create, schedule, monitor, and maintain complex data pipelines. Airflow enables you to manage your data pipelines by authoring workflows as directed acyclic graphs (DAGs) of tasks. You manage task scheduling as code, and you can visualize your pipeline's dependencies, progress, logs, code, and status via the user interface. Airflow enables you to set up data pipelines over data stores and data warehouses, run workloads on a schedule, and create and manage scripted data pipelines as Python code.

In this session we will use the TaskFlow API introduced in Airflow 2.0. The TaskFlow API allows data sharing between tasks; however, the data exchange is limited to small, JSON-serializable objects such as dictionaries. We can't share large data frames from one task to another without serializing them first. The TaskFlow API makes it easier to author clean DAGs without extra boilerplate by using the task decorator.

Airflow organizes your workflow in DAGs composed of tasks. So what is a DAG? A DAG (directed acyclic graph) is the core concept of Airflow. It represents a group of tasks, organized with dependencies and relationships that say how they should run. A DAG is defined as a Python script that represents the DAG's structure (tasks and their dependencies) as code. To connect a DAG to external services, we can use Airflow's UI to set up connections, such as configuring our database credentials so we can connect to SQL Server and Postgres. Airflow's UI enables you to observe the pipeline running in your environment, monitor its progress, and troubleshoot issues when needed. Airflow's visual DAGs also provide data lineage, which facilitates debugging of data flows and aids in auditing and data governance.

Let's open PyCharm and start with our first DAG. I will refactor our Python ETL script to make it compatible with Airflow. As usual, we import the required libraries at the top. Along with the usual libraries, we import a few Airflow libraries, especially dag, task, and task_group. Then we import a few hooks; these will help us establish connections to our databases. We define the connections in the Airflow UI. I have two connections defined here, to our source and destination databases. Under the Admin console we can add a new connection by clicking the plus icon. Here we can select a database type and define the connection details. We reference a connection in our code by its connection ID. Okay, I'll move back to the code. Finally, we import pandas and SQLAlchemy.

We can use the task decorator to define our tasks. First, we define a function to get table names from SQL Server, just like in the previous session. Since our connection is defined in Airflow, we can get hold of it with the connection ID and the help of the MsSqlHook. Then we supply it a SQL script that gets the table names from the system schema in SQL Server. The SQL hook has a built-in function to get a pandas data frame; it takes a SQL query as an argument. We save the data frame in a df variable and print it. Since we can only share serializable data between tasks, I'll convert it to a dictionary, and this function will return a dictionary with the table names.
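The extraction task described above might look roughly like the following sketch. The connection ID "sqlserver", the catalog query, the table filter, and the function name get_src_tables are assumptions for illustration; only the overall pattern (task decorator, MsSqlHook, get_pandas_df, returning a dictionary) follows the narration.

```python
# Minimal sketch of the extract task, assuming a connection named "sqlserver"
# has been created under Admin > Connections in the Airflow UI.
from airflow.decorators import task
from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook


@task()
def get_src_tables():
    """Return source table names as a small JSON-serializable dict."""
    hook = MsSqlHook(mssql_conn_id="sqlserver")   # connection ID is an assumption
    sql = """
        SELECT t.name AS table_name
        FROM sys.tables t
        WHERE t.name LIKE 'DimProduct%'           -- illustrative filter; use your own
    """
    df = hook.get_pandas_df(sql)                  # DbApiHook helper: SQL query -> pandas DataFrame
    print(df)
    # TaskFlow only passes small JSON-serializable objects between tasks,
    # so we return a plain dict rather than the DataFrame itself.
    return df.to_dict("dict")
```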
We will define our second task with a function called load_src_data. This function takes a dictionary as an argument, which is returned from the previous function. We get the connection details for Postgres with the BaseHook (remember, the connection details are saved in the Admin panel) and save them in a conn variable. With the help of SQLAlchemy we create a connection engine and provide the connection details via the conn variable. Let's define an empty list, I'll call it all_tbl_name, to store the table names; we will return this at the end of the function. We iterate over the dictionary items to get the table names and append them to the all_tbl_name list. To display the number of rows we are importing, I'll declare a variable and set it to the length of the data frame. We build a dynamic SQL query here, with the table name coming from the dictionary value and the help of an f-string, and once again we make use of the SQL hook to query SQL Server with our dynamic query and save the output to a data frame. We print a message saying how many rows we are importing and from which table. Now it is time to persist this data: I call the to_sql function and prefix the table name with src to indicate that this is a source table. We print a success message and return the table names list at the end. We are done with the data extract.

It's time to transform this data, and once the transformation is done we will persist the data in staging tables. In the first task we will transform the DimProduct table. Let's query the source table to preview the data; this gives us an idea of where to apply transformations. We see that there are columns in French and Spanish that we can get rid of. There are a lot of columns with nulls or missing values; we can set defaults for these. There is also a large photo column that we don't need for analysis, so we can delete that as well. Last, we'll rename a few columns whose names contain "English". For example, the description column is called English description, so we can drop "English" since we dropped the other language columns.

Let's see how we carry out these transformations with pandas (a sketch follows below). We will name this task transform_dim_product. Let's query the source table from Postgres into a data frame. We will use pandas to implement the transformations; if you need a refresher on pandas, I have covered it in another video, feel free to check it out. First, let's drop the unwanted columns: we provide the list of columns we want to keep and save the result into a new data frame called revised. Then we replace nulls with zeros for numeric columns and "NA" for string columns. Let's go ahead and rename the columns whose names contain "English", since all of our columns are in English now. We are done with the transformations, so let's save the result into a staging table with the to_sql function. We perform similar operations on the product subcategory and product category tables: drop the unwanted columns, rename a few columns, and save the updated data to tables with an stg prefix.

To build the final product model, we query all three tables. There is a data type mismatch, so we convert the column to integer to match the data types, and then join (merge) the three tables into a single data frame. We save this data frame into a table with a prd prefix, so this will be the final presentation table.
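The DimProduct transformation described above (keep a subset of columns, fill nulls, rename the English-prefixed columns, write a staging table) might look roughly like this. The column names, the src_DimProduct/stg_DimProduct table names, and the use of PostgresHook.get_sqlalchemy_engine() to obtain a SQLAlchemy engine are assumptions for illustration; the narration obtains its Postgres connection via BaseHook and SQLAlchemy.

```python
# Minimal sketch of the transform task, assuming a Postgres connection named
# "postgres" and illustrative column names; adjust both to your source table.
import pandas as pd
from airflow.decorators import task
from airflow.providers.postgres.hooks.postgres import PostgresHook


@task()
def transform_dim_product():
    hook = PostgresHook(postgres_conn_id="postgres")   # connection ID is an assumption
    engine = hook.get_sqlalchemy_engine()

    # Read the raw table that the load task persisted with the src prefix.
    df = pd.read_sql('SELECT * FROM public."src_DimProduct"', engine)

    # Keep only the columns we need; French/Spanish and LargePhoto columns are dropped.
    revised = df[["ProductKey", "EnglishProductName", "StandardCost",
                  "Color", "EnglishDescription"]]

    # Replace missing values: zeros for numeric columns, "NA" for strings.
    revised = revised.fillna({"StandardCost": 0, "Color": "NA", "EnglishDescription": "NA"})

    # Drop the "English" prefix now that only English columns remain.
    revised = revised.rename(columns={"EnglishProductName": "ProductName",
                                      "EnglishDescription": "Description"})

    # Persist the result as a staging table with the stg prefix.
    revised.to_sql("stg_DimProduct", engine, if_exists="replace", index=False)
```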
Let's declare a DAG and define some properties. First is the schedule interval; we set this to 9 a.m. with a cron expression. If you need more details on cron, I'll leave a link in the description below. Then we define a start date for when this schedule should begin, and we set catchup to False. If you set a start date in the past and catchup to True, the schedule will run once for each day it missed up to the current date; it is designed to backfill jobs. Last, we set a tag for this DAG. Tags help us group multiple DAGs together, and you can use them to filter DAGs in the UI.

Under the DAG we define task groups to call our tasks. We group the extract and load tasks under one group: we call the first task and save its return value in a variable, then follows the next task, and we supply it the return value from the first task. We set the dependency between them to define the order in which they should run; we will see the visual representation of this in the UI. We group the transformation tasks together; let's call this group transform product. We call the three tasks here, grouped as a list, and these can run in parallel. In the last group we call the product model task, and this will execute once the transformation tasks are complete. We define the order of the task groups, and Airflow will execute the groups in this order (a code sketch of the DAG and its task groups follows at the end of these captions). Let's go ahead and save our work. Just an FYI: once you save a new DAG, it takes a while for Airflow to pick it up, so it may not show up in the UI right away.

I will open the UI. Our DAG is disabled by default; I'll locate it and enable it with the slider. We see the dependencies between our tasks, and we can get a better picture under the Graph view. This is the directed acyclic graph; it gives us a visual representation of the order in which our tasks will execute. These are the overarching groups of tasks; we can click on them to see the underlying tasks, and the arrows show the dependencies. On the top right we have the schedule information for this DAG. We can wait for it to run on schedule, or we can trigger it manually with the Run button. We can see it in action under the Tree tab, so we'll head there and watch it execute. There are different statuses here, and they are color coded. We can click on individual tasks and see their logs; the logs provide us with all the details, and they also record errors if there are any. I'll let this DAG complete.

Okay, our ETL load is complete. I will refresh our schema in Postgres to see that our source, staging, and final model tables are persisted. I'll query the final prd product table to review the transformations. We don't see any nulls, which is good, and the column names are updated. At the end we see the category and subcategory columns, so we have denormalized the product dimension. Our transformations were successfully executed and persisted in the final product model. This is how we automate our ETL pipeline with Airflow.

I hope you enjoyed this session. This is all for now. Share, like, and subscribe. Take care, and I'll see you in the next video.
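As referenced above, here is a minimal sketch of the DAG declaration and task-group wiring the captions describe. Only the 9 a.m. cron schedule, catchup=False, the use of a tag, and the group ordering come from the narration; the DAG ID, group names, start date, and the placeholder task bodies are assumptions for illustration.

```python
# Minimal sketch of the DAG and task groups; not the video's exact code.
from datetime import datetime
from airflow.decorators import dag, task
from airflow.utils.task_group import TaskGroup


# Placeholder tasks standing in for the extract/load/transform functions
# sketched earlier; in the real DAG these contain the hook and pandas logic.
@task()
def get_src_tables():
    return {"table_name": {0: "DimProduct"}}

@task()
def load_src_data(tbl_dict: dict):
    return list(tbl_dict["table_name"].values())

@task()
def transform_dim_product(): ...

@task()
def transform_dim_product_subcategory(): ...

@task()
def transform_dim_product_category(): ...

@task()
def build_product_model(): ...


@dag(
    dag_id="product_etl",                 # illustrative DAG ID
    schedule_interval="0 9 * * *",        # daily at 9 a.m., as set in the video
    start_date=datetime(2022, 3, 1),      # illustrative start date
    catchup=False,                        # don't backfill runs missed before today
    tags=["product_model"],               # tags let you filter DAGs in the UI
)
def product_pipeline():
    with TaskGroup("extract_and_load") as extract_load_src:
        src_tables = get_src_tables()
        load_src = load_src_data(src_tables)   # pass the dict returned by the first task
        src_tables >> load_src                 # explicit ordering, mirroring the narration

    with TaskGroup("transform_product") as transform_product:
        # Independent transforms grouped as a list; Airflow can run them in parallel.
        [transform_dim_product(), transform_dim_product_subcategory(),
         transform_dim_product_category()]

    with TaskGroup("load_product_model") as load_product_model:
        build_product_model()

    # Execute the groups in order: extract/load -> transforms -> final model.
    extract_load_src >> transform_product >> load_product_model


product_pipeline()
```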
Info
Channel: BI Insights Inc
Views: 78,233
Keywords: data pipelines, data pipeline, what is data pipeline, what is data pipeline architecture, data pipeline vs etl, how to build a data pipeline, how to create data pipeline, batch vs stream processing, big data, data warehouse, ETL, How to build ETL pipeline, build etl pipeline with Python, How to export data with Python, Export data from SQL Server to PostgreSQL, SQL Server, build etl pipeline with pyton, python data integration, Haq Nawaz, Python
Id: eZfD6x9FJ4E
Length: 11min 29sec (689 seconds)
Published: Mon Mar 07 2022