# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from time import sleep
from airflow.exceptions import AirflowException, AirflowSensorTimeout, \
AirflowSkipException
from airflow.models import BaseOperator, SkipMixin
from airflow.utils import timezone
from airflow.utils.decorators import apply_defaults
[docs]class BaseSensorOperator(BaseOperator, SkipMixin):
"""
Sensor operators are derived from this class an inherit these attributes.
Sensor operators keep executing at a time interval and succeed when
a criteria is met and fail if and when they time out.
:param soft_fail: Set to true to mark the task as SKIPPED on failure
:type soft_fail: bool
:param poke_interval: Time in seconds that the job should wait in
between each tries
:type poke_interval: int
:param timeout: Time, in seconds before the task times out and fails.
:type timeout: int
"""
ui_color = '#e6f1f2'
@apply_defaults
def __init__(self,
poke_interval=60,
timeout=60 * 60 * 24 * 7,
soft_fail=False,
*args,
**kwargs):
super(BaseSensorOperator, self).__init__(*args, **kwargs)
self.poke_interval = poke_interval
self.soft_fail = soft_fail
self.timeout = timeout
[docs] def poke(self, context):
"""
Function that the sensors defined while deriving this class should
override.
"""
raise AirflowException('Override me.')
[docs] def execute(self, context):
started_at = timezone.utcnow()
while not self.poke(context):
if (timezone.utcnow() - started_at).total_seconds() > self.timeout:
# If sensor is in soft fail mode but will be retried then
# give it a chance and fail with timeout.
# This gives the ability to set up non-blocking AND soft-fail sensors.
if self.soft_fail and not context['ti'].is_eligible_to_retry():
self._do_skip_downstream_tasks(context)
raise AirflowSkipException('Snap. Time is OUT.')
else:
raise AirflowSensorTimeout('Snap. Time is OUT.')
sleep(self.poke_interval)
self.log.info("Success criteria met. Exiting.")
def _do_skip_downstream_tasks(self, context):
downstream_tasks = context['task'].get_flat_relatives(upstream=False)
self.log.debug("Downstream task_ids %s", downstream_tasks)
if downstream_tasks:
self.skip(context['dag_run'], context['ti'].execution_date, downstream_tasks)