# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""AWS Glue Data Catalog operators."""
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Literal
from botocore.exceptions import ClientError
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.providers.amazon.aws.operators.base_aws import AwsBaseOperator
from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
from airflow.utils.helpers import prune_dict
if TYPE_CHECKING:
from airflow.sdk import Context
[docs]
class GlueCatalogCreateDatabaseOperator(AwsBaseOperator[AwsBaseHook]):
    """
    Create a database in the AWS Glue Data Catalog.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:GlueCatalogCreateDatabaseOperator`

    :param database_name: The name of the database to create. (templated)
    :param description: A description of the database. (templated)
    :param location_uri: The location of the database (e.g. an S3 path). (templated)
    :param parameters: Key-value pairs that define properties of the database.
    :param catalog_id: The ID of the Data Catalog. Defaults to the account ID.
    :param tags: Tags to assign to the database.
    :param if_exists: Behavior when the database already exists.
        ``"fail"`` raises an error, ``"skip"`` logs and returns the database name.
    """

    aws_hook_class = AwsBaseHook
    template_fields: tuple[str, ...] = (
        *AwsBaseOperator.template_fields,
        "database_name",
        "description",
        "location_uri",
    )

    def __init__(
        self,
        *,
        database_name: str,
        description: str | None = None,
        location_uri: str | None = None,
        parameters: dict[str, str] | None = None,
        catalog_id: str | None = None,
        tags: dict[str, str] | None = None,
        if_exists: Literal["fail", "skip"] = "skip",
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.database_name = database_name
        self.description = description
        self.location_uri = location_uri
        self.parameters = parameters
        self.catalog_id = catalog_id
        # ``execute`` reads ``self.tags``; it must be stored here or the task
        # fails with AttributeError before the API is ever called.
        self.tags = tags
        self.if_exists = if_exists

    @property
    def _hook_parameters(self) -> dict[str, Any]:
        """Return the base hook parameters with ``client_type`` set to ``"glue"``."""
        return {**super()._hook_parameters, "client_type": "glue"}

    def execute(self, context: Context) -> str:
        """
        Call ``create_database`` on the Glue client and return the database name.

        :param context: The task execution context (unused).
        :return: ``database_name``, both when the database was created and when
            creation was skipped because it already exists.
        :raises ClientError: Re-raised for any client error other than
            ``AlreadyExistsException`` combined with ``if_exists="skip"``.
        """
        # prune_dict drops the optional entries that were left as None so that
        # only explicitly provided values are sent to the API.
        database_input: dict[str, Any] = prune_dict(
            {
                "Name": self.database_name,
                "Description": self.description,
                "LocationUri": self.location_uri,
                "Parameters": self.parameters,
            }
        )
        kwargs: dict[str, Any] = prune_dict(
            {
                "DatabaseInput": database_input,
                "CatalogId": self.catalog_id,
                "Tags": self.tags,
            }
        )
        try:
            self.hook.conn.create_database(**kwargs)
        except ClientError as e:
            if e.response["Error"]["Code"] == "AlreadyExistsException" and self.if_exists == "skip":
                self.log.info("Database %s already exists, skipping.", self.database_name)
            else:
                raise
        else:
            self.log.info("Created Glue Catalog database: %s", self.database_name)
        return self.database_name
[docs]
class GlueCatalogDeleteDatabaseOperator(AwsBaseOperator[AwsBaseHook]):
    """
    Delete a database from the AWS Glue Data Catalog.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:GlueCatalogDeleteDatabaseOperator`

    :param database_name: The name of the database to delete. (templated)
    :param catalog_id: The ID of the Data Catalog. Defaults to the account ID.
    """

    aws_hook_class = AwsBaseHook
    template_fields: tuple[str, ...] = (
        *AwsBaseOperator.template_fields,
        "database_name",
    )

    def __init__(
        self,
        *,
        database_name: str,
        catalog_id: str | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.catalog_id = catalog_id
        self.database_name = database_name

    @property
    def _hook_parameters(self) -> dict[str, Any]:
        """Return the base hook parameters with ``client_type`` set to ``"glue"``."""
        hook_params: dict[str, Any] = dict(super()._hook_parameters)
        hook_params["client_type"] = "glue"
        return hook_params

    def execute(self, context: Context) -> None:
        """
        Call ``delete_database`` on the Glue client for ``database_name``.

        :param context: The task execution context (unused).
        """
        raw_request: dict[str, Any] = {
            "Name": self.database_name,
            "CatalogId": self.catalog_id,
        }
        # Entries left as None (e.g. an unset catalog_id) are not sent at all.
        request: dict[str, Any] = prune_dict(raw_request)
        glue_client = self.hook.conn
        glue_client.delete_database(**request)
        self.log.info("Deleted Glue Catalog database: %s", self.database_name)
[docs]
class GlueCatalogCreateTableOperator(AwsBaseOperator[AwsBaseHook]):
    """
    Create a table in an AWS Glue Data Catalog database.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:GlueCatalogCreateTableOperator`

    :param database_name: The name of the database. (templated)
    :param table_name: The name of the table. (templated)
    :param table_input: The ``TableInput`` dict defining the table schema, storage, etc.
        Its ``Name`` entry is always overridden with ``table_name``. (templated)
    :param catalog_id: The ID of the Data Catalog. Defaults to the account ID. (templated)
    :param if_exists: Behavior when the table already exists.
        ``"fail"`` raises an error, ``"skip"`` logs and returns.
    """

    aws_hook_class = AwsBaseHook
    # ``table_input`` is listed so that it is actually rendered, matching both the
    # docstring and the ``template_fields_renderers`` entry below.
    template_fields: tuple[str, ...] = (
        *AwsBaseOperator.template_fields,
        "database_name",
        "table_name",
        "table_input",
        "catalog_id",
    )
    template_fields_renderers = {"table_input": "json"}

    def __init__(
        self,
        *,
        database_name: str,
        table_name: str,
        table_input: dict[str, Any],
        catalog_id: str | None = None,
        if_exists: Literal["fail", "skip"] = "skip",
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.database_name = database_name
        self.table_name = table_name
        # ``execute`` reads ``self.table_input``; it must be stored here or the
        # task fails with AttributeError.
        self.table_input = table_input
        self.catalog_id = catalog_id
        self.if_exists = if_exists

    @property
    def _hook_parameters(self) -> dict[str, Any]:
        """Return the base hook parameters with ``client_type`` set to ``"glue"``."""
        return {**super()._hook_parameters, "client_type": "glue"}

    def execute(self, context: Context) -> str:
        """
        Call ``create_table`` on the Glue client and return the table name.

        :param context: The task execution context (unused).
        :return: ``table_name``, both when the table was created and when
            creation was skipped because it already exists.
        :raises ClientError: Re-raised for any client error other than
            ``AlreadyExistsException`` combined with ``if_exists="skip"``.
        """
        self.log.info("Creating Glue table %s in database %s", self.table_name, self.database_name)
        # Ensure Name is set in TableInput; built as a copy so the caller's
        # dict is left untouched.
        table_input = {**self.table_input, "Name": self.table_name}
        kwargs: dict[str, Any] = prune_dict(
            {
                "DatabaseName": self.database_name,
                "TableInput": table_input,
                "CatalogId": self.catalog_id,
            }
        )
        try:
            self.hook.conn.create_table(**kwargs)
        except ClientError as e:
            if e.response["Error"]["Code"] == "AlreadyExistsException" and self.if_exists == "skip":
                self.log.info("Table %s already exists, skipping.", self.table_name)
            else:
                raise
        else:
            # Only report creation when the API call actually succeeded.
            self.log.info("Table %s created.", self.table_name)
        return self.table_name
[docs]
class GlueCatalogDeleteTableOperator(AwsBaseOperator[AwsBaseHook]):
    """
    Delete a table from an AWS Glue Data Catalog database.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:GlueCatalogDeleteTableOperator`

    :param database_name: The name of the database. (templated)
    :param table_name: The name of the table to delete. (templated)
    :param catalog_id: The ID of the Data Catalog. Defaults to the account ID. (templated)
    """

    aws_hook_class = AwsBaseHook
    template_fields: tuple[str, ...] = (
        *AwsBaseOperator.template_fields,
        "database_name",
        "table_name",
        "catalog_id",
    )

    def __init__(
        self,
        *,
        database_name: str,
        table_name: str,
        catalog_id: str | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.catalog_id = catalog_id
        self.table_name = table_name
        self.database_name = database_name

    @property
    def _hook_parameters(self) -> dict[str, Any]:
        """Return the base hook parameters with ``client_type`` set to ``"glue"``."""
        hook_params: dict[str, Any] = dict(super()._hook_parameters)
        hook_params["client_type"] = "glue"
        return hook_params

    def execute(self, context: Context) -> None:
        """
        Call ``delete_table`` on the Glue client for ``table_name`` in ``database_name``.

        :param context: The task execution context (unused).
        """
        self.log.info("Deleting Glue table %s from database %s", self.table_name, self.database_name)
        raw_request: dict[str, Any] = {
            "DatabaseName": self.database_name,
            "Name": self.table_name,
            "CatalogId": self.catalog_id,
        }
        # Entries left as None (e.g. an unset catalog_id) are not sent at all.
        request: dict[str, Any] = prune_dict(raw_request)
        glue_client = self.hook.conn
        glue_client.delete_table(**request)
        self.log.info("Deleted table %s", self.table_name)
[docs]
class GlueCatalogCreatePartitionOperator(AwsBaseOperator[AwsBaseHook]):
    """
    Create a partition in an AWS Glue Data Catalog table.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:GlueCatalogCreatePartitionOperator`

    :param database_name: The name of the database. (templated)
    :param table_name: The name of the table. (templated)
    :param partition_input: The ``PartitionInput`` dict defining the partition. (templated)
    :param catalog_id: The ID of the Data Catalog. Defaults to the account ID. (templated)
    :param if_exists: Behavior when the partition already exists.
        ``"fail"`` raises an error, ``"skip"`` logs and returns.
    """

    aws_hook_class = AwsBaseHook
    # ``partition_input`` is listed so that it is actually rendered, matching both
    # the docstring and the ``template_fields_renderers`` entry below.
    template_fields: tuple[str, ...] = (
        *AwsBaseOperator.template_fields,
        "database_name",
        "table_name",
        "partition_input",
        "catalog_id",
    )
    template_fields_renderers = {"partition_input": "json"}

    def __init__(
        self,
        *,
        database_name: str,
        table_name: str,
        partition_input: dict[str, Any],
        catalog_id: str | None = None,
        if_exists: Literal["fail", "skip"] = "skip",
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.database_name = database_name
        self.table_name = table_name
        # ``execute`` reads ``self.partition_input``; it must be stored here or
        # the task fails with AttributeError.
        self.partition_input = partition_input
        self.catalog_id = catalog_id
        self.if_exists = if_exists

    @property
    def _hook_parameters(self) -> dict[str, Any]:
        """Return the base hook parameters with ``client_type`` set to ``"glue"``."""
        return {**super()._hook_parameters, "client_type": "glue"}

    def execute(self, context: Context) -> None:
        """
        Call ``create_partition`` on the Glue client.

        :param context: The task execution context (unused).
        :raises ClientError: Re-raised for any client error other than
            ``AlreadyExistsException`` combined with ``if_exists="skip"``.
        """
        self.log.info("Creating partition in table %s.%s", self.database_name, self.table_name)
        kwargs: dict[str, Any] = prune_dict(
            {
                "DatabaseName": self.database_name,
                "TableName": self.table_name,
                "PartitionInput": self.partition_input,
                "CatalogId": self.catalog_id,
            }
        )
        try:
            self.hook.conn.create_partition(**kwargs)
        except ClientError as e:
            if e.response["Error"]["Code"] == "AlreadyExistsException" and self.if_exists == "skip":
                self.log.info("Partition already exists, skipping.")
            else:
                raise
        else:
            # Only report creation when the API call actually succeeded.
            self.log.info("Partition created.")
[docs]
class GlueCatalogBatchDeletePartitionOperator(AwsBaseOperator[AwsBaseHook]):
    """
    Delete one or more partitions from an AWS Glue Data Catalog table.

    The Glue ``BatchDeletePartition`` API accepts a limited number of partitions
    per request, so ``partitions_to_delete`` is sent in chunks of at most
    ``MAX_PARTITIONS_PER_REQUEST`` entries and the per-partition errors of all
    requests are combined.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:GlueCatalogBatchDeletePartitionOperator`

    :param database_name: The name of the database. (templated)
    :param table_name: The name of the table. (templated)
    :param partitions_to_delete: List of partition value dicts to delete. (templated)
    :param catalog_id: The ID of the Data Catalog. Defaults to the account ID. (templated)
    """

    aws_hook_class = AwsBaseHook
    template_fields: tuple[str, ...] = aws_template_fields(
        "database_name", "table_name", "catalog_id", "partitions_to_delete"
    )
    # Upper bound on ``PartitionsToDelete`` entries per BatchDeletePartition request.
    MAX_PARTITIONS_PER_REQUEST: int = 25

    def __init__(
        self,
        *,
        database_name: str,
        table_name: str,
        partitions_to_delete: list[dict[str, list[str]]],
        catalog_id: str | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.database_name = database_name
        self.table_name = table_name
        self.partitions_to_delete = partitions_to_delete
        self.catalog_id = catalog_id

    @property
    def _hook_parameters(self) -> dict[str, Any]:
        """Return the base hook parameters with ``client_type`` set to ``"glue"``."""
        return {**super()._hook_parameters, "client_type": "glue"}

    def execute(self, context: Context) -> list[dict[str, Any]]:
        """
        Delete the configured partitions, in chunks, via ``batch_delete_partition``.

        :param context: The task execution context (unused).
        :return: Every entry of the ``Errors`` lists returned by the API. When this
            method returns normally these are only ``EntityNotFoundException``
            entries, which are tolerated.
        :raises RuntimeError: If any reported error has a code other than
            ``EntityNotFoundException``.
        """
        partitions = self.partitions_to_delete
        self.log.info(
            "Deleting %d partitions from %s.%s",
            len(partitions),
            self.database_name,
            self.table_name,
        )
        chunk_size = self.MAX_PARTITIONS_PER_REQUEST
        errors: list[dict[str, Any]] = []
        # ``max(..., 1)`` keeps the previous behavior of issuing exactly one
        # request when the list is empty.
        for start in range(0, max(len(partitions), 1), chunk_size):
            kwargs: dict[str, Any] = prune_dict(
                {
                    "DatabaseName": self.database_name,
                    "TableName": self.table_name,
                    "PartitionsToDelete": list(partitions[start : start + chunk_size]),
                    "CatalogId": self.catalog_id,
                }
            )
            response = self.hook.conn.batch_delete_partition(**kwargs)
            errors.extend(response.get("Errors", []))
        if errors:
            # EntityNotFoundException is expected for idempotent deletes
            real_errors = [
                e for e in errors if e.get("ErrorDetail", {}).get("ErrorCode") != "EntityNotFoundException"
            ]
            if real_errors:
                raise RuntimeError(f"Failed to delete {len(real_errors)} partition(s): {real_errors}")
            self.log.info("Some partitions not found (already deleted), continuing.")
        self.log.info("Batch delete partitions complete.")
        return errors