RedshiftSQLOperator¶
Overview¶
Use the RedshiftSQLOperator
to execute
statements against an Amazon Redshift cluster.
RedshiftSQLOperator
works together with
RedshiftSQLHook
to establish
connections with Amazon Redshift.
example_redshift.py¶
Purpose¶
This is a basic example dag for using RedshiftSQLOperator
to execute statements against an Amazon Redshift cluster.
Create a table¶
In the following code we are creating a table called "fruit".
setup__task_create_table = RedshiftSQLOperator(
task_id='setup__create_table',
sql="""
CREATE TABLE IF NOT EXISTS fruit (
fruit_id INTEGER,
name VARCHAR NOT NULL,
color VARCHAR NOT NULL
);
""",
)
Insert data into a table¶
In the following code we insert a few sample rows into the "fruit" table.
task_insert_data = RedshiftSQLOperator(
task_id='task_insert_data',
sql=[
"INSERT INTO fruit VALUES ( 1, 'Banana', 'Yellow');",
"INSERT INTO fruit VALUES ( 2, 'Apple', 'Red');",
"INSERT INTO fruit VALUES ( 3, 'Lemon', 'Yellow');",
"INSERT INTO fruit VALUES ( 4, 'Grape', 'Purple');",
"INSERT INTO fruit VALUES ( 5, 'Pear', 'Green');",
"INSERT INTO fruit VALUES ( 6, 'Strawberry', 'Red');",
],
)
Fetching records from a table¶
Creating a new table, "more_fruit" from the "fruit" table.
task_get_all_table_data = RedshiftSQLOperator(
task_id='task_get_all_table_data', sql="CREATE TABLE more_fruit AS SELECT * FROM fruit;"
)
Passing Parameters into RedshiftSQLOperator¶
RedshiftSQLOperator supports the parameters
attribute which allows us to dynamically pass
parameters into SQL statements.
task_get_with_filter = RedshiftSQLOperator(
task_id='task_get_with_filter',
sql="CREATE TABLE filtered_fruit AS SELECT * FROM fruit WHERE color = '{{ params.color }}';",
params={'color': 'Red'},
)
The complete RedshiftSQLOperator DAG¶
All together, here is our DAG:
from airflow import DAG
from airflow.providers.amazon.aws.operators.redshift import RedshiftSQLOperator
with DAG(dag_id="redshift", start_date=datetime(2021, 1, 1), schedule_interval=None, tags=['example']) as dag:
setup__task_create_table = RedshiftSQLOperator(
task_id='setup__create_table',
sql="""
CREATE TABLE IF NOT EXISTS fruit (
fruit_id INTEGER,
name VARCHAR NOT NULL,
color VARCHAR NOT NULL
);
""",
)
task_insert_data = RedshiftSQLOperator(
task_id='task_insert_data',
sql=[
"INSERT INTO fruit VALUES ( 1, 'Banana', 'Yellow');",
"INSERT INTO fruit VALUES ( 2, 'Apple', 'Red');",
"INSERT INTO fruit VALUES ( 3, 'Lemon', 'Yellow');",
"INSERT INTO fruit VALUES ( 4, 'Grape', 'Purple');",
"INSERT INTO fruit VALUES ( 5, 'Pear', 'Green');",
"INSERT INTO fruit VALUES ( 6, 'Strawberry', 'Red');",
],
)
task_get_all_table_data = RedshiftSQLOperator(
task_id='task_get_all_table_data', sql="CREATE TABLE more_fruit AS SELECT * FROM fruit;"
)
task_get_with_filter = RedshiftSQLOperator(
task_id='task_get_with_filter',
sql="CREATE TABLE filtered_fruit AS SELECT * FROM fruit WHERE color = '{{ params.color }}';",
params={'color': 'Red'},
)
setup__task_create_table >> task_insert_data >> task_get_all_table_data >> task_get_with_filter