Source code for airflow.providers.amazon.aws.sensors.emr_base
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.fromtypingimportAny,Dict,Iterable,Optionalfromairflow.exceptionsimportAirflowExceptionfromairflow.providers.amazon.aws.hooks.emrimportEmrHookfromairflow.sensors.baseimportBaseSensorOperator
class EmrBaseSensor(BaseSensorOperator):
    """
    Shared polling logic for sensors that watch Amazon EMR resources.

    Concrete sensors are expected to override:

    - ``get_emr_response()``
    - ``state_from_response()``
    - ``failure_message_from_response()``

    They should also assign the ``target_states`` and ``failed_states``
    attributes, which this base class initialises to ``None``.

    :param aws_conn_id: AWS connection to use
    :type aws_conn_id: str
    """

    def __init__(self, *, aws_conn_id: str = 'aws_default', **kwargs):
        super().__init__(**kwargs)
        self.aws_conn_id = aws_conn_id
        # The three attributes below are placeholders; subclasses fill them in.
        self.target_states: Optional[Iterable[str]] = None
        self.failed_states: Optional[Iterable[str]] = None
        self.hook: Optional[EmrHook] = None

    def poke(self, context):
        """
        Check the EMR resource once and report whether it reached a target state.

        :param context: context passed in by the caller (not used by this method)
        :return: ``True`` when the reported state is one of ``target_states``;
            ``False`` when the HTTP status code of the response is not 200 or
            the state is in neither ``target_states`` nor ``failed_states``
        :rtype: bool
        :raises AirflowException: when the reported state is one of
            ``failed_states``
        """
        emr_response = self.get_emr_response()

        status_code = emr_response['ResponseMetadata']['HTTPStatusCode']
        if status_code != 200:
            self.log.info('Bad HTTP response: %s', emr_response)
            return False

        current_state = self.state_from_response(emr_response)
        self.log.info('Job flow currently %s', current_state)

        # Target states are checked before failed states.
        if current_state in self.target_states:
            return True
        if current_state not in self.failed_states:
            return False

        # Failed state: append the subclass-provided reason only if there is one.
        reason = self.failure_message_from_response(emr_response)
        message = 'EMR job failed' + (' ' + reason if reason else '')
        raise AirflowException(message)

    def get_emr_response(self) -> Dict[str, Any]:
        """
        Fetch the response that ``poke()`` inspects.

        The base implementation always raises; subclasses must override it.

        :return: response
        :rtype: dict[str, Any]
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError('Please implement get_emr_response() in subclass')

    @staticmethod
    def state_from_response(response: Dict[str, Any]) -> str:
        """
        Extract the state from a response dictionary.

        The base implementation always raises; subclasses must override it.

        :param response: response from AWS API
        :type response: dict[str, Any]
        :return: state
        :rtype: str
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError('Please implement state_from_response() in subclass')

    @staticmethod
    def failure_message_from_response(response: Dict[str, Any]) -> Optional[str]:
        """
        Extract the failure message from a response dictionary.

        The base implementation always raises; subclasses must override it.

        :param response: response from AWS API
        :type response: dict[str, Any]
        :return: failure message
        :rtype: Optional[str]
        :raises NotImplementedError: always, in this base class
        """
        raise NotImplementedError('Please implement failure_message_from_response() in subclass')