Source code for airflow.providers.vertica.hooks.vertica
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsfromcollections.abcimportIterable,MappingfromtypingimportAny,Callable,overloadfromvertica_pythonimportconnectfromairflow.providers.common.sql.hooks.sqlimportDbApiHook,fetch_all_handler
def vertica_fetch_all_handler(cursor) -> list[tuple] | None:
    """
    Fetch rows like the default handler, then walk every remaining result set.

    Stand-in for the default DbApiHook ``fetch_all_handler`` that works around
    https://github.com/apache/airflow/issues/32993.

    The value returned is exactly what ``fetch_all_handler`` produced on the
    first call; everything done afterwards exists only to make the Vertica
    client raise any pending error.

    With Vertica, running the following SQL as a single call (``split_statements``
    set to false)::

        INSERT INTO MyTable (Key, Label) values (1, 'test 1');
        INSERT INTO MyTable (Key, Label) values (1, 'test 2');
        INSERT INTO MyTable (Key, Label) values (3, 'test 3');

    yields one result set per insert, and an error on the second insert goes
    unnoticed unless the data of those result sets is actually fetched.

    :param cursor: cursor on which the statement(s) were executed.
    :return: the result of ``fetch_all_handler(cursor)``.
    """
    first_result = fetch_all_handler(cursor)

    # No description on the current result set: nothing further is inspected.
    if cursor.description is None:
        return first_result

    # Step through the result sets of the remaining statements.
    while cursor.nextset():
        if cursor.description is None:
            continue
        # Read rows until the set is exhausted; the rows themselves are discarded,
        # fetching them is what lets the client report a failed statement.
        while cursor.fetchone():
            pass

    return first_result
class VerticaHook(DbApiHook):
    """
    Interact with Vertica.

    This hook uses a customized version of the default ``fetch_all_handler``
    named ``vertica_fetch_all_handler``.
    """

    # Typing overloads must come in groups of at least two. The first variant
    # covers the implementation's default call shape (``handler`` omitted or
    # ``None``); the second covers calls that supply a handler.
    @overload
    def run(
        self,
        sql: str | Iterable[str],
        autocommit: bool = ...,
        parameters: Iterable | Mapping[str, Any] | None = ...,
        handler: None = ...,
        split_statements: bool = ...,
        return_last: bool = ...,
    ) -> None: ...

    @overload
    def run(
        self,
        sql: str | Iterable[str],
        autocommit: bool = ...,
        parameters: Iterable | Mapping[str, Any] | None = ...,
        handler: Callable[[Any], Any] = ...,
        split_statements: bool = ...,
        return_last: bool = ...,
    ) -> Any | list[Any]: ...

    def run(
        self,
        sql: str | Iterable[str],
        autocommit: bool = False,
        parameters: Iterable | Mapping | None = None,
        handler: Callable[[Any], Any] | None = None,
        split_statements: bool = False,
        return_last: bool = True,
    ) -> Any | list[Any] | None:
        """
        Overwrite the common sql run.

        Will automatically replace ``fetch_all_handler`` by
        ``vertica_fetch_all_handler``; any other handler (including ``None``)
        is passed through untouched.

        :param sql: statement, or iterable of statements, handed to ``DbApiHook.run``.
        :param autocommit: forwarded unchanged to ``DbApiHook.run``.
        :param parameters: forwarded unchanged to ``DbApiHook.run``.
        :param handler: result handler; swapped for ``vertica_fetch_all_handler``
            when it compares equal to ``fetch_all_handler``.
        :param split_statements: forwarded unchanged to ``DbApiHook.run``.
        :param return_last: forwarded unchanged to ``DbApiHook.run``.
        :return: whatever ``DbApiHook.run`` returns for the forwarded arguments.
        """
        if handler == fetch_all_handler:
            handler = vertica_fetch_all_handler
        return DbApiHook.run(self, sql, autocommit, parameters, handler, split_statements, return_last)