# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Spelling exceptions.
.. spelling:word-list::
    CreateRunResponse
    DatasetResource
    LinkedServiceResource
    LROPoller
    PipelineResource
    PipelineRun
    TriggerResource
    datafactory
    DataFlow
    mgmt
"""
from __future__ import annotations
import inspect
import time
from functools import wraps
from typing import Any, Callable, TypeVar, Union, cast
from asgiref.sync import sync_to_async
from azure.core.polling import LROPoller
from azure.identity import ClientSecretCredential, DefaultAzureCredential
from azure.identity.aio import (
    ClientSecretCredential as AsyncClientSecretCredential,
    DefaultAzureCredential as AsyncDefaultAzureCredential,
)
from azure.mgmt.datafactory import DataFactoryManagementClient
from azure.mgmt.datafactory.aio import DataFactoryManagementClient as AsyncDataFactoryManagementClient
from azure.mgmt.datafactory.models import (
    CreateRunResponse,
    DataFlow,
    DatasetResource,
    Factory,
    LinkedServiceResource,
    PipelineResource,
    PipelineRun,
    TriggerResource,
)
from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
from airflow.typing_compat import TypedDict
[docs]Credentials = Union[ClientSecretCredential, DefaultAzureCredential] 
[docs]AsyncCredentials = Union[AsyncClientSecretCredential, AsyncDefaultAzureCredential] 
[docs]T = TypeVar("T", bound=Any) 
[docs]def provide_targeted_factory(func: Callable) -> Callable:
    """
    Provide the targeted factory to the decorated function in case it isn't specified.
    If ``resource_group_name`` or ``factory_name`` is not provided it defaults to the value specified in
    the connection extras.
    """
    signature = inspect.signature(func)
    @wraps(func)
    def wrapper(*args, **kwargs) -> Callable:
        bound_args = signature.bind(*args, **kwargs)
        def bind_argument(arg, default_key):
            # Check if arg was not included in the function signature or, if it is, the value is not provided.
            if arg not in bound_args.arguments or bound_args.arguments[arg] is None:
                self = args[0]
                conn = self.get_connection(self.conn_id)
                extras = conn.extra_dejson
                default_value = extras.get(default_key) or extras.get(
                    f"extra__azure_data_factory__{default_key}"
                )
                if not default_value:
                    raise AirflowException("Could not determine the targeted data factory.")
                bound_args.arguments[arg] = default_value
        bind_argument("resource_group_name", "resource_group_name")
        bind_argument("factory_name", "factory_name")
        return func(*bound_args.args, **bound_args.kwargs)
    return wrapper 
[docs]class PipelineRunInfo(TypedDict):
    """Type class for the pipeline run info dictionary."""
[docs]    factory_name: str | None 
[docs]    resource_group_name: str | None  
[docs]class AzureDataFactoryPipelineRunStatus:
    """Azure Data Factory pipeline operation statuses."""
[docs]    IN_PROGRESS = "InProgress" 
[docs]    SUCCEEDED = "Succeeded" 
[docs]    CANCELING = "Canceling" 
[docs]    CANCELLED = "Cancelled" 
[docs]    TERMINAL_STATUSES = {CANCELLED, FAILED, SUCCEEDED} 
[docs]    FAILURE_STATES = {FAILED, CANCELLED}  
[docs]class AzureDataFactoryPipelineRunException(AirflowException):
    """An exception that indicates a pipeline run failed to complete.""" 
[docs]def get_field(extras: dict, field_name: str, strict: bool = False):
    """Get field from extra, first checking short name, then for backcompat we check for prefixed name."""
    backcompat_prefix = "extra__azure_data_factory__"
    if field_name.startswith("extra__"):
        raise ValueError(
            f"Got prefixed name {field_name}; please remove the '{backcompat_prefix}' prefix "
            "when using this method."
        )
    if field_name in extras:
        return extras[field_name] or None
    prefixed_name = f"{backcompat_prefix}{field_name}"
    if prefixed_name in extras:
        return extras[prefixed_name] or None
    if strict:
        raise KeyError(f"Field {field_name} not found in extras") 
[docs]class AzureDataFactoryHook(BaseHook):
    """
    A hook to interact with Azure Data Factory.
    :param azure_data_factory_conn_id: The :ref:`Azure Data Factory connection id<howto/connection:adf>`.
    """
[docs]    conn_type: str = "azure_data_factory" 
[docs]    conn_name_attr: str = "azure_data_factory_conn_id" 
[docs]    default_conn_name: str = "azure_data_factory_default" 
[docs]    hook_name: str = "Azure Data Factory" 
    @staticmethod
    @staticmethod
[docs]    def get_ui_field_behaviour() -> dict[str, Any]:
        """Returns custom field behaviour."""
        return {
            "hidden_fields": ["schema", "port", "host", "extra"],
            "relabeling": {
                "login": "Client ID",
                "password": "Secret",
            },
        } 
    def __init__(self, azure_data_factory_conn_id: str = default_conn_name):
        self._conn: DataFactoryManagementClient = None
        self.conn_id = azure_data_factory_conn_id
        super().__init__()
[docs]    def get_conn(self) -> DataFactoryManagementClient:
        if self._conn is not None:
            return self._conn
        conn = self.get_connection(self.conn_id)
        extras = conn.extra_dejson
        tenant = get_field(extras, "tenantId")
        try:
            subscription_id = get_field(extras, "subscriptionId", strict=True)
        except KeyError:
            raise ValueError("A Subscription ID is required to connect to Azure Data Factory.")
        credential: Credentials
        if conn.login is not None and conn.password is not None:
            if not tenant:
                raise ValueError("A Tenant ID is required when authenticating with Client ID and Secret.")
            credential = ClientSecretCredential(
                client_id=conn.login, client_secret=conn.password, tenant_id=tenant
            )
        else:
            credential = DefaultAzureCredential()
        self._conn = self._create_client(credential, subscription_id)
        return self._conn 
    @provide_targeted_factory
[docs]    def get_factory(
        self, resource_group_name: str | None = None, factory_name: str | None = None, **config: Any
    ) -> Factory:
        """
        Get the factory.
        :param resource_group_name: The resource group name.
        :param factory_name: The factory name.
        :param config: Extra parameters for the ADF client.
        :return: The factory.
        """
        return self.get_conn().factories.get(resource_group_name, factory_name, **config) 
    def _factory_exists(self, resource_group_name, factory_name) -> bool:
        """Return whether or not the factory already exists."""
        factories = {
            factory.name for factory in self.get_conn().factories.list_by_resource_group(resource_group_name)
        }
        return factory_name in factories
    @staticmethod
    def _create_client(credential: Credentials, subscription_id: str):
        return DataFactoryManagementClient(
            credential=credential,
            subscription_id=subscription_id,
        )
    @provide_targeted_factory
[docs]    def update_factory(
        self,
        factory: Factory,
        resource_group_name: str | None = None,
        factory_name: str | None = None,
        **config: Any,
    ) -> Factory:
        """
        Update the factory.
        :param factory: The factory resource definition.
        :param resource_group_name: The resource group name.
        :param factory_name: The factory name.
        :param config: Extra parameters for the ADF client.
        :raise AirflowException: If the factory does not exist.
        :return: The factory.
        """
        if not self._factory_exists(resource_group_name, factory_name):
            raise AirflowException(f"Factory {factory!r} does not exist.")
        return self.get_conn().factories.create_or_update(
            resource_group_name, factory_name, factory, **config
        ) 
    @provide_targeted_factory
[docs]    def create_factory(
        self,
        factory: Factory,
        resource_group_name: str | None = None,
        factory_name: str | None = None,
        **config: Any,
    ) -> Factory:
        """
        Create the factory.
        :param factory: The factory resource definition.
        :param resource_group_name: The resource group name.
        :param factory_name: The factory name.
        :param config: Extra parameters for the ADF client.
        :raise AirflowException: If the factory already exists.
        :return: The factory.
        """
        if self._factory_exists(resource_group_name, factory_name):
            raise AirflowException(f"Factory {factory!r} already exists.")
        return self.get_conn().factories.create_or_update(
            resource_group_name, factory_name, factory, **config
        ) 
    @provide_targeted_factory
[docs]    def delete_factory(
        self, resource_group_name: str | None = None, factory_name: str | None = None, **config: Any
    ) -> None:
        """
        Delete the factory.
        :param resource_group_name: The resource group name.
        :param factory_name: The factory name.
        :param config: Extra parameters for the ADF client.
        """
        self.get_conn().factories.delete(resource_group_name, factory_name, **config) 
    @provide_targeted_factory
[docs]    def get_linked_service(
        self,
        linked_service_name: str,
        resource_group_name: str | None = None,
        factory_name: str | None = None,
        **config: Any,
    ) -> LinkedServiceResource:
        """
        Get the linked service.
        :param linked_service_name: The linked service name.
        :param resource_group_name: The resource group name.
        :param factory_name: The factory name.
        :param config: Extra parameters for the ADF client.
        :return: The linked service.
        """
        return self.get_conn().linked_services.get(
            resource_group_name, factory_name, linked_service_name, **config
        ) 
    def _linked_service_exists(self, resource_group_name, factory_name, linked_service_name) -> bool:
        """Return whether or not the linked service already exists."""
        linked_services = {
            linked_service.name
            for linked_service in self.get_conn().linked_services.list_by_factory(
                resource_group_name, factory_name
            )
        }
        return linked_service_name in linked_services
    @provide_targeted_factory
[docs]    def update_linked_service(
        self,
        linked_service_name: str,
        linked_service: LinkedServiceResource,
        resource_group_name: str | None = None,
        factory_name: str | None = None,
        **config: Any,
    ) -> LinkedServiceResource:
        """
        Update the linked service.
        :param linked_service_name: The linked service name.
        :param linked_service: The linked service resource definition.
        :param resource_group_name: The resource group name.
        :param factory_name: The factory name.
        :param config: Extra parameters for the ADF client.
        :raise AirflowException: If the linked service does not exist.
        :return: The linked service.
        """
        if not self._linked_service_exists(resource_group_name, factory_name, linked_service_name):
            raise AirflowException(f"Linked service {linked_service_name!r} does not exist.")
        return self.get_conn().linked_services.create_or_update(
            resource_group_name, factory_name, linked_service_name, linked_service, **config
        ) 
    @provide_targeted_factory
[docs]    def create_linked_service(
        self,
        linked_service_name: str,
        linked_service: LinkedServiceResource,
        resource_group_name: str | None = None,
        factory_name: str | None = None,
        **config: Any,
    ) -> LinkedServiceResource:
        """
        Create the linked service.
        :param linked_service_name: The linked service name.
        :param linked_service: The linked service resource definition.
        :param resource_group_name: The resource group name.
        :param factory_name: The factory name.
        :param config: Extra parameters for the ADF client.
        :raise AirflowException: If the linked service already exists.
        :return: The linked service.
        """
        if self._linked_service_exists(resource_group_name, factory_name, linked_service_name):
            raise AirflowException(f"Linked service {linked_service_name!r} already exists.")
        return self.get_conn().linked_services.create_or_update(
            resource_group_name, factory_name, linked_service_name, linked_service, **config
        ) 
    @provide_targeted_factory
[docs]    def delete_linked_service(
        self,
        linked_service_name: str,
        resource_group_name: str | None = None,
        factory_name: str | None = None,
        **config: Any,
    ) -> None:
        """
        Delete the linked service.
        :param linked_service_name: The linked service name.
        :param resource_group_name: The linked service name.
        :param factory_name: The factory name.
        :param config: Extra parameters for the ADF client.
        """
        self.get_conn().linked_services.delete(
            resource_group_name, factory_name, linked_service_name, **config
        ) 
    @provide_targeted_factory
[docs]    def get_dataset(
        self,
        dataset_name: str,
        resource_group_name: str | None = None,
        factory_name: str | None = None,
        **config: Any,
    ) -> DatasetResource:
        """
        Get the dataset.
        :param dataset_name: The dataset name.
        :param resource_group_name: The resource group name.
        :param factory_name: The factory name.
        :param config: Extra parameters for the ADF client.
        :return: The dataset.
        """
        return self.get_conn().datasets.get(resource_group_name, factory_name, dataset_name, **config) 
    def _dataset_exists(self, resource_group_name, factory_name, dataset_name) -> bool:
        """Return whether or not the dataset already exists."""
        datasets = {
            dataset.name
            for dataset in self.get_conn().datasets.list_by_factory(resource_group_name, factory_name)
        }
        return dataset_name in datasets
    @provide_targeted_factory
[docs]    def update_dataset(
        self,
        dataset_name: str,
        dataset: DatasetResource,
        resource_group_name: str | None = None,
        factory_name: str | None = None,
        **config: Any,
    ) -> DatasetResource:
        """
        Update the dataset.
        :param dataset_name: The dataset name.
        :param dataset: The dataset resource definition.
        :param resource_group_name: The resource group name.
        :param factory_name: The factory name.
        :param config: Extra parameters for the ADF client.
        :raise AirflowException: If the dataset does not exist.
        :return: The dataset.
        """
        if not self._dataset_exists(resource_group_name, factory_name, dataset_name):
            raise AirflowException(f"Dataset {dataset_name!r} does not exist.")
        return self.get_conn().datasets.create_or_update(
            resource_group_name, factory_name, dataset_name, dataset, **config
        ) 
    @provide_targeted_factory
[docs]    def create_dataset(
        self,
        dataset_name: str,
        dataset: DatasetResource,
        resource_group_name: str | None = None,
        factory_name: str | None = None,
        **config: Any,
    ) -> DatasetResource:
        """
        Create the dataset.
        :param dataset_name: The dataset name.
        :param dataset: The dataset resource definition.
        :param resource_group_name: The resource group name.
        :param factory_name: The factory name.
        :param config: Extra parameters for the ADF client.
        :raise AirflowException: If the dataset already exists.
        :return: The dataset.
        """
        if self._dataset_exists(resource_group_name, factory_name, dataset_name):
            raise AirflowException(f"Dataset {dataset_name!r} already exists.")
        return self.get_conn().datasets.create_or_update(
            resource_group_name, factory_name, dataset_name, dataset, **config
        ) 
    @provide_targeted_factory
[docs]    def delete_dataset(
        self,
        dataset_name: str,
        resource_group_name: str | None = None,
        factory_name: str | None = None,
        **config: Any,
    ) -> None:
        """
        Delete the dataset.
        :param dataset_name: The dataset name.
        :param resource_group_name: The resource group name.
        :param factory_name: The factory name.
        :param config: Extra parameters for the ADF client.
        """
        self.get_conn().datasets.delete(resource_group_name, factory_name, dataset_name, **config) 
    @provide_targeted_factory
[docs]    def get_dataflow(
        self,
        dataflow_name: str,
        resource_group_name: str | None = None,
        factory_name: str | None = None,
        **config: Any,
    ) -> DataFlow:
        """
        Get the dataflow.
        :param dataflow_name: The dataflow name.
        :param resource_group_name: The resource group name.
        :param factory_name: The factory name.
        :param config: Extra parameters for the ADF client.
        :return: The dataflow.
        """
        return self.get_conn().data_flows.get(resource_group_name, factory_name, dataflow_name, **config) 
    def _dataflow_exists(
        self,
        dataflow_name: str,
        resource_group_name: str | None = None,
        factory_name: str | None = None,
    ) -> bool:
        """Return whether the dataflow already exists."""
        dataflows = {
            dataflow.name
            for dataflow in self.get_conn().data_flows.list_by_factory(resource_group_name, factory_name)
        }
        return dataflow_name in dataflows
    @provide_targeted_factory
[docs]    def update_dataflow(
        self,
        dataflow_name: str,
        dataflow: DataFlow,
        resource_group_name: str | None = None,
        factory_name: str | None = None,
        **config: Any,
    ) -> DataFlow:
        """
        Update the dataflow.
        :param dataflow_name: The dataflow name.
        :param dataflow: The dataflow resource definition.
        :param resource_group_name: The resource group name.
        :param factory_name: The factory name.
        :param config: Extra parameters for the ADF client.
        :raise AirflowException: If the dataset does not exist.
        :return: The dataflow.
        """
        if not self._dataflow_exists(
            dataflow_name,
            resource_group_name,
            factory_name,
        ):
            raise AirflowException(f"Dataflow {dataflow_name!r} does not exist.")
        return self.get_conn().data_flows.create_or_update(
            resource_group_name, factory_name, dataflow_name, dataflow, **config
        ) 
    @provide_targeted_factory
[docs]    def create_dataflow(
        self,
        dataflow_name: str,
        dataflow: DataFlow,
        resource_group_name: str | None = None,
        factory_name: str | None = None,
        **config: Any,
    ) -> DataFlow:
        """
        Create the dataflow.
        :param dataflow_name: The dataflow name.
        :param dataflow: The dataflow resource definition.
        :param resource_group_name: The resource group name.
        :param factory_name: The factory name.
        :param config: Extra parameters for the ADF client.
        :raise AirflowException: If the dataset already exists.
        :return: The dataset.
        """
        if self._dataflow_exists(dataflow_name, resource_group_name, factory_name):
            raise AirflowException(f"Dataflow {dataflow_name!r} already exists.")
        return self.get_conn().data_flows.create_or_update(
            resource_group_name, factory_name, dataflow_name, dataflow, **config
        ) 
    @provide_targeted_factory
[docs]    def delete_dataflow(
        self,
        dataflow_name: str,
        resource_group_name: str | None = None,
        factory_name: str | None = None,
        **config: Any,
    ) -> None:
        """
        Delete the dataflow.
        :param dataflow_name: The dataflow name.
        :param resource_group_name: The resource group name.
        :param factory_name: The factory name.
        :param config: Extra parameters for the ADF client.
        """
        self.get_conn().data_flows.delete(resource_group_name, factory_name, dataflow_name, **config) 
    @provide_targeted_factory
[docs]    def get_pipeline(
        self,
        pipeline_name: str,
        resource_group_name: str | None = None,
        factory_name: str | None = None,
        **config: Any,
    ) -> PipelineResource:
        """
        Get the pipeline.
        :param pipeline_name: The pipeline name.
        :param resource_group_name: The resource group name.
        :param factory_name: The factory name.
        :param config: Extra parameters for the ADF client.
        :return: The pipeline.
        """
        return self.get_conn().pipelines.get(resource_group_name, factory_name, pipeline_name, **config) 
    def _pipeline_exists(self, resource_group_name, factory_name, pipeline_name) -> bool:
        """Return whether or not the pipeline already exists."""
        pipelines = {
            pipeline.name
            for pipeline in self.get_conn().pipelines.list_by_factory(resource_group_name, factory_name)
        }
        return pipeline_name in pipelines
    @provide_targeted_factory
[docs]    def update_pipeline(
        self,
        pipeline_name: str,
        pipeline: PipelineResource,
        resource_group_name: str | None = None,
        factory_name: str | None = None,
        **config: Any,
    ) -> PipelineResource:
        """
        Update the pipeline.
        :param pipeline_name: The pipeline name.
        :param pipeline: The pipeline resource definition.
        :param resource_group_name: The resource group name.
        :param factory_name: The factory name.
        :param config: Extra parameters for the ADF client.
        :raise AirflowException: If the pipeline does not exist.
        :return: The pipeline.
        """
        if not self._pipeline_exists(resource_group_name, factory_name, pipeline_name):
            raise AirflowException(f"Pipeline {pipeline_name!r} does not exist.")
        return self.get_conn().pipelines.create_or_update(
            resource_group_name, factory_name, pipeline_name, pipeline, **config
        ) 
    @provide_targeted_factory
[docs]    def create_pipeline(
        self,
        pipeline_name: str,
        pipeline: PipelineResource,
        resource_group_name: str | None = None,
        factory_name: str | None = None,
        **config: Any,
    ) -> PipelineResource:
        """
        Create the pipeline.
        :param pipeline_name: The pipeline name.
        :param pipeline: The pipeline resource definition.
        :param resource_group_name: The resource group name.
        :param factory_name: The factory name.
        :param config: Extra parameters for the ADF client.
        :raise AirflowException: If the pipeline already exists.
        :return: The pipeline.
        """
        if self._pipeline_exists(resource_group_name, factory_name, pipeline_name):
            raise AirflowException(f"Pipeline {pipeline_name!r} already exists.")
        return self.get_conn().pipelines.create_or_update(
            resource_group_name, factory_name, pipeline_name, pipeline, **config
        ) 
    @provide_targeted_factory
[docs]    def delete_pipeline(
        self,
        pipeline_name: str,
        resource_group_name: str | None = None,
        factory_name: str | None = None,
        **config: Any,
    ) -> None:
        """
        Delete the pipeline.
        :param pipeline_name: The pipeline name.
        :param resource_group_name: The pipeline name.
        :param factory_name: The factory name.
        :param config: Extra parameters for the ADF client.
        """
        self.get_conn().pipelines.delete(resource_group_name, factory_name, pipeline_name, **config) 
    @provide_targeted_factory
[docs]    def run_pipeline(
        self,
        pipeline_name: str,
        resource_group_name: str | None = None,
        factory_name: str | None = None,
        **config: Any,
    ) -> CreateRunResponse:
        """
        Run a pipeline.
        :param pipeline_name: The pipeline name.
        :param resource_group_name: The resource group name.
        :param factory_name: The factory name.
        :param config: Extra parameters for the ADF client.
        :return: The pipeline run.
        """
        return self.get_conn().pipelines.create_run(
            resource_group_name, factory_name, pipeline_name, **config
        ) 
    @provide_targeted_factory
[docs]    def get_pipeline_run(
        self,
        run_id: str,
        resource_group_name: str | None = None,
        factory_name: str | None = None,
        **config: Any,
    ) -> PipelineRun:
        """
        Get the pipeline run.
        :param run_id: The pipeline run identifier.
        :param resource_group_name: The resource group name.
        :param factory_name: The factory name.
        :param config: Extra parameters for the ADF client.
        :return: The pipeline run.
        """
        return self.get_conn().pipeline_runs.get(resource_group_name, factory_name, run_id, **config) 
[docs]    def get_pipeline_run_status(
        self,
        run_id: str,
        resource_group_name: str | None = None,
        factory_name: str | None = None,
    ) -> str:
        """
        Get a pipeline run's current status.
        :param run_id: The pipeline run identifier.
        :param resource_group_name: The resource group name.
        :param factory_name: The factory name.
        :return: The status of the pipeline run.
        """
        self.log.info("Getting the status of run ID %s.", run_id)
        pipeline_run_status = self.get_pipeline_run(
            run_id=run_id,
            factory_name=factory_name,
            resource_group_name=resource_group_name,
        ).status
        self.log.info("Current status of pipeline run %s: %s", run_id, pipeline_run_status)
        return pipeline_run_status 
[docs]    def wait_for_pipeline_run_status(
        self,
        run_id: str,
        expected_statuses: str | set[str],
        resource_group_name: str | None = None,
        factory_name: str | None = None,
        check_interval: int = 60,
        timeout: int = 60 * 60 * 24 * 7,
    ) -> bool:
        """
        Waits for a pipeline run to match an expected status.
        :param run_id: The pipeline run identifier.
        :param expected_statuses: The desired status(es) to check against a pipeline run's current status.
        :param resource_group_name: The resource group name.
        :param factory_name: The factory name.
        :param check_interval: Time in seconds to check on a pipeline run's status.
        :param timeout: Time in seconds to wait for a pipeline to reach a terminal status or the expected
            status.
        :return: Boolean indicating if the pipeline run has reached the ``expected_status``.
        """
        pipeline_run_info = PipelineRunInfo(
            run_id=run_id,
            factory_name=factory_name,
            resource_group_name=resource_group_name,
        )
        pipeline_run_status = self.get_pipeline_run_status(**pipeline_run_info)
        start_time = time.monotonic()
        while (
            pipeline_run_status not in AzureDataFactoryPipelineRunStatus.TERMINAL_STATUSES
            and pipeline_run_status not in expected_statuses
        ):
            # Check if the pipeline-run duration has exceeded the ``timeout`` configured.
            if start_time + timeout < time.monotonic():
                raise AzureDataFactoryPipelineRunException(
                    f"Pipeline run {run_id} has not reached a terminal status after {timeout} seconds."
                )
            # Wait to check the status of the pipeline run based on the ``check_interval`` configured.
            time.sleep(check_interval)
            pipeline_run_status = self.get_pipeline_run_status(**pipeline_run_info)
        return pipeline_run_status in expected_statuses 
    @provide_targeted_factory
[docs]    def cancel_pipeline_run(
        self,
        run_id: str,
        resource_group_name: str | None = None,
        factory_name: str | None = None,
        **config: Any,
    ) -> None:
        """
        Cancel the pipeline run.
        :param run_id: The pipeline run identifier.
        :param resource_group_name: The resource group name.
        :param factory_name: The factory name.
        :param config: Extra parameters for the ADF client.
        """
        self.get_conn().pipeline_runs.cancel(resource_group_name, factory_name, run_id, **config) 
    @provide_targeted_factory
[docs]    def get_trigger(
        self,
        trigger_name: str,
        resource_group_name: str | None = None,
        factory_name: str | None = None,
        **config: Any,
    ) -> TriggerResource:
        """
        Get the trigger.
        :param trigger_name: The trigger name.
        :param resource_group_name: The resource group name.
        :param factory_name: The factory name.
        :param config: Extra parameters for the ADF client.
        :return: The trigger.
        """
        return self.get_conn().triggers.get(resource_group_name, factory_name, trigger_name, **config) 
    def _trigger_exists(self, resource_group_name, factory_name, trigger_name) -> bool:
        """Return whether or not the trigger already exists."""
        triggers = {
            trigger.name
            for trigger in self.get_conn().triggers.list_by_factory(resource_group_name, factory_name)
        }
        return trigger_name in triggers
    @provide_targeted_factory
[docs]    def update_trigger(
        self,
        trigger_name: str,
        trigger: TriggerResource,
        resource_group_name: str | None = None,
        factory_name: str | None = None,
        **config: Any,
    ) -> TriggerResource:
        """
        Update the trigger.
        :param trigger_name: The trigger name.
        :param trigger: The trigger resource definition.
        :param resource_group_name: The resource group name.
        :param factory_name: The factory name.
        :param config: Extra parameters for the ADF client.
        :raise AirflowException: If the trigger does not exist.
        :return: The trigger.
        """
        if not self._trigger_exists(resource_group_name, factory_name, trigger_name):
            raise AirflowException(f"Trigger {trigger_name!r} does not exist.")
        return self.get_conn().triggers.create_or_update(
            resource_group_name, factory_name, trigger_name, trigger, **config
        ) 
    @provide_targeted_factory
[docs]    def create_trigger(
        self,
        trigger_name: str,
        trigger: TriggerResource,
        resource_group_name: str | None = None,
        factory_name: str | None = None,
        **config: Any,
    ) -> TriggerResource:
        """
        Create the trigger.
        :param trigger_name: The trigger name.
        :param trigger: The trigger resource definition.
        :param resource_group_name: The resource group name.
        :param factory_name: The factory name.
        :param config: Extra parameters for the ADF client.
        :raise AirflowException: If the trigger already exists.
        :return: The trigger.
        """
        if self._trigger_exists(resource_group_name, factory_name, trigger_name):
            raise AirflowException(f"Trigger {trigger_name!r} already exists.")
        return self.get_conn().triggers.create_or_update(
            resource_group_name, factory_name, trigger_name, trigger, **config
        ) 
    @provide_targeted_factory
[docs]    def delete_trigger(
        self,
        trigger_name: str,
        resource_group_name: str | None = None,
        factory_name: str | None = None,
        **config: Any,
    ) -> None:
        """
        Delete the trigger.
        :param trigger_name: The trigger name.
        :param resource_group_name: The resource group name.
        :param factory_name: The factory name.
        :param config: Extra parameters for the ADF client.
        """
        self.get_conn().triggers.delete(resource_group_name, factory_name, trigger_name, **config) 
    @provide_targeted_factory
[docs]    def start_trigger(
        self,
        trigger_name: str,
        resource_group_name: str | None = None,
        factory_name: str | None = None,
        **config: Any,
    ) -> LROPoller:
        """
        Start the trigger.
        :param trigger_name: The trigger name.
        :param resource_group_name: The resource group name.
        :param factory_name: The factory name.
        :param config: Extra parameters for the ADF client.
        :return: An Azure operation poller.
        """
        return self.get_conn().triggers.begin_start(resource_group_name, factory_name, trigger_name, **config) 
    @provide_targeted_factory
[docs]    def stop_trigger(
        self,
        trigger_name: str,
        resource_group_name: str | None = None,
        factory_name: str | None = None,
        **config: Any,
    ) -> LROPoller:
        """
        Stop the trigger.
        :param trigger_name: The trigger name.
        :param resource_group_name: The resource group name.
        :param factory_name: The factory name.
        :param config: Extra parameters for the ADF client.
        :return: An Azure operation poller.
        """
        return self.get_conn().triggers.begin_stop(resource_group_name, factory_name, trigger_name, **config) 
    @provide_targeted_factory
[docs]    def rerun_trigger(
        self,
        trigger_name: str,
        run_id: str,
        resource_group_name: str | None = None,
        factory_name: str | None = None,
        **config: Any,
    ) -> None:
        """
        Rerun the trigger.
        :param trigger_name: The trigger name.
        :param run_id: The trigger run identifier.
        :param resource_group_name: The resource group name.
        :param factory_name: The factory name.
        :param config: Extra parameters for the ADF client.
        """
        return self.get_conn().trigger_runs.rerun(
            resource_group_name, factory_name, trigger_name, run_id, **config
        ) 
    @provide_targeted_factory
[docs]    def cancel_trigger(
        self,
        trigger_name: str,
        run_id: str,
        resource_group_name: str | None = None,
        factory_name: str | None = None,
        **config: Any,
    ) -> None:
        """
        Cancel the trigger.
        :param trigger_name: The trigger name.
        :param run_id: The trigger run identifier.
        :param resource_group_name: The resource group name.
        :param factory_name: The factory name.
        :param config: Extra parameters for the ADF client.
        """
        self.get_conn().trigger_runs.cancel(resource_group_name, factory_name, trigger_name, run_id, **config) 
[docs]    def test_connection(self) -> tuple[bool, str]:
        """Test a configured Azure Data Factory connection."""
        success = (True, "Successfully connected to Azure Data Factory.")
        try:
            # Attempt to list existing factories under the configured subscription and retrieve the first in
            # the returned iterator. The Azure Data Factory API does allow for creation of a
            # DataFactoryManagementClient with incorrect values but then will fail properly once items are
            # retrieved using the client. We need to _actually_ try to retrieve an object to properly test the
            # connection.
            next(self.get_conn().factories.list())
            return success
        except StopIteration:
            # If the iterator returned is empty it should still be considered a successful connection since
            # it's possible to create a Data Factory via the ``AzureDataFactoryHook`` and none could
            # legitimately exist yet.
            return success
        except Exception as e:
            return False, str(e)  
[docs]def provide_targeted_factory_async(func: T) -> T:
    """
    Provide the targeted factory to the async decorated function in case it isn't specified.
    If ``resource_group_name`` or ``factory_name`` is not provided it defaults to the value specified in
    the connection extras.
    """
    signature = inspect.signature(func)
    @wraps(func)
    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        bound_args = signature.bind(*args, **kwargs)
        async def bind_argument(arg: Any, default_key: str) -> None:
            # Check if arg was not included in the function signature or, if it is, the value is not provided.
            if arg not in bound_args.arguments or bound_args.arguments[arg] is None:
                self = args[0]
                conn = await sync_to_async(self.get_connection)(self.conn_id)
                extras = conn.extra_dejson
                default_value = extras.get(default_key) or extras.get(
                    f"extra__azure_data_factory__{default_key}"
                )
                if not default_value:
                    raise AirflowException("Could not determine the targeted data factory.")
                bound_args.arguments[arg] = default_value
        await bind_argument("resource_group_name", "resource_group_name")
        await bind_argument("factory_name", "factory_name")
        return await func(*bound_args.args, **bound_args.kwargs)
    return cast(T, wrapper) 
[docs]class AzureDataFactoryAsyncHook(AzureDataFactoryHook):
    """
    An Async Hook that connects to Azure DataFactory to perform pipeline operations.
    :param azure_data_factory_conn_id: The :ref:`Azure Data Factory connection id<howto/connection:adf>`.
    """
[docs]    default_conn_name: str = "azure_data_factory_default" 
    def __init__(self, azure_data_factory_conn_id: str = default_conn_name):
        self._async_conn: AsyncDataFactoryManagementClient = None
        self.conn_id = azure_data_factory_conn_id
        super().__init__(azure_data_factory_conn_id=azure_data_factory_conn_id)
[docs]    async def get_async_conn(self) -> AsyncDataFactoryManagementClient:
        """Get async connection and connect to azure data factory."""
        if self._async_conn is not None:
            return self._async_conn
        conn = await sync_to_async(self.get_connection)(self.conn_id)
        extras = conn.extra_dejson
        tenant = get_field(extras, "tenantId")
        try:
            subscription_id = get_field(extras, "subscriptionId", strict=True)
        except KeyError:
            raise ValueError("A Subscription ID is required to connect to Azure Data Factory.")
        credential: AsyncCredentials
        if conn.login is not None and conn.password is not None:
            if not tenant:
                raise ValueError("A Tenant ID is required when authenticating with Client ID and Secret.")
            credential = AsyncClientSecretCredential(
                client_id=conn.login, client_secret=conn.password, tenant_id=tenant
            )
        else:
            credential = AsyncDefaultAzureCredential()
        self._async_conn = AsyncDataFactoryManagementClient(
            credential=credential,
            subscription_id=subscription_id,
        )
        return self._async_conn 
    @provide_targeted_factory_async
[docs]    async def get_pipeline_run(
        self,
        run_id: str,
        resource_group_name: str | None = None,
        factory_name: str | None = None,
        **config: Any,
    ) -> PipelineRun:
        """
        Connect to Azure Data Factory asynchronously to get the pipeline run details by run id.
        :param run_id: The pipeline run identifier.
        :param resource_group_name: The resource group name.
        :param factory_name: The factory name.
        :param config: Extra parameters for the ADF client.
        """
        client = await self.get_async_conn()
        try:
            pipeline_run = await client.pipeline_runs.get(resource_group_name, factory_name, run_id)
            return pipeline_run
        except Exception as e:
            raise AirflowException(e) 
[docs]    async def get_adf_pipeline_run_status(
        self, run_id: str, resource_group_name: str | None = None, factory_name: str | None = None
    ) -> str:
        """
        Connect to Azure Data Factory asynchronously and get the pipeline status by run_id.
        :param run_id: The pipeline run identifier.
        :param resource_group_name: The resource group name.
        :param factory_name: The factory name.
        """
        try:
            pipeline_run = await self.get_pipeline_run(
                run_id=run_id,
                factory_name=factory_name,
                resource_group_name=resource_group_name,
            )
            status: str = pipeline_run.status
            return status
        except Exception as e:
            raise AirflowException(e)