Source code for airflow.providers.amazon.aws.sensors.emr
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsfromcollections.abcimportIterable,Sequencefromdatetimeimporttimedeltafromfunctoolsimportcached_propertyfromtypingimportTYPE_CHECKING,Anyfromairflow.configurationimportconffromairflow.exceptionsimport(AirflowException,)fromairflow.providers.amazon.aws.hooks.emrimportEmrContainerHook,EmrHook,EmrServerlessHookfromairflow.providers.amazon.aws.links.emrimportEmrClusterLink,EmrLogsLink,get_log_urifromairflow.providers.amazon.aws.triggers.emrimport(EmrContainerTrigger,EmrStepSensorTrigger,EmrTerminateJobFlowTrigger,)fromairflow.providers.amazon.aws.utilsimportvalidate_execute_complete_eventfromairflow.sensors.baseimportBaseSensorOperatorifTYPE_CHECKING:fromairflow.utils.contextimportContext
class EmrBaseSensor(BaseSensorOperator):
    """
    Contains general sensor behavior for EMR.

    Subclasses should implement following methods:
        - ``get_emr_response()``
        - ``state_from_response()``
        - ``failure_message_from_response()``

    Subclasses should set ``target_states`` and ``failed_states`` fields.

    :param aws_conn_id: The Airflow connection used for AWS credentials.
        If this is None or empty then the default boto3 behaviour is used. If
        running Airflow in a distributed manner and aws_conn_id is None or
        empty, then default boto3 configuration would be used (and must be
        maintained on each worker node).
    """

    def poke(self, context: Context) -> bool:
        """
        Check the EMR resource state once.

        :return: True when the state is in ``target_states``; False when the
            sensor should keep poking (including on a non-200 HTTP response).
        :raises AirflowException: when the state is in ``failed_states``.
        """
        response = self.get_emr_response(context=context)

        # Treat a non-200 response as transient: log it and poke again later.
        if response["ResponseMetadata"]["HTTPStatusCode"] != 200:
            self.log.info("Bad HTTP response: %s", response)
            return False

        state = self.state_from_response(response)
        self.log.info("Job flow currently %s", state)

        if state in self.target_states:
            return True

        if state in self.failed_states:
            raise AirflowException(f"EMR job failed: {self.failure_message_from_response(response)}")

        return False

    def get_emr_response(self, context: Context) -> dict[str, Any]:
        """
        Make an API call with boto3 and get response.

        :return: response
        """
        raise NotImplementedError("Please implement get_emr_response() in subclass")

    @staticmethod
    def state_from_response(response: dict[str, Any]) -> str:
        """
        Get state from boto3 response.

        :param response: response from AWS API
        :return: state
        """
        raise NotImplementedError("Please implement state_from_response() in subclass")

    @staticmethod
    def failure_message_from_response(response: dict[str, Any]) -> str | None:
        """
        Get failure message from boto3 response.

        :param response: response from AWS API
        :return: failure message
        """
        raise NotImplementedError("Please implement failure_message_from_response() in subclass")
class EmrServerlessJobSensor(BaseSensorOperator):
    """
    Poll the state of the job run until it reaches a terminal state; fails if the job run fails.

    .. seealso::
        For more information on how to use this sensor, take a look at the guide:
        :ref:`howto/sensor:EmrServerlessJobSensor`

    :param application_id: application_id to check the state of
    :param job_run_id: job_run_id to check the state of
    :param target_states: a set of states to wait for, defaults to 'SUCCESS'
    :param aws_conn_id: aws connection to use, defaults to 'aws_default'
        If this is None or empty then the default boto3 behaviour is used. If
        running Airflow in a distributed manner and aws_conn_id is None or
        empty, then default boto3 configuration would be used (and must be
        maintained on each worker node).
    """

    def hook(self) -> EmrServerlessHook:
        """Create and return an EmrServerlessHook."""
        # NOTE(review): upstream likely wraps this in ``cached_property``; no
        # decorator is visible in this extract — confirm before relying on the
        # same hook instance being returned across calls.
        serverless_hook = EmrServerlessHook(aws_conn_id=self.aws_conn_id)
        return serverless_hook

    @staticmethod
    def failure_message_from_response(response: dict[str, Any]) -> str | None:
        """
        Get failure message from response dictionary.

        :param response: response from AWS API
        :return: failure message
        """
        job_run = response["jobRun"]
        return job_run["stateDetails"]
