Source code for airflow.providers.amazon.aws.hooks.emr
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportjsonimporttimeimportwarningsfromtypingimportAnyfrombotocore.exceptionsimportClientErrorfromairflow.exceptionsimportAirflowException,AirflowNotFoundExceptionfromairflow.providers.amazon.aws.hooks.base_awsimportAwsBaseHookfromairflow.providers.amazon.aws.utils.waiter_with_loggingimportwait
class EmrHook(AwsBaseHook):
    """
    Interact with Amazon Elastic MapReduce Service (EMR).

    Provide thick wrapper around :external+boto3:py:class:`boto3.client("emr") <EMR.Client>`.

    :param emr_conn_id: :ref:`Amazon Elastic MapReduce Connection <howto/connection:emr>`.
        This attribute is only necessary when using
        the :meth:`airflow.providers.amazon.aws.hooks.emr.EmrHook.create_job_flow`,
        which uses the connection's extra as the initial cluster configuration.

    Additional arguments (such as ``aws_conn_id``) may be specified and
    are passed down to the underlying AwsBaseHook.

    .. seealso::
        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
    """

    def get_cluster_id_by_name(self, emr_cluster_name: str, cluster_states: list[str]) -> str | None:
        """
        Fetch the id of the EMR cluster with the given name among clusters in the given states.

        The id is returned only when exactly one cluster matches.

        .. seealso::
            - :external+boto3:py:meth:`EMR.Client.list_clusters`

        :param emr_cluster_name: Name of the cluster to find (exact, case-sensitive match).
        :param cluster_states: State(s) used to filter the listed clusters
            (passed as ``ClusterStates`` to ``list_clusters``).
        :return: id of the EMR cluster, or None if no cluster matches.
        :raises AirflowException: if more than one cluster matches the name.
        """
        response_iterator = (
            self.get_conn().get_paginator("list_clusters").paginate(ClusterStates=cluster_states)
        )
        # Walk every page: list_clusters is paginated, so a match may be on any page.
        matching_clusters = [
            cluster
            for page in response_iterator
            for cluster in page["Clusters"]
            if cluster["Name"] == emr_cluster_name
        ]

        if len(matching_clusters) == 1:
            cluster_id = matching_clusters[0]["Id"]
            self.log.info("Found cluster name = %s id = %s", emr_cluster_name, cluster_id)
            return cluster_id
        if len(matching_clusters) > 1:
            # Names are not unique in EMR; refuse to guess which cluster was meant.
            raise AirflowException(f"More than one cluster found for name {emr_cluster_name}")
        self.log.info("No cluster found for name %s", emr_cluster_name)
        return None

    def create_job_flow(self, job_flow_overrides: dict[str, Any]) -> dict[str, Any]:
        """
        Create and start running a new cluster (job flow).

        .. seealso::
            - :external+boto3:py:meth:`EMR.Client.run_job_flow`

        This method uses ``EmrHook.emr_conn_id`` to receive the initial Amazon EMR cluster configuration.
        If ``EmrHook.emr_conn_id`` is empty or the connection does not exist, then an empty initial
        configuration is used.

        :param job_flow_overrides: Is used to overwrite the parameters in the initial Amazon EMR configuration
            cluster. The resulting configuration will be used in the
            :external+boto3:py:meth:`EMR.Client.run_job_flow`.
        :return: The raw ``run_job_flow`` response.

        .. seealso::
            - :ref:`Amazon Elastic MapReduce Connection <howto/connection:emr>`
            - :external+boto3:py:meth:`EMR.Client.run_job_flow`
            - `API RunJobFlow <https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html>`_
        """
        config: dict[str, Any] = {}
        if self.emr_conn_id:
            try:
                emr_conn = self.get_connection(self.emr_conn_id)
            except AirflowNotFoundException:
                # A missing connection is not fatal: fall back to an empty base configuration.
                warnings.warn(
                    f"Unable to find {self.hook_name} Connection ID {self.emr_conn_id!r}, "
                    "using an empty initial configuration. If you want to get rid of this warning "
                    "message please provide a valid `emr_conn_id` or set it to None.",
                    UserWarning,
                    stacklevel=2,
                )
            else:
                if emr_conn.conn_type and emr_conn.conn_type != self.conn_type:
                    warnings.warn(
                        f"{self.hook_name} Connection expected connection type {self.conn_type!r}, "
                        f"Connection {self.emr_conn_id!r} has conn_type={emr_conn.conn_type!r}. "
                        "This connection might not work correctly.",
                        UserWarning,
                        stacklevel=2,
                    )
                # Copy so that applying the overrides never mutates the connection's parsed extra.
                config = emr_conn.extra_dejson.copy()
        # Overrides win over values coming from the connection.
        config.update(job_flow_overrides)

        response = self.get_conn().run_job_flow(**config)

        return response

    def add_job_flow_steps(
        self,
        job_flow_id: str,
        steps: list[dict] | str | None = None,
        wait_for_completion: bool = False,
        waiter_delay: int | None = None,
        waiter_max_attempts: int | None = None,
        execution_role_arn: str | None = None,
    ) -> list[str]:
        """
        Add new steps to a running cluster.

        .. seealso::
            - :external+boto3:py:meth:`EMR.Client.add_job_flow_steps`

        :param job_flow_id: The id of the job flow to which the steps are being added
        :param steps: A list of the steps to be executed by the job flow
            (passed unchanged as ``Steps`` to ``add_job_flow_steps``)
        :param wait_for_completion: If True, wait for the steps to be completed. Default is False
        :param waiter_delay: The amount of time in seconds to wait between attempts.
            Default is 30 (any falsy value, e.g. ``None`` or ``0``, falls back to 30)
        :param waiter_max_attempts: The maximum number of attempts to be made.
            Default is 60 (any falsy value, e.g. ``None`` or ``0``, falls back to 60)
        :param execution_role_arn: The ARN of the runtime role for a step on the cluster.
        :return: The ids of the steps added to the cluster.
        :raises AirflowException: if the API call does not return HTTP 200, or if
            waiting for one of the steps fails.
        """
        config = {}
        # NOTE: ``or`` (not ``is None``) is intentional legacy behaviour: 0 also falls back.
        waiter_delay = waiter_delay or 30
        waiter_max_attempts = waiter_max_attempts or 60

        if execution_role_arn:
            config["ExecutionRoleArn"] = execution_role_arn
        response = self.get_conn().add_job_flow_steps(JobFlowId=job_flow_id, Steps=steps, **config)

        if response["ResponseMetadata"]["HTTPStatusCode"] != 200:
            raise AirflowException(f"Adding steps failed: {response}")

        self.log.info("Steps %s added to JobFlow", response["StepIds"])
        if wait_for_completion:
            waiter = self.get_conn().get_waiter("step_complete")
            for step_id in response["StepIds"]:
                try:
                    wait(
                        waiter=waiter,
                        waiter_max_attempts=waiter_max_attempts,
                        waiter_delay=waiter_delay,
                        args={"ClusterId": job_flow_id, "StepId": step_id},
                        failure_message=f"EMR Steps failed: {step_id}",
                        status_message="EMR Step status is",
                        status_args=["Step.Status.State", "Step.Status.StateChangeReason"],
                    )
                except AirflowException as ex:
                    # Only a step failure (not e.g. a timeout) is worth describing: log the
                    # step's FailureDetails, if any, before propagating the original error.
                    if "EMR Steps failed" in str(ex):
                        resp = self.get_conn().describe_step(ClusterId=job_flow_id, StepId=step_id)
                        failure_details = resp["Step"]["Status"].get("FailureDetails", None)
                        if failure_details:
                            self.log.error("EMR Steps failed: %s", failure_details)
                    raise
        return response["StepIds"]

    def test_connection(self):
        """
        Return failed state for test Amazon Elastic MapReduce Connection (untestable).

        We need to overwrite this method because this hook is based on
        :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsGenericHook`,
        otherwise it will try to test connection to AWS STS by using the default boto3 credential strategy.

        :return: ``(False, message)`` explaining why the connection cannot be tested.
        """
        msg = (
            f"{self.hook_name!r} Airflow Connection cannot be tested, by design it stores "
            f"only key/value pairs and does not make a connection to an external resource."
        )
        return False, msg

    @classmethod
    def get_ui_field_behaviour(cls) -> dict[str, Any]:
        """Return custom UI field behaviour for Amazon Elastic MapReduce Connection."""
        return {
            "hidden_fields": ["host", "schema", "port", "login", "password"],
            "relabeling": {
                "extra": "Run Job Flow Configuration",
            },
            "placeholders": {
                # Example run_job_flow configuration shown in the connection form.
                "extra": json.dumps(
                    {
                        "Name": "MyClusterName",
                        "ReleaseLabel": "emr-5.36.0",
                        "Applications": [{"Name": "Spark"}],
                        "Instances": {
                            "InstanceGroups": [
                                {
                                    "Name": "Primary node",
                                    "Market": "SPOT",
                                    "InstanceRole": "MASTER",
                                    "InstanceType": "m5.large",
                                    "InstanceCount": 1,
                                },
                            ],
                            "KeepJobFlowAliveWhenNoSteps": False,
                            "TerminationProtected": False,
                        },
                        "StepConcurrencyLevel": 2,
                    },
                    indent=2,
                ),
            },
        }
