Google Cloud Data Catalog Operators¶
Data Catalog is a fully managed, scalable metadata management service that lets organizations quickly discover, manage, and understand all their data in Google Cloud. It offers:
A simple, easy-to-use search interface for data discovery, powered by the same Google search technology that supports Gmail and Drive
A flexible and powerful cataloging system for capturing technical and business metadata
An auto-tagging mechanism for sensitive data with DLP API integration
Prerequisite Tasks¶
To use these operators, you must do a few things:
Select or create a Cloud Platform project using the Cloud Console.
Enable billing for your project, as described in the Google Cloud documentation.
Enable the API, as described in the Cloud Console documentation.
Install API libraries via pip.
pip install 'apache-airflow[google]'
Detailed information is available for Installation.
Managing entries¶
The operators use an Entry to represent an entry.
Getting an entry¶
To get an entry, use the CloudDataCatalogGetEntryOperator or the CloudDataCatalogLookupEntryOperator operator.
The CloudDataCatalogGetEntryOperator operator uses the project ID, entry group ID, and entry ID to get the entry.
get_entry = CloudDataCatalogGetEntryOperator(
task_id="get_entry", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
)
You can use Jinja templating with the location, entry_group, entry, project_id, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine values dynamically.
The result is saved to XCom, which allows it to be used by other operators.
get_entry_result = BashOperator(task_id="get_entry_result", bash_command=f"echo {get_entry.output}")
The CloudDataCatalogLookupEntryOperator operator uses the resource name to get the entry.
current_entry_template = (
"//datacatalog.googleapis.com/projects/{project_id}/locations/{location}/"
"entryGroups/{entry_group}/entries/{entry}"
)
lookup_entry_linked_resource = CloudDataCatalogLookupEntryOperator(
task_id="lookup_entry",
linked_resource=current_entry_template.format(
project_id=PROJECT_ID, location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
),
)
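The resource name used in the lookup example follows a fixed pattern, so it can be convenient to build it in one place and catch malformed parts early. The following is a minimal sketch under stated assumptions: build_entry_resource_name is a hypothetical helper that is not part of Airflow or the Google provider, and the sample IDs are placeholders.

```python
# Hypothetical helper (not part of Airflow or the Google provider). It fills the
# same resource name pattern that the lookup example passes as linked_resource.
ENTRY_RESOURCE_TEMPLATE = (
    "//datacatalog.googleapis.com/projects/{project_id}/locations/{location}/"
    "entryGroups/{entry_group}/entries/{entry}"
)


def build_entry_resource_name(project_id: str, location: str, entry_group: str, entry: str) -> str:
    """Return the resource name, rejecting parts that are empty or contain '/'."""
    parts = {
        "project_id": project_id,
        "location": location,
        "entry_group": entry_group,
        "entry": entry,
    }
    for name, value in parts.items():
        if not value or "/" in value:
            raise ValueError(f"invalid {name}: {value!r}")
    return ENTRY_RESOURCE_TEMPLATE.format(**parts)


# Placeholder IDs, used only for illustration.
print(build_entry_resource_name("my-project", "us-central1", "my_group", "my_entry"))
```

The returned string could then be passed as the linked_resource argument in place of the inline format call.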
You can use Jinja templating with the linked_resource, sql_resource, project_id, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine values dynamically.
The result is saved to XCom, which allows it to be used by other operators.
lookup_entry_result = BashOperator(
task_id="lookup_entry_result",
bash_command="echo \"{{ task_instance.xcom_pull('lookup_entry')['display_name'] }}\"",
)
Creating an entry¶
The CloudDataCatalogCreateEntryOperator operator creates the entry.
create_entry_gcs = CloudDataCatalogCreateEntryOperator(
task_id="create_entry_gcs",
location=LOCATION,
entry_group=ENTRY_GROUP_ID,
entry_id=ENTRY_ID,
entry={
"display_name": ENTRY_NAME,
"type_": "FILESET",
"gcs_fileset_spec": {"file_patterns": [f"gs://{BUCKET_NAME}/**"]},
},
)
You can use Jinja templating with the location, entry_group, entry_id, entry, project_id, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine values dynamically.
The result is saved to XCom, which allows it to be used by other operators.
The newly created entry ID can be read with the entry_id key.
create_entry_gcs_result = BashOperator(
task_id="create_entry_gcs_result",
bash_command=f"echo {XComArg(create_entry_gcs, key='entry_id')}",
)
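The bash_command in this example formats an XComArg into an f-string. This works on the assumption that the object renders itself as a Jinja expression, which Airflow resolves when the task runs rather than when the DAG file is parsed. The sketch below imitates that behavior with a hypothetical XComReference class; it is a stand-in for illustration only, and the exact text produced by the real XComArg may differ.

```python
class XComReference:
    """Hypothetical stand-in for XComArg: stores a task ID and an XCom key."""

    def __init__(self, task_id: str, key: str = "return_value") -> None:
        self.task_id = task_id
        self.key = key

    def __str__(self) -> str:
        # Render as a Jinja expression; the real XComArg output may differ in detail.
        return (
            "{{ task_instance.xcom_pull("
            f"task_ids='{self.task_id}', key='{self.key}'"
            ") }}"
        )


# At parse time the f-string only embeds the template text, not the entry ID.
bash_command = f"echo {XComReference('create_entry_gcs', key='entry_id')}"
print(bash_command)
# echo {{ task_instance.xcom_pull(task_ids='create_entry_gcs', key='entry_id') }}
```

Because only the template text is embedded, the actual entry ID is not available to ordinary Python code in the DAG file.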
Updating an entry¶
The CloudDataCatalogUpdateEntryOperator operator updates the entry.
update_entry = CloudDataCatalogUpdateEntryOperator(
task_id="update_entry",
entry={"display_name": f"{ENTRY_NAME} UPDATED"},
update_mask={"paths": ["display_name"]},
location=LOCATION,
entry_group=ENTRY_GROUP_ID,
entry_id=ENTRY_ID,
)
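The update_mask limits which fields of the entry are overwritten. The following sketch illustrates that idea locally with plain dictionaries. It is not the Data Catalog implementation: apply_update_mask is a hypothetical function, it handles only top-level paths, and it ignores masked paths that are missing from the patch.

```python
from copy import deepcopy


def apply_update_mask(current: dict, patch: dict, paths: list) -> dict:
    """Return a copy of `current` in which only the fields named in `paths` come from `patch`."""
    updated = deepcopy(current)
    for path in paths:
        # Simplification: a masked path that is absent from the patch is left unchanged.
        if path in patch:
            updated[path] = deepcopy(patch[path])
    return updated


current_entry = {"display_name": "my entry", "description": "original description"}
patch = {"display_name": "my entry UPDATED", "description": "not applied: outside the mask"}

result = apply_update_mask(current_entry, patch, ["display_name"])
print(result)
# {'display_name': 'my entry UPDATED', 'description': 'original description'}
```

In this sketch the description value in the patch is dropped because its path is not listed in the mask, which mirrors why the example above lists display_name explicitly.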
You can use Jinja templating with the entry, update_mask, location, entry_group, entry_id, project_id, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine values dynamically.
Deleting an entry¶
The CloudDataCatalogDeleteEntryOperator operator deletes the entry.
delete_entry = CloudDataCatalogDeleteEntryOperator(
task_id="delete_entry", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
)
You can use Jinja templating with the location, entry_group, entry, project_id, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine values dynamically.
Managing entry groups¶
The operators use an EntryGroup to represent an entry group.
Creating an entry group¶
The CloudDataCatalogCreateEntryGroupOperator operator creates the entry group.
create_entry_group = CloudDataCatalogCreateEntryGroupOperator(
task_id="create_entry_group",
location=LOCATION,
entry_group_id=ENTRY_GROUP_ID,
entry_group={"display_name": ENTRY_GROUP_NAME},
)
You can use Jinja templating with the location, entry_group_id, entry_group, project_id, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine values dynamically.
The result is saved to XCom, which allows it to be used by other operators.
The newly created entry group ID can be read with the entry_group_id key.
create_entry_group_result = BashOperator(
task_id="create_entry_group_result",
bash_command=f"echo {XComArg(create_entry_group, key='entry_group_id')}",
)
Getting an entry group¶
The CloudDataCatalogGetEntryGroupOperator operator gets the entry group.
get_entry_group = CloudDataCatalogGetEntryGroupOperator(
task_id="get_entry_group",
location=LOCATION,
entry_group=ENTRY_GROUP_ID,
read_mask=FieldMask(paths=["name", "display_name"]),
)
You can use Jinja templating with the location, entry_group, read_mask, project_id, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine values dynamically.
The result is saved to XCom, which allows it to be used by other operators.
get_entry_group_result = BashOperator(
task_id="get_entry_group_result",
bash_command=f"echo {get_entry_group.output}",
)
Deleting an entry group¶
The CloudDataCatalogDeleteEntryGroupOperator operator deletes the entry group.
delete_entry_group = CloudDataCatalogDeleteEntryGroupOperator(
task_id="delete_entry_group", location=LOCATION, entry_group=ENTRY_GROUP_ID
)
You can use Jinja templating with the location, entry_group, project_id, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine values dynamically.
Managing tag templates¶
The operators use a TagTemplate to represent a tag template.
Creating a tag template¶
The CloudDataCatalogCreateTagTemplateOperator operator creates the tag template.
create_tag_template = CloudDataCatalogCreateTagTemplateOperator(
task_id="create_tag_template",
location=LOCATION,
tag_template_id=TEMPLATE_ID,
tag_template={
"display_name": TAG_TEMPLATE_DISPLAY_NAME,
"fields": {
FIELD_NAME_1: TagTemplateField(
display_name="first-field", type_=dict(primitive_type="STRING")
)
},
},
)
You can use Jinja templating with the location, tag_template_id, tag_template, project_id, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine values dynamically.
The result is saved to XCom, which allows it to be used by other operators.
The newly created tag template ID can be read with the tag_template_id key.
create_tag_template_result = BashOperator(
task_id="create_tag_template_result",
bash_command=f"echo {XComArg(create_tag_template, key='tag_template_id')}",
)
Deleting a tag template¶
The CloudDataCatalogDeleteTagTemplateOperator operator deletes the tag template.
delete_tag_template = CloudDataCatalogDeleteTagTemplateOperator(
task_id="delete_tag_template", location=LOCATION, tag_template=TEMPLATE_ID, force=True
)
You can use Jinja templating with the location, tag_template, force, project_id, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine values dynamically.
Getting a tag template¶
The CloudDataCatalogGetTagTemplateOperator operator gets the tag template.
get_tag_template = CloudDataCatalogGetTagTemplateOperator(
task_id="get_tag_template", location=LOCATION, tag_template=TEMPLATE_ID
)
You can use Jinja templating with the location, tag_template, project_id, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine values dynamically.
The result is saved to XCom, which allows it to be used by other operators.
get_tag_template_result = BashOperator(
task_id="get_tag_template_result",
bash_command=f"echo {get_tag_template.output}",
)
Updating a tag template¶
The CloudDataCatalogUpdateTagTemplateOperator operator updates the tag template.
update_tag_template = CloudDataCatalogUpdateTagTemplateOperator(
task_id="update_tag_template",
tag_template={"display_name": f"{TAG_TEMPLATE_DISPLAY_NAME} UPDATED"},
update_mask={"paths": ["display_name"]},
location=LOCATION,
tag_template_id=TEMPLATE_ID,
)
You can use Jinja templating with the tag_template, update_mask, location, tag_template_id, project_id, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine values dynamically.
Managing tags¶
The operators use a Tag to represent a tag.
Creating a tag on an entry¶
The CloudDataCatalogCreateTagOperator operator creates a tag on the entry.
create_tag = CloudDataCatalogCreateTagOperator(
task_id="create_tag",
location=LOCATION,
entry_group=ENTRY_GROUP_ID,
entry=ENTRY_ID,
template_id=TEMPLATE_ID,
tag={"fields": {FIELD_NAME_1: TagField(string_value="example-value-string")}},
)
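The tag in this example fills the field that the tag template defines under FIELD_NAME_1. A quick local check can catch misspelled field IDs before the task runs. The sketch below assumes that a tag may only set fields that exist in its template; unknown_tag_fields is a hypothetical helper, and the field names are placeholders.

```python
def unknown_tag_fields(template_fields: dict, tag_fields: dict) -> list:
    """Return, in sorted order, the tag field IDs that the template does not define."""
    return sorted(set(tag_fields) - set(template_fields))


# Placeholder data shaped like the "fields" mappings in the examples above.
template_fields = {"first": {"display_name": "first-field", "primitive_type": "STRING"}}
tag_fields = {"first": "example-value-string", "frist": "misspelled field ID"}

print(unknown_tag_fields(template_fields, tag_fields))
# ['frist']
```

An empty list means every field in the tag has a matching definition in the template.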
You can use Jinja templating with the location, entry_group, entry, tag, template_id, project_id, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine values dynamically.
The result is saved to XCom, which allows it to be used by other operators.
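The newly created tag ID can be read with the tag_id key.
# Reference the tag ID that create_tag pushes to XCom; the update and delete examples reuse it.
tag_id = XComArg(create_tag, key="tag_id")
create_tag_result = BashOperator(
task_id="create_tag_result",
bash_command=f"echo {tag_id}",
)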
Updating a tag¶
The CloudDataCatalogUpdateTagOperator operator updates the tag.
update_tag = CloudDataCatalogUpdateTagOperator(
task_id="update_tag",
tag={"fields": {FIELD_NAME_1: TagField(string_value="new-value-string")}},
update_mask={"paths": ["fields"]},
location=LOCATION,
entry_group=ENTRY_GROUP_ID,
entry=ENTRY_ID,
tag_id=tag_id,
)
You can use Jinja templating with the tag, update_mask, location, entry_group, entry, tag_id, project_id, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine values dynamically.
Deleting a tag¶
The CloudDataCatalogDeleteTagOperator operator deletes the tag.
delete_tag = CloudDataCatalogDeleteTagOperator(
task_id="delete_tag",
location=LOCATION,
entry_group=ENTRY_GROUP_ID,
entry=ENTRY_ID,
tag=tag_id,
)
You can use Jinja templating with the location, entry_group, entry, tag, project_id, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine values dynamically.
Listing tags on an entry¶
The CloudDataCatalogListTagsOperator operator gets the list of tags on the entry.
list_tags = CloudDataCatalogListTagsOperator(
task_id="list_tags", location=LOCATION, entry_group=ENTRY_GROUP_ID, entry=ENTRY_ID
)
You can use Jinja templating with the location, entry_group, entry, page_size, project_id, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine values dynamically.
The result is saved to XCom, which allows it to be used by other operators.
list_tags_result = BashOperator(task_id="list_tags_result", bash_command=f"echo {list_tags.output}")
Managing tag template fields¶
The operators use a TagTemplateField to represent a tag template field.
Creating a field¶
The CloudDataCatalogCreateTagTemplateFieldOperator operator creates the tag template field.
create_tag_template_field = CloudDataCatalogCreateTagTemplateFieldOperator(
task_id="create_tag_template_field",
location=LOCATION,
tag_template=TEMPLATE_ID,
tag_template_field_id=FIELD_NAME_2,
tag_template_field=TagTemplateField(
display_name="second-field", type_=FieldType(primitive_type="STRING")
),
)
You can use Jinja templating with the location, tag_template, tag_template_field_id, tag_template_field, project_id, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine values dynamically.
The result is saved to XCom, which allows it to be used by other operators.
The newly created field ID can be read with the tag_template_field_id key.
create_tag_template_field_result = BashOperator(
task_id="create_tag_template_field_result",
bash_command=f"echo {XComArg(create_tag_template_field, key='tag_template_field_id')}",
)
Renaming a field¶
The CloudDataCatalogRenameTagTemplateFieldOperator operator renames the tag template field.
rename_tag_template_field = CloudDataCatalogRenameTagTemplateFieldOperator(
task_id="rename_tag_template_field",
location=LOCATION,
tag_template=TEMPLATE_ID,
field=FIELD_NAME_1,
new_tag_template_field_id=FIELD_NAME_3,
)
You can use Jinja templating with the location, tag_template, field, new_tag_template_field_id, project_id, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine values dynamically.
Updating a field¶
The CloudDataCatalogUpdateTagTemplateFieldOperator operator updates the tag template field.
update_tag_template_field = CloudDataCatalogUpdateTagTemplateFieldOperator(
task_id="update_tag_template_field",
tag_template_field={"display_name": "Updated template field"},
update_mask={"paths": ["display_name"]},
location=LOCATION,
tag_template=TEMPLATE_ID,
tag_template_field_id=FIELD_NAME_1,
)
You can use Jinja templating with the tag_template_field, update_mask, tag_template_field_name, location, tag_template, tag_template_field_id, project_id, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine values dynamically.
Deleting a field¶
The CloudDataCatalogDeleteTagTemplateFieldOperator operator deletes the tag template field.
delete_tag_template_field = CloudDataCatalogDeleteTagTemplateFieldOperator(
task_id="delete_tag_template_field",
location=LOCATION,
tag_template=TEMPLATE_ID,
field=FIELD_NAME_2,
force=True,
)
You can use Jinja templating with the location, tag_template, field, force, project_id, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine values dynamically.
Searching resources¶
The CloudDataCatalogSearchCatalogOperator operator searches Data Catalog for resources, such as entries and tags, that match a query.
The query parameter should be defined using the search syntax.
search_catalog = CloudDataCatalogSearchCatalogOperator(
task_id="search_catalog",
scope={"include_project_ids": [PROJECT_ID]},
query=f"name:{ENTRY_GROUP_NAME}",
)
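The query in this example is a single name: qualifier built with an f-string. When a query is assembled from several values, a small helper can keep the formatting consistent. The sketch below is a minimal illustration, not part of the provider: build_search_query is hypothetical, only the name qualifier appears in the example above, and joining several qualifiers with spaces (as with the column qualifier shown here) is an assumption that should be checked against the search syntax reference.

```python
def build_search_query(**qualifiers: str) -> str:
    """Join qualifier=value pairs into a space-separated 'key:value' query string."""
    terms = []
    for key, value in qualifiers.items():
        # Values with whitespace would need quoting, which this sketch does not handle.
        if not value or any(ch.isspace() for ch in value):
            raise ValueError(f"unsupported value for {key}: {value!r}")
        terms.append(f"{key}:{value}")
    return " ".join(terms)


print(build_search_query(name="my_entry_group"))
# name:my_entry_group

# Assumption: additional qualifiers are combined by separating them with spaces.
print(build_search_query(name="my_entry_group", column="customer_id"))
# name:my_entry_group column:customer_id
```

The resulting string could be passed as the query argument of the operator.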
You can use Jinja templating with the scope, query, page_size, order_by, retry, timeout, metadata, gcp_conn_id, and impersonation_chain parameters, which allows you to determine values dynamically.
The result is saved to XCom, which allows it to be used by other operators.
search_catalog_result = BashOperator(
task_id="search_catalog_result",
bash_command=f"echo {search_catalog.output}",
)