Source code for airflow.providers.amazon.aws.transfers.sql_to_s3
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from collections import namedtuple
from enum import Enum
from tempfile import NamedTemporaryFile
from typing import TYPE_CHECKING, Iterable, Mapping, Optional, Sequence, Union

import numpy as np
import pandas as pd
from typing_extensions import Literal

from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook
from airflow.providers.common.sql.hooks.sql import DbApiHook

if TYPE_CHECKING:
    from airflow.utils.context import Context
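# NOTE: the module also defines a file-format enum, a FileOptions namedtuple, and a
# FILE_OPTIONS_MAP lookup table at this point, which did not survive extraction into this page.
# A minimal reconstruction, consistent with how the operator below uses them (the enum values
# themselves are assumptions; only the member names and the FileOptions fields are implied by
# the code that follows):


class FILE_FORMAT(Enum):
    """Possible file formats."""

    CSV = "csv"
    JSON = "json"
    PARQUET = "parquet"


FileOptions = namedtuple("FileOptions", ["mode", "suffix", "function"])

FILE_OPTIONS_MAP = {
    FILE_FORMAT.CSV: FileOptions(mode="r+", suffix=".csv", function="to_csv"),
    FILE_FORMAT.JSON: FileOptions(mode="r+", suffix=".json", function="to_json"),
    FILE_FORMAT.PARQUET: FileOptions(mode="rb+", suffix=".parquet", function="to_parquet"),
}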
[docs]class SqlToS3Operator(BaseOperator):
    """
    Saves data from a specific SQL query into a file in S3.

    .. seealso::
        For more information on how to use this operator, take a look at the guide:
        :ref:`howto/operator:SqlToS3Operator`

    :param query: the sql query to be executed. If you want to execute a file, place the absolute path of it,
        ending with .sql extension. (templated)
    :param s3_bucket: bucket where the data will be stored. (templated)
    :param s3_key: desired key for the file. It includes the name of the file. (templated)
    :param replace: whether or not to replace the file in S3 if it previously existed
    :param sql_conn_id: reference to a specific database.
    :param parameters: (optional) the parameters to render the SQL query with.
    :param aws_conn_id: reference to a specific S3 connection
    :param verify: Whether or not to verify SSL certificates for S3 connection.
        By default SSL certificates are verified.
        You can provide the following values:

        - ``False``: do not validate SSL certificates. SSL will still be used
          (unless use_ssl is False), but SSL certificates will not be verified.
        - ``path/to/cert/bundle.pem``: A filename of the CA cert bundle to use.
          You can specify this argument if you want to use a different
          CA cert bundle than the one used by botocore.
    :param file_format: the destination file format, only string 'csv', 'json' or 'parquet' is accepted.
    :param pd_kwargs: arguments to include in DataFrame ``.to_parquet()``, ``.to_json()`` or ``.to_csv()``.
    """
    template_fields: Sequence[str] = (
        's3_bucket',
        's3_key',
        'query',
    )
    template_ext: Sequence[str] = ('.sql',)
    template_fields_renderers = {
        "query": "sql",
    }

    def __init__(
        self,
        *,
        query: str,
        s3_bucket: str,
        s3_key: str,
        sql_conn_id: str,
        parameters: Union[None, Mapping, Iterable] = None,
        replace: bool = False,
        aws_conn_id: str = 'aws_default',
        verify: Optional[Union[bool, str]] = None,
        file_format: Literal['csv', 'json', 'parquet'] = 'csv',
        pd_kwargs: Optional[dict] = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.query = query
        self.s3_bucket = s3_bucket
        self.s3_key = s3_key
        self.sql_conn_id = sql_conn_id
        self.aws_conn_id = aws_conn_id
        self.verify = verify
        self.replace = replace
        self.pd_kwargs = pd_kwargs or {}
        self.parameters = parameters

        if "path_or_buf" in self.pd_kwargs:
            raise AirflowException('The argument path_or_buf is not allowed, please remove it')

        self.file_format = getattr(FILE_FORMAT, file_format.upper(), None)
        if self.file_format is None:
            raise AirflowException(f"The argument file_format doesn't support {file_format} value.")

    @staticmethod
    def _fix_int_dtypes(df: pd.DataFrame) -> None:
        """Mutate DataFrame to set dtypes for int columns containing NaN values."""
        for col in df:
            if "float" in df[col].dtype.name and df[col].hasnans:
                # inspect values to determine if dtype of non-null values is int or float
                notna_series = df[col].dropna().values
                if np.equal(notna_series, notna_series.astype(int)).all():
                    # set to dtype that retains integers and supports NaNs
                    df[col] = np.where(df[col].isnull(), None, df[col])
                    df[col] = df[col].astype(pd.Int64Dtype())
                elif np.isclose(notna_series, notna_series.astype(int)).all():
                    # set to float dtype that retains floats and supports NaNs
                    df[col] = np.where(df[col].isnull(), None, df[col])
                    df[col] = df[col].astype(pd.Float64Dtype())
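    # Illustration (not part of the original source): a nullable integer column coming back from
    # SQL is read by pandas as float64 because of the NaNs; _fix_int_dtypes restores a nullable
    # integer dtype so the exported file shows 1 and 2 rather than 1.0 and 2.0:
    #
    #   df = pd.DataFrame({"id": [1.0, 2.0, np.nan]})
    #   SqlToS3Operator._fix_int_dtypes(df)
    #   df["id"]  # dtype Int64, values 1, 2, <NA>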
[docs]    def execute(self, context: 'Context') -> None:
        sql_hook = self._get_hook()
        s3_conn = S3Hook(aws_conn_id=self.aws_conn_id, verify=self.verify)
        data_df = sql_hook.get_pandas_df(sql=self.query, parameters=self.parameters)
        self.log.info("Data from SQL obtained")

        self._fix_int_dtypes(data_df)
        file_options = FILE_OPTIONS_MAP[self.file_format]

        with NamedTemporaryFile(mode=file_options.mode, suffix=file_options.suffix) as tmp_file:
            self.log.info("Writing data to temp file")
            getattr(data_df, file_options.function)(tmp_file.name, **self.pd_kwargs)

            self.log.info("Uploading data to S3")
            s3_conn.load_file(
                filename=tmp_file.name, key=self.s3_key, bucket_name=self.s3_bucket, replace=self.replace
            )

    def _get_hook(self) -> DbApiHook:
        self.log.debug("Get connection for %s", self.sql_conn_id)
        conn = BaseHook.get_connection(self.sql_conn_id)
        hook = conn.get_hook()
        if not callable(getattr(hook, 'get_pandas_df', None)):
            raise AirflowException(
                "This hook is not supported. The hook class must have get_pandas_df method."
            )
        return hook
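# A minimal usage sketch (not part of the original module): how this operator might be wired
# into a DAG. The DAG id, connection IDs, bucket, key, and query below are hypothetical
# placeholders.
#
# from datetime import datetime
#
# from airflow import DAG
# from airflow.providers.amazon.aws.transfers.sql_to_s3 import SqlToS3Operator
#
# with DAG(
#     dag_id="example_sql_to_s3",
#     start_date=datetime(2022, 1, 1),
#     schedule_interval=None,
#     catchup=False,
# ) as dag:
#     sql_to_s3_task = SqlToS3Operator(
#         task_id="sql_to_s3_task",
#         sql_conn_id="my_postgres_conn",          # hypothetical database connection
#         aws_conn_id="aws_default",
#         query="SELECT * FROM my_table",          # hypothetical query
#         s3_bucket="my-bucket",                   # hypothetical bucket
#         s3_key="exports/my_table.parquet",
#         file_format="parquet",
#         pd_kwargs={"index": False},              # forwarded to DataFrame.to_parquet()
#         replace=True,
#     )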