# Source code for airflow.providers.fab.www.security.permissions
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
from typing import TypedDict
# Resource Constants
# String names of the resources referenced by this permissions module.
RESOURCE_ACTION = "Permissions"
RESOURCE_AUDIT_LOG = "Audit Logs"
RESOURCE_BACKFILL = "Backfills"
RESOURCE_CONFIG = "Configurations"
RESOURCE_CONNECTION = "Connections"
# NOTE(review): RESOURCE_DAG is used as a key of RESOURCE_DETAILS_MAP but was missing
# from this listing; the value follows upstream Airflow -- confirm against the provider.
RESOURCE_DAG = "DAGs"
RESOURCE_DAG_CODE = "DAG Code"
RESOURCE_DAG_DEPENDENCIES = "DAG Dependencies"
# Prefix registered for RESOURCE_DAG in RESOURCE_DETAILS_MAP.
RESOURCE_DAG_PREFIX = "DAG:"
RESOURCE_DAG_RUN = "DAG Runs"
RESOURCE_DAG_VERSION = "DAG Versions"
RESOURCE_DAG_WARNING = "DAG Warnings"
RESOURCE_CLUSTER_ACTIVITY = "Cluster Activity"
RESOURCE_ASSET = "Assets"
RESOURCE_ASSET_ALIAS = "Asset Aliases"
RESOURCE_DOCS = "Documentation"
RESOURCE_HITL_DETAIL = "HITL Detail"
RESOURCE_IMPORT_ERROR = "ImportError"
RESOURCE_MY_PASSWORD = "My Password"
RESOURCE_MY_PROFILE = "My Profile"
RESOURCE_PASSWORD = "Passwords"
RESOURCE_PERMISSION = "Permission Views"  # Refers to a Perm <-> View mapping, not an MVC View.
RESOURCE_PLUGIN = "Plugins"
RESOURCE_POOL = "Pools"
RESOURCE_PROVIDER = "Providers"
RESOURCE_RESOURCE = "View Menus"
RESOURCE_ROLE = "Roles"
RESOURCE_SLA_MISS = "SLA Misses"
RESOURCE_TASK_INSTANCE = "Task Instances"
RESOURCE_TASK_LOG = "Task Logs"
RESOURCE_TASK_RESCHEDULE = "Task Reschedules"
RESOURCE_TRIGGER = "Triggers"
RESOURCE_USER = "Users"
RESOURCE_USER_STATS_CHART = "User Stats Chart"
RESOURCE_VARIABLE = "Variables"
RESOURCE_WEBSITE = "Website"
RESOURCE_XCOM = "XComs"

# Action Constants
# String names of the actions that can be granted on a resource.
ACTION_CAN_CREATE = "can_create"
ACTION_CAN_READ = "can_read"
ACTION_CAN_EDIT = "can_edit"
ACTION_CAN_DELETE = "can_delete"
# NOTE(review): ACTION_CAN_ACCESS_MENU is used by RESOURCE_DETAILS_MAP but was missing
# from this listing; the value follows upstream Airflow -- confirm against the provider.
ACTION_CAN_ACCESS_MENU = "menu_access"
DEPRECATED_ACTION_CAN_DAG_READ = "can_dag_read"
DEPRECATED_ACTION_CAN_DAG_EDIT = "can_dag_edit"

class ResourceDetails(TypedDict):
    """Details of a resource (actions and prefix)."""

    # Actions defined for the resource (built from the ACTION_* constants).
    actions: set[str]
    # Prefix prepended to a DAG id to build the resource-specific name, e.g. "DAG:".
    prefix: str

# DAG_ACTIONS is kept for compatibility with outdated versions of the FAB provider.
DAG_ACTIONS = {ACTION_CAN_READ, ACTION_CAN_EDIT, ACTION_CAN_DELETE}

# Per-resource details: the actions defined for the resource and the prefix used to
# build resource-specific names from a DAG id.
RESOURCE_DETAILS_MAP = {
    RESOURCE_DAG: ResourceDetails(
        actions={ACTION_CAN_READ, ACTION_CAN_EDIT, ACTION_CAN_DELETE}, prefix=RESOURCE_DAG_PREFIX
    ),
    RESOURCE_DAG_RUN: ResourceDetails(
        actions={ACTION_CAN_READ, ACTION_CAN_CREATE, ACTION_CAN_DELETE, ACTION_CAN_ACCESS_MENU},
        prefix="DAG Run:",
    ),
}

# Every prefix declared in RESOURCE_DETAILS_MAP, in declaration order.
PREFIX_LIST = [details["prefix"] for details in RESOURCE_DETAILS_MAP.values()]

# Reverse lookup from a prefix to the resource that declares it.
PREFIX_RESOURCES_MAP = {details["prefix"]: resource for resource, details in RESOURCE_DETAILS_MAP.items()}

def resource_name(dag_id: str, resource: str) -> str:
    """
    Return the resource name for a DAG id.

    ``dag_id`` is returned unchanged when it is itself a key of
    ``RESOURCE_DETAILS_MAP`` or already starts with one of the known prefixes;
    otherwise the prefix registered for ``resource`` is prepended to it.

    :param dag_id: a DAG id, a resource name, or an already-prefixed resource name.
    :param resource: key of ``RESOURCE_DETAILS_MAP`` whose prefix should be applied.
    :return: the resource-specific name.
    :raises KeyError: if a prefix has to be applied and ``resource`` is not a key
        of ``RESOURCE_DETAILS_MAP``.
    """
    # Already a resource name: nothing to prefix.
    if dag_id in RESOURCE_DETAILS_MAP:
        return dag_id
    # Already carries a known prefix: avoid prefixing twice.
    if dag_id.startswith(tuple(PREFIX_RESOURCES_MAP)):
        return dag_id
    return f"{RESOURCE_DETAILS_MAP[resource]['prefix']}{dag_id}"