PythonOperator¶
Use the PythonOperator
to execute
Python callables.
def print_context(ds, **kwargs):
"""Print the Airflow context and ds variable from the context."""
pprint(kwargs)
print(ds)
return 'Whatever you return gets printed in the logs'
run_this = PythonOperator(
task_id='print_the_context',
python_callable=print_context,
dag=dag,
)
Passing in arguments¶
Use the op_args
and op_kwargs
arguments to pass additional arguments
to the Python callable.
def my_sleeping_function(random_base):
"""This is a function that will run within the DAG execution"""
time.sleep(random_base)
# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
for i in range(5):
task = PythonOperator(
task_id='sleep_for_' + str(i),
python_callable=my_sleeping_function,
op_kwargs={'random_base': float(i) / 10},
dag=dag,
)
run_this >> task
Templating¶
Airflow passes in an additional set of keyword arguments: one for each of the
Jinja template variables and a templates_dict
argument.
The templates_dict
argument is templated, so each value in the dictionary
is evaluated as a Jinja template.
PythonVirtualenvOperator¶
Use the PythonVirtualenvOperator
to execute
Python callables inside a new Python virtual environment.
def callable_virtualenv():
"""
Example function that will be performed in a virtual environment.
Importing at the module level ensures that it will not attempt to import the
library before it is installed.
"""
from time import sleep
from colorama import Back, Fore, Style
print(Fore.RED + 'some red text')
print(Back.GREEN + 'and with a green background')
print(Style.DIM + 'and in dim text')
print(Style.RESET_ALL)
for _ in range(10):
print(Style.DIM + 'Please wait...', flush=True)
sleep(10)
print('Finished')
virtualenv_task = PythonVirtualenvOperator(
task_id="virtualenv_python",
python_callable=callable_virtualenv,
requirements=["colorama==0.4.0"],
system_site_packages=False,
dag=dag,
)
Passing in arguments¶
You can use the op_args
and op_kwargs
arguments the same way you use it in the PythonOperator.
Unfortunately we currently do not support to serialize var
and ti
/ task_instance
due to incompatibilities
with the underlying library. For airflow context variables make sure that you either have access to Airflow through
setting system_site_packages
to True
or add apache-airflow
to the requirements
argument.
Otherwise you won’t have access to the most context variables of Airflow in op_kwargs
.
If you want the context related to datetime objects like execution_date
you can add pendulum
and
lazy_object_proxy
.
Templating¶
You can use jinja Templating the same way you use it in PythonOperator.