Source code for airflow.providers.amazon.aws.sensors.glue
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from collections.abc import Sequence
from functools import cached_property
from typing import TYPE_CHECKING, Any

from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.glue import GlueDataQualityHook, GlueJobHook
from airflow.providers.amazon.aws.sensors.base_aws import AwsBaseSensor
from airflow.providers.amazon.aws.triggers.glue import (
    GlueDataQualityRuleRecommendationRunCompleteTrigger,
    GlueDataQualityRuleSetEvaluationRunCompleteTrigger,
)
from airflow.providers.amazon.aws.utils import validate_execute_complete_event
from airflow.providers.amazon.aws.utils.mixins import aws_template_fields
from airflow.sensors.base import BaseSensorOperator

if TYPE_CHECKING:
    from airflow.utils.context import Context
[docs]
class GlueJobSensor(BaseSensorOperator):
    """
    Waits for an AWS Glue Job to reach any of the statuses below.

    'FAILED', 'STOPPED', 'SUCCEEDED'

    .. seealso::
        For more information on how to use this sensor, take a look at the guide:
        :ref:`howto/sensor:GlueJobSensor`

    :param job_name: The AWS Glue Job unique name
    :param run_id: The AWS Glue current running job identifier
    :param verbose: If True, more Glue Job Run logs show in the Airflow Task Logs. (default: False)
    """
[docs]
    def poke(self, context: Context):
        self.log.info("Poking for job run status for Glue Job %s and ID %s", self.job_name, self.run_id)
        job_state = self.hook.get_job_state(job_name=self.job_name, run_id=self.run_id)

        try:
            if job_state in self.success_states:
                self.log.info("Exiting Job %s Run State: %s", self.run_id, job_state)
                return True
            elif job_state in self.errored_states:
                job_error_message = f"Exiting Job {self.run_id} Run State: {job_state}"
                self.log.info(job_error_message)
                raise AirflowException(job_error_message)
            else:
                return False
        finally:
            if self.verbose:
                self.hook.print_job_logs(
                    job_name=self.job_name,
                    run_id=self.run_id,
                    continuation_tokens=self.next_log_tokens,
                )
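A minimal usage sketch (not part of this module), assuming a hypothetical DAG in which a GlueJobOperator submits an existing Glue job and this sensor waits for the run to finish; the DAG id, job name, and role name are placeholders:

    from datetime import datetime

    from airflow import DAG
    from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
    from airflow.providers.amazon.aws.sensors.glue import GlueJobSensor

    with DAG("example_glue_job_sensor", start_date=datetime(2024, 1, 1), schedule=None):
        # Submit the job without waiting for it; assumes the Glue job already exists.
        submit_job = GlueJobOperator(
            task_id="submit_glue_job",
            job_name="my_glue_job",          # hypothetical job name
            iam_role_name="my-glue-role",    # hypothetical role name
            wait_for_completion=False,
        )

        # The operator pushes the run id to XCom, which the sensor consumes.
        wait_for_job = GlueJobSensor(
            task_id="wait_for_glue_job",
            job_name="my_glue_job",
            run_id=submit_job.output,
            verbose=True,  # stream Glue job run logs into the Airflow task log
        )

        submit_job >> wait_for_job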
[docs]
class GlueDataQualityRuleSetEvaluationRunSensor(AwsBaseSensor[GlueDataQualityHook]):
    """
    Waits for an AWS Glue data quality ruleset evaluation run to reach any of the statuses below.

    'FAILED', 'STOPPED', 'STOPPING', 'TIMEOUT', 'SUCCEEDED'

    .. seealso::
        For more information on how to use this sensor, take a look at the guide:
        :ref:`howto/sensor:GlueDataQualityRuleSetEvaluationRunSensor`

    :param evaluation_run_id: The AWS Glue data quality ruleset evaluation run identifier.
    :param verify_result_status: Validate all the ruleset rules evaluation run results.
        If any rule status is Fail or Error, an exception is thrown. (default: True)
    :param show_results: Displays all the ruleset rules evaluation run results. (default: True)
    :param deferrable: If True, the sensor will operate in deferrable mode. This mode requires aiobotocore
        module to be installed.
        (default: False, but can be overridden in config file by setting default_deferrable to True)
    :param poke_interval: Polling period in seconds to check for the status of the job. (default: 120)
    :param max_retries: Number of times before returning the current state. (default: 60)
    :param aws_conn_id: The Airflow connection used for AWS credentials.
        If this is ``None`` or empty then the default boto3 behaviour is used. If
        running Airflow in a distributed manner and aws_conn_id is None or
        empty, then default boto3 configuration would be used (and must be
        maintained on each worker node).
    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
    :param verify: Whether to verify SSL certificates. See:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
    :param botocore_config: Configuration dictionary (key-values) for botocore client. See:
        https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
    """
[docs]
    def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
        event = validate_execute_complete_event(event)

        if event["status"] != "success":
            message = f"Error: AWS Glue data quality ruleset evaluation run: {event}"
            raise AirflowException(message)

        self.hook.validate_evaluation_run_results(
            evaluation_run_id=event["evaluation_run_id"],
            show_results=self.show_results,
            verify_result_status=self.verify_result_status,
        )

        self.log.info("AWS Glue data quality ruleset evaluation run completed.")
[docs]
    def poke(self, context: Context):
        self.log.info(
            "Poking for AWS Glue data quality ruleset evaluation run RunId: %s", self.evaluation_run_id
        )

        response = self.hook.conn.get_data_quality_ruleset_evaluation_run(RunId=self.evaluation_run_id)

        status = response.get("Status")

        if status in self.SUCCESS_STATES:
            self.hook.validate_evaluation_run_results(
                evaluation_run_id=self.evaluation_run_id,
                show_results=self.show_results,
                verify_result_status=self.verify_result_status,
            )

            self.log.info(
                "AWS Glue data quality ruleset evaluation run completed RunId: %s Run State: %s",
                self.evaluation_run_id,
                response["Status"],
            )

            return True
        elif status in self.FAILURE_STATES:
            job_error_message = (
                f"Error: AWS Glue data quality ruleset evaluation run RunId: {self.evaluation_run_id} Run "
                f"Status: {status}"
                f": {response.get('ErrorString')}"
            )
            self.log.info(job_error_message)
            raise AirflowException(job_error_message)
        else:
            return False
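A minimal sketch of this sensor in deferrable mode, assuming a hypothetical evaluation run id; in practice the id would usually be pulled from the XCom output of the task that started the evaluation run:

    from datetime import datetime

    from airflow import DAG
    from airflow.providers.amazon.aws.sensors.glue import GlueDataQualityRuleSetEvaluationRunSensor

    with DAG("example_dq_evaluation_sensor", start_date=datetime(2024, 1, 1), schedule=None):
        wait_for_evaluation = GlueDataQualityRuleSetEvaluationRunSensor(
            task_id="wait_for_evaluation_run",
            evaluation_run_id="dqrun-0123456789abcdef",  # hypothetical run id
            show_results=True,           # log each rule's result when the run completes
            verify_result_status=True,   # fail the task if any rule is Fail or Error
            poke_interval=120,
            deferrable=True,             # requires aiobotocore; hands the wait off to the triggerer
        )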
[docs]
class GlueDataQualityRuleRecommendationRunSensor(AwsBaseSensor[GlueDataQualityHook]):
    """
    Waits for an AWS Glue data quality recommendation run to reach any of the statuses below.

    'FAILED', 'STOPPED', 'STOPPING', 'TIMEOUT', 'SUCCEEDED'

    .. seealso::
        For more information on how to use this sensor, take a look at the guide:
        :ref:`howto/sensor:GlueDataQualityRuleRecommendationRunSensor`

    :param recommendation_run_id: The AWS Glue data quality rule recommendation run identifier.
    :param show_results: Displays the recommended ruleset (a set of rules) when the recommendation run
        completes. (default: True)
    :param deferrable: If True, the sensor will operate in deferrable mode. This mode requires aiobotocore
        module to be installed.
        (default: False, but can be overridden in config file by setting default_deferrable to True)
    :param poke_interval: Polling period in seconds to check for the status of the job. (default: 120)
    :param max_retries: Number of times before returning the current state. (default: 60)
    :param aws_conn_id: The Airflow connection used for AWS credentials.
        If this is ``None`` or empty then the default boto3 behaviour is used. If
        running Airflow in a distributed manner and aws_conn_id is None or
        empty, then default boto3 configuration would be used (and must be
        maintained on each worker node).
    :param region_name: AWS region_name. If not specified then the default boto3 behaviour is used.
    :param verify: Whether to verify SSL certificates. See:
        https://boto3.amazonaws.com/v1/documentation/api/latest/reference/core/session.html
    :param botocore_config: Configuration dictionary (key-values) for botocore client. See:
        https://botocore.amazonaws.com/v1/documentation/api/latest/reference/config.html
    """
[docs]
    def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
        event = validate_execute_complete_event(event)

        if event["status"] != "success":
            message = f"Error: AWS Glue data quality recommendation run: {event}"
            raise AirflowException(message)

        if self.show_results:
            self.hook.log_recommendation_results(run_id=self.recommendation_run_id)

        self.log.info("AWS Glue data quality recommendation run completed.")
[docs]
    def poke(self, context: Context) -> bool:
        self.log.info(
            "Poking for AWS Glue data quality recommendation run RunId: %s", self.recommendation_run_id
        )

        response = self.hook.conn.get_data_quality_rule_recommendation_run(RunId=self.recommendation_run_id)

        status = response.get("Status")

        if status in self.SUCCESS_STATES:
            if self.show_results:
                self.hook.log_recommendation_results(run_id=self.recommendation_run_id)

            self.log.info(
                "AWS Glue data quality recommendation run completed RunId: %s Run State: %s",
                self.recommendation_run_id,
                response["Status"],
            )

            return True
        elif status in self.FAILURE_STATES:
            job_error_message = (
                f"Error: AWS Glue data quality recommendation run RunId: {self.recommendation_run_id} Run "
                f"Status: {status}"
                f": {response.get('ErrorString')}"
            )
            self.log.info(job_error_message)
            raise AirflowException(job_error_message)
        else:
            return False
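A similar sketch for the recommendation run sensor, again with a hypothetical run id; the recommended ruleset is logged by the sensor when show_results is left at its default of True:

    from datetime import datetime

    from airflow import DAG
    from airflow.providers.amazon.aws.sensors.glue import GlueDataQualityRuleRecommendationRunSensor

    with DAG("example_dq_recommendation_sensor", start_date=datetime(2024, 1, 1), schedule=None):
        wait_for_recommendation = GlueDataQualityRuleRecommendationRunSensor(
            task_id="wait_for_recommendation_run",
            recommendation_run_id="dqrun-0123456789abcdef",  # hypothetical run id
            poke_interval=120,
            max_retries=60,
        )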