Source code for airflow.providers.amazon.aws.hooks.athena
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""This module contains AWS Athena hook... spelling:: PageIterator"""from__future__importannotationsfromtimeimportsleepfromtypingimportAnyfrombotocore.paginateimportPageIteratorfromairflow.providers.amazon.aws.hooks.base_awsimportAwsBaseHook
[docs]classAthenaHook(AwsBaseHook):""" Interact with Amazon Athena. Provide thick wrapper around :external+boto3:py:class:`boto3.client("athena") <Athena.Client>`. :param sleep_time: Time (in seconds) to wait between two consecutive calls to check query status on Athena :param log_query: Whether to log athena query and other execution params when it's executed. Defaults to *True*. Additional arguments (such as ``aws_conn_id``) may be specified and are passed down to the underlying AwsBaseHook. .. seealso:: - :class:`airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook` """
[docs]defrun_query(self,query:str,query_context:dict[str,str],result_configuration:dict[str,Any],client_request_token:str|None=None,workgroup:str="primary",)->str:""" Run Presto query on athena with provided config and return submitted query_execution_id .. seealso:: - :external+boto3:py:meth:`Athena.Client.start_query_execution` :param query: Presto query to run :param query_context: Context in which query need to be run :param result_configuration: Dict with path to store results in and config related to encryption :param client_request_token: Unique token created by user to avoid multiple executions of same query :param workgroup: Athena workgroup name, when not specified, will be 'primary' """params={"QueryString":query,"QueryExecutionContext":query_context,"ResultConfiguration":result_configuration,"WorkGroup":workgroup,}ifclient_request_token:params["ClientRequestToken"]=client_request_tokenifself.log_query:self.log.info("Running Query with params: %s",params)response=self.get_conn().start_query_execution(**params)query_execution_id=response["QueryExecutionId"]self.log.info("Query execution id: %s",query_execution_id)returnquery_execution_id
[docs]defcheck_query_status(self,query_execution_id:str)->str|None:""" Fetch the status of submitted athena query. Returns None or one of valid query states. .. seealso:: - :external+boto3:py:meth:`Athena.Client.get_query_execution` :param query_execution_id: Id of submitted athena query """response=self.get_conn().get_query_execution(QueryExecutionId=query_execution_id)state=Nonetry:state=response["QueryExecution"]["Status"]["State"]exceptException:self.log.exception("Exception while getting query state. Query execution id: %s",query_execution_id)finally:# The error is being absorbed here and is being handled by the caller.# The error is being absorbed to implement retries.returnstate
[docs]defget_state_change_reason(self,query_execution_id:str)->str|None:""" Fetch the reason for a state change (e.g. error message). Returns None or reason string. .. seealso:: - :external+boto3:py:meth:`Athena.Client.get_query_execution` :param query_execution_id: Id of submitted athena query """response=self.get_conn().get_query_execution(QueryExecutionId=query_execution_id)reason=Nonetry:reason=response["QueryExecution"]["Status"]["StateChangeReason"]exceptException:self.log.exception("Exception while getting query state change reason. Query execution id: %s",query_execution_id,)finally:# The error is being absorbed here and is being handled by the caller.# The error is being absorbed to implement retries.returnreason
[docs]defget_query_results(self,query_execution_id:str,next_token_id:str|None=None,max_results:int=1000)->dict|None:""" Fetch submitted athena query results. returns none if query is in intermediate state or failed/cancelled state else dict of query output .. seealso:: - :external+boto3:py:meth:`Athena.Client.get_query_results` :param query_execution_id: Id of submitted athena query :param next_token_id: The token that specifies where to start pagination. :param max_results: The maximum number of results (rows) to return in this request. """query_state=self.check_query_status(query_execution_id)ifquery_stateisNone:self.log.error("Invalid Query state. Query execution id: %s",query_execution_id)returnNoneelifquery_stateinself.INTERMEDIATE_STATESorquery_stateinself.FAILURE_STATES:self.log.error('Query is in "%s" state. Cannot fetch results. Query execution id: %s',query_state,query_execution_id,)returnNoneresult_params={"QueryExecutionId":query_execution_id,"MaxResults":max_results}ifnext_token_id:result_params["NextToken"]=next_token_idreturnself.get_conn().get_query_results(**result_params)
[docs]defget_query_results_paginator(self,query_execution_id:str,max_items:int|None=None,page_size:int|None=None,starting_token:str|None=None,)->PageIterator|None:""" Fetch submitted athena query results. returns none if query is in intermediate state or failed/cancelled state else a paginator to iterate through pages of results. If you wish to get all results at once, call build_full_result() on the returned PageIterator .. seealso:: - :external+boto3:py:class:`Athena.Paginator.GetQueryResults` :param query_execution_id: Id of submitted athena query :param max_items: The total number of items to return. :param page_size: The size of each page. :param starting_token: A token to specify where to start paginating. """query_state=self.check_query_status(query_execution_id)ifquery_stateisNone:self.log.error("Invalid Query state (null). Query execution id: %s",query_execution_id)returnNoneifquery_stateinself.INTERMEDIATE_STATESorquery_stateinself.FAILURE_STATES:self.log.error('Query is in "%s" state. Cannot fetch results, Query execution id: %s',query_state,query_execution_id,)returnNoneresult_params={"QueryExecutionId":query_execution_id,"PaginationConfig":{"MaxItems":max_items,"PageSize":page_size,"StartingToken":starting_token,},}paginator=self.get_conn().get_paginator("get_query_results")returnpaginator.paginate(**result_params)
[docs]defpoll_query_status(self,query_execution_id:str,max_polling_attempts:int|None=None,)->str|None:""" Poll the status of submitted athena query until query state reaches final state. Returns one of the final states :param query_execution_id: Id of submitted athena query :param max_polling_attempts: Number of times to poll for query state before function exits """try_number=1final_query_state=None# Query state when query reaches final state or max_polling_attempts reachedwhileTrue:query_state=self.check_query_status(query_execution_id)ifquery_stateisNone:self.log.info("Query execution id: %s, trial %s: Invalid query state. Retrying again",query_execution_id,try_number,)elifquery_stateinself.TERMINAL_STATES:self.log.info("Query execution id: %s, trial %s: Query execution completed. Final state is %s",query_execution_id,try_number,query_state,)final_query_state=query_statebreakelse:self.log.info("Query execution id: %s, trial %s: Query is still in non-terminal state - %s",query_execution_id,try_number,query_state,)if(max_polling_attemptsandtry_number>=max_polling_attempts):# Break loop if max_polling_attempts reachedfinal_query_state=query_statebreaktry_number+=1sleep(self.sleep_time)returnfinal_query_state
[docs]defget_output_location(self,query_execution_id:str)->str:""" Function to get the output location of the query results in s3 uri format. .. seealso:: - :external+boto3:py:meth:`Athena.Client.get_query_execution` :param query_execution_id: Id of submitted athena query """output_location=Noneifquery_execution_id:response=self.get_conn().get_query_execution(QueryExecutionId=query_execution_id)ifresponse:try:output_location=response["QueryExecution"]["ResultConfiguration"]["OutputLocation"]exceptKeyError:self.log.error("Error retrieving OutputLocation. Query execution id: %s",query_execution_id)raiseelse:raiseelse:raiseValueError("Invalid Query execution id. Query execution id: %s",query_execution_id)returnoutput_location
[docs]defstop_query(self,query_execution_id:str)->dict:""" Cancel the submitted athena query .. seealso:: - :external+boto3:py:meth:`Athena.Client.stop_query_execution` :param query_execution_id: Id of submitted athena query """self.log.info("Stopping Query with executionId - %s",query_execution_id)returnself.get_conn().stop_query_execution(QueryExecutionId=query_execution_id)