Source code for airflow.providers.databricks.triggers.databricks
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportasynciofromtypingimportAnyfromairflow.providers.databricks.hooks.databricksimportDatabricksHookfromairflow.triggers.baseimportBaseTrigger,TriggerEvent
[docs]classDatabricksExecutionTrigger(BaseTrigger):""" The trigger handles the logic of async communication with DataBricks API. :param run_id: id of the run :param databricks_conn_id: Reference to the :ref:`Databricks connection <howto/connection:databricks>`. :param polling_period_seconds: Controls the rate of the poll for the result of this run. By default, the trigger will poll every 30 seconds. :param retry_limit: The number of times to retry the connection in case of service outages. :param retry_delay: The number of seconds to wait between retries. :param retry_args: An optional dictionary with arguments passed to ``tenacity.Retrying`` class. :param run_page_url: The run page url. """def__init__(self,run_id:int,databricks_conn_id:str,polling_period_seconds:int=30,retry_limit:int=3,retry_delay:int=10,retry_args:dict[Any,Any]|None=None,run_page_url:str|None=None,)->None:super().__init__()self.run_id=run_idself.databricks_conn_id=databricks_conn_idself.polling_period_seconds=polling_period_secondsself.retry_limit=retry_limitself.retry_delay=retry_delayself.retry_args=retry_argsself.run_page_url=run_page_urlself.hook=DatabricksHook(databricks_conn_id,retry_limit=self.retry_limit,retry_delay=self.retry_delay,retry_args=retry_args,)
[docs]asyncdefrun(self):asyncwithself.hook:whileTrue:run_state=awaitself.hook.a_get_run_state(self.run_id)ifrun_state.is_terminal:yieldTriggerEvent({"run_id":self.run_id,"run_page_url":self.run_page_url,"run_state":run_state.to_json(),})returnelse:self.log.info("run-id %s in run state %s. sleeping for %s seconds",self.run_id,run_state,self.polling_period_seconds,)awaitasyncio.sleep(self.polling_period_seconds)