Source code for airflow.providers.amazon.aws.operators.redshift_data
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from functools import cached_property
from typing import TYPE_CHECKING

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.redshift_data import RedshiftDataHook

if TYPE_CHECKING:
    from mypy_boto3_redshift_data.type_defs import GetStatementResultResponseTypeDef

    from airflow.utils.context import Context
class RedshiftDataOperator(BaseOperator):
    """
    Executes SQL Statements against an Amazon Redshift cluster using Redshift Data.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:RedshiftDataOperator`

    :param database: the name of the database
    :param sql: the SQL statement or list of SQL statement to run
    :param cluster_identifier: unique identifier of a cluster
    :param db_user: the database username
    :param parameters: the parameters for the SQL statement
    :param secret_arn: the name or ARN of the secret that enables db access
    :param statement_name: the name of the SQL statement
    :param with_event: indicates whether to send an event to EventBridge
    :param wait_for_completion: indicates whether to wait for a result,
        if True wait, if False don't wait
    :param poll_interval: how often in seconds to check the query status
    :param return_sql_result: if True will return the result of an SQL statement,
        if False (default) will return statement ID
    :param aws_conn_id: aws connection to use
    :param region: aws region to use
    :param workgroup_name: name of the Redshift Serverless workgroup. Mutually exclusive with
        `cluster_identifier`. Specify this parameter to query Redshift Serverless. More info
        https://docs.aws.amazon.com/redshift/latest/mgmt/working-with-serverless.html
    """
[docs]defhook(self)->RedshiftDataHook:"""Create and return an RedshiftDataHook."""returnRedshiftDataHook(aws_conn_id=self.aws_conn_id,region_name=self.region)
[docs]defexecute(self,context:Context)->GetStatementResultResponseTypeDef|str:"""Execute a statement against Amazon Redshift."""self.log.info("Executing statement: %s",self.sql)self.statement_id=self.hook.execute_query(database=self.database,sql=self.sql,cluster_identifier=self.cluster_identifier,workgroup_name=self.workgroup_name,db_user=self.db_user,parameters=self.parameters,secret_arn=self.secret_arn,statement_name=self.statement_name,with_event=self.with_event,wait_for_completion=self.wait_for_completion,poll_interval=self.poll_interval,)ifself.return_sql_result:result=self.hook.conn.get_statement_result(Id=self.statement_id)self.log.debug("Statement result: %s",result)returnresultelse:returnself.statement_id
[docs]defon_kill(self)->None:"""Cancel the submitted redshift query."""ifself.statement_id:self.log.info("Received a kill signal.")self.log.info("Stopping Query with statementId - %s",self.statement_id)try:self.hook.conn.cancel_statement(Id=self.statement_id)exceptExceptionasex:self.log.error("Unable to cancel query. Exiting. %s",ex)