How-to Guide for PostgresOperator
Introduction
Apache Airflow has a robust trove of operators that can be used to implement the various tasks that make up your workflow. Airflow is essentially a graph (Directed Acyclic Graph) made up of tasks (nodes) and dependencies (edges).
A task defined or implemented by an operator is a unit of work in your data pipeline.
The purpose of the PostgresOperator is to define tasks involving interactions with a PostgreSQL database.
In Airflow-2.0, the PostgresOperator class resides at airflow.providers.postgres.operators.postgres. Under the hood, the PostgresOperator delegates its heavy lifting to the PostgresHook.
Common Database Operations with PostgresOperator
To use the PostgresOperator to carry out SQL requests, two parameters are required: sql and postgres_conn_id. These two parameters are eventually fed to the PostgresHook object that interacts directly with the Postgres database.
Creating a Postgres database table
The code snippets below are based on Airflow-2.0.
import datetime
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
default_args = {"owner": "airflow"}
# create_pet_table, populate_pet_table, get_all_pets, and get_birth_date are examples of tasks created by
# instantiating the Postgres Operator
with DAG(
dag_id="postgres_operator_dag",
start_date=datetime.datetime(2020, 2, 2),
schedule_interval="@once",
default_args=default_args,
catchup=False,
) as dag:
create_pet_table = PostgresOperator(
task_id="create_pet_table",
postgres_conn_id="postgres_default",
sql="""
CREATE TABLE IF NOT EXISTS pet (
pet_id SERIAL PRIMARY KEY,
name VARCHAR NOT NULL,
pet_type VARCHAR NOT NULL,
birth_date DATE NOT NULL,
OWNER VARCHAR NOT NULL);
""",
)
Dumping SQL statements into your PostgresOperator isn't quite appealing and will create maintainability pains somewhere down the road. To prevent this, Airflow offers an elegant solution. This is how it works: you simply create a directory inside the DAG folder called sql and then put all the SQL files containing your SQL queries inside it.
Your dags/sql/pet_schema.sql should look like this:
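-- create pet table
CREATE TABLE IF NOT EXISTS pet (
    pet_id SERIAL PRIMARY KEY,
    name VARCHAR NOT NULL,
    pet_type VARCHAR NOT NULL,
    birth_date DATE NOT NULL,
    OWNER VARCHAR NOT NULL);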
Now let's refactor create_pet_table in our DAG:
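Since sql is a templated field, it also accepts the path to a .sql file, resolved relative to the directory containing the DAG file:
create_pet_table = PostgresOperator(
    task_id="create_pet_table",
    postgres_conn_id="postgres_default",
    sql="sql/pet_schema.sql",
)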
Inserting data into a Postgres database table
Let's say we already have the SQL insert statements below in our dags/sql/pet_schema.sql file:
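-- populate pet table
-- pet_id is omitted because it is generated by the SERIAL column
INSERT INTO pet (name, pet_type, birth_date, OWNER) VALUES ( 'Max', 'Dog', '2018-07-05', 'Jane');
INSERT INTO pet (name, pet_type, birth_date, OWNER) VALUES ( 'Susie', 'Cat', '2019-05-01', 'Phil');
INSERT INTO pet (name, pet_type, birth_date, OWNER) VALUES ( 'Lester', 'Hamster', '2020-06-23', 'Lily');
INSERT INTO pet (name, pet_type, birth_date, OWNER) VALUES ( 'Quincy', 'Parrot', '2013-08-11', 'Anne');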
We can then create a PostgresOperator task that populates the pet table:
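populate_pet_table = PostgresOperator(
    task_id="populate_pet_table",
    postgres_conn_id="postgres_default",
    sql="sql/pet_schema.sql",
)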
Fetching records from your postgres database table
Fetching records from your postgres database table can be as simple as:
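get_all_pets = PostgresOperator(
    task_id="get_all_pets",
    postgres_conn_id="postgres_default",
    sql="SELECT * FROM pet;",
)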
Passing Parameters into PostgresOperator
PostgresOperator provides the parameters attribute, which makes it possible to dynamically inject values into your SQL requests during runtime. The BaseOperator class has the params attribute, which is available to the PostgresOperator by virtue of inheritance. Both parameters and params make it possible to dynamically pass in parameters in many interesting ways.
To find the owner of the pet called 'Lester':
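A minimal sketch using the parameters attribute (the task_id, find_pet_owner, is illustrative); the values are handed to the database driver instead of being rendered into the SQL string:
find_pet_owner = PostgresOperator(
    task_id="find_pet_owner",
    postgres_conn_id="postgres_default",
    sql="SELECT OWNER FROM pet WHERE name = %(pet_name)s;",
    parameters={"pet_name": "Lester"},
)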
Now let's refactor our get_birth_date task. Instead of dumping SQL statements directly into our code, let's tidy things up by creating a sql file.
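Assuming the query is saved as dags/sql/birth_date.sql (the filename here is illustrative), the file could hold the same templated statement used in the complete DAG further down:
-- dags/sql/birth_date.sql
SELECT * FROM pet
WHERE birth_date
BETWEEN SYMMETRIC '{{ params.begin_date }}' AND '{{ params.end_date }}';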
And this time we will use the params attribute which we get for free from the parent BaseOperator class.
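A sketch of the refactored task, pointing at the file above and supplying the values through params:
get_birth_date = PostgresOperator(
    task_id="get_birth_date",
    postgres_conn_id="postgres_default",
    sql="sql/birth_date.sql",
    params={"begin_date": "2020-01-01", "end_date": "2020-12-31"},
)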
The complete Postgres Operator DAG
When we put everything together, our DAG should look like this:
import datetime
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
default_args = {"owner": "airflow"}
# create_pet_table, populate_pet_table, get_all_pets, and get_birth_date are examples of tasks created by
# instantiating the Postgres Operator
with DAG(
dag_id="postgres_operator_dag",
start_date=datetime.datetime(2020, 2, 2),
schedule_interval="@once",
default_args=default_args,
catchup=False,
) as dag:
create_pet_table = PostgresOperator(
task_id="create_pet_table",
postgres_conn_id="postgres_default",
sql="""
CREATE TABLE IF NOT EXISTS pet (
pet_id SERIAL PRIMARY KEY,
name VARCHAR NOT NULL,
pet_type VARCHAR NOT NULL,
birth_date DATE NOT NULL,
OWNER VARCHAR NOT NULL);
""",
)
populate_pet_table = PostgresOperator(
task_id="populate_pet_table",
postgres_conn_id="postgres_default",
sql="""
INSERT INTO pet (name, pet_type, birth_date, OWNER) VALUES ( 'Max', 'Dog', '2018-07-05', 'Jane');
INSERT INTO pet (name, pet_type, birth_date, OWNER) VALUES ( 'Susie', 'Cat', '2019-05-01', 'Phil');
INSERT INTO pet (name, pet_type, birth_date, OWNER) VALUES ( 'Lester', 'Hamster', '2020-06-23', 'Lily');
INSERT INTO pet (name, pet_type, birth_date, OWNER) VALUES ( 'Quincy', 'Parrot', '2013-08-11', 'Anne');
""",
)
get_all_pets = PostgresOperator(
task_id="get_all_pets", postgres_conn_id="postgres_default", sql="SELECT * FROM pet;"
)
get_birth_date = PostgresOperator(
task_id="get_birth_date",
postgres_conn_id="postgres_default",
sql="""
SELECT * FROM pet
WHERE birth_date
BETWEEN SYMMETRIC '{{ params.begin_date }}' AND '{{ params.end_date }}';
""",
params={'begin_date': '2020-01-01', 'end_date': '2020-12-31'},
)
create_pet_table >> populate_pet_table >> get_all_pets >> get_birth_date
Conclusion
In this how-to guide we explored the Apache Airflow PostgresOperator. Let's quickly highlight the key takeaways. In Airflow-2.0, the PostgresOperator class now resides in the providers package. It is best practice to create a subdirectory called sql in your dags directory where you can store your sql files. This will make your code more elegant and more maintainable. And finally, we looked at the different ways you can dynamically pass parameters into your PostgresOperator tasks using the parameters or params attribute.