Source code for airflow.sensors.base_sensor_operator

# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.


from time import sleep

from airflow.exceptions import AirflowException, AirflowSensorTimeout, \
    AirflowSkipException
from airflow.models import BaseOperator, SkipMixin
from airflow.utils import timezone
from airflow.utils.decorators import apply_defaults


class BaseSensorOperator(BaseOperator, SkipMixin):
    """
    Sensor operators are derived from this class and inherit these attributes.

    Sensor operators keep executing at a time interval and succeed when
    a criteria is met and fail if and when they time out.

    :param soft_fail: Set to true to mark the task as SKIPPED on failure
    :type soft_fail: bool
    :param poke_interval: Time in seconds that the job should wait in
        between each try
    :type poke_interval: int
    :param timeout: Time, in seconds, before the task times out and fails.
        Defaults to seven days.
    :type timeout: int
    """
    # NOTE(review): presumably the colour used for this operator in the web
    # UI -- confirm against BaseOperator.
    ui_color = '#e6f1f2'

    @apply_defaults
    def __init__(self,
                 poke_interval=60,
                 timeout=60 * 60 * 24 * 7,
                 soft_fail=False,
                 *args,
                 **kwargs):
        super(BaseSensorOperator, self).__init__(*args, **kwargs)
        self.poke_interval = poke_interval
        self.soft_fail = soft_fail
        self.timeout = timeout

    def poke(self, context):
        """
        Function that the sensors defined while deriving this class should
        override.

        ``execute`` treats a truthy return value as "criteria met" and a
        falsy one as "try again later".

        :param context: the context mapping handed to ``execute``
        :raises AirflowException: always, in this base implementation
        """
        raise AirflowException('Override me.')

    def execute(self, context):
        """
        Call ``poke`` repeatedly until it succeeds or the sensor times out.

        The elapsed time is only compared against ``timeout`` after a failed
        poke, and the sensor then sleeps ``poke_interval`` seconds before the
        next attempt.

        :param context: mapping that must provide ``'ti'``; ``'task'`` and
            ``'dag_run'`` are also read when downstream tasks are skipped
        :raises AirflowSkipException: on timeout when ``soft_fail`` is set
            and the task instance is not eligible to retry
        :raises AirflowSensorTimeout: on timeout in every other case
        """
        started_at = timezone.utcnow()
        while not self.poke(context):
            elapsed = (timezone.utcnow() - started_at).total_seconds()
            if elapsed > self.timeout:
                # If sensor is in soft fail mode but will be retried then
                # give it a chance and fail with timeout.
                # This gives the ability to set up non-blocking AND
                # soft-fail sensors.
                if self.soft_fail and not context['ti'].is_eligible_to_retry():
                    self._do_skip_downstream_tasks(context)
                    raise AirflowSkipException('Snap. Time is OUT.')
                else:
                    raise AirflowSensorTimeout('Snap. Time is OUT.')
            sleep(self.poke_interval)
        self.log.info("Success criteria met. Exiting.")

    def _do_skip_downstream_tasks(self, context):
        """
        Hand this task's downstream relatives to ``self.skip``.

        The relatives are whatever
        ``context['task'].get_flat_relatives(upstream=False)`` returns;
        ``self.skip`` is not called when that result is empty.

        :param context: mapping providing ``'task'``, ``'dag_run'`` and
            ``'ti'``
        """
        downstream_tasks = context['task'].get_flat_relatives(upstream=False)
        self.log.debug("Downstream task_ids %s", downstream_tasks)
        if downstream_tasks:
            self.skip(context['dag_run'],
                      context['ti'].execution_date,
                      downstream_tasks)