Source code for airflow.providers.amazon.aws.operators.emr
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.importastfromtypingimportTYPE_CHECKING,Any,Dict,List,Optional,Sequence,Unionfromuuidimportuuid4fromairflow.exceptionsimportAirflowExceptionfromairflow.modelsimportBaseOperatorfromairflow.providers.amazon.aws.hooks.emrimportEmrContainerHook,EmrHook,EmrServerlessHookfromairflow.providers.amazon.aws.links.emrimportEmrClusterLinkfromairflow.providers.amazon.aws.sensors.emrimportEmrServerlessApplicationSensor,EmrServerlessJobSensorifTYPE_CHECKING:fromairflow.utils.contextimportContextfromairflow.compat.functoolsimportcached_property
class EmrAddStepsOperator(BaseOperator):
    """
    Add steps to an existing EMR JobFlow (cluster).

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:EmrAddStepsOperator`

    The target cluster is identified either directly by ``job_flow_id`` or by looking
    up ``job_flow_name`` among clusters in ``cluster_states``. Exactly one of the two
    must be given; otherwise the constructor raises ``AirflowException``.

    :param job_flow_id: ID of the JobFlow to add steps to. (templated)
    :param job_flow_name: Name of the JobFlow to add steps to, as an alternative to
        ``job_flow_id``. The ID is looked up among clusters whose state is in
        ``cluster_states``; exactly one matching cluster should exist or the task
        fails. (templated)
    :param cluster_states: Acceptable cluster states when looking up the JobFlow ID
        by ``job_flow_name``. (templated)
    :param aws_conn_id: AWS connection to use.
    :param steps: boto3-style steps, or a reference to a steps file (must be
        ``.json``), to add to the JobFlow. A string value is parsed with
        ``ast.literal_eval`` at execution time. (templated)
    :param do_xcom_push: If True, the JobFlow ID is pushed to XCom under the key
        ``job_flow_id``.
    """

    def __init__(
        self,
        *,
        job_flow_id: Optional[str] = None,
        job_flow_name: Optional[str] = None,
        cluster_states: Optional[List[str]] = None,
        aws_conn_id: str = 'aws_default',
        steps: Optional[Union[List[dict], str]] = None,
        **kwargs,
    ):
        # The cluster must be identified in exactly one way: by ID or by name.
        has_id = job_flow_id is not None
        has_name = job_flow_name is not None
        if has_id == has_name:
            raise AirflowException('Exactly one of job_flow_id or job_flow_name must be specified.')
        super().__init__(**kwargs)
        self.aws_conn_id = aws_conn_id
        self.job_flow_id = job_flow_id
        self.job_flow_name = job_flow_name
        self.cluster_states = cluster_states or []
        self.steps = steps or []

    def execute(self, context: 'Context') -> List[str]:
        """
        Resolve the target JobFlow and submit ``steps`` to it.

        :return: The step IDs reported by ``add_job_flow_steps``.
        :raises AirflowException: If no cluster matches ``job_flow_name`` or the
            API response status code is not 200.
        """
        hook = EmrHook(aws_conn_id=self.aws_conn_id)
        client = hook.get_conn()

        cluster_id = self.job_flow_id
        if not cluster_id:
            cluster_id = hook.get_cluster_id_by_name(str(self.job_flow_name), self.cluster_states)
        if not cluster_id:
            raise AirflowException(f'No cluster found for name: {self.job_flow_name}')

        if self.do_xcom_push:
            context['ti'].xcom_push(key='job_flow_id', value=cluster_id)

        EmrClusterLink.persist(
            context=context,
            operator=self,
            region_name=hook.conn_region_name,
            aws_partition=hook.conn_partition,
            job_flow_id=cluster_id,
        )

        self.log.info('Adding steps to %s', cluster_id)

        # Steps may arrive as the string form of a list, e.g. when rendered from
        # XCom or read from a file: "[{ step1 }, { step2 }]".
        payload = ast.literal_eval(self.steps) if isinstance(self.steps, str) else self.steps

        response = client.add_job_flow_steps(JobFlowId=cluster_id, Steps=payload)
        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
            raise AirflowException(f'Adding steps failed: {response}')

        step_ids = response['StepIds']
        self.log.info('Steps %s added to JobFlow', step_ids)
        return step_ids
