Source code for airflow.providers.amazon.aws.hooks.glue
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
import time
from typing import Dict, List, Optional

import boto3

from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
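# NOTE: print_job_logs() below references module-level log-filter constants that are
# not visible in this extract. The assignments below are a reconstruction so the
# listing stands on its own; the exact values are an assumption, not part of the
# original listing.
DEFAULT_LOG_SUFFIX = 'output'
FAILURE_LOG_SUFFIX = 'error'
# A CloudWatch filter pattern of ' ' matches every event.
DEFAULT_LOG_FILTER = ' '
FAILURE_LOG_FILTER = '?ERROR ?Exception'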
class GlueJobHook(AwsBaseHook):
    """
    Interact with AWS Glue - create job, trigger crawler

    :param s3_bucket: S3 bucket where logs and local etl script will be uploaded
    :param job_name: unique job name per AWS account
    :param desc: job description
    :param concurrent_run_limit: The maximum number of concurrent runs allowed for a job
    :param script_location: path to etl script on s3
    :param retry_limit: Maximum number of times to retry this job if it fails
    :param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
    :param region_name: aws region name (example: us-east-1)
    :param iam_role_name: AWS IAM Role for Glue Job Execution
    :param create_job_kwargs: Extra arguments for Glue Job Creation
    """
    JOB_POLL_INTERVAL = 6  # polls job status after every JOB_POLL_INTERVAL seconds
    def __init__(
        self,
        s3_bucket: Optional[str] = None,
        job_name: Optional[str] = None,
        desc: Optional[str] = None,
        concurrent_run_limit: int = 1,
        script_location: Optional[str] = None,
        retry_limit: int = 0,
        num_of_dpus: Optional[int] = None,
        iam_role_name: Optional[str] = None,
        create_job_kwargs: Optional[dict] = None,
        *args,
        **kwargs,
    ):
        self.job_name = job_name
        self.desc = desc
        self.concurrent_run_limit = concurrent_run_limit
        self.script_location = script_location
        self.retry_limit = retry_limit
        self.s3_bucket = s3_bucket
        self.role_name = iam_role_name
        self.s3_glue_logs = 'logs/glue-logs/'
        self.create_job_kwargs = create_job_kwargs or {}

        worker_type_exists = "WorkerType" in self.create_job_kwargs
        num_workers_exists = "NumberOfWorkers" in self.create_job_kwargs

        if worker_type_exists and num_workers_exists:
            if num_of_dpus is not None:
                raise ValueError("Cannot specify num_of_dpus with custom WorkerType")
        elif not worker_type_exists and num_workers_exists:
            raise ValueError("Need to specify custom WorkerType when specifying NumberOfWorkers")
        elif worker_type_exists and not num_workers_exists:
            raise ValueError("Need to specify NumberOfWorkers when specifying custom WorkerType")
        elif num_of_dpus is None:
            self.num_of_dpus = 10
        else:
            self.num_of_dpus = num_of_dpus

        kwargs['client_type'] = 'glue'
        super().__init__(*args, **kwargs)
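    # Illustrative sketch (not part of the upstream source) of the two mutually
    # exclusive ways the constructor above sizes a job: either pass num_of_dpus
    # (forwarded as MaxCapacity when the job is created), or pass both WorkerType
    # and NumberOfWorkers via create_job_kwargs. Job names and values below are
    # placeholders.
    #
    #   dpu_hook = GlueJobHook(job_name='example_job', num_of_dpus=20)
    #   worker_hook = GlueJobHook(
    #       job_name='example_job',
    #       create_job_kwargs={'WorkerType': 'G.1X', 'NumberOfWorkers': 10},
    #   )
    #   # Combining both raises ValueError("Cannot specify num_of_dpus with custom WorkerType"):
    #   # GlueJobHook(num_of_dpus=20, create_job_kwargs={'WorkerType': 'G.1X', 'NumberOfWorkers': 10})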
    def list_jobs(self) -> List:
        """:return: List of Glue jobs"""
        conn = self.get_conn()
        return conn.get_jobs()
    def get_iam_execution_role(self) -> Dict:
        """:return: iam role for job execution"""
        try:
            iam_client = self.get_session(region_name=self.region_name).client(
                'iam', endpoint_url=self.conn_config.endpoint_url, config=self.config, verify=self.verify
            )
            glue_execution_role = iam_client.get_role(RoleName=self.role_name)
            self.log.info("Iam Role Name: %s", self.role_name)
            return glue_execution_role
        except Exception as general_error:
            self.log.error("Failed to create aws glue job, error: %s", general_error)
            raise
    def initialize_job(
        self,
        script_arguments: Optional[dict] = None,
        run_kwargs: Optional[dict] = None,
    ) -> Dict[str, str]:
        """
        Initializes connection with AWS Glue to run job

        :return: The ``start_job_run`` response for the triggered job run
        """
        glue_client = self.get_conn()
        script_arguments = script_arguments or {}
        run_kwargs = run_kwargs or {}

        try:
            job_name = self.get_or_create_glue_job()
            return glue_client.start_job_run(JobName=job_name, Arguments=script_arguments, **run_kwargs)

        except Exception as general_error:
            self.log.error("Failed to run aws glue job, error: %s", general_error)
            raise
    def get_job_state(self, job_name: str, run_id: str) -> str:
        """
        Get state of the Glue job.
        The job state can be running, finished, failed, stopped or timeout.

        :param job_name: unique job name per AWS account
        :param run_id: The job-run ID of the predecessor job run
        :return: State of the Glue job
        """
        glue_client = self.get_conn()
        job_run = glue_client.get_job_run(JobName=job_name, RunId=run_id, PredecessorsIncluded=True)
        return job_run['JobRun']['JobRunState']
    def print_job_logs(
        self,
        job_name: str,
        run_id: str,
        job_failed: bool = False,
        next_token: Optional[str] = None,
    ) -> Optional[str]:
        """Prints the batch of logs to the Airflow task log and returns nextToken."""
        log_client = boto3.client('logs')
        response = {}

        filter_pattern = FAILURE_LOG_FILTER if job_failed else DEFAULT_LOG_FILTER
        log_group_prefix = self.conn.get_job_run(JobName=job_name, RunId=run_id)['JobRun']['LogGroupName']
        log_group_suffix = FAILURE_LOG_SUFFIX if job_failed else DEFAULT_LOG_SUFFIX
        log_group_name = f'{log_group_prefix}/{log_group_suffix}'

        try:
            if next_token:
                response = log_client.filter_log_events(
                    logGroupName=log_group_name,
                    logStreamNames=[run_id],
                    filterPattern=filter_pattern,
                    nextToken=next_token,
                )
            else:
                response = log_client.filter_log_events(
                    logGroupName=log_group_name,
                    logStreamNames=[run_id],
                    filterPattern=filter_pattern,
                )
            if len(response['events']):
                messages = '\t'.join([event['message'] for event in response['events']])
                self.log.info('Glue Job Run Logs:\n\t%s', messages)

        except log_client.exceptions.ResourceNotFoundException:
            self.log.warning(
                'No new Glue driver logs found. This might be because there are no new logs, '
                'or might be an error.\nIf the error persists, check the CloudWatch dashboard '
                f'at: https://{self.conn_region_name}.console.aws.amazon.com/cloudwatch/home'
            )

        # If no new log events are available, filter_log_events will return None.
        # In that case, check the same token again next pass.
        return response.get('nextToken') or next_token
    def job_completion(self, job_name: str, run_id: str, verbose: bool = False) -> Dict[str, str]:
        """
        Waits until the Glue job with job_name completes or fails and returns the final state if finished.
        Raises AirflowException when the job failed.

        :param job_name: unique job name per AWS account
        :param run_id: The job-run ID of the predecessor job run
        :param verbose: If True, more Glue Job Run logs show in the Airflow Task Logs.  (default: False)
        :return: Dict of JobRunState and JobRunId
        """
        failed_states = ['FAILED', 'TIMEOUT']
        finished_states = ['SUCCEEDED', 'STOPPED']
        next_log_token = None
        job_failed = False

        while True:
            try:
                job_run_state = self.get_job_state(job_name, run_id)
                if job_run_state in finished_states:
                    self.log.info('Exiting Job %s Run State: %s', run_id, job_run_state)
                    return {'JobRunState': job_run_state, 'JobRunId': run_id}
                if job_run_state in failed_states:
                    job_failed = True
                    job_error_message = f'Exiting Job {run_id} Run State: {job_run_state}'
                    self.log.info(job_error_message)
                    raise AirflowException(job_error_message)
                else:
                    self.log.info(
                        'Polling for AWS Glue Job %s current run state with status %s',
                        job_name,
                        job_run_state,
                    )
                    time.sleep(self.JOB_POLL_INTERVAL)
            finally:
                if verbose:
                    next_log_token = self.print_job_logs(
                        job_name=job_name,
                        run_id=run_id,
                        job_failed=job_failed,
                        next_token=next_log_token,
                    )
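    # Illustrative usage sketch (not part of the upstream source). The job name,
    # bucket, script path, role and arguments below are placeholders.
    #
    #   hook = GlueJobHook(
    #       job_name='example_etl_job',
    #       s3_bucket='example-bucket',
    #       script_location='s3://example-bucket/scripts/etl.py',
    #       iam_role_name='example-glue-role',
    #   )
    #   job_run = hook.initialize_job(script_arguments={'--load_date': '2021-01-01'})
    #   result = hook.job_completion(
    #       job_name='example_etl_job', run_id=job_run['JobRunId'], verbose=True
    #   )
    #   # On success, result == {'JobRunState': 'SUCCEEDED', 'JobRunId': job_run['JobRunId']}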
    def get_or_create_glue_job(self) -> str:
        """
        Creates the Glue job (or just returns it, if it already exists) and returns the job name.

        :return: Name of the Job
        """
        glue_client = self.get_conn()
        try:
            get_job_response = glue_client.get_job(JobName=self.job_name)
            self.log.info("Job Already exist. Returning Name of the job")
            return get_job_response['Job']['Name']

        except glue_client.exceptions.EntityNotFoundException:
            self.log.info("Job doesn't exist. Now creating and running AWS Glue Job")
            if self.s3_bucket is None:
                raise AirflowException('Could not initialize glue job, error: Specify Parameter `s3_bucket`')
            s3_log_path = f's3://{self.s3_bucket}/{self.s3_glue_logs}{self.job_name}'
            execution_role = self.get_iam_execution_role()
            try:
                default_command = {
                    "Name": "glueetl",
                    "ScriptLocation": self.script_location,
                }
                command = self.create_job_kwargs.pop("Command", default_command)

                if "WorkerType" in self.create_job_kwargs and "NumberOfWorkers" in self.create_job_kwargs:
                    create_job_response = glue_client.create_job(
                        Name=self.job_name,
                        Description=self.desc,
                        LogUri=s3_log_path,
                        Role=execution_role['Role']['Arn'],
                        ExecutionProperty={"MaxConcurrentRuns": self.concurrent_run_limit},
                        Command=command,
                        MaxRetries=self.retry_limit,
                        **self.create_job_kwargs,
                    )
                else:
                    create_job_response = glue_client.create_job(
                        Name=self.job_name,
                        Description=self.desc,
                        LogUri=s3_log_path,
                        Role=execution_role['Role']['Arn'],
                        ExecutionProperty={"MaxConcurrentRuns": self.concurrent_run_limit},
                        Command=command,
                        MaxRetries=self.retry_limit,
                        MaxCapacity=self.num_of_dpus,
                        **self.create_job_kwargs,
                    )
                return create_job_response['Name']
            except Exception as general_error:
                self.log.error("Failed to create aws glue job, error: %s", general_error)
                raise
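# Illustrative sketch (not part of the upstream source): because
# get_or_create_glue_job() pops "Command" from create_job_kwargs, the default
# "glueetl" command above can be overridden at creation time. Paths and names
# below are placeholders.
#
#   hook = GlueJobHook(
#       job_name='example_etl_job',
#       s3_bucket='example-bucket',
#       iam_role_name='example-glue-role',
#       num_of_dpus=10,
#       create_job_kwargs={
#           'Command': {
#               'Name': 'glueetl',
#               'ScriptLocation': 's3://example-bucket/scripts/etl.py',
#               'PythonVersion': '3',
#           },
#       },
#   )
#   hook.get_or_create_glue_job()  # creates the job if missing and returns its name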