from __future__ import annotations

from functools import cached_property

# NOTE(review): the original import list was cut off after "TYPE_CHECKING," --
# restore any further names/imports that followed it in the real file.
from typing import TYPE_CHECKING
class GlueCrawlerOperator(BaseOperator):
    """
    Creates, updates and triggers an AWS Glue Crawler.

    AWS Glue Crawler is a serverless service that manages a catalog of
    metadata tables that contain the inferred schema, format and data
    types of data stores within the AWS cloud.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:GlueCrawlerOperator`

    :param config: Configurations for the AWS Glue crawler
    :param aws_conn_id: aws connection to use
    :param poll_interval: Time (in seconds) to wait between two consecutive calls to check crawler status
    :param wait_for_completion: Whether to wait for crawl execution completion. (default: True)
    :param deferrable: If True, the operator will wait asynchronously for the crawl to complete.
        This implies waiting for completion. This mode requires aiobotocore module to be installed.
        (default: False)
    """

    # NOTE(review): the constructor is not visible in this section. The methods
    # below read self.config, self.aws_conn_id, self.region_name,
    # self.poll_interval, self.wait_for_completion and self.deferrable, so
    # whatever initialises the operator must assign all of them.

    @cached_property
    def hook(self) -> GlueCrawlerHook:
        """Create and return a GlueCrawlerHook."""
        # Exposed as a cached property because execute() accesses it as an
        # attribute (``self.hook.has_crawler``), and so the same hook instance
        # is reused for every call within a task run.
        return GlueCrawlerHook(self.aws_conn_id, region_name=self.region_name)

    def execute(self, context: Context):
        """
        Execute AWS Glue Crawler from Airflow.

        Updates the crawler named by ``config["Name"]`` when it already exists,
        creates it otherwise, then starts it. Depending on the operator
        settings the method then defers to a trigger, blocks until the crawl
        finishes, or returns immediately.

        :param context: Airflow task context (not used by this method).
        :return: the name of the current glue crawler.
        """
        crawler_name = self.config["Name"]

        # Upsert: the same config dict drives both update and create.
        if self.hook.has_crawler(crawler_name):
            self.hook.update_crawler(**self.config)
        else:
            self.hook.create_crawler(**self.config)

        self.log.info("Triggering AWS Glue Crawler")
        self.hook.start_crawler(crawler_name)

        if self.deferrable:
            # Hand the wait over to the trigger; the task is expected to resume
            # in execute_complete() once the trigger emits its event.
            self.defer(
                trigger=GlueCrawlerCompleteTrigger(
                    crawler_name=crawler_name,
                    waiter_delay=self.poll_interval,
                    aws_conn_id=self.aws_conn_id,
                ),
                method_name="execute_complete",
            )
        elif self.wait_for_completion:
            # Only block when the caller asked for it, as documented on the
            # ``wait_for_completion`` parameter.
            self.log.info("Waiting for AWS Glue Crawler")
            self.hook.wait_for_crawler_completion(
                crawler_name=crawler_name, poll_interval=self.poll_interval
            )

        return crawler_name

    def execute_complete(self, context, event=None):
        """
        Validate the trigger event after a deferred crawl and return the crawler name.

        :param context: Airflow task context (not used by this method).
        :param event: payload delivered by the trigger; expected to be a mapping
            whose ``"status"`` entry equals ``"success"``.
        :return: the name of the glue crawler, taken from ``config["Name"]``.
        :raises AirflowException: if no event was delivered or its status is
            anything other than ``"success"``.
        """
        # ``event`` defaults to None, so guard before reading it: a missing
        # event or missing status is reported as a crawl failure instead of
        # surfacing as a TypeError/KeyError.
        if not event or event.get("status") != "success":
            raise AirflowException(f"Error in glue crawl: {event}")
        return self.config["Name"]