Source code for airflow.providers.amazon.aws.hooks.athena
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""This module contains AWS Athena hook... spelling:word-list:: PageIterator"""from__future__importannotationsfromtimeimportsleepfromtypingimportAnyfrombotocore.paginateimportPageIteratorfromairflow.providers.amazon.aws.hooks.base_awsimportAwsBaseHook
class AthenaHook(AwsBaseHook):
    """Interact with Amazon Athena.

    Provide thick wrapper around
    :external+boto3:py:class:`boto3.client("athena") <Athena.Client>`.

    :param sleep_time: Time (in seconds) to wait between two consecutive calls
        to check query status on Athena.
    :param log_query: Whether to log athena query and other execution params
        when it's executed. Defaults to *True*.

    Additional arguments (such as ``aws_conn_id``) may be specified and
    are passed down to the underlying AwsBaseHook.

    .. seealso::
        - :class:`airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
    """

    # NOTE(review): the methods below read ``self.sleep_time``, ``self.log_query``,
    # ``self.INTERMEDIATE_STATES``, ``self.FAILURE_STATES`` and
    # ``self.TERMINAL_STATES``. None of these are defined in the visible part of
    # this class -- presumably they are provided elsewhere; confirm before
    # relying on this class in isolation.

    def run_query(
        self,
        query: str,
        query_context: dict[str, str],
        result_configuration: dict[str, Any],
        client_request_token: str | None = None,
        workgroup: str = "primary",
    ) -> str:
        """Run a Presto query on Athena with provided config.

        .. seealso::
            - :external+boto3:py:meth:`Athena.Client.start_query_execution`

        :param query: Presto query to run.
        :param query_context: Context in which query need to be run.
        :param result_configuration: Dict with path to store results in and
            config related to encryption.
        :param client_request_token: Unique token created by user to avoid
            multiple executions of same query.
        :param workgroup: Athena workgroup name, when not specified, will be ``'primary'``.
        :return: Submitted query execution ID.
        """
        params: dict[str, Any] = {
            "QueryString": query,
            "QueryExecutionContext": query_context,
            "ResultConfiguration": result_configuration,
            "WorkGroup": workgroup,
        }
        # The token is only sent when the caller supplied a non-empty one.
        if client_request_token:
            params["ClientRequestToken"] = client_request_token
        if self.log_query:
            self.log.info("Running Query with params: %s", params)
        response = self.get_conn().start_query_execution(**params)
        query_execution_id = response["QueryExecutionId"]
        self.log.info("Query execution id: %s", query_execution_id)
        return query_execution_id

    def check_query_status(self, query_execution_id: str) -> str | None:
        """Fetch the state of a submitted query.

        .. seealso::
            - :external+boto3:py:meth:`Athena.Client.get_query_execution`

        :param query_execution_id: Id of submitted athena query
        :return: One of valid query states, or *None* if the response is
            malformed.
        """
        response = self.get_conn().get_query_execution(QueryExecutionId=query_execution_id)
        try:
            return response["QueryExecution"]["Status"]["State"]
        except Exception:
            # The error is deliberately absorbed so that callers can treat
            # ``None`` as "state unknown" and retry. The return lives here
            # rather than in a ``finally`` clause so that BaseException
            # subclasses (KeyboardInterrupt, SystemExit) are not swallowed.
            self.log.exception(
                "Exception while getting query state. Query execution id: %s", query_execution_id
            )
            return None

    def get_state_change_reason(self, query_execution_id: str) -> str | None:
        """Fetch the reason for a state change (e.g. error message).

        .. seealso::
            - :external+boto3:py:meth:`Athena.Client.get_query_execution`

        :param query_execution_id: Id of submitted athena query
        :return: The reason string, or *None* if it cannot be read from the
            response.
        """
        response = self.get_conn().get_query_execution(QueryExecutionId=query_execution_id)
        try:
            return response["QueryExecution"]["Status"]["StateChangeReason"]
        except Exception:
            # Deliberately absorbed (see ``check_query_status``): the caller
            # handles a ``None`` reason. No ``return`` inside ``finally`` so
            # that interpreter-level exceptions still propagate.
            self.log.exception(
                "Exception while getting query state change reason. Query execution id: %s",
                query_execution_id,
            )
            return None

    def get_query_results(
        self, query_execution_id: str, next_token_id: str | None = None, max_results: int = 1000
    ) -> dict | None:
        """Fetch submitted query results.

        .. seealso::
            - :external+boto3:py:meth:`Athena.Client.get_query_results`

        :param query_execution_id: Id of submitted athena query
        :param next_token_id: The token that specifies where to start pagination.
        :param max_results: The maximum number of results (rows) to return in this request.
        :return: *None* if the query is in intermediate, failed, or cancelled
            state. Otherwise a dict of query outputs.
        """
        query_state = self.check_query_status(query_execution_id)
        if query_state is None:
            self.log.error("Invalid Query state. Query execution id: %s", query_execution_id)
            return None
        if query_state in self.INTERMEDIATE_STATES or query_state in self.FAILURE_STATES:
            self.log.error(
                'Query is in "%s" state. Cannot fetch results. Query execution id: %s',
                query_state,
                query_execution_id,
            )
            return None

        result_params = {"QueryExecutionId": query_execution_id, "MaxResults": max_results}
        if next_token_id:
            result_params["NextToken"] = next_token_id
        return self.get_conn().get_query_results(**result_params)

    def get_query_results_paginator(
        self,
        query_execution_id: str,
        max_items: int | None = None,
        page_size: int | None = None,
        starting_token: str | None = None,
    ) -> PageIterator | None:
        """Fetch submitted Athena query results.

        .. seealso::
            - :external+boto3:py:class:`Athena.Paginator.GetQueryResults`

        :param query_execution_id: Id of submitted athena query
        :param max_items: The total number of items to return.
        :param page_size: The size of each page.
        :param starting_token: A token to specify where to start paginating.
        :return: *None* if the query is in intermediate, failed, or cancelled
            state. Otherwise a paginator to iterate through pages of results.

        Call :meth:`.build_full_result()` on the returned paginator to get all
        results at once.
        """
        query_state = self.check_query_status(query_execution_id)
        if query_state is None:
            self.log.error("Invalid Query state (null). Query execution id: %s", query_execution_id)
            return None
        if query_state in self.INTERMEDIATE_STATES or query_state in self.FAILURE_STATES:
            self.log.error(
                'Query is in "%s" state. Cannot fetch results, Query execution id: %s',
                query_state,
                query_execution_id,
            )
            return None

        result_params = {
            "QueryExecutionId": query_execution_id,
            "PaginationConfig": {
                "MaxItems": max_items,
                "PageSize": page_size,
                "StartingToken": starting_token,
            },
        }
        paginator = self.get_conn().get_paginator("get_query_results")
        return paginator.paginate(**result_params)

    def poll_query_status(
        self,
        query_execution_id: str,
        max_polling_attempts: int | None = None,
    ) -> str | None:
        """Poll the state of a submitted query until it reaches final state.

        :param query_execution_id: ID of submitted athena query
        :param max_polling_attempts: Number of times to poll for query state
            before function exits. A falsy value (``None`` or ``0``) means
            polling continues until a terminal state is seen.
        :return: The terminal state once one is reached. If
            ``max_polling_attempts`` is exhausted first, the last observed
            state, which may be non-terminal or *None*.
        """
        try_number = 1
        while True:
            query_state = self.check_query_status(query_execution_id)
            if query_state is None:
                self.log.info(
                    "Query execution id: %s, trial %s: Invalid query state. Retrying again",
                    query_execution_id,
                    try_number,
                )
            elif query_state in self.TERMINAL_STATES:
                self.log.info(
                    "Query execution id: %s, trial %s: Query execution completed. Final state is %s",
                    query_execution_id,
                    try_number,
                    query_state,
                )
                return query_state
            else:
                self.log.info(
                    "Query execution id: %s, trial %s: Query is still in non-terminal state - %s",
                    query_execution_id,
                    try_number,
                    query_state,
                )
            # Give up once the attempt budget is spent and report whatever
            # state was observed last.
            if max_polling_attempts and try_number >= max_polling_attempts:
                return query_state
            try_number += 1
            sleep(self.sleep_time)

    def get_output_location(self, query_execution_id: str) -> str:
        """Get the output location of the query results in S3 URI format.

        .. seealso::
            - :external+boto3:py:meth:`Athena.Client.get_query_execution`

        :param query_execution_id: Id of submitted athena query
        :return: The ``OutputLocation`` recorded in the query's result
            configuration.
        :raises ValueError: If ``query_execution_id`` is empty.
        :raises RuntimeError: If the service returns an empty response.
        :raises KeyError: If the response carries no output location.
        """
        if not query_execution_id:
            raise ValueError(f"Invalid Query execution id. Query execution id: {query_execution_id}")

        response = self.get_conn().get_query_execution(QueryExecutionId=query_execution_id)
        if not response:
            # This used to be a bare ``raise`` outside any ``except`` block,
            # which surfaces as "RuntimeError: No active exception to
            # reraise". The exception type is kept; the message now says
            # what actually went wrong.
            raise RuntimeError(
                f"No query execution information returned. Query execution id: {query_execution_id}"
            )

        try:
            return response["QueryExecution"]["ResultConfiguration"]["OutputLocation"]
        except KeyError:
            self.log.error("Error retrieving OutputLocation. Query execution id: %s", query_execution_id)
            raise

    def stop_query(self, query_execution_id: str) -> dict:
        """Cancel the submitted query.

        .. seealso::
            - :external+boto3:py:meth:`Athena.Client.stop_query_execution`

        :param query_execution_id: Id of submitted athena query
        :return: The response of the ``stop_query_execution`` call.
        """
        self.log.info("Stopping Query with executionId - %s", query_execution_id)
        return self.get_conn().stop_query_execution(QueryExecutionId=query_execution_id)