airflow.sensors.smart_sensor

Module Contents

Classes

SensorWork

This class stores a sensor work with decoded context values. It is only used inside the smart sensor.

CachedPokeWork

Wrapper class for the poke work inside the smart sensor. It saves the sensor_task used to poke and the most recent poke result state.

SensorExceptionInfo

Hold sensor exception information and the type of exception. For possible transient infra failures, give the task more chances to retry before failing it.

SmartSensorOperator

Smart sensor operators are derived from this class.

Attributes

config

handler_config

formatter_config

dictConfigurator

airflow.sensors.smart_sensor.config
airflow.sensors.smart_sensor.handler_config
airflow.sensors.smart_sensor.formatter_config
airflow.sensors.smart_sensor.dictConfigurator
class airflow.sensors.smart_sensor.SensorWork(si)

This class stores a sensor work with decoded context values. It is only used inside the smart sensor. A sensor work is created from a sensor instance record and has the following attributes:

  • dag_id -- sensor_instance dag_id.

  • task_id -- sensor_instance task_id.

  • execution_date -- sensor_instance execution_date.

  • try_number -- sensor_instance try_number.

  • poke_context -- Decoded poke_context for the sensor task.

  • execution_context -- Decoded execution_context.

  • hashcode -- The signature of the poking job.

  • operator -- The sensor operator class.

  • op_classpath -- The sensor operator class path.

  • encoded_poke_context -- The raw data from the sensor_instance poke_context column.

  • log -- The sensor work logger, which mocks the corresponding task instance log.

Parameters

si -- The sensor_instance ORM object.

__eq__(self, other)

Return self==value.

static create_new_task_handler()

Create a task log handler for a sensor work.

Returns

The log handler.

log(self)

Return the logger for a sensor instance object.

close_sensor_logger(self)

Close the log handler for a sensor work.

property ti_key(self)

Key for the task instance that maps to the sensor work.

property cache_key(self)

Key used by the smart sensor to look up a cached sensor work.
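The two keys above can be illustrated with a minimal sketch. SensorWorkSketch is a hypothetical stand-in, not the Airflow class, and the key compositions are assumptions: ti_key is assumed to be (dag_id, task_id, execution_date), and cache_key is assumed to combine the operator class path with the raw poke context, so that sensors with identical poke arguments can share one cached poke.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Tuple


@dataclass
class SensorWorkSketch:
    """Hypothetical stand-in holding a subset of the SensorWork attributes."""

    dag_id: str
    task_id: str
    execution_date: datetime
    try_number: int
    op_classpath: str
    encoded_poke_context: str  # raw value of the poke_context column

    @property
    def ti_key(self) -> Tuple[str, str, datetime]:
        # Assumption: a task instance is identified by dag_id, task_id and execution_date.
        return self.dag_id, self.task_id, self.execution_date

    @property
    def cache_key(self) -> Tuple[str, str]:
        # Assumption: works with the same operator class and identical poke
        # arguments can share one cached poke result.
        return self.op_classpath, self.encoded_poke_context
```

Two sensors in different DAGs that wait on the same target share a cache_key but keep distinct ti_key values.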

class airflow.sensors.smart_sensor.CachedPokeWork

Wrapper class for the poke work inside the smart sensor. It saves the sensor_task used to poke and the most recent poke result state. It has the following attributes:

  • state -- Poke state.

  • sensor_task -- The cached object for executing the poke function.

  • last_poke_time -- The last time this cached work was called.

  • to_flush -- Whether the cached work should be flushed.

set_state(self, state)

Set the state for the cached poke work.

Parameters

state -- The sensor_instance state.

clear_state(self)

Clear the state for the cached poke work.

set_to_flush(self)

Mark this poke work to be popped from the cached dict after the current loop.

is_expired(self)

The cached task object expires if there is no poke for 20 minutes.

Returns

Whether the cached task object has expired.
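A minimal sketch of the cached poke work's life cycle, assuming the 20-minute expiry stated for is_expired. CachedPokeWorkSketch is hypothetical; unlike the documented methods, set_state and is_expired take an explicit now argument so the behavior can be checked without a real clock, and treating a never-poked work as expired is an assumption.

```python
from datetime import datetime, timedelta
from typing import Any, Optional

# Assumed expiry threshold, taken from the is_expired description.
CACHE_EXPIRY = timedelta(minutes=20)


class CachedPokeWorkSketch:
    """Hypothetical stand-in mirroring the CachedPokeWork attributes."""

    def __init__(self, sensor_task: Any = None) -> None:
        self.state: Optional[str] = None
        self.sensor_task = sensor_task
        self.last_poke_time: Optional[datetime] = None
        self.to_flush = False

    def set_state(self, state: str, now: datetime) -> None:
        # Record the poke result and refresh the last poke time.
        self.state = state
        self.last_poke_time = now

    def clear_state(self) -> None:
        self.state = None

    def set_to_flush(self) -> None:
        # Mark the work for removal from the cache after the current loop.
        self.to_flush = True

    def is_expired(self, now: datetime) -> bool:
        # Assumption: a work that has never been poked counts as expired.
        if self.last_poke_time is None:
            return True
        return now - self.last_poke_time > CACHE_EXPIRY
```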

class airflow.sensors.smart_sensor.SensorExceptionInfo(exception_info, is_infra_failure=False, infra_failure_retry_window=datetime.timedelta(minutes=130))

Hold sensor exception information and the type of exception. For possible transient infra failures, give the task more chances to retry before failing it.

set_latest_exception(self, exception_info, is_infra_failure=False)

Set the latest exception information for the sensor exception. If the exception implies an infra failure, this function checks the infra failure timeout that was recorded when the first infra failure exception arrived. Within the retry window set by infra_failure_retry_window (130 minutes by default), the poke is retried without failing the current run.

Parameters
  • exception_info -- Details of the exception information.

  • is_infra_failure -- Whether the current exception was caused by a transient infra failure. Within the retry window _infra_failure_retry_window, the smart sensor retries the poke function without failing the current task run.

set_infra_failure_timeout(self)

Set the point in time at which the sensor should fail if it keeps getting infra failures.

should_fail_current_run(self)
Returns

Whether the sensor should fail the current run.

property exception_info(self)
Returns

The exception message.

property is_infra_failure(self)
Returns

Whether the exception is an infra failure.

is_expired(self)
Returns

Whether the current exception needs to be kept.
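The retry window logic described for set_latest_exception and should_fail_current_run can be sketched as follows. This is a hypothetical reimplementation, not the Airflow code: the class name is invented, the 130-minute default is taken from the signature above, and now is passed explicitly instead of reading the clock.

```python
from datetime import datetime, timedelta
from typing import Optional


class SensorExceptionInfoSketch:
    """Hypothetical sketch of the infra-failure retry window (not the Airflow class)."""

    def __init__(
        self,
        exception_info: str,
        is_infra_failure: bool,
        now: datetime,
        infra_failure_retry_window: timedelta = timedelta(minutes=130),
    ) -> None:
        self._retry_window = infra_failure_retry_window
        self._infra_failure_timeout: Optional[datetime] = None
        self.set_latest_exception(exception_info, is_infra_failure, now)

    def set_latest_exception(self, exception_info: str, is_infra_failure: bool, now: datetime) -> None:
        self.exception_info = exception_info
        self.is_infra_failure = is_infra_failure
        # Only the first infra failure starts the window; later ones keep the same deadline.
        if is_infra_failure and self._infra_failure_timeout is None:
            self._infra_failure_timeout = now + self._retry_window

    def should_fail_current_run(self, now: datetime) -> bool:
        # Non-infra failures fail at once; infra failures fail only after the deadline.
        if not self.is_infra_failure or self._infra_failure_timeout is None:
            return True
        return now > self._infra_failure_timeout
```

Because the deadline is fixed by the first infra failure, a steady stream of transient errors cannot keep a sensor alive indefinitely.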

class airflow.sensors.smart_sensor.SmartSensorOperator(poke_interval=180, smart_sensor_timeout=60 * 60 * 24 * 7, soft_fail=False, shard_min=0, shard_max=100000, poke_timeout=6.0, *args, **kwargs)

Bases: airflow.models.BaseOperator, airflow.models.SkipMixin

Smart sensor operators are derived from this class.

Smart sensor operators keep refreshing a dictionary of qualified active sensor tasks by querying the database. Unlike a regular sensor operator, a smart sensor operator pokes all sensor tasks in the dictionary at each time interval. When a sensor's criteria are met, or it fails by timing out, the operator updates the corresponding sensor task states in the task_instance table.
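One poke round of the loop described above might look like the sketch below. WaitingSensor and poke_all are hypothetical names, the poke callable stands in for a real sensor's poke method, and sharing one poke result among sensors with the same cache key illustrates why cache keys exist rather than describing the exact implementation. The soft_fail handling follows the parameter description that comes next; sensors still waiting are left out of the returned updates.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Hashable, List


@dataclass
class WaitingSensor:
    """Hypothetical record for one active sensor task."""

    ti_key: str          # identifies the task instance to update
    cache_key: Hashable  # sensors with equal keys need the same poke
    started_at: float    # time (in seconds) at which sensing began


def poke_all(
    sensors: List[WaitingSensor],
    poke: Callable[[Hashable], bool],
    now: float,
    timeout: float,
    soft_fail: bool = False,
) -> Dict[str, str]:
    """Run one poke round and return {ti_key: new_state} for finished sensors."""
    results: Dict[Hashable, bool] = {}
    updates: Dict[str, str] = {}
    for sensor in sensors:
        if now - sensor.started_at > timeout:
            # Timed out: skipped when soft_fail is set, otherwise failed.
            updates[sensor.ti_key] = "skipped" if soft_fail else "failed"
            continue
        if sensor.cache_key not in results:
            # Only the first sensor with a given cache key actually pokes.
            results[sensor.cache_key] = poke(sensor.cache_key)
        if results[sensor.cache_key]:
            updates[sensor.ti_key] = "success"
    return updates
```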

Parameters
  • soft_fail -- Set to true to mark the task as SKIPPED on failure

  • poke_interval -- Time, in seconds, that the job should wait between each try.

  • smart_sensor_timeout -- Time, in seconds, before the internal sensor job times out if poke_timeout is not defined.

  • shard_min -- shard code lower bound (inclusive)

  • shard_max -- shard code upper bound (exclusive)

  • poke_timeout -- Time, in seconds, before the task times out and fails.
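The shard bounds define a half-open range. A minimal sketch, assuming each sensor task carries an integer shard code: in_shard applies the documented bounds, and shard_ranges (a hypothetical helper, not an Airflow API) shows one way to split the default code space across several smart sensors so that every code lands in exactly one of them.

```python
from typing import List, Tuple

SHARD_CODE_UPPER = 100000  # default shard_max from the signature above


def in_shard(shardcode: int, shard_min: int = 0, shard_max: int = SHARD_CODE_UPPER) -> bool:
    """Return True if shardcode belongs to [shard_min, shard_max)."""
    # Lower bound inclusive, upper bound exclusive, as documented.
    return shard_min <= shardcode < shard_max


def shard_ranges(num_sensors: int, total: int = SHARD_CODE_UPPER) -> List[Tuple[int, int]]:
    """Split [0, total) into at most num_sensors contiguous, non-overlapping ranges."""
    step = -(-total // num_sensors)  # ceiling division so the ranges cover all codes
    return [(lo, min(lo + step, total)) for lo in range(0, total, step)]
```

Because each upper bound is exclusive and equals the next lower bound, adjacent smart sensors never both pick up the same task.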

ui_color = #e6f1f2
flush_cached_sensor_poke_results(self)

Flush outdated cached sensor states saved in previous loop.
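A minimal sketch of flushing the cache between loops, under stated assumptions: entries marked with set_to_flush, or not poked for 20 minutes, are dropped, and the remaining entries have their state cleared so the next loop pokes them again. CacheEntry and flush_cache are hypothetical names, not Airflow APIs.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Dict, Hashable, Optional

CACHE_EXPIRY = timedelta(minutes=20)  # assumed, from CachedPokeWork.is_expired


@dataclass
class CacheEntry:
    """Hypothetical cached poke work."""

    last_poke_time: datetime
    state: Optional[str] = None
    to_flush: bool = False


def flush_cache(cache: Dict[Hashable, CacheEntry], now: datetime) -> None:
    """Drop flushed or expired entries and reset the state of the rest."""
    # Collect keys first: deleting from a dict while iterating over it raises RuntimeError.
    stale = [
        key
        for key, entry in cache.items()
        if entry.to_flush or now - entry.last_poke_time > CACHE_EXPIRY
    ]
    for key in stale:
        del cache[key]
    for entry in cache.values():
        entry.state = None  # force a fresh poke in the next loop
```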

poke(self, sensor_work)

Function that the sensors defined while deriving this class should override.

execute(self, context)

This is the main method to derive when creating an operator. Context is the same dictionary used when rendering Jinja templates.

Refer to get_template_context for more context.

on_kill(self)

Override this method to clean up subprocesses when a task instance gets killed. Any use of the threading, subprocess or multiprocessing module within an operator needs to be cleaned up, or it will leave ghost processes behind.
