Source code for airflow.providers.amazon.aws.fs.s3
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportasyncioimportloggingfromfunctoolsimportpartialfromtypingimportTYPE_CHECKING,Any,Callable,DictimportrequestsfrombotocoreimportUNSIGNEDfromrequestsimportHTTPErrorfromairflow.providers.amazon.aws.hooks.s3importS3HookifTYPE_CHECKING:frombotocore.awsrequestimportAWSRequestfromfsspecimportAbstractFileSystem
class SignError(Exception):
    """Raised when a request to S3 cannot be signed."""
def get_fs(conn_id: str | None, storage_options: dict[str, str] | None = None) -> AbstractFileSystem:
    """
    Build an ``s3fs`` filesystem configured from an Airflow AWS connection.

    :param conn_id: Airflow connection id handed to ``S3Hook``.
    :param storage_options: extra entries merged on top of the connection's
        ``config_kwargs`` before they are passed to ``S3FileSystem``.
    :return: an ``S3FileSystem`` bound to the hook's session; anonymous access
        is used when the session yields no credentials.
    :raises ImportError: if ``s3fs`` cannot be imported.
    :raises ValueError: if the configured signer is unknown, or if it is known
        but ``signer_uri`` / ``signer_token`` is missing from the service config.
    """
    try:
        from s3fs import S3FileSystem
    except ImportError as exc:
        raise ImportError(
            "Airflow FS S3 protocol requires the s3fs library, but it is not installed as it requires "
            "aiobotocore. Please install the s3 protocol support library by running: "
            "pip install apache-airflow-providers-amazon[s3fs]"
        ) from exc

    s3_hook = S3Hook(aws_conn_id=conn_id)
    session = s3_hook.get_session(deferrable=True)
    endpoint_url = s3_hook.conn_config.get_service_endpoint_url(service_name="s3")

    # Work on a copy: the dict returned by ``extra_config.get`` is the object
    # stored in the connection extras, and it is mutated below (storage
    # options, signature version, proxies). Mutating it in place would leak
    # those settings into whatever else reads the same extras.
    config_kwargs: dict[str, Any] = dict(s3_hook.conn_config.extra_config.get("config_kwargs") or {})
    config_kwargs.update(storage_options or {})

    # botocore event name -> handler to attach once the filesystem exists.
    register_events: dict[str, Callable[[Properties], None]] = {}

    s3_service_config = s3_hook.service_config
    if signer := s3_service_config.get("signer", None):
        log.info("Loading signer %s", signer)
        if signer_func := SIGNERS.get(signer):
            uri = s3_service_config.get("signer_uri", None)
            token = s3_service_config.get("signer_token", None)
            if not uri or not token:
                raise ValueError(f"Signer {signer} requires uri and token")

            # The token entry must carry the signer token (it is sent as the
            # bearer credential by the signer), not a second copy of the URI.
            properties: Properties = {
                "uri": uri,
                "token": token,
            }
            signer_func_with_properties = partial(signer_func, properties)
            register_events["before-sign.s3"] = signer_func_with_properties

            # Disable the AWS Signer
            config_kwargs["signature_version"] = UNSIGNED
        else:
            raise ValueError(f"Signer not available: {signer}")

    if proxy_uri := s3_service_config.get(S3_PROXY_URI, None):
        config_kwargs["proxies"] = {"http": proxy_uri, "https": proxy_uri}

    anon = False
    if asyncio.run(session.get_credentials()) is None:
        log.info("No credentials found, using anonymous access")
        anon = True

    fs = S3FileSystem(session=session, config_kwargs=config_kwargs, endpoint_url=endpoint_url, anon=anon)

    for event_name, event_function in register_events.items():
        fs.s3.meta.events.register_last(event_name, event_function, unique_id=1925)

    return fs
def s3v4_rest_signer(properties: Properties, request: AWSRequest, **_: Any) -> AWSRequest:
    """
    Sign an S3 request by delegating to a remote REST signing endpoint.

    The request's method, region, URL and headers are POSTed to
    ``<uri>/v1/aws/s3/sign`` with the token as a bearer credential; the
    headers and URL from the response are then applied to ``request``.

    :param properties: mapping providing the signer ``uri`` and ``token``.
    :param request: the request to sign; it is modified in place.
    :return: the same ``request`` object, updated with the signer's response.
    :raises SignError: if ``token`` is absent from ``properties`` or the
        signer responds with an HTTP error status.
    """
    if "token" not in properties:
        raise SignError("Signer set, but token is not available")

    base_url = properties["uri"].rstrip("/")
    auth_headers = {"Authorization": f"Bearer {properties['token']}"}

    signer_body = {
        "method": request.method,
        "region": request.context["client_region"],
        "uri": request.url,
    }
    # The signer payload carries every header value wrapped in a one-item list.
    wrapped_headers = {}
    for header_name, header_value in request.headers.items():
        wrapped_headers[header_name] = [header_value]
    signer_body["headers"] = wrapped_headers

    endpoint = f"{base_url}/v1/aws/s3/sign"
    response = requests.post(endpoint, headers=auth_headers, json=signer_body)
    try:
        response.raise_for_status()
        signed = response.json()
    except HTTPError as e:
        raise SignError(f"Failed to sign request {response.status_code}: {signer_body}") from e

    # Response header values arrive as lists; collapse each into one
    # comma-separated header on the outgoing request.
    signed_headers = signed["headers"]
    for header_name, values in signed_headers.items():
        joined = ", ".join(values)
        request.headers.add_header(header_name, joined)

    request.url = signed["uri"]
    return request