Source code for airflow.providers.amazon.aws.hooks.glue
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
import time
from typing import Dict, List, Optional

from airflow.exceptions import AirflowException
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook

class AwsGlueJobHook(AwsBaseHook):
    """
    Interact with AWS Glue - create job, trigger, crawler

    :param s3_bucket: S3 bucket where logs and local etl script will be uploaded
    :type s3_bucket: Optional[str]
    :param job_name: unique job name per AWS account
    :type job_name: Optional[str]
    :param desc: job description
    :type desc: Optional[str]
    :param concurrent_run_limit: The maximum number of concurrent runs allowed for a job
    :type concurrent_run_limit: int
    :param script_location: path to etl script on s3
    :type script_location: Optional[str]
    :param retry_limit: Maximum number of times to retry this job if it fails
    :type retry_limit: int
    :param num_of_dpus: Number of AWS Glue DPUs to allocate to this Job
    :type num_of_dpus: int
    :param region_name: aws region name (example: us-east-1)
    :type region_name: Optional[str]
    :param iam_role_name: AWS IAM Role for Glue Job Execution
    :type iam_role_name: Optional[str]
    :param create_job_kwargs: Extra arguments for Glue Job Creation
    :type create_job_kwargs: Optional[dict]
    """

    JOB_POLL_INTERVAL = 6  # polls job status after every JOB_POLL_INTERVAL seconds
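
    # A minimal constructor sketch: the __init__ is not shown in this listing, so it is
    # reconstructed from the class docstring and the attributes the methods below rely on.
    # Default values and the 'logs/glue-logs/' prefix are illustrative assumptions.
    def __init__(
        self,
        s3_bucket: Optional[str] = None,
        job_name: Optional[str] = None,
        desc: Optional[str] = None,
        concurrent_run_limit: int = 1,
        script_location: Optional[str] = None,
        retry_limit: int = 0,
        num_of_dpus: int = 10,
        iam_role_name: Optional[str] = None,
        create_job_kwargs: Optional[dict] = None,
        *args,
        **kwargs,
    ):
        self.job_name = job_name
        self.desc = desc
        self.concurrent_run_limit = concurrent_run_limit
        self.script_location = script_location
        self.retry_limit = retry_limit
        self.num_of_dpus = num_of_dpus
        self.s3_bucket = s3_bucket
        self.role_name = iam_role_name  # read by get_iam_execution_role()
        self.s3_glue_logs = 'logs/glue-logs/'  # assumed S3 prefix for the job's log path
        self.create_job_kwargs = create_job_kwargs or {}
        kwargs['client_type'] = 'glue'  # so get_conn() returns a boto3 Glue client
        super().__init__(*args, **kwargs)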

    def list_jobs(self) -> List:
        """:return: list of Glue jobs"""
        conn = self.get_conn()
        return conn.get_jobs()

    def get_iam_execution_role(self) -> Dict:
        """:return: IAM role for job execution"""
        iam_client = self.get_client_type('iam', self.region_name)

        try:
            glue_execution_role = iam_client.get_role(RoleName=self.role_name)
            self.log.info("Iam Role Name: %s", self.role_name)
            return glue_execution_role
        except Exception as general_error:
            self.log.error("Failed to get IAM execution role, error: %s", general_error)
            raise

    def initialize_job(self, script_arguments: Optional[dict] = None) -> Dict[str, str]:
        """
        Initializes connection with AWS Glue to run the job

        :return: the ``start_job_run`` response, which includes the ``JobRunId``
        """
        glue_client = self.get_conn()
        script_arguments = script_arguments or {}

        try:
            job_name = self.get_or_create_glue_job()
            job_run = glue_client.start_job_run(JobName=job_name, Arguments=script_arguments)
            return job_run
        except Exception as general_error:
            self.log.error("Failed to run aws glue job, error: %s", general_error)
            raise

    def get_job_state(self, job_name: str, run_id: str) -> str:
        """
        Get state of the Glue job. The job state can be
        running, finished, failed, stopped or timeout.

        :param job_name: unique job name per AWS account
        :type job_name: str
        :param run_id: The job-run ID of the predecessor job run
        :type run_id: str
        :return: State of the Glue job
        """
        glue_client = self.get_conn()
        job_run = glue_client.get_job_run(JobName=job_name, RunId=run_id, PredecessorsIncluded=True)
        job_run_state = job_run['JobRun']['JobRunState']
        return job_run_state

    def job_completion(self, job_name: str, run_id: str) -> Dict[str, str]:
        """
        Waits until the Glue job with job_name completes or fails
        and returns the final state if finished.
        Raises AirflowException when the job fails.

        :param job_name: unique job name per AWS account
        :type job_name: str
        :param run_id: The job-run ID of the predecessor job run
        :type run_id: str
        :return: Dict of JobRunState and JobRunId
        """
        failed_states = ['FAILED', 'TIMEOUT']
        finished_states = ['SUCCEEDED', 'STOPPED']

        while True:
            job_run_state = self.get_job_state(job_name, run_id)
            if job_run_state in finished_states:
                self.log.info("Exiting Job %s Run State: %s", run_id, job_run_state)
                return {'JobRunState': job_run_state, 'JobRunId': run_id}
            if job_run_state in failed_states:
                job_error_message = "Exiting Job " + run_id + " Run State: " + job_run_state
                self.log.info(job_error_message)
                raise AirflowException(job_error_message)
            else:
                self.log.info(
                    "Polling for AWS Glue Job %s current run state with status %s",
                    job_name,
                    job_run_state,
                )
                time.sleep(self.JOB_POLL_INTERVAL)

    def get_or_create_glue_job(self) -> str:
        """
        Creates the Glue job if it does not exist and returns the job name.

        :return: Name of the Job
        """
        glue_client = self.get_conn()
        try:
            get_job_response = glue_client.get_job(JobName=self.job_name)
            self.log.info("Job already exists. Returning the name of the job")
            return get_job_response['Job']['Name']

        except glue_client.exceptions.EntityNotFoundException:
            self.log.info("Job doesn't exist. Now creating and running AWS Glue Job")
            if self.s3_bucket is None:
                raise AirflowException('Could not initialize glue job, error: Specify Parameter `s3_bucket`')
            s3_log_path = f's3://{self.s3_bucket}/{self.s3_glue_logs}{self.job_name}'
            execution_role = self.get_iam_execution_role()
            try:
                create_job_response = glue_client.create_job(
                    Name=self.job_name,
                    Description=self.desc,
                    LogUri=s3_log_path,
                    Role=execution_role['Role']['Arn'],
                    ExecutionProperty={"MaxConcurrentRuns": self.concurrent_run_limit},
                    Command={"Name": "glueetl", "ScriptLocation": self.script_location},
                    MaxRetries=self.retry_limit,
                    AllocatedCapacity=self.num_of_dpus,
                    **self.create_job_kwargs,
                )
                return create_job_response['Name']
            except Exception as general_error:
                self.log.error("Failed to create aws glue job, error: %s", general_error)
                raise
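

# Illustrative usage sketch (not part of the hook module): submit a Glue job and wait for it
# to reach a terminal state. The bucket, script location, role, and argument names below are
# placeholder assumptions; substitute values from your own AWS account.
if __name__ == "__main__":
    hook = AwsGlueJobHook(
        s3_bucket="my-glue-artifacts",                            # assumed bucket
        job_name="example-etl-job",                               # assumed job name
        desc="Example ETL job",
        concurrent_run_limit=1,
        script_location="s3://my-glue-artifacts/scripts/etl.py",  # assumed script path
        retry_limit=0,
        num_of_dpus=10,
        iam_role_name="GlueJobExecutionRole",                     # assumed IAM role name
        region_name="us-east-1",
    )
    # Start (or create and start) the job, then poll until it succeeds, stops, fails, or times out.
    job_run = hook.initialize_job(script_arguments={"--input_path": "s3://my-glue-artifacts/input/"})
    final_state = hook.job_completion("example-etl-job", job_run["JobRunId"])
    print(final_state)  # e.g. {'JobRunState': 'SUCCEEDED', 'JobRunId': '...'}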