Source code for airflow.providers.google.cloud.operators.dataproc_metastore
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module contains Google Dataproc Metastore operators."""
from __future__ import annotations
import time
from typing import TYPE_CHECKING, Sequence
from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
from google.api_core.retry import Retry, exponential_sleep_generator
from google.cloud.metastore_v1 import MetadataExport, MetadataManagementActivity
from google.cloud.metastore_v1.types import Backup, MetadataImport, Service
from google.cloud.metastore_v1.types.metastore import DatabaseDumpSpec, Restore
from googleapiclient.errors import HttpError
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator, BaseOperatorLink
from airflow.models.xcom import XCom
from airflow.providers.google.cloud.hooks.dataproc_metastore import DataprocMetastoreHook
from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator
from airflow.providers.google.common.links.storage import StorageLink
if TYPE_CHECKING:
from google.protobuf.field_mask_pb2 import FieldMask
from airflow.models.taskinstancekey import TaskInstanceKey
from airflow.utils.context import Context
[docs]class DataprocMetastoreLink(BaseOperatorLink):
"""Helper class for constructing Dataproc Metastore resource link."""
@staticmethod
[docs] def persist(
context: Context,
task_instance: (
DataprocMetastoreCreateServiceOperator
| DataprocMetastoreGetServiceOperator
| DataprocMetastoreRestoreServiceOperator
| DataprocMetastoreUpdateServiceOperator
| DataprocMetastoreListBackupsOperator
| DataprocMetastoreExportMetadataOperator
),
url: str,
):
task_instance.xcom_push(
context=context,
key=DataprocMetastoreLink.key,
value={
"region": task_instance.region,
"service_id": task_instance.service_id,
"project_id": task_instance.project_id,
"url": url,
},
)
[docs] def get_link(
self,
operator: BaseOperator,
*,
ti_key: TaskInstanceKey,
) -> str:
conf = XCom.get_value(key=self.key, ti_key=ti_key)
return (
conf["url"].format(
region=conf["region"],
service_id=conf["service_id"],
project_id=conf["project_id"],
)
if conf
else ""
)
[docs]class DataprocMetastoreDetailedLink(BaseOperatorLink):
"""Helper class for constructing Dataproc Metastore detailed resource link."""
@staticmethod
[docs] def persist(
context: Context,
task_instance: (
DataprocMetastoreCreateBackupOperator | DataprocMetastoreCreateMetadataImportOperator
),
url: str,
resource: str,
):
task_instance.xcom_push(
context=context,
key=DataprocMetastoreDetailedLink.key,
value={
"region": task_instance.region,
"service_id": task_instance.service_id,
"project_id": task_instance.project_id,
"url": url,
"resource": resource,
},
)
[docs] def get_link(
self,
operator: BaseOperator,
*,
ti_key: TaskInstanceKey,
) -> str:
conf = XCom.get_value(key=self.key, ti_key=ti_key)
return (
conf["url"].format(
region=conf["region"],
service_id=conf["service_id"],
project_id=conf["project_id"],
resource=conf["resource"],
)
if conf
else ""
)
[docs]class DataprocMetastoreCreateBackupOperator(GoogleCloudBaseOperator):
"""Create a new backup in a given project and location.
:param project_id: Required. The ID of the Google Cloud project that the service belongs to.
:param region: Required. The ID of the Google Cloud region that the service belongs to.
:param service_id: Required. The ID of the metastore service, which is used as the final component of
the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
hyphens.
This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
provided, this should not be set.
:param backup: Required. The backup to create. The ``name`` field is ignored. The ID of the created
backup must be provided in the request's ``backup_id`` field.
This corresponds to the ``backup`` field on the ``request`` instance; if ``request`` is provided, this
should not be set.
:param backup_id: Required. The ID of the backup, which is used as the final component of the backup's
name. This value must be between 1 and 64 characters long, begin with a letter, end with a letter or
number, and consist of alphanumeric ASCII characters or hyphens.
This corresponds to the ``backup_id`` field on the ``request`` instance; if ``request`` is provided,
this should not be set.
:param request_id: Optional. A unique id used to identify the request.
:param retry: Optional. Designation of what errors, if any, should be retried.
:param timeout: Optional. The timeout for this request.
:param metadata: Optional. Strings which should be sent along with the request as metadata.
:param gcp_conn_id: The connection ID to use connecting to Google Cloud.
:param impersonation_chain: Optional service account to impersonate using short-term
credentials, or chained list of accounts required to get the access_token
of the last account in the list, which will be impersonated in the request.
If set as a string, the account must grant the originating account
the Service Account Token Creator IAM role.
If set as a sequence, the identities from the list must grant
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account (templated).
"""
def __init__(
self,
*,
project_id: str,
region: str,
service_id: str,
backup: dict | Backup,
backup_id: str,
request_id: str | None = None,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.project_id = project_id
self.region = region
self.service_id = service_id
self.backup = backup
self.backup_id = backup_id
self.request_id = request_id
self.retry = retry
self.timeout = timeout
self.metadata = metadata
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
[docs] def execute(self, context: Context) -> dict:
hook = DataprocMetastoreHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
self.log.info("Creating Dataproc Metastore backup: %s", self.backup_id)
try:
operation = hook.create_backup(
project_id=self.project_id,
region=self.region,
service_id=self.service_id,
backup=self.backup,
backup_id=self.backup_id,
request_id=self.request_id,
retry=self.retry,
timeout=self.timeout,
metadata=self.metadata,
)
backup = hook.wait_for_operation(self.timeout, operation)
self.log.info("Backup %s created successfully", self.backup_id)
except HttpError as err:
if err.resp.status not in (409, "409"):
raise
self.log.info("Backup %s already exists", self.backup_id)
backup = hook.get_backup(
project_id=self.project_id,
region=self.region,
service_id=self.service_id,
backup_id=self.backup_id,
retry=self.retry,
timeout=self.timeout,
metadata=self.metadata,
)
DataprocMetastoreDetailedLink.persist(
context=context, task_instance=self, url=METASTORE_BACKUP_LINK, resource=self.backup_id
)
return Backup.to_dict(backup)
[docs]class DataprocMetastoreCreateMetadataImportOperator(GoogleCloudBaseOperator):
"""Create a new MetadataImport in a given project and location.
:param project_id: Required. The ID of the Google Cloud project that the service belongs to.
:param region: Required. The ID of the Google Cloud region that the service belongs to.
:param service_id: Required. The ID of the metastore service, which is used as the final component of
the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
hyphens.
This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
provided, this should not be set.
:param metadata_import: Required. The metadata import to create. The ``name`` field is ignored. The ID of
the created metadata import must be provided in the request's ``metadata_import_id`` field.
This corresponds to the ``metadata_import`` field on the ``request`` instance; if ``request`` is
provided, this should not be set.
:param metadata_import_id: Required. The ID of the metadata import, which is used as the final component
of the metadata import's name. This value must be between 1 and 64 characters long, begin with a
letter, end with a letter or number, and consist of alphanumeric ASCII characters or hyphens.
This corresponds to the ``metadata_import_id`` field on the ``request`` instance; if ``request`` is
provided, this should not be set.
:param request_id: Optional. A unique id used to identify the request.
:param retry: Optional. Designation of what errors, if any, should be retried.
:param timeout: Optional. The timeout for this request.
:param metadata: Optional. Strings which should be sent along with the request as metadata.
:param gcp_conn_id: The connection ID to use connecting to Google Cloud.
:param impersonation_chain: Optional service account to impersonate using short-term
credentials, or chained list of accounts required to get the access_token
of the last account in the list, which will be impersonated in the request.
If set as a string, the account must grant the originating account
the Service Account Token Creator IAM role.
If set as a sequence, the identities from the list must grant
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account (templated).
"""
def __init__(
self,
*,
project_id: str,
region: str,
service_id: str,
metadata_import: dict | MetadataImport,
metadata_import_id: str,
request_id: str | None = None,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.project_id = project_id
self.region = region
self.service_id = service_id
self.metadata_import = metadata_import
self.metadata_import_id = metadata_import_id
self.request_id = request_id
self.retry = retry
self.timeout = timeout
self.metadata = metadata
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
[docs] def execute(self, context: Context):
hook = DataprocMetastoreHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
self.log.info("Creating Dataproc Metastore metadata import: %s", self.metadata_import_id)
operation = hook.create_metadata_import(
project_id=self.project_id,
region=self.region,
service_id=self.service_id,
metadata_import=self.metadata_import,
metadata_import_id=self.metadata_import_id,
request_id=self.request_id,
retry=self.retry,
timeout=self.timeout,
metadata=self.metadata,
)
metadata_import = hook.wait_for_operation(self.timeout, operation)
self.log.info("Metadata import %s created successfully", self.metadata_import_id)
DataprocMetastoreDetailedLink.persist(
context=context, task_instance=self, url=METASTORE_IMPORT_LINK, resource=self.metadata_import_id
)
return MetadataImport.to_dict(metadata_import)
[docs]class DataprocMetastoreCreateServiceOperator(GoogleCloudBaseOperator):
"""Create a metastore service in a project and location.
:param region: Required. The ID of the Google Cloud region that the service belongs to.
:param project_id: Required. The ID of the Google Cloud project that the service belongs to.
:param service: Required. The Metastore service to create. The ``name`` field is ignored. The ID of
the created metastore service must be provided in the request's ``service_id`` field.
This corresponds to the ``service`` field on the ``request`` instance; if ``request`` is provided,
this should not be set.
:param service_id: Required. The ID of the metastore service, which is used as the final component of
the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
hyphens.
This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
provided, this should not be set.
:param request_id: Optional. A unique id used to identify the request.
:param retry: Designation of what errors, if any, should be retried.
:param timeout: The timeout for this request.
:param metadata: Strings which should be sent along with the request as metadata.
:param gcp_conn_id: The connection ID to use connecting to Google Cloud.
:param impersonation_chain: Optional service account to impersonate using short-term
credentials, or chained list of accounts required to get the access_token
of the last account in the list, which will be impersonated in the request.
If set as a string, the account must grant the originating account
the Service Account Token Creator IAM role.
If set as a sequence, the identities from the list must grant
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account (templated).
"""
def __init__(
self,
*,
region: str,
project_id: str,
service: dict | Service,
service_id: str,
request_id: str | None = None,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.region = region
self.project_id = project_id
self.service = service
self.service_id = service_id
self.request_id = request_id
self.retry = retry
self.timeout = timeout
self.metadata = metadata
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
[docs] def execute(self, context: Context) -> dict:
hook = DataprocMetastoreHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
self.log.info("Creating Dataproc Metastore service: %s", self.project_id)
try:
operation = hook.create_service(
region=self.region,
project_id=self.project_id,
service=self.service,
service_id=self.service_id,
request_id=self.request_id,
retry=self.retry,
timeout=self.timeout,
metadata=self.metadata,
)
service = hook.wait_for_operation(self.timeout, operation)
self.log.info("Service %s created successfully", self.service_id)
except HttpError as err:
if err.resp.status not in (409, "409"):
raise
self.log.info("Instance %s already exists", self.service_id)
service = hook.get_service(
region=self.region,
project_id=self.project_id,
service_id=self.service_id,
retry=self.retry,
timeout=self.timeout,
metadata=self.metadata,
)
DataprocMetastoreLink.persist(context=context, task_instance=self, url=METASTORE_SERVICE_LINK)
return Service.to_dict(service)
[docs]class DataprocMetastoreDeleteBackupOperator(GoogleCloudBaseOperator):
"""Delete a single backup.
:param project_id: Required. The ID of the Google Cloud project that the backup belongs to.
:param region: Required. The ID of the Google Cloud region that the backup belongs to.
:param service_id: Required. The ID of the metastore service, which is used as the final component of
the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
hyphens.
This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
provided, this should not be set.
:param backup_id: Required. The ID of the backup, which is used as the final component of the backup's
name. This value must be between 1 and 64 characters long, begin with a letter, end with a letter or
number, and consist of alphanumeric ASCII characters or hyphens.
This corresponds to the ``backup_id`` field on the ``request`` instance; if ``request`` is provided,
this should not be set.
:param request_id: Optional. A unique id used to identify the request.
:param retry: Optional. Designation of what errors, if any, should be retried.
:param timeout: Optional. The timeout for this request.
:param metadata: Optional. Strings which should be sent along with the request as metadata.
:param gcp_conn_id: The connection ID to use connecting to Google Cloud.
:param impersonation_chain: Optional service account to impersonate using short-term
credentials, or chained list of accounts required to get the access_token
of the last account in the list, which will be impersonated in the request.
If set as a string, the account must grant the originating account
the Service Account Token Creator IAM role.
If set as a sequence, the identities from the list must grant
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account (templated).
"""
def __init__(
self,
*,
project_id: str,
region: str,
service_id: str,
backup_id: str,
request_id: str | None = None,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.project_id = project_id
self.region = region
self.service_id = service_id
self.backup_id = backup_id
self.request_id = request_id
self.retry = retry
self.timeout = timeout
self.metadata = metadata
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
[docs] def execute(self, context: Context) -> None:
hook = DataprocMetastoreHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
self.log.info("Deleting Dataproc Metastore backup: %s", self.backup_id)
operation = hook.delete_backup(
project_id=self.project_id,
region=self.region,
service_id=self.service_id,
backup_id=self.backup_id,
request_id=self.request_id,
retry=self.retry,
timeout=self.timeout,
metadata=self.metadata,
)
hook.wait_for_operation(self.timeout, operation)
self.log.info("Backup %s deleted successfully", self.project_id)
[docs]class DataprocMetastoreDeleteServiceOperator(GoogleCloudBaseOperator):
"""Delete a single service.
:param request: The request object. Request message for
[DataprocMetastore.DeleteService][google.cloud.metastore.v1.DataprocMetastore.DeleteService].
:param project_id: Required. The ID of the Google Cloud project that the service belongs to.
:param retry: Designation of what errors, if any, should be retried.
:param timeout: The timeout for this request.
:param metadata: Strings which should be sent along with the request as metadata.
:param gcp_conn_id:
"""
def __init__(
self,
*,
region: str,
project_id: str,
service_id: str,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.region = region
self.project_id = project_id
self.service_id = service_id
self.retry = retry
self.timeout = timeout
self.metadata = metadata
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
[docs] def execute(self, context: Context):
hook = DataprocMetastoreHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
self.log.info("Deleting Dataproc Metastore service: %s", self.project_id)
operation = hook.delete_service(
region=self.region,
project_id=self.project_id,
service_id=self.service_id,
retry=self.retry,
timeout=self.timeout,
metadata=self.metadata,
)
hook.wait_for_operation(self.timeout, operation)
self.log.info("Service %s deleted successfully", self.project_id)
[docs]class DataprocMetastoreExportMetadataOperator(GoogleCloudBaseOperator):
"""Export metadata from a service.
:param destination_gcs_folder: A Cloud Storage URI of a folder, in the format
``gs://<bucket_name>/<path_inside_bucket>``. A sub-folder
``<export_folder>`` containing exported files will be
created below it.
:param project_id: Required. The ID of the Google Cloud project that the service belongs to.
:param region: Required. The ID of the Google Cloud region that the service belongs to.
:param service_id: Required. The ID of the metastore service, which is used as the final component of
the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
hyphens.
This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
provided, this should not be set.
:param request_id: Optional. A unique id used to identify the request.
:param retry: Optional. Designation of what errors, if any, should be retried.
:param timeout: Optional. The timeout for this request.
:param metadata: Optional. Strings which should be sent along with the request as metadata.
:param gcp_conn_id: The connection ID to use connecting to Google Cloud.
:param impersonation_chain: Optional service account to impersonate using short-term
credentials, or chained list of accounts required to get the access_token
of the last account in the list, which will be impersonated in the request.
If set as a string, the account must grant the originating account
the Service Account Token Creator IAM role.
If set as a sequence, the identities from the list must grant
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account (templated).
"""
def __init__(
self,
*,
destination_gcs_folder: str,
project_id: str,
region: str,
service_id: str,
request_id: str | None = None,
database_dump_type: DatabaseDumpSpec | None = None,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.destination_gcs_folder = destination_gcs_folder
self.project_id = project_id
self.region = region
self.service_id = service_id
self.request_id = request_id
self.database_dump_type = database_dump_type
self.retry = retry
self.timeout = timeout
self.metadata = metadata
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
[docs] def execute(self, context: Context):
hook = DataprocMetastoreHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
self.log.info("Exporting metadata from Dataproc Metastore service: %s", self.service_id)
hook.export_metadata(
destination_gcs_folder=self.destination_gcs_folder,
project_id=self.project_id,
region=self.region,
service_id=self.service_id,
request_id=self.request_id,
database_dump_type=self.database_dump_type,
retry=self.retry,
timeout=self.timeout,
metadata=self.metadata,
)
metadata_export = self._wait_for_export_metadata(hook)
self.log.info("Metadata from service %s exported successfully", self.service_id)
DataprocMetastoreLink.persist(context=context, task_instance=self, url=METASTORE_EXPORT_LINK)
uri = self._get_uri_from_destination(MetadataExport.to_dict(metadata_export)["destination_gcs_uri"])
StorageLink.persist(context=context, task_instance=self, uri=uri, project_id=self.project_id)
return MetadataExport.to_dict(metadata_export)
def _get_uri_from_destination(self, destination_uri: str):
return destination_uri[5:] if destination_uri.startswith("gs://") else destination_uri
def _wait_for_export_metadata(self, hook: DataprocMetastoreHook):
"""Check that export was created successfully.
This is a workaround to an issue parsing result to MetadataExport inside
the SDK.
"""
for time_to_wait in exponential_sleep_generator(initial=10, maximum=120):
time.sleep(time_to_wait)
service = hook.get_service(
region=self.region,
project_id=self.project_id,
service_id=self.service_id,
retry=self.retry,
timeout=self.timeout,
metadata=self.metadata,
)
activities: MetadataManagementActivity = service.metadata_management_activity
metadata_export: MetadataExport = activities.metadata_exports[0]
if metadata_export.state == MetadataExport.State.SUCCEEDED:
return metadata_export
if metadata_export.state == MetadataExport.State.FAILED:
raise AirflowException(
f"Exporting metadata from Dataproc Metastore {metadata_export.name} FAILED"
)
[docs]class DataprocMetastoreGetServiceOperator(GoogleCloudBaseOperator):
"""Get the details of a single service.
:param region: Required. The ID of the Google Cloud region that the service belongs to.
:param project_id: Required. The ID of the Google Cloud project that the service belongs to.
:param service_id: Required. The ID of the metastore service, which is used as the final component of
the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
hyphens.
This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
provided, this should not be set.
:param retry: Designation of what errors, if any, should be retried.
:param timeout: The timeout for this request.
:param metadata: Strings which should be sent along with the request as metadata.
:param gcp_conn_id: The connection ID to use connecting to Google Cloud.
:param impersonation_chain: Optional service account to impersonate using short-term
credentials, or chained list of accounts required to get the access_token
of the last account in the list, which will be impersonated in the request.
If set as a string, the account must grant the originating account
the Service Account Token Creator IAM role.
If set as a sequence, the identities from the list must grant
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account (templated).
"""
def __init__(
self,
*,
region: str,
project_id: str,
service_id: str,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.region = region
self.project_id = project_id
self.service_id = service_id
self.retry = retry
self.timeout = timeout
self.metadata = metadata
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
[docs] def execute(self, context: Context) -> dict:
hook = DataprocMetastoreHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
self.log.info("Gets the details of a single Dataproc Metastore service: %s", self.project_id)
result = hook.get_service(
region=self.region,
project_id=self.project_id,
service_id=self.service_id,
retry=self.retry,
timeout=self.timeout,
metadata=self.metadata,
)
DataprocMetastoreLink.persist(context=context, task_instance=self, url=METASTORE_SERVICE_LINK)
return Service.to_dict(result)
[docs]class DataprocMetastoreListBackupsOperator(GoogleCloudBaseOperator):
"""List backups in a service.
:param project_id: Required. The ID of the Google Cloud project that the backup belongs to.
:param region: Required. The ID of the Google Cloud region that the backup belongs to.
:param service_id: Required. The ID of the metastore service, which is used as the final component of
the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
hyphens.
This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
provided, this should not be set.
:param retry: Optional. Designation of what errors, if any, should be retried.
:param timeout: Optional. The timeout for this request.
:param metadata: Optional. Strings which should be sent along with the request as metadata.
:param gcp_conn_id: The connection ID to use connecting to Google Cloud.
:param impersonation_chain: Optional service account to impersonate using short-term
credentials, or chained list of accounts required to get the access_token
of the last account in the list, which will be impersonated in the request.
If set as a string, the account must grant the originating account
the Service Account Token Creator IAM role.
If set as a sequence, the identities from the list must grant
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account (templated).
"""
def __init__(
self,
*,
project_id: str,
region: str,
service_id: str,
page_size: int | None = None,
page_token: str | None = None,
filter: str | None = None,
order_by: str | None = None,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.project_id = project_id
self.region = region
self.service_id = service_id
self.page_size = page_size
self.page_token = page_token
self.filter = filter
self.order_by = order_by
self.retry = retry
self.timeout = timeout
self.metadata = metadata
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
[docs] def execute(self, context: Context) -> list[dict]:
hook = DataprocMetastoreHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
self.log.info("Listing Dataproc Metastore backups: %s", self.service_id)
backups = hook.list_backups(
project_id=self.project_id,
region=self.region,
service_id=self.service_id,
page_size=self.page_size,
page_token=self.page_token,
filter=self.filter,
order_by=self.order_by,
retry=self.retry,
timeout=self.timeout,
metadata=self.metadata,
)
DataprocMetastoreLink.persist(context=context, task_instance=self, url=METASTORE_BACKUPS_LINK)
return [Backup.to_dict(backup) for backup in backups]
[docs]class DataprocMetastoreRestoreServiceOperator(GoogleCloudBaseOperator):
"""Restore a service from a backup.
:param project_id: Required. The ID of the Google Cloud project that the service belongs to.
:param region: Required. The ID of the Google Cloud region that the service belongs to.
:param service_id: Required. The ID of the metastore service, which is used as the final component of
the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
hyphens.
This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
provided, this should not be set.
:param backup_project_id: Required. The ID of the Google Cloud project that the metastore
service backup to restore from.
:param backup_region: Required. The ID of the Google Cloud region that the metastore
service backup to restore from.
:param backup_service_id: Required. The ID of the metastore service backup to restore from, which is
used as the final component of the metastore service's name. This value must be between 2 and 63
characters long inclusive, begin with a letter, end with a letter or number, and consist
of alphanumeric ASCII characters or hyphens.
:param backup_id: Required. The ID of the metastore service backup to restore from
:param restore_type: Optional. The type of restore. If unspecified, defaults to
``METADATA_ONLY``
:param request_id: Optional. A unique id used to identify the request.
:param retry: Optional. Designation of what errors, if any, should be retried.
:param timeout: Optional. The timeout for this request.
:param metadata: Optional. Strings which should be sent along with the request as metadata.
:param gcp_conn_id: The connection ID to use connecting to Google Cloud.
:param impersonation_chain: Optional service account to impersonate using short-term
credentials, or chained list of accounts required to get the access_token
of the last account in the list, which will be impersonated in the request.
If set as a string, the account must grant the originating account
the Service Account Token Creator IAM role.
If set as a sequence, the identities from the list must grant
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account (templated).
"""
def __init__(
self,
*,
project_id: str,
region: str,
service_id: str,
backup_project_id: str,
backup_region: str,
backup_service_id: str,
backup_id: str,
restore_type: Restore | None = None,
request_id: str | None = None,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.project_id = project_id
self.region = region
self.service_id = service_id
self.backup_project_id = backup_project_id
self.backup_region = backup_region
self.backup_service_id = backup_service_id
self.backup_id = backup_id
self.restore_type = restore_type
self.request_id = request_id
self.retry = retry
self.timeout = timeout
self.metadata = metadata
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
[docs] def execute(self, context: Context):
hook = DataprocMetastoreHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
self.log.info(
"Restoring Dataproc Metastore service: %s from backup: %s", self.service_id, self.backup_id
)
hook.restore_service(
project_id=self.project_id,
region=self.region,
service_id=self.service_id,
backup_project_id=self.backup_project_id,
backup_region=self.backup_region,
backup_service_id=self.backup_service_id,
backup_id=self.backup_id,
restore_type=self.restore_type,
request_id=self.request_id,
retry=self.retry,
timeout=self.timeout,
metadata=self.metadata,
)
self._wait_for_restore_service(hook)
self.log.info("Service %s restored from backup %s", self.service_id, self.backup_id)
DataprocMetastoreLink.persist(context=context, task_instance=self, url=METASTORE_SERVICE_LINK)
def _wait_for_restore_service(self, hook: DataprocMetastoreHook):
"""Check that export was created successfully.
This is a workaround to an issue parsing result to MetadataExport inside
the SDK.
"""
for time_to_wait in exponential_sleep_generator(initial=10, maximum=120):
time.sleep(time_to_wait)
service = hook.get_service(
region=self.region,
project_id=self.project_id,
service_id=self.service_id,
retry=self.retry,
timeout=self.timeout,
metadata=self.metadata,
)
activities: MetadataManagementActivity = service.metadata_management_activity
restore_service: Restore = activities.restores[0]
if restore_service.state == Restore.State.SUCCEEDED:
return restore_service
if restore_service.state == Restore.State.FAILED:
raise AirflowException("Restoring service FAILED")
[docs]class DataprocMetastoreUpdateServiceOperator(GoogleCloudBaseOperator):
"""Update the parameters of a single service.
:param project_id: Required. The ID of the Google Cloud project that the service belongs to.
:param region: Required. The ID of the Google Cloud region that the service belongs to.
:param service_id: Required. The ID of the metastore service, which is used as the final component of
the metastore service's name. This value must be between 2 and 63 characters long inclusive, begin
with a letter, end with a letter or number, and consist of alphanumeric ASCII characters or
hyphens.
This corresponds to the ``service_id`` field on the ``request`` instance; if ``request`` is
provided, this should not be set.
:param service: Required. The metastore service to update. The server only merges fields in the service
if they are specified in ``update_mask``.
The metastore service's ``name`` field is used to identify the metastore service to be updated.
This corresponds to the ``service`` field on the ``request`` instance; if ``request`` is provided,
this should not be set.
:param update_mask: Required. A field mask used to specify the fields to be overwritten in the metastore
service resource by the update. Fields specified in the ``update_mask`` are relative to the resource
(not to the full request). A field is overwritten if it is in the mask.
This corresponds to the ``update_mask`` field on the ``request`` instance; if ``request`` is provided,
this should not be set.
:param request_id: Optional. A unique id used to identify the request.
:param retry: Optional. Designation of what errors, if any, should be retried.
:param timeout: Optional. The timeout for this request.
:param metadata: Optional. Strings which should be sent along with the request as metadata.
:param gcp_conn_id: The connection ID to use connecting to Google Cloud.
:param impersonation_chain: Optional service account to impersonate using short-term
credentials, or chained list of accounts required to get the access_token
of the last account in the list, which will be impersonated in the request.
If set as a string, the account must grant the originating account
the Service Account Token Creator IAM role.
If set as a sequence, the identities from the list must grant
Service Account Token Creator IAM role to the directly preceding identity, with first
account from the list granting this role to the originating account (templated).
"""
def __init__(
self,
*,
project_id: str,
region: str,
service_id: str,
service: dict | Service,
update_mask: FieldMask,
request_id: str | None = None,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
gcp_conn_id: str = "google_cloud_default",
impersonation_chain: str | Sequence[str] | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.project_id = project_id
self.region = region
self.service_id = service_id
self.service = service
self.update_mask = update_mask
self.request_id = request_id
self.retry = retry
self.timeout = timeout
self.metadata = metadata
self.gcp_conn_id = gcp_conn_id
self.impersonation_chain = impersonation_chain
[docs] def execute(self, context: Context):
hook = DataprocMetastoreHook(
gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain
)
self.log.info("Updating Dataproc Metastore service: %s", self.service.get("name"))
operation = hook.update_service(
project_id=self.project_id,
region=self.region,
service_id=self.service_id,
service=self.service,
update_mask=self.update_mask,
request_id=self.request_id,
retry=self.retry,
timeout=self.timeout,
metadata=self.metadata,
)
hook.wait_for_operation(self.timeout, operation)
self.log.info("Service %s updated successfully", self.service.get("name"))
DataprocMetastoreLink.persist(context=context, task_instance=self, url=METASTORE_SERVICE_LINK)