Source code for airflow.contrib.hooks.gcs_hook

# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from googleapiclient.errors import HttpError

from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
from airflow.exceptions import AirflowException

import gzip as gz
import shutil
import re
import os


class GoogleCloudStorageHook(GoogleCloudBaseHook):
    """
    Interact with Google Cloud Storage. This hook uses the Google Cloud
    Platform connection.
    """

    def __init__(self,
                 google_cloud_storage_conn_id='google_cloud_default',
                 delegate_to=None):
        super(GoogleCloudStorageHook, self).__init__(google_cloud_storage_conn_id,
                                                     delegate_to)

    def get_conn(self):
        """
        Returns a Google Cloud Storage service object.
        """
        http_authorized = self._authorize()
        return build(
            'storage', 'v1', http=http_authorized, cache_discovery=False)

    # pylint:disable=redefined-builtin
    def copy(self, source_bucket, source_object, destination_bucket=None,
             destination_object=None):
        """
        Copies an object from a bucket to another, with renaming if requested.

        destination_bucket or destination_object can be omitted, in which case
        source bucket/object is used, but not both.

        :param source_bucket: The bucket of the object to copy from.
        :type source_bucket: str
        :param source_object: The object to copy.
        :type source_object: str
        :param destination_bucket: The destination bucket the object is to be
            copied to. Can be omitted; then the same bucket is used.
        :type destination_bucket: str
        :param destination_object: The (renamed) path of the object if given.
            Can be omitted; then the same name is used.
        :type destination_object: str
        """
        destination_bucket = destination_bucket or source_bucket
        destination_object = destination_object or source_object
        if source_bucket == destination_bucket and \
                source_object == destination_object:
            raise ValueError(
                'Either source/destination bucket or source/destination object '
                'must be different, not both the same: bucket=%s, object=%s' %
                (source_bucket, source_object))
        if not source_bucket or not source_object:
            raise ValueError('source_bucket and source_object cannot be empty.')

        service = self.get_conn()
        try:
            service \
                .objects() \
                .copy(sourceBucket=source_bucket, sourceObject=source_object,
                      destinationBucket=destination_bucket,
                      destinationObject=destination_object, body='') \
                .execute()
            return True
        except HttpError as ex:
            if ex.resp['status'] == '404':
                return False
            raise
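
    # Illustrative usage sketch (not part of the original module): copy()
    # returns False instead of raising when the source object is missing
    # (HTTP 404), so callers should check the return value. The bucket and
    # object names below are hypothetical.
    #
    #   hook = GoogleCloudStorageHook()
    #   ok = hook.copy('my-bucket', 'data/in.csv',
    #                  destination_object='archive/in.csv')
    #   if not ok:
    #       raise AirflowException('gs://my-bucket/data/in.csv not found')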

    def rewrite(self, source_bucket, source_object, destination_bucket,
                destination_object=None):
        """
        Has the same functionality as copy, except that it will work on files
        over 5 TB, as well as when copying between locations and/or storage
        classes.

        destination_object can be omitted, in which case source_object is used.

        :param source_bucket: The bucket of the object to copy from.
        :type source_bucket: str
        :param source_object: The object to copy.
        :type source_object: str
        :param destination_bucket: The destination bucket the object is to be
            copied to.
        :type destination_bucket: str
        :param destination_object: The (renamed) path of the object if given.
            Can be omitted; then the same name is used.
        :type destination_object: str
        """
        destination_object = destination_object or source_object
        if (source_bucket == destination_bucket and
                source_object == destination_object):
            raise ValueError(
                'Either source/destination bucket or source/destination object '
                'must be different, not both the same: bucket=%s, object=%s' %
                (source_bucket, source_object))
        if not source_bucket or not source_object:
            raise ValueError('source_bucket and source_object cannot be empty.')

        service = self.get_conn()
        request_count = 1
        try:
            result = service.objects() \
                .rewrite(sourceBucket=source_bucket, sourceObject=source_object,
                         destinationBucket=destination_bucket,
                         destinationObject=destination_object, body='') \
                .execute()
            self.log.info('Rewrite request #%s: %s', request_count, result)
            while not result['done']:
                request_count += 1
                result = service.objects() \
                    .rewrite(sourceBucket=source_bucket, sourceObject=source_object,
                             destinationBucket=destination_bucket,
                             destinationObject=destination_object,
                             rewriteToken=result['rewriteToken'], body='') \
                    .execute()
                self.log.info('Rewrite request #%s: %s', request_count, result)
            return True
        except HttpError as ex:
            if ex.resp['status'] == '404':
                return False
            raise

    # pylint:disable=redefined-builtin
    def download(self, bucket, object, filename=None):
        """
        Get a file from Google Cloud Storage.

        :param bucket: The bucket to fetch from.
        :type bucket: str
        :param object: The object to fetch.
        :type object: str
        :param filename: If set, a local file path where the file should be
            written to.
        :type filename: str
        """
        service = self.get_conn()
        downloaded_file_bytes = service \
            .objects() \
            .get_media(bucket=bucket, object=object) \
            .execute()

        # Write the file to local file path, if requested.
        if filename:
            write_argument = 'wb' if isinstance(downloaded_file_bytes, bytes) else 'w'
            with open(filename, write_argument) as file_fd:
                file_fd.write(downloaded_file_bytes)

        return downloaded_file_bytes

    # pylint:disable=redefined-builtin
    def upload(self, bucket, object, filename,
               mime_type='application/octet-stream', gzip=False,
               multipart=False, num_retries=0):
        """
        Uploads a local file to Google Cloud Storage.

        :param bucket: The bucket to upload to.
        :type bucket: str
        :param object: The object name to set when uploading the local file.
        :type object: str
        :param filename: The local file path to the file to be uploaded.
        :type filename: str
        :param mime_type: The MIME type to set when uploading the file.
        :type mime_type: str
        :param gzip: Option to compress file for upload
        :type gzip: bool
        :param multipart: If True, the upload will be split into multiple HTTP
            requests. The default size is 256MiB per request. Pass a number
            instead of True to specify the request size, which must be a
            multiple of 262144 (256KiB).
        :type multipart: bool or int
        :param num_retries: The number of times to attempt to re-upload the
            file (or individual chunks, in the case of multipart uploads).
            Retries are attempted with exponential backoff.
        :type num_retries: int
        """
        service = self.get_conn()

        if gzip:
            filename_gz = filename + '.gz'

            with open(filename, 'rb') as f_in:
                with gz.open(filename_gz, 'wb') as f_out:
                    shutil.copyfileobj(f_in, f_out)
                    filename = filename_gz

        try:
            if multipart:
                if multipart is True:
                    chunksize = 256 * 1024 * 1024
                else:
                    chunksize = multipart

                if chunksize % (256 * 1024) > 0 or chunksize < 0:
                    raise ValueError("Multipart size is not a multiple of 262144 (256KiB)")

                media = MediaFileUpload(filename, mimetype=mime_type,
                                        chunksize=chunksize, resumable=True)

                request = service.objects().insert(bucket=bucket, name=object,
                                                   media_body=media)
                response = None
                while response is None:
                    status, response = request.next_chunk(num_retries=num_retries)
                    if status:
                        self.log.info("Upload progress %.1f%%",
                                      status.progress() * 100)

            else:
                media = MediaFileUpload(filename, mime_type)

                service \
                    .objects() \
                    .insert(bucket=bucket, name=object, media_body=media) \
                    .execute(num_retries=num_retries)

        except HttpError as ex:
            if ex.resp['status'] == '404':
                return False
            raise

        finally:
            if gzip:
                os.remove(filename)

        return True
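
    # Illustrative usage sketch (not part of the original module): a resumable
    # multipart upload with an explicit chunk size, which must be a multiple
    # of 262144 bytes (256KiB). The bucket and file names are hypothetical.
    #
    #   hook = GoogleCloudStorageHook()
    #   hook.upload('my-bucket', 'backups/db.dump', '/tmp/db.dump',
    #               gzip=True,                   # compress to /tmp/db.dump.gz first
    #               multipart=32 * 1024 * 1024,  # 32MiB chunks (128 * 262144)
    #               num_retries=3)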

    # pylint:disable=redefined-builtin
    def exists(self, bucket, object):
        """
        Checks for the existence of a file in Google Cloud Storage.

        :param bucket: The Google cloud storage bucket where the object is.
        :type bucket: str
        :param object: The name of the object to check in the Google cloud
            storage bucket.
        :type object: str
        """
        service = self.get_conn()
        try:
            service \
                .objects() \
                .get(bucket=bucket, object=object) \
                .execute()
            return True
        except HttpError as ex:
            if ex.resp['status'] == '404':
                return False
            raise

    # pylint:disable=redefined-builtin
    def is_updated_after(self, bucket, object, ts):
        """
        Checks if an object was updated in Google Cloud Storage after the
        given timestamp.

        :param bucket: The Google cloud storage bucket where the object is.
        :type bucket: str
        :param object: The name of the object to check in the Google cloud
            storage bucket.
        :type object: str
        :param ts: The timestamp to check against.
        :type ts: datetime.datetime
        """
        service = self.get_conn()
        try:
            response = (service
                        .objects()
                        .get(bucket=bucket, object=object)
                        .execute())

            if 'updated' in response:
                import dateutil.parser
                import dateutil.tz

                if not ts.tzinfo:
                    ts = ts.replace(tzinfo=dateutil.tz.tzutc())

                updated = dateutil.parser.parse(response['updated'])
                self.log.info("Verify object date: %s > %s", updated, ts)

                if updated > ts:
                    return True

        except HttpError as ex:
            if ex.resp['status'] != '404':
                raise

        return False

    def delete(self, bucket, object, generation=None):
        """
        Delete an object if versioning is not enabled for the bucket, or if
        the generation parameter is used.

        :param bucket: name of the bucket, where the object resides
        :type bucket: str
        :param object: name of the object to delete
        :type object: str
        :param generation: if present, permanently delete the object of this
            generation
        :type generation: str
        :return: True if succeeded
        """
        service = self.get_conn()

        try:
            service \
                .objects() \
                .delete(bucket=bucket, object=object, generation=generation) \
                .execute()
            return True
        except HttpError as ex:
            if ex.resp['status'] == '404':
                return False
            raise

    def list(self, bucket, versions=None, maxResults=None, prefix=None,
             delimiter=None):
        """
        List all objects from the bucket with the given string prefix in name

        :param bucket: bucket name
        :type bucket: str
        :param versions: if true, list all versions of the objects
        :type versions: bool
        :param maxResults: max count of items to return in a single page of
            responses
        :type maxResults: int
        :param prefix: prefix string which filters objects whose name begins
            with this prefix
        :type prefix: str
        :param delimiter: filters objects based on the delimiter (e.g. '.csv')
        :type delimiter: str
        :return: a stream of object names matching the filtering criteria
        """
        service = self.get_conn()

        ids = list()
        pageToken = None
        while True:
            response = service.objects().list(
                bucket=bucket,
                versions=versions,
                maxResults=maxResults,
                pageToken=pageToken,
                prefix=prefix,
                delimiter=delimiter
            ).execute()

            if 'prefixes' not in response:
                if 'items' not in response:
                    self.log.info("No items found for prefix: %s", prefix)
                    break

                for item in response['items']:
                    if item and 'name' in item:
                        ids.append(item['name'])
            else:
                for item in response['prefixes']:
                    ids.append(item)

            if 'nextPageToken' not in response:
                # no further pages of results, so stop the loop
                break

            pageToken = response['nextPageToken']
            if not pageToken:
                # empty next page token
                break
        return ids
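
    # Illustrative usage sketch (not part of the original module): list()
    # follows nextPageToken internally, so a single call returns all matching
    # names. The bucket name and prefix below are hypothetical.
    #
    #   hook = GoogleCloudStorageHook()
    #   names = hook.list('my-bucket', prefix='exports/2018-', delimiter='.csv')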

    def get_size(self, bucket, object):
        """
        Gets the size of a file in Google Cloud Storage.

        :param bucket: The Google cloud storage bucket where the object is.
        :type bucket: str
        :param object: The name of the object to check in the Google cloud
            storage bucket.
        :type object: str
        """
        self.log.info('Checking the file size of object: %s in bucket: %s',
                      object, bucket)
        service = self.get_conn()
        try:
            response = service.objects().get(
                bucket=bucket,
                object=object
            ).execute()

            if 'name' in response and response['name'][-1] != '/':
                # Remove Directories & Just check size of files
                size = response['size']
                self.log.info('The file size of %s is %s bytes.', object, size)
                return size
            else:
                raise ValueError('Object is not a file')
        except HttpError as ex:
            if ex.resp['status'] == '404':
                raise ValueError('Object Not Found')
            # re-raise errors other than 404 rather than swallowing them
            raise

    def get_crc32c(self, bucket, object):
        """
        Gets the CRC32c checksum of an object in Google Cloud Storage.

        :param bucket: The Google cloud storage bucket where the object is.
        :type bucket: str
        :param object: The name of the object to check in the Google cloud
            storage bucket.
        :type object: str
        """
        self.log.info('Retrieving the crc32c checksum of '
                      'object: %s in bucket: %s', object, bucket)
        service = self.get_conn()
        try:
            response = service.objects().get(
                bucket=bucket,
                object=object
            ).execute()

            crc32c = response['crc32c']
            self.log.info('The crc32c checksum of %s is %s', object, crc32c)
            return crc32c

        except HttpError as ex:
            if ex.resp['status'] == '404':
                raise ValueError('Object Not Found')
            # re-raise errors other than 404 rather than swallowing them
            raise

    def get_md5hash(self, bucket, object):
        """
        Gets the MD5 hash of an object in Google Cloud Storage.

        :param bucket: The Google cloud storage bucket where the object is.
        :type bucket: str
        :param object: The name of the object to check in the Google cloud
            storage bucket.
        :type object: str
        """
        self.log.info('Retrieving the MD5 hash of '
                      'object: %s in bucket: %s', object, bucket)
        service = self.get_conn()
        try:
            response = service.objects().get(
                bucket=bucket,
                object=object
            ).execute()

            md5hash = response['md5Hash']
            self.log.info('The md5Hash of %s is %s', object, md5hash)
            return md5hash

        except HttpError as ex:
            if ex.resp['status'] == '404':
                raise ValueError('Object Not Found')
            # re-raise errors other than 404 rather than swallowing them
            raise

    def create_bucket(self,
                      bucket_name,
                      resource=None,
                      storage_class='MULTI_REGIONAL',
                      location='US',
                      project_id=None,
                      labels=None
                      ):
        """
        Creates a new bucket. Google Cloud Storage uses a flat namespace, so
        you can't create a bucket with a name that is already in use.

        .. seealso::
            For more information, see Bucket Naming Guidelines:
            https://cloud.google.com/storage/docs/bucketnaming.html#requirements

        :param bucket_name: The name of the bucket.
        :type bucket_name: str
        :param resource: An optional dict with parameters for creating the bucket.
            For information on available parameters, see Cloud Storage API doc:
            https://cloud.google.com/storage/docs/json_api/v1/buckets/insert
        :type resource: dict
        :param storage_class: This defines how objects in the bucket are stored
            and determines the SLA and the cost of storage. Values include

            - ``MULTI_REGIONAL``
            - ``REGIONAL``
            - ``STANDARD``
            - ``NEARLINE``
            - ``COLDLINE``.

            If this value is not specified when the bucket is
            created, it will default to STANDARD.
        :type storage_class: str
        :param location: The location of the bucket.
            Object data for objects in the bucket resides in physical storage
            within this region. Defaults to US.

            .. seealso::
                https://developers.google.com/storage/docs/bucket-locations

        :type location: str
        :param project_id: The ID of the GCP Project.
        :type project_id: str
        :param labels: User-provided labels, in key/value pairs.
        :type labels: dict
        :return: If successful, it returns the ``id`` of the bucket.
        """

        project_id = project_id if project_id is not None else self.project_id
        storage_classes = [
            'MULTI_REGIONAL',
            'REGIONAL',
            'NEARLINE',
            'COLDLINE',
            'STANDARD',  # alias for MULTI_REGIONAL/REGIONAL, based on location
        ]

        self.log.info('Creating Bucket: %s; Location: %s; Storage Class: %s',
                      bucket_name, location, storage_class)
        if storage_class not in storage_classes:
            raise ValueError(
                'Invalid value ({}) passed to storage_class. Value should be '
                'one of {}'.format(storage_class, storage_classes))

        if not re.match('[a-zA-Z0-9]+', bucket_name[0]):
            raise ValueError('Bucket names must start with a number or letter.')

        if not re.match('[a-zA-Z0-9]+', bucket_name[-1]):
            raise ValueError('Bucket names must end with a number or letter.')

        service = self.get_conn()
        bucket_resource = resource or {}
        bucket_resource.update({
            'name': bucket_name,
            'location': location,
            'storageClass': storage_class
        })

        self.log.info('The Default Project ID is %s', self.project_id)

        if labels is not None:
            bucket_resource['labels'] = labels

        try:
            response = service.buckets().insert(
                project=project_id,
                body=bucket_resource
            ).execute()

            self.log.info('Bucket: %s created successfully.', bucket_name)

            return response['id']

        except HttpError as ex:
            raise AirflowException(
                'Bucket creation failed. Error was: {}'.format(ex.content)
            )

    def insert_bucket_acl(self, bucket, entity, role, user_project):
        """
        Creates a new ACL entry on the specified bucket.
        See: https://cloud.google.com/storage/docs/json_api/v1/bucketAccessControls/insert

        :param bucket: Name of a bucket.
        :type bucket: str
        :param entity: The entity holding the permission, in one of the following forms:
            user-userId, user-email, group-groupId, group-email, domain-domain,
            project-team-projectId, allUsers, allAuthenticatedUsers.
            See: https://cloud.google.com/storage/docs/access-control/lists#scopes
        :type entity: str
        :param role: The access permission for the entity.
            Acceptable values are: "OWNER", "READER", "WRITER".
        :type role: str
        :param user_project: (Optional) The project to be billed for this request.
            Required for Requester Pays buckets.
        :type user_project: str
        """
        self.log.info('Creating a new ACL entry in bucket: %s', bucket)
        service = self.get_conn()
        try:
            response = service.bucketAccessControls().insert(
                bucket=bucket,
                body={
                    "entity": entity,
                    "role": role
                },
                userProject=user_project
            ).execute()
            if response:
                self.log.info('A new ACL entry created in bucket: %s', bucket)
        except HttpError as ex:
            raise AirflowException(
                'Bucket ACL entry creation failed. Error was: {}'.format(ex.content)
            )

    def insert_object_acl(self, bucket, object_name, entity, role, generation,
                          user_project):
        """
        Creates a new ACL entry on the specified object.
        See: https://cloud.google.com/storage/docs/json_api/v1/objectAccessControls/insert

        :param bucket: Name of a bucket.
        :type bucket: str
        :param object_name: Name of the object. For information about how to URL
            encode object names to be path safe, see:
            https://cloud.google.com/storage/docs/json_api/#encoding
        :type object_name: str
        :param entity: The entity holding the permission, in one of the following forms:
            user-userId, user-email, group-groupId, group-email, domain-domain,
            project-team-projectId, allUsers, allAuthenticatedUsers
            See: https://cloud.google.com/storage/docs/access-control/lists#scopes
        :type entity: str
        :param role: The access permission for the entity.
            Acceptable values are: "OWNER", "READER".
        :type role: str
        :param generation: (Optional) If present, selects a specific revision of this
            object (as opposed to the latest version, the default).
        :type generation: str
        :param user_project: (Optional) The project to be billed for this request.
            Required for Requester Pays buckets.
        :type user_project: str
        """
        self.log.info('Creating a new ACL entry for object: %s in bucket: %s',
                      object_name, bucket)
        service = self.get_conn()
        try:
            response = service.objectAccessControls().insert(
                bucket=bucket,
                object=object_name,
                body={
                    "entity": entity,
                    "role": role
                },
                generation=generation,
                userProject=user_project
            ).execute()
            if response:
                self.log.info('A new ACL entry created for object: %s in bucket: %s',
                              object_name, bucket)
        except HttpError as ex:
            raise AirflowException(
                'Object ACL entry creation failed. Error was: {}'.format(ex.content)
            )

    def compose(self, bucket, source_objects, destination_object, num_retries=5):
        """
        Composes a list of existing objects into a new object in the same
        storage bucket.

        Currently it only supports up to 32 objects that can be concatenated
        in a single operation

        https://cloud.google.com/storage/docs/json_api/v1/objects/compose

        :param bucket: The name of the bucket containing the source objects.
            This is also the same bucket to store the composed destination object.
        :type bucket: str
        :param source_objects: The list of source objects that will be composed
            into a single object.
        :type source_objects: list
        :param destination_object: The path of the object if given.
        :type destination_object: str
        :param num_retries: The number of times the compose request is retried.
        :type num_retries: int
        """
        if not source_objects:
            raise ValueError('source_objects cannot be empty.')

        if not bucket or not destination_object:
            raise ValueError('bucket and destination_object cannot be empty.')

        service = self.get_conn()
        dict_source_objects = [{'name': source_object}
                               for source_object in source_objects]
        body = {
            'sourceObjects': dict_source_objects
        }

        try:
            self.log.info("Composing %s to %s in the bucket %s",
                          source_objects, destination_object, bucket)
            service \
                .objects() \
                .compose(destinationBucket=bucket,
                         destinationObject=destination_object,
                         body=body) \
                .execute(num_retries=num_retries)
            return True
        except HttpError as ex:
            if ex.resp['status'] == '404':
                return False
            raise
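
    # Illustrative usage sketch (not part of the original module): compose()
    # concatenates the source objects in list order into one destination
    # object in the same bucket, up to the 32-object limit per request.
    # The names below are hypothetical.
    #
    #   hook = GoogleCloudStorageHook()
    #   hook.compose('my-bucket',
    #                source_objects=['parts/part-0.csv', 'parts/part-1.csv'],
    #                destination_object='merged/all.csv')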

def _parse_gcs_url(gsurl):
    """
    Given a Google Cloud Storage URL (gs://<bucket>/<blob>), returns a
    tuple containing the corresponding bucket and blob.
    """
    # Python 3
    try:
        from urllib.parse import urlparse
    # Python 2
    except ImportError:
        from urlparse import urlparse

    parsed_url = urlparse(gsurl)
    if not parsed_url.netloc:
        raise AirflowException('Please provide a bucket name')
    else:
        bucket = parsed_url.netloc
        # Remove leading '/' but NOT trailing one
        blob = parsed_url.path.lstrip('/')
        return bucket, blob
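
# Illustrative check (not part of the original module): _parse_gcs_url() is a
# pure function, so it can be exercised without GCP credentials.
#
#   >>> _parse_gcs_url('gs://my-bucket/path/to/file.csv')
#   ('my-bucket', 'path/to/file.csv')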