Source code for airflow.contrib.hooks.gdrive_hook
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Hook for Google Drive service"""
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
# noinspection PyAbstractClass
class GoogleDriveHook(GoogleCloudBaseHook):
    """
    Hook for the Google Drive APIs.

    :param api_version: API version used (for example v3).
    :type api_version: str
    :param gcp_conn_id: The connection ID to use when fetching connection info.
    :type gcp_conn_id: str
    :param delegate_to: The account to impersonate, if any.
        For this to work, the service account making the request must have
        domain-wide delegation enabled.
    :type delegate_to: str
    """

    # Cached Drive service object, built lazily by ``get_conn``. Declared at
    # class level so the first ``self._conn`` lookup is always defined,
    # whether or not the base class initialises it.
    _conn = None

    def __init__(
        self, api_version="v3", gcp_conn_id="google_cloud_default", delegate_to=None
    ):
        super(GoogleDriveHook, self).__init__(gcp_conn_id, delegate_to)
        self.api_version = api_version

    def get_conn(self):
        """
        Retrieves the connection to Google Drive.

        The service object is built on the first call and reused afterwards.

        :return: Google Drive services object.
        """
        if not self._conn:
            http_authorized = self._authorize()
            self._conn = build("drive", self.api_version, http=http_authorized, cache_discovery=False)
        return self._conn

    @staticmethod
    def _escape_query_value(value):
        """
        Escapes a string so it can be embedded in a single-quoted Drive
        search query literal.

        :param value: Raw value, for example a folder name.
        :type value: str
        :return: The value with backslashes and single quotes escaped.
        :rtype: str
        """
        # Backslashes must be doubled first, otherwise the backslash added
        # in front of each quote would itself be escaped again.
        return value.replace("\\", "\\\\").replace("'", "\\'")

    def _ensure_folders_exists(self, path):
        """
        Walks ``path`` starting at the Drive root, creating every folder that
        does not exist yet.

        :param path: Slash-separated folder path relative to the Drive root.
        :type path: str
        :return: ID of the last folder in the path, or ``"root"`` when the
            path contains no folder names.
        :rtype: str
        """
        service = self.get_conn()
        current_parent = "root"
        # Leading, trailing or doubled slashes produce empty components;
        # drop them so no folder with an empty name is searched or created.
        folders = [folder for folder in path.split("/") if folder]
        depth = 0
        # First tries to enter directories
        for current_folder in folders:
            self.log.debug("Looking for %s directory with %s parent", current_folder, current_parent)
            conditions = [
                # Never descend into a folder that sits in the bin.
                "trashed=false",
                "mimeType = 'application/vnd.google-apps.folder'",
                "name='{}'".format(self._escape_query_value(current_folder)),
                "'{}' in parents".format(current_parent),
            ]
            result = (
                service.files()  # pylint: disable=no-member
                .list(q=" and ".join(conditions), spaces="drive", fields="files(id, name)")
                .execute(num_retries=self.num_retries)
            )
            files = result.get("files", [])
            if not files:
                self.log.info("Not found %s directory", current_folder)
                # If the directory does not exist, break loops
                break
            depth += 1
            current_parent = files[0].get("id")

        # Create missing directories; the slice is empty when every
        # component was found above.
        for current_folder in folders[depth:]:
            file_metadata = {
                "name": current_folder,
                "mimeType": "application/vnd.google-apps.folder",
                "parents": [current_parent],
            }
            created_folder = (
                service.files()  # pylint: disable=no-member
                .create(body=file_metadata, fields="id")
                .execute(num_retries=self.num_retries)
            )
            self.log.info("Created %s directory", current_folder)
            current_parent = created_folder.get("id")
        # Return the ID of the last directory
        return current_parent

    def upload_file(self, local_location, remote_location):
        """
        Uploads a file that is available locally to a Google Drive service.

        Folders named in ``remote_location`` that do not exist are created.

        :param local_location: The path where the file is available.
        :type local_location: str
        :param remote_location: The path where the file will be sent
        :type remote_location: str
        :return: File ID
        :rtype: str
        """
        service = self.get_conn()
        # Everything before the last slash is the folder path, the rest is
        # the file name; without a slash the file goes to the Drive root.
        directory_path, _, filename = remote_location.rpartition("/")
        if directory_path:
            parent = self._ensure_folders_exists(directory_path)
        else:
            parent = "root"

        file_metadata = {"name": filename, "parents": [parent]}
        media = MediaFileUpload(local_location)
        uploaded_file = (
            service.files()  # pylint: disable=no-member
            .create(body=file_metadata, media_body=media, fields="id")
            .execute(num_retries=self.num_retries)
        )
        self.log.info("File %s uploaded to gdrive://%s.", local_location, remote_location)
        return uploaded_file.get("id")