airflow.sensors.smart_sensor

Module Contents

airflow.sensors.smart_sensor.config[source]
airflow.sensors.smart_sensor.handler_config[source]
airflow.sensors.smart_sensor.formatter_config[source]
airflow.sensors.smart_sensor.dictConfigurator[source]
class airflow.sensors.smart_sensor.SensorWork(si)[source]

This class stores a sensor work with decoded context value. It is only used inside of smart sensor. Create a sensor work based on sensor instance record. A sensor work object has the following attributes: dag_id: sensor_instance dag_id. task_id: sensor_instance task_id. execution_date: sensor_instance execution_date. try_number: sensor_instance try_number poke_context: Decoded poke_context for the sensor task. execution_context: Decoded execution_context. hashcode: This is the signature of poking job. operator: The sensor operator class. op_classpath: The sensor operator class path encoded_poke_context: The raw data from sensor_instance poke_context column. log: The sensor work logger which will mock the corresponding task instance log.

Parameters

si -- The sensor_instance ORM object.

ti_key[source]

Key for the task instance that maps to the sensor work.

cache_key[source]

Key used to query in smart sensor for cached sensor work.

__eq__(self, other)[source]
static create_new_task_handler()[source]

Create task log handler for a sensor work. :return: log handler

_get_sensor_logger(self, si)[source]

Return logger for a sensor instance object.

close_sensor_logger(self)[source]

Close log handler for a sensor work.

class airflow.sensors.smart_sensor.CachedPokeWork[source]

Wrapper class for the poke work inside smart sensor. It saves the sensor_task used to poke and recent poke result state. state: poke state. sensor_task: The cached object for executing the poke function. last_poke_time: The latest time this cached work being called. to_flush: If we should flush the cached work.

set_state(self, state)[source]

Set state for cached poke work. :param state: The sensor_instance state.

clear_state(self)[source]

Clear state for cached poke work.

set_to_flush(self)[source]

Mark this poke work to be popped from cached dict after current loop.

is_expired(self)[source]

The cached task object expires if there is no poke for 20 minutes. :return: Boolean

class airflow.sensors.smart_sensor.SensorExceptionInfo(exception_info, is_infra_failure=False, infra_failure_retry_window=datetime.timedelta(minutes=130))[source]

Hold sensor exception information and the type of exception. For possible transient infra failure, give the task more chance to retry before fail it.

exception_info[source]
Returns

exception msg.

is_infra_failure[source]
Returns

If the exception is an infra failure

Type

boolean

set_latest_exception(self, exception_info, is_infra_failure=False)[source]

This function set the latest exception information for sensor exception. If the exception implies an infra failure, this function will check the recorded infra failure timeout which was set at the first infra failure exception arrives. There is a 6 hours window for retry without failing current run.

Parameters
  • exception_info -- Details of the exception information.

  • is_infra_failure -- If current exception was caused by transient infra failure. There is a retry window _infra_failure_retry_window that the smart sensor will retry poke function without failing current task run.

set_infra_failure_timeout(self)[source]

Set the time point when the sensor should be failed if it kept getting infra failure. :return:

should_fail_current_run(self)[source]
Returns

Should the sensor fail

Type

boolean

is_expired(self)[source]
Returns

If current exception need to be kept.

Type

boolean

class airflow.sensors.smart_sensor.SmartSensorOperator(poke_interval=180, smart_sensor_timeout=60 * 60 * 24 * 7, soft_fail=False, shard_min=0, shard_max=100000, poke_timeout=6.0, *args, **kwargs)[source]

Bases: airflow.models.BaseOperator, airflow.models.SkipMixin

Smart sensor operators are derived from this class.

Smart Sensor operators keep refresh a dictionary by visiting DB. Taking qualified active sensor tasks. Different from sensor operator, Smart sensor operators poke for all sensor tasks in the dictionary at a time interval. When a criteria is met or fail by time out, it update all sensor task state in task_instance table

Parameters
  • soft_fail (bool) -- Set to true to mark the task as SKIPPED on failure

  • poke_interval (int) -- Time in seconds that the job should wait in between each tries.

  • smart_sensor_timeout (float) -- Time, in seconds before the internal sensor job times out if poke_timeout is not defined.

  • shard_min (int) -- shard code lower bound (inclusive)

  • shard_max (int) -- shard code upper bound (exclusive)

  • poke_timeout (float) -- Time, in seconds before the task times out and fails.

ui_color = #e6f1f2[source]
_validate_input_values(self)[source]
_load_sensor_works(self, session=None)[source]

Refresh sensor instances need to be handled by this operator. Create smart sensor internal object based on the information persisted in the sensor_instance table.

_update_ti_hostname(self, sensor_works, session=None)[source]

Update task instance hostname for new sensor works.

Parameters
  • sensor_works -- Smart sensor internal object for a sensor task.

  • session -- The sqlalchemy session.

_mark_multi_state(self, operator, poke_hash, encoded_poke_context, state, session=None)[source]

Mark state for multiple tasks in the task_instance table to a new state if they have the same signature as the poke_hash.

Parameters
  • operator -- The sensor's operator class name.

  • poke_hash -- The hash code generated from sensor's poke context.

  • encoded_poke_context -- The raw encoded poke_context.

  • state -- Set multiple sensor tasks to this state.

  • session -- The sqlalchemy session.

_retry_or_fail_task(self, sensor_work, error, session=None)[source]

Change single task state for sensor task. For final state, set the end_date. Since smart sensor take care all retries in one process. Failed sensor tasks logically experienced all retries and the try_number should be set to max_tries.

Parameters
  • sensor_work (SensorWork) -- The sensor_work with exception.

  • error (str.) -- The error message for this sensor_work.

  • session -- The sqlalchemy session.

_check_and_handle_ti_timeout(self, sensor_work)[source]

Check if a sensor task in smart sensor is timeout. Could be either sensor operator timeout or general operator execution_timeout.

Parameters

sensor_work -- SensorWork

_handle_poke_exception(self, sensor_work)[source]

Fail task if accumulated exceptions exceeds retries.

Parameters

sensor_work -- SensorWork

_process_sensor_work_with_cached_state(self, sensor_work, state)[source]
_execute_sensor_work(self, sensor_work)[source]
flush_cached_sensor_poke_results(self)[source]

Flush outdated cached sensor states saved in previous loop.

poke(self, sensor_work)[source]

Function that the sensors defined while deriving this class should override.

_emit_loop_stats(self)[source]
execute(self, context)[source]
on_kill(self)[source]

Was this entry helpful?