#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import warnings
from functools import wraps
from typing import IO, Any
from azure.storage.file import File, FileService
from airflow.hooks.base import BaseHook
def _ensure_prefixes(conn_type):
"""
Remove when provider min airflow version >= 2.5.0 since this is handled by
provider manager from that version.
"""
def dec(func):
@wraps(func)
def inner():
field_behaviors = func()
conn_attrs = {"host", "schema", "login", "password", "port", "extra"}
def _ensure_prefix(field):
if field not in conn_attrs and not field.startswith("extra__"):
return f"extra__{conn_type}__{field}"
else:
return field
if "placeholders" in field_behaviors:
placeholders = field_behaviors["placeholders"]
field_behaviors["placeholders"] = {_ensure_prefix(k): v for k, v in placeholders.items()}
return field_behaviors
return inner
return dec
class AzureFileShareHook(BaseHook):
    """
    Hook for working with Azure FileShare Storage.

    :param azure_fileshare_conn_id: Reference to the
        :ref:`Azure File Share connection id<howto/connection:azure_fileshare>`
        of the Azure account whose file shares should be used.
    """

    # Name of the constructor argument that carries the connection id.
    conn_name_attr = "azure_fileshare_conn_id"
    # Same value as the constructor's default connection id.
    default_conn_name = "azure_fileshare_default"
    # Connection type; also the middle part of the ``extra__azure_fileshare__`` prefix.
    conn_type = "azure_fileshare"
    hook_name = "Azure FileShare"
def __init__(self, azure_fileshare_conn_id: str = "azure_fileshare_default") -> None:
    """
    Remember the connection id; no service client is created here.

    :param azure_fileshare_conn_id: Id of the connection to use.
    """
    super().__init__()
    # Populated by the first ``get_conn()`` call and reused afterwards.
    self._conn: FileService | None = None
    self.conn_id = azure_fileshare_conn_id
@staticmethod
),
}
@staticmethod
@_ensure_prefixes(conn_type="azure_fileshare")
def get_ui_field_behaviour() -> dict[str, Any]:
    """
    Returns custom field behaviour.

    The result lists the connection fields to hide, the replacement labels for
    ``login`` and ``password``, and the placeholder text per field. The
    ``_ensure_prefixes`` decorator post-processes the placeholder keys.

    :return: Dict with ``hidden_fields``, ``relabeling`` and ``placeholders``.
    """
    hidden_fields = ["schema", "port", "host", "extra"]
    relabeling = {
        "login": "Blob Storage Login (optional)",
        "password": "Blob Storage Key (optional)",
    }
    placeholders = {
        "login": "account name",
        "password": "secret",
        "sas_token": "account url or token (optional)",
        "connection_string": "account url or token (optional)",
        "protocol": "account url or token (optional)",
    }
    return {
        "hidden_fields": hidden_fields,
        "relabeling": relabeling,
        "placeholders": placeholders,
    }
def get_conn(self) -> FileService:
    """
    Return the FileService object, building and caching it on first use.

    Client options come from the connection's extras:

    * entries whose value is an empty string are skipped;
    * keys without an ``extra__`` prefix are used as given;
    * keys prefixed with ``extra__azure_fileshare__`` are accepted with the
      prefix removed, and the un-prefixed key's value wins when both are set
      (a warning is emitted in that case);
    * any other ``extra__``-prefixed key is ignored with a warning.

    The connection's login and password are passed as account name and key.

    :return: The ``FileService`` client for this hook's connection.
    """
    if self._conn:
        return self._conn

    legacy_prefix = "extra__azure_fileshare__"
    connection = self.get_connection(self.conn_id)
    extras = connection.extra_dejson
    service_options = {}
    for name, setting in extras.items():
        if setting == "":
            continue
        if name.startswith(legacy_prefix):
            short_name = name[len(legacy_prefix) :]
            # Keep a value already collected under the short name.
            if short_name not in service_options:
                service_options[short_name] = setting
        elif name.startswith("extra__"):
            warnings.warn(f"Extra param `{name}` not recognized; ignoring.")
        else:
            service_options[name] = setting
            legacy_name = f"{legacy_prefix}{name}"
            if legacy_name in extras:
                warnings.warn(
                    f"Conflicting params `{name}` and `{legacy_name}` found in extras for conn "
                    f"{self.conn_id}. Using value for `{name}`. Please ensure this is the correct value "
                    f"and remove the backcompat key `{legacy_name}`."
                )

    self._conn = FileService(
        account_name=connection.login,
        account_key=connection.password,
        **service_options,
    )
    return self._conn
