Source code for airflow.example_dags.example_branch_python_dop_operator_3

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
Example DAG demonstrating the usage of ``@task.branch`` TaskFlow API decorator with depends_on_past=True,
where tasks may be run or skipped on alternating runs.
from __future__ import annotations

import pendulum

from airflow import DAG
from airflow.decorators import task
from airflow.operators.empty import EmptyOperator

[docs]def should_run(**kwargs): """ Determine which empty_task should be run based on if the execution date minute is even or odd. :param dict kwargs: Context :return: Id of the task to run :rtype: str """ print( f"------------- exec dttm = {kwargs['execution_date']} and minute = {kwargs['execution_date'].minute}" ) if kwargs['execution_date'].minute % 2 == 0: return "empty_task_1" else: return "empty_task_2"
with DAG( dag_id='example_branch_dop_operator_v3', schedule='*/1 * * * *', start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), catchup=False, default_args={'depends_on_past': True}, tags=['example'], ) as dag:
[docs] cond = should_run()
empty_task_1 = EmptyOperator(task_id='empty_task_1') empty_task_2 = EmptyOperator(task_id='empty_task_2') cond >> [empty_task_1, empty_task_2]

Was this entry helpful?