Google Dataplex Operators

Dataplex is an intelligent data fabric that provides unified analytics and data management across your data lakes, data warehouses, and data marts.

For more information about the service, visit the Dataplex product documentation.

Create a Task

Before you create a Dataplex task, you need to define its body. For more information about the available fields to pass when creating a task, see the Dataplex create task API.

A simple task configuration can look as follows:

tests/system/google/cloud/dataplex/example_dataplex.py[source]

EXAMPLE_TASK_BODY = {
    "trigger_spec": {"type_": TRIGGER_SPEC_TYPE},
    "execution_spec": {"service_account": SERVICE_ACC},
    "spark": {"python_script_file": SPARK_FILE_FULL_PATH},
}
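
Because the body is a plain dictionary, it can be assembled by a small helper when several tasks share the same shape. The following is a minimal sketch, not part of the provider: `build_task_body` is a hypothetical helper, and the service account and script path are placeholder values. Only the field names are taken from the configuration above.

```python
def build_task_body(trigger_type, service_account, python_script_file):
    """Return a task body with the same shape as EXAMPLE_TASK_BODY."""
    return {
        "trigger_spec": {"type_": trigger_type},
        "execution_spec": {"service_account": service_account},
        "spark": {"python_script_file": python_script_file},
    }


# Placeholder values for illustration only.
body = build_task_body(
    trigger_type="ON_DEMAND",
    service_account="my-sa@my-project.iam.gserviceaccount.com",
    python_script_file="gs://my-bucket/spark_example_pi.py",
)
print(body)
```

The resulting dictionary can be passed as the `body` argument of the operator.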

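With this configuration you can create the task either synchronously or asynchronously with DataplexCreateTaskOperator. The first example below waits for the operation to finish; the second passes asynchronous=True: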

tests/system/google/cloud/dataplex/example_dataplex.py[source]

create_dataplex_task = DataplexCreateTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    body=EXAMPLE_TASK_BODY,
    dataplex_task_id=DATAPLEX_TASK_ID,
    task_id="create_dataplex_task",
)

tests/system/google/cloud/dataplex/example_dataplex.py[source]

create_dataplex_task_async = DataplexCreateTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    body=EXAMPLE_TASK_BODY,
    dataplex_task_id=f"{DATAPLEX_TASK_ID}-1",
    asynchronous=True,
    task_id="create_dataplex_task_async",
)

Delete a task

To delete a task you can use:

DataplexDeleteTaskOperator

tests/system/google/cloud/dataplex/example_dataplex.py[source]

delete_dataplex_task_async = DataplexDeleteTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    dataplex_task_id=f"{DATAPLEX_TASK_ID}-1",
    task_id="delete_dataplex_task_async",
)

List tasks

To list tasks you can use:

DataplexListTasksOperator

tests/system/google/cloud/dataplex/example_dataplex.py[source]

list_dataplex_task = DataplexListTasksOperator(
    project_id=PROJECT_ID, region=REGION, lake_id=LAKE_ID, task_id="list_dataplex_task"
)

Get a task

To get a task you can use:

DataplexGetTaskOperator

tests/system/google/cloud/dataplex/example_dataplex.py[source]

get_dataplex_task = DataplexGetTaskOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    dataplex_task_id=DATAPLEX_TASK_ID,
    task_id="get_dataplex_task",
)

Wait for a task

To wait for a task created asynchronously you can use:

DataplexTaskStateSensor

tests/system/google/cloud/dataplex/example_dataplex.py[source]

dataplex_task_state = DataplexTaskStateSensor(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    dataplex_task_id=DATAPLEX_TASK_ID,
    task_id="dataplex_task_state",
)
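
Conceptually, a sensor repeatedly checks a condition until it holds. The sketch below illustrates that polling idea in plain Python; it is not the sensor's actual implementation. `wait_for_state`, the state names, and the fake state source are all assumptions made for illustration.

```python
import time


def wait_for_state(get_state, target_state="ACTIVE", poke_interval=0.0, max_pokes=5):
    """Call get_state() until it returns target_state; return the number of calls made."""
    for attempt in range(1, max_pokes + 1):
        if get_state() == target_state:
            return attempt
        time.sleep(poke_interval)
    raise TimeoutError(f"state {target_state!r} not reached after {max_pokes} pokes")


# Stand-in for the Dataplex API: reports CREATING twice, then ACTIVE.
states = iter(["CREATING", "CREATING", "ACTIVE"])
pokes_needed = wait_for_state(lambda: next(states))
print(pokes_needed)
```

In a DAG, the sensor takes care of this loop, so downstream tasks only start once the sensor task has succeeded.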

Create a Lake

Before you create a Dataplex lake, you need to define its body.

For more information about the available fields to pass when creating a lake, visit Dataplex create lake API.

A simple lake configuration can look as follows:

tests/system/google/cloud/dataplex/example_dataplex.py[source]

EXAMPLE_LAKE_BODY = {
    "display_name": "test_display_name",
    "labels": [],
    "description": "test_description",
    "metastore": {"service": ""},
}

With this configuration you can create the lake:

DataplexCreateLakeOperator

tests/system/google/cloud/dataplex/example_dataplex.py[source]

create_lake = DataplexCreateLakeOperator(
    project_id=PROJECT_ID, region=REGION, body=EXAMPLE_LAKE_BODY, lake_id=LAKE_ID, task_id="create_lake"
)

Delete a lake

To delete a lake you can use:

DataplexDeleteLakeOperator

tests/system/google/cloud/dataplex/example_dataplex.py[source]

delete_lake = DataplexDeleteLakeOperator(
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    task_id="delete_lake",
    trigger_rule=TriggerRule.ALL_DONE,
)

Create or update a Data Quality scan

Before you create a Dataplex Data Quality scan you need to define its body. For more information about the available fields to pass when creating a Data Quality scan, visit Dataplex create data quality API.

A simple Data Quality scan configuration can look as follows:

tests/system/google/cloud/dataplex/example_dataplex_dq.py[source]

EXAMPLE_DATA_SCAN = dataplex_v1.DataScan()
EXAMPLE_DATA_SCAN.data.entity = (
    f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/{ZONE_ID}/entities/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data.resource = (
    f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET}/tables/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data_quality_spec = DataQualitySpec(
    {
        "rules": [
            {
                "range_expectation": {
                    "min_value": "0",
                    "max_value": "10000",
                },
                "column": "value",
                "dimension": "VALIDITY",
            }
        ],
    }
)
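
The entity path, the BigQuery resource path, and the rule dictionaries all follow fixed patterns, so they can be generated rather than typed by hand. The helpers below are a hypothetical sketch (none of these function names exist in the provider); they only reproduce the string and dictionary shapes used in the configuration above, with placeholder identifiers.

```python
def entity_name(project_id, region, lake_id, zone_id, entity_id):
    """Build the Dataplex entity path used for DataScan.data.entity."""
    return (
        f"projects/{project_id}/locations/{region}/lakes/{lake_id}"
        f"/zones/{zone_id}/entities/{entity_id}"
    )


def bigquery_table_resource(project_id, dataset, table):
    """Build the BigQuery resource path used for DataScan.data.resource."""
    return f"//bigquery.googleapis.com/projects/{project_id}/datasets/{dataset}/tables/{table}"


def range_rule(column, min_value, max_value, dimension="VALIDITY"):
    """Build one range_expectation rule; bounds are passed as strings, as in the example."""
    return {
        "range_expectation": {"min_value": str(min_value), "max_value": str(max_value)},
        "column": column,
        "dimension": dimension,
    }


# Placeholder identifiers for illustration only.
entity = entity_name("my-project", "us-central1", "my-lake", "my-zone", "table_one")
resource = bigquery_table_resource("my-project", "my_dataset", "table_one")
rules = [range_rule("value", 0, 10000)]
print(entity, resource, rules, sep="\n")
```

The `rules` list has the same shape as the one passed to `DataQualitySpec` above.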

With this configuration you can create or update the Data Quality scan:

DataplexCreateOrUpdateDataQualityScanOperator

tests/system/google/cloud/dataplex/example_dataplex_dq.py[source]

create_data_scan = DataplexCreateOrUpdateDataQualityScanOperator(
    task_id="create_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    body=EXAMPLE_DATA_SCAN,
    data_scan_id=DATA_SCAN_ID,
)

Get a Data Quality scan

To get a Data Quality scan you can use:

DataplexGetDataQualityScanOperator

tests/system/google/cloud/dataplex/example_dataplex_dq.py[source]

get_data_scan = DataplexGetDataQualityScanOperator(
    task_id="get_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
)

Delete a Data Quality scan

To delete a Data Quality scan you can use:

DataplexDeleteDataQualityScanOperator

tests/system/google/cloud/dataplex/example_dataplex_dq.py[source]

delete_data_scan = DataplexDeleteDataQualityScanOperator(
    task_id="delete_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

Run a Data Quality scan

You can run a Dataplex Data Quality scan in asynchronous mode and check its status later with a sensor:

DataplexRunDataQualityScanOperator

tests/system/google/cloud/dataplex/example_dataplex_dq.py[source]

run_data_scan_async = DataplexRunDataQualityScanOperator(
    task_id="run_data_scan_async",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    asynchronous=True,
)

To check whether a Dataplex Data Quality scan run has succeeded, you can use:

DataplexDataQualityJobStatusSensor

tests/system/google/cloud/dataplex/example_dataplex_dq.py[source]

get_data_scan_job_status = DataplexDataQualityJobStatusSensor(
    task_id="get_data_scan_job_status",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    job_id="{{ task_instance.xcom_pull('run_data_scan_async') }}",
)

You can also run this operator in deferrable mode:

tests/system/google/cloud/dataplex/example_dataplex_dq.py[source]

run_data_scan_def = DataplexRunDataQualityScanOperator(
    task_id="run_data_scan_def",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    deferrable=True,
)

Get a Data Quality scan job

To get the result of a Data Quality scan job you can use:

DataplexGetDataQualityScanResultOperator

tests/system/google/cloud/dataplex/example_dataplex_dq.py[source]

get_data_scan_job_result_2 = DataplexGetDataQualityScanResultOperator(
    task_id="get_data_scan_job_result_2",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
)

You can also run this operator in deferrable mode:

tests/system/google/cloud/dataplex/example_dataplex_dq.py[source]

get_data_scan_job_result_def = DataplexGetDataQualityScanResultOperator(
    task_id="get_data_scan_job_result_def",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    deferrable=True,
)

Create a zone

Before you create a Dataplex zone, you need to define its body.

For more information about the available fields to pass when creating a zone, visit Dataplex create zone API.

A simple zone configuration can look as follows:

tests/system/google/cloud/dataplex/example_dataplex_dq.py[source]

EXAMPLE_ZONE = {
    "type_": "RAW",
    "resource_spec": {"location_type": "SINGLE_REGION"},
}

With this configuration you can create a zone:

DataplexCreateZoneOperator

tests/system/google/cloud/dataplex/example_dataplex_dq.py[source]

create_zone = DataplexCreateZoneOperator(
    task_id="create_zone",
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    body=EXAMPLE_ZONE,
    zone_id=ZONE_ID,
)

Delete a zone

To delete a zone you can use:

DataplexDeleteZoneOperator

tests/system/google/cloud/dataplex/example_dataplex_dq.py[source]

delete_zone = DataplexDeleteZoneOperator(
    task_id="delete_zone",
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    zone_id=ZONE_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

Create an asset

Before you create a Dataplex asset, you need to define its body.

For more information about the available fields to pass when creating an asset, visit Dataplex create asset API.

A simple asset configuration can look as follows:

tests/system/google/cloud/dataplex/example_dataplex_dq.py[source]

EXAMPLE_ASSET = {
    "resource_spec": {"name": f"projects/{PROJECT_ID}/datasets/{DATASET}", "type_": "BIGQUERY_DATASET"},
    "discovery_spec": {"enabled": True},
}

With this configuration you can create the asset:

DataplexCreateAssetOperator

tests/system/google/cloud/dataplex/example_dataplex_dq.py[source]

create_asset = DataplexCreateAssetOperator(
    task_id="create_asset",
    project_id=PROJECT_ID,
    region=REGION,
    body=EXAMPLE_ASSET,
    lake_id=LAKE_ID,
    zone_id=ZONE_ID,
    asset_id=ASSET_ID,
)

Delete an asset

To delete an asset you can use:

DataplexDeleteAssetOperator

tests/system/google/cloud/dataplex/example_dataplex_dq.py[source]

delete_asset = DataplexDeleteAssetOperator(
    task_id="delete_asset",
    project_id=PROJECT_ID,
    region=REGION,
    lake_id=LAKE_ID,
    zone_id=ZONE_ID,
    asset_id=ASSET_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

Create or update a Data Profile scan

Before you create a Dataplex Data Profile scan you need to define its body. For more information about the available fields to pass when creating a Data Profile scan, visit Dataplex create data profile API.

A simple Data Profile scan configuration can look as follows:

tests/system/google/cloud/dataplex/example_dataplex_dp.py[source]

EXAMPLE_DATA_SCAN = dataplex_v1.DataScan()
EXAMPLE_DATA_SCAN.data.entity = (
    f"projects/{PROJECT_ID}/locations/{REGION}/lakes/{LAKE_ID}/zones/{ZONE_ID}/entities/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data.resource = (
    f"//bigquery.googleapis.com/projects/{PROJECT_ID}/datasets/{DATASET}/tables/{TABLE_1}"
)
EXAMPLE_DATA_SCAN.data_profile_spec = DataProfileSpec({})

With this configuration you can create or update the Data Profile scan:

DataplexCreateOrUpdateDataProfileScanOperator

tests/system/google/cloud/dataplex/example_dataplex_dp.py[source]

create_data_scan = DataplexCreateOrUpdateDataProfileScanOperator(
    task_id="create_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    body=EXAMPLE_DATA_SCAN,
    data_scan_id=DATA_SCAN_ID,
)

Get a Data Profile scan

To get a Data Profile scan you can use:

DataplexGetDataProfileScanOperator

tests/system/google/cloud/dataplex/example_dataplex_dp.py[source]

get_data_scan = DataplexGetDataProfileScanOperator(
    task_id="get_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
)

Delete a Data Profile scan

To delete a Data Profile scan you can use:

DataplexDeleteDataProfileScanOperator

tests/system/google/cloud/dataplex/example_dataplex_dp.py[source]

delete_data_scan = DataplexDeleteDataProfileScanOperator(
    task_id="delete_data_scan",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    trigger_rule=TriggerRule.ALL_DONE,
)

Run a Data Profile scan

You can run a Dataplex Data Profile scan in asynchronous mode and check its status later with a sensor:

DataplexRunDataProfileScanOperator

tests/system/google/cloud/dataplex/example_dataplex_dp.py[source]

run_data_scan_async = DataplexRunDataProfileScanOperator(
    task_id="run_data_scan_async",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    asynchronous=True,
)

To check whether a Dataplex Data Profile scan run has succeeded, you can use:

DataplexDataProfileJobStatusSensor

tests/system/google/cloud/dataplex/example_dataplex_dp.py[source]

get_data_scan_job_status = DataplexDataProfileJobStatusSensor(
    task_id="get_data_scan_job_status",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    job_id="{{ task_instance.xcom_pull('run_data_scan_async') }}",
)

You can also run this operator in deferrable mode:

tests/system/google/cloud/dataplex/example_dataplex_dp.py[source]

run_data_scan_def = DataplexRunDataProfileScanOperator(
    task_id="run_data_scan_def",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
    deferrable=True,
)

Get a Data Profile scan job

To get the result of a Data Profile scan job you can use:

DataplexGetDataProfileScanResultOperator

tests/system/google/cloud/dataplex/example_dataplex_dp.py[source]

get_data_scan_job_result_2 = DataplexGetDataProfileScanResultOperator(
    task_id="get_data_scan_job_result_2",
    project_id=PROJECT_ID,
    region=REGION,
    data_scan_id=DATA_SCAN_ID,
)

Google Dataplex Catalog Operators

Dataplex Catalog provides a unified inventory of Google Cloud resources, such as BigQuery, and other resources, such as on-premises resources. Dataplex Catalog automatically retrieves metadata for Google Cloud resources, and you bring metadata for third-party resources into Dataplex Catalog.

For more information about Dataplex Catalog, visit the Dataplex Catalog product documentation.

Create an EntryGroup

To create an Entry Group in a specific location in Dataplex Catalog, you can use DataplexCatalogCreateEntryGroupOperator. For more information about the available fields to pass when creating an Entry Group, see the Entry Group resource configuration.

A simple Entry Group configuration can look as follows:

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

ENTRY_GROUP_BODY = {"display_name": "Display Name", "description": "Some description"}

With this configuration you can create an Entry Group resource:

DataplexCatalogCreateEntryGroupOperator

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

create_entry_group = DataplexCatalogCreateEntryGroupOperator(
    task_id="create_entry_group",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_group_id=ENTRY_GROUP_NAME,
    entry_group_configuration=ENTRY_GROUP_BODY,
    validate_request=False,
)

Delete an EntryGroup

To delete an Entry Group in a specific location in Dataplex Catalog, you can use DataplexCatalogDeleteEntryGroupOperator.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

delete_entry_group = DataplexCatalogDeleteEntryGroupOperator(
    task_id="delete_entry_group",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_group_id=ENTRY_GROUP_NAME,
    trigger_rule=TriggerRule.ALL_DONE,
)

List EntryGroups

To list all Entry Groups in a specific location in Dataplex Catalog, you can use DataplexCatalogListEntryGroupsOperator. This operator also supports filtering and ordering the results.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

list_entry_group = DataplexCatalogListEntryGroupsOperator(
    task_id="list_entry_group",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    order_by="name",
    filter_by='display_name = "Display Name"',
)
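
The `filter_by` value is an ordinary string, so it can be built from variables instead of being hard-coded. The helper below is a hypothetical sketch (`equals_filter` is not part of the provider). It only reproduces the `field = "value"` form shown above and rejects values that contain a double quote rather than guessing at escaping rules.

```python
def equals_filter(field, value):
    """Build a filter of the form: field = "value"."""
    if '"' in value:
        raise ValueError("value must not contain double quotes")
    return f'{field} = "{value}"'


filter_by = equals_filter("display_name", "Display Name")
print(filter_by)
```

The same approach applies to the other list operators that accept `filter_by`.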

Get an EntryGroup

To retrieve an Entry Group in a specific location in Dataplex Catalog, you can use DataplexCatalogGetEntryGroupOperator.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

get_entry_group = DataplexCatalogGetEntryGroupOperator(
    task_id="get_entry_group",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_group_id=ENTRY_GROUP_NAME,
)

Update an EntryGroup

To update an Entry Group in a specific location in Dataplex Catalog, you can use DataplexCatalogUpdateEntryGroupOperator.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

update_entry_group = DataplexCatalogUpdateEntryGroupOperator(
    task_id="update_entry_group",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_group_id=ENTRY_GROUP_NAME,
    entry_group_configuration={"display_name": "Updated Display Name"},
    update_mask=["display_name"],
)
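
In the example above, `update_mask` lists the same field that appears in the new configuration. When only top-level fields change, the mask can be derived from the configuration's keys so the two stay in sync. This is a minimal sketch with a hypothetical helper name (`update_kwargs`); nested field paths are not handled and would still need to be written out by hand.

```python
def update_kwargs(changes):
    """Return operator keyword arguments whose update_mask matches the changed top-level fields."""
    return {
        "entry_group_configuration": dict(changes),
        "update_mask": sorted(changes),
    }


kwargs = update_kwargs({"display_name": "Updated Display Name"})
print(kwargs)
```

The resulting dictionary could be unpacked into the operator call with `**kwargs` alongside the remaining arguments.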

Create an EntryType

To create an Entry Type in a specific location in Dataplex Catalog, you can use DataplexCatalogCreateEntryTypeOperator. For more information about the available fields to pass when creating an Entry Type, see the Entry Type resource configuration.

A simple Entry Type configuration can look as follows:

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

ENTRY_TYPE_BODY = {"display_name": "Display Name", "description": "Some description"}

With this configuration you can create an Entry Type resource:

DataplexCatalogCreateEntryTypeOperator

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

create_entry_type = DataplexCatalogCreateEntryTypeOperator(
    task_id="create_entry_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_type_id=ENTRY_TYPE_NAME,
    entry_type_configuration=ENTRY_TYPE_BODY,
    validate_request=False,
)

Delete an EntryType

To delete an Entry Type in a specific location in Dataplex Catalog, you can use DataplexCatalogDeleteEntryTypeOperator.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

delete_entry_type = DataplexCatalogDeleteEntryTypeOperator(
    task_id="delete_entry_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_type_id=ENTRY_TYPE_NAME,
    trigger_rule=TriggerRule.ALL_DONE,
)

List EntryTypes

To list all Entry Types in a specific location in Dataplex Catalog, you can use DataplexCatalogListEntryTypesOperator. This operator also supports filtering and ordering the results.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

list_entry_type = DataplexCatalogListEntryTypesOperator(
    task_id="list_entry_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    order_by="name",
    filter_by='display_name = "Display Name"',
)

Get an EntryType

To retrieve an Entry Type in a specific location in Dataplex Catalog, you can use DataplexCatalogGetEntryTypeOperator.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

get_entry_type = DataplexCatalogGetEntryTypeOperator(
    task_id="get_entry_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_type_id=ENTRY_TYPE_NAME,
)

Update an EntryType

To update an Entry Type in a specific location in Dataplex Catalog, you can use DataplexCatalogUpdateEntryTypeOperator.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

update_entry_type = DataplexCatalogUpdateEntryTypeOperator(
    task_id="update_entry_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_type_id=ENTRY_TYPE_NAME,
    entry_type_configuration={"display_name": "Updated Display Name"},
    update_mask=["display_name"],
)

Create an AspectType

To create an Aspect Type in a specific location in Dataplex Catalog, you can use DataplexCatalogCreateAspectTypeOperator. For more information about the available fields to pass when creating an Aspect Type, see the Aspect Type resource configuration.

A simple Aspect Type configuration can look as follows:

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

ASPECT_TYPE_BODY = {
    "display_name": "Sample AspectType",
    "description": "A simple AspectType for demonstration purposes.",
    "metadata_template": {
        "name": "sample_field",
        "type": "record",
        "annotations": {
            "display_name": "Sample Field",
            "description": "A sample field within the AspectType.",
        },
    },
}

With this configuration you can create an Aspect Type resource:

DataplexCatalogCreateAspectTypeOperator

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

create_aspect_type = DataplexCatalogCreateAspectTypeOperator(
    task_id="create_aspect_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    aspect_type_id=ASPECT_TYPE_NAME,
    aspect_type_configuration=ASPECT_TYPE_BODY,
    validate_request=False,
)

Delete an AspectType

To delete an Aspect Type in a specific location in Dataplex Catalog, you can use DataplexCatalogDeleteAspectTypeOperator.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

delete_aspect_type = DataplexCatalogDeleteAspectTypeOperator(
    task_id="delete_aspect_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    aspect_type_id=ASPECT_TYPE_NAME,
    trigger_rule=TriggerRule.ALL_DONE,
)

List AspectTypes

To list all Aspect Types in a specific location in Dataplex Catalog, you can use DataplexCatalogListAspectTypesOperator. This operator also supports filtering and ordering the results.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

list_aspect_type = DataplexCatalogListAspectTypesOperator(
    task_id="list_aspect_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    order_by="name",
    filter_by='display_name = "Display Name"',
)

Get an AspectType

To retrieve an Aspect Type in a specific location in Dataplex Catalog, you can use DataplexCatalogGetAspectTypeOperator.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

get_aspect_type = DataplexCatalogGetAspectTypeOperator(
    task_id="get_aspect_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    aspect_type_id=ASPECT_TYPE_NAME,
)

Update an AspectType

To update an Aspect Type in a specific location in Dataplex Catalog, you can use DataplexCatalogUpdateAspectTypeOperator.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

update_aspect_type = DataplexCatalogUpdateAspectTypeOperator(
    task_id="update_aspect_type",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    aspect_type_id=ASPECT_TYPE_NAME,
    aspect_type_configuration={"display_name": "Updated Display Name"},
    update_mask=["display_name"],
)

Create an Entry

To create an Entry in a specific location in Dataplex Catalog, you can use DataplexCatalogCreateEntryOperator. For more information about the available fields to pass when creating an Entry, see the Entry resource configuration.

A simple Entry configuration can look as follows:

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

ENTRY_BODY = {
    "name": f"projects/{PROJECT_ID}/locations/{GCP_LOCATION}/entryGroups/{ENTRY_GROUP_NAME}/entries/{ENTRY_NAME}",
    "entry_type": f"projects/{PROJECT_ID}/locations/{GCP_LOCATION}/entryTypes/{ENTRY_TYPE_NAME}",
}
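
Both values in the body are resource names with a fixed layout, so they can be composed from their parts. The helpers below are a hypothetical sketch (the function names are not part of the provider) that reproduces the two path patterns from the configuration above, using placeholder identifiers.

```python
def entry_type_name(project_id, location, entry_type_id):
    """Build the full resource name of an Entry Type."""
    return f"projects/{project_id}/locations/{location}/entryTypes/{entry_type_id}"


def entry_name(project_id, location, entry_group_id, entry_id):
    """Build the full resource name of an Entry inside an Entry Group."""
    return (
        f"projects/{project_id}/locations/{location}"
        f"/entryGroups/{entry_group_id}/entries/{entry_id}"
    )


def build_entry_body(project_id, location, entry_group_id, entry_id, entry_type_id):
    """Return an Entry body with the same shape as ENTRY_BODY."""
    return {
        "name": entry_name(project_id, location, entry_group_id, entry_id),
        "entry_type": entry_type_name(project_id, location, entry_type_id),
    }


# Placeholder identifiers for illustration only.
entry_body = build_entry_body(
    "my-project", "us-central1", "my-entry-group", "my-entry", "my-entry-type"
)
print(entry_body)
```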

With this configuration you can create an Entry resource:

DataplexCatalogCreateEntryOperator

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

create_entry = DataplexCatalogCreateEntryOperator(
    task_id="create_entry",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_id=ENTRY_NAME,
    entry_group_id=ENTRY_GROUP_NAME,
    entry_configuration=ENTRY_BODY,
)

Delete an Entry

To delete an Entry in a specific location in Dataplex Catalog, you can use DataplexCatalogDeleteEntryOperator.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

delete_entry = DataplexCatalogDeleteEntryOperator(
    task_id="delete_entry",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_id=ENTRY_NAME,
    entry_group_id=ENTRY_GROUP_NAME,
    trigger_rule=TriggerRule.ALL_DONE,
)

List Entries

To list all Entries in a specific location in Dataplex Catalog, you can use DataplexCatalogListEntriesOperator. This operator also supports filtering and ordering the results.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

list_entry = DataplexCatalogListEntriesOperator(
    task_id="list_entry",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_group_id=ENTRY_GROUP_NAME,
)

Get an Entry

To retrieve an Entry in a specific location in Dataplex Catalog, you can use DataplexCatalogGetEntryOperator.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

get_entry = DataplexCatalogGetEntryOperator(
    task_id="get_entry",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_id=ENTRY_NAME,
    entry_group_id=ENTRY_GROUP_NAME,
)

Update an Entry

To update an Entry in a specific location in Dataplex Catalog, you can use DataplexCatalogUpdateEntryOperator.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

update_entry = DataplexCatalogUpdateEntryOperator(
    task_id="update_entry",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_id=ENTRY_NAME,
    entry_group_id=ENTRY_GROUP_NAME,
    entry_configuration={
        "fully_qualified_name": f"dataplex:{PROJECT_ID}.{GCP_LOCATION}.{ENTRY_GROUP_NAME}.some-entry"
    },
    update_mask=["fully_qualified_name"],
)

Look up a single Entry

To look up a single Entry by name, using the permissions on the source system, you can use DataplexCatalogLookupEntryOperator.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

lookup_entry = DataplexCatalogLookupEntryOperator(
    task_id="lookup_entry",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    entry_id=ENTRY_NAME,
    entry_group_id=ENTRY_GROUP_NAME,
)

Search Entries

To search for Entries that match a given query and scope in Dataplex Catalog, you can use DataplexCatalogSearchEntriesOperator.

tests/system/google/cloud/dataplex/example_dataplex_catalog.py[source]

search_entry = DataplexCatalogSearchEntriesOperator(
    task_id="search_entry",
    project_id=PROJECT_ID,
    location=GCP_LOCATION,
    query=f"name={ENTRY_NAME}",
)
