Source code for airflow.providers.amazon.aws.hooks.glue_crawler
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.importsysimportwarningsfromtimeimportsleepifsys.version_info>=(3,8):fromfunctoolsimportcached_propertyelse:fromcached_propertyimportcached_propertyfromairflow.exceptionsimportAirflowExceptionfromairflow.providers.amazon.aws.hooks.base_awsimportAwsBaseHook
class GlueCrawlerHook(AwsBaseHook):
    """
    Interacts with AWS Glue Crawler.

    Additional arguments (such as ``aws_conn_id``) may be specified and
    are passed down to the underlying AwsBaseHook.

    .. seealso::
        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
    """

    def __init__(self, *args, **kwargs):
        # This hook always targets the Glue service, so any caller-supplied
        # ``client_type`` is overridden.
        kwargs['client_type'] = 'glue'
        super().__init__(*args, **kwargs)

    @cached_property
    def glue_client(self):
        """
        :return: AWS Glue client
        """
        # NOTE(review): the body of this property was missing from the
        # extracted source (only its ``@cached_property`` decorator survived).
        # Reconstructed on the assumption that ``AwsBaseHook.get_conn()``
        # returns the client for the configured ``client_type`` -- confirm.
        return self.get_conn()

    def has_crawler(self, crawler_name) -> bool:
        """
        Checks if the crawler already exists

        :param crawler_name: unique crawler name per AWS account
        :return: Returns True if the crawler already exists and False if not.
        """
        self.log.info("Checking if crawler already exists: %s", crawler_name)

        try:
            self.get_crawler(crawler_name)
        except self.glue_client.exceptions.EntityNotFoundException:
            return False
        return True

    def get_crawler(self, crawler_name: str) -> dict:
        """
        Gets crawler configurations

        :param crawler_name: unique crawler name per AWS account
        :return: Nested dictionary of crawler configurations (the ``Crawler``
            entry of the client's ``get_crawler`` response)
        """
        return self.glue_client.get_crawler(Name=crawler_name)['Crawler']

    def update_crawler(self, **crawler_kwargs) -> bool:
        """
        Updates crawler configurations

        The requested configuration is compared with the crawler's current
        configuration; the update call is only issued when at least one value
        differs.

        :param crawler_kwargs: Keyword args that define the configurations used
            for the crawler. Must contain ``Name``.
        :return: True if crawler was updated and false otherwise
        """
        crawler_name = crawler_kwargs['Name']
        current_crawler = self.get_crawler(crawler_name)

        # ``.get`` rather than indexing: a requested option may be absent from
        # the current configuration, which counts as "changed" instead of
        # raising KeyError.
        update_config = {
            key: value for key, value in crawler_kwargs.items() if current_crawler.get(key) != value
        }
        if not update_config:
            return False

        self.log.info("Updating crawler: %s", crawler_name)
        # The full set of kwargs is sent, not only the changed subset.
        self.glue_client.update_crawler(**crawler_kwargs)
        self.log.info("Updated configurations: %s", update_config)
        return True

    def create_crawler(self, **crawler_kwargs) -> dict:
        """
        Creates an AWS Glue Crawler

        :param crawler_kwargs: Keyword args that define the configurations used
            to create the crawler. Must contain ``Name``.
        :return: The response of the Glue client's ``create_crawler`` call
            (not the crawler name).
        """
        crawler_name = crawler_kwargs['Name']
        self.log.info("Creating crawler: %s", crawler_name)
        return self.glue_client.create_crawler(**crawler_kwargs)

    def start_crawler(self, crawler_name: str) -> dict:
        """
        Triggers the AWS Glue crawler

        :param crawler_name: unique crawler name per AWS account
        :return: The response of the Glue client's ``start_crawler`` call
            (documented by the original author as an empty dictionary)
        """
        self.log.info("Starting crawler %s", crawler_name)
        return self.glue_client.start_crawler(Name=crawler_name)

    def _get_crawler_metrics(self, crawler_name: str) -> dict:
        """Return the first metrics entry reported for ``crawler_name``."""
        response = self.glue_client.get_crawler_metrics(CrawlerNameList=[crawler_name])
        return response['CrawlerMetricsList'][0]

    def wait_for_crawler_completion(self, crawler_name: str, poll_interval: int = 5) -> str:
        """
        Waits until Glue crawler completes and returns the status of the latest crawl run.
        Raises AirflowException if the crawler fails or is cancelled.

        There is no upper bound on the wait: the loop only ends once the
        crawler reports the ``READY`` state.

        :param crawler_name: unique crawler name per AWS account
        :param poll_interval: Time (in seconds) to wait between two consecutive calls to check crawler status
        :return: Crawler's status
        :raises AirflowException: if the last crawl status is ``FAILED`` or ``CANCELLED``
        """
        failed_status = ('FAILED', 'CANCELLED')

        while True:
            crawler = self.get_crawler(crawler_name)
            crawler_state = crawler['State']

            if crawler_state == 'READY':
                # The crawler is idle again, so the last crawl has finished.
                self.log.info("State: %s", crawler_state)
                self.log.info("crawler_config: %s", crawler)
                crawler_status = crawler['LastCrawl']['Status']
                if crawler_status in failed_status:
                    raise AirflowException(f"Status: {crawler_status}")

                metrics = self._get_crawler_metrics(crawler_name)
                self.log.info("Status: %s", crawler_status)
                self.log.info("Last Runtime Duration (seconds): %s", metrics['LastRuntimeSeconds'])
                self.log.info("Median Runtime Duration (seconds): %s", metrics['MedianRuntimeSeconds'])
                self.log.info("Tables Created: %s", metrics['TablesCreated'])
                self.log.info("Tables Updated: %s", metrics['TablesUpdated'])
                self.log.info("Tables Deleted: %s", metrics['TablesDeleted'])
                return crawler_status

            # Still running: report progress, then wait before polling again.
            self.log.info("Polling for AWS Glue crawler: %s ", crawler_name)
            self.log.info("State: %s", crawler_state)

            metrics = self._get_crawler_metrics(crawler_name)
            time_left = int(metrics['TimeLeftSeconds'])
            if time_left > 0:
                self.log.info("Estimated Time Left (seconds): %s", time_left)
            else:
                self.log.info("Crawler should finish soon")

            sleep(poll_interval)
class AwsGlueCrawlerHook(GlueCrawlerHook):
    """
    This hook is deprecated.
    Please use :class:`airflow.providers.amazon.aws.hooks.glue_crawler.GlueCrawlerHook`.
    """

    def __init__(self, *args, **kwargs):
        deprecation_message = (
            "This hook is deprecated. "
            "Please use :class:`airflow.providers.amazon.aws.hooks.glue_crawler.GlueCrawlerHook`."
        )
        # stacklevel=2 attributes the warning to the code instantiating the
        # hook rather than to this constructor.
        warnings.warn(deprecation_message, category=DeprecationWarning, stacklevel=2)
        super().__init__(*args, **kwargs)