Source code for airflow.providers.amazon.aws.operators.emr
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.importastimportsysfromdatetimeimportdatetimefromtypingimportTYPE_CHECKING,Any,Dict,List,Optional,Sequence,Unionfromuuidimportuuid4fromairflow.exceptionsimportAirflowExceptionfromairflow.modelsimportBaseOperator,BaseOperatorLink,TaskInstancefromairflow.providers.amazon.aws.hooks.emrimportEmrHookifTYPE_CHECKING:fromairflow.utils.contextimportContextifsys.version_info>=(3,8):fromfunctoolsimportcached_propertyelse:fromcached_propertyimportcached_propertyfromairflow.providers.amazon.aws.hooks.emrimportEmrContainerHook
[docs]classEmrAddStepsOperator(BaseOperator):""" An operator that adds steps to an existing EMR job_flow. :param job_flow_id: id of the JobFlow to add steps to. (templated) :type job_flow_id: Optional[str] :param job_flow_name: name of the JobFlow to add steps to. Use as an alternative to passing job_flow_id. will search for id of JobFlow with matching name in one of the states in param cluster_states. Exactly one cluster like this should exist or will fail. (templated) :type job_flow_name: Optional[str] :param cluster_states: Acceptable cluster states when searching for JobFlow id by job_flow_name. (templated) :type cluster_states: list :param aws_conn_id: aws connection to uses :type aws_conn_id: str :param steps: boto3 style steps or reference to a steps file (must be '.json') to be added to the jobflow. (templated) :type steps: list|str :param do_xcom_push: if True, job_flow_id is pushed to XCom with key job_flow_id. :type do_xcom_push: bool """
def__init__(self,*,job_flow_id:Optional[str]=None,job_flow_name:Optional[str]=None,cluster_states:Optional[List[str]]=None,aws_conn_id:str='aws_default',steps:Optional[Union[List[dict],str]]=None,**kwargs,):ifkwargs.get('xcom_push')isnotNone:raiseAirflowException("'xcom_push' was deprecated, use 'do_xcom_push' instead")ifnot(job_flow_idisNone)^(job_flow_nameisNone):raiseAirflowException('Exactly one of job_flow_id or job_flow_name must be specified.')super().__init__(**kwargs)cluster_states=cluster_statesor[]steps=stepsor[]self.aws_conn_id=aws_conn_idself.job_flow_id=job_flow_idself.job_flow_name=job_flow_nameself.cluster_states=cluster_statesself.steps=steps
[docs]defexecute(self,context:'Context')->List[str]:emr_hook=EmrHook(aws_conn_id=self.aws_conn_id)emr=emr_hook.get_conn()job_flow_id=self.job_flow_idoremr_hook.get_cluster_id_by_name(str(self.job_flow_name),self.cluster_states)ifnotjob_flow_id:raiseAirflowException(f'No cluster found for name: {self.job_flow_name}')ifself.do_xcom_push:context['ti'].xcom_push(key='job_flow_id',value=job_flow_id)self.log.info('Adding steps to %s',job_flow_id)# steps may arrive as a string representing a list# e.g. if we used XCom or a file then: steps="[{ step1 }, { step2 }]"steps=self.stepsifisinstance(steps,str):steps=ast.literal_eval(steps)response=emr.add_job_flow_steps(JobFlowId=job_flow_id,Steps=steps)ifnotresponse['ResponseMetadata']['HTTPStatusCode']==200:raiseAirflowException(f'Adding steps failed: {response}')else:self.log.info('Steps %s added to JobFlow',response['StepIds'])returnresponse['StepIds']
[docs]classEmrContainerOperator(BaseOperator):""" An operator that submits jobs to EMR on EKS virtual clusters. :param name: The name of the job run. :type name: str :param virtual_cluster_id: The EMR on EKS virtual cluster ID :type virtual_cluster_id: str :param execution_role_arn: The IAM role ARN associated with the job run. :type execution_role_arn: str :param release_label: The Amazon EMR release version to use for the job run. :type release_label: str :param job_driver: Job configuration details, e.g. the Spark job parameters. :type job_driver: dict :param configuration_overrides: The configuration overrides for the job run, specifically either application configuration or monitoring configuration. :type configuration_overrides: dict :param client_request_token: The client idempotency token of the job run request. Use this if you want to specify a unique ID to prevent two jobs from getting started. If no token is provided, a UUIDv4 token will be generated for you. :type client_request_token: str :param aws_conn_id: The Airflow connection used for AWS credentials. :type aws_conn_id: str :param poll_interval: Time (in seconds) to wait between two consecutive calls to check query status on EMR :type poll_interval: int :param max_tries: Maximum number of times to wait for the job run to finish. Defaults to None, which will poll until the job is *not* in a pending, submitted, or running state. :type max_tries: int """
[docs]defhook(self)->EmrContainerHook:"""Create and return an EmrContainerHook."""returnEmrContainerHook(self.aws_conn_id,virtual_cluster_id=self.virtual_cluster_id,
)
[docs]defexecute(self,context:'Context')->Optional[str]:"""Run job on EMR Containers"""self.job_id=self.hook.submit_job(self.name,self.execution_role_arn,self.release_label,self.job_driver,self.configuration_overrides,self.client_request_token,)query_status=self.hook.poll_query_status(self.job_id,self.max_tries,self.poll_interval)ifquery_statusinEmrContainerHook.FAILURE_STATES:error_message=self.hook.get_job_failure_reason(self.job_id)raiseAirflowException(f"EMR Containers job failed. Final state is {query_status}. "f"query_execution_id is {self.job_id}. Error: {error_message}")elifnotquery_statusorquery_statusinEmrContainerHook.INTERMEDIATE_STATES:raiseAirflowException(f"Final state of EMR Containers job is {query_status}. "f"Max tries of poll status exceeded, query_execution_id is {self.job_id}.")returnself.job_id
[docs]defon_kill(self)->None:"""Cancel the submitted job run"""ifself.job_id:self.log.info("Stopping job run with jobId - %s",self.job_id)response=self.hook.stop_query(self.job_id)http_status_code=Nonetry:http_status_code=response["ResponseMetadata"]["HTTPStatusCode"]exceptExceptionasex:self.log.error("Exception while cancelling query: %s",ex)finally:ifhttp_status_codeisNoneorhttp_status_code!=200:self.log.error("Unable to request query cancel on EMR. Exiting")else:self.log.info("Polling EMR for query with id %s to reach final state",self.job_id,)self.hook.poll_query_status(self.job_id)
[docs]classEmrClusterLink(BaseOperatorLink):"""Operator link for EmrCreateJobFlowOperator. It allows users to access the EMR Cluster"""
[docs]defget_link(self,operator:BaseOperator,dttm:datetime)->str:""" Get link to EMR cluster. :param operator: operator :param dttm: datetime :return: url link """ti=TaskInstance(task=operator,execution_date=dttm)flow_id=ti.xcom_pull(task_ids=operator.task_id)return(f'https://console.aws.amazon.com/elasticmapreduce/home#cluster-details:{flow_id}'ifflow_idelse''
)
[docs]classEmrCreateJobFlowOperator(BaseOperator):""" Creates an EMR JobFlow, reading the config from the EMR connection. A dictionary of JobFlow overrides can be passed that override the config from the connection. :param aws_conn_id: aws connection to uses :type aws_conn_id: str :param emr_conn_id: emr connection to use :type emr_conn_id: str :param job_flow_overrides: boto3 style arguments or reference to an arguments file (must be '.json') to override emr_connection extra. (templated) :type job_flow_overrides: dict|str :param region_name: Region named passed to EmrHook :type region_name: Optional[str] """
[docs]defexecute(self,context:'Context')->str:emr=EmrHook(aws_conn_id=self.aws_conn_id,emr_conn_id=self.emr_conn_id,region_name=self.region_name)self.log.info('Creating JobFlow using aws-conn-id: %s, emr-conn-id: %s',self.aws_conn_id,self.emr_conn_id)ifisinstance(self.job_flow_overrides,str):job_flow_overrides:Dict[str,Any]=ast.literal_eval(self.job_flow_overrides)self.job_flow_overrides=job_flow_overrideselse:job_flow_overrides=self.job_flow_overridesresponse=emr.create_job_flow(job_flow_overrides)ifnotresponse['ResponseMetadata']['HTTPStatusCode']==200:raiseAirflowException(f'JobFlow creation failed: {response}')else:self.log.info('JobFlow with id %s created',response['JobFlowId'])returnresponse['JobFlowId']
[docs]classEmrModifyClusterOperator(BaseOperator):""" An operator that modifies an existing EMR cluster. :param cluster_id: cluster identifier :type cluster_id: str :param step_concurrency_level: Concurrency of the cluster :type step_concurrency_level: int :param aws_conn_id: aws connection to uses :type aws_conn_id: str :param do_xcom_push: if True, cluster_id is pushed to XCom with key cluster_id. :type do_xcom_push: bool """
def__init__(self,*,cluster_id:str,step_concurrency_level:int,aws_conn_id:str='aws_default',**kwargs):ifkwargs.get('xcom_push')isnotNone:raiseAirflowException("'xcom_push' was deprecated, use 'do_xcom_push' instead")super().__init__(**kwargs)self.aws_conn_id=aws_conn_idself.cluster_id=cluster_idself.step_concurrency_level=step_concurrency_level
[docs]classEmrTerminateJobFlowOperator(BaseOperator):""" Operator to terminate EMR JobFlows. :param job_flow_id: id of the JobFlow to terminate. (templated) :type job_flow_id: str :param aws_conn_id: aws connection to uses :type aws_conn_id: str """
[docs]defexecute(self,context:'Context')->None:emr=EmrHook(aws_conn_id=self.aws_conn_id).get_conn()self.log.info('Terminating JobFlow %s',self.job_flow_id)response=emr.terminate_job_flows(JobFlowIds=[self.job_flow_id])ifnotresponse['ResponseMetadata']['HTTPStatusCode']==200:raiseAirflowException(f'JobFlow termination failed: {response}')else:self.log.info('JobFlow with id %s terminated',self.job_flow_id)