Google Cloud Data Catalog Operators

The Data Catalog is a fully managed and scalable metadata management service that allows organizations to quickly discover, manage and understand all their data in Google Cloud. It offers:

  • A simple, easy-to-use search interface for data discovery, powered by the same Google search technology that supports Gmail and Drive

  • A flexible and powerful cataloging system for capturing technical and business metadata

  • An auto-tagging mechanism for sensitive data with DLP API integration

Prerequisite Tasks

Managing entries

The operators use the Entry type to represent an entry.

Getting an entry

Getting an entry is performed with the CloudDataCatalogGetEntryOperator and CloudDataCatalogLookupEntryOperator operators.

The CloudDataCatalogGetEntryOperator uses the project ID, entry group ID, and entry ID to get the entry.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

get_entry = CloudDataCatalogGetEntryOperator(
    task_id="get_entry", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
)

You can use Jinja templating with location, entry_group, entry, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain parameters which allows you to dynamically determine values.

The result is saved to XCom, which allows it to be used by other operators.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

get_entry_result = BashOperator(task_id="get_entry_result", bash_command=f"echo {get_entry.output}")

The CloudDataCatalogLookupEntryOperator uses the resource name to get the entry.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

current_entry_template = (
    "//datacatalog.googleapis.com/projects/{project_id}/locations/{location}/"
    "entryGroups/{entry_group}/entries/{entry}"
)
lookup_entry_linked_resource = CloudDataCatalogLookupEntryOperator(
    task_id="lookup_entry",
    linked_resource=current_entry_template.format(
        project_id=PROJECT_ID, location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
    ),
)
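The linked_resource value above is an ordinary string assembled from the entry's components. A minimal sketch of hypothetical helpers (they are not part of the Airflow provider) that build such strings and reject empty or slash-containing components is shown below. The Data Catalog entry format is the one used above; the BigQuery table format is an assumption based on the naming pattern Data Catalog uses for BigQuery entries.

```python
# Hypothetical helpers for building linked_resource strings; not part of the provider.
DATACATALOG_ENTRY_TEMPLATE = (
    "//datacatalog.googleapis.com/projects/{project_id}/locations/{location}/"
    "entryGroups/{entry_group}/entries/{entry}"
)
# Assumed linked-resource format for a BigQuery table entry.
BIGQUERY_TABLE_TEMPLATE = "//bigquery.googleapis.com/projects/{project_id}/datasets/{dataset}/tables/{table}"


def _check_segments(**segments: str) -> None:
    """Raise ValueError for empty components or components containing '/'."""
    for name, value in segments.items():
        if not value or "/" in value:
            raise ValueError(f"invalid {name}: {value!r}")


def entry_linked_resource(project_id: str, location: str, entry_group: str, entry: str) -> str:
    """Linked resource of an entry stored in a custom entry group."""
    segments = dict(project_id=project_id, location=location, entry_group=entry_group, entry=entry)
    _check_segments(**segments)
    return DATACATALOG_ENTRY_TEMPLATE.format(**segments)


def bigquery_table_linked_resource(project_id: str, dataset: str, table: str) -> str:
    """Linked resource of a BigQuery table (assumed format)."""
    segments = dict(project_id=project_id, dataset=dataset, table=table)
    _check_segments(**segments)
    return BIGQUERY_TABLE_TEMPLATE.format(**segments)


print(entry_linked_resource("my-project", "us-central1", "my_group", "my_entry"))
# //datacatalog.googleapis.com/projects/my-project/locations/us-central1/entryGroups/my_group/entries/my_entry
```

The returned string can be passed as linked_resource to CloudDataCatalogLookupEntryOperator in place of the inline format call; validating the components up front turns a malformed name into an immediate error rather than a failed API lookup.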

You can use Jinja templating with linked_resource, sql_resource, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain parameters which allows you to dynamically determine values.

The result is saved to XCom, which allows it to be used by other operators.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

lookup_entry_result = BashOperator(
    task_id="lookup_entry_result",
    bash_command="echo \"{{ task_instance.xcom_pull('lookup_entry')['display_name'] }}\"",
)

Creating an entry

The CloudDataCatalogCreateEntryOperator operator creates the entry.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

create_entry_gcs = CloudDataCatalogCreateEntryOperator(
    task_id="create_entry_gcs",
    location=LOCATION,
    entry_group=ENTRY_GROUP_ID,
    entry_id=ENTRY_ID,
    entry={
        "display_name": "Wizard",
        "type_": "FILESET",
        "gcs_fileset_spec": {"file_patterns": [f"gs://{BUCKET_ID}/**"]},
    },
)
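The `**` wildcard in file_patterns above selects every object in the bucket. To sanity-check a pattern locally before creating the entry, a rough stdlib translation into a regular expression can help. This is a sketch, not how the service evaluates patterns: it assumes the wildcard rules described for GcsFilesetSpec file patterns in the Data Catalog documentation (`*` within one directory, `**` across directories, `?` for one character, `[...]` for a simple character class), and gcs_pattern_to_regex and matches are hypothetical names.

```python
import re


def gcs_pattern_to_regex(pattern: str) -> "re.Pattern":
    """Translate a gs:// file pattern into a compiled regular expression.

    Assumed wildcard semantics: '**' spans directories, '*' and '?' stay
    within a single path segment, and '[...]' is a simple character class.
    """
    out = []
    i = 0
    while i < len(pattern):
        if pattern.startswith("**", i):
            out.append(".*")  # any characters, including '/'
            i += 2
        elif pattern[i] == "*":
            out.append("[^/]*")  # any characters within one path segment
            i += 1
        elif pattern[i] == "?":
            out.append("[^/]")  # exactly one non-'/' character
            i += 1
        elif pattern[i] == "[" and "]" in pattern[i + 1 :]:
            end = pattern.index("]", i + 1)
            out.append(pattern[i : end + 1])  # copy a simple class like [aeiou] verbatim
            i = end + 1
        else:
            out.append(re.escape(pattern[i]))  # literal character
            i += 1
    return re.compile("".join(out))


def matches(pattern: str, path: str) -> bool:
    """True if the whole object path matches the pattern."""
    return gcs_pattern_to_regex(pattern).fullmatch(path) is not None


print(matches("gs://my-bucket/**", "gs://my-bucket/a/b/data.csv"))  # True
print(matches("gs://my-bucket/dir/*", "gs://my-bucket/dir/sub/data.csv"))  # False
```

Glob negation such as `[!abc]` is deliberately not handled; the class is copied into the regular expression unchanged.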

You can use Jinja templating with location, entry_group, entry_id, entry, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain parameters which allows you to dynamically determine values.

The result is saved to XCom, which allows it to be used by other operators.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

create_entry_gcs_result2 = BashOperator(
    task_id="create_entry_gcs_result2",
    bash_command=f"echo {create_entry_gcs.output}",
)

The newly created entry ID can be read with the entry_id key.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

create_entry_gcs_result = BashOperator(
    task_id="create_entry_gcs_result",
    bash_command=f"echo {create_entry_gcs.output['entry_id']}",
)

Updating an entry

The CloudDataCatalogUpdateEntryOperator operator updates the entry.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

update_entry = CloudDataCatalogUpdateEntryOperator(
    task_id="update_entry",
    entry={"display_name": "New Wizard"},
    update_mask={"paths": ["display_name"]},
    location=LOCATION,
    entry_group=ENTRY_GROUP_ID,
    entry_id=ENTRY_ID,
)
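Only the fields listed in update_mask are written; every other field of the entry keeps its current value. The following local model of field-mask semantics is a sketch that assumes the usual Google API rule (a field named in the mask but unset in the request is cleared); apply_update_mask is a hypothetical helper and not part of Airflow or the Data Catalog client.

```python
from __future__ import annotations

import copy
from typing import Any


def apply_update_mask(resource: dict[str, Any], patch: dict[str, Any], paths: list[str]) -> dict[str, Any]:
    """Return a copy of `resource` in which only the fields named in `paths` change.

    Dotted paths such as "a.b" address nested fields. A masked field that is
    absent from `patch` is removed, modelling the "clear unset fields" rule.
    """
    result = copy.deepcopy(resource)
    for path in paths:
        *parents, leaf = path.split(".")
        # Walk the patch to find the value for this path, if any.
        src: Any = patch
        for key in parents:
            src = src.get(key) if isinstance(src, dict) else None
        present = isinstance(src, dict) and leaf in src
        # Walk the copy; only create missing parents when a value will be written.
        dst = result
        for key in parents:
            dst = dst.setdefault(key, {}) if present else dst.get(key, {})
        if present:
            dst[leaf] = copy.deepcopy(src[leaf])  # masked and present: overwrite
        else:
            dst.pop(leaf, None)  # masked but absent: clear
    return result


entry = {"display_name": "Wizard", "description": "old", "type_": "FILESET"}
patch = {"display_name": "New Wizard", "description": "ignored"}
print(apply_update_mask(entry, patch, ["display_name"]))
# {'display_name': 'New Wizard', 'description': 'old', 'type_': 'FILESET'}
```

The description in the patch is ignored because it is not in the mask, which is why the example above can send a partial entry safely.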

You can use Jinja templating with entry, update_mask, location, entry_group, entry_id, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain parameters which allows you to dynamically determine values.

Deleting an entry

The CloudDataCatalogDeleteEntryOperator operator deletes the entry.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

delete_entry = CloudDataCatalogDeleteEntryOperator(
    task_id="delete_entry", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
)

You can use Jinja templating with location, entry_group, entry, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain parameters which allows you to dynamically determine values.

Managing entry groups

The operators use the EntryGroup type to represent an entry group.

Creating an entry group

The CloudDataCatalogCreateEntryGroupOperator operator creates the entry group.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

create_entry_group = CloudDataCatalogCreateEntryGroupOperator(
    task_id="create_entry_group",
    location=LOCATION,
    entry_group_id=ENTRY_GROUP_ID,
    entry_group={"display_name": "analytics data - jan 2011"},
)

You can use Jinja templating with location, entry_group_id, entry_group, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain parameters which allows you to dynamically determine values.

The result is saved to XCom, which allows it to be used by other operators.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

create_entry_group_result2 = BashOperator(
    task_id="create_entry_group_result2",
    bash_command=f"echo {create_entry_group.output}",
)

The newly created entry group ID can be read with the entry_group_id key.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

create_entry_group_result = BashOperator(
    task_id="create_entry_group_result",
    bash_command=f"echo {create_entry_group.output['entry_group_id']}",
)

Getting an entry group

The CloudDataCatalogGetEntryGroupOperator operator gets the entry group.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

get_entry_group = CloudDataCatalogGetEntryGroupOperator(
    task_id="get_entry_group",
    location=LOCATION,
    entry_group=ENTRY_GROUP_ID,
    read_mask=FieldMask(paths=["name", "display_name"]),
)

You can use Jinja templating with location, entry_group, read_mask, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain parameters which allows you to dynamically determine values.

The result is saved to XCom, which allows it to be used by other operators.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

get_entry_group_result = BashOperator(
    task_id="get_entry_group_result",
    bash_command=f"echo {get_entry_group.output}",
)

Deleting an entry group

The CloudDataCatalogDeleteEntryGroupOperator operator deletes the entry group.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

delete_entry_group = CloudDataCatalogDeleteEntryGroupOperator(
    task_id="delete_entry_group", location=LOCATION, entry_group=ENTRY_GROUP_ID
)

You can use Jinja templating with location, entry_group, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain parameters which allows you to dynamically determine values.

Managing tag templates

The operators use the TagTemplate type to represent a tag template.

Creating a tag template

The CloudDataCatalogCreateTagTemplateOperator operator creates the tag template.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

create_tag_template = CloudDataCatalogCreateTagTemplateOperator(
    task_id="create_tag_template",
    location=LOCATION,
    tag_template_id=TEMPLATE_ID,
    tag_template={
        "display_name": "Awesome Tag Template",
        "fields": {
            FIELD_NAME_1: TagTemplateField(
                display_name="first-field", type_=dict(primitive_type="STRING")
            )
        },
    },
)
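The tag_template argument above mixes a plain dict with a TagTemplateField object. When a template has several fields, a small builder can keep the structure consistent. This sketch assumes the operator accepts the nested dict form for each field (the entry argument earlier is passed the same way, with `type_` as the key); build_tag_template, the `score` field, and the set of primitive types checked here are illustrative assumptions.

```python
from __future__ import annotations

# Primitive types assumed to be valid for tag template fields.
PRIMITIVE_TYPES = {"BOOL", "DOUBLE", "STRING", "TIMESTAMP"}


def build_tag_template(display_name: str, fields: dict[str, tuple[str, str]]) -> dict:
    """Build a dict-form tag template from {field_id: (field display name, primitive type)}."""
    template_fields = {}
    for field_id, (field_display_name, primitive_type) in fields.items():
        if primitive_type not in PRIMITIVE_TYPES:
            raise ValueError(f"{field_id}: unsupported primitive type {primitive_type!r}")
        template_fields[field_id] = {
            "display_name": field_display_name,
            "type_": {"primitive_type": primitive_type},
        }
    return {"display_name": display_name, "fields": template_fields}


# Hypothetical two-field template; "score" is an illustrative field ID.
template = build_tag_template(
    "Awesome Tag Template",
    {"first": ("first-field", "STRING"), "score": ("quality-score", "DOUBLE")},
)
print(template["fields"]["score"])
```

Rejecting unknown type names locally gives a clearer error than a failed API call at run time.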

You can use Jinja templating with location, tag_template_id, tag_template, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain parameters which allows you to dynamically determine values.

The result is saved to XCom, which allows it to be used by other operators.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

create_tag_template_result2 = BashOperator(
    task_id="create_tag_template_result2",
    bash_command=f"echo {create_tag_template.output}",
)

The newly created tag template ID can be read with the tag_template_id key.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

create_tag_template_result = BashOperator(
    task_id="create_tag_template_result",
    bash_command=f"echo {create_tag_template.output['tag_template_id']}",
)

Deleting a tag template

The CloudDataCatalogDeleteTagTemplateOperator operator deletes the tag template.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

delete_tag_template = CloudDataCatalogDeleteTagTemplateOperator(
    task_id="delete_tag_template", location=LOCATION, tag_template=TEMPLATE_ID, force=True
)

You can use Jinja templating with location, tag_template, force, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain parameters which allows you to dynamically determine values.

Getting a tag template

The CloudDataCatalogGetTagTemplateOperator operator gets the tag template.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

get_tag_template = CloudDataCatalogGetTagTemplateOperator(
    task_id="get_tag_template", location=LOCATION, tag_template=TEMPLATE_ID
)

You can use Jinja templating with location, tag_template, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain parameters which allows you to dynamically determine values.

The result is saved to XCom, which allows it to be used by other operators.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

get_tag_template_result = BashOperator(
    task_id="get_tag_template_result",
    bash_command=f"echo {get_tag_template.output}",
)

Updating a tag template

The CloudDataCatalogUpdateTagTemplateOperator operator updates the tag template.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

update_tag_template = CloudDataCatalogUpdateTagTemplateOperator(
    task_id="update_tag_template",
    tag_template={"display_name": "Awesome Tag Template"},
    update_mask={"paths": ["display_name"]},
    location=LOCATION,
    tag_template_id=TEMPLATE_ID,
)

You can use Jinja templating with tag_template, update_mask, location, tag_template_id, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain parameters which allows you to dynamically determine values.

Managing tags

The operators use the Tag type to represent a tag.

Creating a tag on an entry

The CloudDataCatalogCreateTagOperator operator creates a tag on the entry.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

create_tag = CloudDataCatalogCreateTagOperator(
    task_id="create_tag",
    location=LOCATION,
    entry_group=ENTRY_GROUP_ID,
    entry=ENTRY_ID,
    template_id=TEMPLATE_ID,
    tag={"fields": {FIELD_NAME_1: TagField(string_value="example-value-string")}},
)
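Each tag value is stored in a type-specific member of TagField; the example above uses string_value. A sketch of a hypothetical helper that picks the member from the Python type is shown below. It assumes bool_value and double_value members for BOOL and DOUBLE template fields, and that the dict form is accepted in place of TagField(...); each value's type must still match the field type declared in the tag template.

```python
from __future__ import annotations

from typing import Union

TagValue = Union[str, bool, int, float]


def tag_field(value: TagValue) -> dict:
    """Map a Python value to the dict form of a TagField."""
    # bool is checked first because bool is a subclass of int in Python.
    if isinstance(value, bool):
        return {"bool_value": value}
    if isinstance(value, (int, float)):
        return {"double_value": float(value)}  # numeric values go to DOUBLE fields
    if isinstance(value, str):
        return {"string_value": value}
    raise TypeError(f"unsupported tag value type: {type(value).__name__}")


def build_tag(values: dict[str, TagValue]) -> dict:
    """Build the `tag` argument from {field_id: python value}."""
    return {"fields": {field_id: tag_field(value) for field_id, value in values.items()}}


# "verified" is a hypothetical BOOL field ID.
print(build_tag({"first": "example-value-string", "verified": True}))
# {'fields': {'first': {'string_value': 'example-value-string'}, 'verified': {'bool_value': True}}}
```

Checking bool before int matters: without that ordering, True would be sent as double_value 1.0.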

You can use Jinja templating with location, entry_group, entry, tag, template_id, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain parameters which allows you to dynamically determine values.

The result is saved to XCom, which allows it to be used by other operators.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

create_tag_result2 = BashOperator(task_id="create_tag_result2", bash_command=f"echo {create_tag.output}")

The newly created tag ID can be read with the tag_id key.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

create_tag_result = BashOperator(
    task_id="create_tag_result",
    bash_command=f"echo {create_tag.output['tag_id']}",
)
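The tag ID is generated by the service and is the last segment of the tag's full resource name. If a downstream step only has the full name (for example from the list_tags result), the hypothetical helper below recovers every component. It assumes the projects/{project}/locations/{location}/entryGroups/{entry_group}/entries/{entry}/tags/{tag} naming pattern, and the sample name is made up.

```python
from __future__ import annotations


def parse_resource_name(name: str) -> dict[str, str]:
    """Split a resource name into {collection: id} pairs."""
    parts = name.strip("/").split("/")
    if len(parts) % 2 != 0:
        raise ValueError(f"expected collection/ID pairs, got {name!r}")
    return dict(zip(parts[0::2], parts[1::2]))


# Hypothetical tag name; the real tag ID is assigned by Data Catalog.
tag_name = "projects/my-project/locations/us-central1/entryGroups/my_group/entries/my_entry/tags/abc123"
print(parse_resource_name(tag_name)["tags"])  # abc123
```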

Updating a tag

The CloudDataCatalogUpdateTagOperator operator updates the tag.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

update_tag = CloudDataCatalogUpdateTagOperator(
    task_id="update_tag",
    tag={"fields": {FIELD_NAME_1: TagField(string_value="new-value-string")}},
    update_mask={"paths": ["fields"]},
    location=LOCATION,
    entry_group=ENTRY_GROUP_ID,
    entry=ENTRY_ID,
    tag_id=f"{create_tag.output['tag_id']}",
)

You can use Jinja templating with tag, update_mask, location, entry_group, entry, tag_id, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain parameters which allows you to dynamically determine values.

Deleting a tag

The CloudDataCatalogDeleteTagOperator operator deletes the tag.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

delete_tag = CloudDataCatalogDeleteTagOperator(
    task_id="delete_tag",
    location=LOCATION,
    entry_group=ENTRY_GROUP_ID,
    entry=ENTRY_ID,
    tag=f"{create_tag.output['tag_id']}",
)

You can use Jinja templating with location, entry_group, entry, tag, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain parameters which allows you to dynamically determine values.

Listing tags on an entry

The CloudDataCatalogListTagsOperator operator gets the list of tags on the entry.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

list_tags = CloudDataCatalogListTagsOperator(
    task_id="list_tags", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
)

You can use Jinja templating with location, entry_group, entry, page_size, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain parameters which allows you to dynamically determine values.

The result is saved to XCom, which allows it to be used by other operators.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

list_tags_result = BashOperator(task_id="list_tags_result", bash_command=f"echo {list_tags.output}")
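The list_tags result stored in XCom is a list of tag dicts, which a downstream Python step might summarize, for example by counting tags per template. This sketch assumes each dict has a template key holding the template's full resource name; tags_per_template is a hypothetical helper and the sample data is made up.

```python
from __future__ import annotations

from collections import Counter


def tags_per_template(tags: list[dict]) -> Counter:
    """Count tags by the ID of the template they were created from."""
    # The "template" value is assumed to be a full name ending in tagTemplates/{id}.
    return Counter(tag["template"].rpartition("/")[2] for tag in tags)


sample = [  # hypothetical list_tags output, trimmed to the relevant key
    {"template": "projects/p/locations/us-central1/tagTemplates/quality"},
    {"template": "projects/p/locations/us-central1/tagTemplates/quality"},
    {"template": "projects/p/locations/us-central1/tagTemplates/owner"},
]
print(tags_per_template(sample))  # Counter({'quality': 2, 'owner': 1})
```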

Managing tag template fields

The operators use the TagTemplateField type to represent a tag template field.

Creating a field

The CloudDataCatalogCreateTagTemplateFieldOperator operator creates the tag template field.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

create_tag_template_field = CloudDataCatalogCreateTagTemplateFieldOperator(
    task_id="create_tag_template_field",
    location=LOCATION,
    tag_template=TEMPLATE_ID,
    tag_template_field_id=FIELD_NAME_2,
    tag_template_field=TagTemplateField(
        display_name="second-field", type_=FieldType(primitive_type="STRING")
    ),
)

You can use Jinja templating with location, tag_template, tag_template_field_id, tag_template_field, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain parameters which allows you to dynamically determine values.

The result is saved to XCom, which allows it to be used by other operators.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

create_tag_template_field_result2 = BashOperator(
    task_id="create_tag_template_field_result2",
    bash_command=f"echo {create_tag_template_field.output}",
)

The newly created field ID can be read with the tag_template_field_id key.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

create_tag_template_field_result = BashOperator(
    task_id="create_tag_template_field_result",
    bash_command=f"echo {create_tag_template_field.output['tag_template_field_id']}",
)

Renaming a field

The CloudDataCatalogRenameTagTemplateFieldOperator operator renames the tag template field.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

rename_tag_template_field = CloudDataCatalogRenameTagTemplateFieldOperator(
    task_id="rename_tag_template_field",
    location=LOCATION,
    tag_template=TEMPLATE_ID,
    field=FIELD_NAME_1,
    new_tag_template_field_id=FIELD_NAME_3,
)

You can use Jinja templating with location, tag_template, field, new_tag_template_field_id, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain parameters which allows you to dynamically determine values.

Updating a field

The CloudDataCatalogUpdateTagTemplateFieldOperator operator updates the tag template field.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

update_tag_template_field = CloudDataCatalogUpdateTagTemplateFieldOperator(
    task_id="update_tag_template_field",
    tag_template_field={"display_name": "Updated template field"},
    update_mask={"paths": ["display_name"]},
    location=LOCATION,
    tag_template=TEMPLATE_ID,
    tag_template_field_id=FIELD_NAME_1,
)

You can use Jinja templating with tag_template_field, update_mask, tag_template_field_name, location, tag_template, tag_template_field_id, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain parameters which allows you to dynamically determine values.

Deleting a field

The CloudDataCatalogDeleteTagTemplateFieldOperator operator deletes the tag template field.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

delete_tag_template_field = CloudDataCatalogDeleteTagTemplateFieldOperator(
    task_id="delete_tag_template_field",
    location=LOCATION,
    tag_template=TEMPLATE_ID,
    field=FIELD_NAME_2,
    force=True,
)

You can use Jinja templating with location, tag_template, field, force, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain parameters which allows you to dynamically determine values.

Search resources

The CloudDataCatalogSearchCatalogOperator operator searches Data Catalog for resources, such as entries and tags, that match a query.

The query parameter should be defined using the Data Catalog search syntax.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

search_catalog = CloudDataCatalogSearchCatalogOperator(
    task_id="search_catalog", scope={"include_project_ids": [PROJECT_ID]}, query=f"projectid:{PROJECT_ID}"
)
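The query above consists of a single projectid qualifier. In the search syntax, several predicates are written separated by spaces and are assumed here to be combined with AND. The hypothetical helper below only formats such a string; which qualifiers exist beyond projectid (used above) is defined by the Data Catalog search syntax reference, not by this code.

```python
def build_search_query(*terms: str, **qualifiers: str) -> str:
    """Join free-text terms and key:value qualifiers with spaces.

    No quoting or escaping is applied, so qualifier values must not contain spaces.
    """
    for key, value in qualifiers.items():
        if not value or " " in value:
            raise ValueError(f"qualifier {key} needs a non-empty value without spaces")
    return " ".join([*terms, *(f"{key}:{value}" for key, value in qualifiers.items())])


print(build_search_query(projectid="my-project"))  # projectid:my-project
print(build_search_query("orders", projectid="my-project"))  # orders projectid:my-project
```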

You can use Jinja templating with scope, query, page_size, order_by, retry, timeout, metadata, gcp_conn_id, impersonation_chain parameters which allows you to dynamically determine values.

The result is saved to XCom, which allows it to be used by other operators.

airflow/providers/google/cloud/example_dags/example_datacatalog.py

search_catalog_result = BashOperator(
    task_id="search_catalog_result",
    bash_command=f"echo {search_catalog.output}",
)

Reference

For further information, look at:
