Source code for airflow.providers.amazon.aws.operators.redshift_data
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.importsysfromtimeimportsleepfromtypingimportTYPE_CHECKING,Any,Dict,Optionalifsys.version_info>=(3,8):fromfunctoolsimportcached_propertyelse:fromcached_propertyimportcached_propertyfromairflow.modelsimportBaseOperatorfromairflow.providers.amazon.aws.hooks.redshift_dataimportRedshiftDataHookifTYPE_CHECKING:fromairflow.utils.contextimportContext
class RedshiftDataOperator(BaseOperator):
    """
    Executes SQL statements against an Amazon Redshift cluster using the Redshift Data API.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:RedshiftDataOperator`

    :param database: the name of the database
    :param sql: the SQL statement text to run
    :param cluster_identifier: unique identifier of a cluster
    :param db_user: the database username
    :param parameters: the parameters for the SQL statement
    :param secret_arn: the name or ARN of the secret that enables db access
    :param statement_name: the name of the SQL statement
    :param with_event: indicates whether to send an event to EventBridge
    :param await_result: if True, ``execute`` polls the statement until it finishes
        (raising if it fails or is aborted); if False, ``execute`` returns right after
        submitting it
    :param poll_interval: number of seconds to sleep between status checks while
        awaiting the result
    :param aws_conn_id: the AWS connection id passed to ``RedshiftDataHook``
    :param region: the AWS region name passed to ``RedshiftDataHook`` as ``region_name``
    """
@cached_property
def hook(self) -> RedshiftDataHook:
    """
    Create and return a RedshiftDataHook.

    Exposed as a cached property because the rest of the operator accesses it as an
    attribute (``self.hook.conn``) when polling and cancelling statements; caching
    means a single hook instance is built per operator and reused for those calls.
    """
    return RedshiftDataHook(aws_conn_id=self.aws_conn_id, region_name=self.region)
def wait_for_results(self, statement_id: str) -> str:
    """
    Poll ``describe_statement`` until the statement reaches a terminal status.

    Any status other than ``FINISHED``, ``FAILED`` or ``ABORTED`` is treated as
    "still in progress": it is logged and polled again after ``self.poll_interval``
    seconds. There is no overall timeout, so this loops until a terminal status is seen.

    :param statement_id: id of the Redshift Data API statement to poll
    :return: the string ``'FINISHED'`` once the statement completes successfully
    :raises ValueError: if the statement ends with status ``FAILED`` or ``ABORTED``;
        the ``Error`` text reported by ``describe_statement`` is appended when present
    """
    while True:
        self.log.info("Polling statement %s", statement_id)
        resp = self.hook.conn.describe_statement(Id=statement_id)
        status = resp['Status']
        if status == 'FINISHED':
            return status
        if status in ('FAILED', 'ABORTED'):
            message = f"Statement {statement_id!r} terminated with status {status}."
            error = resp.get('Error')
            if error:
                # Surface the server-side failure reason, not just the bare status.
                message = f"{message} Error: {error}"
            raise ValueError(message)
        # Non-terminal status: lazy %-style logging, then wait before the next poll.
        self.log.info("Query %s", status)
        sleep(self.poll_interval)
def execute(self, context: 'Context') -> str:
    """
    Execute a statement against Amazon Redshift.

    Submits ``self.sql`` via ``execute_query`` and records the resulting id on
    ``self.statement_id`` so that ``on_kill`` can cancel it. When ``await_result``
    is set, blocks in ``wait_for_results`` until the statement finishes.

    :param context: the task execution context (not used directly here)
    :return: the id returned by ``execute_query`` (presumably the Redshift Data API
        statement ``Id`` string; the previous ``-> None`` annotation contradicted
        the ``return`` below)
    """
    # Lazy %-style args: same log text as before, formatted only if emitted.
    self.log.info("Executing statement: %s", self.sql)
    self.statement_id = self.execute_query()
    if self.await_result:
        self.wait_for_results(self.statement_id)
    return self.statement_id
def on_kill(self) -> None:
    """
    Best-effort cancellation of the submitted Redshift query.

    Does nothing when ``self.statement_id`` is falsy. Otherwise asks the Data API
    to cancel that statement; any exception from ``cancel_statement`` is logged
    at error level and not re-raised.
    """
    current_id = self.statement_id
    if not current_id:
        return

    self.log.info('Received a kill signal.')
    self.log.info('Stopping Query with statementId - %s', current_id)
    try:
        self.hook.conn.cancel_statement(Id=current_id)
    except Exception as cancel_error:
        # Deliberately swallowed: the task is already being killed.
        self.log.error('Unable to cancel query. Exiting. %s', cancel_error)