# Source code for airflow.providers.qubole.sensors.qubole
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import TYPE_CHECKING, Sequence

from qds_sdk.qubole import Qubole
from qds_sdk.sensors import FileSensor, PartitionSensor

from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
from airflow.sensors.base import BaseSensorOperator

if TYPE_CHECKING:
    from airflow.utils.context import Context
class QuboleSensor(BaseSensorOperator):
    """Base class for all Qubole Sensors."""

    # NOTE(review): these attributes appear to have been stripped by the doc
    # extractor — the subclass docstrings state that ``data`` and
    # ``qubole_conn_id`` support templating and that ``.txt`` template files
    # work, and ``Sequence`` is imported but otherwise unused. Restored per
    # upstream Airflow convention — confirm against the released provider.
    template_fields: Sequence[str] = ("data", "qubole_conn_id")
    template_ext: Sequence[str] = (".txt",)

    def __init__(self, *, data, qubole_conn_id: str = "qubole_default", **kwargs) -> None:
        """Store the sensor payload and connection id, validating poke_interval.

        :param data: JSON payload describing what to check for (passed to the
            underlying qds_sdk sensor class).
        :param qubole_conn_id: Airflow connection id holding the QDS auth token.
        :raises AirflowException: if ``poke_interval`` is below 5 seconds —
            shorter intervals would hammer the Qubole API.
        """
        self.data = data
        self.qubole_conn_id = qubole_conn_id

        # Reject overly aggressive polling before the base class accepts it.
        if "poke_interval" in kwargs and kwargs["poke_interval"] < 5:
            raise AirflowException(
                f"Sorry, poke_interval can't be less than 5 sec for task '{kwargs['task_id']}' "
                f"in dag '{kwargs['dag'].dag_id}'."
            )

        super().__init__(**kwargs)

    def poke(self, context: Context) -> bool:
        """Check once, via the QDS API, whether the watched resource exists.

        Configures the qds_sdk client from the Airflow connection (password is
        the API token, host is the API URL), then delegates to the concrete
        ``sensor_class`` set by a subclass. Any exception from the API call is
        logged and treated as "not there yet" so the sensor keeps poking.
        """
        conn = BaseHook.get_connection(self.qubole_conn_id)
        Qubole.configure(api_token=conn.password, api_url=conn.host)

        self.log.info("Poking: %s", self.data)

        status = False
        try:
            # ``sensor_class`` is provided by subclasses (FileSensor/PartitionSensor).
            status = self.sensor_class.check(self.data)  # type: ignore[attr-defined]
        except Exception as e:
            # Deliberate best-effort: an API hiccup must not kill the sensor,
            # it simply reports "not found" for this poke.
            self.log.exception(e)
            status = False
        self.log.info("Status of this Poke: %s", status)
        return status
class QuboleFileSensor(QuboleSensor):
    """
    Wait for a file or folder to be present in cloud storage.

    Check for file or folder presence via QDS APIs.

    .. seealso::
        For more information on how to use this sensor, take a look at the guide:
        :ref:`howto/operator:QuboleFileSensor`

    :param qubole_conn_id: Connection id which consists of qds auth_token
    :param data: a JSON object containing payload, whose presence needs to be checked
        Check this `example <https://github.com/apache/airflow/blob/main\
        /airflow/providers/qubole/example_dags/example_qubole_sensor.py>`_ for sample payload
        structure.

    .. note:: Both ``data`` and ``qubole_conn_id`` fields support templating. You can also use
        ``.txt`` files for template-driven use cases.
    """

    def __init__(self, **kwargs) -> None:
        # Bind the qds_sdk file-presence checker before the base class init,
        # so poke() dispatches to it.
        self.sensor_class = FileSensor
        super().__init__(**kwargs)
class QubolePartitionSensor(QuboleSensor):
    """
    Wait for a Hive partition to show up in QHS (Qubole Hive Service).

    Check for Hive partition presence via QDS APIs.

    .. seealso::
        For more information on how to use this sensor, take a look at the guide:
        :ref:`howto/operator:QubolePartitionSensor`

    :param qubole_conn_id: Connection id which consists of qds auth_token
    :param data: a JSON object containing payload, whose presence needs to be checked.
        Check this `example <https://github.com/apache/airflow/blob/main\
        /airflow/providers/qubole/example_dags/example_qubole_sensor.py>`_ for sample payload
        structure.

    .. note:: Both ``data`` and ``qubole_conn_id`` fields support templating. You can also use
        ``.txt`` files for template-driven use cases.
    """

    def __init__(self, **kwargs) -> None:
        # Bind the qds_sdk Hive-partition checker before the base class init,
        # so poke() dispatches to it.
        self.sensor_class = PartitionSensor
        super().__init__(**kwargs)