def check_for_directory(self, share_name: str, directory_name: str, **kwargs) -> bool:
    """
    Tell whether a directory is present on Azure File Share.

    :param share_name: Name of the share.
    :param directory_name: Name of the directory.
    :param kwargs: Optional keyword arguments that
        `FileService.exists()` takes.
    :return: True if the directory exists, False otherwise.
    """
    file_service = self.get_conn()
    return file_service.exists(share_name, directory_name, **kwargs)
def check_for_file(self, share_name: str, directory_name: str, file_name: str, **kwargs) -> bool:
    """
    Tell whether a file is present on Azure File Share.

    :param share_name: Name of the share.
    :param directory_name: Name of the directory.
    :param file_name: Name of the file.
    :param kwargs: Optional keyword arguments that
        `FileService.exists()` takes.
    :return: True if the file exists, False otherwise.
    """
    file_service = self.get_conn()
    return file_service.exists(share_name, directory_name, file_name, **kwargs)
def list_directories_and_files(
    self, share_name: str, directory_name: str | None = None, **kwargs
) -> list:
    """
    List the directories and files stored on a Azure File Share.

    :param share_name: Name of the share.
    :param directory_name: Name of the directory.
    :param kwargs: Optional keyword arguments that
        `FileService.list_directories_and_files()` takes.
    :return: A list of files and directories
    """
    file_service = self.get_conn()
    return file_service.list_directories_and_files(share_name, directory_name, **kwargs)
def list_files(self, share_name: str, directory_name: str | None = None, **kwargs) -> list[str]:
    """
    List the names of the files stored on a Azure File Share.

    Entries returned by ``list_directories_and_files()`` that are not ``File``
    instances are left out.

    :param share_name: Name of the share.
    :param directory_name: Name of the directory.
    :param kwargs: Optional keyword arguments that
        `FileService.list_directories_and_files()` takes.
    :return: A list of files
    """
    entries = self.list_directories_and_files(share_name, directory_name, **kwargs)
    files_only = (entry for entry in entries if isinstance(entry, File))
    return [file_entry.name for file_entry in files_only]
def create_share(self, share_name: str, **kwargs) -> bool:
    """
    Create a new Azure File Share.

    :param share_name: Name of the share.
    :param kwargs: Optional keyword arguments that
        `FileService.create_share()` takes.
    :return: True if share is created, False if share already exists.
    """
    file_service = self.get_conn()
    return file_service.create_share(share_name, **kwargs)
def delete_share(self, share_name: str, **kwargs) -> bool:
    """
    Delete an existing Azure File Share.

    :param share_name: Name of the share.
    :param kwargs: Optional keyword arguments that
        `FileService.delete_share()` takes.
    :return: True if share is deleted, False if share does not exist.
    """
    file_service = self.get_conn()
    return file_service.delete_share(share_name, **kwargs)
def create_directory(self, share_name: str, directory_name: str, **kwargs) -> bool:
    """
    Create a new directory on a Azure File Share.

    The value returned by ``FileService.create_directory()`` is passed through
    unchanged.

    :param share_name: Name of the share.
    :param directory_name: Name of the directory.
    :param kwargs: Optional keyword arguments that
        `FileService.create_directory()` takes.
    :return: True if the directory is created, False if it already exists.
    """
    return self.get_conn().create_directory(share_name, directory_name, **kwargs)
