Source code for airflow.providers.amazon.aws.auth_manager.cli.avp_commands
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""User sub-commands."""
from __future__ import annotations
import json
import logging
from pathlib import Path
from typing import TYPE_CHECKING
import boto3
from airflow.configuration import conf
from airflow.exceptions import AirflowOptionalProviderFeatureException
from airflow.providers.amazon.aws.auth_manager.constants import CONF_REGION_NAME_KEY, CONF_SECTION_NAME
from airflow.utils import cli as cli_utils
try:
from airflow.utils.providers_configuration_loader import providers_configuration_loaded
except ImportError:
raise AirflowOptionalProviderFeatureException(
"Failed to import avp_commands. This feature is only available in Airflow "
"version >= 2.8.0 where Auth Managers are introduced."
)
if TYPE_CHECKING:
from botocore.client import BaseClient
[docs]log = logging.getLogger(__name__)
@cli_utils.action_cli
@providers_configuration_loaded
[docs]def init_avp(args):
"""Initialize Amazon Verified Permissions resources."""
client = _get_client()
# Create the policy store if needed
policy_store_id, is_new_policy_store = _create_policy_store(client, args)
if not is_new_policy_store:
print(
f"Since an existing policy store with description '{args.policy_store_description}' has been found in Amazon Verified Permissions, "
"the CLI nade no changes to this policy store for security reasons. "
"Any modification to this policy store must be done manually.",
)
else:
# Set the schema
_set_schema(client, policy_store_id, args)
if not args.dry_run:
print("Amazon Verified Permissions resources created successfully.")
print("Please set them in Airflow configuration under AIRFLOW__AWS_AUTH_MANAGER__<config name>.")
print(json.dumps({"avp_policy_store_id": policy_store_id}, indent=4))
@cli_utils.action_cli
@providers_configuration_loaded
[docs]def update_schema(args):
"""Update Amazon Verified Permissions policy store schema."""
client = _get_client()
_set_schema(client, args.policy_store_id, args)
if not args.dry_run:
print("Amazon Verified Permissions policy store schema updated successfully.")
def _get_client():
"""Returns Amazon Verified Permissions client."""
region_name = conf.get(CONF_SECTION_NAME, CONF_REGION_NAME_KEY)
return boto3.client("verifiedpermissions", region_name=region_name)
def _create_policy_store(client: BaseClient, args) -> tuple[str | None, bool]:
"""
Create if needed the policy store.
This function returns two elements:
- the policy store ID
- whether the policy store ID returned refers to a newly created policy store.
"""
paginator = client.get_paginator("list_policy_stores")
pages = paginator.paginate()
policy_stores = [application for page in pages for application in page["policyStores"]]
existing_policy_stores = [
policy_store
for policy_store in policy_stores
if policy_store.get("description") == args.policy_store_description
]
if args.verbose:
log.debug("Policy stores found: %s", policy_stores)
log.debug("Existing policy stores found: %s", existing_policy_stores)
if len(existing_policy_stores) > 0:
print(
f"There is already a policy store with description '{args.policy_store_description}' in Amazon Verified Permissions: '{existing_policy_stores[0]['policyStoreId']}'."
)
return existing_policy_stores[0]["policyStoreId"], False
else:
print(f"No policy store with description '{args.policy_store_description}' found, creating one.")
if args.dry_run:
print(
"Dry run, not creating the policy store with description '{args.policy_store_description}'."
)
return None, True
response = client.create_policy_store(
validationSettings={
"mode": "OFF",
},
description=args.policy_store_description,
)
if args.verbose:
log.debug("Response from create_policy_store: %s", response)
print(f"Policy store created: '{response['policyStoreId']}'")
return response["policyStoreId"], True
def _set_schema(client: BaseClient, policy_store_id: str, args) -> None:
"""Set the policy store schema."""
if args.dry_run:
print(f"Dry run, not updating the schema of the policy store with ID '{policy_store_id}'.")
return
if args.verbose:
log.debug("Disabling schema validation before updating schema")
response = client.update_policy_store(
policyStoreId=policy_store_id,
validationSettings={
"mode": "OFF",
},
)
if args.verbose:
log.debug("Response from update_policy_store: %s", response)
schema_path = Path(__file__).parents[0].joinpath("schema.json").resolve()
with open(schema_path) as schema_file:
response = client.put_schema(
policyStoreId=policy_store_id,
definition={
"cedarJson": schema_file.read(),
},
)
if args.verbose:
log.debug("Response from put_schema: %s", response)
if args.verbose:
log.debug("Enabling schema validation after updating schema")
response = client.update_policy_store(
policyStoreId=policy_store_id,
validationSettings={
"mode": "STRICT",
},
)
if args.verbose:
log.debug("Response from update_policy_store: %s", response)