BranchDateTimeOperator

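Use the BranchDateTimeOperator to branch into one of two execution paths depending on whether a point in time falls into the range given by two target arguments, target_lower and target_upper. Tasks listed in follow_task_ids_if_true run when it does; tasks listed in follow_task_ids_if_false run when it does not.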

This operator has two modes. The first mode uses the current time (the machine clock time at the moment the task runs), and the second mode uses the logical_date of the DAG run the task belongs to.
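The difference between the two modes comes down to which moment is compared against the targets. The plain-Python sketch below illustrates that choice. It is not Airflow's implementation: `pick_comparison_time` is a hypothetical helper, and its `use_task_logical_date` argument only mirrors the operator parameter of the same name.

```python
from datetime import datetime, timezone


def pick_comparison_time(use_task_logical_date: bool, logical_date: datetime) -> datetime:
    """Hypothetical helper: return the moment the target range is checked against."""
    if use_task_logical_date:
        # Logical-date mode: the value is fixed for the DAG run, so re-runs see the same time.
        return logical_date
    # Current-time mode: read the machine clock when the task actually executes.
    return datetime.now(timezone.utc)


logical = datetime(2020, 10, 10, 14, 30, tzinfo=timezone.utc)
print(pick_comparison_time(True, logical))   # 2020-10-10 14:30:00+00:00
print(pick_comparison_time(False, logical))  # depends on the clock at call time
```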

Usage with current time

This usage might be useful in certain situations, for example when the DAG performs cleanups and maintenance and is not meant to be back-filled. The “current time” makes back-filling non-idempotent: the result depends on when the DAG actually ran. It is also slightly non-deterministic, even when the DAG runs on schedule. Some time can pass between when a DAG run is scheduled and when it executes, so even if the DAG run was scheduled properly, the time used for the branching decision will differ from the scheduled time, and the decision itself may change depending on those delays.
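A small worked example makes the effect of scheduling delays concrete. The sketch below uses plain Python rather than Airflow, reuses the 14:00 to 15:00 window from the first example, and assumes both bounds are inclusive; `in_range` and the specific timestamps are hypothetical.

```python
from datetime import datetime, timedelta


def in_range(moment: datetime, lower: datetime, upper: datetime) -> bool:
    """Hypothetical helper: inclusive range check with both bounds given."""
    return lower <= moment <= upper


# Window from the first example: 14:00 to 15:00 on 2020-10-10.
lower = datetime(2020, 10, 10, 14, 0, 0)
upper = datetime(2020, 10, 10, 15, 0, 0)

scheduled = datetime(2020, 10, 10, 14, 59, 30)  # when the run was scheduled
started = scheduled + timedelta(seconds=45)     # when the task actually ran (15:00:15)

print(in_range(scheduled, lower, upper))  # True  -> would follow 'date_in_range'
print(in_range(started, lower, upper))    # False -> follows 'date_outside_range'
```

A 45-second delay is enough to flip the branch, even though the run was scheduled inside the window.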

airflow/example_dags/example_branch_datetime_operator.py

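import pendulum

from airflow.operators.datetime import BranchDateTimeOperator
from airflow.operators.empty import EmptyOperator

# dag1 is a DAG object defined earlier in this example file.
empty_task_11 = EmptyOperator(task_id='date_in_range', dag=dag1)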
empty_task_21 = EmptyOperator(task_id='date_outside_range', dag=dag1)

cond1 = BranchDateTimeOperator(
    task_id='datetime_branch',
    follow_task_ids_if_true=['date_in_range'],
    follow_task_ids_if_false=['date_outside_range'],
    target_upper=pendulum.datetime(2020, 10, 10, 15, 0, 0),
    target_lower=pendulum.datetime(2020, 10, 10, 14, 0, 0),
    dag=dag1,
)

# Run empty_task_11 if cond1 executes between 2020-10-10 14:00:00 and 2020-10-10 15:00:00, otherwise run empty_task_21
cond1 >> [empty_task_11, empty_task_21]

The target parameters, target_upper and target_lower, can each receive a datetime.datetime, a datetime.time, or None. When a datetime.time object is used, it is combined with the date of the time being compared (the current date, or the date of the logical date when use_task_logical_date is enabled) so that the two can be compared. If target_upper is set to a datetime.time that occurs before target_lower, a day is added to target_upper. This allows time periods that span two dates.
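The resolution rule for datetime.time targets can be sketched as follows. `resolve_targets` is a hypothetical helper that follows the rule as stated in this section rather than reproducing Airflow's source; for simplicity all values are naive datetimes, so details such as the DAG's time zone are ignored.

```python
from datetime import datetime, time, timedelta
from typing import Union

# A target may be a full datetime, a time of day, or None.
Target = Union[datetime, time, None]


def resolve_targets(base: datetime, lower: Target, upper: Target):
    """Hypothetical helper: turn time-of-day targets into full datetimes on base's date."""
    upper_is_time = isinstance(upper, time)  # datetime is not a subclass of time
    if isinstance(lower, time):
        lower = datetime.combine(base.date(), lower)
    if upper_is_time:
        upper = datetime.combine(base.date(), upper)
    # A time-of-day upper bound that lands before the lower bound means the
    # window crosses midnight, so push the upper bound to the next day.
    if upper_is_time and lower is not None and upper < lower:
        upper += timedelta(days=1)
    return lower, upper


# cond2's targets (lower 15:00, upper 00:00), checked at 18:00 on 2020-10-10.
print(resolve_targets(datetime(2020, 10, 10, 18, 0), time(15, 0), time(0, 0)))
# (datetime.datetime(2020, 10, 10, 15, 0), datetime.datetime(2020, 10, 11, 0, 0))
```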

airflow/example_dags/example_branch_datetime_operator.py

empty_task_12 = EmptyOperator(task_id='date_in_range', dag=dag2)
empty_task_22 = EmptyOperator(task_id='date_outside_range', dag=dag2)

cond2 = BranchDateTimeOperator(
    task_id='datetime_branch',
    follow_task_ids_if_true=['date_in_range'],
    follow_task_ids_if_false=['date_outside_range'],
    target_upper=pendulum.time(0, 0, 0),
    target_lower=pendulum.time(15, 0, 0),
    dag=dag2,
)

# Since target_lower happens after target_upper, target_upper will be moved to the following day
# Run empty_task_12 if cond2 executes between 15:00:00 and 00:00:00 of the following day, otherwise run empty_task_22
cond2 >> [empty_task_12, empty_task_22]

If one target parameter is set to None, the operator performs a one-sided comparison using only the other target. Setting both target_upper and target_lower to None raises an exception.
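A one-sided comparison simply skips the missing bound. The sketch below is plain Python, not Airflow code: `is_in_range` is hypothetical, both bounds are assumed inclusive, and the ValueError stands in for the exception the operator raises when both targets are None (its exact type is not shown here).

```python
from datetime import datetime
from typing import Optional


def is_in_range(moment: datetime, lower: Optional[datetime], upper: Optional[datetime]) -> bool:
    """Hypothetical helper: inclusive check where either bound may be None."""
    if lower is None and upper is None:
        # Mirrors the documented rule that at least one target must be set.
        raise ValueError("at least one of lower/upper must be set")
    if upper is not None and moment > upper:
        return False
    if lower is not None and moment < lower:
        return False
    return True


# Only a lower bound: anything from 14:00 onward counts as "in range".
print(is_in_range(datetime(2020, 10, 10, 16), datetime(2020, 10, 10, 14), None))  # True
# Only an upper bound: anything up to 15:00 counts as "in range".
print(is_in_range(datetime(2020, 10, 10, 13), None, datetime(2020, 10, 10, 15)))  # True
```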

Usage with logical date

This usage is much friendlier to data ranges. The logical_date does not change when the DAG is re-run and is not affected by execution delays, so this approach is suitable for idempotent DAG runs that might be back-filled.
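Because the logical date is fixed per DAG run, the branching decision becomes a pure function of it. The plain-Python sketch below simulates a hypothetical hourly back-fill over 2020-10-10 using the 14:00 to 15:00 window from the example that follows; `branch_for` is hypothetical and inclusive bounds are an assumption.

```python
from datetime import datetime, timedelta

lower = datetime(2020, 10, 10, 14, 0, 0)
upper = datetime(2020, 10, 10, 15, 0, 0)


def branch_for(logical_date: datetime) -> str:
    """Hypothetical helper: pick a branch from the logical date alone (inclusive bounds)."""
    return "date_in_range" if lower <= logical_date <= upper else "date_outside_range"


# Hourly logical dates from 12:00 to 16:00 on 2020-10-10.
runs = [datetime(2020, 10, 10, 12) + timedelta(hours=h) for h in range(5)]
first_pass = [branch_for(d) for d in runs]
second_pass = [branch_for(d) for d in runs]  # a re-run sees the same logical dates

print(first_pass)
# ['date_outside_range', 'date_outside_range', 'date_in_range', 'date_in_range', 'date_outside_range']
```

Running the back-fill twice yields identical decisions, which is exactly what the current-time mode cannot guarantee.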

airflow/example_dags/example_branch_datetime_operator.py

empty_task_13 = EmptyOperator(task_id='date_in_range', dag=dag3)
empty_task_23 = EmptyOperator(task_id='date_outside_range', dag=dag3)

cond3 = BranchDateTimeOperator(
    task_id='datetime_branch',
    use_task_logical_date=True,
    follow_task_ids_if_true=['date_in_range'],
    follow_task_ids_if_false=['date_outside_range'],
    target_upper=pendulum.datetime(2020, 10, 10, 15, 0, 0),
    target_lower=pendulum.datetime(2020, 10, 10, 14, 0, 0),
    dag=dag3,
)

# Run empty_task_13 if the logical date of cond3's DAG run falls between 2020-10-10 14:00:00 and 2020-10-10 15:00:00, otherwise run empty_task_23
cond3 >> [empty_task_13, empty_task_23]
