Source code for airflow.providers.amazon.aws.hooks.athena
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""This module contains AWS Athena hook... spelling:: PageIterator"""importwarningsfromtimeimportsleepfromtypingimportAny,Dict,Optionalfrombotocore.paginateimportPageIteratorfromairflow.providers.amazon.aws.hooks.base_awsimportAwsBaseHook
[docs]classAthenaHook(AwsBaseHook):""" Interact with AWS Athena to run, poll queries and return query results Additional arguments (such as ``aws_conn_id``) may be specified and are passed down to the underlying AwsBaseHook. .. seealso:: :class:`~airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook` :param sleep_time: Time (in seconds) to wait between two consecutive calls to check query status on Athena """
[docs]defrun_query(self,query:str,query_context:Dict[str,str],result_configuration:Dict[str,Any],client_request_token:Optional[str]=None,workgroup:str='primary',)->str:""" Run Presto query on athena with provided config and return submitted query_execution_id :param query: Presto query to run :param query_context: Context in which query need to be run :param result_configuration: Dict with path to store results in and config related to encryption :param client_request_token: Unique token created by user to avoid multiple executions of same query :param workgroup: Athena workgroup name, when not specified, will be 'primary' :return: str """params={'QueryString':query,'QueryExecutionContext':query_context,'ResultConfiguration':result_configuration,'WorkGroup':workgroup,}ifclient_request_token:params['ClientRequestToken']=client_request_tokenresponse=self.get_conn().start_query_execution(**params)query_execution_id=response['QueryExecutionId']returnquery_execution_id
[docs]defcheck_query_status(self,query_execution_id:str)->Optional[str]:""" Fetch the status of submitted athena query. Returns None or one of valid query states. :param query_execution_id: Id of submitted athena query :return: str """response=self.get_conn().get_query_execution(QueryExecutionId=query_execution_id)state=Nonetry:state=response['QueryExecution']['Status']['State']exceptExceptionasex:self.log.error('Exception while getting query state %s',ex)finally:# The error is being absorbed here and is being handled by the caller.# The error is being absorbed to implement retries.returnstate
[docs]defget_state_change_reason(self,query_execution_id:str)->Optional[str]:""" Fetch the reason for a state change (e.g. error message). Returns None or reason string. :param query_execution_id: Id of submitted athena query :return: str """response=self.get_conn().get_query_execution(QueryExecutionId=query_execution_id)reason=Nonetry:reason=response['QueryExecution']['Status']['StateChangeReason']exceptExceptionasex:self.log.error('Exception while getting query state change reason: %s',ex)finally:# The error is being absorbed here and is being handled by the caller.# The error is being absorbed to implement retries.returnreason
[docs]defget_query_results(self,query_execution_id:str,next_token_id:Optional[str]=None,max_results:int=1000)->Optional[dict]:""" Fetch submitted athena query results. returns none if query is in intermediate state or failed/cancelled state else dict of query output :param query_execution_id: Id of submitted athena query :param next_token_id: The token that specifies where to start pagination. :param max_results: The maximum number of results (rows) to return in this request. :return: dict """query_state=self.check_query_status(query_execution_id)ifquery_stateisNone:self.log.error('Invalid Query state')returnNoneelifquery_stateinself.INTERMEDIATE_STATESorquery_stateinself.FAILURE_STATES:self.log.error('Query is in "%s" state. Cannot fetch results',query_state)returnNoneresult_params={'QueryExecutionId':query_execution_id,'MaxResults':max_results}ifnext_token_id:result_params['NextToken']=next_token_idreturnself.get_conn().get_query_results(**result_params)
[docs]defget_query_results_paginator(self,query_execution_id:str,max_items:Optional[int]=None,page_size:Optional[int]=None,starting_token:Optional[str]=None,)->Optional[PageIterator]:""" Fetch submitted athena query results. returns none if query is in intermediate state or failed/cancelled state else a paginator to iterate through pages of results. If you wish to get all results at once, call build_full_result() on the returned PageIterator :param query_execution_id: Id of submitted athena query :param max_items: The total number of items to return. :param page_size: The size of each page. :param starting_token: A token to specify where to start paginating. :return: PageIterator """query_state=self.check_query_status(query_execution_id)ifquery_stateisNone:self.log.error('Invalid Query state (null)')returnNoneifquery_stateinself.INTERMEDIATE_STATESorquery_stateinself.FAILURE_STATES:self.log.error('Query is in "%s" state. Cannot fetch results',query_state)returnNoneresult_params={'QueryExecutionId':query_execution_id,'PaginationConfig':{'MaxItems':max_items,'PageSize':page_size,'StartingToken':starting_token,},}paginator=self.get_conn().get_paginator('get_query_results')returnpaginator.paginate(**result_params)
[docs]defpoll_query_status(self,query_execution_id:str,max_tries:Optional[int]=None)->Optional[str]:""" Poll the status of submitted athena query until query state reaches final state. Returns one of the final states :param query_execution_id: Id of submitted athena query :param max_tries: Number of times to poll for query state before function exits :return: str """try_number=1final_query_state=None# Query state when query reaches final state or max_tries reachedwhileTrue:query_state=self.check_query_status(query_execution_id)ifquery_stateisNone:self.log.info('Trial %s: Invalid query state. Retrying again',try_number)elifquery_stateinself.TERMINAL_STATES:self.log.info('Trial %s: Query execution completed. Final state is %s}',try_number,query_state)final_query_state=query_statebreakelse:self.log.info('Trial %s: Query is still in non-terminal state - %s',try_number,query_state)ifmax_triesandtry_number>=max_tries:# Break loop if max_tries reachedfinal_query_state=query_statebreaktry_number+=1sleep(self.sleep_time)returnfinal_query_state
[docs]defget_output_location(self,query_execution_id:str)->str:""" Function to get the output location of the query results in s3 uri format. :param query_execution_id: Id of submitted athena query :return: str """output_location=Noneifquery_execution_id:response=self.get_conn().get_query_execution(QueryExecutionId=query_execution_id)ifresponse:try:output_location=response['QueryExecution']['ResultConfiguration']['OutputLocation']exceptKeyError:self.log.error("Error retrieving OutputLocation")raiseelse:raiseelse:raiseValueError("Invalid Query execution id")returnoutput_location
[docs]defstop_query(self,query_execution_id:str)->Dict:""" Cancel the submitted athena query :param query_execution_id: Id of submitted athena query :return: dict """returnself.get_conn().stop_query_execution(QueryExecutionId=query_execution_id)
[docs]classAWSAthenaHook(AthenaHook):""" This hook is deprecated. Please use :class:`airflow.providers.amazon.aws.hooks.athena.AthenaHook`. """def__init__(self,*args,**kwargs):warnings.warn("This hook is deprecated. Please use `airflow.providers.amazon.aws.hooks.athena.AthenaHook`.",DeprecationWarning,stacklevel=2,)super().__init__(*args,**kwargs)