Source code for airflow.providers.common.sql.triggers.sql
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING

from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
from airflow.providers.common.sql.hooks.sql import DbApiHook
from airflow.triggers.base import BaseTrigger, TriggerEvent

if TYPE_CHECKING:
    from collections.abc import AsyncIterator
    from typing import Any
class SQLExecuteQueryTrigger(BaseTrigger):
    """
    A trigger that executes SQL code in async mode.

    :param sql: the sql statement to be executed (str) or a list of sql statements to execute
    :param conn_id: the connection ID used to connect to the database
    :param hook_params: hook parameters
    """

    def __init__(
        self,
        sql: str | list[str],
        conn_id: str,
        hook_params: dict | None = None,
        **kwargs,
    ):
        super().__init__(**kwargs)
        # Keep the constructor arguments on the instance: both ``serialize`` and
        # ``run`` read them back as ``self.sql`` / ``self.conn_id`` / ``self.hook_params``.
        self.sql = sql
        self.conn_id = conn_id
        self.hook_params = hook_params

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """
        Serialize the SQLExecuteQueryTrigger arguments and classpath.

        :return: a ``(classpath, kwargs)`` pair, where ``classpath`` is the dotted
            ``module.ClassName`` of this trigger and ``kwargs`` holds the ``sql``,
            ``conn_id`` and ``hook_params`` values given to the constructor.
        """
        return (
            f"{self.__class__.__module__}.{self.__class__.__name__}",
            {
                "sql": self.sql,
                "conn_id": self.conn_id,
                "hook_params": self.hook_params,
            },
        )

    async def run(self) -> AsyncIterator[TriggerEvent]:
        """
        Fetch the records for ``self.sql`` and yield a single :class:`TriggerEvent`.

        The hook is resolved from ``self.conn_id`` (with ``self.hook_params``) and must be
        a ``DbApiHook``. On success the event payload is
        ``{"status": "success", "results": <records>}``. Any ``Exception`` raised along the
        way (including the hook type check) is logged and reported as
        ``{"status": "failure", "message": str(exc)}`` instead of propagating.
        """
        try:
            hook = BaseHook.get_hook(self.conn_id, hook_params=self.hook_params)

            if not isinstance(hook, DbApiHook):
                # Name the class that is actually required (DbApiHook), not the
                # offending hook's own class, so the message is actionable.
                raise AirflowException(
                    f"You are trying to use `common-sql` with {hook.__class__.__name__},"
                    " but its provider does not support it. Please upgrade the provider to a version that"
                    " supports `common-sql`. The hook class should be a subclass of"
                    f" `{DbApiHook.__module__}.{DbApiHook.__name__}`."
                    f" Got {hook.__class__.__name__} hook with class hierarchy: {hook.__class__.mro()}"
                )

            self.log.info("Extracting data from %s", self.conn_id)
            self.log.info("Executing: \n%s", self.sql)
            self.log.info("Reading records from %s", self.conn_id)
            # NOTE(review): ``get_records`` is called synchronously (no ``await``) inside
            # this coroutine; if the hook performs blocking I/O here it would hold up the
            # event loop for the duration of the query -- confirm this is acceptable.
            results = hook.get_records(self.sql)
            self.log.info("Reading records from %s done!", self.conn_id)

            self.log.debug("results: %s", results)
            yield TriggerEvent({"status": "success", "results": results})
        except Exception as e:
            # Report the failure through the event rather than letting the trigger
            # raise; the consumer of the event decides how to react.
            self.log.exception("An error occurred: %s", e)
            yield TriggerEvent({"status": "failure", "message": str(e)})