Source code for airflow.providers.amazon.aws.operators.glue
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import os.path
import urllib.parse
from functools import cached_property
from typing import TYPE_CHECKING, Sequence

from airflow import AirflowException
from airflow.configuration import conf
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.glue import GlueJobHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.links.glue import GlueJobRunDetailsLink
from airflow.providers.amazon.aws.triggers.glue import GlueJobCompleteTrigger

if TYPE_CHECKING:
    from airflow.utils.context import Context
class GlueJobOperator(BaseOperator):
    """Create an AWS Glue Job.

    AWS Glue is a serverless Spark ETL service for running Spark Jobs on the AWS cloud.
    Language support: Python and Scala.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:GlueJobOperator`

    :param job_name: unique job name per AWS Account
    :param script_location: location of ETL script. Must be a local or S3 path
    :param job_desc: job description details
    :param concurrent_run_limit: The maximum number of concurrent runs allowed for a job
    :param script_args: etl script arguments and AWS Glue arguments (templated)
    :param retry_limit: The maximum number of times to retry this job if it fails
    :param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job.
    :param aws_conn_id: The Airflow connection used for AWS credentials.
    :param region_name: aws region name (example: us-east-1)
    :param s3_bucket: S3 bucket where logs and local etl script will be uploaded
    :param iam_role_name: AWS IAM Role for Glue Job Execution. If set `iam_role_arn` must equal None.
    :param iam_role_arn: AWS IAM ARN for Glue Job Execution. If set `iam_role_name` must equal None.
    :param create_job_kwargs: Extra arguments for Glue Job Creation
    :param run_job_kwargs: Extra arguments for Glue Job Run
    :param wait_for_completion: Whether to wait for job run completion. (default: True)
    :param deferrable: If True, the operator will wait asynchronously for the job to complete.
        This implies waiting for completion. This mode requires aiobotocore module to be installed.
        (default: False)
    :param verbose: If True, Glue Job Run logs show in the Airflow Task Logs. (default: False)
    :param update_config: If True, Operator will update job configuration. (default: False)
    :param job_poll_interval: Time (in seconds) to wait between two consecutive calls to check job status. (default: 6)
    :param stop_job_run_on_kill: If True, Operator will stop the job run when task is killed.
    """
    def __init__(
        self,
        *,
        job_name: str = "aws_glue_default_job",
        job_desc: str = "AWS Glue Job with Airflow",
        script_location: str | None = None,
        concurrent_run_limit: int | None = None,
        script_args: dict | None = None,
        retry_limit: int = 0,
        num_of_dpus: int | float | None = None,
        aws_conn_id: str = "aws_default",
        region_name: str | None = None,
        s3_bucket: str | None = None,
        iam_role_name: str | None = None,
        iam_role_arn: str | None = None,
        create_job_kwargs: dict | None = None,
        run_job_kwargs: dict | None = None,
        wait_for_completion: bool = True,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        verbose: bool = False,
        update_config: bool = False,
        job_poll_interval: int | float = 6,
        stop_job_run_on_kill: bool = False,
        **kwargs,
    ):
        super().__init__(**kwargs)
        self.job_name = job_name
        self.job_desc = job_desc
        self.script_location = script_location
        self.concurrent_run_limit = concurrent_run_limit or 1
        self.script_args = script_args or {}
        self.retry_limit = retry_limit
        self.num_of_dpus = num_of_dpus
        self.aws_conn_id = aws_conn_id
        self.region_name = region_name
        self.s3_bucket = s3_bucket
        self.iam_role_name = iam_role_name
        self.iam_role_arn = iam_role_arn
        self.s3_protocol = "s3://"
        self.s3_artifacts_prefix = "artifacts/glue-scripts/"
        self.create_job_kwargs = create_job_kwargs
        self.run_job_kwargs = run_job_kwargs or {}
        self.wait_for_completion = wait_for_completion
        self.verbose = verbose
        self.update_config = update_config
        self.deferrable = deferrable
        self.job_poll_interval = job_poll_interval
        self.stop_job_run_on_kill = stop_job_run_on_kill
        self._job_run_id: str | None = None

    @cached_property
    def glue_job_hook(self) -> GlueJobHook:
        # If script_location is a local path, upload the script to S3 first so Glue can read it.
        if self.script_location is None:
            s3_script_location = None
        elif not self.script_location.startswith(self.s3_protocol):
            s3_hook = S3Hook(aws_conn_id=self.aws_conn_id)
            script_name = os.path.basename(self.script_location)
            s3_hook.load_file(
                self.script_location, self.s3_artifacts_prefix + script_name, bucket_name=self.s3_bucket
            )
            s3_script_location = f"s3://{self.s3_bucket}/{self.s3_artifacts_prefix}{script_name}"
        else:
            s3_script_location = self.script_location
        return GlueJobHook(
            job_name=self.job_name,
            desc=self.job_desc,
            concurrent_run_limit=self.concurrent_run_limit,
            script_location=s3_script_location,
            retry_limit=self.retry_limit,
            num_of_dpus=self.num_of_dpus,
            aws_conn_id=self.aws_conn_id,
            region_name=self.region_name,
            s3_bucket=self.s3_bucket,
            iam_role_name=self.iam_role_name,
            iam_role_arn=self.iam_role_arn,
            create_job_kwargs=self.create_job_kwargs,
            update_config=self.update_config,
            job_poll_interval=self.job_poll_interval,
        )
    def execute(self, context: Context):
        """Execute AWS Glue Job from Airflow.

        :return: the current Glue job ID.
        """
        self.log.info(
            "Initializing AWS Glue Job: %s. Wait for completion: %s",
            self.job_name,
            self.wait_for_completion,
        )
        glue_job_run = self.glue_job_hook.initialize_job(self.script_args, self.run_job_kwargs)
        self._job_run_id = glue_job_run["JobRunId"]
        glue_job_run_url = GlueJobRunDetailsLink.format_str.format(
            aws_domain=GlueJobRunDetailsLink.get_aws_domain(self.glue_job_hook.conn_partition),
            region_name=self.glue_job_hook.conn_region_name,
            job_name=urllib.parse.quote(self.job_name, safe=""),
            job_run_id=self._job_run_id,
        )
        GlueJobRunDetailsLink.persist(
            context=context,
            operator=self,
            region_name=self.glue_job_hook.conn_region_name,
            aws_partition=self.glue_job_hook.conn_partition,
            job_name=urllib.parse.quote(self.job_name, safe=""),
            job_run_id=self._job_run_id,
        )
        self.log.info("You can monitor this Glue Job run at: %s", glue_job_run_url)

        if self.deferrable:
            self.defer(
                trigger=GlueJobCompleteTrigger(
                    job_name=self.job_name,
                    run_id=self._job_run_id,
                    verbose=self.verbose,
                    aws_conn_id=self.aws_conn_id,
                    job_poll_interval=self.job_poll_interval,
                ),
                method_name="execute_complete",
            )
        elif self.wait_for_completion:
            glue_job_run = self.glue_job_hook.job_completion(self.job_name, self._job_run_id, self.verbose)
            self.log.info(
                "AWS Glue Job: %s status: %s. Run Id: %s",
                self.job_name,
                glue_job_run["JobRunState"],
                self._job_run_id,
            )
        else:
            self.log.info("AWS Glue Job: %s. Run Id: %s", self.job_name, self._job_run_id)
        return self._job_run_id
    def execute_complete(self, context, event=None):
        """Resume after the trigger fires; fail the task if the job run did not succeed."""
        if event["status"] != "success":
            raise AirflowException(f"Error in glue job: {event}")
        return event["value"]
    def on_kill(self):
        """Cancel the running AWS Glue Job."""
        if self.stop_job_run_on_kill:
            self.log.info("Stopping AWS Glue Job: %s. Run Id: %s", self.job_name, self._job_run_id)
            response = self.glue_job_hook.conn.batch_stop_job_run(
                JobName=self.job_name,
                JobRunIds=[self._job_run_id],
            )
            if not response["SuccessfulSubmissions"]:
                self.log.error("Failed to stop AWS Glue Job: %s. Run Id: %s", self.job_name, self._job_run_id)
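
For context, here is a minimal sketch of how this operator is typically wired into a DAG. Every bucket, script, role, and job name below is a placeholder invented for illustration, not a value from this module; the create_job_kwargs dict is passed through to the Glue CreateJob API, so its keys must be valid fields there.

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator

# All names below are hypothetical placeholders.
with DAG(dag_id="example_glue_job", start_date=datetime(2023, 1, 1), schedule=None):
    submit_job = GlueJobOperator(
        task_id="submit_job",
        job_name="example_etl_job",
        # An S3 path is used directly; a local path would be uploaded to
        # s3_bucket under artifacts/glue-scripts/ by glue_job_hook.
        script_location="s3://example-bucket/glue-scripts/etl_script.py",
        s3_bucket="example-bucket",
        iam_role_name="ExampleGlueRole",
        create_job_kwargs={"GlueVersion": "4.0", "NumberOfWorkers": 2, "WorkerType": "G.1X"},
        verbose=True,  # stream Glue run logs into the task log
    )

With wait_for_completion=True (the default), the task occupies a worker slot until the Glue run reaches a terminal state, and the value returned by execute (hence pushed to XCom) is the Glue JobRunId.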
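The deferrable path in execute hands the wait over to the triggerer via GlueJobCompleteTrigger instead of blocking a worker. A hedged sketch of that mode, under the same assumptions (placeholder names and account ARN, aiobotocore installed as the docstring requires):

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.glue import GlueJobOperator

# Placeholder names and ARN; deferrable mode requires the aiobotocore module.
with DAG(dag_id="example_glue_job_deferrable", start_date=datetime(2023, 1, 1), schedule=None):
    submit_job = GlueJobOperator(
        task_id="submit_job",
        job_name="example_etl_job",
        script_location="s3://example-bucket/glue-scripts/etl_script.py",
        iam_role_arn="arn:aws:iam::123456789012:role/ExampleGlueRole",
        deferrable=True,            # defer to the triggerer instead of polling on a worker
        job_poll_interval=30,       # seconds between status checks inside the trigger
        stop_job_run_on_kill=True,  # on_kill will issue batch_stop_job_run for this run
    )

When the trigger fires, execute_complete inspects the event payload and raises AirflowException unless the run ended with status "success".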