class EmrServerlessHook(AwsBaseHook):
    """
    Interact with Amazon EMR Serverless.

    This hook is a thin wrapper around
    :py:class:`boto3.client("emr-serverless") <EMRServerless.Client>`.

    Any additional arguments (for example ``aws_conn_id``) are forwarded
    to the underlying AwsBaseHook.

    .. seealso::
        - :class:`airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
    """

    def cancel_running_jobs(
        self, application_id: str, waiter_config: dict | None = None, wait_for_completion: bool = True
    ) -> int:
        """
        Cancel every job of an application that is still in an intermediate state.

        Job runs are listed page by page (50 per page) and each one is cancelled as soon
        as it is seen. When ``wait_for_completion`` is True and at least one job was
        cancelled, the call then blocks on the ``no_job_running`` waiter.

        Note: if new jobs are started while this operation is ongoing, the wait is going
        to time out and fail.

        :param application_id: ID of the EMR Serverless application.
        :param waiter_config: Optional ``WaiterConfig`` for the waiter (an empty dict is used if not given).
        :param wait_for_completion: Whether to wait for the cancelled jobs to terminate.
        :return: The number of jobs for which a cancellation was requested.
        """
        pages = self.conn.get_paginator("list_job_runs").paginate(
            applicationId=application_id,
            states=list(self.JOB_INTERMEDIATE_STATES),
            PaginationConfig={"PageSize": 50},
        )

        cancelled = 0
        for page in pages:
            run_ids = [run["id"] for run in page["jobRuns"]]
            if not run_ids:
                continue
            cancelled += len(run_ids)
            self.log.info(
                "Cancelling %s pending job(s) for the application %s so that it can be stopped",
                len(run_ids),
                application_id,
            )
            for run_id in run_ids:
                self.conn.cancel_job_run(applicationId=application_id, jobRunId=run_id)

        # Nothing to wait for if no job was cancelled.
        if wait_for_completion and cancelled > 0:
            self.log.info("now waiting for the %s cancelled job(s) to terminate", cancelled)
            # CANCELLING is added so the waiter also covers jobs that are still being cancelled.
            self.get_waiter("no_job_running").wait(
                applicationId=application_id,
                states=list(self.JOB_INTERMEDIATE_STATES.union({"CANCELLING"})),
                WaiterConfig=waiter_config or {},
            )

        return cancelled