class EmrContainerOperator(BaseOperator):
    """
    An operator that submits jobs to EMR on EKS virtual clusters.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:EmrContainerOperator`

    :param name: The name of the job run.
    :param virtual_cluster_id: The EMR on EKS virtual cluster ID.
    :param execution_role_arn: The IAM role ARN associated with the job run.
    :param release_label: The Amazon EMR release version to use for the job run.
    :param job_driver: Job configuration details, e.g. the Spark job parameters.
    :param configuration_overrides: The configuration overrides for the job run,
        specifically either application configuration or monitoring configuration.
    :param client_request_token: The client idempotency token of the job run request.
        Use this if you want to specify a unique ID to prevent two jobs from getting started.
        If no token is provided, a UUIDv4 token will be generated for you.
    :param aws_conn_id: The Airflow connection used for AWS credentials.
    :param wait_for_completion: Whether or not to wait in the operator for the job to complete.
    :param poll_interval: Time (in seconds) to wait between two consecutive calls to check
        query status on EMR.
    :param max_tries: Maximum number of times to wait for the job run to finish.
        Defaults to None, which will poll until the job is *not* in a pending, submitted,
        or running state.
    :param tags: The tags assigned to job runs. Defaults to None.
    """

    # Cached so that execute() and on_kill() share one hook and can access it as
    # an attribute (``self.hook.submit_job``); a plain method would not support that.
    @cached_property
    def hook(self) -> EmrContainerHook:
        """Create and return an EmrContainerHook."""
        return EmrContainerHook(
            self.aws_conn_id,
            virtual_cluster_id=self.virtual_cluster_id,
        )

    def execute(self, context: 'Context') -> Optional[str]:
        """
        Submit the job run to EMR Containers and optionally wait for it to finish.

        :return: The job ID returned by ``submit_job``.
        :raises AirflowException: If waiting is enabled and the job ends in a failure
            state, or polling stops while the job has no state or is still in an
            intermediate state.
        """
        self.job_id = self.hook.submit_job(
            self.name,
            self.execution_role_arn,
            self.release_label,
            self.job_driver,
            self.configuration_overrides,
            self.client_request_token,
            self.tags,
        )
        if self.wait_for_completion:
            query_status = self.hook.poll_query_status(self.job_id, self.max_tries, self.poll_interval)

            if query_status in EmrContainerHook.FAILURE_STATES:
                error_message = self.hook.get_job_failure_reason(self.job_id)
                raise AirflowException(
                    f"EMR Containers job failed. Final state is {query_status}. "
                    f"query_execution_id is {self.job_id}. Error: {error_message}"
                )
            elif not query_status or query_status in EmrContainerHook.INTERMEDIATE_STATES:
                # Polling gave up (e.g. max_tries reached) before a final state.
                raise AirflowException(
                    f"Final state of EMR Containers job is {query_status}. "
                    f"Max tries of poll status exceeded, query_execution_id is {self.job_id}."
                )

        return self.job_id

    def on_kill(self) -> None:
        """
        Cancel the submitted job run, best-effort.

        Does nothing if no job ID has been recorded. If the cancel request's
        response has no readable status code, or the code is not 200, an error is
        logged; otherwise the job is polled until it reaches a final state.
        """
        # execute() may not have assigned job_id yet when the task is killed.
        job_id = getattr(self, 'job_id', None)
        if not job_id:
            return

        self.log.info("Stopping job run with jobId - %s", job_id)
        response = self.hook.stop_query(job_id)
        http_status_code = None
        try:
            http_status_code = response["ResponseMetadata"]["HTTPStatusCode"]
        except Exception as ex:
            # Deliberately broad: a kill handler should log rather than crash.
            self.log.error("Exception while cancelling query: %s", ex)

        if http_status_code != 200:
            self.log.error("Unable to request query cancel on EMR. Exiting")
        else:
            self.log.info(
                "Polling EMR for query with id %s to reach final state",
                job_id,
            )
            self.hook.poll_query_status(job_id)
