Source code for airflow.providers.amazon.aws.transfers.s3_to_redshift
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from typing import List, Optional, Union

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.utils.redshift import build_credentials_block
from airflow.providers.postgres.hooks.postgres import PostgresHook
class S3ToRedshiftOperator(BaseOperator):
    """
    Executes a COPY command to load files from S3 to Redshift

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:S3ToRedshiftOperator`

    :param schema: reference to a specific schema in redshift database
    :type schema: str
    :param table: reference to a specific table in redshift database
    :type table: str
    :param s3_bucket: reference to a specific S3 bucket
    :type s3_bucket: str
    :param s3_key: reference to a specific S3 key
    :type s3_key: str
    :param redshift_conn_id: reference to a specific redshift database
    :type redshift_conn_id: str
    :param aws_conn_id: reference to a specific S3 connection.
        If the AWS connection contains 'aws_iam_role' in ``extras``,
        the operator will use AWS STS credentials with a token
        https://docs.aws.amazon.com/redshift/latest/dg/copy-parameters-authorization.html#copy-credentials
    :type aws_conn_id: str
    :param verify: Whether or not to verify SSL certificates for S3 connection.
        By default SSL certificates are verified.
        You can provide the following values:

        - ``False``: do not validate SSL certificates. SSL will still be used
          (unless use_ssl is False), but SSL certificates will not be verified.
        - ``path/to/cert/bundle.pem``: A filename of the CA cert bundle to use.
          You can specify this argument if you want to use a different
          CA cert bundle than the one used by botocore.
    :type verify: bool or str
    :param column_list: list of column names to load
    :type column_list: List[str]
    :param copy_options: reference to a list of COPY options
    :type copy_options: list
    :param autocommit: whether to run the COPY statement with autocommit enabled
    :type autocommit: bool
    :param truncate_table: whether or not to truncate the destination table before the copy
    :type truncate_table: bool
    """
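    # Illustrative note, not part of the original source: when the AWS
    # connection's ``extras`` delegate authorization to an IAM role, the hook
    # obtains temporary STS credentials (including a token) that end up in the
    # credentials block passed to COPY. The role ARN below is a placeholder:
    #
    #   {"aws_iam_role": "arn:aws:iam::123456789012:role/my-redshift-copy-role"}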
    def __init__(
        self,
        *,
        schema: str,
        table: str,
        s3_bucket: str,
        s3_key: str,
        redshift_conn_id: str = 'redshift_default',
        aws_conn_id: str = 'aws_default',
        verify: Optional[Union[bool, str]] = None,
        column_list: Optional[List[str]] = None,
        copy_options: Optional[List] = None,
        autocommit: bool = False,
        truncate_table: bool = False,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.schema = schema
        self.table = table
        self.s3_bucket = s3_bucket
        self.s3_key = s3_key
        self.redshift_conn_id = redshift_conn_id
        self.aws_conn_id = aws_conn_id
        self.verify = verify
        self.column_list = column_list
        self.copy_options = copy_options or []
        self.autocommit = autocommit
        self.truncate_table = truncate_table

    def _build_copy_query(self, credentials_block: str, copy_options: str) -> str:
        # Optional explicit column list, e.g. "(id, name)"; empty when loading all columns.
        column_names = "(" + ", ".join(self.column_list) + ")" if self.column_list else ''
        return f"""
                    COPY {self.schema}.{self.table} {column_names}
                    FROM 's3://{self.s3_bucket}/{self.s3_key}'
                    with credentials
                    '{credentials_block}'
                    {copy_options};
        """
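    # Illustrative sketch, not part of the original source: for hypothetical
    # arguments schema='public', table='orders', s3_bucket='my-bucket',
    # s3_key='data.csv', column_list=['id', 'total'], and copy_options joined
    # to "CSV", _build_copy_query renders (indentation aside):
    #
    #   COPY public.orders (id, total)
    #   FROM 's3://my-bucket/data.csv'
    #   with credentials
    #   '<credentials block built by build_credentials_block>'
    #   CSV;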
    def execute(self, context) -> None:
        postgres_hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)
        s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)

        credentials = s3_hook.get_credentials()
        credentials_block = build_credentials_block(credentials)
        copy_options = '\n\t\t\t'.join(self.copy_options)

        copy_statement = self._build_copy_query(credentials_block, copy_options)

        if self.truncate_table:
            # Wrap the delete and the copy in a single transaction so the
            # destination table is never left empty if the COPY fails.
            delete_statement = f'DELETE FROM {self.schema}.{self.table};'
            sql = f"""
            BEGIN;
            {delete_statement}
            {copy_statement}
            COMMIT
            """
        else:
            sql = copy_statement

        self.log.info('Executing COPY command...')
        postgres_hook.run(sql, self.autocommit)
        self.log.info("COPY command complete...")
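A minimal usage sketch, not part of the module above: it wires the operator into a DAG using the default connection IDs. The DAG id, bucket, key, schema, table, and COPY options are all illustrative placeholders.

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator

with DAG(
    dag_id='example_s3_to_redshift',  # hypothetical DAG id
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
) as dag:
    # Load a headered CSV from S3 into public.orders, emptying the table first;
    # the bucket, key, schema, and table names below are placeholders.
    load_orders = S3ToRedshiftOperator(
        task_id='load_orders',
        schema='public',
        table='orders',
        s3_bucket='my-bucket',
        s3_key='path/to/orders.csv',
        copy_options=['CSV', 'IGNOREHEADER 1'],
        truncate_table=True,
    )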