RedshiftSQLOperator

Overview

Use the RedshiftSQLOperator to execute statements against an Amazon Redshift cluster.

RedshiftSQLOperator uses RedshiftSQLHook to establish its connection to Amazon Redshift.
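
For example, a task inside a DAG definition might look like the following minimal sketch. It assumes an Airflow connection named "redshift_default" (the hook's default connection id) has already been configured with the cluster's host, database, and credentials; the task id is illustrative only.

    from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator

    # Minimal sketch: run a trivial statement over the default Redshift connection.
    # "redshift_default" is the hook's default connection id and is assumed to exist.
    ping_redshift = RedshiftSQLOperator(
        task_id='ping_redshift',
        redshift_conn_id='redshift_default',
        sql='SELECT 1;',
    )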

example_redshift.py

Purpose

This is a basic example DAG that uses RedshiftSQLOperator to execute statements against an Amazon Redshift cluster.

Create a table

In the following code we create a table called "fruit".

airflow/providers/amazon/aws/example_dags/example_redshift.py[source]

    setup__task_create_table = RedshiftSQLOperator(
        task_id='setup__create_table',
        sql="""
            CREATE TABLE IF NOT EXISTS fruit (
            fruit_id INTEGER,
            name VARCHAR NOT NULL,
            color VARCHAR NOT NULL
            );
        """,
    )

Insert data into a table

In the following code we insert a few sample rows into the "fruit" table.

airflow/providers/amazon/aws/example_dags/example_redshift.py[source]

    task_insert_data = RedshiftSQLOperator(
        task_id='task_insert_data',
        sql=[
            "INSERT INTO fruit VALUES ( 1, 'Banana', 'Yellow');",
            "INSERT INTO fruit VALUES ( 2, 'Apple', 'Red');",
            "INSERT INTO fruit VALUES ( 3, 'Lemon', 'Yellow');",
            "INSERT INTO fruit VALUES ( 4, 'Grape', 'Purple');",
            "INSERT INTO fruit VALUES ( 5, 'Pear', 'Green');",
            "INSERT INTO fruit VALUES ( 6, 'Strawberry', 'Red');",
        ],
    )

Fetching records from a table

In the following code we fetch all records from the "fruit" table and store them in a new table, "more_fruit".

airflow/providers/amazon/aws/example_dags/example_redshift.py[source]

    task_get_all_table_data = RedshiftSQLOperator(
        task_id='task_get_all_table_data', sql="CREATE TABLE more_fruit AS SELECT * FROM fruit;"
    )
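
Note that this task writes the fetched records back into Redshift rather than returning rows to Airflow. To pull rows into Python, the companion RedshiftSQLHook can be used inside a task, as in the hedged sketch below; the function name is illustrative and the connection id is assumed to be the hook's default.

    from airflow.decorators import task
    from airflow.providers.amazon.aws.hooks.redshift_sql import RedshiftSQLHook

    # Hedged sketch: fetch rows back into Python instead of creating a new table.
    # "redshift_default" is the hook's default connection id and is assumed to exist.
    @task
    def fetch_fruit_rows():
        hook = RedshiftSQLHook(redshift_conn_id='redshift_default')
        return hook.get_records("SELECT fruit_id, name, color FROM fruit;")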

Passing Parameters into RedshiftSQLOperator

RedshiftSQLOperator supports the params attribute, which allows us to pass values into SQL statements dynamically via Jinja templating.

airflow/providers/amazon/aws/example_dags/example_redshift.py[source]

    task_get_with_filter = RedshiftSQLOperator(
        task_id='task_get_with_filter',
        sql="CREATE TABLE filtered_fruit AS SELECT * FROM fruit WHERE color = '{{ params.color }}';",
        params={'color': 'Red'},
    )
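
In addition to Jinja params, the operator also accepts a parameters argument that is handed to the database driver as bind parameters rather than rendered into the SQL text. Below is a hedged sketch, assuming the driver's default "%s" placeholder style; the task id and target table are illustrative only.

    # Hedged sketch: bind the value through the driver instead of Jinja templating.
    # The "%s" placeholder style is an assumption about the underlying driver's
    # default paramstyle; the task id and table name are illustrative.
    task_get_with_bind_param = RedshiftSQLOperator(
        task_id='task_get_with_bind_param',
        sql="CREATE TABLE bound_fruit AS SELECT * FROM fruit WHERE color = %s;",
        parameters=['Red'],
    )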

The complete RedshiftSQLOperator DAG

Putting it all together, here is our DAG:

airflow/providers/amazon/aws/example_dags/example_redshift.py[source]

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator

with DAG(
    dag_id="redshift",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False,
    tags=['example'],
) as dag:
    setup__task_create_table = RedshiftSQLOperator(
        task_id='setup__create_table',
        sql="""
            CREATE TABLE IF NOT EXISTS fruit (
            fruit_id INTEGER,
            name VARCHAR NOT NULL,
            color VARCHAR NOT NULL
            );
        """,
    )
    task_insert_data = RedshiftSQLOperator(
        task_id='task_insert_data',
        sql=[
            "INSERT INTO fruit VALUES ( 1, 'Banana', 'Yellow');",
            "INSERT INTO fruit VALUES ( 2, 'Apple', 'Red');",
            "INSERT INTO fruit VALUES ( 3, 'Lemon', 'Yellow');",
            "INSERT INTO fruit VALUES ( 4, 'Grape', 'Purple');",
            "INSERT INTO fruit VALUES ( 5, 'Pear', 'Green');",
            "INSERT INTO fruit VALUES ( 6, 'Strawberry', 'Red');",
        ],
    )
    task_get_all_table_data = RedshiftSQLOperator(
        task_id='task_get_all_table_data', sql="CREATE TABLE more_fruit AS SELECT * FROM fruit;"
    )
    task_get_with_filter = RedshiftSQLOperator(
        task_id='task_get_with_filter',
        sql="CREATE TABLE filtered_fruit AS SELECT * FROM fruit WHERE color = '{{ params.color }}';",
        params={'color': 'Red'},
    )

    setup__task_create_table >> task_insert_data >> task_get_all_table_data >> task_get_with_filter
