Source code for airflow.providers.amazon.aws.hooks.glue
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import asyncio
import time
from functools import cached_property
from typing import Any

from botocore.exceptions import ClientError

from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.providers.amazon.aws.hooks.logs import AwsLogsHook

# Suffixes of the two CloudWatch log groups Glue jobs write to (used in print_job_logs below).
DEFAULT_LOG_SUFFIX = "output"
ERROR_LOG_SUFFIX = "error"
[docs]
class GlueJobHook(AwsBaseHook):
    """
    Interact with AWS Glue.

    Provide thick wrapper around :external+boto3:py:class:`boto3.client("glue") <Glue.Client>`.

    :param s3_bucket: S3 bucket where logs and local etl script will be uploaded
    :param job_name: unique job name per AWS account
    :param desc: job description
    :param concurrent_run_limit: The maximum number of concurrent runs allowed for a job
    :param script_location: path to etl script on s3
    :param retry_limit: Maximum number of times to retry this job if it fails
    :param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
    :param region_name: aws region name (example: us-east-1)
    :param iam_role_name: AWS IAM Role for Glue Job Execution. If set, ``iam_role_arn`` must equal None.
    :param iam_role_arn: AWS IAM Role ARN for Glue Job Execution. If set, ``iam_role_name`` must equal None.
    :param create_job_kwargs: Extra arguments for Glue Job Creation
    :param update_config: Update job configuration on Glue (default: False)

    Additional arguments (such as ``aws_conn_id``) may be specified and
    are passed down to the underlying AwsBaseHook.

    .. seealso::
        - :class:`airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
    """
[docs]
    class LogContinuationTokens:
        """Used to hold the continuation tokens when reading logs from both streams Glue Jobs write to."""

        def __init__(self):
            self.output_stream_continuation: str | None = None
            self.error_stream_continuation: str | None = None
    def __init__(
        self,
        s3_bucket: str | None = None,
        job_name: str | None = None,
        desc: str | None = None,
        concurrent_run_limit: int = 1,
        script_location: str | None = None,
        retry_limit: int = 0,
        num_of_dpus: int | float | None = None,
        iam_role_name: str | None = None,
        iam_role_arn: str | None = None,
        create_job_kwargs: dict | None = None,
        update_config: bool = False,
        job_poll_interval: int | float = 6,
        *args,
        **kwargs,
    ):
        self.job_name = job_name
        self.desc = desc
        self.concurrent_run_limit = concurrent_run_limit
        self.script_location = script_location
        self.retry_limit = retry_limit
        self.s3_bucket = s3_bucket
        self.role_name = iam_role_name
        self.role_arn = iam_role_arn
        self.create_job_kwargs = create_job_kwargs or {}
        self.update_config = update_config
        self.job_poll_interval = job_poll_interval

        worker_type_exists = "WorkerType" in self.create_job_kwargs
        num_workers_exists = "NumberOfWorkers" in self.create_job_kwargs

        if self.role_arn and self.role_name:
            raise ValueError("Cannot set iam_role_arn and iam_role_name simultaneously")
        if worker_type_exists and num_workers_exists:
            if num_of_dpus is not None:
                raise ValueError("Cannot specify num_of_dpus with custom WorkerType")
        elif not worker_type_exists and num_workers_exists:
            raise ValueError("Need to specify custom WorkerType when specifying NumberOfWorkers")
        elif worker_type_exists and not num_workers_exists:
            raise ValueError("Need to specify NumberOfWorkers when specifying custom WorkerType")
        elif num_of_dpus is None:
            self.num_of_dpus: int | float = 10
        else:
            self.num_of_dpus = num_of_dpus

        kwargs["client_type"] = "glue"
        super().__init__(*args, **kwargs)
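For illustration, a minimal sketch of how the mutually exclusive capacity arguments interact when constructing the hook; the job and role names are hypothetical:

from airflow.providers.amazon.aws.hooks.glue import GlueJobHook

# Capacity via WorkerType + NumberOfWorkers (num_of_dpus must then stay unset):
hook = GlueJobHook(
    job_name="my-etl-job",  # hypothetical
    iam_role_name="my-glue-role",  # hypothetical; iam_role_arn must then be None
    create_job_kwargs={"WorkerType": "G.1X", "NumberOfWorkers": 2},
)

# Capacity via DPUs instead (WorkerType/NumberOfWorkers must then stay unset):
dpu_hook = GlueJobHook(job_name="my-etl-job", iam_role_name="my-glue-role", num_of_dpus=2)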
[docs]
    def list_jobs(self) -> list:
        """
        Get list of Jobs.

        .. seealso::
            - :external+boto3:py:meth:`Glue.Client.get_jobs`
        """
        return self.conn.get_jobs()
[docs]
    def get_iam_execution_role(self) -> dict:
        try:
            iam_client = self.get_session(region_name=self.region_name).client(
                "iam", endpoint_url=self.conn_config.endpoint_url, config=self.config, verify=self.verify
            )
            glue_execution_role = iam_client.get_role(RoleName=self.role_name)
            self.log.info("Iam Role Name: %s", self.role_name)
            return glue_execution_role
        except Exception as general_error:
            self.log.error("Failed to get IAM execution role, error: %s", general_error)
            raise
[docs]
    def initialize_job(
        self,
        script_arguments: dict | None = None,
        run_kwargs: dict | None = None,
    ) -> dict[str, str]:
        """
        Initialize connection with AWS Glue to run job.

        .. seealso::
            - :external+boto3:py:meth:`Glue.Client.start_job_run`
        """
        script_arguments = script_arguments or {}
        run_kwargs = run_kwargs or {}

        try:
            if self.update_config:
                job_name = self.create_or_update_glue_job()
            else:
                job_name = self.get_or_create_glue_job()

            return self.conn.start_job_run(JobName=job_name, Arguments=script_arguments, **run_kwargs)
        except Exception as general_error:
            self.log.error("Failed to run aws glue job, error: %s", general_error)
            raise
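A hedged usage sketch: starting a run with job arguments and a per-run override (the bucket path is hypothetical):

job_run = hook.initialize_job(
    script_arguments={"--input_path": "s3://my-bucket/input/"},  # hypothetical bucket
    run_kwargs={"NumberOfWorkers": 4},  # passed through to start_job_run
)
run_id = job_run["JobRunId"]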
[docs]
    def get_job_state(self, job_name: str, run_id: str) -> str:
        """
        Get state of the Glue job; the job state can be running, finished, failed, stopped or timeout.

        .. seealso::
            - :external+boto3:py:meth:`Glue.Client.get_job_run`

        :param job_name: unique job name per AWS account
        :param run_id: The job-run ID of the predecessor job run
        :return: State of the Glue job
        """
        job_run = self.conn.get_job_run(JobName=job_name, RunId=run_id, PredecessorsIncluded=True)
        return job_run["JobRun"]["JobRunState"]
[docs]
    async def async_get_job_state(self, job_name: str, run_id: str) -> str:
        """
        Get state of the Glue job; the job state can be running, finished, failed, stopped or timeout.

        The async version of get_job_state.
        """
        async with self.async_conn as client:
            job_run = await client.get_job_run(JobName=job_name, RunId=run_id)
        return job_run["JobRun"]["JobRunState"]
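Both variants return the raw JobRunState string; a sketch, reusing the hypothetical hook and run_id from the earlier examples:

state = hook.get_job_state(job_name="my-etl-job", run_id=run_id)
if state == "RUNNING":
    pass  # keep polling

# Inside a trigger or another coroutine, the async variant behaves the same:
# state = await hook.async_get_job_state(job_name="my-etl-job", run_id=run_id)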
    @cached_property
[docs]
    def logs_hook(self):
        """Returns an AwsLogsHook instantiated with the parameters of the GlueJobHook."""
        return AwsLogsHook(
            aws_conn_id=self.aws_conn_id, region_name=self.region_name, verify=self.verify, config=self.config
        )
[docs]
    def print_job_logs(
        self,
        job_name: str,
        run_id: str,
        continuation_tokens: LogContinuationTokens,
    ):
        """
        Print the latest job logs to the Airflow task log and update the continuation tokens.

        :param continuation_tokens: the tokens where to resume from when reading logs.
            The object gets updated with the new tokens by this method.
        """
        log_client = self.logs_hook.get_conn()
        paginator = log_client.get_paginator("filter_log_events")

        def display_logs_from(log_group: str, continuation_token: str | None) -> str | None:
            """Mutualize iteration over the 2 different log streams glue jobs write to."""
            fetched_logs = []
            next_token = continuation_token
            try:
                for response in paginator.paginate(
                    logGroupName=log_group,
                    logStreamNames=[run_id],
                    PaginationConfig={"StartingToken": continuation_token},
                ):
                    fetched_logs.extend([event["message"] for event in response["events"]])
                    # if the response is empty there is no nextToken in it
                    next_token = response.get("nextToken") or next_token
            except ClientError as e:
                if e.response["Error"]["Code"] == "ResourceNotFoundException":
                    # we land here when the log groups/streams don't exist yet
                    self.log.warning(
                        "No new Glue driver logs so far.\n"
                        "If this persists, check the CloudWatch dashboard at: %r.",
                        f"https://{self.conn_region_name}.console.aws.amazon.com/cloudwatch/home",
                    )
                else:
                    raise

            if len(fetched_logs):
                # Add a tab to indent those logs and distinguish them from airflow logs.
                # Log lines returned already contain a newline character at the end.
                messages = "\t".join(fetched_logs)
                self.log.info("Glue Job Run %s Logs:\n\t%s", log_group, messages)
            else:
                self.log.info("No new log from the Glue Job in %s", log_group)
            return next_token

        log_group_prefix = self.conn.get_job_run(JobName=job_name, RunId=run_id)["JobRun"]["LogGroupName"]
        log_group_default = f"{log_group_prefix}/{DEFAULT_LOG_SUFFIX}"
        log_group_error = f"{log_group_prefix}/{ERROR_LOG_SUFFIX}"
        # one would think that the error log group would contain only errors, but it actually contains
        # a lot of interesting logs too, so it's valuable to have both
        continuation_tokens.output_stream_continuation = display_logs_from(
            log_group_default, continuation_tokens.output_stream_continuation
        )
        continuation_tokens.error_stream_continuation = display_logs_from(
            log_group_error, continuation_tokens.error_stream_continuation
        )
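A sketch of incremental log tailing with the continuation tokens (hook and run_id as in the earlier hypothetical examples):

tokens = hook.LogContinuationTokens()
hook.print_job_logs(job_name="my-etl-job", run_id=run_id, continuation_tokens=tokens)
# A later call resumes from the updated tokens instead of re-printing old events:
hook.print_job_logs(job_name="my-etl-job", run_id=run_id, continuation_tokens=tokens)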
[docs]
    def job_completion(
        self, job_name: str, run_id: str, verbose: bool = False, sleep_before_return: int = 0
    ) -> dict[str, str]:
        """
        Wait until the Glue job with job_name finishes; return the final state if finished, or raise AirflowException.

        :param job_name: unique job name per AWS account
        :param run_id: The job-run ID of the predecessor job run
        :param verbose: If True, more Glue Job Run logs show in the Airflow Task Logs. (default: False)
        :param sleep_before_return: time in seconds to wait before returning final status.
        :return: Dict of JobRunState and JobRunId
        """
        next_log_tokens = self.LogContinuationTokens()
        while True:
            job_run_state = self.get_job_state(job_name, run_id)
            ret = self._handle_state(job_run_state, job_name, run_id, verbose, next_log_tokens)
            if ret:
                time.sleep(sleep_before_return)
                return ret
            else:
                time.sleep(self.job_poll_interval)
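Blocking until the run finishes, with verbose log streaming, looks roughly like this (SUCCEEDED is only one possible terminal state; FAILED or TIMEOUT raises instead):

final = hook.job_completion(job_name="my-etl-job", run_id=run_id, verbose=True)
print(final)  # e.g. {"JobRunState": "SUCCEEDED", "JobRunId": run_id}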
[docs]
    async def async_job_completion(self, job_name: str, run_id: str, verbose: bool = False) -> dict[str, str]:
        """
        Wait until the Glue job with job_name finishes; return the final state if finished, or raise AirflowException.

        :param job_name: unique job name per AWS account
        :param run_id: The job-run ID of the predecessor job run
        :param verbose: If True, more Glue Job Run logs show in the Airflow Task Logs. (default: False)
        :return: Dict of JobRunState and JobRunId
        """
        next_log_tokens = self.LogContinuationTokens()
        while True:
            job_run_state = await self.async_get_job_state(job_name, run_id)
            ret = self._handle_state(job_run_state, job_name, run_id, verbose, next_log_tokens)
            if ret:
                return ret
            else:
                await asyncio.sleep(self.job_poll_interval)
    def _handle_state(
        self,
        state: str,
        job_name: str,
        run_id: str,
        verbose: bool,
        next_log_tokens: GlueJobHook.LogContinuationTokens,
    ) -> dict | None:
        """Process Glue Job state while polling; used by both sync and async methods."""
        failed_states = ["FAILED", "TIMEOUT"]
        finished_states = ["SUCCEEDED", "STOPPED"]

        if verbose:
            self.print_job_logs(
                job_name=job_name,
                run_id=run_id,
                continuation_tokens=next_log_tokens,
            )

        if state in finished_states:
            self.log.info("Exiting Job %s Run State: %s", run_id, state)
            return {"JobRunState": state, "JobRunId": run_id}
        if state in failed_states:
            job_error_message = f"Exiting Job {run_id} Run State: {state}"
            self.log.info(job_error_message)
            raise AirflowException(job_error_message)
        else:
            self.log.info(
                "Polling for AWS Glue Job %s current run state with status %s",
                job_name,
                state,
            )
            return None
[docs]
    def has_job(self, job_name) -> bool:
        """
        Check if the job already exists.

        .. seealso::
            - :external+boto3:py:meth:`Glue.Client.get_job`

        :param job_name: unique job name per AWS account
        :return: Returns True if the job already exists and False if not.
        """
        self.log.info("Checking if job already exists: %s", job_name)

        try:
            self.conn.get_job(JobName=job_name)
            return True
        except self.conn.exceptions.EntityNotFoundException:
            return False
[docs]
    def update_job(self, **job_kwargs) -> bool:
        """
        Update job configurations.

        .. seealso::
            - :external+boto3:py:meth:`Glue.Client.update_job`

        :param job_kwargs: Keyword args that define the configurations used for the job
        :return: True if job was updated and False otherwise
        """
        job_name = job_kwargs.pop("Name")
        current_job = self.conn.get_job(JobName=job_name)["Job"]

        update_config = {key: value for key, value in job_kwargs.items() if current_job.get(key) != value}
        if update_config != {}:
            self.log.info("Updating job: %s", job_name)
            self.conn.update_job(JobName=job_name, JobUpdate=job_kwargs)
            self.log.info("Updated configurations: %s", update_config)
            return True
        else:
            return False
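A sketch of an idempotent update: only keys that differ from the current job definition trigger a call to Glue. All names and ARNs below are hypothetical; in practice the full config (including Role and Command, which the Glue UpdateJob API expects) comes from create_glue_job_config:

was_updated = hook.update_job(
    Name="my-etl-job",  # popped and used as JobName
    Role="arn:aws:iam::123456789012:role/my-glue-role",  # hypothetical ARN
    Command={"Name": "glueetl", "ScriptLocation": "s3://my-bucket/etl.py"},  # hypothetical
    MaxRetries=1,
)
# was_updated is False when nothing differs, so repeated deploys are no-ops.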
[docs]
    def get_or_create_glue_job(self) -> str | None:
        """
        Get (or create) and return the Job name.

        .. seealso::
            - :external+boto3:py:meth:`Glue.Client.create_job`

        :return: Name of the Job
        """
        if self.has_job(self.job_name):
            return self.job_name

        config = self.create_glue_job_config()
        self.log.info("Creating job: %s", self.job_name)
        self.conn.create_job(**config)

        return self.job_name
[docs]
    def create_or_update_glue_job(self) -> str | None:
        """
        Create (or update) and return the Job name.

        .. seealso::
            - :external+boto3:py:meth:`Glue.Client.update_job`
            - :external+boto3:py:meth:`Glue.Client.create_job`

        :return: Name of the Job
        """
        config = self.create_glue_job_config()

        if self.has_job(self.job_name):
            self.update_job(**config)
        else:
            self.log.info("Creating job: %s", self.job_name)
            self.conn.create_job(**config)

        return self.job_name
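How the two job-sync strategies relate to update_config, as a sketch with the same hypothetical names as above:

# update_config=False (default): reuse the job if it exists, create it otherwise.
name = hook.get_or_create_glue_job()

# update_config=True: initialize_job() calls create_or_update_glue_job() instead,
# pushing the local configuration to Glue before every run.
sync_hook = GlueJobHook(job_name="my-etl-job", iam_role_name="my-glue-role", update_config=True)
name = sync_hook.create_or_update_glue_job()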
[docs]
class GlueDataQualityHook(AwsBaseHook):
    """
    Interact with AWS Glue Data Quality.

    Provide thick wrapper around :external+boto3:py:class:`boto3.client("glue") <Glue.Client>`.

    Additional arguments (such as ``aws_conn_id``) may be specified and
    are passed down to the underlying AwsBaseHook.

    .. seealso::
        - :class:`airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
    """

    def __init__(
        self,
        *args,
        **kwargs,
    ):
        kwargs["client_type"] = "glue"
        super().__init__(*args, **kwargs)
    def _log_results(self, result: dict[str, Any]) -> None:
        """
        Print the outcome of an evaluation run.

        An evaluation run can involve multiple rulesets evaluated against a data source (Glue table).

        Name    Description                                 Result  EvaluatedMetrics                                                                     EvaluationMessage
        Rule_1  RowCount between 150000 and 600000          PASS    {'Dataset.*.RowCount': 300000.0}                                                     NaN
        Rule_2  IsComplete "marketplace"                    PASS    {'Column.marketplace.Completeness': 1.0}                                             NaN
        Rule_3  ColumnLength "marketplace" between 1 and 2  FAIL    {'Column.marketplace.MaximumLength': 9.0, 'Column.marketplace.MinimumLength': 3.0}  Value: 9.0 does not meet the constraint requirement!
        """
        import pandas as pd

        pd.set_option("display.max_rows", None)
        pd.set_option("display.max_columns", None)
        pd.set_option("display.width", None)
        pd.set_option("display.max_colwidth", None)

        self.log.info(
            "AWS Glue data quality ruleset evaluation result for RulesetName: %s RulesetEvaluationRunId: %s Score: %s",
            result.get("RulesetName"),
            result.get("RulesetEvaluationRunId"),
            result.get("Score"),
        )

        rule_results = result["RuleResults"]
        rule_results_df = pd.DataFrame(rule_results)
        self.log.info(rule_results_df)
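The listing appears to omit ``get_evaluation_run_results``, which ``validate_evaluation_run_results`` below relies on. A minimal sketch of it, assuming the boto3 ``get_data_quality_ruleset_evaluation_run`` and ``batch_get_data_quality_result`` client calls:

    def get_evaluation_run_results(self, run_id: str) -> dict[str, Any]:
        # Sketch: resolve the evaluation run to its result ids, then fetch them in one batch.
        response = self.conn.get_data_quality_ruleset_evaluation_run(RunId=run_id)
        return self.conn.batch_get_data_quality_result(ResultIds=response["ResultIds"])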
[docs]
    def validate_evaluation_run_results(
        self, evaluation_run_id: str, show_results: bool = True, verify_result_status: bool = True
    ) -> None:
        results = self.get_evaluation_run_results(evaluation_run_id)
        total_failed_rules = 0

        if results.get("ResultsNotFound"):
            self.log.info(
                "AWS Glue data quality ruleset evaluation run, results not found for %s",
                results["ResultsNotFound"],
            )

        for result in results["Results"]:
            rule_results = result["RuleResults"]

            total_failed_rules += len(
                list(
                    filter(
                        lambda rule: rule.get("Result") == "FAIL" or rule.get("Result") == "ERROR",
                        rule_results,
                    )
                )
            )

            if show_results:
                self._log_results(result)

        self.log.info(
            "AWS Glue data quality ruleset evaluation run, total number of rules failed: %s",
            total_failed_rules,
        )

        if verify_result_status and total_failed_rules > 0:
            raise AirflowException("AWS Glue data quality ruleset evaluation run failed for one or more rules")
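A hedged usage sketch, with a hypothetical evaluation run id:

dq_hook = GlueDataQualityHook()
dq_hook.validate_evaluation_run_results(
    evaluation_run_id="my-evaluation-run-id",  # hypothetical
    show_results=True,
    verify_result_status=True,  # raise AirflowException if any rule is FAIL or ERROR
)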
[docs]
    def log_recommendation_results(self, run_id: str) -> None:
        """
        Print the outcome of a recommendation run.

        A recommendation run generates multiple rules against a data source (Glue table)
        in Data Quality Definition Language (DQDL) format.

        Rules = [
            IsComplete "NAME",
            ColumnLength "EMP_ID" between 1 and 12,
            IsUnique "EMP_ID",
            ColumnValues "INCOME" > 50000
        ]
        """
        result = self.conn.get_data_quality_rule_recommendation_run(RunId=run_id)

        if result.get("RecommendedRuleset"):
            self.log.info(
                "AWS Glue data quality recommended rules for DatabaseName: %s TableName: %s",
                result["DataSource"]["GlueTable"]["DatabaseName"],
                result["DataSource"]["GlueTable"]["TableName"],
            )
            self.log.info(result["RecommendedRuleset"])
        else:
            self.log.info("AWS Glue data quality, no recommended rules available for RunId: %s", run_id)
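And similarly for recommendation runs, a short sketch with a hypothetical run id:

# Logs the recommended DQDL ruleset if the run produced one,
# otherwise logs that none is available.
dq_hook.log_recommendation_results(run_id="my-recommendation-run-id")  # hypothetical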