Source code for airflow.providers.amazon.aws.transfers.redshift_to_s3
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Transfers data from AWS Redshift into an S3 Bucket."""

from __future__ import annotations

import re
from collections.abc import Iterable, Mapping, Sequence
from typing import TYPE_CHECKING

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.redshift_data import RedshiftDataHook
from airflow.providers.amazon.aws.hooks.redshift_sql import RedshiftSQLHook
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.amazon.aws.utils.redshift import build_credentials_block

if TYPE_CHECKING:
    from airflow.utils.context import Context
class RedshiftToS3Operator(BaseOperator):
    """
    Execute an UNLOAD command to s3 as a CSV with headers.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:RedshiftToS3Operator`

    :param s3_bucket: reference to a specific S3 bucket
    :param s3_key: reference to a specific S3 key. If ``table_as_file_name`` is set
        to False, this param must include the desired file name
    :param schema: reference to a specific schema in redshift database,
        used when ``table`` param provided and ``select_query`` param not provided.
        Do not provide when unloading a temporary table
    :param table: reference to a specific table in redshift database,
        used when ``schema`` param provided and ``select_query`` param not provided
    :param select_query: custom select query to fetch data from redshift database,
        has precedence over default query `SELECT * FROM ``schema``.``table``
    :param redshift_conn_id: reference to a specific redshift database
    :param aws_conn_id: reference to a specific S3 connection
        If the AWS connection contains 'aws_iam_role' in ``extras``
        the operator will use AWS STS credentials with a token
        https://docs.aws.amazon.com/redshift/latest/dg/copy-parameters-authorization.html#copy-credentials
    :param verify: Whether to verify SSL certificates for S3 connection.
        By default, SSL certificates are verified.
        You can provide the following values:

        - ``False``: do not validate SSL certificates. SSL will still be used
            (unless use_ssl is False), but SSL certificates will not be verified.
        - ``path/to/cert/bundle.pem``: A filename of the CA cert bundle to use.
            You can specify this argument if you want to use a different
            CA cert bundle than the one used by botocore.
    :param unload_options: reference to a list of UNLOAD options
    :param autocommit: If set to True it will automatically commit the UNLOAD statement.
        Otherwise, it will be committed right before the redshift connection gets closed.
    :param include_header: If set to True the s3 file contains the header columns.
    :param parameters: (optional) the parameters to render the SQL query with.
    :param table_as_file_name: If set to True, the s3 file will be named as the table.
        Applicable when ``table`` param provided.
    :param redshift_data_api_kwargs: If using the Redshift Data API instead of the SQL-based connection,
        dict of arguments for the hook's ``execute_query`` method.
        Cannot include any of these kwargs: ``{'sql', 'parameters'}``
    """
    def __init__(
        self,
        *,
        s3_bucket: str,
        s3_key: str,
        schema: str | None = None,
        table: str | None = None,
        select_query: str | None = None,
        redshift_conn_id: str = "redshift_default",
        aws_conn_id: str | None = "aws_default",
        verify: bool | str | None = None,
        unload_options: list | None = None,
        autocommit: bool = False,
        include_header: bool = False,
        parameters: Iterable | Mapping | None = None,
        table_as_file_name: bool = True,  # Set to True by default for not breaking current workflows
        redshift_data_api_kwargs: dict | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        # Store the arguments on the instance; execute() and the lineage method rely on them.
        self.s3_bucket = s3_bucket
        self.s3_key = s3_key
        self.schema = schema
        self.table = table
        self.select_query = select_query
        self.redshift_conn_id = redshift_conn_id
        self.aws_conn_id = aws_conn_id
        self.verify = verify
        self.unload_options = unload_options or []
        self.autocommit = autocommit
        self.include_header = include_header
        self.parameters = parameters
        self.table_as_file_name = table_as_file_name
        self.redshift_data_api_kwargs = redshift_data_api_kwargs or {}
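Both execute and get_openlineage_facets_on_complete below branch on self.use_redshift_data, which does not appear in this extract. A minimal sketch of such a property, assuming the flag simply reflects whether any Redshift Data API kwargs were supplied:

    @property
    def use_redshift_data(self) -> bool:
        # Route execution through the Redshift Data API only when Data API kwargs
        # were supplied; otherwise the SQL-based RedshiftSQLHook connection is used.
        return bool(self.redshift_data_api_kwargs)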
    @property
    def default_select_query(self) -> str | None:
        if not self.table:
            return None

        if self.schema:
            table = f"{self.schema}.{self.table}"
        else:
            # Relevant when unloading a temporary table
            table = self.table
        return f"SELECT * FROM {table}"
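execute below also calls a private helper, _build_unload_query, that is not shown in this extract. A sketch of how such a helper can assemble the UNLOAD statement from the pieces computed in execute; the module-level import re suggests the query is first un-escaped with a regular expression, and the exact quoting in the shipped provider may differ:

    def _build_unload_query(
        self, credentials_block: str, select_query: str, s3_key: str, unload_options: str
    ) -> str:
        # Un-escape already escaped single quotes so the query can be embedded safely.
        select_query = re.sub(r"''(.+?)''", r"'\1'", select_query)
        return f"""
                    UNLOAD ($${select_query}$$)
                    TO 's3://{self.s3_bucket}/{s3_key}'
                    credentials
                    '{credentials_block}'
                    {unload_options};
        """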
    def execute(self, context: Context) -> None:
        if self.table and self.table_as_file_name:
            self.s3_key = f"{self.s3_key}/{self.table}_"

        self.select_query = self.select_query or self.default_select_query

        if self.select_query is None:
            raise ValueError("Please specify either a table or `select_query` to fetch the data.")

        if self.include_header and "HEADER" not in [uo.upper().strip() for uo in self.unload_options]:
            self.unload_options = [*self.unload_options, "HEADER"]

        if self.use_redshift_data:
            redshift_data_hook = RedshiftDataHook(aws_conn_id=self.redshift_conn_id)
            for arg in ["sql", "parameters"]:
                if arg in self.redshift_data_api_kwargs:
                    raise AirflowException(f"Cannot include param '{arg}' in Redshift Data API kwargs")
        else:
            redshift_sql_hook = RedshiftSQLHook(redshift_conn_id=self.redshift_conn_id)

        conn = S3Hook.get_connection(conn_id=self.aws_conn_id) if self.aws_conn_id else None
        if conn and conn.extra_dejson.get("role_arn", False):
            credentials_block = f"aws_iam_role={conn.extra_dejson['role_arn']}"
        else:
            s3_hook = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
            credentials = s3_hook.get_credentials()
            credentials_block = build_credentials_block(credentials)

        unload_options = "\n\t\t\t".join(self.unload_options)

        unload_query = self._build_unload_query(
            credentials_block, self.select_query, self.s3_key, unload_options
        )

        self.log.info("Executing UNLOAD command...")
        if self.use_redshift_data:
            redshift_data_hook.execute_query(
                sql=unload_query, parameters=self.parameters, **self.redshift_data_api_kwargs
            )
        else:
            redshift_sql_hook.run(unload_query, self.autocommit, parameters=self.parameters)
        self.log.info("UNLOAD command complete...")
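For illustration, the Data API branch of execute is driven entirely by redshift_data_api_kwargs: "sql" and "parameters" are rejected, everything else is forwarded to RedshiftDataHook.execute_query, and the lineage method below additionally reads "database", "cluster_identifier"/"workgroup_name" and "port". A caller-side sketch, not part of this class; the identifiers are placeholders:

unload_via_data_api = RedshiftToS3Operator(
    task_id="unload_orders_data_api",
    s3_bucket="example-export-bucket",   # placeholder bucket name
    s3_key="exports",
    schema="public",
    table="orders",
    redshift_conn_id="aws_default",      # Data API authentication goes through an AWS connection
    redshift_data_api_kwargs={
        "database": "dev",
        "workgroup_name": "example-workgroup",  # or "cluster_identifier" for a provisioned cluster
    },
)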
    def get_openlineage_facets_on_complete(self, task_instance):
        """Implement on_complete as we may query for table details."""
        from airflow.providers.amazon.aws.utils.openlineage import (
            get_facets_from_redshift_table,
            get_identity_column_lineage_facet,
        )
        from airflow.providers.common.compat.openlineage.facet import (
            Dataset,
            Error,
            ExtractionErrorRunFacet,
        )
        from airflow.providers.openlineage.extractors import OperatorLineage

        output_dataset = Dataset(
            namespace=f"s3://{self.s3_bucket}",
            name=self.s3_key,
        )

        if self.use_redshift_data:
            redshift_data_hook = RedshiftDataHook(aws_conn_id=self.redshift_conn_id)
            database = self.redshift_data_api_kwargs.get("database")
            identifier = self.redshift_data_api_kwargs.get(
                "cluster_identifier", self.redshift_data_api_kwargs.get("workgroup_name")
            )
            port = self.redshift_data_api_kwargs.get("port", "5439")
            authority = f"{identifier}.{redshift_data_hook.region_name}:{port}"
        else:
            redshift_sql_hook = RedshiftSQLHook(redshift_conn_id=self.redshift_conn_id)
            database = redshift_sql_hook.conn.schema
            authority = redshift_sql_hook.get_openlineage_database_info(redshift_sql_hook.conn).authority

        if self.select_query == self.default_select_query:
            if self.use_redshift_data:
                input_dataset_facets = get_facets_from_redshift_table(
                    redshift_data_hook, self.table, self.redshift_data_api_kwargs, self.schema
                )
            else:
                input_dataset_facets = get_facets_from_redshift_table(
                    redshift_sql_hook, self.table, {}, self.schema
                )

            input_dataset = Dataset(
                namespace=f"redshift://{authority}",
                name=f"{database}.{self.schema}.{self.table}" if database else f"{self.schema}.{self.table}",
                facets=input_dataset_facets,
            )

            # If default select query is used (SELECT *) output file matches the input table.
            output_dataset.facets = {
                "schema": input_dataset_facets["schema"],
                "columnLineage": get_identity_column_lineage_facet(
                    field_names=[field.name for field in input_dataset_facets["schema"].fields],
                    input_datasets=[input_dataset],
                ),
            }

            return OperatorLineage(inputs=[input_dataset], outputs=[output_dataset])

        try:
            from airflow.providers.openlineage.sqlparser import SQLParser, from_table_meta
        except ImportError:
            return OperatorLineage(outputs=[output_dataset])

        run_facets = {}
        parse_result = SQLParser(dialect="redshift", default_schema=self.schema).parse(self.select_query)
        if parse_result.errors:
            run_facets["extractionError"] = ExtractionErrorRunFacet(
                totalTasks=1,
                failedTasks=1,
                errors=[
                    Error(
                        errorMessage=error.message,
                        stackTrace=None,
                        task=error.origin_statement,
                        taskNumber=error.index,
                    )
                    for error in parse_result.errors
                ],
            )

        input_datasets = []
        for in_tb in parse_result.in_tables:
            ds = from_table_meta(in_tb, database, f"redshift://{authority}", False)
            schema, table = ds.name.split(".")[-2:]
            if self.use_redshift_data:
                input_dataset_facets = get_facets_from_redshift_table(
                    redshift_data_hook, table, self.redshift_data_api_kwargs, schema
                )
            else:
                input_dataset_facets = get_facets_from_redshift_table(redshift_sql_hook, table, {}, schema)

            ds.facets = input_dataset_facets
            input_datasets.append(ds)

        return OperatorLineage(inputs=input_datasets, outputs=[output_dataset], run_facets=run_facets)
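Finally, a minimal usage sketch of the default, SQL-connection path described in the class docstring; the DAG id, connection IDs, bucket and table names are placeholders:

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.transfers.redshift_to_s3 import RedshiftToS3Operator

with DAG(
    dag_id="example_redshift_to_s3",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
):
    unload_orders = RedshiftToS3Operator(
        task_id="unload_orders",
        s3_bucket="example-export-bucket",
        s3_key="exports",                  # with table_as_file_name=True the operator appends "/orders_"
        schema="public",
        table="orders",
        redshift_conn_id="redshift_default",
        aws_conn_id="aws_default",
        unload_options=["CSV", "PARALLEL OFF"],  # HEADER is added automatically when include_header=True
        include_header=True,
    )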