Source code for airflow.providers.google.cloud.hooks.gcs
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
"""This module contains a Google Cloud Storage hook."""
import functools
import gzip as gz
import os
import shutil
import time
import warnings
from contextlib import contextmanager
from datetime import datetime
from functools import partial
from io import BytesIO
from os import path
from tempfile import NamedTemporaryFile
from typing import Callable, List, Optional, Sequence, Set, Tuple, TypeVar, Union, cast
from urllib.parse import urlparse

from google.api_core.exceptions import NotFound
from google.cloud import storage
from google.cloud.exceptions import GoogleCloudError

from airflow.exceptions import AirflowException
from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
from airflow.utils import timezone
from airflow.version import version

RT = TypeVar('RT')
T = TypeVar("T", bound=Callable)

# Default request timeout (in seconds) used by the download/upload methods below.
DEFAULT_TIMEOUT = 60
def _fallback_object_url_to_object_name_and_bucket_name(
    object_url_keyword_arg_name='object_url',
    bucket_name_keyword_arg_name='bucket_name',
    object_name_keyword_arg_name='object_name',
) -> Callable[[T], T]:
    """
    Decorator factory that converts an object URL parameter into object name and bucket name parameters.

    :param object_url_keyword_arg_name: Name of the object URL parameter
    :type object_url_keyword_arg_name: str
    :param bucket_name_keyword_arg_name: Name of the bucket name parameter
    :type bucket_name_keyword_arg_name: str
    :param object_name_keyword_arg_name: Name of the object name parameter
    :type object_name_keyword_arg_name: str
    :return: Decorator
    """

    def _wrapper(func: T):
        @functools.wraps(func)
        def _inner_wrapper(self: "GCSHook", *args, **kwargs) -> RT:
            if args:
                raise AirflowException(
                    "You must use keyword arguments in this method rather than positional"
                )

            object_url = kwargs.get(object_url_keyword_arg_name)
            bucket_name = kwargs.get(bucket_name_keyword_arg_name)
            object_name = kwargs.get(object_name_keyword_arg_name)

            if object_url and bucket_name and object_name:
                raise AirflowException(
                    "Mutually exclusive parameters: `object_url` was passed together with "
                    "`bucket_name` and `object_name`. "
                    "Please provide either `object_url` or `bucket_name` and `object_name`."
                )
            if object_url:
                bucket_name, object_name = _parse_gcs_url(object_url)
                kwargs[bucket_name_keyword_arg_name] = bucket_name
                kwargs[object_name_keyword_arg_name] = object_name
                del kwargs[object_url_keyword_arg_name]

            if not object_name or not bucket_name:
                raise TypeError(
                    f"{func.__name__}() missing 2 required positional arguments: "
                    f"'{bucket_name_keyword_arg_name}' and '{object_name_keyword_arg_name}' "
                    f"or {object_url_keyword_arg_name}"
                )
            if not object_name:
                raise TypeError(
                    f"{func.__name__}() missing 1 required positional argument: "
                    f"'{object_name_keyword_arg_name}'"
                )
            if not bucket_name:
                raise TypeError(
                    f"{func.__name__}() missing 1 required positional argument: "
                    f"'{bucket_name_keyword_arg_name}'"
                )

            return func(self, *args, **kwargs)

        return cast(T, _inner_wrapper)

    return _wrapper
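The decorator above is what lets methods such as ``provide_file`` (defined further down) accept either a ``gs://`` URL or an explicit bucket/object pair. A minimal, illustrative sketch of the two equivalent call styles, assuming a configured Google Cloud connection and placeholder names:

hook = GCSHook()
# Both calls resolve to bucket_name="my-bucket", object_name="data/file.csv" inside the method:
with hook.provide_file(object_url="gs://my-bucket/data/file.csv") as handle:
    print(handle.name)
with hook.provide_file(bucket_name="my-bucket", object_name="data/file.csv") as handle:
    print(handle.name)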
[docs]class GCSHook(GoogleBaseHook):
    """
    Interact with Google Cloud Storage. This hook uses the Google Cloud connection.
    """

    _conn = None  # type: Optional[storage.Client]

    def __init__(
        self,
        gcp_conn_id: str = "google_cloud_default",
        delegate_to: Optional[str] = None,
        google_cloud_storage_conn_id: Optional[str] = None,
        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
    ) -> None:
        # To preserve backward compatibility
        # TODO: remove one day
        if google_cloud_storage_conn_id:
            warnings.warn(
                "The google_cloud_storage_conn_id parameter has been deprecated. You should pass "
                "the gcp_conn_id parameter.",
                DeprecationWarning,
                stacklevel=2,
            )
            gcp_conn_id = google_cloud_storage_conn_id

        super().__init__(
            gcp_conn_id=gcp_conn_id,
            delegate_to=delegate_to,
            impersonation_chain=impersonation_chain,
        )
[docs]    def get_conn(self) -> storage.Client:
        """Returns a Google Cloud Storage service object."""
        if not self._conn:
            self._conn = storage.Client(
                credentials=self._get_credentials(), client_info=self.client_info, project=self.project_id
            )

        return self._conn
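An illustrative sketch of how the hook is typically constructed and how ``get_conn`` caches the client; the connection id shown is the default and is assumed to hold valid credentials:

from airflow.providers.google.cloud.hooks.gcs import GCSHook

hook = GCSHook(gcp_conn_id="google_cloud_default")
client = hook.get_conn()           # google.cloud.storage.Client built from the connection credentials
assert client is hook.get_conn()   # subsequent calls reuse the cached client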
[docs]    def copy(
        self,
        source_bucket: str,
        source_object: str,
        destination_bucket: Optional[str] = None,
        destination_object: Optional[str] = None,
    ) -> None:
        """
        Copies an object from a bucket to another, with renaming if requested.

        destination_bucket or destination_object can be omitted, in which case
        source bucket/object is used, but not both.

        :param source_bucket: The bucket of the object to copy from.
        :type source_bucket: str
        :param source_object: The object to copy.
        :type source_object: str
        :param destination_bucket: The destination of the object to be copied to.
            Can be omitted; then the same bucket is used.
        :type destination_bucket: str
        :param destination_object: The (renamed) path of the object if given.
            Can be omitted; then the same name is used.
        :type destination_object: str
        """
        destination_bucket = destination_bucket or source_bucket
        destination_object = destination_object or source_object

        if source_bucket == destination_bucket and source_object == destination_object:
            raise ValueError(
                'Either source/destination bucket or source/destination object '
                'must be different, not both the same: bucket=%s, object=%s' % (source_bucket, source_object)
            )
        if not source_bucket or not source_object:
            raise ValueError('source_bucket and source_object cannot be empty.')

        client = self.get_conn()
        source_bucket = client.bucket(source_bucket)
        source_object = source_bucket.blob(source_object)  # type: ignore[attr-defined]
        destination_bucket = client.bucket(destination_bucket)
        destination_object = source_bucket.copy_blob(  # type: ignore[attr-defined]
            blob=source_object, destination_bucket=destination_bucket, new_name=destination_object
        )

        self.log.info(
            'Object %s in bucket %s copied to object %s in bucket %s',
            source_object.name,  # type: ignore[attr-defined]
            source_bucket.name,  # type: ignore[attr-defined]
            destination_object.name,  # type: ignore[union-attr]
            destination_bucket.name,  # type: ignore[union-attr]
        )
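A hedged usage sketch of ``copy`` (bucket and object names are placeholders): omitting ``destination_bucket`` renames within the same bucket, omitting ``destination_object`` keeps the object name:

hook = GCSHook()
# Rename within the same bucket:
hook.copy(
    source_bucket="my-bucket",
    source_object="data/report.csv",
    destination_object="archive/report.csv",
)
# Copy to another bucket, keeping the same object name:
hook.copy(
    source_bucket="my-bucket",
    source_object="data/report.csv",
    destination_bucket="my-backup-bucket",
)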
[docs]    def rewrite(
        self,
        source_bucket: str,
        source_object: str,
        destination_bucket: str,
        destination_object: Optional[str] = None,
    ) -> None:
        """
        Has the same functionality as copy, except that it will work on files
        over 5 TB, as well as when copying between locations and/or storage classes.

        destination_object can be omitted, in which case source_object is used.

        :param source_bucket: The bucket of the object to copy from.
        :type source_bucket: str
        :param source_object: The object to copy.
        :type source_object: str
        :param destination_bucket: The destination of the object to be copied to.
        :type destination_bucket: str
        :param destination_object: The (renamed) path of the object if given.
            Can be omitted; then the same name is used.
        :type destination_object: str
        """
        destination_object = destination_object or source_object
        if source_bucket == destination_bucket and source_object == destination_object:
            raise ValueError(
                'Either source/destination bucket or source/destination object '
                'must be different, not both the same: bucket=%s, object=%s' % (source_bucket, source_object)
            )
        if not source_bucket or not source_object:
            raise ValueError('source_bucket and source_object cannot be empty.')

        client = self.get_conn()
        source_bucket = client.bucket(source_bucket)
        source_object = source_bucket.blob(blob_name=source_object)  # type: ignore[attr-defined]
        destination_bucket = client.bucket(destination_bucket)

        token, bytes_rewritten, total_bytes = destination_bucket.blob(  # type: ignore[attr-defined]
            blob_name=destination_object
        ).rewrite(source=source_object)

        self.log.info('Total Bytes: %s | Bytes Written: %s', total_bytes, bytes_rewritten)

        while token is not None:
            token, bytes_rewritten, total_bytes = destination_bucket.blob(  # type: ignore[attr-defined]
                blob_name=destination_object
            ).rewrite(source=source_object, token=token)

            self.log.info('Total Bytes: %s | Bytes Written: %s', total_bytes, bytes_rewritten)
        self.log.info(
            'Object %s in bucket %s rewritten to object %s in bucket %s',
            source_object.name,  # type: ignore[attr-defined]
            source_bucket.name,  # type: ignore[attr-defined]
            destination_object,
            destination_bucket.name,  # type: ignore[attr-defined]
        )
[docs]    def download(
        self,
        bucket_name: str,
        object_name: str,
        filename: Optional[str] = None,
        chunk_size: Optional[int] = None,
        timeout: Optional[int] = DEFAULT_TIMEOUT,
        num_max_attempts: Optional[int] = 1,
    ) -> Union[str, bytes]:
        """
        Downloads a file from Google Cloud Storage.

        When no filename is supplied, the operator loads the file into memory and returns its
        content. When a filename is supplied, it writes the file to the specified location and
        returns the location. For file sizes that exceed the available memory it is recommended
        to write to a file.

        :param bucket_name: The bucket to fetch from.
        :type bucket_name: str
        :param object_name: The object to fetch.
        :type object_name: str
        :param filename: If set, a local file path where the file should be written to.
        :type filename: str
        :param chunk_size: Blob chunk size.
        :type chunk_size: int
        :param timeout: Request timeout in seconds.
        :type timeout: int
        :param num_max_attempts: Number of attempts to download the file.
        :type num_max_attempts: int
        """
        # TODO: future improvement check file size before downloading,
        #  to check for local space availability

        num_file_attempts = 0

        while num_file_attempts < num_max_attempts:
            try:
                num_file_attempts += 1
                client = self.get_conn()
                bucket = client.bucket(bucket_name)
                blob = bucket.blob(blob_name=object_name, chunk_size=chunk_size)

                if filename:
                    blob.download_to_filename(filename, timeout=timeout)
                    self.log.info('File downloaded to %s', filename)
                    return filename
                else:
                    return blob.download_as_string()

            except GoogleCloudError:
                if num_file_attempts == num_max_attempts:
                    self.log.error(
                        'Download attempt of object: %s from %s has failed. Attempt: %s, max %s.',
                        object_name,
                        bucket_name,
                        num_file_attempts,
                        num_max_attempts,
                    )
                    raise

                # Wait with exponential backoff scheme before retrying.
                timeout_seconds = 1.0 * 2 ** (num_file_attempts - 1)
                time.sleep(timeout_seconds)
                continue
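An illustrative sketch of the two ``download`` modes described in the docstring; bucket, object, and local paths are placeholders:

hook = GCSHook()
# Small object: load the content into memory (returned as bytes).
content = hook.download(bucket_name="my-bucket", object_name="data/report.csv")
# Large object: stream to a local file, retrying up to 3 times with exponential backoff.
local_path = hook.download(
    bucket_name="my-bucket",
    object_name="data/big_file.parquet",
    filename="/tmp/big_file.parquet",
    num_max_attempts=3,
)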
[docs]    @_fallback_object_url_to_object_name_and_bucket_name()
    @contextmanager
    def provide_file(
        self,
        bucket_name: Optional[str] = None,
        object_name: Optional[str] = None,
        object_url: Optional[str] = None,
    ):
        """
        Downloads the file to a temporary directory and returns a file handle

        You can use this method by passing the bucket_name and object_name parameters
        or just object_url parameter.

        :param bucket_name: The bucket to fetch from.
        :type bucket_name: str
        :param object_name: The object to fetch.
        :type object_name: str
        :param object_url: File reference url. Must start with "gs://"
        :type object_url: str
        :return: File handler
        """
        if object_name is None:
            raise ValueError("Object name can not be empty")
        _, _, file_name = object_name.rpartition("/")
        with NamedTemporaryFile(suffix=file_name) as tmp_file:
            self.download(bucket_name=bucket_name, object_name=object_name, filename=tmp_file.name)
            tmp_file.flush()
            yield tmp_file
[docs]    @_fallback_object_url_to_object_name_and_bucket_name()
    @contextmanager
    def provide_file_and_upload(
        self,
        bucket_name: Optional[str] = None,
        object_name: Optional[str] = None,
        object_url: Optional[str] = None,
    ):
        """
        Creates temporary file, returns a file handle and uploads the file's content on close.

        You can use this method by passing the bucket_name and object_name parameters
        or just object_url parameter.

        :param bucket_name: The bucket to fetch from.
        :type bucket_name: str
        :param object_name: The object to fetch.
        :type object_name: str
        :param object_url: File reference url. Must start with "gs://"
        :type object_url: str
        :return: File handler
        """
        if object_name is None:
            raise ValueError("Object name can not be empty")

        _, _, file_name = object_name.rpartition("/")
        with NamedTemporaryFile(suffix=file_name) as tmp_file:
            yield tmp_file
            tmp_file.flush()
            self.upload(bucket_name=bucket_name, object_name=object_name, filename=tmp_file.name)
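A sketch of the two temporary-file helpers; both accept either ``object_url`` or the bucket/object pair thanks to the fallback decorator (placeholder names, valid credentials assumed):

hook = GCSHook()
# Read an object through a temporary local copy:
with hook.provide_file(object_url="gs://my-bucket/data/report.csv") as tmp:
    print(open(tmp.name).read())
# Produce an object by writing a temporary local file; it is uploaded when the block exits:
with hook.provide_file_and_upload(bucket_name="my-bucket", object_name="out/result.txt") as tmp:
    tmp.write(b"hello")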
[docs]    def upload(
        self,
        bucket_name: str,
        object_name: str,
        filename: Optional[str] = None,
        data: Optional[Union[str, bytes]] = None,
        mime_type: Optional[str] = None,
        gzip: bool = False,
        encoding: str = 'utf-8',
        chunk_size: Optional[int] = None,
        timeout: Optional[int] = DEFAULT_TIMEOUT,
        num_max_attempts: int = 1,
    ) -> None:
        """
        Uploads a local file or file data as string or bytes to Google Cloud Storage.

        :param bucket_name: The bucket to upload to.
        :type bucket_name: str
        :param object_name: The object name to set when uploading the file.
        :type object_name: str
        :param filename: The local file path to the file to be uploaded.
        :type filename: str
        :param data: The file's data as a string or bytes to be uploaded.
        :type data: str
        :param mime_type: The file's mime type set when uploading the file.
        :type mime_type: str
        :param gzip: Option to compress local file or file data for upload
        :type gzip: bool
        :param encoding: bytes encoding for file data if provided as string
        :type encoding: str
        :param chunk_size: Blob chunk size.
        :type chunk_size: int
        :param timeout: Request timeout in seconds.
        :type timeout: int
        :param num_max_attempts: Number of attempts to try to upload the file.
        :type num_max_attempts: int
        """

        def _call_with_retry(f: Callable[[], None]) -> None:
            """Helper function to upload a file or a string with a retry mechanism and exponential back-off.

            :param f: Callable that should be retried.
            :type f: Callable[[], None]
            """
            num_file_attempts = 0

            while num_file_attempts < num_max_attempts:
                try:
                    num_file_attempts += 1
                    f()

                except GoogleCloudError as e:
                    if num_file_attempts == num_max_attempts:
                        self.log.error(
                            'Upload attempt of object: %s to %s has failed. Attempt: %s, max %s.',
                            object_name,
                            bucket_name,
                            num_file_attempts,
                            num_max_attempts,
                        )
                        raise e

                    # Wait with exponential backoff scheme before retrying.
                    timeout_seconds = 1.0 * 2 ** (num_file_attempts - 1)
                    time.sleep(timeout_seconds)
                    continue

        client = self.get_conn()
        bucket = client.bucket(bucket_name)
        blob = bucket.blob(blob_name=object_name, chunk_size=chunk_size)

        if filename and data:
            raise ValueError(
                "'filename' and 'data' parameter provided. Please "
                "specify a single parameter, either 'filename' for "
                "local file uploads or 'data' for file content uploads."
            )
        elif filename:
            if not mime_type:
                mime_type = 'application/octet-stream'
            if gzip:
                filename_gz = filename + '.gz'

                with open(filename, 'rb') as f_in:
                    with gz.open(filename_gz, 'wb') as f_out:
                        shutil.copyfileobj(f_in, f_out)
                        filename = filename_gz

            _call_with_retry(
                partial(blob.upload_from_filename, filename=filename, content_type=mime_type, timeout=timeout)
            )

            if gzip:
                os.remove(filename)
            self.log.info('File %s uploaded to %s in %s bucket', filename, object_name, bucket_name)
        elif data:
            if not mime_type:
                mime_type = 'text/plain'
            if gzip:
                if isinstance(data, str):
                    data = bytes(data, encoding)
                out = BytesIO()
                with gz.GzipFile(fileobj=out, mode="w") as f:
                    f.write(data)
                data = out.getvalue()

            _call_with_retry(partial(blob.upload_from_string, data, content_type=mime_type, timeout=timeout))

            self.log.info('Data stream uploaded to %s in %s bucket', object_name, bucket_name)
        else:
            raise ValueError("'filename' and 'data' parameter missing. One is required to upload to gcs.")
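A hedged sketch of the two mutually exclusive ``upload`` modes (local file vs. in-memory data); bucket, object, and local paths are placeholders:

hook = GCSHook()
# Upload a local file, gzip-compressing it on the way:
hook.upload(
    bucket_name="my-bucket",
    object_name="archive/report.csv.gz",
    filename="/tmp/report.csv",
    gzip=True,
)
# Upload in-memory data as an object:
hook.upload(
    bucket_name="my-bucket",
    object_name="notes/hello.txt",
    data="hello world",
    mime_type="text/plain",
)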
[docs]    def exists(self, bucket_name: str, object_name: str) -> bool:
        """
        Checks for the existence of a file in Google Cloud Storage.

        :param bucket_name: The Google Cloud Storage bucket where the object is.
        :type bucket_name: str
        :param object_name: The name of the blob_name to check in the Google cloud
            storage bucket.
        :type object_name: str
        """
        client = self.get_conn()
        bucket = client.bucket(bucket_name)
        blob = bucket.blob(blob_name=object_name)
        return blob.exists()
[docs]    def get_blob_update_time(self, bucket_name: str, object_name: str):
        """
        Get the update time of a file in Google Cloud Storage

        :param bucket_name: The Google Cloud Storage bucket where the object is.
        :type bucket_name: str
        :param object_name: The name of the blob to get updated time from the Google cloud
            storage bucket.
        :type object_name: str
        """
        client = self.get_conn()
        bucket = client.bucket(bucket_name)
        blob = bucket.get_blob(blob_name=object_name)
        if blob is None:
            raise ValueError(f"Object ({object_name}) not found in Bucket ({bucket_name})")
        return blob.updated
[docs]    def is_updated_after(self, bucket_name: str, object_name: str, ts: datetime) -> bool:
        """
        Checks if a blob_name was updated after the given timestamp in Google Cloud Storage.

        :param bucket_name: The Google Cloud Storage bucket where the object is.
        :type bucket_name: str
        :param object_name: The name of the object to check in the Google cloud
            storage bucket.
        :type object_name: str
        :param ts: The timestamp to check against.
        :type ts: datetime.datetime
        """
        blob_update_time = self.get_blob_update_time(bucket_name, object_name)
        if blob_update_time is not None:

            if not ts.tzinfo:
                ts = ts.replace(tzinfo=timezone.utc)
            self.log.info("Verify object date: %s > %s", blob_update_time, ts)
            if blob_update_time > ts:
                return True
        return False
[docs]    def is_updated_between(
        self, bucket_name: str, object_name: str, min_ts: datetime, max_ts: datetime
    ) -> bool:
        """
        Checks if a blob_name was updated between the given timestamps in Google Cloud Storage.

        :param bucket_name: The Google Cloud Storage bucket where the object is.
        :type bucket_name: str
        :param object_name: The name of the object to check in the Google cloud
            storage bucket.
        :type object_name: str
        :param min_ts: The minimum timestamp to check against.
        :type min_ts: datetime.datetime
        :param max_ts: The maximum timestamp to check against.
        :type max_ts: datetime.datetime
        """
        blob_update_time = self.get_blob_update_time(bucket_name, object_name)
        if blob_update_time is not None:
            if not min_ts.tzinfo:
                min_ts = min_ts.replace(tzinfo=timezone.utc)
            if not max_ts.tzinfo:
                max_ts = max_ts.replace(tzinfo=timezone.utc)
            self.log.info("Verify object date: %s is between %s and %s", blob_update_time, min_ts, max_ts)
            if min_ts <= blob_update_time < max_ts:
                return True
        return False
[docs]    def is_updated_before(self, bucket_name: str, object_name: str, ts: datetime) -> bool:
        """
        Checks if a blob_name was updated before the given time in Google Cloud Storage.

        :param bucket_name: The Google Cloud Storage bucket where the object is.
        :type bucket_name: str
        :param object_name: The name of the object to check in the Google cloud
            storage bucket.
        :type object_name: str
        :param ts: The timestamp to check against.
        :type ts: datetime.datetime
        """
        blob_update_time = self.get_blob_update_time(bucket_name, object_name)
        if blob_update_time is not None:
            if not ts.tzinfo:
                ts = ts.replace(tzinfo=timezone.utc)
            self.log.info("Verify object date: %s < %s", blob_update_time, ts)
            if blob_update_time < ts:
                return True
        return False
[docs]    def is_older_than(self, bucket_name: str, object_name: str, seconds: int) -> bool:
        """
        Check if object is older than given time

        :param bucket_name: The Google Cloud Storage bucket where the object is.
        :type bucket_name: str
        :param object_name: The name of the object to check in the Google cloud
            storage bucket.
        :type object_name: str
        :param seconds: The time in seconds to check against
        :type seconds: int
        """
        blob_update_time = self.get_blob_update_time(bucket_name, object_name)
        if blob_update_time is not None:
            from datetime import timedelta

            current_time = timezone.utcnow()
            given_time = current_time - timedelta(seconds=seconds)
            self.log.info("Verify object date: %s is older than %s", blob_update_time, given_time)
            if blob_update_time < given_time:
                return True
        return False
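An illustrative sketch combining the existence and freshness checks above (placeholder names; it uses the same ``airflow.utils.timezone`` helper the module imports):

from datetime import timedelta

from airflow.utils import timezone

hook = GCSHook()
if hook.exists(bucket_name="my-bucket", object_name="data/report.csv"):
    one_hour_ago = timezone.utcnow() - timedelta(hours=1)
    if hook.is_updated_after("my-bucket", "data/report.csv", one_hour_ago):
        print("object changed within the last hour")
    if hook.is_older_than("my-bucket", "data/report.csv", seconds=7 * 24 * 3600):
        print("object is more than a week old")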
[docs]    def delete(self, bucket_name: str, object_name: str) -> None:
        """
        Deletes an object from the bucket.

        :param bucket_name: name of the bucket, where the object resides
        :type bucket_name: str
        :param object_name: name of the object to delete
        :type object_name: str
        """
        client = self.get_conn()
        bucket = client.bucket(bucket_name)
        blob = bucket.blob(blob_name=object_name)
        blob.delete()

        self.log.info('Blob %s deleted.', object_name)
[docs]    def delete_bucket(self, bucket_name: str, force: bool = False) -> None:
        """
        Delete a bucket object from the Google Cloud Storage.

        :param bucket_name: name of the bucket which will be deleted
        :type bucket_name: str
        :param force: if False (the default), the bucket must be empty to be deleted;
            set force=True to delete a non-empty bucket
        :type force: bool
        """
        client = self.get_conn()
        bucket = client.bucket(bucket_name)

        self.log.info("Deleting %s bucket", bucket_name)
        try:
            bucket.delete(force=force)
            self.log.info("Bucket %s has been deleted", bucket_name)
        except NotFound:
            self.log.info("Bucket %s does not exist", bucket_name)
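A short sketch of object and bucket deletion (placeholder names); ``force=True`` removes remaining objects before deleting the bucket, as documented above:

hook = GCSHook()
hook.delete(bucket_name="my-bucket", object_name="tmp/scratch.txt")
hook.delete_bucket(bucket_name="my-temporary-bucket", force=True)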
[docs]    def list(self, bucket_name, versions=None, max_results=None, prefix=None, delimiter=None) -> list:
        """
        List all objects from the bucket with the given string prefix in name

        :param bucket_name: bucket name
        :type bucket_name: str
        :param versions: if true, list all versions of the objects
        :type versions: bool
        :param max_results: max count of items to return in a single page of responses
        :type max_results: int
        :param prefix: prefix string which filters objects whose name begin with this prefix
        :type prefix: str
        :param delimiter: filters objects based on the delimiter (for e.g '.csv')
        :type delimiter: str
        :return: a stream of object names matching the filtering criteria
        """
        client = self.get_conn()
        bucket = client.bucket(bucket_name)

        ids = []
        page_token = None
        while True:
            blobs = bucket.list_blobs(
                max_results=max_results,
                page_token=page_token,
                prefix=prefix,
                delimiter=delimiter,
                versions=versions,
            )

            blob_names = []
            for blob in blobs:
                blob_names.append(blob.name)

            prefixes = blobs.prefixes
            if prefixes:
                ids += list(prefixes)
            else:
                ids += blob_names

            page_token = blobs.next_page_token
            if page_token is None:
                # empty next page token
                break
        return ids
[docs]    def list_by_timespan(
        self,
        bucket_name: str,
        timespan_start: datetime,
        timespan_end: datetime,
        versions: bool = None,
        max_results: int = None,
        prefix: str = None,
        delimiter: str = None,
    ) -> list:
        """
        List all objects from the bucket with the given string prefix in name that were
        updated in the time between ``timespan_start`` and ``timespan_end``.

        :param bucket_name: bucket name
        :type bucket_name: str
        :param timespan_start: will return objects that were updated at or after this datetime (UTC)
        :type timespan_start: datetime
        :param timespan_end: will return objects that were updated before this datetime (UTC)
        :type timespan_end: datetime
        :param versions: if true, list all versions of the objects
        :type versions: bool
        :param max_results: max count of items to return in a single page of responses
        :type max_results: int
        :param prefix: prefix string which filters objects whose name begin with this prefix
        :type prefix: str
        :param delimiter: filters objects based on the delimiter (for e.g '.csv')
        :type delimiter: str
        :return: a stream of object names matching the filtering criteria
        """
        client = self.get_conn()
        bucket = client.bucket(bucket_name)

        ids = []
        page_token = None

        while True:
            blobs = bucket.list_blobs(
                max_results=max_results,
                page_token=page_token,
                prefix=prefix,
                delimiter=delimiter,
                versions=versions,
            )

            blob_names = []
            for blob in blobs:
                if timespan_start <= blob.updated.replace(tzinfo=timezone.utc) < timespan_end:
                    blob_names.append(blob.name)

            prefixes = blobs.prefixes
            if prefixes:
                ids += list(prefixes)
            else:
                ids += blob_names

            page_token = blobs.next_page_token
            if page_token is None:
                # empty next page token
                break
        return ids
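An illustrative sketch of the two listing helpers (placeholder bucket and prefix); ``list_by_timespan`` keeps only blobs updated inside the given window:

from datetime import timedelta

from airflow.utils import timezone

hook = GCSHook()
names = hook.list("my-bucket", prefix="data/", delimiter=".csv")
now = timezone.utcnow()
recent = hook.list_by_timespan(
    bucket_name="my-bucket",
    timespan_start=now - timedelta(days=1),
    timespan_end=now,
    prefix="data/",
)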
[docs]    def get_size(self, bucket_name: str, object_name: str) -> int:
        """
        Gets the size of a file in Google Cloud Storage.

        :param bucket_name: The Google Cloud Storage bucket where the blob_name is.
        :type bucket_name: str
        :param object_name: The name of the object to check in the Google
            cloud storage bucket_name.
        :type object_name: str
        """
        self.log.info('Checking the file size of object: %s in bucket_name: %s', object_name, bucket_name)
        client = self.get_conn()
        bucket = client.bucket(bucket_name)
        blob = bucket.get_blob(blob_name=object_name)
        blob_size = blob.size
        self.log.info('The file size of %s is %s bytes.', object_name, blob_size)
        return blob_size
[docs]    def get_crc32c(self, bucket_name: str, object_name: str):
        """
        Gets the CRC32c checksum of an object in Google Cloud Storage.

        :param bucket_name: The Google Cloud Storage bucket where the blob_name is.
        :type bucket_name: str
        :param object_name: The name of the object to check in the Google cloud
            storage bucket_name.
        :type object_name: str
        """
        self.log.info(
            'Retrieving the crc32c checksum of object_name: %s in bucket_name: %s',
            object_name,
            bucket_name,
        )
        client = self.get_conn()
        bucket = client.bucket(bucket_name)
        blob = bucket.get_blob(blob_name=object_name)
        blob_crc32c = blob.crc32c
        self.log.info('The crc32c checksum of %s is %s', object_name, blob_crc32c)
        return blob_crc32c
[docs]    def get_md5hash(self, bucket_name: str, object_name: str) -> str:
        """
        Gets the MD5 hash of an object in Google Cloud Storage.

        :param bucket_name: The Google Cloud Storage bucket where the blob_name is.
        :type bucket_name: str
        :param object_name: The name of the object to check in the Google cloud
            storage bucket_name.
        :type object_name: str
        """
        self.log.info('Retrieving the MD5 hash of object: %s in bucket: %s', object_name, bucket_name)
        client = self.get_conn()
        bucket = client.bucket(bucket_name)
        blob = bucket.get_blob(blob_name=object_name)
        blob_md5hash = blob.md5_hash
        self.log.info('The md5Hash of %s is %s', object_name, blob_md5hash)
        return blob_md5hash
    @GoogleBaseHook.fallback_to_default_project_id
[docs]    def create_bucket(
        self,
        bucket_name: str,
        resource: Optional[dict] = None,
        storage_class: str = 'MULTI_REGIONAL',
        location: str = 'US',
        project_id: Optional[str] = None,
        labels: Optional[dict] = None,
    ) -> str:
        """
        Creates a new bucket. Google Cloud Storage uses a flat namespace, so
        you can't create a bucket with a name that is already in use.

        .. seealso::
            For more information, see Bucket Naming Guidelines:
            https://cloud.google.com/storage/docs/bucketnaming.html#requirements

        :param bucket_name: The name of the bucket.
        :type bucket_name: str
        :param resource: An optional dict with parameters for creating the bucket.
            For information on available parameters, see Cloud Storage API doc:
            https://cloud.google.com/storage/docs/json_api/v1/buckets/insert
        :type resource: dict
        :param storage_class: This defines how objects in the bucket are stored
            and determines the SLA and the cost of storage. Values include

            - ``MULTI_REGIONAL``
            - ``REGIONAL``
            - ``STANDARD``
            - ``NEARLINE``
            - ``COLDLINE``.

            If this value is not specified when the bucket is
            created, it will default to STANDARD.
        :type storage_class: str
        :param location: The location of the bucket.
            Object data for objects in the bucket resides in physical storage
            within this region. Defaults to US.

            .. seealso:: https://developers.google.com/storage/docs/bucket-locations

        :type location: str
        :param project_id: The ID of the Google Cloud Project.
        :type project_id: str
        :param labels: User-provided labels, in key/value pairs.
        :type labels: dict
        :return: If successful, it returns the ``id`` of the bucket.
        """
        self.log.info(
            'Creating Bucket: %s; Location: %s; Storage Class: %s', bucket_name, location, storage_class
        )

        # Add airflow-version label to the bucket
        labels = labels or {}
        labels['airflow-version'] = 'v' + version.replace('.', '-').replace('+', '-')

        client = self.get_conn()
        bucket = client.bucket(bucket_name=bucket_name)
        bucket_resource = resource or {}

        for item in bucket_resource:
            if item != "name":
                bucket._patch_property(name=item, value=resource[item])  # type: ignore[index]

        bucket.storage_class = storage_class
        bucket.labels = labels
        bucket.create(project=project_id, location=location)
        return bucket.id
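A minimal sketch of ``create_bucket`` (bucket name and labels are placeholders; ``project_id`` falls back to the connection's default project via the decorator above):

hook = GCSHook()
bucket_id = hook.create_bucket(
    bucket_name="my-new-bucket",
    storage_class="STANDARD",
    location="EU",
    labels={"team": "data-platform"},
)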
[docs]    def insert_bucket_acl(
        self, bucket_name: str, entity: str, role: str, user_project: Optional[str] = None
    ) -> None:
        """
        Creates a new ACL entry on the specified bucket_name.
        See: https://cloud.google.com/storage/docs/json_api/v1/bucketAccessControls/insert

        :param bucket_name: Name of a bucket_name.
        :type bucket_name: str
        :param entity: The entity holding the permission, in one of the following forms:
            user-userId, user-email, group-groupId, group-email, domain-domain,
            project-team-projectId, allUsers, allAuthenticatedUsers.
            See: https://cloud.google.com/storage/docs/access-control/lists#scopes
        :type entity: str
        :param role: The access permission for the entity.
            Acceptable values are: "OWNER", "READER", "WRITER".
        :type role: str
        :param user_project: (Optional) The project to be billed for this request.
            Required for Requester Pays buckets.
        :type user_project: str
        """
        self.log.info('Creating a new ACL entry in bucket: %s', bucket_name)
        client = self.get_conn()
        bucket = client.bucket(bucket_name=bucket_name)
        bucket.acl.reload()
        bucket.acl.entity_from_dict(entity_dict={"entity": entity, "role": role})
        if user_project:
            bucket.acl.user_project = user_project
        bucket.acl.save()

        self.log.info('A new ACL entry created in bucket: %s', bucket_name)
[docs]    def insert_object_acl(
        self,
        bucket_name: str,
        object_name: str,
        entity: str,
        role: str,
        generation: Optional[int] = None,
        user_project: Optional[str] = None,
    ) -> None:
        """
        Creates a new ACL entry on the specified object.
        See: https://cloud.google.com/storage/docs/json_api/v1/objectAccessControls/insert

        :param bucket_name: Name of a bucket_name.
        :type bucket_name: str
        :param object_name: Name of the object. For information about how to URL encode
            object names to be path safe, see:
            https://cloud.google.com/storage/docs/json_api/#encoding
        :type object_name: str
        :param entity: The entity holding the permission, in one of the following forms:
            user-userId, user-email, group-groupId, group-email, domain-domain,
            project-team-projectId, allUsers, allAuthenticatedUsers
            See: https://cloud.google.com/storage/docs/access-control/lists#scopes
        :type entity: str
        :param role: The access permission for the entity.
            Acceptable values are: "OWNER", "READER".
        :type role: str
        :param generation: Optional. If present, selects a specific revision of this object.
        :type generation: long
        :param user_project: (Optional) The project to be billed for this request.
            Required for Requester Pays buckets.
        :type user_project: str
        """
        self.log.info('Creating a new ACL entry for object: %s in bucket: %s', object_name, bucket_name)
        client = self.get_conn()
        bucket = client.bucket(bucket_name=bucket_name)
        blob = bucket.blob(blob_name=object_name, generation=generation)
        # Reload fetches the current ACL from Cloud Storage.
        blob.acl.reload()
        blob.acl.entity_from_dict(entity_dict={"entity": entity, "role": role})
        if user_project:
            blob.acl.user_project = user_project
        blob.acl.save()

        self.log.info('A new ACL entry created for object: %s in bucket: %s', object_name, bucket_name)
[docs]    def compose(self, bucket_name: str, source_objects: List, destination_object: str) -> None:
        """
        Composes a list of existing objects into a new object in the same storage bucket_name

        Currently it only supports up to 32 objects that can be concatenated
        in a single operation

        https://cloud.google.com/storage/docs/json_api/v1/objects/compose

        :param bucket_name: The name of the bucket containing the source objects.
            This is also the same bucket to store the composed destination object.
        :type bucket_name: str
        :param source_objects: The list of source objects that will be composed
            into a single object.
        :type source_objects: list
        :param destination_object: The path of the object if given.
        :type destination_object: str
        """
        if not source_objects:
            raise ValueError('source_objects cannot be empty.')

        if not bucket_name or not destination_object:
            raise ValueError('bucket_name and destination_object cannot be empty.')

        self.log.info("Composing %s to %s in the bucket %s", source_objects, destination_object, bucket_name)
        client = self.get_conn()
        bucket = client.bucket(bucket_name)
        destination_blob = bucket.blob(destination_object)
        destination_blob.compose(
            sources=[bucket.blob(blob_name=source_object) for source_object in source_objects]
        )

        self.log.info("Completed successfully.")
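A sketch of ``compose``, which concatenates up to 32 source objects from the same bucket into one destination object (placeholder names):

hook = GCSHook()
hook.compose(
    bucket_name="my-bucket",
    source_objects=["logs/part-1.csv", "logs/part-2.csv"],
    destination_object="logs/full.csv",
)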
[docs]    def sync(
        self,
        source_bucket: str,
        destination_bucket: str,
        source_object: Optional[str] = None,
        destination_object: Optional[str] = None,
        recursive: bool = True,
        allow_overwrite: bool = False,
        delete_extra_files: bool = False,
    ) -> None:
        """
        Synchronizes the contents of the buckets.

        Parameters ``source_object`` and ``destination_object`` describe the root sync directories. If they
        are not passed, the entire bucket will be synchronized. If they are passed, they should point
        to directories.

        .. note::
            The synchronization of individual files is not supported. Only entire directories can be
            synchronized.

        :param source_bucket: The name of the bucket containing the source objects.
        :type source_bucket: str
        :param destination_bucket: The name of the bucket containing the destination objects.
        :type destination_bucket: str
        :param source_object: The root sync directory in the source bucket.
        :type source_object: Optional[str]
        :param destination_object: The root sync directory in the destination bucket.
        :type destination_object: Optional[str]
        :param recursive: If True, subdirectories will be considered
        :type recursive: bool
        :param allow_overwrite: if True, the files will be overwritten if a mismatched file is found.
            By default, overwriting files is not allowed
        :type allow_overwrite: bool
        :param delete_extra_files: if True, deletes files from the destination that are not found in the
            source. By default extra files are not deleted.

            .. note::
                This option can delete data quickly if you specify the wrong source/destination combination.

        :type delete_extra_files: bool
        :return: none
        """
        client = self.get_conn()
        # Create bucket object
        source_bucket_obj = client.bucket(source_bucket)
        destination_bucket_obj = client.bucket(destination_bucket)
        # Normalize parameters when they are passed
        source_object = self._normalize_directory_path(source_object)
        destination_object = self._normalize_directory_path(destination_object)
        # Calculate the number of characters that are removed from the name, because they contain
        # information about the parent's path
        source_object_prefix_len = len(source_object) if source_object else 0
        # Prepare synchronization plan
        to_copy_blobs, to_delete_blobs, to_rewrite_blobs = self._prepare_sync_plan(
            source_bucket=source_bucket_obj,
            destination_bucket=destination_bucket_obj,
            source_object=source_object,
            destination_object=destination_object,
            recursive=recursive,
        )
        self.log.info(
            "Planned synchronization. To delete blobs count: %s, to upload blobs count: %s, "
            "to rewrite blobs count: %s",
            len(to_delete_blobs),
            len(to_copy_blobs),
            len(to_rewrite_blobs),
        )

        # Copy missing objects to the new bucket
        if not to_copy_blobs:
            self.log.info("Skipped blobs copying.")
        else:
            for blob in to_copy_blobs:
                dst_object = self._calculate_sync_destination_path(
                    blob, destination_object, source_object_prefix_len
                )
                self.copy(
                    source_bucket=source_bucket_obj.name,
                    source_object=blob.name,
                    destination_bucket=destination_bucket_obj.name,
                    destination_object=dst_object,
                )
            self.log.info("Blobs copied.")
        # Delete redundant files
        if not to_delete_blobs:
            self.log.info("Skipped blobs deleting.")
        elif delete_extra_files:
            # TODO: Add batch. I tried to do it, but the Google library is not stable at the moment.
            for blob in to_delete_blobs:
                self.delete(blob.bucket.name, blob.name)
            self.log.info("Blobs deleted.")

        # Overwrite files that are different
        if not to_rewrite_blobs:
            self.log.info("Skipped blobs overwriting.")
        elif allow_overwrite:
            for blob in to_rewrite_blobs:
                dst_object = self._calculate_sync_destination_path(
                    blob, destination_object, source_object_prefix_len
                )
                self.rewrite(
                    source_bucket=source_bucket_obj.name,
                    source_object=blob.name,
                    destination_bucket=destination_bucket_obj.name,
                    destination_object=dst_object,
                )
            self.log.info("Blobs rewritten.")

        self.log.info("Synchronization finished.")
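An illustrative sketch of ``sync`` mirroring one directory into another bucket (placeholder names); note that ``delete_extra_files=True`` prunes destination files missing from the source:

hook = GCSHook()
hook.sync(
    source_bucket="my-bucket",
    destination_bucket="my-backup-bucket",
    source_object="raw/",
    destination_object="raw/",
    allow_overwrite=True,
    delete_extra_files=True,
)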
    def _calculate_sync_destination_path(
        self, blob: storage.Blob, destination_object: Optional[str], source_object_prefix_len: int
    ) -> str:
        return (
            path.join(destination_object, blob.name[source_object_prefix_len:])
            if destination_object
            else blob.name[source_object_prefix_len:]
        )

    def _normalize_directory_path(self, source_object: Optional[str]) -> Optional[str]:
        return source_object + "/" if source_object and not source_object.endswith("/") else source_object

    @staticmethod
    def _prepare_sync_plan(
        source_bucket: storage.Bucket,
        destination_bucket: storage.Bucket,
        source_object: Optional[str],
        destination_object: Optional[str],
        recursive: bool,
    ) -> Tuple[Set[storage.Blob], Set[storage.Blob], Set[storage.Blob]]:
        # Calculate the number of characters that are removed from the name, because they contain
        # information about the parent's path
        source_object_prefix_len = len(source_object) if source_object else 0
        destination_object_prefix_len = len(destination_object) if destination_object else 0
        delimiter = "/" if not recursive else None
        # Fetch blobs list
        source_blobs = list(source_bucket.list_blobs(prefix=source_object, delimiter=delimiter))
        destination_blobs = list(
            destination_bucket.list_blobs(prefix=destination_object, delimiter=delimiter)
        )
        # Create indexes that allow you to identify blobs based on their name
        source_names_index = {a.name[source_object_prefix_len:]: a for a in source_blobs}
        destination_names_index = {a.name[destination_object_prefix_len:]: a for a in destination_blobs}
        # Create sets with names without parent object name
        source_names = set(source_names_index.keys())
        destination_names = set(destination_names_index.keys())
        # Determine objects to copy and delete
        to_copy = source_names - destination_names
        to_delete = destination_names - source_names
        to_copy_blobs = {source_names_index[a] for a in to_copy}  # type: Set[storage.Blob]
        to_delete_blobs = {destination_names_index[a] for a in to_delete}  # type: Set[storage.Blob]
        # Find names that are in both buckets
        names_to_check = source_names.intersection(destination_names)
        to_rewrite_blobs = set()  # type: Set[storage.Blob]
        # Compare objects based on crc32
        for current_name in names_to_check:
            source_blob = source_names_index[current_name]
            destination_blob = destination_names_index[current_name]
            # if the objects are different, save it
            if source_blob.crc32c != destination_blob.crc32c:
                to_rewrite_blobs.add(source_blob)
        return to_copy_blobs, to_delete_blobs, to_rewrite_blobs
[docs]def gcs_object_is_directory(bucket: str) -> bool:
    """
    Return True if given Google Cloud Storage URL (gs://<bucket>/<blob>)
    is a directory or an empty bucket. Otherwise return False.
    """
    _, blob = _parse_gcs_url(bucket)

    return len(blob) == 0 or blob.endswith('/')
def _parse_gcs_url(gsurl: str) -> Tuple[str, str]:
    """
    Given a Google Cloud Storage URL (gs://<bucket>/<blob>), returns a
    tuple containing the corresponding bucket and blob.
    """
    parsed_url = urlparse(gsurl)
    if not parsed_url.netloc:
        raise AirflowException('Please provide a bucket name')
    if parsed_url.scheme.lower() != "gs":
        raise AirflowException(f"Schema must be 'gs://': current schema: '{parsed_url.scheme}://'")

    bucket = parsed_url.netloc
    # Remove leading '/' but NOT trailing one
    blob = parsed_url.path.lstrip('/')
    return bucket, blob
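A short sketch of the module-level URL helpers (the URL is a placeholder):

bucket, blob = _parse_gcs_url("gs://my-bucket/data/report.csv")
# bucket == "my-bucket", blob == "data/report.csv"
assert not gcs_object_is_directory("gs://my-bucket/data/report.csv")
assert gcs_object_is_directory("gs://my-bucket/data/")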