Google Cloud Data Catalog Operators
The Data Catalog is a fully managed and scalable metadata management service that allows organizations to quickly discover, manage and understand all their data in Google Cloud. It offers:
- A simple, easy-to-use search interface for data discovery, powered by the same Google search technology that supports Gmail and Drive
- A flexible and powerful cataloging system for capturing technical and business metadata
- An auto-tagging mechanism for sensitive data with DLP API integration
Prerequisite Tasks
To use these operators, you must do a few things:
- Select or create a Cloud Platform project using the Cloud Console.
- Enable billing for your project, as described in the Google Cloud documentation.
- Enable the Data Catalog API, as described in the Cloud Console documentation.
- Install API libraries via pip:
pip install 'apache-airflow[google]'
Detailed information is available in the Installation documentation.
Managing entries
The operators use the Entry type to represent an entry.
Getting an entry
An entry can be retrieved with either the CloudDataCatalogGetEntryOperator or the CloudDataCatalogLookupEntryOperator operator.
The CloudDataCatalogGetEntryOperator uses the project ID, entry group ID, and entry ID to get the entry.
from airflow.providers.google.cloud.operators.datacatalog import CloudDataCatalogGetEntryOperator

get_entry = CloudDataCatalogGetEntryOperator(
    task_id="get_entry", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
)
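As a sketch of how the three IDs fit together, the hypothetical helper below assembles them into an entry's Data Catalog resource name, which follows the pattern projects/{project}/locations/{location}/entryGroups/{entry_group}/entries/{entry}. The function name and sample values are illustrative only; the operator itself takes the IDs as separate arguments.

```python
def entry_resource_name(project_id: str, location: str, entry_group: str, entry: str) -> str:
    """Assemble an entry's Data Catalog resource name (hypothetical helper)."""
    return (
        f"projects/{project_id}/locations/{location}"
        f"/entryGroups/{entry_group}/entries/{entry}"
    )


# Hypothetical sample values.
name = entry_resource_name("my-project", "us-central1", "my_group", "my_entry")
print(name)  # projects/my-project/locations/us-central1/entryGroups/my_group/entries/my_entry
```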
You can use Jinja templating with
location, entry_group, entry, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain
parameters, which allows you to determine values dynamically.
The result is saved to XCom, which allows it to be used by other operators.
get_entry_result = BashOperator(task_id="get_entry_result", bash_command=f"echo {get_entry.output}")
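The CloudDataCatalogLookupEntryOperator uses the full resource name of the linked resource (linked_resource) to look up the entry; alternatively, a SQL resource name can be passed with sql_resource.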
current_entry_template = (
    "//datacatalog.googleapis.com/projects/{project_id}/locations/{location}/"
    "entryGroups/{entry_group}/entries/{entry}"
)
lookup_entry_linked_resource = CloudDataCatalogLookupEntryOperator(
    task_id="lookup_entry",
    linked_resource=current_entry_template.format(
        project_id=PROJECT_ID, location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
    ),
)
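When debugging a lookup, it can help to check which entry a linked resource string points to. The following sketch reuses current_entry_template from the example above and adds a hypothetical parse_linked_resource helper that splits such a string back into its parts. It only covers the //datacatalog.googleapis.com/... form produced by that template, not other linked resource formats.

```python
import re

# Same template as in the lookup example above.
current_entry_template = (
    "//datacatalog.googleapis.com/projects/{project_id}/locations/{location}/"
    "entryGroups/{entry_group}/entries/{entry}"
)

# Regular expression mirroring the template; hypothetical, for illustration only.
_LINKED_RESOURCE_RE = re.compile(
    r"//datacatalog\.googleapis\.com/projects/(?P<project_id>[^/]+)"
    r"/locations/(?P<location>[^/]+)"
    r"/entryGroups/(?P<entry_group>[^/]+)"
    r"/entries/(?P<entry>[^/]+)"
)


def parse_linked_resource(linked_resource: str) -> dict:
    """Split a linked resource built from current_entry_template back into its parts."""
    match = _LINKED_RESOURCE_RE.fullmatch(linked_resource)
    if match is None:
        raise ValueError(f"Not a Data Catalog entry linked resource: {linked_resource!r}")
    return match.groupdict()


linked = current_entry_template.format(
    project_id="my-project", location="us-central1", entry_group="my_group", entry="my_entry"
)
parts = parse_linked_resource(linked)
print(parts)
```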
You can use Jinja templating with
linked_resource, sql_resource, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain
parameters, which allows you to determine values dynamically.
The result is saved to XCom, which allows it to be used by other operators.
lookup_entry_result = BashOperator(
    task_id="lookup_entry_result",
    bash_command="echo \"{{ task_instance.xcom_pull('lookup_entry')['display_name'] }}\"",
)
Creating an entry
The CloudDataCatalogCreateEntryOperator operator creates an entry.
create_entry_gcs = CloudDataCatalogCreateEntryOperator(
    task_id="create_entry_gcs",
    location=LOCATION,
    entry_group=ENTRY_GROUP_ID,
    entry_id=ENTRY_ID,
    entry={
        "display_name": ENTRY_NAME,
        "type_": "FILESET",
        "gcs_fileset_spec": {"file_patterns": [f"gs://{BUCKET_NAME}/**"]},
    },
)
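The entry argument is a plain dictionary whose keys mirror the Entry fields; the client library exposes the type field as type_. As a minimal sketch, the hypothetical fileset_entry helper below builds the same kind of dictionary for a fileset restricted to one prefix of a bucket. It only constructs the dictionary and makes no API calls; the bucket and prefix names are made up.

```python
def fileset_entry(display_name: str, bucket: str, prefix: str = "") -> dict:
    """Build an entry dict shaped like the one passed to CloudDataCatalogCreateEntryOperator.

    Hypothetical helper: it only returns a dictionary and does not call any API.
    """
    prefix = prefix.strip("/")
    # Match every object under the prefix, or the whole bucket when no prefix is given.
    pattern = f"gs://{bucket}/{prefix}/**" if prefix else f"gs://{bucket}/**"
    return {
        "display_name": display_name,
        "type_": "FILESET",
        "gcs_fileset_spec": {"file_patterns": [pattern]},
    }


entry = fileset_entry("Raw events", "my-bucket", "events/2024")
print(entry["gcs_fileset_spec"]["file_patterns"])  # ['gs://my-bucket/events/2024/**']
```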
You can use Jinja templating with
location, entry_group, entry_id, entry, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain
parameters, which allows you to determine values dynamically.
The result is saved to XCom, which allows it to be used by other operators.
The newly created entry ID can be read with the entry_id key.
create_entry_gcs_result = BashOperator(
    task_id="create_entry_gcs_result",
    bash_command=f"echo {XComArg(create_entry_gcs, key='entry_id')}",
)
Updating an entry
The CloudDataCatalogUpdateEntryOperator operator updates an entry. Only the fields listed in update_mask are modified.
update_entry = CloudDataCatalogUpdateEntryOperator(
    task_id="update_entry",
    entry={"display_name": f"{ENTRY_NAME} UPDATED"},
    update_mask={"paths": ["display_name"]},
    location=LOCATION,
    entry_group=ENTRY_GROUP_ID,
    entry_id=ENTRY_ID,
)
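update_mask lists the fields that the update should change, so in the example above only display_name is overwritten. The sketch below is a simplified local model of that behaviour for top-level fields only, using a hypothetical apply_update_mask function; it is not how the service implements updates.

```python
def apply_update_mask(current: dict, patch: dict, paths: list[str]) -> dict:
    """Return a copy of ``current`` in which only the top-level fields named in ``paths``
    are taken from ``patch``. Every path must be present in ``patch``.

    Simplified local model for illustration; nested paths such as "a.b" are not handled.
    """
    updated = dict(current)
    for path in paths:
        updated[path] = patch[path]
    return updated


current = {"display_name": "My entry", "description": "Raw files"}
patch = {"display_name": "My entry UPDATED", "description": "Ignored: not in the mask"}
result = apply_update_mask(current, patch, ["display_name"])
print(result)  # {'display_name': 'My entry UPDATED', 'description': 'Raw files'}
```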
You can use Jinja templating with
entry, update_mask, location, entry_group, entry_id, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain
parameters, which allows you to determine values dynamically.
Deleting an entry
The CloudDataCatalogDeleteEntryOperator operator deletes an entry.
delete_entry = CloudDataCatalogDeleteEntryOperator(
    task_id="delete_entry", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
)
You can use Jinja templating with
location, entry_group, entry, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain
parameters, which allows you to determine values dynamically.
Managing entry groups
The operators use the EntryGroup type to represent entry groups.
Creating an entry group
The CloudDataCatalogCreateEntryGroupOperator operator creates an entry group.
create_entry_group = CloudDataCatalogCreateEntryGroupOperator(
    task_id="create_entry_group",
    location=LOCATION,
    entry_group_id=ENTRY_GROUP_ID,
    entry_group={"display_name": ENTRY_GROUP_NAME},
)
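The service validates ENTRY_GROUP_ID when the entry group is created. The sketch below checks an ID locally before the DAG runs, assuming the rule given in the Data Catalog API reference for entry group IDs (letters, digits and underscores only, starting with a letter or underscore, at most 64 bytes in UTF-8); verify the rule against the current reference before relying on it. is_valid_entry_group_id is a hypothetical helper.

```python
import re

# Assumed rule for entry group IDs, paraphrased from the Data Catalog API reference:
# letters, digits and underscores only, starting with a letter or underscore,
# and at most 64 bytes when encoded in UTF-8.
_ENTRY_GROUP_ID_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def is_valid_entry_group_id(entry_group_id: str) -> bool:
    """Check an entry group ID against the assumed rule above (hypothetical helper)."""
    return (
        _ENTRY_GROUP_ID_RE.fullmatch(entry_group_id) is not None
        and len(entry_group_id.encode("utf-8")) <= 64
    )


for candidate in ["my_entry_group", "_staging", "1st_group", "my-group"]:
    print(candidate, is_valid_entry_group_id(candidate))
```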
You can use Jinja templating with
location, entry_group_id, entry_group, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain
parameters, which allows you to determine values dynamically.
The result is saved to XCom, which allows it to be used by other operators.
The newly created entry group ID can be read with the entry_group_id key.
create_entry_group_result = BashOperator(
    task_id="create_entry_group_result",
    bash_command=f"echo {XComArg(create_entry_group, key='entry_group_id')}",
)
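Getting an entry group
The CloudDataCatalogGetEntryGroupOperator operator gets an entry group. The read_mask parameter selects which fields are returned; if it is not set, all fields are returned.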
from google.protobuf.field_mask_pb2 import FieldMask

get_entry_group = CloudDataCatalogGetEntryGroupOperator(
    task_id="get_entry_group",
    location=LOCATION,
    entry_group=ENTRY_GROUP_ID,
    read_mask=FieldMask(paths=["name", "display_name"]),
)
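To illustrate what read_mask does in the example above, here is a simplified local model: given a resource as a dictionary, it keeps only the top-level fields named in the mask and returns everything when the mask is empty. apply_read_mask and the sample dictionary are hypothetical.

```python
def apply_read_mask(resource: dict, paths: list[str]) -> dict:
    """Keep only the top-level fields named in ``paths``; an empty mask keeps everything.

    Simplified local model of read-mask behaviour (hypothetical helper).
    """
    if not paths:
        return dict(resource)
    return {path: resource[path] for path in paths if path in resource}


# Hypothetical entry group represented as a dictionary.
entry_group = {
    "name": "projects/my-project/locations/us-central1/entryGroups/my_group",
    "display_name": "My group",
    "description": "Raw data",
}
masked = apply_read_mask(entry_group, ["name", "display_name"])
print(masked)
```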
You can use Jinja templating with
location, entry_group, read_mask, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain
parameters, which allows you to determine values dynamically.
The result is saved to XCom, which allows it to be used by other operators.
get_entry_group_result = BashOperator(
    task_id="get_entry_group_result",
    bash_command=f"echo {get_entry_group.output}",
)
Deleting an entry group
The CloudDataCatalogDeleteEntryGroupOperator operator deletes an entry group.
delete_entry_group = CloudDataCatalogDeleteEntryGroupOperator(
    task_id="delete_entry_group", location=LOCATION, entry_group=ENTRY_GROUP_ID
)
You can use Jinja templating with
location, entry_group, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain
parameters, which allows you to determine values dynamically.
Managing tag templates
The operators use the TagTemplate type to represent tag templates.
Creating a tag template
The CloudDataCatalogCreateTagTemplateOperator operator creates a tag template.
from google.cloud.datacatalog import TagTemplateField

create_tag_template = CloudDataCatalogCreateTagTemplateOperator(
    task_id="create_tag_template",
    location=LOCATION,
    tag_template_id=TEMPLATE_ID,
    tag_template={
        "display_name": TAG_TEMPLATE_DISPLAY_NAME,
        "fields": {
            FIELD_NAME_1: TagTemplateField(
                display_name="first-field", type_=dict(primitive_type="STRING")
            )
        },
    },
)
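Tag templates with several fields can become verbose. As a sketch, assuming the operator accepts nested dictionaries in place of TagTemplateField objects (the example above already passes type_ as a plain dictionary), the hypothetical make_tag_template helper below builds the tag_template argument from a compact mapping and rejects unknown primitive types. The set of primitive types is an assumption based on the FieldType.PrimitiveType enum of the v1 API.

```python
# Assumed primitive types, taken from the v1 FieldType.PrimitiveType enum
# (PRIMITIVE_TYPE_UNSPECIFIED is deliberately excluded).
PRIMITIVE_TYPES = {"DOUBLE", "STRING", "BOOL", "TIMESTAMP", "RICHTEXT"}


def make_tag_template(display_name: str, fields: dict[str, tuple[str, str]]) -> dict:
    """Build a tag_template dict from {field_id: (field_display_name, primitive_type)}.

    Hypothetical helper; plain dicts are used instead of TagTemplateField objects.
    """
    template_fields = {}
    for field_id, (field_display_name, primitive_type) in fields.items():
        if primitive_type not in PRIMITIVE_TYPES:
            raise ValueError(f"Unsupported primitive type for {field_id!r}: {primitive_type}")
        template_fields[field_id] = {
            "display_name": field_display_name,
            "type_": {"primitive_type": primitive_type},
        }
    return {"display_name": display_name, "fields": template_fields}


template = make_tag_template(
    "Data quality",
    {"owner": ("Owner", "STRING"), "score": ("Quality score", "DOUBLE")},
)
print(template["fields"]["score"])
```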
You can use Jinja templating with
location, tag_template_id, tag_template, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain
parameters, which allows you to determine values dynamically.
The result is saved to XCom, which allows it to be used by other operators.
The newly created tag template ID can be read with the tag_template_id key.
create_tag_template_result = BashOperator(
    task_id="create_tag_template_result",
    bash_command=f"echo {XComArg(create_tag_template, key='tag_template_id')}",
)
Deleting a tag template
The CloudDataCatalogDeleteTagTemplateOperator operator deletes a tag template. Setting force=True also deletes all tags that use the template.
delete_tag_template = CloudDataCatalogDeleteTagTemplateOperator(
    task_id="delete_tag_template", location=LOCATION, tag_template=TEMPLATE_ID, force=True
)
You can use Jinja templating with
location, tag_template, force, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain
parameters, which allows you to determine values dynamically.
Getting a tag template
The CloudDataCatalogGetTagTemplateOperator operator gets a tag template.
get_tag_template = CloudDataCatalogGetTagTemplateOperator(
    task_id="get_tag_template", location=LOCATION, tag_template=TEMPLATE_ID
)
You can use Jinja templating with
location, tag_template, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain
parameters, which allows you to determine values dynamically.
The result is saved to XCom, which allows it to be used by other operators.
get_tag_template_result = BashOperator(
    task_id="get_tag_template_result",
    bash_command=f"echo {get_tag_template.output}",
)
Updating a tag template
The CloudDataCatalogUpdateTagTemplateOperator operator updates a tag template. Only the fields listed in update_mask are modified.
update_tag_template = CloudDataCatalogUpdateTagTemplateOperator(
    task_id="update_tag_template",
    tag_template={"display_name": f"{TAG_TEMPLATE_DISPLAY_NAME} UPDATED"},
    update_mask={"paths": ["display_name"]},
    location=LOCATION,
    tag_template_id=TEMPLATE_ID,
)
You can use Jinja templating with
tag_template, update_mask, location, tag_template_id, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain
parameters, which allows you to determine values dynamically.
Managing tag template fields
The operators use the TagTemplateField type to represent tag template fields.
Creating a field
The CloudDataCatalogCreateTagTemplateFieldOperator operator creates a tag template field.
from google.cloud.datacatalog import FieldType, TagTemplateField

create_tag_template_field = CloudDataCatalogCreateTagTemplateFieldOperator(
    task_id="create_tag_template_field",
    location=LOCATION,
    tag_template=TEMPLATE_ID,
    tag_template_field_id=FIELD_NAME_2,
    tag_template_field=TagTemplateField(
        display_name="second-field", type_=FieldType(primitive_type="STRING")
    ),
)
You can use Jinja templating with
location, tag_template, tag_template_field_id, tag_template_field, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain
parameters, which allows you to determine values dynamically.
The result is saved to XCom, which allows it to be used by other operators.
The newly created field ID can be read with the tag_template_field_id key.
create_tag_template_field_result = BashOperator(
    task_id="create_tag_template_field_result",
    bash_command=f"echo {XComArg(create_tag_template_field, key='tag_template_field_id')}",
)
Renaming a field
The CloudDataCatalogRenameTagTemplateFieldOperator operator renames a tag template field.
rename_tag_template_field = CloudDataCatalogRenameTagTemplateFieldOperator(
    task_id="rename_tag_template_field",
    location=LOCATION,
    tag_template=TEMPLATE_ID,
    field=FIELD_NAME_1,
    new_tag_template_field_id=FIELD_NAME_3,
)
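Renaming changes only the field's ID, from field to new_tag_template_field_id; the field definition itself is unchanged. The following local model, built around a hypothetical rename_field function, shows that effect on a template's fields mapping and refuses to overwrite an existing ID. It is only an illustration and does not model what happens to existing tags.

```python
def rename_field(fields: dict, old_id: str, new_id: str) -> dict:
    """Return a copy of a template's ``fields`` mapping with ``old_id`` renamed to ``new_id``.

    The field definition is kept unchanged; only its key is replaced.
    Simplified local model for illustration (hypothetical helper).
    """
    if old_id not in fields:
        raise KeyError(old_id)
    if new_id in fields:
        raise ValueError(f"Field ID {new_id!r} already exists")
    return {(new_id if key == old_id else key): value for key, value in fields.items()}


fields = {
    "first_field": {"display_name": "first-field", "type_": {"primitive_type": "STRING"}},
    "second_field": {"display_name": "second-field", "type_": {"primitive_type": "STRING"}},
}
renamed = rename_field(fields, "first_field", "third_field")
print(list(renamed))  # ['third_field', 'second_field']
```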
You can use Jinja templating with
location, tag_template, field, new_tag_template_field_id, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain
parameters, which allows you to determine values dynamically.
Updating a field
The CloudDataCatalogUpdateTagTemplateFieldOperator operator updates a tag template field.
update_tag_template_field = CloudDataCatalogUpdateTagTemplateFieldOperator(
    task_id="update_tag_template_field",
    tag_template_field={"display_name": "Updated template field"},
    update_mask={"paths": ["display_name"]},
    location=LOCATION,
    tag_template=TEMPLATE_ID,
    tag_template_field_id=FIELD_NAME_1,
)
You can use Jinja templating with
tag_template_field, update_mask, tag_template_field_name, location, tag_template, tag_template_field_id, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain
parameters, which allows you to determine values dynamically.
Deleting a field
The CloudDataCatalogDeleteTagTemplateFieldOperator operator deletes a tag template field. Setting force=True also removes the field from any tags that use it.
delete_tag_template_field = CloudDataCatalogDeleteTagTemplateFieldOperator(
    task_id="delete_tag_template_field",
    location=LOCATION,
    tag_template=TEMPLATE_ID,
    field=FIELD_NAME_2,
    force=True,
)
You can use Jinja templating with
location, tag_template, field, force, project_id, retry, timeout, metadata, gcp_conn_id, impersonation_chain
parameters, which allows you to determine values dynamically.
Search resources
The CloudDataCatalogSearchCatalogOperator operator searches Data Catalog for resources, such as entries and tags, that match a query.
The query parameter must be written in the Data Catalog search syntax.
search_catalog = CloudDataCatalogSearchCatalogOperator(
    task_id="search_catalog",
    scope={"include_project_ids": [PROJECT_ID]},
    query=f"name:{ENTRY_GROUP_NAME}",
)
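Queries are built from qualifier:value predicates, such as the name: predicate used above. The sketch below composes several predicates into one query string with a hypothetical build_search_query helper, assuming that predicates separated by spaces are combined with AND, as described in the Data Catalog search syntax reference. It rejects values containing spaces instead of attempting to quote them.

```python
def build_search_query(terms: dict[str, str]) -> str:
    """Join qualifier:value predicates with spaces (hypothetical helper).

    Assumes space-separated predicates are ANDed by the search syntax;
    values containing spaces are rejected rather than quoted.
    """
    parts = []
    for qualifier, value in terms.items():
        if " " in value:
            raise ValueError(f"Value for {qualifier!r} contains a space: {value!r}")
        parts.append(f"{qualifier}:{value}")
    return " ".join(parts)


query = build_search_query({"name": "my_entry_group", "description": "events"})
print(query)  # name:my_entry_group description:events
```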
You can use Jinja templating with
scope, query, page_size, order_by, retry, timeout, metadata, gcp_conn_id, impersonation_chain
parameters, which allows you to determine values dynamically.
The result is saved to XCom, which allows it to be used by other operators.
search_catalog_result = BashOperator(
    task_id="search_catalog_result",
    bash_command=f"echo {search_catalog.output}",
)
Reference
For further information, look at: