Google Dataplex Operators

Dataplex is an intelligent data fabric that provides unified analytics and data management across your data lakes, data warehouses, and data marts.

For more information about the service, visit the Dataplex product documentation.

Create a task

Before you create a Dataplex task, you need to define its body. For more information about the available fields to pass when creating a task, see the Dataplex create task API.

A simple task configuration can look as follows:

tests/system/providers/google/cloud/dataplex/example_dataplex.py[source]

EXAMPLE_TASK_BODY = {
    "trigger_spec": {"type_": TRIGGER_SPEC_TYPE},
    "execution_spec": {"service_account": SERVICE_ACC},
    "spark": {"python_script_file": SPARK_FILE_FULL_PATH},
}
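The body above relies on constants defined elsewhere in the example file. As a minimal sketch, the helper below assembles the same structure from explicit arguments. The function name `build_task_body` and the sample values (service account, script path) are hypothetical, and `ON_DEMAND` is assumed here as the trigger type.

```python
def build_task_body(
    service_account: str,
    python_script_file: str,
    trigger_type: str = "ON_DEMAND",  # assumed default, not taken from the example file
) -> dict:
    """Assemble a task body with the same keys as EXAMPLE_TASK_BODY."""
    return {
        "trigger_spec": {"type_": trigger_type},
        "execution_spec": {"service_account": service_account},
        "spark": {"python_script_file": python_script_file},
    }


# Hypothetical values, for illustration only.
task_body = build_task_body(
    service_account="my-sa@my-project.iam.gserviceaccount.com",
    python_script_file="gs://my-bucket/spark_job.py",
)
```

The resulting dictionary can be passed as the `body` argument in place of `EXAMPLE_TASK_BODY`.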

With this configuration we can create the task both synchronously and asynchronously:

DataplexCreateTaskOperator

tests/system/providers/google/cloud/dataplex/example_dataplex.py[source]

create_dataplex_task = DataplexCreateTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    body=EXAMPLE_TASK_BODY,
    dataplex_task_id=DATAPLEX_TASK_ID,
    task_id="create_dataplex_task",
)

tests/system/providers/google/cloud/dataplex/example_dataplex.py[source]

create_dataplex_task_async = DataplexCreateTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    body=EXAMPLE_TASK_BODY,
    dataplex_task_id=f"{DATAPLEX_TASK_ID}-1",
    asynchronous=True,
    task_id="create_dataplex_task_async",
)

Delete a task

To delete a task you can use:

DataplexDeleteTaskOperator

tests/system/providers/google/cloud/dataplex/example_dataplex.py[source]

delete_dataplex_task_async = DataplexDeleteTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    dataplex_task_id=f"{DATAPLEX_TASK_ID}-1",
    task_id="delete_dataplex_task_async",
)

List tasks

To list tasks you can use:

DataplexListTasksOperator

tests/system/providers/google/cloud/dataplex/example_dataplex.py[source]

list_dataplex_task = DataplexListTasksOperator(
    project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID, task_id="list_dataplex_task"
)

Get a task

To get a task you can use:

DataplexGetTaskOperator

tests/system/providers/google/cloud/dataplex/example_dataplex.py[source]

get_dataplex_task = DataplexGetTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    dataplex_task_id=DATAPLEX_TASK_ID,
    task_id="get_dataplex_task",
)

Wait for a task

To wait for a task created asynchronously you can use:

DataplexTaskStateSensor

tests/system/providers/google/cloud/dataplex/example_dataplex.py[source]

dataplex_task_state = DataplexTaskStateSensor(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    dataplex_task_id=DATAPLEX_TASK_ID,
    task_id="dataplex_task_state",
)
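Conceptually, a sensor checks a condition repeatedly until it holds. The sketch below illustrates that polling pattern in plain Python; it is not the sensor's actual implementation. The `wait_for_state` helper, the `get_state` callable, and the `ACTIVE` target state are assumptions made for illustration.

```python
import time
from typing import Callable


def wait_for_state(
    get_state: Callable[[], str],
    target: str = "ACTIVE",  # assumed target state
    poke_interval: float = 0.0,
    max_pokes: int = 10,
) -> bool:
    """Poll get_state() until it returns target or max_pokes checks have been made."""
    for _ in range(max_pokes):
        if get_state() == target:
            return True
        time.sleep(poke_interval)
    return False


# Simulated state sequence standing in for calls to the Dataplex API.
states = iter(["CREATING", "CREATING", "ACTIVE"])
reached = wait_for_state(lambda: next(states))
```

In a DAG, the sensor takes care of this loop, so downstream tasks only start once the asynchronously created task is ready.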

Create a lake

Before you create a Dataplex lake you need to define its body.

For more information about the available fields to pass when creating a lake, visit Dataplex create lake API.

A simple lake configuration can look as follows:

tests/system/providers/google/cloud/dataplex/example_dataplex.py[source]

EXAMPLE_LAKE_BODY = {
    "display_name": "test_display_name",
    "labels": [],
    "description": "test_description",
    "metastore": {"service": ""},
}

With this configuration we can create the lake:

DataplexCreateLakeOperator

tests/system/providers/google/cloud/dataplex/example_dataplex.py[source]

create_lake = DataplexCreateLakeOperator(
    project_id=PROJECT_ID, region=REGION, body=EXAMPLE_LAKE_BODY, lake_id=LAKE_ID, task_id="create_lake"
)

Delete a lake

To delete a lake you can use:

DataplexDeleteLakeOperator

tests/system/providers/google/cloud/dataplex/example_dataplex.py[source]

delete_lake = DataplexDeleteLakeOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    task_id="delete_lake",
    trigger_rule=TriggerRule.ALL_DONE,
)

Create or update a Data Quality scan

Before you create a Dataplex Data Quality scan you need to define its body. For more information about the available fields to pass when creating a Data Quality scan, visit Dataplex create data quality API.

A simple Data Quality scan configuration can look as follows:

tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py[source]

EXAMPLE_DATA_SCAN = dataplex_v1.DataScan()
EXAMPLE_DATA_SCAN.data.entity = (
    f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/{ZONE_ID}/entities/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data.resource = (
    f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET}/tables/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data_quality_spec = DataQualitySpec(
    {
        "rules": [
            {
                "range_expectation": {
                    "min_value": "0",
                    "max_value": "10000",
                },
                "column": "value",
                "dimension": "VALIDITY",
            }
        ],
    }
)
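The two resource names in the configuration above follow fixed patterns, and the rule restricts a column to a numeric range. As a rough sketch, the helpers below build those names and mimic the range check locally. All function names are hypothetical, and the range check assumes inclusive bounds; the real evaluation happens inside Dataplex.

```python
def entity_name(project_id: str, region: str, lake_id: str, zone_id: str, entity_id: str) -> str:
    """Build the Dataplex entity name used for DataScan.data.entity."""
    return (
        f"projects/{project_id}/locations/{region}/lakes/{lake_id}"
        f"/zones/{zone_id}/entities/{entity_id}"
    )


def bigquery_table_resource(project_id: str, dataset: str, table: str) -> str:
    """Build the BigQuery resource name used for DataScan.data.resource."""
    return f"//bigquery.googleapis.com/projects/{project_id}/datasets/{dataset}/tables/{table}"


def in_range(value: float, min_value: str, max_value: str) -> bool:
    """Local illustration of a range expectation, assuming inclusive bounds."""
    return float(min_value) <= value <= float(max_value)


# Hypothetical identifiers, for illustration only.
entity = entity_name("my-project", "us-central1", "my-lake", "my-zone", "table_1")
resource = bigquery_table_resource("my-project", "my_dataset", "table_1")
```

Note that the rule's `min_value` and `max_value` are given as strings in the scan body, which is why the sketch converts them before comparing.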

With this configuration we can create or update the Data Quality scan:

DataplexCreateOrUpdateDataQualityScanOperator

tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py[source]

create_data_scan = DataplexCreateOrUpdateDataQualityScanOperator(
    task_id="create_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    body=EXAMPLE_DATA_SCAN,
    data_scan_id=DATA_SCAN_ID,
)

Get a Data Quality scan

To get a Data Quality scan you can use:

DataplexGetDataQualityScanOperator

tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py[source]

get_data_scan = DataplexGetDataQualityScanOperator(
    task_id="get_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
)

Delete a Data Quality scan

To delete a Data Quality scan you can use:

DataplexDeleteDataQualityScanOperator

tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py[source]

delete_data_scan = DataplexDeleteDataQualityScanOperator(
    task_id="delete_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
)

Run a Data Quality scan

You can run a Dataplex Data Quality scan in asynchronous mode and check its status later using a sensor:

DataplexRunDataQualityScanOperator

tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py[source]

run_data_scan_async = DataplexRunDataQualityScanOperator(
    task_id="run_data_scan_async",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    asynchronous=True,
)

To check that the Dataplex Data Quality scan run succeeded, you can use:

DataplexDataQualityJobStatusSensor

tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py[source]

get_data_scan_job_status = DataplexDataQualityJobStatusSensor(
    task_id="get_data_scan_job_status",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    job_id="{{ task_instance.xcom_pull('run_data_scan_async') }}",
)

You can also use the operator in deferrable mode for this action:

tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py[source]

run_data_scan_def = DataplexRunDataQualityScanOperator(
    task_id="run_data_scan_def",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    deferrable=True,
)

Get a Data Quality scan job

To get a Data Quality scan job you can use:

DataplexGetDataQualityScanResultOperator

tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py[source]

get_data_scan_job_result_2 = DataplexGetDataQualityScanResultOperator(
    task_id="get_data_scan_job_result_2",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
)

You can also use the operator in deferrable mode for this action:

tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py[source]

get_data_scan_job_result_def = DataplexGetDataQualityScanResultOperator(
    task_id="get_data_scan_job_result_def",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    deferrable=True,
)

Create a zone

Before you create a Dataplex zone you need to define its body.

For more information about the available fields to pass when creating a zone, visit Dataplex create zone API.

A simple zone configuration can look as follows:

tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py[source]

EXAMPLE_ZONE = {
    "type_": "RAW",
    "resource_spec": {"location_type": "SINGLE_REGION"},
}
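Because the zone body is a plain dictionary, a typo in an enum value only surfaces when the API rejects the request. As a sketch, a small hypothetical helper can validate the values up front. The name `build_zone_body` is an assumption, and the allowed values listed are the ones this sketch assumes from the Dataplex zone API (`RAW`/`CURATED` and `SINGLE_REGION`/`MULTI_REGION`).

```python
# Enum values assumed from the Dataplex zone API.
ZONE_TYPES = {"RAW", "CURATED"}
LOCATION_TYPES = {"SINGLE_REGION", "MULTI_REGION"}


def build_zone_body(zone_type: str = "RAW", location_type: str = "SINGLE_REGION") -> dict:
    """Assemble a zone body with the same keys as EXAMPLE_ZONE, validating enum values."""
    if zone_type not in ZONE_TYPES:
        raise ValueError(f"Unknown zone type: {zone_type!r}")
    if location_type not in LOCATION_TYPES:
        raise ValueError(f"Unknown location type: {location_type!r}")
    return {
        "type_": zone_type,
        "resource_spec": {"location_type": location_type},
    }


zone_body = build_zone_body()
```

Called with its defaults, the helper returns the same dictionary as `EXAMPLE_ZONE`.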

With this configuration we can create a zone:

DataplexCreateZoneOperator

tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py[source]

create_zone = DataplexCreateZoneOperator(
    task_id="create_zone",
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    body=EXAMPLE_ZONE,
    zone_id=ZONE_ID,
)

Delete a zone

To delete a zone you can use:

DataplexDeleteZoneOperator

tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py[source]

delete_zone = DataplexDeleteZoneOperator(
    task_id="delete_zone",
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    zone_id=ZONE_ID,
)

Create an asset

Before you create a Dataplex asset you need to define its body.

For more information about the available fields to pass when creating an asset, visit Dataplex create asset API.

A simple asset configuration can look as follows:

tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py[source]

EXAMPLE_ASSET = {
    "resource_spec": {"name": f"projects/{PROJECT_ID}/datasets/{DATASET_NAME}", "type_": "BIGQUERY_DATASET"},
    "discovery_spec": {"enabled": True},
}
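The asset body points at an existing resource by name. The sketch below, under the assumption that a BigQuery dataset is being attached, separates building that name from building the body. The helper names and sample identifiers are hypothetical.

```python
def bigquery_dataset_resource(project_id: str, dataset: str) -> str:
    """Build the relative resource name of a BigQuery dataset."""
    return f"projects/{project_id}/datasets/{dataset}"


def build_asset_body(
    resource_name: str,
    resource_type: str = "BIGQUERY_DATASET",
    discovery_enabled: bool = True,
) -> dict:
    """Assemble an asset body with the same keys as EXAMPLE_ASSET."""
    return {
        "resource_spec": {"name": resource_name, "type_": resource_type},
        "discovery_spec": {"enabled": discovery_enabled},
    }


# Hypothetical identifiers, for illustration only.
asset_body = build_asset_body(bigquery_dataset_resource("my-project", "my_dataset"))
```

The resulting dictionary has the same shape as `EXAMPLE_ASSET` and can be passed as the `body` argument.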

With this configuration we can create the asset:

DataplexCreateAssetOperator

tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py[source]

create_asset = DataplexCreateAssetOperator(
    task_id="create_asset",
    project_id=PROJECT_ID,
    region=REGION,
    body=EXAMPLE_ASSET,
    lake_id=LAKE_ID,
    zone_id=ZONE_ID,
    asset_id=ASSET_ID,
)

Delete an asset

To delete an asset you can use:

DataplexDeleteAssetOperator

tests/system/providers/google/cloud/dataplex/example_dataplex_dq.py[source]

delete_asset = DataplexDeleteAssetOperator(
    task_id="delete_asset",
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    zone_id=ZONE_ID,
    asset_id=ASSET_ID,
)
