Source code for airflow.providers.google.cloud.hooks.gcs
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module contains a Google Cloud Storage hook."""
from __future__ import annotations

import functools
import gzip as gz
import os
import shutil
import time
from contextlib import contextmanager
from datetime import datetime
from functools import partial
from io import BytesIO
from os import path
from tempfile import NamedTemporaryFile
from typing import IO, Callable, Generator, Sequence, TypeVar, cast, overload
from urllib.parse import urlsplit

from google.api_core.exceptions import NotFound

# not sure why but mypy complains on missing `storage` but it is clearly there and is importable
from google.cloud import storage  # type: ignore[attr-defined]
from google.cloud.exceptions import GoogleCloudError

from airflow.exceptions import AirflowException
from airflow.providers.google.cloud.utils.helpers import normalize_directory_path
from airflow.providers.google.common.consts import CLIENT_INFO
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
from airflow.utils import timezone
from airflow.version import version

RT = TypeVar("RT")
T = TypeVar("T", bound=Callable)

# GCSHook.list shadows the built-in `list`, so keep an alias for the type hints below.
List = list

# Default request timeout (in seconds) used by the download/upload methods below.
DEFAULT_TIMEOUT = 60
def _fallback_object_url_to_object_name_and_bucket_name(
    object_url_keyword_arg_name="object_url",
    bucket_name_keyword_arg_name="bucket_name",
    object_name_keyword_arg_name="object_name",
) -> Callable[[T], T]:
    """
    Decorator factory that converts an object URL parameter to object name and bucket name parameters.

    :param object_url_keyword_arg_name: Name of the object URL parameter
    :param bucket_name_keyword_arg_name: Name of the bucket name parameter
    :param object_name_keyword_arg_name: Name of the object name parameter
    :return: Decorator
    """

    def _wrapper(func: T):
        @functools.wraps(func)
        def _inner_wrapper(self: GCSHook, *args, **kwargs) -> RT:
            if args:
                raise AirflowException(
                    "You must use keyword arguments in this method rather than positional"
                )

            object_url = kwargs.get(object_url_keyword_arg_name)
            bucket_name = kwargs.get(bucket_name_keyword_arg_name)
            object_name = kwargs.get(object_name_keyword_arg_name)

            if object_url and bucket_name and object_name:
                raise AirflowException(
                    "Mutually exclusive parameters: `object_url` was passed together with "
                    "`bucket_name` and `object_name`. "
                    "Please provide either `object_url`, or `bucket_name` and `object_name`."
                )
            if object_url:
                bucket_name, object_name = _parse_gcs_url(object_url)
                kwargs[bucket_name_keyword_arg_name] = bucket_name
                kwargs[object_name_keyword_arg_name] = object_name
                del kwargs[object_url_keyword_arg_name]

            if not object_name and not bucket_name:
                raise TypeError(
                    f"{func.__name__}() missing 2 required positional arguments: "
                    f"'{bucket_name_keyword_arg_name}' and '{object_name_keyword_arg_name}' "
                    f"or {object_url_keyword_arg_name}"
                )
            if not object_name:
                raise TypeError(
                    f"{func.__name__}() missing 1 required positional argument: "
                    f"'{object_name_keyword_arg_name}'"
                )
            if not bucket_name:
                raise TypeError(
                    f"{func.__name__}() missing 1 required positional argument: "
                    f"'{bucket_name_keyword_arg_name}'"
                )

            return func(self, *args, **kwargs)

        return cast(T, _inner_wrapper)

    return _wrapper


# A fake bucket name to use in functions decorated by _fallback_object_url_to_object_name_and_bucket_name.
# This allows the 'bucket' argument to be of type str instead of str | None,
# making it easier to type hint the function body without dealing with the None
# case that can never happen at runtime.
PROVIDE_BUCKET: str = cast(str, None)
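# Illustrative sketch (not part of this module): methods decorated with the fallback
# above accept either an explicit bucket/object pair or a single `object_url`.
# The bucket and object names below are hypothetical.
#
#   hook = GCSHook()
#   with hook.provide_file(object_url="gs://my-bucket/data/report.csv") as handle:
#       ...  # equivalent to provide_file(bucket_name="my-bucket", object_name="data/report.csv")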
[docs]
class GCSHook(GoogleBaseHook):
    """
    Interact with Google Cloud Storage. This hook uses the Google Cloud connection.
    """

    _conn: storage.Client | None = None

    def __init__(
        self,
        gcp_conn_id: str = "google_cloud_default",
        delegate_to: str | None = None,
        impersonation_chain: str | Sequence[str] | None = None,
    ) -> None:
        super().__init__(
            gcp_conn_id=gcp_conn_id,
            delegate_to=delegate_to,
            impersonation_chain=impersonation_chain,
        )
[docs]
    def get_conn(self) -> storage.Client:
        """Returns a Google Cloud Storage service object."""
        if not self._conn:
            self._conn = storage.Client(
                credentials=self.get_credentials(), client_info=CLIENT_INFO, project=self.project_id
            )

        return self._conn
[docs]
    def copy(
        self,
        source_bucket: str,
        source_object: str,
        destination_bucket: str | None = None,
        destination_object: str | None = None,
    ) -> None:
        """
        Copies an object from a bucket to another, with renaming if requested.

        destination_bucket or destination_object can be omitted, in which case the
        source bucket/object is used, but not both.

        :param source_bucket: The bucket of the object to copy from.
        :param source_object: The object to copy.
        :param destination_bucket: The destination bucket to copy the object to.
            Can be omitted; then the same bucket is used.
        :param destination_object: The (renamed) path of the object if given.
            Can be omitted; then the same name is used.
        """
        destination_bucket = destination_bucket or source_bucket
        destination_object = destination_object or source_object

        if source_bucket == destination_bucket and source_object == destination_object:
            raise ValueError(
                f"Either source/destination bucket or source/destination object must be different, "
                f"not both the same: bucket={source_bucket}, object={source_object}"
            )
        if not source_bucket or not source_object:
            raise ValueError("source_bucket and source_object cannot be empty.")

        client = self.get_conn()
        source_bucket = client.bucket(source_bucket)
        source_object = source_bucket.blob(source_object)  # type: ignore[attr-defined]
        destination_bucket = client.bucket(destination_bucket)
        destination_object = source_bucket.copy_blob(  # type: ignore[attr-defined]
            blob=source_object, destination_bucket=destination_bucket, new_name=destination_object
        )

        self.log.info(
            "Object %s in bucket %s copied to object %s in bucket %s",
            source_object.name,  # type: ignore[attr-defined]
            source_bucket.name,  # type: ignore[attr-defined]
            destination_object.name,  # type: ignore[union-attr]
            destination_bucket.name,  # type: ignore[union-attr]
        )
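    # Illustrative sketch (not part of the hook): copying an object within the same
    # bucket under a new name. Bucket and object names are hypothetical.
    #
    #   hook = GCSHook()
    #   hook.copy(
    #       source_bucket="my-bucket",
    #       source_object="raw/data.csv",
    #       destination_object="archive/data.csv",
    #   )
    #   # For objects over 5 TB, or across locations/storage classes, use rewrite() instead.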
[docs]
    def rewrite(
        self,
        source_bucket: str,
        source_object: str,
        destination_bucket: str,
        destination_object: str | None = None,
    ) -> None:
        """
        Has the same functionality as copy, except that it works on files over 5 TB,
        as well as when copying between locations and/or storage classes.

        destination_object can be omitted, in which case source_object is used.

        :param source_bucket: The bucket of the object to copy from.
        :param source_object: The object to copy.
        :param destination_bucket: The destination bucket to copy the object to.
        :param destination_object: The (renamed) path of the object if given.
            Can be omitted; then the same name is used.
        """
        destination_object = destination_object or source_object
        if source_bucket == destination_bucket and source_object == destination_object:
            raise ValueError(
                f"Either source/destination bucket or source/destination object must be different, "
                f"not both the same: bucket={source_bucket}, object={source_object}"
            )
        if not source_bucket or not source_object:
            raise ValueError("source_bucket and source_object cannot be empty.")

        client = self.get_conn()
        source_bucket = client.bucket(source_bucket)
        source_object = source_bucket.blob(blob_name=source_object)  # type: ignore[attr-defined]
        destination_bucket = client.bucket(destination_bucket)

        token, bytes_rewritten, total_bytes = destination_bucket.blob(  # type: ignore[attr-defined]
            blob_name=destination_object
        ).rewrite(source=source_object)

        self.log.info("Total Bytes: %s | Bytes Written: %s", total_bytes, bytes_rewritten)

        while token is not None:
            token, bytes_rewritten, total_bytes = destination_bucket.blob(  # type: ignore[attr-defined]
                blob_name=destination_object
            ).rewrite(source=source_object, token=token)

            self.log.info("Total Bytes: %s | Bytes Written: %s", total_bytes, bytes_rewritten)

        self.log.info(
            "Object %s in bucket %s rewritten to object %s in bucket %s",
            source_object.name,  # type: ignore[attr-defined]
            source_bucket.name,  # type: ignore[attr-defined]
            destination_object,
            destination_bucket.name,  # type: ignore[attr-defined]
        )
    @overload
    def download(
        self,
        bucket_name: str,
        object_name: str,
        filename: None = None,
        chunk_size: int | None = None,
        timeout: int | None = DEFAULT_TIMEOUT,
        num_max_attempts: int | None = 1,
    ) -> bytes:
        ...

    @overload
    def download(
        self,
        bucket_name: str,
        object_name: str,
        filename: str,
        chunk_size: int | None = None,
        timeout: int | None = DEFAULT_TIMEOUT,
        num_max_attempts: int | None = 1,
    ) -> str:
        ...

    def download(
        self,
        bucket_name: str,
        object_name: str,
        filename: str | None = None,
        chunk_size: int | None = None,
        timeout: int | None = DEFAULT_TIMEOUT,
        num_max_attempts: int | None = 1,
    ) -> str | bytes:
        """
        Downloads a file from Google Cloud Storage.

        When no filename is supplied, the operator loads the file into memory and returns its
        content. When a filename is supplied, it writes the file to the specified location and
        returns the location. For file sizes that exceed the available memory it is recommended
        to write to a file.

        :param bucket_name: The bucket to fetch from.
        :param object_name: The object to fetch.
        :param filename: If set, a local file path where the file should be written to.
        :param chunk_size: Blob chunk size.
        :param timeout: Request timeout in seconds.
        :param num_max_attempts: Number of attempts to download the file.
        """
        # TODO: future improvement check file size before downloading,
        #  to check for local space availability

        num_file_attempts = 0

        while True:
            try:
                num_file_attempts += 1
                client = self.get_conn()
                bucket = client.bucket(bucket_name)
                blob = bucket.blob(blob_name=object_name, chunk_size=chunk_size)

                if filename:
                    blob.download_to_filename(filename, timeout=timeout)
                    self.log.info("File downloaded to %s", filename)
                    return filename
                else:
                    return blob.download_as_bytes()

            except GoogleCloudError:
                if num_file_attempts == num_max_attempts:
                    self.log.error(
                        "Download attempt of object: %s from %s has failed. Attempt: %s, max %s.",
                        object_name,
                        bucket_name,
                        num_file_attempts,
                        num_max_attempts,
                    )
                    raise

                # Wait with exponential backoff scheme before retrying.
                timeout_seconds = 1.0 * 2 ** (num_file_attempts - 1)
                time.sleep(timeout_seconds)
                continue
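    # Illustrative sketch (not part of the hook): fetching an object either into memory
    # or to a local path. Names below are hypothetical.
    #
    #   hook = GCSHook()
    #   content: bytes = hook.download(bucket_name="my-bucket", object_name="data/report.csv")
    #   local_path: str = hook.download(
    #       bucket_name="my-bucket",
    #       object_name="data/report.csv",
    #       filename="/tmp/report.csv",
    #       num_max_attempts=3,  # retries with exponential backoff on GoogleCloudError
    #   )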
[docs]
    def download_as_byte_array(
        self,
        bucket_name: str,
        object_name: str,
        chunk_size: int | None = None,
        timeout: int | None = DEFAULT_TIMEOUT,
        num_max_attempts: int | None = 1,
    ) -> bytes:
        """
        Downloads a file from Google Cloud Storage and returns its content as bytes.

        For file sizes that exceed the available memory, use ``download`` with a
        filename to write to a file instead.

        :param bucket_name: The bucket to fetch from.
        :param object_name: The object to fetch.
        :param chunk_size: Blob chunk size.
        :param timeout: Request timeout in seconds.
        :param num_max_attempts: Number of attempts to download the file.
        """
        # We do not pass a filename, so we will never receive a string as response.
        return self.download(
            bucket_name=bucket_name,
            object_name=object_name,
            chunk_size=chunk_size,
            timeout=timeout,
            num_max_attempts=num_max_attempts,
        )
    @_fallback_object_url_to_object_name_and_bucket_name()
    @contextmanager
[docs]
    def provide_file(
        self,
        bucket_name: str = PROVIDE_BUCKET,
        object_name: str | None = None,
        object_url: str | None = None,
        dir: str | None = None,
    ) -> Generator[IO[bytes], None, None]:
        """
        Downloads the file to a temporary directory and returns a file handle.

        You can use this method by passing the bucket_name and object_name parameters
        or just the object_url parameter.

        :param bucket_name: The bucket to fetch from.
        :param object_name: The object to fetch.
        :param object_url: File reference URL. Must start with "gs://"
        :param dir: The tmp sub directory to download the file to. (passed to NamedTemporaryFile)
        :return: File handler
        """
        if object_name is None:
            raise ValueError("Object name can not be empty")
        _, _, file_name = object_name.rpartition("/")
        with NamedTemporaryFile(suffix=file_name, dir=dir) as tmp_file:
            self.download(bucket_name=bucket_name, object_name=object_name, filename=tmp_file.name)
            tmp_file.flush()
            yield tmp_file
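    # Illustrative sketch (not part of the hook): the context manager above can be driven
    # either by bucket/object names or by a single URL. Names are hypothetical.
    #
    #   hook = GCSHook()
    #   with hook.provide_file(object_url="gs://my-bucket/data/report.csv") as handle:
    #       process(handle.name)  # `handle` is a NamedTemporaryFile holding the download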
    @_fallback_object_url_to_object_name_and_bucket_name()
    @contextmanager
[docs]
    def provide_file_and_upload(
        self,
        bucket_name: str = PROVIDE_BUCKET,
        object_name: str | None = None,
        object_url: str | None = None,
    ) -> Generator[IO[bytes], None, None]:
        """
        Creates a temporary file, returns a file handle and uploads the file's content on close.

        You can use this method by passing the bucket_name and object_name parameters
        or just the object_url parameter.

        :param bucket_name: The bucket to upload to.
        :param object_name: The object name to set when uploading the file.
        :param object_url: File reference URL. Must start with "gs://"
        :return: File handler
        """
        if object_name is None:
            raise ValueError("Object name can not be empty")

        _, _, file_name = object_name.rpartition("/")
        with NamedTemporaryFile(suffix=file_name) as tmp_file:
            yield tmp_file
            tmp_file.flush()
            self.upload(bucket_name=bucket_name, object_name=object_name, filename=tmp_file.name)
[docs]
    def upload(
        self,
        bucket_name: str,
        object_name: str,
        filename: str | None = None,
        data: str | bytes | None = None,
        mime_type: str | None = None,
        gzip: bool = False,
        encoding: str = "utf-8",
        chunk_size: int | None = None,
        timeout: int | None = DEFAULT_TIMEOUT,
        num_max_attempts: int = 1,
        metadata: dict | None = None,
    ) -> None:
        """
        Uploads a local file, or file data as a string or bytes, to Google Cloud Storage.

        :param bucket_name: The bucket to upload to.
        :param object_name: The object name to set when uploading the file.
        :param filename: The local file path to the file to be uploaded.
        :param data: The file's data as a string or bytes to be uploaded.
        :param mime_type: The file's mime type set when uploading the file.
        :param gzip: Option to compress local file or file data for upload
        :param encoding: bytes encoding for file data if provided as a string
        :param chunk_size: Blob chunk size.
        :param timeout: Request timeout in seconds.
        :param num_max_attempts: Number of attempts to try to upload the file.
        :param metadata: The metadata to be uploaded with the file.
        """

        def _call_with_retry(f: Callable[[], None]) -> None:
            """Helper function to upload a file or a string with a retry mechanism and exponential back-off.

            :param f: Callable that should be retried.
            """
            num_file_attempts = 0

            while num_file_attempts < num_max_attempts:
                try:
                    num_file_attempts += 1
                    f()
                    return  # success; stop retrying

                except GoogleCloudError as e:
                    if num_file_attempts == num_max_attempts:
                        self.log.error(
                            "Upload attempt of object: %s from %s has failed. Attempt: %s, max %s.",
                            object_name,
                            bucket_name,
                            num_file_attempts,
                            num_max_attempts,
                        )
                        raise e

                    # Wait with exponential backoff scheme before retrying.
                    timeout_seconds = 1.0 * 2 ** (num_file_attempts - 1)
                    time.sleep(timeout_seconds)
                    continue

        client = self.get_conn()
        bucket = client.bucket(bucket_name)
        blob = bucket.blob(blob_name=object_name, chunk_size=chunk_size)

        if metadata:
            blob.metadata = metadata

        if filename and data:
            raise ValueError(
                "'filename' and 'data' parameter provided. Please "
                "specify a single parameter, either 'filename' for "
                "local file uploads or 'data' for file content uploads."
            )
        elif filename:
            if not mime_type:
                mime_type = "application/octet-stream"
            if gzip:
                filename_gz = filename + ".gz"

                with open(filename, "rb") as f_in:
                    with gz.open(filename_gz, "wb") as f_out:
                        shutil.copyfileobj(f_in, f_out)
                        filename = filename_gz

            _call_with_retry(
                partial(blob.upload_from_filename, filename=filename, content_type=mime_type, timeout=timeout)
            )

            if gzip:
                os.remove(filename)
            self.log.info("File %s uploaded to %s in %s bucket", filename, object_name, bucket_name)
        elif data:
            if not mime_type:
                mime_type = "text/plain"
            if gzip:
                if isinstance(data, str):
                    data = bytes(data, encoding)
                out = BytesIO()
                with gz.GzipFile(fileobj=out, mode="w") as f:
                    f.write(data)
                data = out.getvalue()

            _call_with_retry(partial(blob.upload_from_string, data, content_type=mime_type, timeout=timeout))

            self.log.info("Data stream uploaded to %s in %s bucket", object_name, bucket_name)
        else:
            raise ValueError("'filename' and 'data' parameter missing. One is required to upload to gcs.")
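    # Illustrative sketch (not part of the hook): uploading either a local file or an
    # in-memory string. Names are hypothetical.
    #
    #   hook = GCSHook()
    #   hook.upload(bucket_name="my-bucket", object_name="data/report.csv", filename="/tmp/report.csv")
    #   hook.upload(
    #       bucket_name="my-bucket",
    #       object_name="data/notes.txt",
    #       data="some text",
    #       mime_type="text/plain",
    #       gzip=True,  # compresses the payload before upload
    #   )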
[docs]
    def exists(self, bucket_name: str, object_name: str) -> bool:
        """
        Checks for the existence of a file in Google Cloud Storage.

        :param bucket_name: The Google Cloud Storage bucket where the object is.
        :param object_name: The name of the blob to check in the Google Cloud Storage bucket.
        """
        client = self.get_conn()
        bucket = client.bucket(bucket_name)
        blob = bucket.blob(blob_name=object_name)
        return blob.exists()
[docs]
    def get_blob_update_time(self, bucket_name: str, object_name: str):
        """
        Get the update time of a file in Google Cloud Storage.

        :param bucket_name: The Google Cloud Storage bucket where the object is.
        :param object_name: The name of the blob to get updated time from the Google cloud storage bucket.
        """
        client = self.get_conn()
        bucket = client.bucket(bucket_name)
        blob = bucket.get_blob(blob_name=object_name)
        if blob is None:
            raise ValueError(f"Object ({object_name}) not found in Bucket ({bucket_name})")
        return blob.updated
[docs]
    def is_updated_after(self, bucket_name: str, object_name: str, ts: datetime) -> bool:
        """
        Checks if a blob was updated after the given timestamp in Google Cloud Storage.

        :param bucket_name: The Google Cloud Storage bucket where the object is.
        :param object_name: The name of the object to check in the Google cloud storage bucket.
        :param ts: The timestamp to check against.
        """
        blob_update_time = self.get_blob_update_time(bucket_name, object_name)
        if blob_update_time is not None:
            if not ts.tzinfo:
                ts = ts.replace(tzinfo=timezone.utc)
            self.log.info("Verify object date: %s > %s", blob_update_time, ts)
            if blob_update_time > ts:
                return True
        return False
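    # Illustrative sketch (not part of the hook): the freshness helpers compare the blob's
    # `updated` time against a timestamp (naive datetimes are treated as UTC). Names are hypothetical.
    #
    #   from datetime import datetime, timedelta
    #   hook = GCSHook()
    #   one_hour_ago = datetime.utcnow() - timedelta(hours=1)
    #   if hook.is_updated_after("my-bucket", "data/report.csv", one_hour_ago):
    #       ...  # object changed within the last hour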
[docs]
    def is_updated_between(
        self, bucket_name: str, object_name: str, min_ts: datetime, max_ts: datetime
    ) -> bool:
        """
        Checks if a blob was updated between the given timestamps in Google Cloud Storage.

        :param bucket_name: The Google Cloud Storage bucket where the object is.
        :param object_name: The name of the object to check in the Google cloud storage bucket.
        :param min_ts: The minimum timestamp to check against.
        :param max_ts: The maximum timestamp to check against.
        """
        blob_update_time = self.get_blob_update_time(bucket_name, object_name)
        if blob_update_time is not None:
            if not min_ts.tzinfo:
                min_ts = min_ts.replace(tzinfo=timezone.utc)
            if not max_ts.tzinfo:
                max_ts = max_ts.replace(tzinfo=timezone.utc)
            self.log.info("Verify object date: %s is between %s and %s", blob_update_time, min_ts, max_ts)
            if min_ts <= blob_update_time < max_ts:
                return True
        return False
[docs]
    def is_updated_before(self, bucket_name: str, object_name: str, ts: datetime) -> bool:
        """
        Checks if a blob was updated before the given timestamp in Google Cloud Storage.

        :param bucket_name: The Google Cloud Storage bucket where the object is.
        :param object_name: The name of the object to check in the Google cloud storage bucket.
        :param ts: The timestamp to check against.
        """
        blob_update_time = self.get_blob_update_time(bucket_name, object_name)
        if blob_update_time is not None:
            if not ts.tzinfo:
                ts = ts.replace(tzinfo=timezone.utc)
            self.log.info("Verify object date: %s < %s", blob_update_time, ts)
            if blob_update_time < ts:
                return True
        return False
[docs]
    def is_older_than(self, bucket_name: str, object_name: str, seconds: int) -> bool:
        """
        Checks if an object is older than the given time.

        :param bucket_name: The Google Cloud Storage bucket where the object is.
        :param object_name: The name of the object to check in the Google cloud storage bucket.
        :param seconds: The time in seconds to check against.
        """
        blob_update_time = self.get_blob_update_time(bucket_name, object_name)
        if blob_update_time is not None:
            from datetime import timedelta

            current_time = timezone.utcnow()
            given_time = current_time - timedelta(seconds=seconds)
            self.log.info("Verify object date: %s is older than %s", blob_update_time, given_time)
            if blob_update_time < given_time:
                return True
        return False
[docs]
    def delete(self, bucket_name: str, object_name: str) -> None:
        """
        Deletes an object from the bucket.

        :param bucket_name: name of the bucket, where the object resides
        :param object_name: name of the object to delete
        """
        client = self.get_conn()
        bucket = client.bucket(bucket_name)
        blob = bucket.blob(blob_name=object_name)
        blob.delete()

        self.log.info("Blob %s deleted.", object_name)
[docs]
    def delete_bucket(self, bucket_name: str, force: bool = False) -> None:
        """
        Deletes a bucket from Google Cloud Storage.

        :param bucket_name: name of the bucket which will be deleted
        :param force: if False (the default), a non-empty bucket cannot be deleted;
            set force=True to delete a non-empty bucket together with its contents
        """
        client = self.get_conn()
        bucket = client.bucket(bucket_name)

        self.log.info("Deleting %s bucket", bucket_name)
        try:
            bucket.delete(force=force)
            self.log.info("Bucket %s has been deleted", bucket_name)
        except NotFound:
            self.log.info("Bucket %s does not exist", bucket_name)
[docs]
    def list(self, bucket_name, versions=None, max_results=None, prefix=None, delimiter=None) -> List:
        """
        List all objects from the bucket with the given string prefix in name.

        :param bucket_name: bucket name
        :param versions: if true, list all versions of the objects
        :param max_results: max count of items to return in a single page of responses
        :param prefix: prefix string which filters objects whose name begin with this prefix
        :param delimiter: filters objects based on the delimiter (e.g. '.csv')
        :return: a list of object names matching the filtering criteria
        """
        client = self.get_conn()
        bucket = client.bucket(bucket_name)

        ids = []
        page_token = None
        while True:
            blobs = bucket.list_blobs(
                max_results=max_results,
                page_token=page_token,
                prefix=prefix,
                delimiter=delimiter,
                versions=versions,
            )

            blob_names = []
            for blob in blobs:
                blob_names.append(blob.name)

            prefixes = blobs.prefixes
            if prefixes:
                ids += list(prefixes)
            else:
                ids += blob_names

            page_token = blobs.next_page_token
            if page_token is None:
                # empty next page token
                break
        return ids
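    # Illustrative sketch (not part of the hook): listing CSV objects under a prefix.
    # Bucket and prefix are hypothetical.
    #
    #   hook = GCSHook()
    #   names = hook.list("my-bucket", prefix="exports/2023/", delimiter=".csv")
    #   for name in names:
    #       print(name)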
[docs]
    def list_by_timespan(
        self,
        bucket_name: str,
        timespan_start: datetime,
        timespan_end: datetime,
        versions: bool | None = None,
        max_results: int | None = None,
        prefix: str | None = None,
        delimiter: str | None = None,
    ) -> List[str]:
        """
        List all objects from the bucket with the given string prefix in name that were
        updated in the time between ``timespan_start`` and ``timespan_end``.

        :param bucket_name: bucket name
        :param timespan_start: will return objects that were updated at or after this datetime (UTC)
        :param timespan_end: will return objects that were updated before this datetime (UTC)
        :param versions: if true, list all versions of the objects
        :param max_results: max count of items to return in a single page of responses
        :param prefix: prefix string which filters objects whose name begin with this prefix
        :param delimiter: filters objects based on the delimiter (e.g. '.csv')
        :return: a list of object names matching the filtering criteria
        """
        client = self.get_conn()
        bucket = client.bucket(bucket_name)

        ids = []
        page_token = None

        while True:
            blobs = bucket.list_blobs(
                max_results=max_results,
                page_token=page_token,
                prefix=prefix,
                delimiter=delimiter,
                versions=versions,
            )

            blob_names = []
            for blob in blobs:
                if timespan_start <= blob.updated.replace(tzinfo=timezone.utc) < timespan_end:
                    blob_names.append(blob.name)

            prefixes = blobs.prefixes
            if prefixes:
                ids += list(prefixes)
            else:
                ids += blob_names

            page_token = blobs.next_page_token
            if page_token is None:
                # empty next page token
                break
        return ids
[docs]
    def get_size(self, bucket_name: str, object_name: str) -> int:
        """
        Gets the size of a file in Google Cloud Storage.

        :param bucket_name: The Google Cloud Storage bucket where the blob_name is.
        :param object_name: The name of the object to check in the Google cloud storage bucket_name.
        """
        self.log.info("Checking the file size of object: %s in bucket_name: %s", object_name, bucket_name)
        client = self.get_conn()
        bucket = client.bucket(bucket_name)
        blob = bucket.get_blob(blob_name=object_name)
        blob_size = blob.size
        self.log.info("The file size of %s is %s bytes.", object_name, blob_size)
        return blob_size
[docs]
    def get_crc32c(self, bucket_name: str, object_name: str):
        """
        Gets the CRC32c checksum of an object in Google Cloud Storage.

        :param bucket_name: The Google Cloud Storage bucket where the blob_name is.
        :param object_name: The name of the object to check in the Google cloud storage bucket_name.
        """
        self.log.info(
            "Retrieving the crc32c checksum of object_name: %s in bucket_name: %s",
            object_name,
            bucket_name,
        )
        client = self.get_conn()
        bucket = client.bucket(bucket_name)
        blob = bucket.get_blob(blob_name=object_name)
        blob_crc32c = blob.crc32c
        self.log.info("The crc32c checksum of %s is %s", object_name, blob_crc32c)
        return blob_crc32c
[docs]
    def get_md5hash(self, bucket_name: str, object_name: str) -> str:
        """
        Gets the MD5 hash of an object in Google Cloud Storage.

        :param bucket_name: The Google Cloud Storage bucket where the blob_name is.
        :param object_name: The name of the object to check in the Google cloud storage bucket_name.
        """
        self.log.info("Retrieving the MD5 hash of object: %s in bucket: %s", object_name, bucket_name)
        client = self.get_conn()
        bucket = client.bucket(bucket_name)
        blob = bucket.get_blob(blob_name=object_name)
        blob_md5hash = blob.md5_hash
        self.log.info("The md5Hash of %s is %s", object_name, blob_md5hash)
        return blob_md5hash
    @GoogleBaseHook.fallback_to_default_project_id
[docs]
    def create_bucket(
        self,
        bucket_name: str,
        resource: dict | None = None,
        storage_class: str = "MULTI_REGIONAL",
        location: str = "US",
        project_id: str | None = None,
        labels: dict | None = None,
    ) -> str:
        """
        Creates a new bucket. Google Cloud Storage uses a flat namespace, so
        you can't create a bucket with a name that is already in use.

        .. seealso::
            For more information, see Bucket Naming Guidelines:
            https://cloud.google.com/storage/docs/bucketnaming.html#requirements

        :param bucket_name: The name of the bucket.
        :param resource: An optional dict with parameters for creating the bucket.
            For information on available parameters, see Cloud Storage API doc:
            https://cloud.google.com/storage/docs/json_api/v1/buckets/insert
        :param storage_class: This defines how objects in the bucket are stored
            and determines the SLA and the cost of storage. Values include

            - ``MULTI_REGIONAL``
            - ``REGIONAL``
            - ``STANDARD``
            - ``NEARLINE``
            - ``COLDLINE``.

            If this value is not specified when the bucket is
            created, it will default to STANDARD.
        :param location: The location of the bucket.
            Object data for objects in the bucket resides in physical storage
            within this region. Defaults to US.

            .. seealso:: https://developers.google.com/storage/docs/bucket-locations

        :param project_id: The ID of the Google Cloud Project.
        :param labels: User-provided labels, in key/value pairs.
        :return: If successful, it returns the ``id`` of the bucket.
        """
        self.log.info(
            "Creating Bucket: %s; Location: %s; Storage Class: %s", bucket_name, location, storage_class
        )

        # Add airflow-version label to the bucket
        labels = labels or {}
        labels["airflow-version"] = "v" + version.replace(".", "-").replace("+", "-")

        client = self.get_conn()
        bucket = client.bucket(bucket_name=bucket_name)
        bucket_resource = resource or {}

        for item in bucket_resource:
            if item != "name":
                bucket._patch_property(name=item, value=resource[item])  # type: ignore[index]

        bucket.storage_class = storage_class
        bucket.labels = labels
        bucket.create(project=project_id, location=location)
        return bucket.id
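    # Illustrative sketch (not part of the hook): creating a regional bucket in an explicit
    # project. Names and IDs are hypothetical.
    #
    #   hook = GCSHook()
    #   bucket_id = hook.create_bucket(
    #       bucket_name="my-new-bucket",
    #       storage_class="STANDARD",
    #       location="EU",
    #       project_id="my-gcp-project",
    #       labels={"team": "data"},
    #   )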
[docs]
    def insert_bucket_acl(
        self, bucket_name: str, entity: str, role: str, user_project: str | None = None
    ) -> None:
        """
        Creates a new ACL entry on the specified bucket_name.
        See: https://cloud.google.com/storage/docs/json_api/v1/bucketAccessControls/insert

        :param bucket_name: Name of a bucket_name.
        :param entity: The entity holding the permission, in one of the following forms:
            user-userId, user-email, group-groupId, group-email, domain-domain,
            project-team-projectId, allUsers, allAuthenticatedUsers.
            See: https://cloud.google.com/storage/docs/access-control/lists#scopes
        :param role: The access permission for the entity.
            Acceptable values are: "OWNER", "READER", "WRITER".
        :param user_project: (Optional) The project to be billed for this request.
            Required for Requester Pays buckets.
        """
        self.log.info("Creating a new ACL entry in bucket: %s", bucket_name)
        client = self.get_conn()
        bucket = client.bucket(bucket_name=bucket_name)
        bucket.acl.reload()
        bucket.acl.entity_from_dict(entity_dict={"entity": entity, "role": role})
        if user_project:
            bucket.acl.user_project = user_project
        bucket.acl.save()

        self.log.info("A new ACL entry created in bucket: %s", bucket_name)
[docs]
    def insert_object_acl(
        self,
        bucket_name: str,
        object_name: str,
        entity: str,
        role: str,
        generation: int | None = None,
        user_project: str | None = None,
    ) -> None:
        """
        Creates a new ACL entry on the specified object.
        See: https://cloud.google.com/storage/docs/json_api/v1/objectAccessControls/insert

        :param bucket_name: Name of a bucket_name.
        :param object_name: Name of the object. For information about how to URL encode
            object names to be path safe, see:
            https://cloud.google.com/storage/docs/json_api/#encoding
        :param entity: The entity holding the permission, in one of the following forms:
            user-userId, user-email, group-groupId, group-email, domain-domain,
            project-team-projectId, allUsers, allAuthenticatedUsers
            See: https://cloud.google.com/storage/docs/access-control/lists#scopes
        :param role: The access permission for the entity.
            Acceptable values are: "OWNER", "READER".
        :param generation: Optional. If present, selects a specific revision of this object.
        :param user_project: (Optional) The project to be billed for this request.
            Required for Requester Pays buckets.
        """
        self.log.info("Creating a new ACL entry for object: %s in bucket: %s", object_name, bucket_name)
        client = self.get_conn()
        bucket = client.bucket(bucket_name=bucket_name)
        blob = bucket.blob(blob_name=object_name, generation=generation)
        # Reload fetches the current ACL from Cloud Storage.
        blob.acl.reload()
        blob.acl.entity_from_dict(entity_dict={"entity": entity, "role": role})
        if user_project:
            blob.acl.user_project = user_project
        blob.acl.save()

        self.log.info("A new ACL entry created for object: %s in bucket: %s", object_name, bucket_name)
[docs]
    def compose(self, bucket_name: str, source_objects: List[str], destination_object: str) -> None:
        """
        Composes a list of existing objects into a new object in the same storage bucket_name.

        Currently it only supports up to 32 objects that can be concatenated
        in a single operation

        https://cloud.google.com/storage/docs/json_api/v1/objects/compose

        :param bucket_name: The name of the bucket containing the source objects.
            This is also the same bucket to store the composed destination object.
        :param source_objects: The list of source objects that will be composed
            into a single object.
        :param destination_object: The path of the object if given.
        """
        if not source_objects:
            raise ValueError("source_objects cannot be empty.")

        if not bucket_name or not destination_object:
            raise ValueError("bucket_name and destination_object cannot be empty.")

        self.log.info("Composing %s to %s in the bucket %s", source_objects, destination_object, bucket_name)
        client = self.get_conn()
        bucket = client.bucket(bucket_name)
        destination_blob = bucket.blob(destination_object)
        destination_blob.compose(
            sources=[bucket.blob(blob_name=source_object) for source_object in source_objects]
        )

        self.log.info("Completed successfully.")
[docs]
    def sync(
        self,
        source_bucket: str,
        destination_bucket: str,
        source_object: str | None = None,
        destination_object: str | None = None,
        recursive: bool = True,
        allow_overwrite: bool = False,
        delete_extra_files: bool = False,
    ) -> None:
        """
        Synchronizes the contents of the buckets.

        Parameters ``source_object`` and ``destination_object`` describe the root sync directories. If they
        are not passed, the entire bucket will be synchronized. If they are passed, they should point
        to directories.

        .. note::
            The synchronization of individual files is not supported. Only entire directories can be
            synchronized.

        :param source_bucket: The name of the bucket containing the source objects.
        :param destination_bucket: The name of the bucket containing the destination objects.
        :param source_object: The root sync directory in the source bucket.
        :param destination_object: The root sync directory in the destination bucket.
        :param recursive: If True, subdirectories will be considered
        :param allow_overwrite: if True, the files will be overwritten if a mismatched file is found.
            By default, overwriting files is not allowed
        :param delete_extra_files: if True, deletes additional files from the destination that are not
            found in the source. By default extra files are not deleted.

            .. note::
                This option can delete data quickly if you specify the wrong source/destination combination.

        :return: none
        """
        client = self.get_conn()

        # Create bucket objects
        source_bucket_obj = client.bucket(source_bucket)
        destination_bucket_obj = client.bucket(destination_bucket)

        # Normalize parameters when they are passed
        source_object = normalize_directory_path(source_object)
        destination_object = normalize_directory_path(destination_object)

        # Calculate the number of characters to strip from the blob name, because it contains
        # information about the parent's path
        source_object_prefix_len = len(source_object) if source_object else 0

        # Prepare synchronization plan
        to_copy_blobs, to_delete_blobs, to_rewrite_blobs = self._prepare_sync_plan(
            source_bucket=source_bucket_obj,
            destination_bucket=destination_bucket_obj,
            source_object=source_object,
            destination_object=destination_object,
            recursive=recursive,
        )
        self.log.info(
            "Planned synchronization. To delete blobs count: %s, to upload blobs count: %s, "
            "to rewrite blobs count: %s",
            len(to_delete_blobs),
            len(to_copy_blobs),
            len(to_rewrite_blobs),
        )

        # Copy missing objects to the destination bucket
        if not to_copy_blobs:
            self.log.info("Skipped blobs copying.")
        else:
            for blob in to_copy_blobs:
                dst_object = self._calculate_sync_destination_path(
                    blob, destination_object, source_object_prefix_len
                )
                self.copy(
                    source_bucket=source_bucket_obj.name,
                    source_object=blob.name,
                    destination_bucket=destination_bucket_obj.name,
                    destination_object=dst_object,
                )
            self.log.info("Blobs copied.")

        # Delete redundant files
        if not to_delete_blobs:
            self.log.info("Skipped blobs deleting.")
        elif delete_extra_files:
            # TODO: Add batch. I tried to do it, but the Google library is not stable at the moment.
            for blob in to_delete_blobs:
                self.delete(blob.bucket.name, blob.name)
            self.log.info("Blobs deleted.")

        # Overwrite files that are different
        if not to_rewrite_blobs:
            self.log.info("Skipped blobs overwriting.")
        elif allow_overwrite:
            for blob in to_rewrite_blobs:
                dst_object = self._calculate_sync_destination_path(
                    blob, destination_object, source_object_prefix_len
                )
                self.rewrite(
                    source_bucket=source_bucket_obj.name,
                    source_object=blob.name,
                    destination_bucket=destination_bucket_obj.name,
                    destination_object=dst_object,
                )
            self.log.info("Blobs rewritten.")

        self.log.info("Synchronization finished.")
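    # Illustrative sketch (not part of the hook): one-way directory sync between two
    # hypothetical buckets, overwriting changed files and pruning extras in the destination.
    #
    #   hook = GCSHook()
    #   hook.sync(
    #       source_bucket="source-bucket",
    #       destination_bucket="backup-bucket",
    #       source_object="exports/",
    #       destination_object="exports/",
    #       allow_overwrite=True,
    #       delete_extra_files=True,
    #   )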
    def _calculate_sync_destination_path(
        self, blob: storage.Blob, destination_object: str | None, source_object_prefix_len: int
    ) -> str:
        return (
            path.join(destination_object, blob.name[source_object_prefix_len:])
            if destination_object
            else blob.name[source_object_prefix_len:]
        )

    @staticmethod
    def _prepare_sync_plan(
        source_bucket: storage.Bucket,
        destination_bucket: storage.Bucket,
        source_object: str | None,
        destination_object: str | None,
        recursive: bool,
    ) -> tuple[set[storage.Blob], set[storage.Blob], set[storage.Blob]]:
        # Calculate the number of characters to strip from the blob names, because they contain
        # information about the parent's path
        source_object_prefix_len = len(source_object) if source_object else 0
        destination_object_prefix_len = len(destination_object) if destination_object else 0
        delimiter = "/" if not recursive else None

        # Fetch blobs list
        source_blobs = list(source_bucket.list_blobs(prefix=source_object, delimiter=delimiter))
        destination_blobs = list(
            destination_bucket.list_blobs(prefix=destination_object, delimiter=delimiter)
        )

        # Create indexes that allow you to identify blobs based on their name
        source_names_index = {a.name[source_object_prefix_len:]: a for a in source_blobs}
        destination_names_index = {a.name[destination_object_prefix_len:]: a for a in destination_blobs}

        # Create sets with names without the parent object name
        source_names = set(source_names_index.keys())
        destination_names = set(destination_names_index.keys())

        # Determine objects to copy and delete
        to_copy = source_names - destination_names
        to_delete = destination_names - source_names
        to_copy_blobs: set[storage.Blob] = {source_names_index[a] for a in to_copy}
        to_delete_blobs: set[storage.Blob] = {destination_names_index[a] for a in to_delete}

        # Find names that are in both buckets
        names_to_check = source_names.intersection(destination_names)
        to_rewrite_blobs: set[storage.Blob] = set()

        # Compare objects based on their crc32c checksum
        for current_name in names_to_check:
            source_blob = source_names_index[current_name]
            destination_blob = destination_names_index[current_name]
            # if the objects are different, save it
            if source_blob.crc32c != destination_blob.crc32c:
                to_rewrite_blobs.add(source_blob)

        return to_copy_blobs, to_delete_blobs, to_rewrite_blobs
[docs]
def gcs_object_is_directory(bucket: str) -> bool:
    """
    Return True if the given Google Cloud Storage URL (gs://<bucket>/<blob>)
    is a directory or an empty bucket. Otherwise return False.
    """
    _, blob = _parse_gcs_url(bucket)

    return len(blob) == 0 or blob.endswith("/")
def _parse_gcs_url(gsurl: str) -> tuple[str, str]:
    """
    Given a Google Cloud Storage URL (gs://<bucket>/<blob>), returns a
    tuple containing the corresponding bucket and blob.
    """
    parsed_url = urlsplit(gsurl)
    if not parsed_url.netloc:
        raise AirflowException("Please provide a bucket name")
    if parsed_url.scheme.lower() != "gs":
        raise AirflowException(f"The URL scheme must be 'gs://'; got: '{parsed_url.scheme}://'")

    bucket = parsed_url.netloc
    # Remove leading '/' but NOT trailing one
    blob = parsed_url.path.lstrip("/")
    return bucket, blob
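# Illustrative sketch (not part of this module): splitting a GCS URL into its parts.
# The URLs below are hypothetical.
#
#   bucket, blob = _parse_gcs_url("gs://my-bucket/exports/2023/report.csv")
#   # bucket == "my-bucket", blob == "exports/2023/report.csv"
#   gcs_object_is_directory("gs://my-bucket/exports/")            # True (trailing slash)
#   gcs_object_is_directory("gs://my-bucket/exports/report.csv")  # False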