Google Dataplex Operators

Dataplex is an intelligent data fabric that provides unified analytics and data management across your data lakes, data warehouses, and data marts.

For more information about the task, visit the Dataplex product documentation.

Create a task

Before you create a Dataplex task, you need to define its body. For more information about the fields you can pass when creating a task, see the Dataplex create task API.

A simple task configuration can look as follows:

airflow/providers/google/cloud/example_dags/example_dataplex.py

EXAMPLE_TASK_BODY = {
    "trigger_spec": {"type_": TRIGGER_SPEC_TYPE},
    "execution_spec": {"service_account": SERVICE_ACC},
    "spark": {"python_script_file": SPARK_FILE_FULL_PATH},
}
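The constants in the body above are defined elsewhere in the example DAG. As a minimal sketch, assuming hypothetical values for them, the body could be assembled and sanity-checked as shown below. The service account, bucket path, and the helper `build_task_body` are illustrative names, not part of the provider, and the `gs://` check reflects the assumption that the Spark script lives in Cloud Storage.

```python
# All values below are hypothetical placeholders, not taken from the example DAG.
TRIGGER_SPEC_TYPE = "ON_DEMAND"  # assumed: run the task on demand rather than on a schedule
SERVICE_ACC = "dataplex-runner@my-project.iam.gserviceaccount.com"
SPARK_FILE_FULL_PATH = "gs://my-bucket/scripts/spark_example.py"


def build_task_body(trigger_type: str, service_account: str, script_uri: str) -> dict:
    """Assemble a task body with the same three sections as EXAMPLE_TASK_BODY."""
    # Assumption: the Spark script is stored in Cloud Storage, so its URI starts with gs://.
    if not script_uri.startswith("gs://"):
        raise ValueError(f"expected a gs:// URI, got {script_uri!r}")
    return {
        "trigger_spec": {"type_": trigger_type},
        "execution_spec": {"service_account": service_account},
        "spark": {"python_script_file": script_uri},
    }


EXAMPLE_TASK_BODY = build_task_body(TRIGGER_SPEC_TYPE, SERVICE_ACC, SPARK_FILE_FULL_PATH)
```

Building the body through a small function like this keeps the validation in one place; a plain dictionary literal works just as well when no checks are needed.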

With this configuration, you can create the task with DataplexCreateTaskOperator, either synchronously (the default) or asynchronously (by passing asynchronous=True):

create_dataplex_task = DataplexCreateTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    body=EXAMPLE_TASK_BODY,
    dataplex_task_id=DATAPLEX_TASK_ID,
    task_id="create_dataplex_task",
)

create_dataplex_task_async = DataplexCreateTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    body=EXAMPLE_TASK_BODY,
    dataplex_task_id=DATAPLEX_TASK_ID,
    asynchronous=True,
    task_id="create_dataplex_task_async",
)

Delete a task

To delete a task you can use:

DataplexDeleteTaskOperator

delete_dataplex_task = DataplexDeleteTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    dataplex_task_id=DATAPLEX_TASK_ID,
    task_id="delete_dataplex_task",
)

List tasks

To list tasks you can use:

DataplexListTasksOperator

list_dataplex_task = DataplexListTasksOperator(
    project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID, task_id="list_dataplex_task"
)

Get a task

To get a task you can use:

DataplexGetTaskOperator

get_dataplex_task = DataplexGetTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    dataplex_task_id=DATAPLEX_TASK_ID,
    task_id="get_dataplex_task",
)
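The four identifiers passed to the operators (project, region, lake, and task) correspond to the segments of a task's full resource name in the Dataplex API. The sketch below shows how such a name can be composed and split again. The helper names are hypothetical, and the `projects/.../locations/.../lakes/.../tasks/...` layout is stated here as an assumption about the API's naming scheme rather than something taken from this page.

```python
def task_resource_name(project_id: str, region: str, lake_id: str, dataplex_task_id: str) -> str:
    """Compose the full resource name from the four identifiers used by the operators."""
    return f"projects/{project_id}/locations/{region}/lakes/{lake_id}/tasks/{dataplex_task_id}"


def parse_task_resource_name(name: str) -> dict:
    """Split a full resource name back into its four identifiers."""
    parts = name.split("/")
    keys, values = parts[0::2], parts[1::2]
    # Reject anything that does not follow the assumed four-segment layout.
    if keys != ["projects", "locations", "lakes", "tasks"] or len(values) != 4 or not all(values):
        raise ValueError(f"unexpected task resource name: {name!r}")
    project_id, region, lake_id, dataplex_task_id = values
    return {
        "project_id": project_id,
        "region": region,
        "lake_id": lake_id,
        "dataplex_task_id": dataplex_task_id,
    }
```

A helper like `parse_task_resource_name` can be handy when a downstream step receives a task description and needs the individual identifiers again.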

Wait for a task

To wait for a task created asynchronously you can use:

DataplexTaskStateSensor

dataplex_task_state = DataplexTaskStateSensor(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    dataplex_task_id=DATAPLEX_TASK_ID,
    task_id="dataplex_task_state",
)
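Conceptually, a sensor of this kind checks the task state at intervals until it reaches the desired value. The following standalone sketch illustrates that polling pattern; it is not the provider's implementation. The function `wait_for_state`, the `get_state` callback, and the state strings `CREATING`, `ACTIVE`, and `DELETING` are assumptions made for illustration.

```python
import time
from typing import Callable, Iterable


def wait_for_state(
    get_state: Callable[[], str],
    target: str = "ACTIVE",
    failed: Iterable[str] = ("DELETING",),
    poke_interval: float = 1.0,
    timeout: float = 60.0,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_state() until it returns the target state, fails, or times out."""
    failed_states = set(failed)
    deadline = time.monotonic() + timeout
    while True:
        state = get_state()
        if state == target:
            return state
        if state in failed_states:
            # The task entered a state from which the target cannot be reached.
            raise RuntimeError(f"task entered state {state!r}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task still in state {state!r} after {timeout} seconds")
        sleep(poke_interval)


# Usage with a fake state source standing in for a real API call.
fake_states = iter(["CREATING", "CREATING", "ACTIVE"])
final_state = wait_for_state(lambda: next(fake_states), poke_interval=0)
```

In a real DAG the sensor takes care of this loop, so the sketch is only meant to show why a task created with asynchronous=True is typically followed by a waiting step.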