class EmrCreateJobFlowOperator(BaseOperator):
    """
    Create an EMR JobFlow, reading the base config from the EMR connection.

    A dictionary of JobFlow overrides can be passed; it overrides the config
    taken from the connection.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:EmrCreateJobFlowOperator`

    :param aws_conn_id: The Airflow connection used for AWS credentials.
        If this is None or empty then the default boto3 behaviour is used. If
        running Airflow in a distributed manner and aws_conn_id is None or
        empty, then the default boto3 configuration is used (and must be
        maintained on each worker node).
    :param emr_conn_id: EMR connection to use for the run_job_flow request body.
        Its values are overridden by the ``job_flow_overrides`` param.
    :param job_flow_overrides: boto3-style arguments, or a reference to an
        arguments file (must be ``.json``), overriding the EMR connection extra.
        A string value is parsed with ``ast.literal_eval``. (templated)
    :param region_name: Region name passed to EmrHook.
    """

    def execute(self, context: 'Context') -> str:
        """
        Create the JobFlow and return its ID.

        :raises AirflowException: If the API response status code is not 200.
        """
        hook = EmrHook(
            aws_conn_id=self.aws_conn_id,
            emr_conn_id=self.emr_conn_id,
            region_name=self.region_name,
        )

        self.log.info(
            'Creating JobFlow using aws-conn-id: %s, emr-conn-id: %s',
            self.aws_conn_id,
            self.emr_conn_id,
        )

        overrides = self.job_flow_overrides
        if isinstance(overrides, str):
            # Parse the string form and keep the parsed dict on the operator.
            overrides = ast.literal_eval(overrides)
            self.job_flow_overrides = overrides

        response = hook.create_job_flow(overrides)
        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
            raise AirflowException(f'JobFlow creation failed: {response}')

        new_id = response['JobFlowId']
        self.log.info('JobFlow with id %s created', new_id)
        EmrClusterLink.persist(
            context=context,
            operator=self,
            region_name=hook.conn_region_name,
            aws_partition=hook.conn_partition,
            job_flow_id=new_id,
        )
        return new_id
class EmrModifyClusterOperator(BaseOperator):
    """
    An operator that modifies an existing EMR cluster.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:EmrModifyClusterOperator`

    :param cluster_id: Identifier of the cluster to modify.
    :param step_concurrency_level: Concurrency of the cluster.
    :param aws_conn_id: AWS connection to use.
    :param do_xcom_push: If True, cluster_id is pushed to XCom with key cluster_id.
    """
class EmrTerminateJobFlowOperator(BaseOperator):
    """
    Terminate an EMR JobFlow.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:EmrTerminateJobFlowOperator`

    :param job_flow_id: ID of the JobFlow to terminate. (templated)
    :param aws_conn_id: AWS connection to use.
    """

    def execute(self, context: 'Context') -> None:
        """
        Request termination of ``job_flow_id``.

        :raises AirflowException: If the API response status code is not 200.
        """
        hook = EmrHook(aws_conn_id=self.aws_conn_id)
        client = hook.get_conn()

        EmrClusterLink.persist(
            context=context,
            operator=self,
            region_name=hook.conn_region_name,
            aws_partition=hook.conn_partition,
            job_flow_id=self.job_flow_id,
        )

        self.log.info('Terminating JobFlow %s', self.job_flow_id)
        response = client.terminate_job_flows(JobFlowIds=[self.job_flow_id])

        status = response['ResponseMetadata']['HTTPStatusCode']
        if status != 200:
            raise AirflowException(f'JobFlow termination failed: {response}')
        self.log.info('JobFlow with id %s terminated', self.job_flow_id)
