Source code for airflow.providers.amazon.aws.transfers.redshift_to_s3
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Transfers data from AWS Redshift into an S3 Bucket."""
from __future__ import annotations

from typing import TYPE_CHECKING, Iterable, Mapping, Sequence

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.redshift_sql import RedshiftSQLHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.utils.redshift import build_credentials_block

if TYPE_CHECKING:
    from airflow.utils.context import Context

class RedshiftToS3Operator(BaseOperator):
    """
    Execute an UNLOAD command to s3 as a CSV with headers.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:RedshiftToS3Operator`

    :param s3_bucket: reference to a specific S3 bucket
    :param s3_key: reference to a specific S3 key. If ``table_as_file_name`` is set
        to False, this param must include the desired file name
    :param schema: reference to a specific schema in redshift database.
        Applicable when ``table`` param provided.
    :param table: reference to a specific table in redshift database.
        Used when ``select_query`` param not provided.
    :param select_query: custom select query to fetch data from redshift database
    :param redshift_conn_id: reference to a specific redshift database
    :param aws_conn_id: reference to a specific S3 connection.
        If the AWS connection contains 'aws_iam_role' in ``extras``,
        the operator will use AWS STS credentials with a token
        https://docs.aws.amazon.com/redshift/latest/dg/copy-parameters-authorization.html#copy-credentials
    :param verify: Whether or not to verify SSL certificates for S3 connection.
        By default SSL certificates are verified.
        You can provide the following values:

        - ``False``: do not validate SSL certificates. SSL will still be used
          (unless use_ssl is False), but SSL certificates will not be verified.
        - ``path/to/cert/bundle.pem``: A filename of the CA cert bundle to use.
          You can specify this argument if you want to use a different
          CA cert bundle than the one used by botocore.
    :param unload_options: reference to a list of UNLOAD options
    :param autocommit: If set to True, it will automatically commit the UNLOAD statement.
        Otherwise it will be committed right before the redshift connection gets closed.
    :param include_header: If set to True, the s3 file contains the header columns.
    :param parameters: (optional) the parameters to render the SQL query with.
    :param table_as_file_name: If set to True, the s3 file will be named as the table.
        Applicable when ``table`` param provided.
    """
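
As a quick orientation before the implementation, a minimal usage sketch for this operator, assuming Airflow 2.4+ (for the ``schedule`` argument); the DAG id, bucket, key, schema, table, and connection ids are illustrative placeholders, not part of this module:

    from datetime import datetime

    from airflow import DAG
    from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator

    # Hypothetical DAG: every name below is a placeholder.
    with DAG(dag_id="example_redshift_to_s3", start_date=datetime(2023, 1, 1), schedule=None) as dag:
        unload_task = RedshiftToS3Operator(
            task_id="transfer_redshift_to_s3",
            s3_bucket="my-bucket",
            s3_key="exports",  # with table_as_file_name=True, files land under s3://my-bucket/exports/my_table_*
            schema="public",
            table="my_table",
            redshift_conn_id="redshift_default",
            aws_conn_id="aws_default",
            unload_options=["DELIMITER ','", "ALLOWOVERWRITE"],
            include_header=True,
        )
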
    def __init__(
        self,
        *,
        s3_bucket: str,
        s3_key: str,
        schema: str | None = None,
        table: str | None = None,
        select_query: str | None = None,
        redshift_conn_id: str = "redshift_default",
        aws_conn_id: str = "aws_default",
        verify: bool | str | None = None,
        unload_options: list | None = None,
        autocommit: bool = False,
        include_header: bool = False,
        parameters: Iterable | Mapping | None = None,
        table_as_file_name: bool = True,  # set to True by default to avoid breaking existing workflows
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.s3_bucket = s3_bucket
        self.s3_key = f"{s3_key}/{table}_" if (table and table_as_file_name) else s3_key
        self.schema = schema
        self.table = table
        self.redshift_conn_id = redshift_conn_id
        self.aws_conn_id = aws_conn_id
        self.verify = verify
        self.unload_options: list = unload_options or []
        self.autocommit = autocommit
        self.include_header = include_header
        self.parameters = parameters
        self.table_as_file_name = table_as_file_name

        if select_query:
            self.select_query = select_query
        elif self.schema and self.table:
            self.select_query = f"SELECT * FROM {self.schema}.{self.table}"
        else:
            raise ValueError(
                "Please provide both `schema` and `table` params or `select_query` to fetch the data."
            )

        if self.include_header and "HEADER" not in [uo.upper().strip() for uo in self.unload_options]:
            self.unload_options = list(self.unload_options) + ["HEADER"]

    def _build_unload_query(
        self, credentials_block: str, select_query: str, s3_key: str, unload_options: str
    ) -> str:
        return f"""
                    UNLOAD ('{select_query}')
                    TO 's3://{self.s3_bucket}/{s3_key}'
                    credentials
                    '{credentials_block}'
                    {unload_options};
        """
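
For illustration, a hedged sketch of the statement the private ``_build_unload_query`` helper renders, using the class defined above. The credentials string is a fabricated placeholder in the ``key=value;...`` shape that ``build_credentials_block`` produces for static credentials, and joining ``unload_options`` with newlines is an assumption for this demo, not necessarily how the operator's ``execute`` joins them:

    # Hypothetical values, for illustration only.
    op = RedshiftToS3Operator(
        task_id="demo",
        s3_bucket="my-bucket",
        s3_key="exports",
        schema="public",
        table="my_table",
        unload_options=["DELIMITER ','"],
        include_header=True,  # appends HEADER to unload_options
    )
    sql = op._build_unload_query(
        credentials_block="aws_access_key_id=<key>;aws_secret_access_key=<secret>",  # placeholder
        select_query=op.select_query,  # "SELECT * FROM public.my_table"
        s3_key=op.s3_key,              # "exports/my_table_"
        unload_options="\n".join(op.unload_options),
    )
    # sql is roughly:
    #   UNLOAD ('SELECT * FROM public.my_table')
    #   TO 's3://my-bucket/exports/my_table_'
    #   credentials
    #   'aws_access_key_id=<key>;aws_secret_access_key=<secret>'
    #   DELIMITER ','
    #   HEADER;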