TeradataToTeradataOperator

The purpose of TeradataToTeradataOperator is to define tasks involving data transfer between two Teradata instances. Use the TeradataToTeradataOperator to transfer data between two Teradata instances.

Transfer data between two Teradata instances

To transfer data between two Teradata instances, use the TeradataToTeradataOperator.

An example usage of the TeradataToTeradataOperator is as follows:

tests/system/teradata/example_teradata_to_teradata_transfer.py[source]

    transfer_data = TeradataToTeradataOperator(
        task_id="transfer_data",
        dest_teradata_conn_id="teradata_default",
        destination_table="my_users_dest",
        source_teradata_conn_id="teradata_default",
        sql="select * from my_users_src",
        sql_params={},
        rows_chunk=2,
    )

The complete TeradataToTeradata Transfer Operator DAG

When we put everything together, our DAG should look like this:

tests/system/teradata/example_teradata.py[source]



ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_teradata"
CONN_ID = "teradata_default"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime.datetime(2020, 2, 2),
    schedule="@once",
    catchup=False,
    default_args={"teradata_conn_id": CONN_ID},
) as dag:
    create_table = TeradataOperator(
        task_id="create_table",
        sql=r"""
        CREATE TABLE Country (
            country_id INTEGER,
            name CHAR(25),
            continent CHAR(25)
        );
        """,
    )
    create_table_from_external_file = TeradataOperator(
        task_id="create_table_from_external_file",
        sql="create_table.sql",
        dag=dag,
    )
    populate_table = TeradataOperator(
        task_id="populate_table",
        sql=r"""
        INSERT INTO Users (username, description)
            VALUES ( 'Danny', 'Musician');
        INSERT INTO Users (username, description)
            VALUES ( 'Simone', 'Chef');
        INSERT INTO Users (username, description)
            VALUES ( 'Lily', 'Florist');
        INSERT INTO Users (username, description)
            VALUES ( 'Tim', 'Pet shop owner');
        """,
    )
    get_all_countries = TeradataOperator(
        task_id="get_all_countries",
        sql=r"SELECT * FROM Country;",
    )
    get_countries_from_continent = TeradataOperator(
        task_id="get_countries_from_continent",
        sql=r"SELECT * FROM Country WHERE {{ params.column }}='{{ params.value }}';",
        params={"column": "continent", "value": "Asia"},
    )
    drop_country_table = TeradataOperator(
        task_id="drop_country_table",
        sql=r"DROP TABLE Country;",
        dag=dag,
    )
    drop_users_table = TeradataOperator(
        task_id="drop_users_table",
        sql=r"DROP TABLE Users;",
        dag=dag,
    )
    create_schema = TeradataOperator(
        task_id="create_schema",
        sql=r"CREATE DATABASE airflow_temp AS PERM=10e6;",
    )
    create_table_with_schema = TeradataOperator(
        task_id="create_table_with_schema",
        sql=r"""
        CREATE TABLE schema_table (
           country_id INTEGER,
           name CHAR(25),
           continent CHAR(25)
        );
        """,
        schema="airflow_temp",
    )
    drop_schema_table = TeradataOperator(
        task_id="drop_schema_table",
        sql=r"DROP TABLE schema_table;",
        dag=dag,
        schema="airflow_temp",
    )
    drop_schema = TeradataOperator(
        task_id="drop_schema",
        sql=r"DROP DATABASE airflow_temp;",
        dag=dag,
    )
    (
        create_table
        >> create_table_from_external_file
        >> populate_table
        >> get_all_countries
        >> get_countries_from_continent
        >> drop_country_table
        >> drop_users_table
        >> create_schema
        >> create_table_with_schema
        >> drop_schema_table
        >> drop_schema
    )

Was this entry helpful?