AzureBlobStorageToTeradataOperator

The AzureBlobStorageToTeradataOperator defines tasks that transfer data in CSV, JSON or Parquet format from Azure Blob Storage into a Teradata table. Use it to load data stored in an Azure Blob Storage container into Teradata.
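The blob_source_key values used in the examples below share one layout: a /az/ prefix, the storage account host (<account>.blob.core.windows.net), the container (td-usgs) and a path prefix inside that container. The following sketch splits such a key into those parts, which can help when checking a key before passing it to the operator. parse_blob_source_key and BlobLocation are hypothetical names, and the layout is inferred from the examples on this page, not taken from the provider's API.

```python
from typing import NamedTuple


class BlobLocation(NamedTuple):
    """Parts of a '/az/...' location string (hypothetical helper type)."""

    account: str
    container: str
    prefix: str


def parse_blob_source_key(key: str) -> BlobLocation:
    """Split '/az/<account>.blob.core.windows.net/<container>/<prefix>'.

    Assumes the layout seen in this page's examples; not a provider API.
    """
    scheme = "/az/"
    suffix = ".blob.core.windows.net"
    if not key.startswith(scheme):
        raise ValueError(f"expected a key starting with {scheme!r}: {key!r}")
    # Separate the host from everything after the first slash.
    host, _, rest = key[len(scheme):].partition("/")
    if not host.endswith(suffix):
        raise ValueError(f"unexpected host {host!r}")
    account = host[: -len(suffix)]
    # The first path segment is the container; the remainder is the prefix.
    container, _, prefix = rest.partition("/")
    if not account or not container:
        raise ValueError(f"missing account or container in {key!r}")
    return BlobLocation(account, container, prefix)


loc = parse_blob_source_key(
    "/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/CSVDATA/09380000/2018/06/"
)
print(loc.account, loc.container, loc.prefix)
# akiaxox5jikeotfww4ul td-usgs CSVDATA/09380000/2018/06/
```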

Transferring data in CSV format from Azure Blob Storage to Teradata

The following example uses the AzureBlobStorageToTeradataOperator to transfer CSV data from Azure Blob Storage to a Teradata table:

tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py

    transfer_data_csv = AzureBlobStorageToTeradataOperator(
        task_id="transfer_data_blob_to_teradata_csv",
        blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/CSVDATA/09380000/2018/06/",
        teradata_table="example_blob_teradata_csv",
        azure_conn_id="wasb_default",
        # teradata_conn_id is not set here; the complete DAG supplies it through default_args.
        trigger_rule="all_done",
    )
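Every transfer task on this page sets trigger_rule="all_done". In Airflow, that rule lets a task start once all of its upstream tasks have finished, whether they succeeded or failed, whereas the default all_success requires every upstream task to succeed. The sketch below is a simplified, stdlib-only model of those two rules; should_run and FINISHED_STATES are illustrative assumptions, not Airflow's scheduler code.

```python
# Simplified model of two Airflow trigger rules; not Airflow's implementation.
FINISHED_STATES = {"success", "failed", "skipped", "upstream_failed"}


def should_run(trigger_rule: str, upstream_states: list[str]) -> bool:
    """Return True if a task with `trigger_rule` may start, given upstream states."""
    if trigger_rule == "all_success":  # Airflow's default rule
        return all(state == "success" for state in upstream_states)
    if trigger_rule == "all_done":  # rule used by the transfer tasks on this page
        return all(state in FINISHED_STATES for state in upstream_states)
    raise ValueError(f"rule not modelled here: {trigger_rule}")


# An upstream transfer failed: "all_done" still lets the next task run.
print(should_run("all_success", ["failed"]))  # False
print(should_run("all_done", ["failed"]))  # True
# An upstream task that is still running blocks both rules.
print(should_run("all_done", ["running"]))  # False
```

In the complete DAG further down, this is why a failed CSV load does not prevent the JSON and Parquet transfers from running.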

Transferring data in JSON format from Azure Blob Storage to Teradata

The following example uses the AzureBlobStorageToTeradataOperator to transfer JSON data from Azure Blob Storage to a Teradata table:

tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py

    transfer_data_json = AzureBlobStorageToTeradataOperator(
        task_id="transfer_data_blob_to_teradata_json",
        blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/JSONDATA/09380000/2018/06/",
        teradata_table="example_blob_teradata_json",
        azure_conn_id="wasb_default",
        trigger_rule="all_done",
    )
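Assuming the operator relies on Teradata's Native Object Store (NOS), which can read CSV, JSON and Parquet files directly from a LOCATION such as the /az/ keys above, each transfer amounts to a CREATE TABLE ... AS statement over that location. The hypothetical build_nos_ctas function below only assembles such a statement as a string, so it runs without a database; treat it as an illustration, since the SQL the operator actually issues may differ.

```python
def build_nos_ctas(
    teradata_table: str,
    blob_source_key: str,
    access_id: str = "",
    access_key: str = "",
) -> str:
    """Assemble a CREATE TABLE ... AS statement over a NOS location (hypothetical helper)."""
    # Illustration only: values are interpolated without quoting or escaping,
    # so do not pass untrusted input.
    credentials = ""
    if access_id and access_key:
        # Add the credential clause only when both values are supplied.
        credentials = f"\n        ACCESS_ID = '{access_id}' ACCESS_KEY = '{access_key}'"
    return (
        f"CREATE MULTISET TABLE {teradata_table} AS (\n"
        "    SELECT * FROM (\n"
        f"        LOCATION = '{blob_source_key}'{credentials}\n"
        "    ) AS d\n"
        ") WITH DATA"
    )


print(build_nos_ctas(
    "example_blob_teradata_json",
    "/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/JSONDATA/09380000/2018/06/",
))
```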

Transferring data in Parquet format from Azure Blob Storage to Teradata

The following example uses the AzureBlobStorageToTeradataOperator to transfer Parquet data from Azure Blob Storage to a Teradata table:

tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py

    transfer_data_parquet = AzureBlobStorageToTeradataOperator(
        task_id="transfer_data_blob_to_teradata_parquet",
        blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/PARQUETDATA/09394500/2018/06/",
        teradata_table="example_blob_teradata_parquet",
        azure_conn_id="wasb_default",
        teradata_conn_id="teradata_default",
        trigger_rule="all_done",
    )
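The three transfer tasks differ only in the format name, the station directory in the blob path, and the target table. The sketch below builds their keyword arguments from those two inputs as plain dicts, so it needs no Airflow installation. transfer_kwargs and BLOB_BASE are hypothetical names, and the 2018/06 path segment is copied from the examples rather than derived.

```python
# Hypothetical helper: builds the keyword arguments used by the three
# transfer tasks on this page, as plain dicts.
BLOB_BASE = "/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs"


def transfer_kwargs(fmt: str, station: str) -> dict[str, str]:
    """Return AzureBlobStorageToTeradataOperator kwargs for one data format."""
    fmt = fmt.lower()
    return {
        "task_id": f"transfer_data_blob_to_teradata_{fmt}",
        # Directory names in the examples are the upper-cased format plus "DATA".
        "blob_source_key": f"{BLOB_BASE}/{fmt.upper()}DATA/{station}/2018/06/",
        "teradata_table": f"example_blob_teradata_{fmt}",
        "azure_conn_id": "wasb_default",
        "teradata_conn_id": "teradata_default",
        "trigger_rule": "all_done",
    }


# Station IDs as they appear in the examples above.
SOURCES = {"csv": "09380000", "json": "09380000", "parquet": "09394500"}
for fmt, station in SOURCES.items():
    print(transfer_kwargs(fmt, station)["blob_source_key"])
```

In a DAG, each dict could then be unpacked into the operator, for example AzureBlobStorageToTeradataOperator(**transfer_kwargs("parquet", "09394500")), which keeps the three tasks consistent if more formats or stations are added.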

The complete AzureBlobStorageToTeradataOperator DAG

Putting everything together, the DAG looks like this:

tests/system/providers/teradata/example_azure_blob_to_teradata_transfer.py


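import datetime
import os

# Requires the apache-airflow-providers-teradata package.
from airflow import DAG
from airflow.providers.teradata.operators.teradata import TeradataOperator
from airflow.providers.teradata.transfers.azure_blob_to_teradata import AzureBlobStorageToTeradataOperator

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")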
DAG_ID = "example_azure_blob_to_teradata_transfer_operator"
CONN_ID = "teradata_default"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime.datetime(2020, 2, 2),
    schedule="@once",
    catchup=False,
    default_args={"teradata_conn_id": CONN_ID},
) as dag:
    transfer_data_csv = AzureBlobStorageToTeradataOperator(
        task_id="transfer_data_blob_to_teradata_csv",
        blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/CSVDATA/09380000/2018/06/",
        teradata_table="example_blob_teradata_csv",
        azure_conn_id="wasb_default",
        trigger_rule="all_done",
    )
    read_data_table_csv = TeradataOperator(
        task_id="read_data_table_csv",
        sql="SELECT count(1) from example_blob_teradata_csv;",
    )
    drop_table_csv = TeradataOperator(
        task_id="drop_table_csv",
        sql="DROP TABLE example_blob_teradata_csv;",
    )
    transfer_data_json = AzureBlobStorageToTeradataOperator(
        task_id="transfer_data_blob_to_teradata_json",
        blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/JSONDATA/09380000/2018/06/",
        teradata_table="example_blob_teradata_json",
        azure_conn_id="wasb_default",
        trigger_rule="all_done",
    )
    read_data_table_json = TeradataOperator(
        task_id="read_data_table_json",
        sql="SELECT count(1) from example_blob_teradata_json;",
    )
    drop_table_json = TeradataOperator(
        task_id="drop_table_json",
        sql="DROP TABLE example_blob_teradata_json;",
    )
    transfer_data_parquet = AzureBlobStorageToTeradataOperator(
        task_id="transfer_data_blob_to_teradata_parquet",
        blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/PARQUETDATA/09394500/2018/06/",
        teradata_table="example_blob_teradata_parquet",
        azure_conn_id="wasb_default",
        teradata_conn_id="teradata_default",
        trigger_rule="all_done",
    )
    read_data_table_parquet = TeradataOperator(
        task_id="read_data_table_parquet",
        sql="SELECT count(1) from example_blob_teradata_parquet;",
    )
    drop_table_parquet = TeradataOperator(
        task_id="drop_table_parquet",
        sql="DROP TABLE example_blob_teradata_parquet;",
    )

    (
        transfer_data_csv
        >> transfer_data_json
        >> transfer_data_parquet
        >> read_data_table_csv
        >> read_data_table_json
        >> read_data_table_parquet
        >> drop_table_csv
        >> drop_table_json
        >> drop_table_parquet
    )
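The >> chain above runs the nine tasks strictly one after another. The stdlib sketch below reproduces that ordering with graphlib.TopologicalSorter (Python 3.9+) using the DAG's task IDs; it models only the dependency edges, not Airflow itself, and checks that every table is loaded before it is read and read before it is dropped.

```python
from graphlib import TopologicalSorter

# Task IDs copied from the DAG above, in the order of its ">>" chain.
CHAIN = [
    "transfer_data_blob_to_teradata_csv",
    "transfer_data_blob_to_teradata_json",
    "transfer_data_blob_to_teradata_parquet",
    "read_data_table_csv",
    "read_data_table_json",
    "read_data_table_parquet",
    "drop_table_csv",
    "drop_table_json",
    "drop_table_parquet",
]

# "a >> b" makes a an upstream of b, so map every task to its predecessor.
graph = {task: {prev} for prev, task in zip(CHAIN, CHAIN[1:])}
order = list(TopologicalSorter(graph).static_order())

# Each format must be loaded, then read, then dropped.
for fmt in ("csv", "json", "parquet"):
    load = order.index(f"transfer_data_blob_to_teradata_{fmt}")
    read = order.index(f"read_data_table_{fmt}")
    drop = order.index(f"drop_table_{fmt}")
    assert load < read < drop, fmt

print(order == CHAIN)  # True
```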
