Source code for airflow.providers.common.sql.dialects.dialect
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportrefromcollections.abcimportIterable,MappingfromtypingimportTYPE_CHECKING,Any,Callable,TypeVarfrommethodtoolsimportlru_cachefromairflow.utils.log.logging_mixinimportLoggingMixinifTYPE_CHECKING:fromsqlalchemy.engineimportInspector
def__init__(self,hook,**kwargs)->None:super().__init__(**kwargs)fromairflow.providers.common.sql.hooks.sqlimportDbApiHookifnotisinstance(hook,DbApiHook):raiseTypeError(f"hook must be an instance of {DbApiHook.__class__.__name__}")
[docs]defescape_word(self,word:str)->str:""" Escape the word if necessary. If the word is a reserved word or contains special characters or if the ``escape_column_names`` property is set to True in connection extra field, then the given word will be escaped. :param word: Name of the column :return: The escaped word """ifword!=self.escape_word_format.format(self.unescape_word(word))and(self.escape_column_namesorword.casefold()inself.reserved_wordsorself.pattern.search(word)):returnself.escape_word_format.format(word)returnword
[docs]defunescape_word(self,word:str|None)->str|None:""" Remove escape characters from each part of a dotted identifier (e.g., schema.table). :param word: Escaped schema, table, or column name, potentially with multiple segments. :return: The word without escaped characters. """ifnotword:returnwordescape_char_start=self.escape_word_format[0]escape_char_end=self.escape_word_format[-1]defunescape_part(part:str)->str:ifpart.startswith(escape_char_start)andpart.endswith(escape_char_end):returnpart[1:-1]returnpartreturn".".join(map(unescape_part,word.split(".")))
[docs]defgenerate_insert_sql(self,table,values,target_fields,**kwargs)->str:""" Generate the INSERT SQL statement. :param table: Name of the target table :param values: The row to insert into the table :param target_fields: The names of the columns to fill in the table :return: The generated INSERT SQL statement """returnself.insert_statement_format.format(table,self._joined_target_fields(target_fields),self._joined_placeholders(values))
[docs]defgenerate_replace_sql(self,table,values,target_fields,**kwargs)->str:""" Generate the REPLACE SQL statement. :param table: Name of the target table :param values: The row to insert into the table :param target_fields: The names of the columns to fill in the table :return: The generated REPLACE SQL statement """returnself.replace_statement_format.format(table,self._joined_target_fields(target_fields),self._joined_placeholders(values))