Branching operators

Module Contents



This is a base class for creating operators with branching functionality,

class airflow.operators.branch.BaseBranchOperator(task_id: str, owner: str = conf.get('operators', 'DEFAULT_OWNER'), email: Optional[Union[str, Iterable[str]]] = None, email_on_retry: bool = conf.getboolean('email', 'default_email_on_retry', fallback=True), email_on_failure: bool = conf.getboolean('email', 'default_email_on_failure', fallback=True), retries: Optional[int] = conf.getint('core', 'default_task_retries', fallback=0), retry_delay: datetime.timedelta = timedelta(seconds=300), retry_exponential_backoff: bool = False, max_retry_delay: Optional[datetime.timedelta] = None, start_date: Optional[datetime.datetime] = None, end_date: Optional[datetime.datetime] = None, depends_on_past: bool = False, wait_for_downstream: bool = False, dag=None, params: Optional[Dict] = None, default_args: Optional[Dict] = None, priority_weight: int = 1, weight_rule: str = conf.get('core', 'default_task_weight_rule', fallback=WeightRule.DOWNSTREAM), queue: str = conf.get('operators', 'default_queue'), pool: Optional[str] = None, pool_slots: int = 1, sla: Optional[datetime.timedelta] = None, execution_timeout: Optional[datetime.timedelta] = None, on_execute_callback: Optional[TaskStateChangeCallback] = None, on_failure_callback: Optional[TaskStateChangeCallback] = None, on_success_callback: Optional[TaskStateChangeCallback] = None, on_retry_callback: Optional[TaskStateChangeCallback] = None, pre_execute: Optional[TaskPreExecuteHook] = None, post_execute: Optional[TaskPostExecuteHook] = None, trigger_rule: str = TriggerRule.ALL_SUCCESS, resources: Optional[Dict] = None, run_as_user: Optional[str] = None, task_concurrency: Optional[int] = None, max_active_tis_per_dag: Optional[int] = None, executor_config: Optional[Dict] = None, do_xcom_push: bool = True, inlets: Optional[Any] = None, outlets: Optional[Any] = None, task_group: Optional[airflow.utils.task_group.TaskGroup] = None, doc: Optional[str] = None, doc_md: Optional[str] = None, doc_json: Optional[str] = None, doc_yaml: Optional[str] = None, doc_rst: Optional[str] = None, **kwargs)[source]

Bases: airflow.models.BaseOperator, airflow.models.skipmixin.SkipMixin

This is a base class for creating operators with branching functionality, similarly to BranchPythonOperator.

Users should subclass this operator and implement the function choose_branch(self, context). This should run whatever business logic is needed to determine the branch, and return either the task_id for a single task (as a str) or a list of task_ids.

The operator will continue with the returned task_id(s), and all other tasks directly downstream of this operator will be skipped.

abstract choose_branch(self, context: Dict) Union[str, Iterable[str]][source]

Subclasses should implement this, running whatever logic is necessary to choose a branch and returning a task_id or list of task_ids.


context (dict) – Context dictionary as passed to execute()

execute(self, context: Dict)[source]

This is the main method to derive when creating an operator. Context is the same dictionary used as when rendering jinja templates.

Refer to get_template_context for more context.

Was this entry helpful?