class EmrServerlessCreateApplicationOperator(BaseOperator):
    """
    Operator to create Serverless EMR Application.

    The operator always waits for the new application to reach ``CREATED``, then
    starts it.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:EmrServerlessCreateApplicationOperator`

    :param release_label: The EMR release version associated with the application.
    :param job_type: The type of application you want to start, such as Spark or Hive.
    :param wait_for_completion: If true, wait for the Application to start before returning.
        Defaults to True.
    :param client_request_token: The client idempotency token of the application to create.
        Its value must be unique for each request. If empty, a random UUID4 string is used.
    :param config: Optional dictionary for arbitrary parameters to the boto API
        create_application call.
    :param aws_conn_id: AWS connection to use.
    """

    def __init__(
        self,
        release_label: str,
        job_type: str,
        client_request_token: str = '',
        config: Optional[dict] = None,
        wait_for_completion: bool = True,
        aws_conn_id: str = 'aws_default',
        **kwargs,
    ):
        self.aws_conn_id = aws_conn_id
        self.release_label = release_label
        self.job_type = job_type
        self.wait_for_completion = wait_for_completion
        self.kwargs = kwargs
        self.config = config or {}
        super().__init__(**kwargs)

        self.client_request_token = client_request_token or str(uuid4())

    @cached_property
    def hook(self) -> EmrServerlessHook:
        """Create and return an EmrServerlessHook."""
        return EmrServerlessHook(aws_conn_id=self.aws_conn_id)

    def execute(self, context: 'Context'):
        """
        Create the application, start it, and return its ID.

        :raises AirflowException: If the create_application response status code is not 200.
        """
        response = self.hook.conn.create_application(
            clientToken=self.client_request_token,
            releaseLabel=self.release_label,
            type=self.job_type,
            **self.config,
        )
        # Check the status before reading the ID: a failed response may not carry
        # 'applicationId', which would otherwise surface as a KeyError.
        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
            raise AirflowException(f'Application Creation failed: {response}')

        application_id = response['applicationId']
        self.log.info('EMR serverless application created: %s', application_id)

        # This should be replaced with a boto waiter when available.
        self.hook.waiter(
            get_state_callable=self.hook.conn.get_application,
            get_state_args={'applicationId': application_id},
            parse_response=['application', 'state'],
            desired_state={'CREATED'},
            failure_states=EmrServerlessApplicationSensor.FAILURE_STATES,
            object_type='application',
            action='created',
        )

        self.log.info('Starting application %s', application_id)
        self.hook.conn.start_application(applicationId=application_id)

        if self.wait_for_completion:
            # This should be replaced with a boto waiter when available.
            self.hook.waiter(
                get_state_callable=self.hook.conn.get_application,
                get_state_args={'applicationId': application_id},
                parse_response=['application', 'state'],
                desired_state={'STARTED'},
                failure_states=EmrServerlessApplicationSensor.FAILURE_STATES,
                object_type='application',
                action='started',
            )

        return application_id
