Source code for airflow.providers.google.cloud.utils.field_sanitizer
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""
Sanitizer for body fields sent via Google Cloud API.

The sanitizer removes fields specified from the body.

Context
-------
In some cases where Google Cloud operation requires modification of existing resources (such
as instances or instance templates) we need to sanitize body of the resources returned
via Google Cloud APIs. This is in the case when we retrieve information from Google Cloud first,
modify the body and either update the existing resource or create a new one with the
modified body. Usually when you retrieve resource from Google Cloud you get some extra fields which
are Output-only, and we need to delete those fields if we want to use
the body as input for subsequent create/insert type operation.


Field specification
-------------------

Specification of fields is an array of strings which denote names of fields to be removed.
The field can be either direct field name to remove from the body or the full
specification of the path you should delete - separated with '.'


>>> FIELDS_TO_SANITIZE = [
>>>    "kind",
>>>    "properties.disks.kind",
>>>    "properties.metadata.kind",
>>> ]
>>> body = {
>>>     "kind": "compute#instanceTemplate",
>>>     "name": "instance",
>>>     "properties": {
>>>         "disks": [
>>>             {
>>>                 "name": "a",
>>>                 "kind": "compute#attachedDisk",
>>>                 "type": "PERSISTENT",
>>>                 "mode": "READ_WRITE",
>>>             },
>>>             {
>>>                 "name": "b",
>>>                 "kind": "compute#attachedDisk",
>>>                 "type": "PERSISTENT",
>>>                 "mode": "READ_WRITE",
>>>             }
>>>         ],
>>>         "metadata": {
>>>             "kind": "compute#metadata",
>>>             "fingerprint": "GDPUYxlwHe4="
>>>         },
>>>     }
>>> }
>>> sanitizer = GcpBodyFieldSanitizer(FIELDS_TO_SANITIZE)
>>> sanitizer.sanitize(body)
>>> json.dumps(body, indent=2)
{
    "name": "instance",
    "properties": {
        "disks": [
            {
                "name": "a",
                "type": "PERSISTENT",
                "mode": "READ_WRITE"
            },
            {
                "name": "b",
                "type": "PERSISTENT",
                "mode": "READ_WRITE"
            }
        ],
        "metadata": {
            "fingerprint": "GDPUYxlwHe4="
        }
    }
}

Note that the components of the path can be either dictionaries or arrays of dictionaries.
In case they are dictionaries, the subsequent component names a key of that dictionary; in case of
arrays, the sanitizer iterates through all dictionaries in the array and searches for the
remaining components in every element of the array.
"""

from __future__ import annotations

from airflow.exceptions import AirflowException
from airflow.utils.log.logging_mixin import LoggingMixin

class GcpFieldSanitizerException(AirflowException):
    """
    Error type for sanitizer failures caused by an unexpected field type.

    Meant for the case where a component of a field path resolves to
    something other than a dict or an array.
    """

class GcpBodyFieldSanitizer(LoggingMixin):
    """
    Sanitizes the body according to specification.

    Each specification is either a plain field name or a ``.``-separated path.
    Path components may resolve to dictionaries or to lists of dictionaries;
    for lists, the remainder of the path is applied to every dict element.

    :param sanitize_specs: array of strings that specifies which fields to remove
    """

    def __init__(self, sanitize_specs: list[str]) -> None:
        super().__init__()
        self._sanitize_specs = sanitize_specs

    def _sanitize(self, dictionary: dict, remaining_field_spec: str, current_path: str) -> None:
        """
        Remove the field described by ``remaining_field_spec`` from ``dictionary`` in place.

        :param dictionary: dict the remaining specification is resolved against
        :param remaining_field_spec: field name, or ``.``-separated path, still to be resolved
        :param current_path: path walked so far; used only in log messages
        """
        # Split off only the first component; the rest is handled by recursion.
        field_split = remaining_field_spec.split(".", 1)
        field_name = field_split[0]

        if len(field_split) == 1:
            # Last component of the path: this is the field to delete.
            if field_name in dictionary:
                self.log.info("Deleted %s [%s]", field_name, current_path)
                del dictionary[field_name]
            else:
                self.log.debug(
                    "The field %s is missing in %s at the path %s.", field_name, dictionary, current_path
                )
            return

        remaining_path = field_split[1]
        child = dictionary.get(field_name)
        if child is None:
            # An absent key and an explicit None value are both treated as "missing".
            self.log.debug(
                "The field %s is missing in %s at the path %s. ", field_name, dictionary, current_path
            )
        elif isinstance(child, dict):
            self._sanitize(child, remaining_path, f"{current_path}.{field_name}")
        elif isinstance(child, list):
            for index, elem in enumerate(child):
                if not isinstance(elem, dict):
                    self.log.warning(
                        "The field %s element at index %s is of wrong type. "
                        "It should be dict and is %s. Skipping it.",
                        current_path,
                        index,
                        elem,
                    )
                    # Really skip it: recursing into a non-dict element would fail
                    # on ``.get`` / ``in`` instead of honouring the warning above.
                    continue
                self._sanitize(elem, remaining_path, f"{current_path}.{field_name}[{index}]")
        else:
            self.log.warning(
                "The field %s is of wrong type. It should be dict or list and it is %s. Skipping it.",
                current_path,
                child,
            )

    def sanitize(self, body) -> None:
        """
        Sanitizes the body according to specification.

        The body is modified in place; nothing is returned.

        :param body: dictionary to remove the configured fields from
        """
        for elem in self._sanitize_specs:
            self._sanitize(body, elem, "")