Source code for airflow.providers.amazon.aws.hooks.redshift_data

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import time
from pprint import pformat
from typing import TYPE_CHECKING, Any, Iterable

from airflow.providers.amazon.aws.hooks.base_aws import AwsGenericHook
from airflow.providers.amazon.aws.utils import trim_none_values

if TYPE_CHECKING:
    from mypy_boto3_redshift_data import RedshiftDataAPIServiceClient  # noqa: F401
    from mypy_boto3_redshift_data.type_defs import DescribeStatementResponseTypeDef

# Statement status values compared against the ``Status`` field of a
# ``describe_statement`` response.
FINISHED_STATE = "FINISHED"
FAILED_STATE = "FAILED"
ABORTED_STATE = "ABORTED"

# Statuses that make ``RedshiftDataHook.parse_statement_resposne`` raise.
FAILURE_STATES = {FAILED_STATE, ABORTED_STATE}

# Statuses for which ``RedshiftDataHook.is_still_running`` returns True.
RUNNING_STATES = {"PICKED", "STARTED", "SUBMITTED"}
class RedshiftDataQueryFailedError(ValueError):
    """Signal that a Redshift Data API statement ended with the ``FAILED`` status."""
class RedshiftDataQueryAbortedError(ValueError):
    """Signal that a Redshift Data API statement ended with the ``ABORTED`` status."""
