Source code for airflow.providers.fab.auth_manager.models.db

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from pathlib import Path

from airflow import settings
from airflow.exceptions import AirflowException
from airflow.providers.fab.auth_manager.models import metadata
from airflow.utils.db import _offline_migration, print_happy_cat
from airflow.utils.db_manager import BaseDBManager

# Directory two levels above the one containing this module; the DB manager
# class below resolves its "migrations" folder and "alembic.ini" against it.
PACKAGE_DIR = Path(__file__).parents[2]

# Version string -> Alembic revision id.
# NOTE(review): presumably maps a provider release to the migration head that
# shipped with it -- confirm against the migration scripts.
_REVISION_HEADS_MAP: dict[str, str] = {
    "1.4.0": "6709f7a774b9",
}
class FABDBManager(BaseDBManager):
    """DB manager that drives the Alembic migrations for the FAB tables."""

    # SQLAlchemy metadata imported from the FAB auth manager models package.
    metadata = metadata
    # Name of the Alembic version table used for these migrations.
    version_table_name = "alembic_version_fab"
    # Locations of the migration scripts and Alembic config, as POSIX strings.
    migration_dir = PACKAGE_DIR.joinpath("migrations").as_posix()
    alembic_file = PACKAGE_DIR.joinpath("alembic.ini").as_posix()
    supports_table_dropping = True

    def upgradedb(self, to_revision=None, from_revision=None, show_sql_only=False):
        """
        Upgrade the FAB database schema, or hand off to the offline (SQL-only) migration.

        :param to_revision: Target revision. When applying the upgrade and this is
            falsy, ``"heads"`` is used; in SQL-only mode the script's current head
            is used instead.
        :param from_revision: Starting revision; only accepted together with
            ``show_sql_only``. Falls back to the current revision when falsy.
        :param show_sql_only: When truthy, delegate to ``_offline_migration``
            instead of applying the upgrade.
        :raises AirflowException: ``from_revision`` given without ``show_sql_only``.
        :raises RuntimeError: ``settings.SQL_ALCHEMY_CONN`` is not set.
        :raises SystemExit: SQL-only mode requested while the engine dialect is SQLite.
        """
        if from_revision and not show_sql_only:
            raise AirflowException("`from_revision` only supported with `sql_only=True`.")
        if not settings.SQL_ALCHEMY_CONN:
            raise RuntimeError("The settings.SQL_ALCHEMY_CONN not set. This is a critical assertion.")

        # alembic adds significant import time, so we import it lazily
        from alembic import command

        alembic_cfg = self.get_alembic_config()

        if not show_sql_only:
            if self.get_current_revision():
                target = to_revision if to_revision else "heads"
                command.upgrade(alembic_cfg, revision=target)
            else:
                # No revision recorded yet: treat as a new DB and initialize it.
                self.initdb()
            return

        # SQL-only path from here on.
        if settings.engine.dialect.name == "sqlite":
            raise SystemExit("Offline migration not supported for SQLite.")

        start = from_revision or self.get_current_revision()
        end = to_revision or self.get_script_object(alembic_cfg).get_current_head()

        if end == start:
            print_happy_cat("No migrations to apply; nothing to do.")
            return

        _offline_migration(command.upgrade, alembic_cfg, f"{start}:{end}")

    def downgrade(self, to_revision, from_revision=None, show_sql_only=False):
        """
        Downgrade the FAB database schema, or hand off to the offline (SQL-only) migration.

        :param to_revision: Revision to downgrade to.
        :param from_revision: Starting revision; only accepted together with
            ``show_sql_only``. Falls back to the current revision when falsy.
        :param show_sql_only: When truthy, delegate to ``_offline_migration``
            instead of applying the downgrade.
        :raises ValueError: ``from_revision`` given without ``show_sql_only``.
        :raises RuntimeError: ``settings.SQL_ALCHEMY_CONN`` is not set.
        """
        if from_revision and not show_sql_only:
            raise ValueError(
                "`from_revision` can't be combined with `show_sql_only=False`. When actually "
                "applying a downgrade (instead of just generating sql), we always "
                "downgrade from current revision."
            )
        if not settings.SQL_ALCHEMY_CONN:
            raise RuntimeError("The settings.SQL_ALCHEMY_CONN not set.")

        # alembic adds significant import time, so we import it lazily
        from alembic import command

        self.log.info("Attempting downgrade of FAB migration to revision %s", to_revision)
        alembic_cfg = self.get_alembic_config()

        if not show_sql_only:
            self.log.info("Applying FAB downgrade migrations.")
            command.downgrade(alembic_cfg, revision=to_revision, sql=show_sql_only)
            return

        self.log.warning("Generating sql scripts for manual migration.")
        start = from_revision or self.get_current_revision()
        if start is None:
            # Nothing to downgrade from.
            self.log.info("No revision found")
            return

        span = f"{start}:{to_revision}"
        _offline_migration(command.downgrade, config=alembic_cfg, revision=span)

Was this entry helpful?