Google Dataplex Operators

Dataplex is an intelligent data fabric that provides unified analytics and data management across your data lakes, data warehouses, and data marts.

For more information about the task, visit the Dataplex product documentation.
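
All operators and sensors shown on this page ship with the Google provider package. The sketch below shows how they are typically imported and placed inside a DAG; it is a minimal, illustrative skeleton, not one of the example DAGs (the dag_id, schedule, and start date are assumptions):

# Minimal, illustrative DAG skeleton (IDs and dates are placeholders).
from __future__ import annotations

from datetime import datetime

from airflow import DAG
from airflow.providers.google.cloud.operators.dataplex import (
    DataplexCreateLakeOperator,
    DataplexCreateTaskOperator,
    DataplexDeleteLakeOperator,
    DataplexDeleteTaskOperator,
)
from airflow.providers.google.cloud.sensors.dataplex import DataplexTaskStateSensor

with DAG(
    dag_id="example_dataplex",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
) as dag:
    # The operators shown throughout this page are defined inside this context.
    ...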

Create a Task

Before you create a Dataplex task, you need to define its body. For more information about the available fields to pass when creating a task, visit the Dataplex create task API.

A simple task configuration can look as follows:

google/tests/system/google/cloud/dataplex/example_dataplex.py

EXAMPLE_TASK_BODY = {
    "trigger_spec": {"type_": TRIGGER_SPEC_TYPE},
    "execution_spec": {"service_account": SERVICE_ACC},
    "spark": {"python_script_file": SPARK_FILE_FULL_PATH},
}
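
The constants referenced in this body (TRIGGER_SPEC_TYPE, SERVICE_ACC, SPARK_FILE_FULL_PATH) are defined elsewhere in the example DAG. For illustration only, they could be set along these lines (the values below are assumptions, not the ones used in the system test):

# Illustrative placeholders -- the example DAG defines its own values.
TRIGGER_SPEC_TYPE = "ON_DEMAND"  # Dataplex task trigger type
SERVICE_ACC = "my-service-account@my-project.iam.gserviceaccount.com"  # placeholder service account
SPARK_FILE_FULL_PATH = "gs://my-bucket/spark/example.py"  # placeholder GCS path to a PySpark script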

With this configuration we can create the task both synchronously and asynchronously using DataplexCreateTaskOperator:

google/tests/system/google/cloud/dataplex/example_dataplex.py

create_dataplex_task = DataplexCreateTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    body=EXAMPLE_TASK_BODY,
    dataplex_task_id=DATAPLEX_TASK_ID,
    task_id="create_dataplex_task",
)

google/tests/system/google/cloud/dataplex/example_dataplex.py

create_dataplex_task_async = DataplexCreateTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    body=EXAMPLE_TASK_BODY,
    dataplex_task_id=f"{DATAPLEX_TASK_ID}-1",
    asynchronous=True,
    task_id="create_dataplex_task_async",
)

Delete a task

To delete a task you can use:

DataplexDeleteTaskOperator

google/tests/system/google/cloud/dataplex/example_dataplex.py

delete_dataplex_task_async = DataplexDeleteTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    dataplex_task_id=f"{DATAPLEX_TASK_ID}-1",
    task_id="delete_dataplex_task_async",
)

List tasks

To list tasks you can use:

DataplexListTasksOperator

google/tests/system/google/cloud/dataplex/example_dataplex.py

list_dataplex_task = DataplexListTasksOperator(
    project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID, task_id="list_dataplex_task"
)

Get a task

To get a task you can use:

DataplexGetTaskOperator

google/tests/system/google/cloud/dataplex/example_dataplex.py

get_dataplex_task = DataplexGetTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    dataplex_task_id=DATAPLEX_TASK_ID,
    task_id="get_dataplex_task",
)

Wait for a task

To wait for a task created asynchronously you can use:

DataplexTaskStateSensor

google/tests/system/google/cloud/dataplex/example_dataplex.py

dataplex_task_state = DataplexTaskStateSensor(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    dataplex_task_id=DATAPLEX_TASK_ID,
    task_id="dataplex_task_state",
)
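
For the sensor to succeed, the asynchronously created task has to exist first, so the create operator should be ordered upstream of the sensor. A minimal wiring sketch, assuming both tasks are defined in the same DAG and the sensor's dataplex_task_id matches the task being created:

# Sketch only: wait for the asynchronously created task to reach a terminal state.
create_dataplex_task_async >> dataplex_task_state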

Create a Lake

Before you create a Dataplex lake, you need to define its body.

For more information about the available fields to pass when creating a lake, visit Dataplex create lake API.

A simple lake configuration can look as follows:

google/tests/system/google/cloud/dataplex/example_dataplex.py

EXAMPLE_LAKE_BODY = {
    "display_name": "test_display_name",
    "labels": [],
    "description": "test_description",
    "metastore": {"service": ""},
}

With this configuration we can create the lake:

DataplexCreateLakeOperator

google/tests/system/google/cloud/dataplex/example_dataplex.py

create_lake = DataplexCreateLakeOperator(
    project_id=PROJECT_ID, region=REGION, body=EXAMPLE_LAKE_BODY, lake_id=LAKE_ID, task_id="create_lake"
)

Delete a lake

To delete a lake you can use:

DataplexDeleteLakeOperator

google/tests/system/google/cloud/dataplex/example_dataplex.py

delete_lake = DataplexDeleteLakeOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    task_id="delete_lake",
    trigger_rule=TriggerRule.ALL_DONE,
)

Create or update a Data Quality scan

Before you create a Dataplex Data Quality scan you need to define its body. For more information about the available fields to pass when creating a Data Quality scan, visit Dataplex create data quality API.

A simple Data Quality scan configuration can look as follows:

google/tests/system/google/cloud/dataplex/example_dataplex_dq.py

EXAMPLE_DATA_SCAN = dataplex_v1.DataScan()
EXAMPLE_DATA_SCAN.data.entity = (
    f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/{ZONE_ID}/entities/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data.resource = (
    f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET}/tables/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data_quality_spec = DataQualitySpec(
    {
        "rules": [
            {
                "range_expectation": {
                    "min_value": "0",
                    "max_value": "10000",
                },
                "column": "value",
                "dimension": "VALIDITY",
            }
        ],
    }
)

With this configuration we can create or update the Data Quality scan:

DataplexCreateOrUpdateDataQualityScanOperator

google/tests/system/google/cloud/dataplex/example_dataplex_dq.py

create_data_scan = DataplexCreateOrUpdateDataQualityScanOperator(
    task_id="create_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    body=EXAMPLE_DATA_SCAN,
    data_scan_id=DATA_SCAN_ID,
)

Get a Data Quality scan

To get a Data Quality scan you can use:

DataplexGetDataQualityScanOperator

google/tests/system/google/cloud/dataplex/example_dataplex_dq.py

get_data_scan = DataplexGetDataQualityScanOperator(
    task_id="get_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
)

Delete a Data Quality scan

To delete a Data Quality scan you can use:

DataplexDeleteDataQualityScanOperator

google/tests/system/google/cloud/dataplex/example_dataplex_dq.py

delete_data_scan = DataplexDeleteDataQualityScanOperator(
    task_id="delete_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

Run a Data Quality scan

You can run a Dataplex Data Quality scan in asynchronous mode and check its status later using a sensor:

DataplexRunDataQualityScanOperator

google/tests/system/google/cloud/dataplex/example_dataplex_dq.py

run_data_scan_async = DataplexRunDataQualityScanOperator(
    task_id="run_data_scan_async",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    asynchronous=True,
)

To check that a Dataplex Data Quality scan run succeeded, you can use:

DataplexDataQualityJobStatusSensor

google/tests/system/google/cloud/dataplex/example_dataplex_dq.py

get_data_scan_job_status = DataplexDataQualityJobStatusSensor(
    task_id="get_data_scan_job_status",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    job_id="{{ task_instance.xcom_pull('run_data_scan_async') }}",
)
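
Since the sensor pulls the job id from the run_data_scan_async task via XCom, that task must complete before the sensor runs. A minimal dependency sketch, assuming both tasks live in the same DAG:

# Sketch only: the sensor reads the job id produced by run_data_scan_async from XCom.
run_data_scan_async >> get_data_scan_job_status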

For this action, you can also use the operator in deferrable mode:

google/tests/system/google/cloud/dataplex/example_dataplex_dq.py

run_data_scan_def = DataplexRunDataQualityScanOperator(
    task_id="run_data_scan_def",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    deferrable=True,
)

Get a Data Quality scan job

To get the result of a Data Quality scan job, you can use:

DataplexGetDataQualityScanResultOperator

google/tests/system/google/cloud/dataplex/example_dataplex_dq.py

get_data_scan_job_result_2 = DataplexGetDataQualityScanResultOperator(
    task_id="get_data_scan_job_result_2",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
)
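
The example above returns the result of the latest job for the scan. To fetch the result of a specific run, you can pass a job_id, for example one pulled from the task that started the scan; a minimal sketch, assuming the job_id parameter is templated as in the sensor example above:

get_data_scan_job_result = DataplexGetDataQualityScanResultOperator(
    task_id="get_data_scan_job_result",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    # Assumption: pull the job id from the task that started the scan asynchronously.
    job_id="{{ task_instance.xcom_pull('run_data_scan_async') }}",
)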

For this action, you can also use the operator in deferrable mode:

google/tests/system/google/cloud/dataplex/example_dataplex_dq.py

get_data_scan_job_result_def = DataplexGetDataQualityScanResultOperator(
    task_id="get_data_scan_job_result_def",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    deferrable=True,
)

Create a zone

Before you create a Dataplex zone you need to define its body.

For more information about the available fields to pass when creating a zone, visit Dataplex create zone API.

A simple zone configuration can look as follows:

google/tests/system/google/cloud/dataplex/example_dataplex_dq.py

EXAMPLE_ZONE = {
    "type_": "RAW",
    "resource_spec": {"location_type": "SINGLE_REGION"},
}

With this configuration we can create a zone:

DataplexCreateZoneOperator

google/tests/system/google/cloud/dataplex/example_dataplex_dq.py

create_zone = DataplexCreateZoneOperator(
    task_id="create_zone",
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    body=EXAMPLE_ZONE,
    zone_id=ZONE_ID,
)

Delete a zone

To delete a zone you can use:

DataplexDeleteZoneOperator

google/tests/system/google/cloud/dataplex/example_dataplex_dq.py

delete_zone = DataplexDeleteZoneOperator(
    task_id="delete_zone",
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    zone_id=ZONE_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

Create an asset

Before you create a Dataplex asset you need to define its body.

For more information about the available fields to pass when creating an asset, visit Dataplex create asset API.

A simple asset configuration can look as follows:

google/tests/system/google/cloud/dataplex/example_dataplex_dq.py

EXAMPLE_ASSET = {
    "resource_spec": {"name": f"projects/{PROJECT_ID}/datasets/{DATASET}", "type_": "BIGQUERY_DATASET"},
    "discovery_spec": {"enabled": True},
}

With this configuration we can create the asset:

DataplexCreateAssetOperator

google/tests/system/google/cloud/dataplex/example_dataplex_dq.py

create_asset = DataplexCreateAssetOperator(
    task_id="create_asset",
    project_id=PROJECT_ID,
    region=REGION,
    body=EXAMPLE_ASSET,
    lake_id=LAKE_ID,
    zone_id=ZONE_ID,
    asset_id=ASSET_ID,
)

Delete an asset

To delete an asset you can use:

DataplexDeleteAssetOperator

google/tests/system/google/cloud/dataplex/example_dataplex_dq.py

delete_asset = DataplexDeleteAssetOperator(
    task_id="delete_asset",
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    zone_id=ZONE_ID,
    asset_id=ASSET_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

Create or update a Data Profile scan

Before you create a Dataplex Data Profile scan you need to define its body. For more information about the available fields to pass when creating a Data Profile scan, visit Dataplex create data profile API.

A simple Data Profile scan configuration can look as follows:

google/tests/system/google/cloud/dataplex/example_dataplex_dp.py

EXAMPLE_DATA_SCAN = dataplex_v1.DataScan()
EXAMPLE_DATA_SCAN.data.entity = (
    f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/{ZONE_ID}/entities/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data.resource = (
    f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET}/tables/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data_profile_spec = DataProfileSpec({})

With this configuration we can create or update the Data Profile scan:

DataplexCreateOrUpdateDataProfileScanOperator

google/tests/system/google/cloud/dataplex/example_dataplex_dp.py

create_data_scan = DataplexCreateOrUpdateDataProfileScanOperator(
    task_id="create_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    body=EXAMPLE_DATA_SCAN,
    data_scan_id=DATA_SCAN_ID,
)

Get a Data Profile scan

To get a Data Profile scan you can use:

DataplexGetDataProfileScanOperator

google/tests/system/google/cloud/dataplex/example_dataplex_dp.py

get_data_scan = DataplexGetDataProfileScanOperator(
    task_id="get_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
)

Delete a Data Profile scan

To delete a Data Profile scan you can use:

DataplexDeleteDataProfileScanOperator

google/tests/system/google/cloud/dataplex/example_dataplex_dp.py

delete_data_scan = DataplexDeleteDataProfileScanOperator(
    task_id="delete_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

Run a Data Profile scan

You can run a Dataplex Data Profile scan in asynchronous mode and check its status later using a sensor:

DataplexRunDataProfileScanOperator

google/tests/system/google/cloud/dataplex/example_dataplex_dp.py

run_data_scan_async = DataplexRunDataProfileScanOperator(
    task_id="run_data_scan_async",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    asynchronous=True,
)

To check that a Dataplex Data Profile scan run succeeded, you can use:

DataplexDataProfileJobStatusSensor

google/tests/system/google/cloud/dataplex/example_dataplex_dp.py

get_data_scan_job_status = DataplexDataProfileJobStatusSensor(
    task_id="get_data_scan_job_status",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    job_id="{{ task_instance.xcom_pull('run_data_scan_async') }}",
)

For this action, you can also use the operator in deferrable mode:

google/tests/system/google/cloud/dataplex/example_dataplex_dp.py

run_data_scan_def = DataplexRunDataProfileScanOperator(
    task_id="run_data_scan_def",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    deferrable=True,
)

Get a Data Profile scan job

To get the result of a Data Profile scan job, you can use:

DataplexGetDataProfileScanResultOperator

google/tests/system/google/cloud/dataplex/example_dataplex_dp.py

get_data_scan_job_result_2 = DataplexGetDataProfileScanResultOperator(
    task_id="get_data_scan_job_result_2",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
)

Google Dataplex Catalog Operators

Dataplex Catalog provides a unified inventory of Google Cloud resources, such as BigQuery, and other resources, such as on-premises resources. Dataplex Catalog automatically retrieves metadata for Google Cloud resources, and you bring metadata for third-party resources into Dataplex Catalog.

For more information about Dataplex Catalog, visit the Dataplex Catalog product documentation.

Create an EntryGroup

To create an Entry Group in a specific location in Dataplex Catalog, you can use DataplexCatalogCreateEntryGroupOperator. For more information about the available fields to pass when creating an Entry Group, visit Entry Group resource configuration.

A simple Entry Group configuration can look as follows:

google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py

ENTRY_GROUP_BODY = {"display_name": "Display Name", "description": "Some description"}

With this configuration you can create an Entry Group resource:

DataplexCatalogCreateEntryGroupOperator

google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py

create_entry_group = DataplexCatalogCreateEntryGroupOperator(
    task_id="create_entry_group",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_group_id=ENTRY_GROUP_NAME,
    entry_group_configuration=ENTRY_GROUP_BODY,
    validate_request=False,
)

Delete an EntryGroup

To delete an Entry Group in a specific location in Dataplex Catalog, you can use DataplexCatalogDeleteEntryGroupOperator.

google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py

delete_entry_group = DataplexCatalogDeleteEntryGroupOperator(
    task_id="delete_entry_group",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_group_id=ENTRY_GROUP_NAME,
    trigger_rule=TriggerRule.ALL_DONE,
)

List EntryGroups

To list all Entry Groups in a specific location in Dataplex Catalog, you can use DataplexCatalogListEntryGroupsOperator. This operator also supports filtering and ordering the result of the operation.

google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py

list_entry_group = DataplexCatalogListEntryGroupsOperator(
    task_id="list_entry_group",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    order_by="name",
    filter_by='display_name = "Display Name"',
)

Get an EntryGroup

To retrieve an Entry Group in a specific location in Dataplex Catalog, you can use DataplexCatalogGetEntryGroupOperator.

google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py

get_entry_group = DataplexCatalogGetEntryGroupOperator(
    task_id="get_entry_group",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_group_id=ENTRY_GROUP_NAME,
)

Update an EntryGroup

To update an Entry Group in a specific location in Dataplex Catalog, you can use DataplexCatalogUpdateEntryGroupOperator.

google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py

update_entry_group = DataplexCatalogUpdateEntryGroupOperator(
    task_id="update_entry_group",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_group_id=ENTRY_GROUP_NAME,
    entry_group_configuration={"display_name": "Updated Display Name"},
    update_mask=["display_name"],
)

Create an EntryType

To create an Entry Type in a specific location in Dataplex Catalog, you can use DataplexCatalogCreateEntryTypeOperator. For more information about the available fields to pass when creating an Entry Type, visit Entry Type resource configuration.

A simple Entry Type configuration can look as follows:

google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py

ENTRY_TYPE_BODY = {"display_name": "Display Name", "description": "Some description"}

With this configuration you can create an Entry Type resource:

DataplexCatalogCreateEntryTypeOperator

google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py

create_entry_type = DataplexCatalogCreateEntryTypeOperator(
    task_id="create_entry_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_type_id=ENTRY_TYPE_NAME,
    entry_type_configuration=ENTRY_TYPE_BODY,
    validate_request=False,
)

Delete an EntryType

To delete an Entry Type in a specific location in Dataplex Catalog, you can use DataplexCatalogDeleteEntryTypeOperator.

google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py

delete_entry_type = DataplexCatalogDeleteEntryTypeOperator(
    task_id="delete_entry_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_type_id=ENTRY_TYPE_NAME,
    trigger_rule=TriggerRule.ALL_DONE,
)

List EntryTypes

To list all Entry Types in a specific location in Dataplex Catalog, you can use DataplexCatalogListEntryTypesOperator. This operator also supports filtering and ordering the result of the operation.

google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py

list_entry_type = DataplexCatalogListEntryTypesOperator(
    task_id="list_entry_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    order_by="name",
    filter_by='display_name = "Display Name"',
)

Get an EntryType

To retrieve an Entry Type in a specific location in Dataplex Catalog, you can use DataplexCatalogGetEntryTypeOperator.

google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py

get_entry_type = DataplexCatalogGetEntryTypeOperator(
    task_id="get_entry_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_type_id=ENTRY_TYPE_NAME,
)

Update an EntryType

To update an Entry Type in a specific location in Dataplex Catalog, you can use DataplexCatalogUpdateEntryTypeOperator.

google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py

update_entry_type = DataplexCatalogUpdateEntryTypeOperator(
    task_id="update_entry_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_type_id=ENTRY_TYPE_NAME,
    entry_type_configuration={"display_name": "Updated Display Name"},
    update_mask=["display_name"],
)

Create an AspectType

To create an Aspect Type in a specific location in Dataplex Catalog, you can use DataplexCatalogCreateAspectTypeOperator. For more information about the available fields to pass when creating an Aspect Type, visit Aspect Type resource configuration.

A simple Aspect Type configuration can look as follows:

google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py

ASPECT_TYPE_BODY = {
    "display_name": "Sample AspectType",
    "description": "A simple AspectType for demonstration purposes.",
    "metadata_template": {
        "name": "sample_field",
        "type": "record",
        "annotations": {
            "display_name": "Sample Field",
            "description": "A sample field within the AspectType.",
        },
    },
}

With this configuration you can create an Aspect Type resource:

DataplexCatalogCreateAspectTypeOperator

google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py

create_aspect_type = DataplexCatalogCreateAspectTypeOperator(
    task_id="create_aspect_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    aspect_type_id=ASPECT_TYPE_NAME,
    aspect_type_configuration=ASPECT_TYPE_BODY,
    validate_request=False,
)

Delete an AspectType

To delete an Aspect Type in a specific location in Dataplex Catalog, you can use DataplexCatalogDeleteAspectTypeOperator.

google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py

delete_aspect_type = DataplexCatalogDeleteAspectTypeOperator(
    task_id="delete_aspect_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    aspect_type_id=ASPECT_TYPE_NAME,
    trigger_rule=TriggerRule.ALL_DONE,
)

List AspectTypes

To list all Aspect Types in a specific location in Dataplex Catalog, you can use DataplexCatalogListAspectTypesOperator. This operator also supports filtering and ordering the result of the operation.

google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py

list_aspect_type = DataplexCatalogListAspectTypesOperator(
    task_id="list_aspect_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    order_by="name",
    filter_by='display_name = "Display Name"',
)

Get an AspectType

To retrieve an Aspect Type in a specific location in Dataplex Catalog, you can use DataplexCatalogGetAspectTypeOperator.

google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py

get_aspect_type = DataplexCatalogGetAspectTypeOperator(
    task_id="get_aspect_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    aspect_type_id=ASPECT_TYPE_NAME,
)

Update an AspectType

To update an Aspect Type in a specific location in Dataplex Catalog, you can use DataplexCatalogUpdateAspectTypeOperator.

google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py

update_aspect_type = DataplexCatalogUpdateAspectTypeOperator(
    task_id="update_aspect_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    aspect_type_id=ASPECT_TYPE_NAME,
    aspect_type_configuration={"display_name": "Updated Display Name"},
    update_mask=["display_name"],
)

Create an Entry

To create an Entry in a specific location in Dataplex Catalog, you can use DataplexCatalogCreateEntryOperator. For more information about the available fields to pass when creating an Entry, visit Entry resource configuration.

A simple Entry configuration can look as follows:

google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py

ENTRY_BODY = {
    "name": f"projects/{PROJECT_ID}/locations/{GCP_LOCATION}/entryGroups/{ENTRY_GROUP_NAME}/entries/{ENTRY_NAME}",
    "entry_type": f"projects/{PROJECT_ID}/locations/{GCP_LOCATION}/entryTypes/{ENTRY_TYPE_NAME}",
}

With this configuration you can create an Entry resource:

DataplexCatalogCreateEntryOperator

google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py

create_entry = DataplexCatalogCreateEntryOperator(
    task_id="create_entry",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_id=ENTRY_NAME,
    entry_group_id=ENTRY_GROUP_NAME,
    entry_configuration=ENTRY_BODY,
)

Delete an Entry

To delete an Entry in a specific location in Dataplex Catalog, you can use DataplexCatalogDeleteEntryOperator.

google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py

delete_entry = DataplexCatalogDeleteEntryOperator(
    task_id="delete_entry",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_id=ENTRY_NAME,
    entry_group_id=ENTRY_GROUP_NAME,
    trigger_rule=TriggerRule.ALL_DONE,
)

List Entries

To list all Entries in a specific location in Dataplex Catalog, you can use DataplexCatalogListEntriesOperator. This operator also supports filtering and ordering the result of the operation.

google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py

list_entry = DataplexCatalogListEntriesOperator(
    task_id="list_entry",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_group_id=ENTRY_GROUP_NAME,
)

Get an Entry

To retrieve an Entry in a specific location in Dataplex Catalog, you can use DataplexCatalogGetEntryOperator.

google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py

get_entry = DataplexCatalogGetEntryOperator(
    task_id="get_entry",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_id=ENTRY_NAME,
    entry_group_id=ENTRY_GROUP_NAME,
)

Update an Entry

To update an Entry in a specific location in Dataplex Catalog, you can use DataplexCatalogUpdateEntryOperator.

google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py

update_entry = DataplexCatalogUpdateEntryOperator(
    task_id="update_entry",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_id=ENTRY_NAME,
    entry_group_id=ENTRY_GROUP_NAME,
    entry_configuration={
        "fully_qualified_name": f"dataplex:{PROJECT_ID}.{GCP_LOCATION}.{ENTRY_GROUP_NAME}.some-entry"
    },
    update_mask=["fully_qualified_name"],
)

Look up a single Entry

To look up a single Entry by name, using the permission on the source system, in Dataplex Catalog you can use DataplexCatalogLookupEntryOperator.

google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py

lookup_entry = DataplexCatalogLookupEntryOperator(
    task_id="lookup_entry",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_id=ENTRY_NAME,
    entry_group_id=ENTRY_GROUP_NAME,
)

Search Entries

To search for Entries matching the given query and scope in Dataplex Catalog, you can use DataplexCatalogSearchEntriesOperator.

google/tests/system/google/cloud/dataplex/example_dataplex_catalog.py

search_entry = DataplexCatalogSearchEntriesOperator(
    task_id="search_entry",
    project_id=PROJECT_ID,
    location="global",
    query="displayname:Display Name",
)
