Google Cloud Data Catalog Operators

The Data Catalog is a fully managed and scalable metadata management service that allows organizations to quickly discover, manage and understand all their data in Google Cloud. It offers:

  • A simple, easy-to-use search interface for data discovery, powered by the same Google search technology that supports Gmail and Drive

  • A flexible and powerful cataloging system for capturing technical and business metadata

  • An auto-tagging mechanism for sensitive data with DLP API integration

Prerequisite Tasks

Managing entries

Operators use an Entry to represent an entry.

Getting an entry

Getting an entry is performed with the CloudDataCatalogGetEntryOperator and CloudDataCatalogLookupEntryOperator operators.

The CloudDataCatalogGetEntryOperator uses the project ID, entry group ID, and entry ID to get the entry.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

get_entry = CloudDataCatalogGetEntryOperator(
    task_id="get_entry", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
)
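The three IDs given to CloudDataCatalogGetEntryOperator together identify one Data Catalog resource, following the same projects/{project_id}/locations/{location}/entryGroups/{entry_group}/entries/{entry} layout that appears in the lookup example later in this section. The following minimal sketch shows how the pieces combine; the helper entry_resource_name and the sample values are hypothetical and only illustrate the naming pattern.

```python
def entry_resource_name(project_id: str, location: str, entry_group: str, entry: str) -> str:
    """Return the Data Catalog resource name that identifies a single entry."""
    return (
        f"projects/{project_id}/locations/{location}/"
        f"entryGroups/{entry_group}/entries/{entry}"
    )


# Hypothetical identifiers, used only to show the resulting name.
name = entry_resource_name("my-project", "us-central1", "my_group", "my_entry")
print(name)  # projects/my-project/locations/us-central1/entryGroups/my_group/entries/my_entry
```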

You can use Jinja templating with the location, entry_group, entry, project_id, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine their values dynamically.

The result is saved to XCom, which allows other operators to use it.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

get_entry_result = BashOperator(task_id="get_entry_result", bash_command=f"echo {get_entry.output}")

The CloudDataCatalogLookupEntryOperator uses a resource name to get the entry: either the full name of the linked resource (linked_resource) or its SQL name (sql_resource).

airflow/providers/google/cloud/example_dags/example_datacatalog.py

current_entry_template = (
    "//datacatalog.googleapis.com/projects/{project_id}/locations/{location}/"
    "entryGroups/{entry_group}/entries/{entry}"
)
lookup_entry_linked_resource = CloudDataCatalogLookupEntryOperator(
    task_id="lookup_entry",
    linked_resource=current_entry_template.format(
        project_id=PROJECT_ID, location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
    ),
)
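The linked_resource does not have to be a Data Catalog name. For an entry that Data Catalog keeps for a BigQuery table, the linked resource is the table's full resource name, of the form //bigquery.googleapis.com/projects/{project_id}/datasets/{dataset}/tables/{table}. A minimal sketch that builds such a value; the helper bigquery_linked_resource and the sample names are hypothetical.

```python
BIGQUERY_TABLE_TEMPLATE = (
    "//bigquery.googleapis.com/projects/{project_id}/datasets/{dataset}/tables/{table}"
)


def bigquery_linked_resource(project_id: str, dataset: str, table: str) -> str:
    """Build the full resource name of a BigQuery table for an entry lookup."""
    return BIGQUERY_TABLE_TEMPLATE.format(project_id=project_id, dataset=dataset, table=table)


# Hypothetical project, dataset and table names.
linked = bigquery_linked_resource("my-project", "sales", "orders")
print(linked)  # //bigquery.googleapis.com/projects/my-project/datasets/sales/tables/orders
```

The resulting string would be passed as linked_resource=linked in place of the Data Catalog name used in the lookup example.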

You can use Jinja templating with the linked_resource, sql_resource, project_id, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine their values dynamically.

The result is saved to XCom, which allows other operators to use it.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

lookup_entry_result = BashOperator(
    task_id="lookup_entry_result",
    bash_command="echo \"{{ task_instance.xcom_pull('lookup_entry')['display_name'] }}\"",
)

Creating an entry

The CloudDataCatalogCreateEntryOperator operator creates an entry.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

create_entry_gcs = CloudDataCatalogCreateEntryOperator(
    task_id="create_entry_gcs",
    location=LOCATION,
    entry_group=ENTRY_GROUP_ID,
    entry_id=ENTRY_ID,
    entry={
        "display_name": "Wizard",
        "type_": "FILESET",
        "gcs_fileset_spec": {"file_patterns": [f"gs://{BUCKET_ID}/**"]},
    },
)
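The entry argument above is a plain dictionary whose keys mirror the fields of an Entry (type_ carries a trailing underscore, as in the example). When several FILESET entries are created from one DAG, a small factory keeps their bodies consistent. This is a sketch; gcs_fileset_entry is a hypothetical helper, not part of the provider.

```python
def gcs_fileset_entry(display_name: str, bucket: str, pattern: str = "**") -> dict:
    """Return an entry body describing a set of files in a Cloud Storage bucket."""
    return {
        "display_name": display_name,
        "type_": "FILESET",
        # Each file pattern is a gs:// URI; the default "**" mirrors the example above.
        "gcs_fileset_spec": {"file_patterns": [f"gs://{bucket}/{pattern}"]},
    }


# Hypothetical bucket and pattern.
entry = gcs_fileset_entry("Wizard", "my-bucket", "raw/*.csv")
print(entry["gcs_fileset_spec"]["file_patterns"])  # ['gs://my-bucket/raw/*.csv']
```

With the default pattern, entry=gcs_fileset_entry("Wizard", BUCKET_ID) would reproduce the body used in the example.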

You can use Jinja templating with the location, entry_group, entry_id, entry, project_id, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine their values dynamically.

The result is saved to XCom, which allows other operators to use it.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

create_entry_gcs_result2 = BashOperator(
    task_id="create_entry_gcs_result2",
    bash_command=f"echo {create_entry_gcs.output}",
)

The ID of the newly created entry can be read from XCom using the entry_id key.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

create_entry_gcs_result = BashOperator(
    task_id="create_entry_gcs_result",
    bash_command=f"echo {create_entry_gcs.output['entry_id']}",
)

Updating an entry

The CloudDataCatalogUpdateEntryOperator operator updates an entry.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

update_entry = CloudDataCatalogUpdateEntryOperator(
    task_id="update_entry",
    entry={"display_name": "New Wizard"},
    update_mask={"paths": ["display_name"]},
    location=LOCATION,
    entry_group=ENTRY_GROUP_ID,
    entry_id=ENTRY_ID,
)
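The update_mask names the fields of entry that the API should overwrite; fields left out of the mask keep their current values. Keeping the mask in sync with the dictionary by hand is easy to get wrong, so one option is to derive it from the dictionary's keys. A minimal sketch, assuming only top-level fields are updated (nested paths such as gcs_fileset_spec.file_patterns are not handled); update_mask_for is a hypothetical helper.

```python
def update_mask_for(changes: dict) -> dict:
    """Build an update_mask listing every top-level field present in `changes`."""
    # sorted() makes the mask deterministic regardless of dict insertion order.
    return {"paths": sorted(changes)}


changes = {"display_name": "New Wizard", "description": "Files written by the wizard job"}
mask = update_mask_for(changes)
print(mask)  # {'paths': ['description', 'display_name']}
```

The pair would then be passed to the operator as entry=changes and update_mask=mask.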

You can use Jinja templating with the entry, update_mask, location, entry_group, entry_id, project_id, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine their values dynamically.

Deleting an entry

The CloudDataCatalogDeleteEntryOperator operator deletes an entry.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

delete_entry = CloudDataCatalogDeleteEntryOperator(
    task_id="delete_entry", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
)

You can use Jinja templating with the location, entry_group, entry, project_id, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine their values dynamically.

Managing entry groups

Operators use an EntryGroup to represent an entry group.

Creating an entry group

The CloudDataCatalogCreateEntryGroupOperator operator creates an entry group.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

create_entry_group = CloudDataCatalogCreateEntryGroupOperator(
    task_id="create_entry_group",
    location=LOCATION,
    entry_group_id=ENTRY_GROUP_ID,
    entry_group={"display_name": "analytics data - jan 2011"},
)
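Data Catalog restricts the IDs it accepts. Assuming the rule given in the Data Catalog API reference for entry group IDs (start with a letter or underscore, contain only English letters, numbers and underscores, and be at most 64 characters long), a DAG could check an ID before the task runs. This is a sketch; is_valid_catalog_id is a hypothetical helper and the pattern encodes the assumed rule, not a value read from the API.

```python
import re

# Assumed rule: a letter or underscore first, then only English letters,
# digits or underscores, with at most 64 characters in total.
_ID_PATTERN = re.compile(r"[A-Za-z_][A-Za-z0-9_]{0,63}")


def is_valid_catalog_id(candidate: str) -> bool:
    """Return True if `candidate` satisfies the assumed ID rule."""
    return _ID_PATTERN.fullmatch(candidate) is not None


for candidate in ["analytics_2011", "2011_analytics", "analytics-data", "a" * 65]:
    print(candidate[:16], is_valid_catalog_id(candidate))
```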

You can use Jinja templating with the location, entry_group_id, entry_group, project_id, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine their values dynamically.

The result is saved to XCom, which allows other operators to use it.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

create_entry_group_result2 = BashOperator(
    task_id="create_entry_group_result2",
    bash_command=f"echo {create_entry_group.output}",
)

The ID of the newly created entry group can be read from XCom using the entry_group_id key.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

create_entry_group_result = BashOperator(
    task_id="create_entry_group_result",
    bash_command=f"echo {create_entry_group.output['entry_group_id']}",
)

Getting an entry group

The CloudDataCatalogGetEntryGroupOperator operator gets an entry group. The read_mask parameter selects which fields are returned.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

get_entry_group = CloudDataCatalogGetEntryGroupOperator(
    task_id="get_entry_group",
    location=LOCATION,
    entry_group=ENTRY_GROUP_ID,
    read_mask={"paths": ["name", "display_name"]},
)

You can use Jinja templating with the location, entry_group, read_mask, project_id, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine their values dynamically.

The result is saved to XCom, which allows other operators to use it.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

get_entry_group_result = BashOperator(
    task_id="get_entry_group_result",
    bash_command=f"echo {get_entry_group.output}",
)

Deleting an entry group

The CloudDataCatalogDeleteEntryGroupOperator operator deletes an entry group.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

delete_entry_group = CloudDataCatalogDeleteEntryGroupOperator(
    task_id="delete_entry_group", location=LOCATION, entry_group=ENTRY_GROUP_ID
)

You can use Jinja templating with the location, entry_group, project_id, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine their values dynamically.

Managing tag templates

Operators use a TagTemplate to represent a tag template.

Creating a tag template

The CloudDataCatalogCreateTagTemplateOperator operator creates a tag template.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

create_tag_template = CloudDataCatalogCreateTagTemplateOperator(
    task_id="create_tag_template",
    location=LOCATION,
    tag_template_id=TEMPLATE_ID,
    tag_template={
        "display_name": "Awesome Tag Template",
        "fields": {
            FIELD_NAME_1: TagTemplateField(
                display_name="first-field", type_=dict(primitive_type="STRING")
            )
        },
    },
)
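The example above builds the template as a dictionary but wraps each field in a TagTemplateField object. Because the field type is already given there as a plain dictionary (type_=dict(primitive_type="STRING")), the whole template can plausibly be expressed as nested dictionaries; this sketch assumes the operator accepts that form. tag_template_body is a hypothetical helper that maps each field ID to a display name and a primitive type name such as "STRING".

```python
from __future__ import annotations


def tag_template_body(display_name: str, fields: dict[str, tuple[str, str]]) -> dict:
    """Build a tag template body from {field_id: (field display name, primitive type)}."""
    return {
        "display_name": display_name,
        "fields": {
            field_id: {"display_name": label, "type_": {"primitive_type": primitive}}
            for field_id, (label, primitive) in fields.items()
        },
    }


# Hypothetical field IDs and types.
body = tag_template_body(
    "Awesome Tag Template",
    {"first_field": ("first-field", "STRING"), "row_count": ("row count", "DOUBLE")},
)
print(body["fields"]["row_count"])
```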

You can use Jinja templating with the location, tag_template_id, tag_template, project_id, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine their values dynamically.

The result is saved to XCom, which allows other operators to use it.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

create_tag_template_result2 = BashOperator(
    task_id="create_tag_template_result2",
    bash_command=f"echo {create_tag_template.output}",
)

The ID of the newly created tag template can be read from XCom using the tag_template_id key.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

create_tag_template_result = BashOperator(
    task_id="create_tag_template_result",
    bash_command=f"echo {create_tag_template.output['tag_template_id']}",
)

Deleting a tag template

The CloudDataCatalogDeleteTagTemplateOperator operator deletes a tag template. Setting force=True confirms the deletion of any tags that use the template.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

delete_tag_template = CloudDataCatalogDeleteTagTemplateOperator(
    task_id="delete_tag_template", location=LOCATION, tag_template=TEMPLATE_ID, force=True
)

You can use Jinja templating with the location, tag_template, force, project_id, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine their values dynamically.

Getting a tag template

The CloudDataCatalogGetTagTemplateOperator operator gets a tag template.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

get_tag_template = CloudDataCatalogGetTagTemplateOperator(
    task_id="get_tag_template", location=LOCATION, tag_template=TEMPLATE_ID
)

You can use Jinja templating with the location, tag_template, project_id, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine their values dynamically.

The result is saved to XCom, which allows other operators to use it.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

get_tag_template_result = BashOperator(
    task_id="get_tag_template_result",
    # Without "echo", Bash would try to execute the template's contents as a command.
    bash_command=f"echo {get_tag_template.output}",
)

Updating a tag template

The CloudDataCatalogUpdateTagTemplateOperator operator updates a tag template.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

update_tag_template = CloudDataCatalogUpdateTagTemplateOperator(
    task_id="update_tag_template",
    tag_template={"display_name": "Awesome Tag Template"},
    update_mask={"paths": ["display_name"]},
    location=LOCATION,
    tag_template_id=TEMPLATE_ID,
)

You can use Jinja templating with the tag_template, update_mask, location, tag_template_id, project_id, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine their values dynamically.

Managing tags

Operators use a Tag to represent a tag.

Creating a tag on an entry

The CloudDataCatalogCreateTagOperator operator creates a tag on an entry.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

create_tag = CloudDataCatalogCreateTagOperator(
    task_id="create_tag",
    location=LOCATION,
    entry_group=ENTRY_GROUP_ID,
    entry=ENTRY_ID,
    template_id=TEMPLATE_ID,
    tag={"fields": {FIELD_NAME_1: TagField(string_value="example-value-string")}},
)
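Each value in a tag is stored in the TagField attribute that matches its template field's type, such as string_value in the example above. When tag values come from Python data, a small mapping can pick the attribute. This sketch covers only string_value, double_value and bool_value; tag_body and tag_field_value are hypothetical helpers, and the resulting types must still match those declared in the tag template.

```python
def tag_field_value(value) -> dict:
    """Map a Python value to the TagField attribute that would hold it."""
    # bool is checked first because bool is a subclass of int in Python.
    if isinstance(value, bool):
        return {"bool_value": value}
    if isinstance(value, (int, float)):
        # Numeric tag fields use the DOUBLE type, so integers are widened to float.
        return {"double_value": float(value)}
    if isinstance(value, str):
        return {"string_value": value}
    raise TypeError(f"unsupported tag value type: {type(value).__name__}")


def tag_body(values: dict) -> dict:
    """Build a tag body of the form {"fields": {field_id: {attribute: value}}}."""
    return {"fields": {name: tag_field_value(v) for name, v in values.items()}}


# Hypothetical field IDs; they must exist in the tag template.
tag = tag_body({"owner": "data-team", "row_count": 1200, "has_pii": False})
print(tag)
```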

You can use Jinja templating with the location, entry_group, entry, tag, template_id, project_id, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine their values dynamically.

The result is saved to XCom, which allows other operators to use it.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

create_tag_result2 = BashOperator(task_id="create_tag_result2", bash_command=f"echo {create_tag.output}")

The ID of the newly created tag can be read from XCom using the tag_id key.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

create_tag_result = BashOperator(
    task_id="create_tag_result",
    bash_command=f"echo {create_tag.output['tag_id']}",
)

Updating a tag

The CloudDataCatalogUpdateTagOperator operator updates a tag.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

update_tag = CloudDataCatalogUpdateTagOperator(
    task_id="update_tag",
    tag={"fields": {FIELD_NAME_1: TagField(string_value="new-value-string")}},
    update_mask={"paths": ["fields"]},
    location=LOCATION,
    entry_group=ENTRY_GROUP_ID,
    entry=ENTRY_ID,
    tag_id=f"{create_tag.output['tag_id']}",
)

You can use Jinja templating with the tag, update_mask, location, entry_group, entry, tag_id, project_id, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine their values dynamically.

Deleting a tag

The CloudDataCatalogDeleteTagOperator operator deletes a tag.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

delete_tag = CloudDataCatalogDeleteTagOperator(
    task_id="delete_tag", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID,
    tag=create_tag.output["tag_id"],
)

You can use Jinja templating with the location, entry_group, entry, tag, project_id, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine their values dynamically.

Listing tags on an entry

The CloudDataCatalogListTagsOperator operator lists the tags on an entry.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

list_tags = CloudDataCatalogListTagsOperator(
    task_id="list_tags", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
)
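The XCom result of CloudDataCatalogListTagsOperator holds the tags attached to the entry. Assuming each tag arrives as a dictionary whose template key holds the tag template's resource name (projects/{project_id}/locations/{location}/tagTemplates/{tag_template_id}), a downstream task could select the tags created from one template. The helper template_id_of and the sample data below are hypothetical.

```python
def template_id_of(tag: dict) -> str:
    """Return the tag template ID from a tag's `template` resource name."""
    parts = tag["template"].split("/")
    # Expected layout: projects/<project>/locations/<location>/tagTemplates/<id>
    if len(parts) != 6 or parts[4] != "tagTemplates":
        raise ValueError(f"unexpected template name: {tag['template']}")
    return parts[5]


# Hypothetical tags, shaped like the assumed XCom result.
tags = [
    {"template": "projects/my-project/locations/us-central1/tagTemplates/quality", "fields": {}},
    {"template": "projects/my-project/locations/us-central1/tagTemplates/ownership", "fields": {}},
]
quality_tags = [t for t in tags if template_id_of(t) == "quality"]
print(len(quality_tags))  # 1
```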

You can use Jinja templating with the location, entry_group, entry, page_size, project_id, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine their values dynamically.

The result is saved to XCom, which allows other operators to use it.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

list_tags_result = BashOperator(task_id="list_tags_result", bash_command=f"echo {list_tags.output}")

Managing tag template fields

Operators use a TagTemplateField to represent a tag template field.

Creating a field

The CloudDataCatalogCreateTagTemplateFieldOperator operator creates a tag template field.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

create_tag_template_field = CloudDataCatalogCreateTagTemplateFieldOperator(
    task_id="create_tag_template_field",
    location=LOCATION,
    tag_template=TEMPLATE_ID,
    tag_template_field_id=FIELD_NAME_2,
    tag_template_field=TagTemplateField(
        display_name="second-field", type_=FieldType(primitive_type="STRING")
    ),
)
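Besides primitive types, a template field can be an enumeration whose allowed values are listed in the field type. A sketch of such a field as a dictionary, assuming the operator accepts a dictionary for tag_template_field as well as a TagTemplateField object; enum_field is a hypothetical helper, and its duplicate check is a local precaution rather than documented API behavior.

```python
from __future__ import annotations


def enum_field(display_name: str, values: list[str]) -> dict:
    """Build a tag template field whose type is an enumeration of `values`."""
    if len(set(values)) != len(values):
        raise ValueError("enum values must be distinct")
    return {
        "display_name": display_name,
        "type_": {"enum_type": {"allowed_values": [{"display_name": v} for v in values]}},
    }


# Hypothetical display name and allowed values.
field = enum_field("data quality", ["GOLD", "SILVER", "BRONZE"])
print([v["display_name"] for v in field["type_"]["enum_type"]["allowed_values"]])
```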

You can use Jinja templating with the location, tag_template, tag_template_field_id, tag_template_field, project_id, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine their values dynamically.

The result is saved to XCom, which allows other operators to use it.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

create_tag_template_field_result2 = BashOperator(
    task_id="create_tag_template_field_result2",
    bash_command=f"echo {create_tag_template_field.output}",
)

The ID of the newly created field can be read from XCom using the tag_template_field_id key.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

create_tag_template_field_result = BashOperator(
    task_id="create_tag_template_field_result",
    bash_command=f"echo {create_tag_template_field.output['tag_template_field_id']}",
)

Renaming a field

The CloudDataCatalogRenameTagTemplateFieldOperator operator renames a tag template field.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

rename_tag_template_field = CloudDataCatalogRenameTagTemplateFieldOperator(
    task_id="rename_tag_template_field",
    location=LOCATION,
    tag_template=TEMPLATE_ID,
    field=FIELD_NAME_1,
    new_tag_template_field_id=FIELD_NAME_3,
)
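Renaming keeps the field's definition but changes the last segment of its resource name, projects/{project_id}/locations/{location}/tagTemplates/{tag_template}/fields/{field}, so later tasks have to refer to the field by new_tag_template_field_id. A minimal sketch of the names before and after the rename; field_resource_name and the sample IDs are hypothetical.

```python
def field_resource_name(project_id: str, location: str, tag_template: str, field: str) -> str:
    """Return the resource name of a field inside a tag template."""
    return f"projects/{project_id}/locations/{location}/tagTemplates/{tag_template}/fields/{field}"


# Hypothetical IDs standing in for PROJECT_ID, LOCATION, TEMPLATE_ID and the field IDs.
before = field_resource_name("my-project", "us-central1", "quality", "first_field")
after = field_resource_name("my-project", "us-central1", "quality", "renamed_field")
print(before)
print(after)
```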

You can use Jinja templating with the location, tag_template, field, new_tag_template_field_id, project_id, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine their values dynamically.

Updating a field

The CloudDataCatalogUpdateTagTemplateFieldOperator operator updates a tag template field.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

update_tag_template_field = CloudDataCatalogUpdateTagTemplateFieldOperator(
    task_id="update_tag_template_field",
    tag_template_field={"display_name": "Updated template field"},
    update_mask={"paths": ["display_name"]},
    location=LOCATION,
    tag_template=TEMPLATE_ID,
    tag_template_field_id=FIELD_NAME_1,
)

You can use Jinja templating with the tag_template_field, update_mask, tag_template_field_name, location, tag_template, tag_template_field_id, project_id, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine their values dynamically.

Deleting a field

The CloudDataCatalogDeleteTagTemplateFieldOperator operator deletes a tag template field.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

delete_tag_template_field = CloudDataCatalogDeleteTagTemplateFieldOperator(
    task_id="delete_tag_template_field",
    location=LOCATION,
    tag_template=TEMPLATE_ID,
    field=FIELD_NAME_2,
    force=True,
)

You can use Jinja templating with the location, tag_template, field, force, project_id, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine their values dynamically.

Searching resources

The CloudDataCatalogSearchCatalogOperator operator searches Data Catalog for resources, such as entries and tags, that match a query.

The query parameter should be written using the Data Catalog search syntax.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

search_catalog = CloudDataCatalogSearchCatalogOperator(
    task_id="search_catalog", scope={"include_project_ids": [PROJECT_ID]}, query=f"projectid:{PROJECT_ID}"
)
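The example searches a single project by pairing scope with a projectid: predicate. A sketch that narrows the query further, assuming the type= and name: qualifiers of the Data Catalog search syntax and that space-separated predicates are combined with AND; search_kwargs is a hypothetical helper and the sample values are illustrative.

```python
from __future__ import annotations


def search_kwargs(project_id: str, asset_type: str | None = None, name: str | None = None) -> dict:
    """Return scope and query arguments for a project-scoped catalog search."""
    predicates = [f"projectid:{project_id}"]
    if asset_type:
        predicates.append(f"type={asset_type}")
    if name:
        predicates.append(f"name:{name}")
    return {
        "scope": {"include_project_ids": [project_id]},
        # Space-separated predicates are assumed to be combined with AND.
        "query": " ".join(predicates),
    }


kwargs = search_kwargs("my-project", asset_type="table", name="orders")
print(kwargs["query"])  # projectid:my-project type=table name:orders
```

The dictionary could then be unpacked into the operator, for example CloudDataCatalogSearchCatalogOperator(task_id="search_tables", **kwargs).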

You can use Jinja templating with the scope, query, page_size, order_by, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine their values dynamically.

The result is saved to XCom, which allows other operators to use it.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

search_catalog_result = BashOperator(
    task_id="search_catalog_result",
    bash_command=f"echo {search_catalog.output}",
)

Reference

For further information, look at:
