AzureBlobStorageToTeradataOperator¶
The AzureBlobStorageToTeradataOperator defines tasks that transfer data in CSV, JSON, and Parquet format from Azure Blob Storage to a Teradata table. Use this operator whenever you need to load blob-stored data into Teradata as part of a DAG.
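The examples below assume two Airflow connections: `wasb_default` for Azure Blob Storage and `teradata_default` for Teradata. One way to define them is through `AIRFLOW_CONN_*` environment variables; the sketch below uses placeholder hosts and credentials (not real values), and you can equally create the connections in the Airflow UI:

```shell
# Placeholder credentials and host names; substitute real values
# or create these connections in the Airflow UI instead.
export AIRFLOW_CONN_WASB_DEFAULT='wasb://mystorageaccount:<access-key>@'
export AIRFLOW_CONN_TERADATA_DEFAULT='teradata://dbuser:<password>@myteradatahost'
```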
Transferring data in CSV format from Azure Blob Storage to Teradata¶
An example usage of the AzureBlobStorageToTeradataOperator to transfer data in CSV format from Azure Blob Storage to a Teradata table:
transfer_data_csv = AzureBlobStorageToTeradataOperator(
    task_id="transfer_data_blob_to_teradata_csv",
    blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/CSVDATA/09380000/2018/06/",
    teradata_table="example_blob_teradata_csv",
    azure_conn_id="wasb_default",
    trigger_rule="all_done",
)
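The `blob_source_key` above is a fully qualified path made up of the storage-account endpoint, the container (`td-usgs`), and an object prefix. A small illustrative helper (hypothetical, not part of the provider, which simply accepts the final string) shows how such a key is composed:

```python
def blob_source_key(account: str, container: str, prefix: str) -> str:
    """Compose a blob source key of the form used in the examples.

    `account` is the storage-account name, `container` the blob
    container, and `prefix` the object path within the container.
    Illustrative only; the operator takes the resulting string as-is.
    """
    return f"/az/{account}.blob.core.windows.net/{container}/{prefix}"


# Reproduces the CSV example's source key.
key = blob_source_key("akiaxox5jikeotfww4ul", "td-usgs", "CSVDATA/09380000/2018/06/")
```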
Transferring data in JSON format from Azure Blob Storage to Teradata¶
An example usage of the AzureBlobStorageToTeradataOperator to transfer data in JSON format from Azure Blob Storage to a Teradata table:
transfer_data_json = AzureBlobStorageToTeradataOperator(
    task_id="transfer_data_blob_to_teradata_json",
    blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/JSONDATA/09380000/2018/06/",
    teradata_table="example_blob_teradata_json",
    azure_conn_id="wasb_default",
    trigger_rule="all_done",
)
Transferring data in Parquet format from Azure Blob Storage to Teradata¶
An example usage of the AzureBlobStorageToTeradataOperator to transfer data in Parquet format from Azure Blob Storage to a Teradata table:
transfer_data_parquet = AzureBlobStorageToTeradataOperator(
    task_id="transfer_data_blob_to_teradata_parquet",
    blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/PARQUETDATA/09394500/2018/06/",
    teradata_table="example_blob_teradata_parquet",
    azure_conn_id="wasb_default",
    teradata_conn_id="teradata_default",
    trigger_rule="all_done",
)
The complete AzureBlobStorageToTeradataOperator DAG¶
When we put everything together, our DAG should look like this:
import datetime
import os

from airflow import DAG
from airflow.providers.teradata.operators.teradata import TeradataOperator
from airflow.providers.teradata.transfers.azure_blob_to_teradata import (
    AzureBlobStorageToTeradataOperator,
)

ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_azure_blob_to_teradata_transfer_operator"
CONN_ID = "teradata_default"

with DAG(
    dag_id=DAG_ID,
    start_date=datetime.datetime(2020, 2, 2),
    schedule="@once",
    catchup=False,
    default_args={"teradata_conn_id": CONN_ID},
) as dag:
    transfer_data_csv = AzureBlobStorageToTeradataOperator(
        task_id="transfer_data_blob_to_teradata_csv",
        blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/CSVDATA/09380000/2018/06/",
        teradata_table="example_blob_teradata_csv",
        azure_conn_id="wasb_default",
        trigger_rule="all_done",
    )
    read_data_table_csv = TeradataOperator(
        task_id="read_data_table_csv",
        sql="SELECT count(1) FROM example_blob_teradata_csv;",
    )
    drop_table_csv = TeradataOperator(
        task_id="drop_table_csv",
        sql="DROP TABLE example_blob_teradata_csv;",
    )
    transfer_data_json = AzureBlobStorageToTeradataOperator(
        task_id="transfer_data_blob_to_teradata_json",
        blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/JSONDATA/09380000/2018/06/",
        teradata_table="example_blob_teradata_json",
        azure_conn_id="wasb_default",
        trigger_rule="all_done",
    )
    read_data_table_json = TeradataOperator(
        task_id="read_data_table_json",
        sql="SELECT count(1) FROM example_blob_teradata_json;",
    )
    drop_table_json = TeradataOperator(
        task_id="drop_table_json",
        sql="DROP TABLE example_blob_teradata_json;",
    )
    transfer_data_parquet = AzureBlobStorageToTeradataOperator(
        task_id="transfer_data_blob_to_teradata_parquet",
        blob_source_key="/az/akiaxox5jikeotfww4ul.blob.core.windows.net/td-usgs/PARQUETDATA/09394500/2018/06/",
        teradata_table="example_blob_teradata_parquet",
        azure_conn_id="wasb_default",
        teradata_conn_id="teradata_default",
        trigger_rule="all_done",
    )
    read_data_table_parquet = TeradataOperator(
        task_id="read_data_table_parquet",
        sql="SELECT count(1) FROM example_blob_teradata_parquet;",
    )
    drop_table_parquet = TeradataOperator(
        task_id="drop_table_parquet",
        sql="DROP TABLE example_blob_teradata_parquet;",
    )
    (
        transfer_data_csv
        >> transfer_data_json
        >> transfer_data_parquet
        >> read_data_table_csv
        >> read_data_table_json
        >> read_data_table_parquet
        >> drop_table_csv
        >> drop_table_json
        >> drop_table_parquet
    )