class EmrServerlessStartJobOperator(BaseOperator):
    """
    Operator to start EMR Serverless job.

    If the application is not already in one of the application sensor's success
    states, it is started first and the operator waits for it to reach ``STARTED``.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:EmrServerlessStartJobOperator`

    :param application_id: ID of the EMR Serverless application to run the job on.
    :param execution_role_arn: ARN of role to perform action.
    :param job_driver: Driver that the job runs on.
    :param configuration_overrides: Configuration specifications to override existing configurations.
    :param client_request_token: The client idempotency token of the job run to start.
        Its value must be unique for each request.
    :param config: Optional dictionary for arbitrary parameters to the boto API start_job_run call.
    :param wait_for_completion: If true, waits for the job run to reach a terminal state
        before returning. Defaults to True.
    :param aws_conn_id: AWS connection to use.
    """

    # Cached property: execute() accesses it as ``self.hook.conn``, which a plain
    # method would not support.
    @cached_property
    def hook(self) -> EmrServerlessHook:
        """Create and return an EmrServerlessHook."""
        return EmrServerlessHook(aws_conn_id=self.aws_conn_id)

    def execute(self, context: 'Context') -> str:
        """
        Ensure the application is started, then start a job run on it.

        :return: The ``jobRunId`` of the started job run.
        :raises AirflowException: If the start_job_run response status code is not 200.
        """
        self.log.info('Starting job on Application: %s', self.application_id)

        app_state = self.hook.conn.get_application(applicationId=self.application_id)['application']['state']
        if app_state not in EmrServerlessApplicationSensor.SUCCESS_STATES:
            self.hook.conn.start_application(applicationId=self.application_id)

            self.hook.waiter(
                get_state_callable=self.hook.conn.get_application,
                get_state_args={'applicationId': self.application_id},
                parse_response=['application', 'state'],
                desired_state={'STARTED'},
                failure_states=EmrServerlessApplicationSensor.FAILURE_STATES,
                object_type='application',
                action='started',
            )

        response = self.hook.conn.start_job_run(
            clientToken=self.client_request_token,
            applicationId=self.application_id,
            executionRoleArn=self.execution_role_arn,
            jobDriver=self.job_driver,
            configurationOverrides=self.configuration_overrides,
            **self.config,
        )

        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
            raise AirflowException(f'EMR serverless job failed to start: {response}')

        job_run_id = response['jobRunId']
        self.log.info('EMR serverless job started: %s', job_run_id)
        if self.wait_for_completion:
            # This should be replaced with a boto waiter when available.
            self.hook.waiter(
                get_state_callable=self.hook.conn.get_job_run,
                get_state_args={
                    'applicationId': self.application_id,
                    'jobRunId': job_run_id,
                },
                parse_response=['jobRun', 'state'],
                desired_state=EmrServerlessJobSensor.TERMINAL_STATES,
                failure_states=EmrServerlessJobSensor.FAILURE_STATES,
                object_type='job',
                action='run',
            )
        return job_run_id
class EmrServerlessDeleteApplicationOperator(BaseOperator):
    """
    Operator to delete EMR Serverless application.

    The application is stopped first; the operator waits for the stop to complete
    before issuing the delete request.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:EmrServerlessDeleteApplicationOperator`

    :param application_id: ID of the EMR Serverless application to delete.
    :param wait_for_completion: If true, wait for the application to reach the
        ``TERMINATED`` state after the delete request before returning. Defaults to True.
    :param aws_conn_id: AWS connection to use.
    """

    # Cached property: execute() accesses it as ``self.hook.conn``, which a plain
    # method would not support.
    @cached_property
    def hook(self) -> EmrServerlessHook:
        """Create and return an EmrServerlessHook."""
        return EmrServerlessHook(aws_conn_id=self.aws_conn_id)

    def execute(self, context: 'Context') -> None:
        """
        Stop the application, delete it, and optionally wait until it is terminated.

        :raises AirflowException: If the delete_application response status code is not 200.
        """
        self.log.info('Stopping application: %s', self.application_id)
        self.hook.conn.stop_application(applicationId=self.application_id)

        # This should be replaced with a boto waiter when available.
        # NOTE(review): the "stopped" wait uses the application sensor's FAILURE_STATES
        # as its desired states, which presumes that set contains the stopped
        # state(s) — confirm against EmrServerlessApplicationSensor.
        self.hook.waiter(
            get_state_callable=self.hook.conn.get_application,
            get_state_args={
                'applicationId': self.application_id,
            },
            parse_response=['application', 'state'],
            desired_state=EmrServerlessApplicationSensor.FAILURE_STATES,
            failure_states=set(),
            object_type='application',
            action='stopped',
        )

        self.log.info('Deleting application: %s', self.application_id)
        response = self.hook.conn.delete_application(applicationId=self.application_id)

        if response['ResponseMetadata']['HTTPStatusCode'] != 200:
            raise AirflowException(f'Application deletion failed: {response}')

        if self.wait_for_completion:
            # This should be replaced with a boto waiter when available.
            self.hook.waiter(
                get_state_callable=self.hook.conn.get_application,
                get_state_args={'applicationId': self.application_id},
                parse_response=['application', 'state'],
                desired_state={'TERMINATED'},
                failure_states=EmrServerlessApplicationSensor.FAILURE_STATES,
                object_type='application',
                action='deleted',
            )

        self.log.info('EMR serverless application deleted')