class EmrServerlessApplicationSensor(BaseSensorOperator):
    """
    Poll the state of the application until it reaches a terminal state; fails if the application fails.

    .. seealso::
        For more information on how to use this sensor, take a look at the guide:
        :ref:`howto/sensor:EmrServerlessApplicationSensor`

    :param application_id: application_id to check the state of
    :param target_states: a set of states to wait for, defaults to {'CREATED', 'STARTED'}
    :param aws_conn_id: aws connection to use, defaults to 'aws_default'
        If this is None or empty then the default boto3 behaviour is used. If
        running Airflow in a distributed manner and aws_conn_id is None or
        empty, then default boto3 configuration would be used (and must be
        maintained on each worker node).
    """

    def hook(self) -> EmrServerlessHook:
        """Create and return an EmrServerlessHook."""
        # NOTE(review): upstream likely wraps this in ``cached_property``; no
        # decorator is visible in this extract — confirm before relying on the
        # same hook instance being returned across calls.
        serverless_hook = EmrServerlessHook(aws_conn_id=self.aws_conn_id)
        return serverless_hook

    @staticmethod
    def failure_message_from_response(response: dict[str, Any]) -> str | None:
        """
        Get failure message from response dictionary.

        :param response: response from AWS API
        :return: failure message
        """
        application = response["application"]
        return application["stateDetails"]
class EmrContainerSensor(BaseSensorOperator):
    """
    Poll the state of the job run until it reaches a terminal state; fail if the job run fails.

    .. seealso::
        For more information on how to use this sensor, take a look at the guide:
        :ref:`howto/sensor:EmrContainerSensor`

    :param job_id: job_id to check the state of
    :param max_retries: Number of times to poll for query state before
        returning the current state, defaults to None
    :param aws_conn_id: aws connection to use, defaults to 'aws_default'
        If this is None or empty then the default boto3 behaviour is used. If
        running Airflow in a distributed manner and aws_conn_id is None or
        empty, then default boto3 configuration would be used (and must be
        maintained on each worker node).
    :param poll_interval: Time in seconds to wait between two consecutive call to
        check query status on athena, defaults to 10
    :param deferrable: Run sensor in the deferrable mode.
    """

    def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
        """Handle the trigger event once the deferred sensor resumes."""
        checked_event = validate_execute_complete_event(event)

        # Anything other than a "success" status from the trigger is fatal.
        if checked_event["status"] == "success":
            self.log.info("Job completed.")
        else:
            raise AirflowException(f"Error while running job: {checked_event}")
class EmrNotebookExecutionSensor(EmrBaseSensor):
    """
    Poll the EMR notebook until it reaches any of the target states; raise AirflowException on failure.

    .. seealso::
        For more information on how to use this sensor, take a look at the guide:
        :ref:`howto/sensor:EmrNotebookExecutionSensor`

    :param notebook_execution_id: Unique id of the notebook execution to be poked.
    :target_states: the states the sensor will wait for the execution to reach.
        Default target_states is ``FINISHED``.
    :failed_states: if the execution reaches any of the failed_states,
        the sensor will fail. Default failed_states is ``FAILED``.
    """

    # Fix: decorator was missing — the method takes only ``response`` (no
    # ``self``) and the EmrBaseSensor contract declares it a staticmethod;
    # without it, ``self.state_from_response(response)`` in the base ``poke``
    # would bind ``self`` to ``response``.
    @staticmethod
    def state_from_response(response: dict[str, Any]) -> str:
        """
        Get state from response dictionary.

        (Docstring fixed: it previously claimed to make a ``describe_cluster``
        API call and return the raw response, which this method does not do.)

        :param response: response from AWS API
        :return: current status of the notebook execution
        """
        return response["NotebookExecution"]["Status"]

    @staticmethod
    def failure_message_from_response(response: dict[str, Any]) -> str | None:
        """
        Get failure message from response dictionary.

        :param response: response from AWS API
        :return: failure message, or None when no state-change reason is present
        """
        # Renamed from the misleading ``cluster_status``: this is the notebook
        # execution record, not a cluster status.
        notebook_execution = response["NotebookExecution"]
        return notebook_execution.get("LastStateChangeReason", None)