class RedshiftDataHook(AwsGenericHook["RedshiftDataAPIServiceClient"]):
    """
    Interact with Amazon Redshift Data API.

    Provide thin wrapper around
    :external+boto3:py:class:`boto3.client("redshift-data") <RedshiftDataAPIService.Client>`.

    Additional arguments (such as ``aws_conn_id``) may be specified and
    are passed down to the underlying AwsBaseHook.

    .. seealso::
        - :class:`airflow.providers.amazon.aws.hooks.base_aws.AwsBaseHook`
        - `Amazon Redshift Data API \
          <https://docs.aws.amazon.com/redshift-data/latest/APIReference/Welcome.html>`__
    """

    def __init__(self, *args, **kwargs) -> None:
        # Always talk to the "redshift-data" client, whatever the caller passed.
        kwargs["client_type"] = "redshift-data"
        super().__init__(*args, **kwargs)

    def execute_query(
        self,
        database: str,
        sql: str | list[str],
        cluster_identifier: str | None = None,
        db_user: str | None = None,
        parameters: Iterable | None = None,
        secret_arn: str | None = None,
        statement_name: str | None = None,
        with_event: bool = False,
        wait_for_completion: bool = True,
        poll_interval: int = 10,
        workgroup_name: str | None = None,
    ) -> str:
        """
        Execute a statement against Amazon Redshift.

        :param database: the name of the database
        :param sql: the SQL statement or list of SQL statement to run
        :param cluster_identifier: unique identifier of a cluster
        :param db_user: the database username
        :param parameters: the parameters for the SQL statement
        :param secret_arn: the name or ARN of the secret that enables db access
        :param statement_name: the name of the SQL statement
        :param with_event: indicates whether to send an event to EventBridge
        :param wait_for_completion: indicates whether to wait for a result, if True wait, if False don't wait
        :param poll_interval: how often in seconds to check the query status
        :param workgroup_name: name of the Redshift Serverless workgroup. Mutually exclusive with
            `cluster_identifier`. Specify this parameter to query Redshift Serverless. More info
            https://docs.aws.amazon.com/redshift/latest/mgmt/working-with-serverless.html

        :return: the UUID of the statement
        :raises ValueError: if both or neither of ``cluster_identifier`` and
            ``workgroup_name`` are provided
        """
        # Validate BEFORE submitting anything: the two booleans are identical when
        # both targets are set or both are missing, and exactly one is required.
        # Doing this first guarantees an invalid call never runs SQL.
        if bool(cluster_identifier) is bool(workgroup_name):
            raise ValueError("Either 'cluster_identifier' or 'workgroup_name' must be specified.")

        kwargs: dict[str, Any] = {
            "ClusterIdentifier": cluster_identifier,
            "Database": database,
            "DbUser": db_user,
            "Parameters": parameters,
            "WithEvent": with_event,
            "SecretArn": secret_arn,
            "StatementName": statement_name,
            "WorkgroupName": workgroup_name,
        }
        # A list of statements goes through the batch endpoint, a single string
        # through the plain one; unset (None) options are stripped in both cases.
        if isinstance(sql, list):
            kwargs["Sqls"] = sql
            resp = self.conn.batch_execute_statement(**trim_none_values(kwargs))
        else:
            kwargs["Sql"] = sql
            resp = self.conn.execute_statement(**trim_none_values(kwargs))

        statement_id = resp["Id"]

        if wait_for_completion:
            self.wait_for_results(statement_id, poll_interval=poll_interval)

        return statement_id

    def wait_for_results(self, statement_id: str, poll_interval: int) -> str:
        """
        Block until the statement is finished, polling its status periodically.

        Exceptions raised by :meth:`check_query_is_finished` propagate to the caller.

        :param statement_id: the UUID of the statement
        :param poll_interval: how often in seconds to check the query status
        :return: ``FINISHED_STATE`` once the statement has finished
        """
        while True:
            self.log.info("Polling statement %s", statement_id)
            is_finished = self.check_query_is_finished(statement_id)
            if is_finished:
                return FINISHED_STATE

            time.sleep(poll_interval)

    def check_query_is_finished(self, statement_id: str) -> bool:
        """
        Check whether query finished, raise exception if failed.

        :param statement_id: the UUID of the statement
        :return: True if the statement finished, False if it has not finished yet
        """
        resp = self.conn.describe_statement(Id=statement_id)
        return self.parse_statement_resposne(resp)

    def parse_statement_resposne(self, resp: DescribeStatementResponseTypeDef) -> bool:
        """
        Parse the response of describe_statement.

        :param resp: the ``describe_statement`` response to inspect
        :return: True when the status is ``FINISHED``, False for any status that is
            neither finished nor a failure
        :raises RedshiftDataQueryFailedError: when the status is ``FAILED``
        :raises RedshiftDataQueryAbortedError: when the status is ``ABORTED``
        """
        status = resp["Status"]
        if status == FINISHED_STATE:
            # "ResultRows" is optional in the response; only log it when present.
            num_rows = resp.get("ResultRows")
            if num_rows is not None:
                self.log.info("Processed %s rows", num_rows)
            return True
        elif status in FAILURE_STATES:
            exception_cls = (
                RedshiftDataQueryFailedError if status == FAILED_STATE else RedshiftDataQueryAbortedError
            )
            raise exception_cls(
                f"Statement {resp['Id']} terminated with status {status}. "
                f"Response details: {pformat(resp)}"
            )

        self.log.info("Query status: %s", status)
        return False

    def get_table_primary_key(
        self,
        table: str,
        database: str,
        schema: str | None = "public",
        cluster_identifier: str | None = None,
        workgroup_name: str | None = None,
        db_user: str | None = None,
        secret_arn: str | None = None,
        statement_name: str | None = None,
        with_event: bool = False,
        wait_for_completion: bool = True,
        poll_interval: int = 10,
    ) -> list[str] | None:
        """
        Return the table primary key.

        Copied from ``RedshiftSQLHook.get_table_primary_key()``

        The schema and table names are sent as named statement parameters
        (``:schema`` / ``:table``) instead of being formatted into the SQL text.

        :param table: Name of the target table
        :param database: the name of the database
        :param schema: Name of the target schema, public by default; ``None`` is
            treated as ``"public"``
        :param cluster_identifier: unique identifier of a cluster
        :param workgroup_name: name of the Redshift Serverless workgroup. Mutually exclusive with
            `cluster_identifier`.
        :param db_user: the database username
        :param secret_arn: the name or ARN of the secret that enables db access
        :param statement_name: the name of the SQL statement
        :param with_event: indicates whether to send an event to EventBridge
        :param wait_for_completion: indicates whether to wait for a result, if True wait, if False don't wait
        :param poll_interval: how often in seconds to check the query status

        :return: Primary key columns list, or None when no column was returned
        """
        # NOTE(review): the result is fetched right after ``execute_query`` returns;
        # with ``wait_for_completion=False`` the statement has presumably not
        # finished yet at that point -- confirm whether callers rely on that.
        if schema is None:
            schema = "public"

        sql = """
            select kcu.column_name
            from information_schema.table_constraints tco
                    join information_schema.key_column_usage kcu
                        on kcu.constraint_name = tco.constraint_name
                            and kcu.constraint_schema = tco.constraint_schema
                            and kcu.constraint_name = tco.constraint_name
            where tco.constraint_type = 'PRIMARY KEY'
            and kcu.table_schema = :schema
            and kcu.table_name = :table
        """
        stmt_id = self.execute_query(
            sql=sql,
            database=database,
            cluster_identifier=cluster_identifier,
            workgroup_name=workgroup_name,
            db_user=db_user,
            parameters=[
                {"name": "schema", "value": schema},
                {"name": "table", "value": table},
            ],
            secret_arn=secret_arn,
            statement_name=statement_name,
            with_event=with_event,
            wait_for_completion=wait_for_completion,
            poll_interval=poll_interval,
        )

        pk_columns: list[str] = []
        token = ""
        while True:
            kwargs: dict[str, Any] = {"Id": stmt_id}
            if token:
                kwargs["NextToken"] = token
            response = self.conn.get_statement_result(**kwargs)
            # we only select a single column (that is a string),
            # so safe to assume that there is only a single col in the record
            pk_columns.extend(y["stringValue"] for x in response["Records"] for y in x)
            # Stop on a missing OR empty token: an empty one would otherwise
            # re-issue the very same request forever.
            token = response.get("NextToken") or ""
            if not token:
                break

        return pk_columns or None

    async def is_still_running(self, statement_id: str) -> bool:
        """
        Async function to check whether the query is still running.

        :param statement_id: the UUID of the statement
        :return: True when the statement status is one of ``RUNNING_STATES``
        """
        async with self.async_conn as client:
            desc = await client.describe_statement(Id=statement_id)
            return desc["Status"] in RUNNING_STATES

    async def check_query_is_finished_async(self, statement_id: str) -> bool:
        """
        Async function to check statement is finished.

        It takes statement_id, makes async connection to redshift data to get the query status
        by statement_id and returns the query status.

        :param statement_id: the UUID of the statement
        :return: the result of :meth:`parse_statement_resposne` for the fetched
            status (which raises for failed or aborted statements)
        """
        async with self.async_conn as client:
            resp = await client.describe_statement(Id=statement_id)
            return self.parse_statement_resposne(resp)

Was this entry helpful?