Source code for airflow.providers.amazon.aws.transfers.s3_to_redshift
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from collections.abc import Iterable, Sequence
from typing import TYPE_CHECKING

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.redshift_data import RedshiftDataHook
from airflow.providers.amazon.aws.hooks.redshift_sql import RedshiftSQLHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.utils.redshift import build_credentials_block

if TYPE_CHECKING:
    from airflow.utils.context import Context

# Methods accepted by the ``method`` argument; referenced by execute() below.
AVAILABLE_METHODS = ["APPEND", "REPLACE", "UPSERT"]
class S3ToRedshiftOperator(BaseOperator):
    """
    Executes a COPY command to load files from S3 to Redshift.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:S3ToRedshiftOperator`

    :param table: reference to a specific table in redshift database
    :param s3_bucket: reference to a specific S3 bucket
    :param s3_key: key prefix that selects single or multiple objects from S3
    :param schema: reference to a specific schema in redshift database.
        Do not provide when copying into a temporary table
    :param redshift_conn_id: reference to a specific redshift database OR a redshift data-api connection
    :param aws_conn_id: reference to a specific S3 connection
        If the AWS connection contains 'aws_iam_role' in ``extras``
        the operator will use AWS STS credentials with a token
        https://docs.aws.amazon.com/redshift/latest/dg/copy-parameters-authorization.html#copy-credentials
    :param verify: Whether to verify SSL certificates for S3 connection.
        By default, SSL certificates are verified.
        You can provide the following values:

        - ``False``: do not validate SSL certificates. SSL will still be used
            (unless use_ssl is False), but SSL certificates will not be verified.
        - ``path/to/cert/bundle.pem``: A filename of the CA cert bundle to use.
            You can specify this argument if you want to use a different
            CA cert bundle than the one used by botocore.
    :param column_list: list of column names to load source data fields into specific target columns
        https://docs.aws.amazon.com/redshift/latest/dg/copy-parameters-column-mapping.html#copy-column-list
    :param copy_options: reference to a list of COPY options
    :param method: Action to be performed on execution. Available methods: ``APPEND``, ``UPSERT`` and ``REPLACE``.
    :param upsert_keys: List of fields to use as key on upsert action
    :param redshift_data_api_kwargs: If using the Redshift Data API instead of the SQL-based connection,
        dict of arguments for the hook's ``execute_query`` method.
        Cannot include any of these kwargs: ``{'sql', 'parameters'}``
    """
    def __init__(
        self,
        *,
        table: str,
        s3_bucket: str,
        s3_key: str,
        schema: str | None = None,
        redshift_conn_id: str = "redshift_default",
        aws_conn_id: str | None = "aws_default",
        verify: bool | str | None = None,
        column_list: list[str] | None = None,
        copy_options: list | None = None,
        autocommit: bool = False,
        method: str = "APPEND",
        upsert_keys: list[str] | None = None,
        redshift_data_api_kwargs: dict | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.schema = schema
        self.table = table
        self.s3_bucket = s3_bucket
        self.s3_key = s3_key
        self.redshift_conn_id = redshift_conn_id
        self.aws_conn_id = aws_conn_id
        self.verify = verify
        self.column_list = column_list
        self.copy_options = copy_options or []
        self.autocommit = autocommit
        self.method = method
        self.upsert_keys = upsert_keys
        self.redshift_data_api_kwargs = redshift_data_api_kwargs or {}

        if self.redshift_data_api_kwargs:
            for arg in ["sql", "parameters"]:
                if arg in self.redshift_data_api_kwargs:
                    raise AirflowException(f"Cannot include param '{arg}' in Redshift Data API kwargs")

    @property
    def use_redshift_data(self) -> bool:
        # Use the Redshift Data API hook when Data API kwargs were provided,
        # otherwise fall back to the SQL-based Redshift connection.
        return bool(self.redshift_data_api_kwargs)
    def _build_copy_query(
        self, copy_destination: str, credentials_block: str, region_info: str, copy_options: str
    ) -> str:
        column_names = "(" + ", ".join(self.column_list) + ")" if self.column_list else ""
        return f"""
                    COPY {copy_destination} {column_names}
                    FROM 's3://{self.s3_bucket}/{self.s3_key}'
                    credentials
                    '{credentials_block}'
                    {region_info}
                    {copy_options};
        """
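    # For illustration only (hypothetical values, not taken from this module): for an operator
    # configured with s3_bucket="my-bucket", s3_key="data/my_table/", no column_list, and with
    # copy_destination="public.my_table",
    # credentials_block="aws_iam_role=arn:aws:iam::123456789012:role/my-role",
    # region_info="region 'eu-west-1'" and copy_options="CSV",
    # the query above renders roughly as:
    #
    #   COPY public.my_table
    #   FROM 's3://my-bucket/data/my_table/'
    #   credentials
    #   'aws_iam_role=arn:aws:iam::123456789012:role/my-role'
    #   region 'eu-west-1'
    #   CSV;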
    def execute(self, context: Context) -> None:
        if self.method not in AVAILABLE_METHODS:
            raise AirflowException(f"Method not found! Available methods: {AVAILABLE_METHODS}")

        if self.use_redshift_data:
            redshift_data_hook = RedshiftDataHook(aws_conn_id=self.redshift_conn_id)
        else:
            redshift_sql_hook = RedshiftSQLHook(redshift_conn_id=self.redshift_conn_id)
        conn = S3Hook.get_connection(conn_id=self.aws_conn_id) if self.aws_conn_id else None
        region_info = ""
        if conn and conn.extra_dejson.get("region", False):
            region_info = f"region '{conn.extra_dejson['region']}'"
        if conn and conn.extra_dejson.get("role_arn", False):
            credentials_block = f"aws_iam_role={conn.extra_dejson['role_arn']}"
        else:
            s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
            credentials = s3_hook.get_credentials()
            credentials_block = build_credentials_block(credentials)

        copy_options = "\n\t\t\t".join(self.copy_options)
        destination = f"{self.schema}.{self.table}" if self.schema else self.table
        copy_destination = f"#{self.table}" if self.method == "UPSERT" else destination

        copy_statement = self._build_copy_query(
            copy_destination, credentials_block, region_info, copy_options
        )

        sql: str | Iterable[str]

        if self.method == "REPLACE":
            sql = ["BEGIN;", f"DELETE FROM {destination};", copy_statement, "COMMIT"]
        elif self.method == "UPSERT":
            if self.use_redshift_data:
                keys = self.upsert_keys or redshift_data_hook.get_table_primary_key(
                    table=self.table, schema=self.schema, **self.redshift_data_api_kwargs
                )
            else:
                keys = self.upsert_keys or redshift_sql_hook.get_table_primary_key(self.table, self.schema)
            if not keys:
                raise AirflowException(
                    f"No primary key on {self.schema}.{self.table}. Please provide keys on 'upsert_keys'"
                )
            where_statement = " AND ".join([f"{self.table}.{k} = {copy_destination}.{k}" for k in keys])

            sql = [
                f"CREATE TABLE {copy_destination} (LIKE {destination} INCLUDING DEFAULTS);",
                copy_statement,
                "BEGIN;",
                f"DELETE FROM {destination} USING {copy_destination} WHERE {where_statement};",
                f"INSERT INTO {destination} SELECT * FROM {copy_destination};",
                "COMMIT",
            ]
        else:
            sql = copy_statement

        self.log.info("Executing COPY command...")
        if self.use_redshift_data:
            redshift_data_hook.execute_query(sql=sql, **self.redshift_data_api_kwargs)
        else:
            redshift_sql_hook.run(sql, autocommit=self.autocommit)
        self.log.info("COPY command complete...")
    def get_openlineage_facets_on_complete(self, task_instance):
        """Implement on_complete as we will query destination table."""
        from airflow.providers.amazon.aws.utils.openlineage import (
            get_facets_from_redshift_table,
        )
        from airflow.providers.common.compat.openlineage.facet import (
            Dataset,
            LifecycleStateChange,
            LifecycleStateChangeDatasetFacet,
        )
        from airflow.providers.openlineage.extractors import OperatorLineage

        if self.use_redshift_data:
            redshift_data_hook = RedshiftDataHook(aws_conn_id=self.redshift_conn_id)
            database = self.redshift_data_api_kwargs.get("database")
            identifier = self.redshift_data_api_kwargs.get(
                "cluster_identifier", self.redshift_data_api_kwargs.get("workgroup_name")
            )
            port = self.redshift_data_api_kwargs.get("port", "5439")
            authority = f"{identifier}.{redshift_data_hook.region_name}:{port}"
            output_dataset_facets = get_facets_from_redshift_table(
                redshift_data_hook, self.table, self.redshift_data_api_kwargs, self.schema
            )
        else:
            redshift_sql_hook = RedshiftSQLHook(redshift_conn_id=self.redshift_conn_id)
            database = redshift_sql_hook.conn.schema
            authority = redshift_sql_hook.get_openlineage_database_info(redshift_sql_hook.conn).authority
            output_dataset_facets = get_facets_from_redshift_table(
                redshift_sql_hook, self.table, {}, self.schema
            )

        if self.method == "REPLACE":
            output_dataset_facets["lifecycleStateChange"] = LifecycleStateChangeDatasetFacet(
                lifecycleStateChange=LifecycleStateChange.OVERWRITE
            )

        output_dataset = Dataset(
            namespace=f"redshift://{authority}",
            name=f"{database}.{self.schema}.{self.table}" if database else f"{self.schema}.{self.table}",
            facets=output_dataset_facets,
        )

        input_dataset = Dataset(
            namespace=f"s3://{self.s3_bucket}",
            name=self.s3_key,
        )

        return OperatorLineage(inputs=[input_dataset], outputs=[output_dataset])
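A minimal usage sketch (illustrative only, not part of the module above), assuming hypothetical connection ids, bucket, key, and table names:

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator

with DAG(dag_id="example_s3_to_redshift", start_date=datetime(2024, 1, 1), schedule=None) as dag:
    # Plain APPEND copy over a SQL-based Redshift connection ("redshift_default").
    load_append = S3ToRedshiftOperator(
        task_id="load_append",
        schema="public",
        table="my_table",
        s3_bucket="my-bucket",
        s3_key="data/my_table/",
        redshift_conn_id="redshift_default",
        aws_conn_id="aws_default",
        copy_options=["CSV", "IGNOREHEADER 1"],
    )

    # UPSERT through the Redshift Data API: passing redshift_data_api_kwargs switches the
    # operator to RedshiftDataHook, and the kwargs are forwarded to its execute_query method.
    load_upsert = S3ToRedshiftOperator(
        task_id="load_upsert",
        schema="public",
        table="my_table",
        s3_bucket="my-bucket",
        s3_key="data/my_table/",
        redshift_conn_id="redshift_data_api_default",
        method="UPSERT",
        upsert_keys=["id"],
        redshift_data_api_kwargs={"database": "dev", "cluster_identifier": "my-cluster"},
    )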