# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import asyncio
from typing import Any, AsyncIterator
from airflow.providers.microsoft.azure.hooks.wasb import WasbAsyncHook
from airflow.triggers.base import BaseTrigger, TriggerEvent
class WasbBlobSensorTrigger(BaseTrigger):
    """
    Check for the existence of a blob in an Azure WASB container.

    WasbBlobSensorTrigger is fired as deferred class with params to run the task in
    trigger worker to check for existence of the given blob in the provided container.

    :param container_name: name of the container in which the blob should be searched for
    :param blob_name: name of the blob to check existence for
    :param wasb_conn_id: the connection identifier for connecting to Azure WASB
    :param poke_interval: polling period in seconds to check for the status
    :param public_read: whether an anonymous public read access should be used. Default is False
    """

    def __init__(
        self,
        container_name: str,
        blob_name: str,
        wasb_conn_id: str = "wasb_default",
        public_read: bool = False,
        poke_interval: float = 5.0,
    ):
        super().__init__()
        self.container_name = container_name
        self.blob_name = blob_name
        self.wasb_conn_id = wasb_conn_id
        self.poke_interval = poke_interval
        self.public_read = public_read

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """
        Serialize WasbBlobSensorTrigger arguments and classpath.

        :return: a ``(classpath, kwargs)`` tuple; the ``kwargs`` keys mirror the
            parameters of ``__init__``.
        """
        return (
            "airflow.providers.microsoft.azure.triggers.wasb.WasbBlobSensorTrigger",
            {
                "container_name": self.container_name,
                "blob_name": self.blob_name,
                "wasb_conn_id": self.wasb_conn_id,
                "poke_interval": self.poke_interval,
                "public_read": self.public_read,
            },
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """
        Make async connection to Azure WASB and poll for existence of the given blob name.

        Yields a ``TriggerEvent`` with ``status`` set to ``"success"`` as soon as the
        blob is found. If anything inside the ``try`` block raises, the exception is
        logged and a ``TriggerEvent`` with ``status`` set to ``"error"`` and the
        exception text as ``message`` is yielded instead.
        """
        hook = WasbAsyncHook(wasb_conn_id=self.wasb_conn_id, public_read=self.public_read)
        try:
            # The awaited connection is used as an async context manager so it is
            # scoped to the polling loop.
            async with await hook.get_async_conn():
                while True:
                    blob_exists = await hook.check_for_blob_async(
                        container_name=self.container_name,
                        blob_name=self.blob_name,
                    )
                    if blob_exists:
                        message = f"Blob {self.blob_name} found in container {self.container_name}."
                        yield TriggerEvent({"status": "success", "message": message})
                        # Stop explicitly once the success event has been emitted
                        # instead of relying on the loop condition.
                        return
                    message = (
                        f"Blob {self.blob_name} not available yet in container {self.container_name}."
                        f" Sleeping for {self.poke_interval} seconds"
                    )
                    self.log.info(message)
                    await asyncio.sleep(self.poke_interval)
        except Exception as e:
            # Keep the traceback in the trigger log: str(e) alone can be empty or
            # too terse to diagnose the failure from the event payload.
            self.log.exception(
                "Error while checking for blob %s in container %s",
                self.blob_name,
                self.container_name,
            )
            yield TriggerEvent({"status": "error", "message": str(e)})
class WasbPrefixSensorTrigger(BaseTrigger):
    """
    Check for the existence of a blob with the given prefix in an Azure WASB container.

    WasbPrefixSensorTrigger is fired as a deferred class with params to run the task in trigger.
    It checks for the existence of a blob with the given prefix in the provided container.

    :param container_name: name of the container in which the blob should be searched for
    :param prefix: prefix of the blob to check existence for
    :param include: specifies one or more additional datasets to include in the
            response. Options include: ``snapshots``, ``metadata``, ``uncommittedblobs``,
            ``copy``, ``deleted``
    :param delimiter: filters objects based on the delimiter (for e.g '.csv')
    :param wasb_conn_id: the connection identifier for connecting to Azure WASB
    :param poke_interval: polling period in seconds to check for the status
    :param public_read: whether an anonymous public read access should be used. Default is False
    """

    def __init__(
        self,
        container_name: str,
        prefix: str,
        include: list[str] | None = None,
        delimiter: str = "/",
        wasb_conn_id: str = "wasb_default",
        public_read: bool = False,
        poke_interval: float = 5.0,
    ):
        super().__init__()
        self.container_name = container_name
        self.prefix = prefix
        self.include = include
        self.delimiter = delimiter
        self.wasb_conn_id = wasb_conn_id
        self.poke_interval = poke_interval
        self.public_read = public_read

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """
        Serialize WasbPrefixSensorTrigger arguments and classpath.

        :return: a ``(classpath, kwargs)`` tuple; the ``kwargs`` keys mirror the
            parameters of ``__init__``.
        """
        return (
            "airflow.providers.microsoft.azure.triggers.wasb.WasbPrefixSensorTrigger",
            {
                "container_name": self.container_name,
                "prefix": self.prefix,
                "include": self.include,
                "delimiter": self.delimiter,
                "wasb_conn_id": self.wasb_conn_id,
                "poke_interval": self.poke_interval,
                "public_read": self.public_read,
            },
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """
        Make async connection to Azure WASB and poll for existence of a blob with given prefix.

        Yields a ``TriggerEvent`` with ``status`` set to ``"success"`` as soon as the
        prefix check succeeds. If anything inside the ``try`` block raises, the
        exception is logged and a ``TriggerEvent`` with ``status`` set to ``"error"``
        and the exception text as ``message`` is yielded instead.
        """
        hook = WasbAsyncHook(wasb_conn_id=self.wasb_conn_id, public_read=self.public_read)
        try:
            # The awaited connection is used as an async context manager so it is
            # scoped to the polling loop.
            async with await hook.get_async_conn():
                while True:
                    prefix_exists = await hook.check_for_prefix_async(
                        container_name=self.container_name,
                        prefix=self.prefix,
                        include=self.include,
                        delimiter=self.delimiter,
                    )
                    if prefix_exists:
                        message = f"Prefix {self.prefix} found in container {self.container_name}."
                        yield TriggerEvent({"status": "success", "message": message})
                        # Stop explicitly once the success event has been emitted
                        # instead of relying on the loop condition.
                        return
                    message = (
                        f"Prefix {self.prefix} not available yet in container {self.container_name}."
                        f" Sleeping for {self.poke_interval} seconds"
                    )
                    self.log.info(message)
                    await asyncio.sleep(self.poke_interval)
        except Exception as e:
            # Keep the traceback in the trigger log: str(e) alone can be empty or
            # too terse to diagnose the failure from the event payload.
            self.log.exception(
                "Error while checking for prefix %s in container %s",
                self.prefix,
                self.container_name,
            )
            yield TriggerEvent({"status": "error", "message": str(e)})