Source code for airflow.providers.amazon.aws.operators.emr
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import ast
import warnings
from typing import TYPE_CHECKING, Any, Sequence
from uuid import uuid4

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.emr import EmrContainerHook, EmrHook, EmrServerlessHook
from airflow.providers.amazon.aws.links.emr import EmrClusterLink
from airflow.providers.amazon.aws.utils.waiter import waiter
from airflow.utils.helpers import exactly_one, prune_dict
from airflow.utils.types import NOTSET, ArgNotSet

if TYPE_CHECKING:
    from airflow.utils.context import Context

from airflow.compat.functools import cached_property
[docs]class EmrAddStepsOperator(BaseOperator):
    """
    An operator that adds steps to an existing EMR job_flow.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:EmrAddStepsOperator`

    :param job_flow_id: id of the JobFlow to add steps to. (templated)
    :param job_flow_name: name of the JobFlow to add steps to. Use as an alternative to passing
        job_flow_id. The operator will search for the id of a JobFlow whose name matches, in one of
        the states given in param cluster_states. Exactly one such cluster must exist, otherwise the
        task will fail. (templated)
    :param cluster_states: Acceptable cluster states when searching for JobFlow id by job_flow_name.
        (templated)
    :param aws_conn_id: aws connection to use
    :param steps: boto3 style steps or reference to a steps file (must be '.json') to
        be added to the jobflow. (templated)
    :param wait_for_completion: If True, the operator will wait for all the steps to be completed.
    :param execution_role_arn: The ARN of the runtime role for a step on the cluster.
    :param do_xcom_push: if True, job_flow_id is pushed to XCom with key job_flow_id.
    """
    def __init__(
        self,
        *,
        job_flow_id: str | None = None,
        job_flow_name: str | None = None,
        cluster_states: list[str] | None = None,
        aws_conn_id: str = "aws_default",
        steps: list[dict] | str | None = None,
        wait_for_completion: bool = False,
        waiter_delay: int | None = None,
        waiter_max_attempts: int | None = None,
        execution_role_arn: str | None = None,
        **kwargs,
    ):
        if not exactly_one(job_flow_id is None, job_flow_name is None):
            raise AirflowException("Exactly one of job_flow_id or job_flow_name must be specified.")
        super().__init__(**kwargs)
        cluster_states = cluster_states or []
        steps = steps or []
        self.aws_conn_id = aws_conn_id
        self.job_flow_id = job_flow_id
        self.job_flow_name = job_flow_name
        self.cluster_states = cluster_states
        self.steps = steps
        self.wait_for_completion = wait_for_completion
        self.waiter_delay = waiter_delay
        self.waiter_max_attempts = waiter_max_attempts
        self.execution_role_arn = execution_role_arn
[docs]    def execute(self, context: Context) -> list[str]:
        emr_hook = EmrHook(aws_conn_id=self.aws_conn_id)

        job_flow_id = self.job_flow_id or emr_hook.get_cluster_id_by_name(
            str(self.job_flow_name), self.cluster_states
        )

        if not job_flow_id:
            raise AirflowException(f"No cluster found for name: {self.job_flow_name}")

        if self.do_xcom_push:
            context["ti"].xcom_push(key="job_flow_id", value=job_flow_id)

        EmrClusterLink.persist(
            context=context,
            operator=self,
            region_name=emr_hook.conn_region_name,
            aws_partition=emr_hook.conn_partition,
            job_flow_id=job_flow_id,
        )

        self.log.info("Adding steps to %s", job_flow_id)

        # steps may arrive as a string representing a list
        # e.g. if we used XCom or a file then: steps="[{ step1 }, { step2 }]"
        steps = self.steps
        if isinstance(steps, str):
            steps = ast.literal_eval(steps)

        return emr_hook.add_job_flow_steps(
            job_flow_id=job_flow_id,
            steps=steps,
            wait_for_completion=self.wait_for_completion,
            waiter_delay=self.waiter_delay,
            waiter_max_attempts=self.waiter_max_attempts,
            execution_role_arn=self.execution_role_arn,
        )
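# Illustrative sketch (not part of this module): a minimal way this operator is typically wired
# into a DAG. The cluster id and step definition below are placeholders; the step follows the
# boto3 add_job_flow_steps format.
SPARK_STEP = {
    "Name": "calculate_pi",
    "ActionOnFailure": "CONTINUE",
    "HadoopJarStep": {
        "Jar": "command-runner.jar",
        "Args": ["spark-submit", "--deploy-mode", "cluster", "/usr/lib/spark/examples/src/main/python/pi.py"],
    },
}

add_steps = EmrAddStepsOperator(
    task_id="add_steps",
    job_flow_id="j-XXXXXXXXXXXXX",  # placeholder cluster id
    steps=[SPARK_STEP],
    wait_for_completion=True,
)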
[docs]class EmrStartNotebookExecutionOperator(BaseOperator):
    """
    An operator that starts an EMR notebook execution.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:EmrStartNotebookExecutionOperator`

    :param editor_id: The unique identifier of the EMR notebook to use for notebook execution.
    :param relative_path: The path and file name of the notebook file for this execution,
        relative to the path specified for the EMR notebook.
    :param cluster_id: The unique identifier of the EMR cluster the notebook is attached to.
    :param service_role: The name or ARN of the IAM role that is used as the service role
        for Amazon EMR (the EMR role) for the notebook execution.
    :param notebook_execution_name: Optional name for the notebook execution.
    :param notebook_params: Input parameters in JSON format passed to the EMR notebook at
        runtime for execution.
    :param notebook_instance_security_group_id: The unique identifier of the Amazon EC2
        security group to associate with the EMR notebook for this notebook execution.
    :param master_instance_security_group_id: Optional unique ID of an EC2 security
        group to associate with the master instance of the EMR cluster for this notebook execution.
    :param tags: Optional list of key-value pairs to associate with the notebook execution.
    :param waiter_max_attempts: Maximum number of tries before failing.
    :param waiter_delay: Number of seconds between polling the state of the notebook.
    :param waiter_countdown: Total amount of time the operator will wait for the notebook to stop.
        Defaults to 25 * 60 seconds. (Deprecated. Please use waiter_max_attempts.)
    :param waiter_check_interval_seconds: Number of seconds between polling the state of the notebook.
        Defaults to 60 seconds. (Deprecated. Please use waiter_delay.)
    """
    def __init__(
        self,
        editor_id: str,
        relative_path: str,
        cluster_id: str,
        service_role: str,
        notebook_execution_name: str | None = None,
        notebook_params: str | None = None,
        notebook_instance_security_group_id: str | None = None,
        master_instance_security_group_id: str | None = None,
        tags: list | None = None,
        wait_for_completion: bool = False,
        aws_conn_id: str = "aws_default",
        # TODO: waiter_max_attempts and waiter_delay should default to None when the other two are deprecated.
        waiter_max_attempts: int | None | ArgNotSet = NOTSET,
        waiter_delay: int | None | ArgNotSet = NOTSET,
        waiter_countdown: int = 25 * 60,
        waiter_check_interval_seconds: int = 60,
        **kwargs: Any,
    ):
        if waiter_max_attempts is NOTSET:
            warnings.warn(
                "The parameter waiter_countdown has been deprecated to standardize "
                "naming conventions. Please use waiter_max_attempts instead. In the "
                "future this will default to None and defer to the waiter's default value."
            )
            waiter_max_attempts = waiter_countdown // waiter_check_interval_seconds
        if waiter_delay is NOTSET:
            warnings.warn(
                "The parameter waiter_check_interval_seconds has been deprecated to "
                "standardize naming conventions. Please use waiter_delay instead. In the "
                "future this will default to None and defer to the waiter's default value."
            )
            waiter_delay = waiter_check_interval_seconds
        super().__init__(**kwargs)
        self.editor_id = editor_id
        self.relative_path = relative_path
        self.service_role = service_role
        self.notebook_execution_name = notebook_execution_name or f"emr_notebook_{uuid4()}"
        self.notebook_params = notebook_params or ""
        self.notebook_instance_security_group_id = notebook_instance_security_group_id or ""
        self.tags = tags or []
        self.wait_for_completion = wait_for_completion
        self.cluster_id = cluster_id
        self.aws_conn_id = aws_conn_id
        self.waiter_max_attempts = waiter_max_attempts
        self.waiter_delay = waiter_delay
        self.master_instance_security_group_id = master_instance_security_group_id
[docs]    def execute(self, context: Context):
        execution_engine = {
            "Id": self.cluster_id,
            "Type": "EMR",
            "MasterInstanceSecurityGroupId": self.master_instance_security_group_id or "",
        }
        emr_hook = EmrHook(aws_conn_id=self.aws_conn_id)

        response = emr_hook.conn.start_notebook_execution(
            EditorId=self.editor_id,
            RelativePath=self.relative_path,
            NotebookExecutionName=self.notebook_execution_name,
            NotebookParams=self.notebook_params,
            ExecutionEngine=execution_engine,
            ServiceRole=self.service_role,
            NotebookInstanceSecurityGroupId=self.notebook_instance_security_group_id,
            Tags=self.tags,
        )

        if response["ResponseMetadata"]["HTTPStatusCode"] != 200:
            raise AirflowException(f"Starting notebook execution failed: {response}")

        self.log.info("Notebook execution started: %s", response["NotebookExecutionId"])
        notebook_execution_id = response["NotebookExecutionId"]
        if self.wait_for_completion:
            emr_hook.get_waiter("notebook_running").wait(
                NotebookExecutionId=notebook_execution_id,
                WaiterConfig=prune_dict(
                    {
                        "Delay": self.waiter_delay,
                        "MaxAttempts": self.waiter_max_attempts,
                    }
                ),
            )

            # The old Waiter method raised an exception if the notebook
            # failed, adding that here. This could maybe be deprecated
            # later to bring it in line with how other waiters behave.
            failure_states = {"FAILED"}
            final_status = emr_hook.conn.describe_notebook_execution(
                NotebookExecutionId=notebook_execution_id
            )["NotebookExecution"]["Status"]
            if final_status in failure_states:
                raise AirflowException(f"Notebook Execution reached failure state {final_status}.")

        return notebook_execution_id
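# Illustrative sketch (not part of this module): starting a notebook execution inside a DAG and
# stopping it afterwards. The editor id, cluster id, notebook path and role name are placeholders,
# and feeding `.output` to the stop task assumes notebook_execution_id resolves XCom references.
start_execution = EmrStartNotebookExecutionOperator(
    task_id="start_execution",
    editor_id="e-XXXXXXXXXXXXX",
    cluster_id="j-XXXXXXXXXXXXX",
    relative_path="my_notebook.ipynb",
    service_role="EMR_Notebooks_DefaultRole",  # placeholder role name
    wait_for_completion=True,
    waiter_delay=30,
)

stop_execution = EmrStopNotebookExecutionOperator(
    task_id="stop_execution",
    notebook_execution_id=start_execution.output,  # execution id returned by the start task
)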
[docs]class EmrStopNotebookExecutionOperator(BaseOperator):
    """
    An operator that stops a running EMR notebook execution.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:EmrStopNotebookExecutionOperator`

    :param notebook_execution_id: The unique identifier of the notebook execution.
    :param wait_for_completion: If True, the operator will wait for the notebook
        to be in a STOPPED or FINISHED state. Defaults to False.
    :param aws_conn_id: aws connection to use.
    :param waiter_max_attempts: Maximum number of tries before failing.
    :param waiter_delay: Number of seconds between polling the state of the notebook.
    :param waiter_countdown: Total amount of time the operator will wait for the notebook to stop.
        Defaults to 25 * 60 seconds. (Deprecated. Please use waiter_max_attempts.)
    :param waiter_check_interval_seconds: Number of seconds between polling the state of the notebook.
        Defaults to 60 seconds. (Deprecated. Please use waiter_delay.)
    """
    def __init__(
        self,
        notebook_execution_id: str,
        wait_for_completion: bool = False,
        aws_conn_id: str = "aws_default",
        # TODO: waiter_max_attempts and waiter_delay should default to None when the other two are deprecated.
        waiter_max_attempts: int | None | ArgNotSet = NOTSET,
        waiter_delay: int | None | ArgNotSet = NOTSET,
        waiter_countdown: int = 25 * 60,
        waiter_check_interval_seconds: int = 60,
        **kwargs: Any,
    ):
        if waiter_max_attempts is NOTSET:
            warnings.warn(
                "The parameter waiter_countdown has been deprecated to standardize "
                "naming conventions. Please use waiter_max_attempts instead. In the "
                "future this will default to None and defer to the waiter's default value."
            )
            waiter_max_attempts = waiter_countdown // waiter_check_interval_seconds
        if waiter_delay is NOTSET:
            warnings.warn(
                "The parameter waiter_check_interval_seconds has been deprecated to "
                "standardize naming conventions. Please use waiter_delay instead. In the "
                "future this will default to None and defer to the waiter's default value."
            )
            waiter_delay = waiter_check_interval_seconds
        super().__init__(**kwargs)
        self.notebook_execution_id = notebook_execution_id
        self.wait_for_completion = wait_for_completion
        self.aws_conn_id = aws_conn_id
        self.waiter_max_attempts = waiter_max_attempts
        self.waiter_delay = waiter_delay
[docs]class EmrEksCreateClusterOperator(BaseOperator):
    """
    An operator that creates EMR on EKS virtual clusters.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:EmrEksCreateClusterOperator`

    :param virtual_cluster_name: The name of the EMR EKS virtual cluster to create.
    :param eks_cluster_name: The EKS cluster used by the EMR virtual cluster.
    :param eks_namespace: namespace used by the EKS cluster.
    :param virtual_cluster_id: The EMR on EKS virtual cluster id.
    :param aws_conn_id: The Airflow connection used for AWS credentials.
    :param tags: The tags assigned to the created cluster. Defaults to None
    """
[docs]    def hook(self) -> EmrContainerHook:
        """Create and return an EmrContainerHook."""
        return EmrContainerHook(self.aws_conn_id)
[docs]    def execute(self, context: Context) -> str | None:
        """Create EMR on EKS virtual Cluster"""
        self.virtual_cluster_id = self.hook.create_emr_on_eks_cluster(
            self.virtual_cluster_name, self.eks_cluster_name, self.eks_namespace, self.tags
        )
        return self.virtual_cluster_id
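# Illustrative sketch (not part of this module): creating an EMR on EKS virtual cluster.
# The cluster and namespace names are placeholders; parameter names follow the docstring above.
create_emr_eks_cluster = EmrEksCreateClusterOperator(
    task_id="create_emr_eks_cluster",
    virtual_cluster_name="my-emr-virtual-cluster",
    eks_cluster_name="my-eks-cluster",
    eks_namespace="emr-namespace",
)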
[docs]class EmrContainerOperator(BaseOperator):
    """
    An operator that submits jobs to EMR on EKS virtual clusters.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:EmrContainerOperator`

    :param name: The name of the job run.
    :param virtual_cluster_id: The EMR on EKS virtual cluster ID
    :param execution_role_arn: The IAM role ARN associated with the job run.
    :param release_label: The Amazon EMR release version to use for the job run.
    :param job_driver: Job configuration details, e.g. the Spark job parameters.
    :param configuration_overrides: The configuration overrides for the job run,
        specifically either application configuration or monitoring configuration.
    :param client_request_token: The client idempotency token of the job run request.
        Use this if you want to specify a unique ID to prevent the same job from being started twice.
        If no token is provided, a UUIDv4 token will be generated for you.
    :param aws_conn_id: The Airflow connection used for AWS credentials.
    :param wait_for_completion: Whether or not to wait in the operator for the job to complete.
    :param poll_interval: Time (in seconds) to wait between two consecutive calls to check query status on EMR
    :param max_tries: Deprecated - use max_polling_attempts instead.
    :param max_polling_attempts: Maximum number of times to wait for the job run to finish.
        Defaults to None, which will poll until the job is *not* in a pending, submitted, or running state.
    :param tags: The tags assigned to job runs. Defaults to None
    """
    def __init__(
        self,
        *,
        name: str,
        virtual_cluster_id: str,
        execution_role_arn: str,
        release_label: str,
        job_driver: dict,
        configuration_overrides: dict | None = None,
        client_request_token: str | None = None,
        aws_conn_id: str = "aws_default",
        wait_for_completion: bool = True,
        poll_interval: int = 30,
        max_tries: int | None = None,
        tags: dict | None = None,
        max_polling_attempts: int | None = None,
        **kwargs: Any,
    ) -> None:
        super().__init__(**kwargs)
        self.name = name
        self.virtual_cluster_id = virtual_cluster_id
        self.execution_role_arn = execution_role_arn
        self.release_label = release_label
        self.job_driver = job_driver
        self.configuration_overrides = configuration_overrides or {}
        self.aws_conn_id = aws_conn_id
        self.client_request_token = client_request_token or str(uuid4())
        self.wait_for_completion = wait_for_completion
        self.poll_interval = poll_interval
        self.max_polling_attempts = max_polling_attempts
        self.tags = tags
        self.job_id: str | None = None

        if max_tries:
            warnings.warn(
                f"Parameter `{self.__class__.__name__}.max_tries` is deprecated and will be removed "
                "in a future release. Please use parameter `max_polling_attempts` instead.",
                DeprecationWarning,
                stacklevel=2,
            )
            if max_polling_attempts and max_polling_attempts != max_tries:
                raise Exception("max_polling_attempts must be the same value as max_tries")
            else:
                self.max_polling_attempts = max_tries

    @cached_property
[docs]    def hook(self) -> EmrContainerHook:
        """Create and return an EmrContainerHook."""
        return EmrContainerHook(
            self.aws_conn_id,
            virtual_cluster_id=self.virtual_cluster_id,
        )
[docs]    def execute(self, context: Context) -> str | None:
        """Run job on EMR Containers"""
        self.job_id = self.hook.submit_job(
            self.name,
            self.execution_role_arn,
            self.release_label,
            self.job_driver,
            self.configuration_overrides,
            self.client_request_token,
            self.tags,
        )
        if self.wait_for_completion:
            query_status = self.hook.poll_query_status(
                self.job_id,
                max_polling_attempts=self.max_polling_attempts,
                poll_interval=self.poll_interval,
            )

            if query_status in EmrContainerHook.FAILURE_STATES:
                error_message = self.hook.get_job_failure_reason(self.job_id)
                raise AirflowException(
                    f"EMR Containers job failed. Final state is {query_status}. "
                    f"query_execution_id is {self.job_id}. Error: {error_message}"
                )
            elif not query_status or query_status in EmrContainerHook.INTERMEDIATE_STATES:
                raise AirflowException(
                    f"Final state of EMR Containers job is {query_status}. "
                    f"Max tries of poll status exceeded, query_execution_id is {self.job_id}."
                )

        return self.job_id
[docs]    def on_kill(self) -> None:
        """Cancel the submitted job run"""
        if self.job_id:
            self.log.info("Stopping job run with jobId - %s", self.job_id)
            response = self.hook.stop_query(self.job_id)
            http_status_code = None
            try:
                http_status_code = response["ResponseMetadata"]["HTTPStatusCode"]
            except Exception as ex:
                self.log.error("Exception while cancelling query: %s", ex)
            finally:
                if http_status_code is None or http_status_code != 200:
                    self.log.error("Unable to request query cancel on EMR. Exiting")
                else:
                    self.log.info(
                        "Polling EMR for query with id %s to reach final state",
                        self.job_id,
                    )
                    self.hook.poll_query_status(self.job_id)
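# Illustrative sketch (not part of this module): submitting a Spark job to an EMR on EKS
# virtual cluster. The virtual cluster id, role ARN and entry point are placeholders;
# job_driver follows the boto3 start_job_run request format for EMR containers.
CONTAINER_JOB_DRIVER = {
    "sparkSubmitJobDriver": {
        "entryPoint": "s3://my-bucket/scripts/pi.py",
        "sparkSubmitParameters": "--conf spark.executor.instances=2",
    }
}

run_emr_eks_job = EmrContainerOperator(
    task_id="run_emr_eks_job",
    name="pi-job",
    virtual_cluster_id="abcdefghijklmnop1234567890",  # placeholder
    execution_role_arn="arn:aws:iam::123456789012:role/emr-eks-job-role",  # placeholder
    release_label="emr-6.10.0-latest",
    job_driver=CONTAINER_JOB_DRIVER,
    wait_for_completion=True,
    poll_interval=30,
)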
[docs]class EmrCreateJobFlowOperator(BaseOperator):
    """
    Creates an EMR JobFlow, reading the config from the EMR connection.
    A dictionary of JobFlow overrides can be passed that override the config from the connection.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:EmrCreateJobFlowOperator`

    :param aws_conn_id: The Airflow connection used for AWS credentials.
        If this is None or empty then the default boto3 behaviour is used. If
        running Airflow in a distributed manner and aws_conn_id is None or
        empty, then default boto3 configuration would be used (and must be
        maintained on each worker node)
    :param emr_conn_id: :ref:`Amazon Elastic MapReduce Connection <howto/connection:emr>`.
        Use to receive an initial Amazon EMR cluster configuration:
        ``boto3.client('emr').run_job_flow`` request body.
        If this is None or empty or the connection does not exist,
        then an empty initial configuration is used.
    :param job_flow_overrides: boto3 style arguments or reference to an arguments file
        (must be '.json') to override specific ``emr_conn_id`` extra parameters. (templated)
    :param region_name: Region name passed to EmrHook
    :param wait_for_completion: Whether to finish task immediately after creation (False) or wait for jobflow
        completion (True)
    :param waiter_max_attempts: Maximum number of tries before failing.
    :param waiter_delay: Number of seconds between polling the state of the job flow.
    :param waiter_countdown: Max. seconds to wait for jobflow completion (only in combination with
        wait_for_completion=True, None = no limit) (Deprecated. Please use waiter_max_attempts.)
    :param waiter_check_interval_seconds: Number of seconds between polling the jobflow state. Defaults to 60
        seconds. (Deprecated. Please use waiter_delay.)
    """
    def __init__(
        self,
        *,
        aws_conn_id: str = "aws_default",
        emr_conn_id: str | None = "emr_default",
        job_flow_overrides: str | dict[str, Any] | None = None,
        region_name: str | None = None,
        wait_for_completion: bool = False,
        # TODO: waiter_max_attempts and waiter_delay should default to None when the other two are deprecated.
        waiter_max_attempts: int | None | ArgNotSet = NOTSET,
        waiter_delay: int | None | ArgNotSet = NOTSET,
        waiter_countdown: int | None = None,
        waiter_check_interval_seconds: int = 60,
        **kwargs: Any,
    ):
        if waiter_max_attempts is NOTSET:
            warnings.warn(
                "The parameter waiter_countdown has been deprecated to standardize "
                "naming conventions. Please use waiter_max_attempts instead. In the "
                "future this will default to None and defer to the waiter's default value."
            )
            # waiter_countdown defaults to never timing out, which is not supported
            # by boto waiters, so we will set it here to "a very long time" for now.
            waiter_max_attempts = (waiter_countdown or 999) // waiter_check_interval_seconds
        if waiter_delay is NOTSET:
            warnings.warn(
                "The parameter waiter_check_interval_seconds has been deprecated to "
                "standardize naming conventions. Please use waiter_delay instead. In the "
                "future this will default to None and defer to the waiter's default value."
            )
            waiter_delay = waiter_check_interval_seconds
        super().__init__(**kwargs)
        self.aws_conn_id = aws_conn_id
        self.emr_conn_id = emr_conn_id
        self.job_flow_overrides = job_flow_overrides or {}
        self.region_name = region_name
        self.wait_for_completion = wait_for_completion
        self.waiter_max_attempts = waiter_max_attempts
        self.waiter_delay = waiter_delay
        self._job_flow_id: str | None = None

    @cached_property
    def _emr_hook(self) -> EmrHook:
        """Create and return an EmrHook."""
        return EmrHook(
            aws_conn_id=self.aws_conn_id, emr_conn_id=self.emr_conn_id, region_name=self.region_name
        )
[docs]    def execute(self, context: Context) -> str | None:
        self.log.info(
            "Creating job flow using aws_conn_id: %s, emr_conn_id: %s", self.aws_conn_id, self.emr_conn_id
        )
        if isinstance(self.job_flow_overrides, str):
            job_flow_overrides: dict[str, Any] = ast.literal_eval(self.job_flow_overrides)
            self.job_flow_overrides = job_flow_overrides
        else:
            job_flow_overrides = self.job_flow_overrides
        response = self._emr_hook.create_job_flow(job_flow_overrides)

        if not response["ResponseMetadata"]["HTTPStatusCode"] == 200:
            raise AirflowException(f"Job flow creation failed: {response}")
        else:
            self._job_flow_id = response["JobFlowId"]
            self.log.info("Job flow with id %s created", self._job_flow_id)
            EmrClusterLink.persist(
                context=context,
                operator=self,
                region_name=self._emr_hook.conn_region_name,
                aws_partition=self._emr_hook.conn_partition,
                job_flow_id=self._job_flow_id,
            )
            if self.wait_for_completion:
                self._emr_hook.get_waiter("job_flow_waiting").wait(
                    ClusterId=self._job_flow_id,
                    WaiterConfig=prune_dict(
                        {
                            "Delay": self.waiter_delay,
                            "MaxAttempts": self.waiter_max_attempts,
                        }
                    ),
                )

            return self._job_flow_id
[docs]    def on_kill(self) -> None:
        """
        Terminate the EMR cluster (job flow). If TerminationProtected=True on the cluster,
        termination will be unsuccessful.
        """
        if self._job_flow_id:
            self.log.info("Terminating job flow %s", self._job_flow_id)
            self._emr_hook.conn.terminate_job_flows(JobFlowIds=[self._job_flow_id])
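# Illustrative sketch (not part of this module): creating a job flow from explicit overrides.
# JOB_FLOW_OVERRIDES follows the boto3 run_job_flow request format; all names, instance types
# and roles below are placeholders.
JOB_FLOW_OVERRIDES = {
    "Name": "example-cluster",
    "ReleaseLabel": "emr-6.10.0",
    "Instances": {
        "InstanceGroups": [
            {
                "Name": "Primary node",
                "InstanceRole": "MASTER",
                "InstanceType": "m5.xlarge",
                "InstanceCount": 1,
            }
        ],
        "KeepJobFlowAliveWhenNoSteps": True,
    },
    "JobFlowRole": "EMR_EC2_DefaultRole",
    "ServiceRole": "EMR_DefaultRole",
}

create_job_flow = EmrCreateJobFlowOperator(
    task_id="create_job_flow",
    job_flow_overrides=JOB_FLOW_OVERRIDES,
)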
[docs]class EmrModifyClusterOperator(BaseOperator):
    """
    An operator that modifies an existing EMR cluster.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:EmrModifyClusterOperator`

    :param cluster_id: cluster identifier
    :param step_concurrency_level: Concurrency of the cluster
    :param aws_conn_id: aws connection to use
    :param do_xcom_push: if True, cluster_id is pushed to XCom with key cluster_id.
    """
[docs]class EmrTerminateJobFlowOperator(BaseOperator):
    """
    Operator to terminate EMR JobFlows.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:EmrTerminateJobFlowOperator`

    :param job_flow_id: id of the JobFlow to terminate. (templated)
    :param aws_conn_id: aws connection to use
    """
[docs]    def execute(self, context: Context) -> None:
        emr_hook = EmrHook(aws_conn_id=self.aws_conn_id)
        emr = emr_hook.get_conn()

        EmrClusterLink.persist(
            context=context,
            operator=self,
            region_name=emr_hook.conn_region_name,
            aws_partition=emr_hook.conn_partition,
            job_flow_id=self.job_flow_id,
        )

        self.log.info("Terminating JobFlow %s", self.job_flow_id)
        response = emr.terminate_job_flows(JobFlowIds=[self.job_flow_id])

        if not response["ResponseMetadata"]["HTTPStatusCode"] == 200:
            raise AirflowException(f"JobFlow termination failed: {response}")
        else:
            self.log.info("JobFlow with id %s terminated", self.job_flow_id)
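# Illustrative sketch (not part of this module): terminating the job flow created by a
# "create_job_flow" task by pulling its return value from XCom (job_flow_id is templated).
# The referenced task id is a placeholder.
terminate_job_flow = EmrTerminateJobFlowOperator(
    task_id="terminate_job_flow",
    job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
)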
[docs]class EmrServerlessCreateApplicationOperator(BaseOperator):
    """
    Operator to create Serverless EMR Application

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:EmrServerlessCreateApplicationOperator`

    :param release_label: The EMR release version associated with the application.
    :param job_type: The type of application you want to start, such as Spark or Hive.
    :param wait_for_completion: If true, wait for the Application to start before returning. Default to True.
        If set to False, ``waiter_countdown`` and ``waiter_check_interval_seconds`` will only be applied when
        waiting for the application to be in the ``CREATED`` state.
    :param client_request_token: The client idempotency token of the application to create.
        Its value must be unique for each request.
    :param config: Optional dictionary for arbitrary parameters to the boto API create_application call.
    :param aws_conn_id: AWS connection to use
    :param waiter_countdown: Total amount of time, in seconds, the operator will wait for
        the application to start. Defaults to 25 minutes.
    :param waiter_check_interval_seconds: Number of seconds between polling the state of the application.
        Defaults to 60 seconds.
    """

    def __init__(
        self,
        release_label: str,
        job_type: str,
        client_request_token: str = "",
        config: dict | None = None,
        wait_for_completion: bool = True,
        aws_conn_id: str = "aws_default",
        waiter_countdown: int = 25 * 60,
        waiter_check_interval_seconds: int = 60,
        **kwargs,
    ):
        self.aws_conn_id = aws_conn_id
        self.release_label = release_label
        self.job_type = job_type
        self.wait_for_completion = wait_for_completion
        self.kwargs = kwargs
        self.config = config or {}
        self.waiter_countdown = waiter_countdown
        self.waiter_check_interval_seconds = waiter_check_interval_seconds
        super().__init__(**kwargs)

        self.client_request_token = client_request_token or str(uuid4())

    @cached_property
[docs]    def hook(self) -> EmrServerlessHook:
        """Create and return an EmrServerlessHook."""
        return EmrServerlessHook(aws_conn_id=self.aws_conn_id)
[docs]    def execute(self, context: Context):
        response = self.hook.conn.create_application(
            clientToken=self.client_request_token,
            releaseLabel=self.release_label,
            type=self.job_type,
            **self.config,
        )
        application_id = response["applicationId"]

        if response["ResponseMetadata"]["HTTPStatusCode"] != 200:
            raise AirflowException(f"Application Creation failed: {response}")

        self.log.info("EMR serverless application created: %s", application_id)

        # This should be replaced with a boto waiter when available.
        waiter(
            get_state_callable=self.hook.conn.get_application,
            get_state_args={"applicationId": application_id},
            parse_response=["application", "state"],
            desired_state={"CREATED"},
            failure_states=EmrServerlessHook.APPLICATION_FAILURE_STATES,
            object_type="application",
            action="created",
            countdown=self.waiter_countdown,
            check_interval_seconds=self.waiter_check_interval_seconds,
        )

        self.log.info("Starting application %s", application_id)
        self.hook.conn.start_application(applicationId=application_id)

        if self.wait_for_completion:
            # This should be replaced with a boto waiter when available.
            waiter(
                get_state_callable=self.hook.conn.get_application,
                get_state_args={"applicationId": application_id},
                parse_response=["application", "state"],
                desired_state={"STARTED"},
                failure_states=EmrServerlessHook.APPLICATION_FAILURE_STATES,
                object_type="application",
                action="started",
                countdown=self.waiter_countdown,
                check_interval_seconds=self.waiter_check_interval_seconds,
            )

        return application_id
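# Illustrative sketch (not part of this module): creating (and starting) an EMR Serverless
# Spark application. The release label and application name are placeholders; `config` is
# forwarded to the boto3 create_application call.
create_serverless_app = EmrServerlessCreateApplicationOperator(
    task_id="create_serverless_app",
    release_label="emr-6.10.0",
    job_type="SPARK",
    config={"name": "example-application"},
)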
[docs]class EmrServerlessStartJobOperator(BaseOperator):
    """
    Operator to start EMR Serverless job.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:EmrServerlessStartJobOperator`

    :param application_id: ID of the EMR Serverless application to start.
    :param execution_role_arn: ARN of role to perform action.
    :param job_driver: Driver that the job runs on.
    :param configuration_overrides: Configuration specifications to override existing configurations.
    :param client_request_token: The client idempotency token of the application to create.
        Its value must be unique for each request.
    :param config: Optional dictionary for arbitrary parameters to the boto API start_job_run call.
    :param wait_for_completion: If true, waits for the job to start before returning. Defaults to True.
        If set to False, ``waiter_countdown`` and ``waiter_check_interval_seconds`` will only be applied
        when waiting for the application to be in the ``STARTED`` state.
    :param aws_conn_id: AWS connection to use.
    :param name: Name for the EMR Serverless job. If not provided, a default name will be assigned.
    :param waiter_countdown: Total amount of time, in seconds, the operator will wait for
        the job to finish. Defaults to 25 minutes.
    :param waiter_check_interval_seconds: Number of seconds between polling the state of the job.
        Defaults to 60 seconds.
    """
[docs]    def hook(self) -> EmrServerlessHook:
        """Create and return an EmrServerlessHook."""
        return EmrServerlessHook(aws_conn_id=self.aws_conn_id)
[docs]    def execute(self, context: Context) -> dict:
        self.log.info("Starting job on Application: %s", self.application_id)

        app_state = self.hook.conn.get_application(applicationId=self.application_id)["application"]["state"]
        if app_state not in EmrServerlessHook.APPLICATION_SUCCESS_STATES:
            self.hook.conn.start_application(applicationId=self.application_id)

            waiter(
                get_state_callable=self.hook.conn.get_application,
                get_state_args={"applicationId": self.application_id},
                parse_response=["application", "state"],
                desired_state={"STARTED"},
                failure_states=EmrServerlessHook.APPLICATION_FAILURE_STATES,
                object_type="application",
                action="started",
                countdown=self.waiter_countdown,
                check_interval_seconds=self.waiter_check_interval_seconds,
            )

        response = self.hook.conn.start_job_run(
            clientToken=self.client_request_token,
            applicationId=self.application_id,
            executionRoleArn=self.execution_role_arn,
            jobDriver=self.job_driver,
            configurationOverrides=self.configuration_overrides,
            name=self.name,
            **self.config,
        )

        if response["ResponseMetadata"]["HTTPStatusCode"] != 200:
            raise AirflowException(f"EMR serverless job failed to start: {response}")

        self.log.info("EMR serverless job started: %s", response["jobRunId"])
        if self.wait_for_completion:
            # This should be replaced with a boto waiter when available.
            waiter(
                get_state_callable=self.hook.conn.get_job_run,
                get_state_args={
                    "applicationId": self.application_id,
                    "jobRunId": response["jobRunId"],
                },
                parse_response=["jobRun", "state"],
                desired_state=EmrServerlessHook.JOB_SUCCESS_STATES,
                failure_states=EmrServerlessHook.JOB_FAILURE_STATES,
                object_type="job",
                action="run",
                countdown=self.waiter_countdown,
                check_interval_seconds=self.waiter_check_interval_seconds,
            )
        return response["jobRunId"]
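# Illustrative sketch (not part of this module): running a Spark job on the application created
# by the "create_serverless_app" task above, assuming application_id resolves XCom references.
# The role ARN, entry point and log bucket are placeholders; job_driver and
# configuration_overrides follow the boto3 start_job_run request format.
start_serverless_job = EmrServerlessStartJobOperator(
    task_id="start_serverless_job",
    application_id=create_serverless_app.output,
    execution_role_arn="arn:aws:iam::123456789012:role/emr-serverless-job-role",  # placeholder
    job_driver={
        "sparkSubmit": {
            "entryPoint": "s3://my-bucket/scripts/pi.py",
        }
    },
    configuration_overrides={
        "monitoringConfiguration": {
            "s3MonitoringConfiguration": {"logUri": "s3://my-bucket/logs/"}
        }
    },
)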
[docs]class EmrServerlessStopApplicationOperator(BaseOperator):
    """
    Operator to stop an EMR Serverless application

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:EmrServerlessStopApplicationOperator`

    :param application_id: ID of the EMR Serverless application to stop.
    :param wait_for_completion: If true, wait for the Application to stop before returning. Defaults to True.
    :param aws_conn_id: AWS connection to use
    :param waiter_countdown: Total amount of time, in seconds, the operator will wait for
        the application to be stopped. Defaults to 5 minutes.
    :param waiter_check_interval_seconds: Number of seconds between polling the state of the application.
        Defaults to 30 seconds.
    :param force_stop: If set to True, any job for that app that is not in a terminal state will be cancelled.
        Otherwise, trying to stop an app with running jobs will return an error.
        If you want to wait for the jobs to finish gracefully, use
        :class:`airflow.providers.amazon.aws.sensors.emr.EmrServerlessJobSensor`
    """
[docs]    def hook(self) -> EmrServerlessHook:
        """Create and return an EmrServerlessHook."""
        return EmrServerlessHook(aws_conn_id=self.aws_conn_id)
[docs]    def execute(self, context: Context) -> None:
        self.log.info("Stopping application: %s", self.application_id)

        if self.force_stop:
            self.hook.cancel_running_jobs(
                self.application_id,
                waiter_config={
                    "Delay": self.waiter_check_interval_seconds,
                    "MaxAttempts": self.waiter_countdown / self.waiter_check_interval_seconds,
                },
            )

        self.hook.conn.stop_application(applicationId=self.application_id)

        if self.wait_for_completion:
            # This should be replaced with a boto waiter when available.
            waiter(
                get_state_callable=self.hook.conn.get_application,
                get_state_args={
                    "applicationId": self.application_id,
                },
                parse_response=["application", "state"],
                desired_state=EmrServerlessHook.APPLICATION_FAILURE_STATES,
                failure_states=set(),
                object_type="application",
                action="stopped",
                countdown=self.waiter_countdown,
                check_interval_seconds=self.waiter_check_interval_seconds,
            )
        self.log.info("EMR serverless application %s stopped successfully", self.application_id)
[docs]class EmrServerlessDeleteApplicationOperator(EmrServerlessStopApplicationOperator):
    """
    Operator to delete EMR Serverless application

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:EmrServerlessDeleteApplicationOperator`

    :param application_id: ID of the EMR Serverless application to delete.
    :param wait_for_completion: If true, wait for the Application to be deleted before returning.
        Defaults to True. Note that this operator will always wait for the application to be STOPPED first.
    :param aws_conn_id: AWS connection to use
    :param waiter_countdown: Total amount of time, in seconds, the operator will wait for each step:
        first, for the application to be stopped, and then for it to be deleted. Defaults to 25 minutes.
    :param waiter_check_interval_seconds: Number of seconds between polling the state of the application.
        Defaults to 60 seconds.
    :param force_stop: If set to True, any job for that app that is not in a terminal state will be cancelled.
        Otherwise, trying to delete an app with running jobs will return an error.
        If you want to wait for the jobs to finish gracefully, use
        :class:`airflow.providers.amazon.aws.sensors.emr.EmrServerlessJobSensor`
    """
    def __init__(
        self,
        application_id: str,
        wait_for_completion: bool = True,
        aws_conn_id: str = "aws_default",
        waiter_countdown: int = 25 * 60,
        waiter_check_interval_seconds: int = 60,
        force_stop: bool = False,
        **kwargs,
    ):
        self.wait_for_delete_completion = wait_for_completion
        # super stops the app
        super().__init__(
            application_id=application_id,
            # when deleting an app, we always need to wait for it to stop before we can call delete()
            wait_for_completion=True,
            aws_conn_id=aws_conn_id,
            waiter_countdown=waiter_countdown,
            waiter_check_interval_seconds=waiter_check_interval_seconds,
            force_stop=force_stop,
            **kwargs,
        )
[docs]    def execute(self, context: Context) -> None:
        # super stops the app (or makes sure it's already stopped)
        super().execute(context)

        self.log.info("Now deleting application: %s", self.application_id)

        response = self.hook.conn.delete_application(applicationId=self.application_id)

        if response["ResponseMetadata"]["HTTPStatusCode"] != 200:
            raise AirflowException(f"Application deletion failed: {response}")

        if self.wait_for_delete_completion:
            # This should be replaced with a boto waiter when available.
            waiter(
                get_state_callable=self.hook.conn.get_application,
                get_state_args={"applicationId": self.application_id},
                parse_response=["application", "state"],
                desired_state={"TERMINATED"},
                failure_states=EmrServerlessHook.APPLICATION_FAILURE_STATES,
                object_type="application",
                action="deleted",
                countdown=self.waiter_countdown,
                check_interval_seconds=self.waiter_check_interval_seconds,
            )

        self.log.info("EMR serverless application deleted")
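# Illustrative sketch (not part of this module): tearing down the application created by the
# "create_serverless_app" task above, cancelling any jobs still running before deletion.
# Passing `.output` assumes application_id resolves XCom references.
delete_serverless_app = EmrServerlessDeleteApplicationOperator(
    task_id="delete_serverless_app",
    application_id=create_serverless_app.output,
    force_stop=True,
)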