Google Cloud Dataproc Metastore Operators

Dataproc Metastore is a fully managed, highly available, auto-healing serverless Apache Hive metastore (HMS) that runs on Google Cloud. It supports HMS, serves as a critical component for managing the metadata of relational entities, and provides interoperability between data processing applications in the open source data ecosystem.

For more information about the service, visit the Dataproc Metastore product documentation.

Create a service

Before you create a Dataproc Metastore service, you need to define the service. For more information about the available fields to pass when creating a service, visit the Dataproc Metastore create service API.

A simple service configuration can look as follows:

airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py[source]

SERVICE = {
    "name": "test-service",
}

With this configuration we can create the service: DataprocMetastoreCreateServiceOperator

airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py[source]

create_service = DataprocMetastoreCreateServiceOperator(
    task_id="create_service",
    region=REGION,
    project_id=PROJECT_ID,
    service=SERVICE,
    service_id=SERVICE_ID,
    timeout=TIMEOUT,
)
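The snippets on this page reference module-level constants such as PROJECT_ID, REGION, SERVICE_ID and TIMEOUT without showing their definitions. As a minimal sketch, they might be defined as below; the environment variable names and default values are hypothetical placeholders, and the assumption that TIMEOUT is expressed in seconds should be checked against the operator reference.

```python
import os

# Hypothetical placeholders: replace the defaults with values from your own project.
PROJECT_ID = os.environ.get("GCP_PROJECT_ID", "example-project")
REGION = os.environ.get("GCP_REGION", "us-central1")

# Identifiers for the resources created by the example tasks.
SERVICE_ID = os.environ.get("METASTORE_SERVICE_ID", "test-service")
BACKUP_ID = os.environ.get("METASTORE_BACKUP_ID", "test-backup")
METADATA_IMPORT_ID = os.environ.get("METASTORE_IMPORT_ID", "test-metadata-import")

# Request timeout passed to the operators (assumed to be in seconds).
TIMEOUT = 1200
```

Reading the values from the environment keeps project-specific settings out of the DAG file, which is one common convention rather than a requirement of the operators.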

Get a service

To get a service you can use:

DataprocMetastoreGetServiceOperator

airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py[source]

get_service_details = DataprocMetastoreGetServiceOperator(
    task_id="get_service",
    region=REGION,
    project_id=PROJECT_ID,
    service_id=SERVICE_ID,
)

Update a service

You can update the service by providing a service config and an updateMask. In the updateMask argument you specify the paths, relative to Service, of the fields to update. For more information on updateMask and other parameters, take a look at the Dataproc Metastore update service API.

An example of a new service config and the updateMask:

airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py[source]

from google.protobuf.field_mask_pb2 import FieldMask

SERVICE_TO_UPDATE = {
    "labels": {
        "mylocalmachine": "mylocalmachine",
        "systemtest": "systemtest",
    }
}
UPDATE_MASK = FieldMask(paths=["labels"])

To update a service you can use: DataprocMetastoreUpdateServiceOperator

airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py[source]

update_service = DataprocMetastoreUpdateServiceOperator(
    task_id="update_service",
    project_id=PROJECT_ID,
    service_id=SERVICE_ID,
    region=REGION,
    service=SERVICE_TO_UPDATE,
    update_mask=UPDATE_MASK,
    timeout=TIMEOUT,
)
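To illustrate what the update mask does, the following pure-Python sketch mimics the idea on plain dictionaries: only the fields named in the mask paths are taken from the new config, and everything else is left untouched. The apply_update helper and the sample values are hypothetical and exist only for illustration; this is not how the service or the operator implements updates.

```python
import copy


def apply_update(current: dict, update: dict, paths: list) -> dict:
    """Return a copy of `current` with only the masked fields taken from `update`."""
    result = copy.deepcopy(current)
    for path in paths:
        keys = path.split(".")
        src, dst = update, result
        # Walk down to the parent of the field named by the path.
        for key in keys[:-1]:
            src = src[key]
            dst = dst.setdefault(key, {})
        # Replace the masked field as a whole.
        dst[keys[-1]] = copy.deepcopy(src[keys[-1]])
    return result


current = {"name": "test-service", "port": 9083, "labels": {"old": "old"}}
update = {"port": 1234, "labels": {"systemtest": "systemtest"}}

# Only "labels" is listed in the mask, so the new "port" value is ignored.
updated = apply_update(current, update, paths=["labels"])
```

In this sketch the labels are replaced while the port keeps its original value, because "port" is not listed in the mask.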

Delete a service

To delete a service you can use:

DataprocMetastoreDeleteServiceOperator

airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py[source]

delete_service = DataprocMetastoreDeleteServiceOperator(
    task_id="delete_service",
    region=REGION,
    project_id=PROJECT_ID,
    service_id=SERVICE_ID,
    timeout=TIMEOUT,
)

Export service metadata

To export metadata you can use:

DataprocMetastoreExportMetadataOperator

airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py[source]

export_metadata = DataprocMetastoreExportMetadataOperator(
    task_id="export_metadata",
    destination_gcs_folder=DESTINATION_GCS_FOLDER,
    project_id=PROJECT_ID,
    region=REGION,
    service_id=SERVICE_ID,
    timeout=TIMEOUT,
)
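The destination_gcs_folder argument is a Cloud Storage folder URI of the form gs://<bucket_name>/<path_inside_bucket>. A minimal sketch of building such a value is shown below; the gcs_folder_uri helper, the bucket name and the folder name are hypothetical and not part of the provider.

```python
def gcs_folder_uri(bucket: str, path: str = "") -> str:
    """Build a gs:// folder URI from a bucket name and an optional path."""
    if not bucket or "/" in bucket:
        raise ValueError("bucket must be a bare bucket name without slashes")
    # Drop leading and trailing slashes so the URI has exactly one separator.
    path = path.strip("/")
    return f"gs://{bucket}/{path}" if path else f"gs://{bucket}"


# Hypothetical bucket and folder names.
DESTINATION_GCS_FOLDER = gcs_folder_uri("example-bucket", "/metastore-exports/")
```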

Restore a service

To restore a service you can use:

DataprocMetastoreRestoreServiceOperator

airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py[source]

restore_service = DataprocMetastoreRestoreServiceOperator(
    task_id="restore_metastore",
    region=REGION,
    project_id=PROJECT_ID,
    service_id=SERVICE_ID,
    backup_id=BACKUP_ID,
    backup_region=REGION,
    backup_project_id=PROJECT_ID,
    backup_service_id=SERVICE_ID,
    timeout=TIMEOUT,
)
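The backup_project_id, backup_region, backup_service_id and backup_id arguments together identify the backup to restore from. In the Dataproc Metastore API a backup is addressed by a relative resource name, and the sketch below shows how the four values would map onto that name. The backup_resource_name helper and the sample values are hypothetical and only meant to make the relationship between the arguments explicit.

```python
def backup_resource_name(project_id: str, region: str, service_id: str, backup_id: str) -> str:
    """Compose the relative resource name of a Dataproc Metastore backup."""
    return (
        f"projects/{project_id}/locations/{region}"
        f"/services/{service_id}/backups/{backup_id}"
    )


# Hypothetical values matching the placeholders used on this page.
name = backup_resource_name("example-project", "us-central1", "test-service", "test-backup")
```

In the example above the service is restored from one of its own backups, which is why the backup_* arguments reuse REGION, PROJECT_ID and SERVICE_ID.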

Create a metadata import

Before you create a Dataproc Metastore metadata import, you need to define the metadata import. For more information about the available fields to pass when creating a metadata import, visit the Dataproc Metastore create metadata import API.

A simple metadata import configuration can look as follows:

airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py[source]

from google.cloud.metastore_v1 import MetadataImport

METADATA_IMPORT = MetadataImport(
    {
        "name": "test-metadata-import",
        "database_dump": {
            "gcs_uri": GCS_URI,
            "database_type": DB_TYPE,
        },
    }
)

To create a metadata import you can use: DataprocMetastoreCreateMetadataImportOperator

airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py[source]

import_metadata = DataprocMetastoreCreateMetadataImportOperator(
    task_id="create_metadata_import",
    project_id=PROJECT_ID,
    region=REGION,
    service_id=SERVICE_ID,
    metadata_import=METADATA_IMPORT,
    metadata_import_id=METADATA_IMPORT_ID,
    timeout=TIMEOUT,
)

Create a backup

Before you create a Dataproc Metastore backup of the service, you need to define the backup. For more information about the available fields to pass when creating a backup, visit the Dataproc Metastore create backup API.

A simple backup configuration can look as follows:

airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py[source]

BACKUP = {
    "name": "test-backup",
}

With this configuration we can create the backup: DataprocMetastoreCreateBackupOperator

airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py[source]

backup_service = DataprocMetastoreCreateBackupOperator(
    task_id="create_backup",
    project_id=PROJECT_ID,
    region=REGION,
    service_id=SERVICE_ID,
    backup=BACKUP,
    backup_id=BACKUP_ID,
    timeout=TIMEOUT,
)

Delete a backup

To delete a backup you can use:

DataprocMetastoreDeleteBackupOperator

airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py[source]

delete_backup = DataprocMetastoreDeleteBackupOperator(
    task_id="delete_backup",
    project_id=PROJECT_ID,
    region=REGION,
    service_id=SERVICE_ID,
    backup_id=BACKUP_ID,
    timeout=TIMEOUT,
)

List backups

To list backups you can use:

DataprocMetastoreListBackupsOperator

airflow/providers/google/cloud/example_dags/example_dataproc_metastore.py[source]

list_backups = DataprocMetastoreListBackupsOperator(
    task_id="list_backups",
    project_id=PROJECT_ID,
    region=REGION,
    service_id=SERVICE_ID,
)
