PythonOperator¶
Use the PythonOperator to execute Python callables.
from pprint import pprint

from airflow.decorators import task


@task(task_id="print_the_context")
def print_context(ds=None, **kwargs):
    """Print the Airflow context and ds variable from the context."""
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'


run_this = print_context()
Passing in arguments¶
Use the op_args and op_kwargs arguments to pass additional arguments to the Python callable.
import time

# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
for i in range(5):

    @task(task_id=f'sleep_for_{i}')
    def my_sleeping_function(random_base):
        """This is a function that will run within the DAG execution"""
        time.sleep(random_base)

    sleeping_task = my_sleeping_function(random_base=float(i) / 10)

    run_this >> sleeping_task
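The same arguments work with the classic PythonOperator as well. A minimal sketch (the task id and callable here are illustrative, not part of the example DAG above):

from airflow.operators.python import PythonOperator


def greet(greeting, name="world"):
    """Print a greeting assembled from op_args and op_kwargs."""
    print(f"{greeting}, {name}!")


greet_task = PythonOperator(
    task_id="greet",  # hypothetical task id
    python_callable=greet,
    op_args=["Hello"],  # positional arguments for greet()
    op_kwargs={"name": "Airflow"},  # keyword arguments for greet()
)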
Templating¶
Airflow passes in an additional set of keyword arguments: one for each of the Jinja template variables and a templates_dict argument. The templates_dict argument is templated, so each value in the dictionary is evaluated as a Jinja template.
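As a hedged sketch (the task id and template values are illustrative), both op_kwargs values and templates_dict entries arrive already rendered when the callable runs:

from airflow.operators.python import PythonOperator


def report(templates_dict=None, run_date=None, **kwargs):
    """Print values rendered by Jinja before the callable was invoked."""
    print(templates_dict["query_date"])  # rendered from templates_dict
    print(run_date)  # rendered from op_kwargs


templated_task = PythonOperator(
    task_id="templated_python",  # hypothetical task id
    python_callable=report,
    op_kwargs={"run_date": "{{ ds }}"},
    templates_dict={"query_date": "{{ ds }}"},
)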
PythonVirtualenvOperator¶
Use the PythonVirtualenvOperator to execute Python callables inside a new Python virtual environment. The virtualenv package needs to be installed in the environment that runs Airflow (as an optional dependency: pip install apache-airflow[virtualenv] --constraint ...).
@task.virtualenv(
    task_id="virtualenv_python", requirements=["colorama==0.4.0"], system_site_packages=False
)
def callable_virtualenv():
    """
    Example function that will be performed in a virtual environment.

    Importing at the function level ensures that it will not attempt to import the
    library before it is installed.
    """
    from time import sleep

    from colorama import Back, Fore, Style

    print(Fore.RED + 'some red text')
    print(Back.GREEN + 'and with a green background')
    print(Style.DIM + 'and in dim text')
    print(Style.RESET_ALL)
    for _ in range(10):
        print(Style.DIM + 'Please wait...', flush=True)
        sleep(10)
    print('Finished')


virtualenv_task = callable_virtualenv()
Passing in arguments¶
You can use the op_args and op_kwargs arguments the same way you use them with the PythonOperator. Unfortunately, we currently do not support serializing var and ti / task_instance due to incompatibilities with the underlying library. For Airflow context variables, make sure that you either have access to Airflow through setting system_site_packages to True or add apache-airflow to the requirements argument. Otherwise you won’t have access to most of the Airflow context variables in op_kwargs.
If you want the context related to datetime objects like data_interval_start, you can add pendulum and lazy_object_proxy to the requirements.
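For example, a minimal sketch (the task id is illustrative and version pins are left out) of a task that reads data_interval_start inside the virtualenv:

from airflow.decorators import task


@task.virtualenv(
    task_id="virtualenv_with_context",  # hypothetical task id
    requirements=["pendulum", "lazy_object_proxy"],
    system_site_packages=False,
)
def print_interval_start(data_interval_start=None):
    """With both packages installed, data_interval_start deserializes as a pendulum datetime."""
    print(data_interval_start)


interval_task = print_interval_start()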
If additional parameters for package installation are needed, pass them in requirements.txt as in the example below:
SomePackage==0.2.1 --pre --index-url http://some.archives.com/archives
AnotherPackage==1.4.3 --no-index --find-links /my/local/archives
All supported options are listed in the requirements file format.
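Assuming an Airflow version in which requirements also accepts a (templated) reference to a requirements file rather than only a list, a hedged sketch of pointing the decorator at such a file:

from airflow.decorators import task


@task.virtualenv(
    task_id="virtualenv_from_file",  # hypothetical task id
    requirements="requirements.txt",  # assumption: rendered as a templated requirements file
    system_site_packages=False,
)
def callable_with_pinned_requirements():
    """Runs inside a virtualenv built from the packages pinned in requirements.txt."""
    print('running with pinned requirements')


pinned_task = callable_with_pinned_requirements()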
ShortCircuitOperator¶
Use the ShortCircuitOperator to control whether a pipeline continues if a condition is satisfied or a truthy value is obtained. The evaluation of this condition and truthy value is done via the output of a python_callable. If the python_callable returns True or a truthy value, the pipeline is allowed to continue and an XCom of the output will be pushed. If the output is False or a falsy value, the pipeline will be short-circuited based on the configured short-circuiting (more on this later). In the example below, the tasks that follow the “condition_is_True” ShortCircuitOperator will execute while the tasks downstream of the “condition_is_False” ShortCircuitOperator will be skipped.
from airflow.models.baseoperator import chain
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import ShortCircuitOperator

cond_true = ShortCircuitOperator(
    task_id='condition_is_True',
    python_callable=lambda: True,
)

cond_false = ShortCircuitOperator(
    task_id='condition_is_False',
    python_callable=lambda: False,
)

ds_true = [EmptyOperator(task_id='true_' + str(i)) for i in [1, 2]]
ds_false = [EmptyOperator(task_id='false_' + str(i)) for i in [1, 2]]

chain(cond_true, *ds_true)
chain(cond_false, *ds_false)
The “short-circuiting” can be configured to either respect or ignore the trigger rule defined for downstream tasks. If ignore_downstream_trigger_rules is set to True, the default configuration, all downstream tasks are skipped without considering the trigger_rule defined for tasks. If this parameter is set to False, the direct downstream tasks are skipped but the specified trigger_rule for other subsequent downstream tasks is respected. In this short-circuiting configuration, the operator assumes the direct downstream task(s) were purposely meant to be skipped but perhaps not other subsequent tasks. This configuration is especially useful if only part of a pipeline should be short-circuited rather than all tasks which follow the ShortCircuitOperator task.
In the example below, notice that the ShortCircuitOperator task is configured to respect downstream trigger rules. This means that while the tasks that follow the “short_circuit” ShortCircuitOperator task will be skipped since the python_callable returns False, “task_7” will still execute as it is set to execute when upstream tasks have completed running regardless of status (i.e. the TriggerRule.ALL_DONE trigger rule).
from airflow.utils.trigger_rule import TriggerRule

[task_1, task_2, task_3, task_4, task_5, task_6] = [
    EmptyOperator(task_id=f"task_{i}") for i in range(1, 7)
]

task_7 = EmptyOperator(task_id="task_7", trigger_rule=TriggerRule.ALL_DONE)

short_circuit = ShortCircuitOperator(
    task_id="short_circuit", ignore_downstream_trigger_rules=False, python_callable=lambda: False
)

chain(task_1, [task_2, short_circuit], [task_3, task_4], [task_5, task_6], task_7)
Passing in arguments¶
Both the op_args and op_kwargs arguments can be used in the same way as described for the PythonOperator.
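A minimal sketch (the task id, callable, and values are illustrative) in which the condition is parameterized through op_args and op_kwargs:

from airflow.operators.python import ShortCircuitOperator


def above_threshold(value, threshold=0):
    """Return True (continue the pipeline) only when value exceeds threshold."""
    return value > threshold


threshold_check = ShortCircuitOperator(
    task_id="threshold_check",  # hypothetical task id
    python_callable=above_threshold,
    op_args=[42],  # positional argument: value
    op_kwargs={"threshold": 10},  # keyword argument: threshold
)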
Templating¶
Jinja templating can be used in the same way as described for the PythonOperator.
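For instance, a hedged sketch (the task id and template are illustrative) where the condition is evaluated against a templated op_kwargs value:

from airflow.operators.python import ShortCircuitOperator


def is_first_of_month(run_date):
    """run_date arrives rendered as YYYY-MM-DD; continue only on the 1st."""
    return run_date.endswith("-01")


first_of_month_only = ShortCircuitOperator(
    task_id="first_of_month_only",  # hypothetical task id
    python_callable=is_first_of_month,
    op_kwargs={"run_date": "{{ ds }}"},  # Jinja renders ds before the callable runs
)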