Google Dataplex Operators

Dataplex is an intelligent data fabric that provides unified analytics and data management across your data lakes, data warehouses, and data marts.

For more information, visit the Dataplex product documentation.

Create a task

Before you create a Dataplex task, you need to define its body. For more information about the fields you can pass when creating a task, see the Dataplex create task API.

A simple task configuration can look as follows:

tests/system/providers/google/cloud/dataplex/example_dataplex.py

EXAMPLE_TASK_BODY = {
    "trigger_spec": {"type_": TRIGGER_SPEC_TYPE},
    "execution_spec": {"service_account": SERVICE_ACC},
    "spark": {"python_script_file": SPARK_FILE_FULL_PATH},
}
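The body is a plain dictionary that mirrors the Dataplex Task resource; the trailing underscore in type_ is how the Python client names the "type" field, because type is a Python built-in. As a minimal sketch, the hypothetical helper build_spark_task_body below builds the same shape and checks a few rules we assume from the Task API reference: the trigger type is ON_DEMAND or RECURRING, a RECURRING trigger also needs a cron schedule, and the PySpark driver script is a Cloud Storage (gs://) URI. The helper and its checks are assumptions, not part of the provider.

```python
from typing import Any, Dict, Optional

# Trigger types assumed to be accepted by the Dataplex Task API
# (TYPE_UNSPECIFIED is deliberately left out).
VALID_TRIGGER_TYPES = {"ON_DEMAND", "RECURRING"}


def build_spark_task_body(
    trigger_type: str,
    service_account: str,
    python_script_file: str,
    schedule: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: build a body shaped like EXAMPLE_TASK_BODY."""
    if trigger_type not in VALID_TRIGGER_TYPES:
        raise ValueError(f"unsupported trigger type: {trigger_type!r}")
    if trigger_type == "RECURRING" and not schedule:
        # Assumption: recurring tasks need a cron schedule.
        raise ValueError("RECURRING tasks need a cron schedule")
    if not python_script_file.startswith("gs://"):
        # Assumption: the PySpark driver script lives in Cloud Storage.
        raise ValueError("python_script_file should be a gs:// URI")

    # "type_" (with underscore) is the field name the Python client uses.
    trigger_spec: Dict[str, Any] = {"type_": trigger_type}
    if schedule:
        trigger_spec["schedule"] = schedule

    return {
        "trigger_spec": trigger_spec,
        "execution_spec": {"service_account": service_account},
        "spark": {"python_script_file": python_script_file},
    }


body = build_spark_task_body(
    "ON_DEMAND", "my-sa@my-project.iam.gserviceaccount.com", "gs://my-bucket/spark.py"
)
print(body)
```

Validating early like this only catches obvious mistakes before the API call; the Dataplex service remains the authority on what a valid task body is.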

With this configuration, you can create the task either synchronously or asynchronously (by passing asynchronous=True) using DataplexCreateTaskOperator:

tests/system/providers/google/cloud/dataplex/example_dataplex.py

create_dataplex_task = DataplexCreateTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    body=EXAMPLE_TASK_BODY,
    dataplex_task_id=DATAPLEX_TASK_ID,
    task_id="create_dataplex_task",
)

tests/system/providers/google/cloud/dataplex/example_dataplex.py

create_dataplex_task_async = DataplexCreateTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    body=EXAMPLE_TASK_BODY,
    dataplex_task_id=f"{DATAPLEX_TASK_ID}-1",
    asynchronous=True,
    task_id="create_dataplex_task_async",
)
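Both calls above identify the task by project, region, lake and task ID. As a sketch, assuming the resource-name pattern projects/{project}/locations/{location}/lakes/{lake}/tasks/{task} from the Dataplex REST reference, the hypothetical helper task_resource_name below shows which name each call targets. The asynchronous call uses a distinct "-1" ID because two tasks in the same lake cannot share an ID. The constant values are placeholders, not the ones used in the system test.

```python
def task_resource_name(project_id: str, region: str, lake_id: str, dataplex_task_id: str) -> str:
    """Hypothetical helper: full Dataplex task name built from the operator arguments."""
    return f"projects/{project_id}/locations/{region}/lakes/{lake_id}/tasks/{dataplex_task_id}"


# Placeholder values standing in for the example's constants (assumptions).
PROJECT_ID, REGION, LAKE_ID, DATAPLEX_TASK_ID = "my-project", "us-central1", "my-lake", "my-task"

sync_name = task_resource_name(PROJECT_ID, REGION, LAKE_ID, DATAPLEX_TASK_ID)
async_name = task_resource_name(PROJECT_ID, REGION, LAKE_ID, f"{DATAPLEX_TASK_ID}-1")
print(sync_name)   # projects/my-project/locations/us-central1/lakes/my-lake/tasks/my-task
print(async_name)  # projects/my-project/locations/us-central1/lakes/my-lake/tasks/my-task-1
```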

Delete a task

To delete a task you can use:

DataplexDeleteTaskOperator

tests/system/providers/google/cloud/dataplex/example_dataplex.py

delete_dataplex_task_async = DataplexDeleteTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    dataplex_task_id=f"{DATAPLEX_TASK_ID}-1",
    task_id="delete_dataplex_task_async",
)
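The call above deletes the task created asynchronously (note the "-1" suffix). If you only have a task's full resource name, for example from a list result, a hypothetical helper like task_operator_kwargs below can split it into the keyword arguments the delete or get operator expects. It assumes the projects/{project}/locations/{location}/lakes/{lake}/tasks/{task} name pattern; names returned by the API may carry the project number rather than the project ID in the first segment.

```python
import re
from typing import Dict

# Assumed Dataplex task name pattern:
# projects/{project}/locations/{location}/lakes/{lake}/tasks/{task}
_TASK_NAME_RE = re.compile(
    r"^projects/(?P<project_id>[^/]+)/locations/(?P<region>[^/]+)"
    r"/lakes/(?P<lake_id>[^/]+)/tasks/(?P<dataplex_task_id>[^/]+)$"
)


def task_operator_kwargs(name: str) -> Dict[str, str]:
    """Hypothetical helper: map a full task name to operator keyword arguments."""
    match = _TASK_NAME_RE.match(name)
    if match is None:
        raise ValueError(f"not a Dataplex task name: {name!r}")
    return match.groupdict()


kwargs = task_operator_kwargs(
    "projects/my-project/locations/us-central1/lakes/my-lake/tasks/my-task-1"
)
print(kwargs)
# {'project_id': 'my-project', 'region': 'us-central1', 'lake_id': 'my-lake', 'dataplex_task_id': 'my-task-1'}
```

The resulting dictionary could then be unpacked with **kwargs, together with a task_id, when constructing DataplexDeleteTaskOperator.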

List tasks

To list tasks you can use:

DataplexListTasksOperator

tests/system/providers/google/cloud/dataplex/example_dataplex.py

list_dataplex_task = DataplexListTasksOperator(
    project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID, task_id="list_dataplex_task"
)
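As far as we know, DataplexListTasksOperator returns the tasks it finds as a list of dictionaries, which Airflow pushes to XCom for downstream tasks. Assuming each item has a "name" key holding the full resource name (an assumption about the serialized Task shape), a downstream callable could reduce that result to short task IDs as in this sketch; short_task_ids is a hypothetical name.

```python
from typing import Any, Dict, List


def short_task_ids(tasks: List[Dict[str, Any]]) -> List[str]:
    """Return the last path segment of each task's 'name' (assumed key)."""
    return [task["name"].rsplit("/", 1)[-1] for task in tasks if "name" in task]


# Hand-written sample shaped like the assumed list output.
listed = [
    {"name": "projects/my-project/locations/us-central1/lakes/my-lake/tasks/my-task"},
    {"name": "projects/my-project/locations/us-central1/lakes/my-lake/tasks/my-task-1"},
]
print(short_task_ids(listed))  # ['my-task', 'my-task-1']
```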

Get a task

To get a task you can use:

DataplexGetTaskOperator

tests/system/providers/google/cloud/dataplex/example_dataplex.py

get_dataplex_task = DataplexGetTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    dataplex_task_id=DATAPLEX_TASK_ID,
    task_id="get_dataplex_task",
)

Wait for a task

To wait for a task created asynchronously, you can use:

DataplexTaskStateSensor

tests/system/providers/google/cloud/dataplex/example_dataplex.py

dataplex_task_state = DataplexTaskStateSensor(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    dataplex_task_id=DATAPLEX_TASK_ID,
    task_id="dataplex_task_state",
)
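A sensor repeatedly "pokes" until a condition holds. The plain-Python loop below is a simplified model of that behaviour, not the sensor's code: it assumes, as the provider's sensor appears to do, that the task state ACTIVE means success and DELETING means failure, and it takes a hypothetical get_state callable in place of a Dataplex API call. Note that the example above waits on DATAPLEX_TASK_ID; to wait for the task created with asynchronous=True you would pass its ID, f"{DATAPLEX_TASK_ID}-1".

```python
import time
from typing import Callable


def wait_for_task_state(
    get_state: Callable[[], str],
    poke_interval: float = 60.0,
    timeout: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> None:
    """Simplified model of a state sensor: poll until the task is ACTIVE."""
    deadline = time.monotonic() + timeout
    while True:
        state = get_state()
        if state == "ACTIVE":
            return  # success: the task finished being created
        if state == "DELETING":
            raise RuntimeError("task is being deleted")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task still {state!r} after {timeout} seconds")
        sleep(poke_interval)  # wait before the next poke


# Simulated state sequence; no real API is called.
states = iter(["CREATING", "CREATING", "ACTIVE"])
wait_for_task_state(lambda: next(states), poke_interval=0)
```

Injecting sleep keeps the model testable without real waiting; in Airflow the scheduler handles poke intervals and timeouts for you.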

Create a lake

Before you create a Dataplex lake, you need to define its body.

For more information about the fields you can pass when creating a lake, see the Dataplex create lake API.

A simple lake configuration can look as follows:

tests/system/providers/google/cloud/dataplex/example_dataplex.py

EXAMPLE_LAKE_BODY = {
    "display_name": "test_display_name",
    # Lake labels are a map of string keys to string values, so use a dict.
    "labels": {},
    "description": "test_description",
    "metastore": {"service": ""},
}
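In the Dataplex Lake resource, labels is a map of string keys to string values, and metastore.service is an optional reference to a Dataproc Metastore service, assumed here to take the form projects/{project_id}/locations/{location_id}/services/{service_id}. The hypothetical helper build_lake_body below builds a lake body under those assumptions and, unlike the example, omits the metastore block when no service is given. It is a sketch, not a provider API.

```python
from typing import Any, Dict, Optional


def build_lake_body(
    display_name: str,
    description: str = "",
    labels: Optional[Dict[str, str]] = None,
    metastore_service: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: build a body shaped like EXAMPLE_LAKE_BODY."""
    labels = dict(labels or {})
    # Labels form a string-to-string map, so reject anything else early.
    for key, value in labels.items():
        if not isinstance(key, str) or not isinstance(value, str):
            raise TypeError("lake labels must map strings to strings")
    body: Dict[str, Any] = {
        "display_name": display_name,
        "labels": labels,
        "description": description,
    }
    if metastore_service:
        # Assumed format: projects/{project_id}/locations/{location_id}/services/{service_id}
        body["metastore"] = {"service": metastore_service}
    return body


print(build_lake_body("test_display_name", "test_description", labels={"env": "test"}))
```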

With this configuration, you can create the lake:

DataplexCreateLakeOperator

tests/system/providers/google/cloud/dataplex/example_dataplex.py

create_lake = DataplexCreateLakeOperator(
    project_id=PROJECT_ID, region=REGION, body=EXAMPLE_LAKE_BODY, lake_id=LAKE_ID, task_id="create_lake"
)
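The lake_id becomes part of the lake's resource name. The Dataplex API reference describes the rules roughly as: lowercase letters, numbers and hyphens only; start with a letter; end with a letter or number; 1 to 63 characters. The hypothetical check is_valid_lake_id below encodes those rules so a DAG can fail fast before calling the API; treat the pattern as an assumption and confirm it against the current reference.

```python
import re

# Assumed Dataplex lake ID rules: a letter first, [a-z0-9-] in the middle,
# a letter or digit last, 1 to 63 characters in total.
_LAKE_ID_RE = re.compile(r"[a-z](?:[a-z0-9-]{0,61}[a-z0-9])?")


def is_valid_lake_id(lake_id: str) -> bool:
    """Return True if lake_id satisfies the assumed naming rules."""
    return _LAKE_ID_RE.fullmatch(lake_id) is not None


for candidate in ["my-lake", "My-Lake", "1lake", "lake-"]:
    print(candidate, is_valid_lake_id(candidate))
```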

Delete a lake

To delete a lake you can use:

DataplexDeleteLakeOperator

tests/system/providers/google/cloud/dataplex/example_dataplex.py

delete_lake = DataplexDeleteLakeOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    task_id="delete_lake",
    trigger_rule=TriggerRule.ALL_DONE,
)
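Setting trigger_rule=TriggerRule.ALL_DONE makes the cleanup task run once every upstream task has finished, whether it succeeded, failed or was skipped, so the lake is removed even when an earlier step breaks. The function below is a simplified, plain-Python model of that rule compared with the default all_success rule; it is not Airflow's implementation, and the state strings are assumptions modelled on Airflow's task-instance states.

```python
from typing import Iterable

# Simplified set of finished task-instance states (assumption, modelled on Airflow).
FINISHED_STATES = {"success", "failed", "upstream_failed", "skipped"}


def should_run(rule: str, upstream_states: Iterable[str]) -> bool:
    """Simplified model: decide whether a task may run given its upstream states."""
    states = list(upstream_states)
    if rule == "all_success":
        return all(s == "success" for s in states)
    if rule == "all_done":
        return all(s in FINISHED_STATES for s in states)
    raise ValueError(f"rule not modelled here: {rule!r}")


# A failed upstream blocks all_success but not all_done.
print(should_run("all_success", ["success", "failed"]))  # False
print(should_run("all_done", ["success", "failed"]))     # True
```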