class EmrContainerHook(AwsBaseHook):
    """
    Interact with Amazon EMR Containers (Amazon EMR on EKS).

    Provide thick wrapper around :py:class:`boto3.client("emr-containers") <EMRContainers.Client>`.

    :param virtual_cluster_id: Cluster ID of the EMR on EKS virtual cluster

    Additional arguments (such as ``aws_conn_id``) may be specified and
    are passed down to the underlying AwsBaseHook.

    .. seealso::
        - :class:`airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
    """

    def submit_job(
        self,
        name: str,
        execution_role_arn: str,
        release_label: str,
        job_driver: dict,
        configuration_overrides: dict | None = None,
        client_request_token: str | None = None,
        tags: dict | None = None,
        retry_max_attempts: int | None = None,
    ) -> str:
        """
        Submit a job to the EMR Containers API and return the job ID.

        A job run is a unit of work, such as a Spark jar, PySpark script,
        or SparkSQL query, that you submit to Amazon EMR on EKS.

        .. seealso::
            - :external+boto3:py:meth:`EMRContainers.Client.start_job_run`

        :param name: The name of the job run.
        :param execution_role_arn: The IAM role ARN associated with the job run.
        :param release_label: The Amazon EMR release version to use for the job run.
        :param job_driver: Job configuration details, e.g. the Spark job parameters.
        :param configuration_overrides: The configuration overrides for the job run,
            specifically either application configuration or monitoring configuration.
        :param client_request_token: The client idempotency token of the job run request.
            Use this if you want to specify a unique ID to prevent two jobs from getting started.
        :param tags: The tags assigned to job runs.
        :param retry_max_attempts: The maximum number of attempts on the job's driver.
        :return: The ID of the job run request.
        :raises AirflowException: if the API call does not return HTTP 200.
        """
        params = {
            "name": name,
            "virtualClusterId": self.virtual_cluster_id,
            "executionRoleArn": execution_role_arn,
            "releaseLabel": release_label,
            "jobDriver": job_driver,
            "configurationOverrides": configuration_overrides or {},
            "tags": tags or {},
        }
        # Optional request fields are only sent when a (truthy) value was supplied.
        if client_request_token:
            params["clientToken"] = client_request_token
        if retry_max_attempts:
            params["retryPolicyConfiguration"] = {
                "maxAttempts": retry_max_attempts,
            }

        response = self.conn.start_job_run(**params)

        if response["ResponseMetadata"]["HTTPStatusCode"] != 200:
            raise AirflowException(f"Start Job Run failed: {response}")

        self.log.info(
            "Start Job Run success - Job Id %s and virtual cluster id %s",
            response["id"],
            response["virtualClusterId"],
        )
        return response["id"]

    def get_job_failure_reason(self, job_id: str) -> str | None:
        """
        Fetch the reason for a job failure (e.g. error message). Returns None or reason string.

        The result is ``"<failureReason> - <stateDetails>"`` when both fields are present,
        or just the failure reason when the job run has no state details.

        .. seealso::
            - :external+boto3:py:meth:`EMRContainers.Client.describe_job_run`

        :param job_id: The ID of the job run request.
        :return: The failure description, or None if the request failed or the job run
            reports no failure reason.
        """
        try:
            response = self.conn.describe_job_run(
                virtualClusterId=self.virtual_cluster_id,
                id=job_id,
            )
        except ClientError as ex:
            self.log.error("AWS request failed, check logs for more info: %s", ex)
            return None

        job_run = response.get("jobRun") or {}
        failure_reason = job_run.get("failureReason")
        if failure_reason is None:
            self.log.error("Could not get status of the EMR on EKS job")
            return None

        # stateDetails is optional in the response; a missing value must not hide
        # the failure reason that *is* available.
        state_details = job_run.get("stateDetails")
        if state_details is None:
            return failure_reason
        return f"{failure_reason} - {state_details}"

    def check_query_status(self, job_id: str) -> str | None:
        """
        Fetch the status of submitted job run. Returns None or one of valid query states.

        .. seealso::
            - :external+boto3:py:meth:`EMRContainers.Client.describe_job_run`

        :param job_id: The ID of the job run request.
        :return: The job run state, or None if the AWS request failed with a generic ClientError.
        :raises AirflowException: if the job run does not exist on the virtual cluster.
        """
        try:
            response = self.conn.describe_job_run(
                virtualClusterId=self.virtual_cluster_id,
                id=job_id,
            )
            return response["jobRun"]["state"]
        except self.conn.exceptions.ResourceNotFoundException:
            # If the job is not found, we raise an exception as something fatal has happened.
            raise AirflowException(f"Job ID {job_id} not found on Virtual Cluster {self.virtual_cluster_id}")
        except ClientError as ex:
            # Any other ClientError is treated as transient: it is logged and swallowed and
            # None is returned, so that a polling caller (poll_query_status) simply retries.
            self.log.error("AWS request failed, check logs for more info: %s", ex)
            return None

    def poll_query_status(
        self,
        job_id: str,
        poll_interval: int = 30,
        max_polling_attempts: int | None = None,
    ) -> str | None:
        """
        Poll the status of submitted job run until query state reaches final state; returns the final state.

        :param job_id: The ID of the job run request.
        :param poll_interval: Time (in seconds) to wait between calls to check query status on EMR
        :param max_polling_attempts: Number of times to poll for query state before function exits.
            ``None`` or ``0`` means poll until a terminal state is reached.
        :return: The terminal state, or the last observed (possibly non-terminal or None)
            state once ``max_polling_attempts`` is exhausted.
        """
        poll_attempt = 1
        while True:
            query_state = self.check_query_status(job_id)
            if query_state in self.TERMINAL_STATES:
                self.log.info(
                    "Try %s: Query execution completed. Final state is %s", poll_attempt, query_state
                )
                return query_state
            if query_state is None:
                # check_query_status returns None on transient AWS errors.
                self.log.info("Try %s: Invalid query state. Retrying again", poll_attempt)
            else:
                self.log.info("Try %s: Query is still in non-terminal state - %s", poll_attempt, query_state)
            if max_polling_attempts and poll_attempt >= max_polling_attempts:
                # Give up and hand back whatever state was last observed.
                return query_state
            poll_attempt += 1
            time.sleep(poll_interval)

    def stop_query(self, job_id: str) -> dict:
        """
        Cancel the submitted job_run.

        .. seealso::
            - :external+boto3:py:meth:`EMRContainers.Client.cancel_job_run`

        :param job_id: The ID of the job run to cancel.
        :return: The raw ``cancel_job_run`` response.
        """
        return self.conn.cancel_job_run(
            virtualClusterId=self.virtual_cluster_id,
            id=job_id,
        )