#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from time import sleep

from airflow.compat.functools import cached_property
from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.providers.amazon.aws.hooks.sts import StsHook


class GlueCrawlerHook(AwsBaseHook):
    """
    Interacts with AWS Glue Crawler.

    Additional arguments (such as ``aws_conn_id``) may be specified and
    are passed down to the underlying AwsBaseHook.

    .. seealso::
        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
    """

    def __init__(self, *args, **kwargs):
        kwargs["client_type"] = "glue"
        super().__init__(*args, **kwargs)

    @cached_property
    def glue_client(self):
        """:return: AWS Glue client"""
        return self.get_conn()

    def has_crawler(self, crawler_name: str) -> bool:
        """
        Checks if the crawler already exists.

        :param crawler_name: unique crawler name per AWS account
        :return: True if the crawler already exists, False otherwise.
        """
        self.log.info("Checking if crawler already exists: %s", crawler_name)
        try:
            self.get_crawler(crawler_name)
            return True
        except self.glue_client.exceptions.EntityNotFoundException:
            return False

    def get_crawler(self, crawler_name: str) -> dict:
        """
        Gets crawler configurations.

        :param crawler_name: unique crawler name per AWS account
        :return: Nested dictionary of crawler configurations
        """
        return self.glue_client.get_crawler(Name=crawler_name)["Crawler"]
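
    # ``update_crawler`` below calls ``self.update_tags``, which is not part of this
    # excerpt. The following is a minimal sketch of such a helper, assuming
    # ``StsHook.get_account_number()`` and the boto3 Glue ``get_tags``/``tag_resource``/
    # ``untag_resource`` APIs; the actual hook's implementation may differ.
    def update_tags(self, crawler_name: str, crawler_tags: dict) -> bool:
        """
        Updates crawler tags.

        :param crawler_name: unique crawler name per AWS account
        :param crawler_tags: Dictionary of new tags. If empty, all tags are removed.
        :return: True if tags were updated and False otherwise
        """
        account_number = StsHook(aws_conn_id=self.aws_conn_id).get_account_number()
        crawler_arn = (
            f"arn:aws:glue:{self.conn_region_name}:{account_number}:crawler/{crawler_name}"
        )
        current_crawler_tags: dict = self.glue_client.get_tags(ResourceArn=crawler_arn)["Tags"]

        update_tags = {}
        delete_tags = []
        for key, value in current_crawler_tags.items():
            wanted_tag_value = crawler_tags.get(key)
            if wanted_tag_value is None:
                # The key is absent from the requested tags, so mark it for removal.
                delete_tags.append(key)
            elif wanted_tag_value != value:
                update_tags[key] = wanted_tag_value

        updated_tags = False
        if update_tags:
            self.log.info("Updating crawler tags: %s", crawler_name)
            self.glue_client.tag_resource(ResourceArn=crawler_arn, TagsToAdd=update_tags)
            updated_tags = True
        if delete_tags:
            self.log.info("Deleting crawler tags: %s", crawler_name)
            self.glue_client.untag_resource(ResourceArn=crawler_arn, TagsToRemove=delete_tags)
            updated_tags = True
        return updated_tags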

    def update_crawler(self, **crawler_kwargs) -> bool:
        """
        Updates crawler configurations.

        :param crawler_kwargs: Keyword args that define the configurations used for the crawler
        :return: True if crawler was updated and False otherwise
        """
        crawler_name = crawler_kwargs["Name"]
        current_crawler = self.get_crawler(crawler_name)

        # Tags are popped here because the Glue UpdateCrawler API does not accept
        # a Tags argument; tags are managed separately through the tagging APIs.
        tags_updated = self.update_tags(crawler_name, crawler_kwargs.pop("Tags", {}))

        # Only issue an update call if at least one requested value differs from
        # the crawler's current configuration.
        update_config = {
            key: value
            for key, value in crawler_kwargs.items()
            if current_crawler.get(key) != value
        }
        if update_config:
            self.log.info("Updating crawler: %s", crawler_name)
            self.glue_client.update_crawler(**crawler_kwargs)
            self.log.info("Updated configurations: %s", update_config)
            return True
        return tags_updated

    def create_crawler(self, **crawler_kwargs) -> str:
        """
        Creates an AWS Glue Crawler.

        :param crawler_kwargs: Keyword args that define the configurations used to create the crawler
        :return: Name of the crawler
        """
        crawler_name = crawler_kwargs["Name"]
        self.log.info("Creating crawler: %s", crawler_name)
        # boto3's create_crawler returns an empty response, so return the name
        # explicitly to match the documented ``str`` return value.
        self.glue_client.create_crawler(**crawler_kwargs)
        return crawler_name

    def start_crawler(self, crawler_name: str) -> dict:
        """
        Triggers the AWS Glue Crawler.

        :param crawler_name: unique crawler name per AWS account
        :return: Empty dictionary
        """
        self.log.info("Starting crawler %s", crawler_name)
        return self.glue_client.start_crawler(Name=crawler_name)

    def wait_for_crawler_completion(self, crawler_name: str, poll_interval: int = 5) -> str:
        """
        Waits until the Glue crawler completes and returns the status of the latest crawl run,
        raising AirflowException if the crawler fails or is cancelled.

        :param crawler_name: unique crawler name per AWS account
        :param poll_interval: Time (in seconds) to wait between two consecutive calls to check crawler status
        :return: Crawler's status
        """
        failed_status = ["FAILED", "CANCELLED"]
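
        # The crawler's State is RUNNING or STOPPING while it works and READY once
        # idle; the outcome of the most recent run is reported in LastCrawl.Status.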
while True:
crawler = self.get_crawler(crawler_name)
crawler_state = crawler["State"]
if crawler_state == "READY":
self.log.info("State: %s", crawler_state)
self.log.info("crawler_config: %s", crawler)
crawler_status = crawler["LastCrawl"]["Status"]
if crawler_status in failed_status:
raise AirflowException(f"Status: {crawler_status}")
metrics = self.glue_client.get_crawler_metrics(CrawlerNameList=[crawler_name])[
"CrawlerMetricsList"
][0]
self.log.info("Status: %s", crawler_status)
self.log.info("Last Runtime Duration (seconds): %s", metrics["LastRuntimeSeconds"])
self.log.info("Median Runtime Duration (seconds): %s", metrics["MedianRuntimeSeconds"])
self.log.info("Tables Created: %s", metrics["TablesCreated"])
self.log.info("Tables Updated: %s", metrics["TablesUpdated"])
self.log.info("Tables Deleted: %s", metrics["TablesDeleted"])
return crawler_status
else:
self.log.info("Polling for AWS Glue crawler: %s ", crawler_name)
self.log.info("State: %s", crawler_state)
metrics = self.glue_client.get_crawler_metrics(CrawlerNameList=[crawler_name])[
"CrawlerMetricsList"
][0]
time_left = int(metrics["TimeLeftSeconds"])
if time_left > 0:
self.log.info("Estimated Time Left (seconds): %s", time_left)
else:
self.log.info("Crawler should finish soon")
sleep(poll_interval)
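
# A minimal usage sketch (hypothetical names and values; in practice this hook is
# usually driven by GlueCrawlerOperator rather than called directly):
#
#     hook = GlueCrawlerHook(aws_conn_id="aws_default")
#     config = {
#         "Name": "my-crawler",
#         "Role": "my-glue-role",
#         "DatabaseName": "my_database",
#         "Targets": {"S3Targets": [{"Path": "s3://my-bucket/data/"}]},
#     }
#     if hook.has_crawler(config["Name"]):
#         hook.update_crawler(**config)
#     else:
#         hook.create_crawler(**config)
#     hook.start_crawler(config["Name"])
#     hook.wait_for_crawler_completion(config["Name"], poll_interval=10)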