def get_file(
    self, file_path: str, share_name: str, directory_name: str, file_name: str, **kwargs
) -> None:
    """
    Download a file from Azure File Share to a local path.

    :param file_path: Where to store the file.
    :param share_name: Name of the share.
    :param directory_name: Name of the directory.
    :param file_name: Name of the file.
    :param kwargs: Optional keyword arguments that
        `FileService.get_file_to_path()` takes.
    """
    file_service = self.get_conn()
    # The service expects the remote location first and the local path last.
    file_service.get_file_to_path(share_name, directory_name, file_name, file_path, **kwargs)
def get_file_to_stream(
    self, stream: IO, share_name: str, directory_name: str, file_name: str, **kwargs
) -> None:
    """
    Download a file from Azure File Share into a stream.

    :param stream: A filehandle to store the file to.
    :param share_name: Name of the share.
    :param directory_name: Name of the directory.
    :param file_name: Name of the file.
    :param kwargs: Optional keyword arguments that
        `FileService.get_file_to_stream()` takes.
    """
    file_service = self.get_conn()
    # The service expects the remote location first and the target stream last.
    file_service.get_file_to_stream(share_name, directory_name, file_name, stream, **kwargs)
def load_file(
    self, file_path: str, share_name: str, directory_name: str, file_name: str, **kwargs
) -> None:
    """
    Upload a local file to Azure File Share.

    :param file_path: Path to the file to load.
    :param share_name: Name of the share.
    :param directory_name: Name of the directory.
    :param file_name: Name of the file.
    :param kwargs: Optional keyword arguments that
        `FileService.create_file_from_path()` takes.
    """
    file_service = self.get_conn()
    # The service expects the remote location first and the local path last.
    file_service.create_file_from_path(share_name, directory_name, file_name, file_path, **kwargs)
def load_string(
    self, string_data: str, share_name: str, directory_name: str, file_name: str, **kwargs
) -> None:
    """
    Upload a string to Azure File Share as a file.

    :param string_data: String to load.
    :param share_name: Name of the share.
    :param directory_name: Name of the directory.
    :param file_name: Name of the file.
    :param kwargs: Optional keyword arguments that
        `FileService.create_file_from_text()` takes.
    """
    file_service = self.get_conn()
    # The service expects the remote location first and the text last.
    file_service.create_file_from_text(share_name, directory_name, file_name, string_data, **kwargs)
def load_stream(
    self, stream: IO, share_name: str, directory_name: str, file_name: str, count: int, **kwargs
) -> None:
    """
    Upload a stream to Azure File Share.

    :param stream: Opened file/stream to upload as the file content.
    :param share_name: Name of the share.
    :param directory_name: Name of the directory.
    :param file_name: Name of the file.
    :param count: Size of the stream in bytes
    :param kwargs: Optional keyword arguments that
        `FileService.create_file_from_stream()` takes.
    """
    # The service expects the remote location first, then the stream and its size.
    self.get_conn().create_file_from_stream(
        share_name, directory_name, file_name, stream, count, **kwargs
    )
def test_connection(self) -> tuple[bool, str]:
    """
    Test Azure FileShare connection.

    Tries to fetch the first entry of ``list_shares()`` from the service client.

    :return: ``(True, message)`` when a share could be fetched or the listing
        is empty, ``(False, error text)`` when any other exception is raised.
    """
    ok_result = (True, "Successfully connected to Azure File Share.")
    try:
        # Attempt to retrieve file share information
        shares = iter(self.get_conn().list_shares())
        next(shares)
    except StopIteration:
        # An empty listing still counts as a successful connection: a storage
        # account can be created without any file share, so none may exist yet.
        return ok_result
    except Exception as err:
        return False, str(err)
    return ok_result