class EmrJobFlowSensor(EmrBaseSensor):
    """
    Poll the EMR JobFlow Cluster until it reaches any of the target states; raise AirflowException on failure.

    With the default target states, sensor waits cluster to be terminated.
    When target_states is set to ['RUNNING', 'WAITING'] sensor waits
    until job flow to be ready (after 'STARTING' and 'BOOTSTRAPPING' states)

    .. seealso::
        For more information on how to use this sensor, take a look at the guide:
        :ref:`howto/sensor:EmrJobFlowSensor`

    :param job_flow_id: job_flow_id to check the state of
    :param target_states: the target states, sensor waits until
        job flow reaches any of these states. In deferrable mode it would
        run until reach the terminal state.
    :param failed_states: the failure states, sensor fails when
        job flow reaches any of these states
    :param max_attempts: Maximum number of tries before failing
    :param deferrable: Run sensor in the deferrable mode.
    """

    def get_emr_response(self, context: Context) -> dict[str, Any]:
        """
        Make an API call with boto3 and get cluster-level details.

        .. seealso::
            https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr.html#EMR.Client.describe_cluster

        :return: response
        """
        client = self.hook.conn
        self.log.info("Poking cluster %s", self.job_flow_id)
        response = client.describe_cluster(ClusterId=self.job_flow_id)

        # Both operator links share the same identifying kwargs.
        link_kwargs = {
            "context": context,
            "operator": self,
            "region_name": self.hook.conn_region_name,
            "aws_partition": self.hook.conn_partition,
            "job_flow_id": self.job_flow_id,
        }
        EmrClusterLink.persist(**link_kwargs)
        EmrLogsLink.persist(log_uri=get_log_uri(cluster=response), **link_kwargs)

        return response

    @staticmethod
    def state_from_response(response: dict[str, Any]) -> str:
        """
        Get state from response dictionary.

        :param response: response from AWS API
        :return: current state of the cluster
        """
        return response["Cluster"]["Status"]["State"]

    @staticmethod
    def failure_message_from_response(response: dict[str, Any]) -> str | None:
        """
        Get failure message from response dictionary.

        :param response: response from AWS API
        :return: failure message, or None when no state-change reason is present
        """
        reason = response["Cluster"]["Status"].get("StateChangeReason")
        if not reason:
            return None
        return (
            f"for code: {reason.get('Code', 'No code')} "
            f"with message {reason.get('Message', 'Unknown')}"
        )

    def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None:
        """Handle the trigger event once the deferred sensor resumes."""
        checked_event = validate_execute_complete_event(event)

        if checked_event["status"] == "success":
            self.log.info("Job completed.")
        else:
            raise AirflowException(f"Error while running job: {checked_event}")
class EmrStepSensor(EmrBaseSensor):
    """
    Poll the state of the step until it reaches any of the target states; raise AirflowException on failure.

    With the default target states, sensor waits step to be completed.

    .. seealso::
        For more information on how to use this sensor, take a look at the guide:
        :ref:`howto/sensor:EmrStepSensor`

    :param job_flow_id: job_flow_id which contains the step check the state of
    :param step_id: step to check the state of
    :param target_states: the target states, sensor waits until
        step reaches any of these states. In case of deferrable sensor it will
        for reach to terminal state
    :param failed_states: the failure states, sensor fails when
        step reaches any of these states
    :param max_attempts: Maximum number of tries before failing
    :param deferrable: Run sensor in the deferrable mode.
    """

    def get_emr_response(self, context: Context) -> dict[str, Any]:
        """
        Make an API call with boto3 and get details about the cluster step.

        .. seealso::
            https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/emr.html#EMR.Client.describe_step

        :return: response
        """
        client = self.hook.conn
        self.log.info("Poking step %s on cluster %s", self.step_id, self.job_flow_id)
        response = client.describe_step(ClusterId=self.job_flow_id, StepId=self.step_id)

        # Both operator links share the same identifying kwargs.
        link_kwargs = {
            "context": context,
            "operator": self,
            "region_name": self.hook.conn_region_name,
            "aws_partition": self.hook.conn_partition,
            "job_flow_id": self.job_flow_id,
        }
        EmrClusterLink.persist(**link_kwargs)
        EmrLogsLink.persist(
            log_uri=get_log_uri(emr_client=client, job_flow_id=self.job_flow_id),
            **link_kwargs,
        )

        return response

    @staticmethod
    def state_from_response(response: dict[str, Any]) -> str:
        """
        Get state from response dictionary.

        :param response: response from AWS API
        :return: execution state of the cluster step
        """
        return response["Step"]["Status"]["State"]

    @staticmethod
    def failure_message_from_response(response: dict[str, Any]) -> str | None:
        """
        Get failure message from response dictionary.

        :param response: response from AWS API
        :return: failure message, or None when no failure details are present
        """
        details = response["Step"]["Status"].get("FailureDetails")
        if not details:
            return None
        return (
            f"for reason {details.get('Reason')} "
            f"with message {details.get('Message')} and log file {details.get('LogFile')}"
        )