Source code for airflow.providers.google.cloud.transfers.gcs_to_gcs

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""This module contains a Google Cloud Storage operator."""
import warnings
from typing import Optional, Sequence, Union

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.utils.decorators import apply_defaults

[docs]WILDCARD = '*'
[docs]class GCSToGCSOperator(BaseOperator): """ Copies objects from a bucket to another, with renaming if requested. .. seealso:: For more information on how to use this operator, take a look at the guide: :ref:`howto/operator:GCSToGCSOperator` :param source_bucket: The source Google Cloud Storage bucket where the object is. (templated) :type source_bucket: str :param source_object: The source name of the object to copy in the Google cloud storage bucket. (templated) You can use only one wildcard for objects (filenames) within your bucket. The wildcard can appear inside the object name or at the end of the object name. Appending a wildcard to the bucket name is unsupported. :type source_object: str :param source_objects: A list of source name of the objects to copy in the Google cloud storage bucket. (templated) :type source_objects: List[str] :param destination_bucket: The destination Google Cloud Storage bucket where the object should be. If the destination_bucket is None, it defaults to source_bucket. (templated) :type destination_bucket: str :param destination_object: The destination name of the object in the destination Google Cloud Storage bucket. (templated) If a wildcard is supplied in the source_object argument, this is the prefix that will be prepended to the final destination objects' paths. Note that the source path's part before the wildcard will be removed; if it needs to be retained it should be appended to destination_object. For example, with prefix ``foo/*`` and destination_object ``blah/``, the file ``foo/baz`` will be copied to ``blah/baz``; to retain the prefix write the destination_object as e.g. ``blah/foo``, in which case the copied file will be named ``blah/foo/baz``. The same thing applies to source objects inside source_objects. :type destination_object: str :param move_object: When move object is True, the object is moved instead of copied to the new location. This is the equivalent of a mv command as opposed to a cp command. :type move_object: bool :param replace: Whether you want to replace existing destination files or not. :type replace: bool :param delimiter: This is used to restrict the result to only the 'files' in a given 'folder'. If source_objects = ['foo/bah/'] and delimiter = '.avro', then only the 'files' in the folder 'foo/bah/' with '.avro' delimiter will be copied to the destination object. :type delimiter: str :param gcp_conn_id: (Optional) The connection ID used to connect to Google Cloud. :type gcp_conn_id: str :param google_cloud_storage_conn_id: (Deprecated) The connection ID used to connect to Google Cloud. This parameter has been deprecated. You should pass the gcp_conn_id parameter instead. :type google_cloud_storage_conn_id: str :param delegate_to: The account to impersonate using domain-wide delegation of authority, if any. For this to work, the service account making the request must have domain-wide delegation enabled. :type delegate_to: str :param last_modified_time: When specified, the objects will be copied or moved, only if they were modified after last_modified_time. If tzinfo has not been set, UTC will be assumed. :type last_modified_time: datetime.datetime :param maximum_modified_time: When specified, the objects will be copied or moved, only if they were modified before maximum_modified_time. If tzinfo has not been set, UTC will be assumed. :type maximum_modified_time: datetime.datetime :param is_older_than: When specified, the objects will be copied if they are older than the specified time in seconds. :type is_older_than: int :param impersonation_chain: Optional service account to impersonate using short-term credentials, or chained list of accounts required to get the access_token of the last account in the list, which will be impersonated in the request. If set as a string, the account must grant the originating account the Service Account Token Creator IAM role. If set as a sequence, the identities from the list must grant Service Account Token Creator IAM role to the directly preceding identity, with first account from the list granting this role to the originating account (templated). :type impersonation_chain: Union[str, Sequence[str]] :Example: The following Operator would copy a single file named ``sales/sales-2017/january.avro`` in the ``data`` bucket to the file named ``copied_sales/2017/january-backup.avro`` in the ``data_backup`` bucket :: copy_single_file = GCSToGCSOperator( task_id='copy_single_file', source_bucket='data', source_objects=['sales/sales-2017/january.avro'], destination_bucket='data_backup', destination_object='copied_sales/2017/january-backup.avro', gcp_conn_id=google_cloud_conn_id ) The following Operator would copy all the Avro files from ``sales/sales-2017`` folder (i.e. with names starting with that prefix) in ``data`` bucket to the ``copied_sales/2017`` folder in the ``data_backup`` bucket. :: copy_files = GCSToGCSOperator( task_id='copy_files', source_bucket='data', source_objects=['sales/sales-2017'], destination_bucket='data_backup', destination_object='copied_sales/2017/', delimiter='.avro' gcp_conn_id=google_cloud_conn_id ) Or :: copy_files = GCSToGCSOperator( task_id='copy_files', source_bucket='data', source_object='sales/sales-2017/*.avro', destination_bucket='data_backup', destination_object='copied_sales/2017/', gcp_conn_id=google_cloud_conn_id ) The following Operator would move all the Avro files from ``sales/sales-2017`` folder (i.e. with names starting with that prefix) in ``data`` bucket to the same folder in the ``data_backup`` bucket, deleting the original files in the process. :: move_files = GCSToGCSOperator( task_id='move_files', source_bucket='data', source_object='sales/sales-2017/*.avro', destination_bucket='data_backup', move_object=True, gcp_conn_id=google_cloud_conn_id ) The following Operator would move all the Avro files from ``sales/sales-2019`` and ``sales/sales-2020` folder in ``data`` bucket to the same folder in the ``data_backup`` bucket, deleting the original files in the process. :: move_files = GCSToGCSOperator( task_id='move_files', source_bucket='data', source_objects=['sales/sales-2019/*.avro', 'sales/sales-2020'], destination_bucket='data_backup', delimiter='.avro', move_object=True, gcp_conn_id=google_cloud_conn_id ) """
[docs] template_fields = ( 'source_bucket', 'source_object', 'source_objects', 'destination_bucket', 'destination_object', 'delimiter', 'impersonation_chain',
)
[docs] ui_color = '#f0eee4'
@apply_defaults def __init__( self, *, # pylint: disable=too-many-arguments source_bucket, source_object=None, source_objects=None, destination_bucket=None, destination_object=None, delimiter=None, move_object=False, replace=True, gcp_conn_id='google_cloud_default', google_cloud_storage_conn_id=None, delegate_to=None, last_modified_time=None, maximum_modified_time=None, is_older_than=None, impersonation_chain: Optional[Union[str, Sequence[str]]] = None, **kwargs, ): super().__init__(**kwargs) if google_cloud_storage_conn_id: warnings.warn( "The google_cloud_storage_conn_id parameter has been deprecated. You should pass " "the gcp_conn_id parameter.", DeprecationWarning, stacklevel=3, ) gcp_conn_id = google_cloud_storage_conn_id self.source_bucket = source_bucket self.source_object = source_object self.source_objects = source_objects self.destination_bucket = destination_bucket self.destination_object = destination_object self.delimiter = delimiter self.move_object = move_object self.replace = replace self.gcp_conn_id = gcp_conn_id self.delegate_to = delegate_to self.last_modified_time = last_modified_time self.maximum_modified_time = maximum_modified_time self.is_older_than = is_older_than self.impersonation_chain = impersonation_chain
[docs] def execute(self, context): hook = GCSHook( gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to, impersonation_chain=self.impersonation_chain, ) if self.source_objects and self.source_object: error_msg = ( "You can either set source_object parameter or source_objects " "parameter but not both. Found source_object={} and" " source_objects={}".format(self.source_object, self.source_objects) ) raise AirflowException(error_msg) if not self.source_object and not self.source_objects: error_msg = "You must set source_object parameter or source_objects parameter. None set" raise AirflowException(error_msg) if self.source_objects and not all(isinstance(item, str) for item in self.source_objects): raise AirflowException('At least, one of the `objects` in the `source_objects` is not a string') # If source_object is set, default it to source_objects if self.source_object: self.source_objects = [self.source_object] if self.destination_bucket is None: self.log.warning( 'destination_bucket is None. Defaulting it to source_bucket (%s)', self.source_bucket ) self.destination_bucket = self.source_bucket # An empty source_object means to copy all files if len(self.source_objects) == 0: self.source_objects = [''] # Raise exception if empty string `''` is used twice in source_object, this is to avoid double copy if self.source_objects.count('') > 1: raise AirflowException("You can't have two empty strings inside source_object") # Iterate over the source_objects and do the copy for prefix in self.source_objects: # Check if prefix contains wildcard if WILDCARD in prefix: self._copy_source_with_wildcard(hook=hook, prefix=prefix) # Now search with prefix using provided delimiter if any else: self._copy_source_without_wildcard(hook=hook, prefix=prefix)
[docs] def _copy_source_without_wildcard(self, hook, prefix): """ For source_objects with no wildcard, this operator would first list all files in source_objects, using provided delimiter if any. Then copy files from source_objects to destination_object and rename each source file. Example 1: The following Operator would copy all the files from ``a/``folder (i.e a/a.csv, a/b.csv, a/c.csv)in ``data`` bucket to the ``b/`` folder in the ``data_backup`` bucket (b/a.csv, b/b.csv, b/c.csv) :: copy_files = GCSToGCSOperator( task_id='copy_files_without_wildcard', source_bucket='data', source_objects=['a/'], destination_bucket='data_backup', destination_object='b/', gcp_conn_id=google_cloud_conn_id ) Example 2: The following Operator would copy all avro files from ``a/``folder (i.e a/a.avro, a/b.avro, a/c.avro)in ``data`` bucket to the ``b/`` folder in the ``data_backup`` bucket (b/a.avro, b/b.avro, b/c.avro) :: copy_files = GCSToGCSOperator( task_id='copy_files_without_wildcard', source_bucket='data', source_objects=['a/'], destination_bucket='data_backup', destination_object='b/', delimiter='.avro', gcp_conn_id=google_cloud_conn_id ) """ objects = hook.list(self.source_bucket, prefix=prefix, delimiter=self.delimiter) # If objects is empty and we have prefix, let's check if prefix is a blob # and copy directly if len(objects) == 0 and prefix: if hook.exists(self.source_bucket, prefix): self._copy_single_object( hook=hook, source_object=prefix, destination_object=self.destination_object ) for source_obj in objects: if self.destination_object is None: destination_object = source_obj else: destination_object = source_obj.replace(prefix, self.destination_object, 1) self._copy_single_object( hook=hook, source_object=source_obj, destination_object=destination_object
)
[docs] def _copy_source_with_wildcard(self, hook, prefix): total_wildcards = prefix.count(WILDCARD) if total_wildcards > 1: error_msg = ( "Only one wildcard '*' is allowed in source_object parameter. " "Found {} in {}.".format(total_wildcards, prefix) ) raise AirflowException(error_msg) self.log.info('Delimiter ignored because wildcard is in prefix') prefix_, delimiter = prefix.split(WILDCARD, 1) objects = hook.list(self.source_bucket, prefix=prefix_, delimiter=delimiter) if not self.replace: # If we are not replacing, list all files in the Destination GCS bucket # and only keep those files which are present in # Source GCS bucket and not in Destination GCS bucket existing_objects = hook.list(self.destination_bucket, prefix=prefix_, delimiter=delimiter) objects = set(objects) - set(existing_objects) if len(objects) > 0: self.log.info('%s files are going to be synced: %s.', len(objects), objects) else: self.log.info('There are no new files to sync. Have a nice day!') for source_object in objects: if self.destination_object is None: destination_object = source_object else: destination_object = source_object.replace(prefix_, self.destination_object, 1) self._copy_single_object( hook=hook, source_object=source_object, destination_object=destination_object
)
[docs] def _copy_single_object(self, hook, source_object, destination_object): if self.is_older_than: # Here we check if the given object is older than the given time # If given, last_modified_time and maximum_modified_time is ignored if hook.is_older_than(self.source_bucket, source_object, self.is_older_than): self.log.info("Object is older than %s seconds ago", self.is_older_than) else: self.log.debug("Object is not older than %s seconds ago", self.is_older_than) return elif self.last_modified_time and self.maximum_modified_time: # check to see if object was modified between last_modified_time and # maximum_modified_time if hook.is_updated_between( self.source_bucket, source_object, self.last_modified_time, self.maximum_modified_time ): self.log.info( "Object has been modified between %s and %s", self.last_modified_time, self.maximum_modified_time, ) else: self.log.debug( "Object was not modified between %s and %s", self.last_modified_time, self.maximum_modified_time, ) return elif self.last_modified_time is not None: # Check to see if object was modified after last_modified_time if hook.is_updated_after(self.source_bucket, source_object, self.last_modified_time): self.log.info("Object has been modified after %s ", self.last_modified_time) else: self.log.debug("Object was not modified after %s ", self.last_modified_time) return elif self.maximum_modified_time is not None: # Check to see if object was modified before maximum_modified_time if hook.is_updated_before(self.source_bucket, source_object, self.maximum_modified_time): self.log.info("Object has been modified before %s ", self.maximum_modified_time) else: self.log.debug("Object was not modified before %s ", self.maximum_modified_time) return self.log.info( 'Executing copy of gs://%s/%s to gs://%s/%s', self.source_bucket, source_object, self.destination_bucket, destination_object, ) hook.rewrite(self.source_bucket, source_object, self.destination_bucket, destination_object) if self.move_object: hook.delete(self.source_bucket, source_object)

Was this entry helpful?