How-to Guide for using YDB Operators¶
Introduction¶
Apache Airflow has a robust trove of operators that can be used to implement the various tasks that make up your workflow. Airflow is essentially a graph (Directed Acyclic Graph) made up of tasks (nodes) and dependencies (edges).
A task defined or implemented by a operator is a unit of work in your data pipeline.
The purpose of this guide is to define tasks involving interactions with a YDB database with
the YDBExecuteQueryOperator
.
Common database operations with YDBExecuteQueryOperator¶
YDBExecuteQueryOperator executes either DML or DDL query. Parameters of the operators are:
sql
- string with query;is_ddl
- flag indicating that query is DDL. By default isfalse
;conn_id
- YDB connection id. Default value isydb_default
;params
- parameters to be injected into query if it is Jinja template, more details about params
Note
Parameter is_ddl
could be removed in future versions of operator.
Creating an YDB table¶
The code snippets below are based on Airflow-2.0
# create_pet_table, populate_pet_table, get_all_pets, and get_birth_date are examples of tasks created by
# instantiating the YDB Operator
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "ydb_operator_dag"
@task
def populate_pet_table_via_bulk_upsert():
hook = YDBHook()
column_types = (
ydb.BulkUpsertColumns()
.add_column("pet_id", ydb.OptionalType(ydb.PrimitiveType.Int32))
.add_column("name", ydb.PrimitiveType.Utf8)
.add_column("pet_type", ydb.PrimitiveType.Utf8)
.add_column("birth_date", ydb.PrimitiveType.Utf8)
.add_column("owner", ydb.PrimitiveType.Utf8)
)
rows = [
{"pet_id": 3, "name": "Lester", "pet_type": "Hamster", "birth_date": "2020-06-23", "owner": "Lily"},
{"pet_id": 4, "name": "Quincy", "pet_type": "Parrot", "birth_date": "2013-08-11", "owner": "Anne"},
]
hook.bulk_upsert("pet", rows=rows, column_types=column_types)
with DAG(
dag_id=DAG_ID,
start_date=datetime.datetime(2020, 2, 2),
schedule="@once",
catchup=False,
) as dag:
create_pet_table = YDBExecuteQueryOperator(
task_id="create_pet_table",
sql="""
CREATE TABLE pet (
pet_id INT,
name TEXT NOT NULL,
pet_type TEXT NOT NULL,
birth_date TEXT NOT NULL,
owner TEXT NOT NULL,
PRIMARY KEY (pet_id)
);
""",
is_ddl=True, # must be specified for DDL queries
)
Dumping SQL statements into your operator isn’t quite appealing and will create maintainability pains somewhere
down to the road. To prevent this, Airflow offers an elegant solution. This is how it works: you simply create
a directory inside the DAG folder called sql
and then put all the SQL files containing your SQL queries inside it.
Your dags/sql/pet_schema.sql
should like this:
-- create pet table
CREATE TABLE pet (
pet_id INT,
name TEXT NOT NULL,
pet_type TEXT NOT NULL,
birth_date TEXT NOT NULL,
owner TEXT NOT NULL,
PRIMARY KEY (pet_id)
);
Now let’s refactor create_pet_table
in our DAG:
create_pet_table = YDBExecuteQueryOperator(
task_id="create_pet_table",
sql="sql/pet_schema.sql",
)
Inserting data into an YDB table¶
Let’s say we already have the SQL insert statement below in our dags/sql/pet_schema.sql
file:
-- populate pet table
UPSERT INTO pet (pet_id, name, pet_type, birth_date, owner)
VALUES (1, 'Max', 'Dog', '2018-07-05', 'Jane');
UPSERT INTO pet (pet_id, name, pet_type, birth_date, owner)
VALUES (2, 'Susie', 'Cat', '2019-05-01', 'Phil');
UPSERT INTO pet (pet_id, name, pet_type, birth_date, owner)
VALUES (3, 'Lester', 'Hamster', '2020-06-23', 'Lily');
UPSERT INTO pet (pet_id, name, pet_type, birth_date, owner)
VALUES (4, 'Quincy', 'Parrot', '2013-08-11', 'Anne');
We can then create a YDBExecuteQueryOperator task that populate the pet
table.
populate_pet_table = YDBExecuteQueryOperator(
task_id="populate_pet_table",
sql="sql/pet_schema.sql",
)
Fetching records from your YDB table¶
Fetching records from your YDB table can be as simple as:
get_all_pets = YDBExecuteQueryOperator(
task_id="get_all_pets",
sql="SELECT * FROM pet;",
)
Passing parameters into YDBExecuteQueryOperator¶
The BaseOperator class has the params
attribute which is available to the YDBExecuteQueryOperator
by virtue of inheritance. params
make it possible to dynamically pass in parameters in many
interesting ways.
To find the owner of the pet called ‘Lester’:
get_birth_date = YDBExecuteQueryOperator(
task_id="get_birth_date",
sql="SELECT * FROM pet WHERE birth_date BETWEEN '{{params.begin_date}}' AND '{{params.end_date}}'",
params={"begin_date": "2020-01-01", "end_date": "2020-12-31"},
)
Now lets refactor our get_birth_date
task. Instead of dumping SQL statements directly into our code, let’s tidy things up
by creating a sql file.
-- dags/sql/birth_date.sql
SELECT * FROM pet WHERE birth_date BETWEEN '{{params.begin_date}}' AND '{{params.end_date}}';
get_birth_date = YDBExecuteQueryOperator(
task_id="get_birth_date",
sql="sql/birth_date.sql",
params={"begin_date": "2020-01-01", "end_date": "2020-12-31"},
)
The complete YDB Operator DAG¶
When we put everything together, our DAG should look like this:
# create_pet_table, populate_pet_table, get_all_pets, and get_birth_date are examples of tasks created by
# instantiating the YDB Operator
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "ydb_operator_dag"
@task
def populate_pet_table_via_bulk_upsert():
hook = YDBHook()
column_types = (
ydb.BulkUpsertColumns()
.add_column("pet_id", ydb.OptionalType(ydb.PrimitiveType.Int32))
.add_column("name", ydb.PrimitiveType.Utf8)
.add_column("pet_type", ydb.PrimitiveType.Utf8)
.add_column("birth_date", ydb.PrimitiveType.Utf8)
.add_column("owner", ydb.PrimitiveType.Utf8)
)
rows = [
{"pet_id": 3, "name": "Lester", "pet_type": "Hamster", "birth_date": "2020-06-23", "owner": "Lily"},
{"pet_id": 4, "name": "Quincy", "pet_type": "Parrot", "birth_date": "2013-08-11", "owner": "Anne"},
]
hook.bulk_upsert("pet", rows=rows, column_types=column_types)
with DAG(
dag_id=DAG_ID,
start_date=datetime.datetime(2020, 2, 2),
schedule="@once",
catchup=False,
) as dag:
create_pet_table = YDBExecuteQueryOperator(
task_id="create_pet_table",
sql="""
CREATE TABLE pet (
pet_id INT,
name TEXT NOT NULL,
pet_type TEXT NOT NULL,
birth_date TEXT NOT NULL,
owner TEXT NOT NULL,
PRIMARY KEY (pet_id)
);
""",
is_ddl=True, # must be specified for DDL queries
)
populate_pet_table = YDBExecuteQueryOperator(
task_id="populate_pet_table",
sql="""
UPSERT INTO pet (pet_id, name, pet_type, birth_date, owner)
VALUES (1, 'Max', 'Dog', '2018-07-05', 'Jane');
UPSERT INTO pet (pet_id, name, pet_type, birth_date, owner)
VALUES (2, 'Susie', 'Cat', '2019-05-01', 'Phil');
""",
)
get_all_pets = YDBExecuteQueryOperator(task_id="get_all_pets", sql="SELECT * FROM pet;")
get_birth_date = YDBExecuteQueryOperator(
task_id="get_birth_date",
sql="SELECT * FROM pet WHERE birth_date BETWEEN '{{params.begin_date}}' AND '{{params.end_date}}'",
params={"begin_date": "2020-01-01", "end_date": "2020-12-31"},
)
(
create_pet_table
>> populate_pet_table
>> populate_pet_table_via_bulk_upsert()
>> get_all_pets
>> get_birth_date
)
Conclusion¶
In this how-to guide we explored the Apache Airflow YDBExecuteQueryOperator to connect to YDB database. Let’s quickly highlight the key takeaways.
It is best practice to create subdirectory called sql
in your dags
directory where you can store your sql files.
This will make your code more elegant and more maintainable.
And finally, we looked at the templated version of sql script and usage of params
attribute.