PythonOperator

Use the PythonOperator to execute Python callables.

airflow/example_dags/example_python_operator.py

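from pprint import pprint

from airflow.operators.python import PythonOperator  # Airflow 2.x import path


def print_context(ds, **kwargs):
    """Print the Airflow context and ds variable from the context."""
    pprint(kwargs)
    print(ds)
    return 'Whatever you return gets printed in the logs'

# Define this task inside a DAG context (for example, a `with DAG(...)` block).
run_this = PythonOperator(
    task_id='print_the_context',
    python_callable=print_context,
)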

Passing in arguments

Use the op_args and op_kwargs arguments to pass additional arguments to the Python callable.

airflow/example_dags/example_python_operator.py

import time


def my_sleeping_function(random_base):
    """Sleep for random_base seconds; runs when the task executes."""
    time.sleep(random_base)

# Generate 5 sleeping tasks, sleeping from 0.0 to 0.4 seconds respectively
for i in range(5):
    task = PythonOperator(
        task_id=f'sleep_for_{i}',
        python_callable=my_sleeping_function,
        op_kwargs={'random_base': i / 10},
    )

    # Each sleeping task runs after run_this, the task defined earlier
    run_this >> task
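
The example above only uses op_kwargs. As a minimal sketch of how op_args and op_kwargs combine, the snippet below assumes the operator calls the callable roughly as python_callable(*op_args, **op_kwargs). The names greet and build_greet_task are hypothetical and not part of Airflow, and the import path assumes Airflow 2.x.

```python
def greet(greeting, name, punctuation="!"):
    """Hypothetical callable: two positional arguments and one keyword argument."""
    message = f"{greeting}, {name}{punctuation}"
    print(message)
    return message


def build_greet_task(dag):
    """Attach the task to an existing DAG (hypothetical helper)."""
    # Imported lazily so the callable above can be tried without Airflow installed.
    from airflow.operators.python import PythonOperator  # Airflow 2.x import path

    return PythonOperator(
        task_id="greet",
        python_callable=greet,
        op_args=["Hello", "Airflow"],    # passed as positional arguments
        op_kwargs={"punctuation": "?"},  # passed as keyword arguments
        dag=dag,
    )


# Roughly what happens at run time, shown here without Airflow:
op_args = ["Hello", "Airflow"]
op_kwargs = {"punctuation": "?"}
result = greet(*op_args, **op_kwargs)
```

Calling the function directly like this is also a convenient way to check a callable before wiring it into a DAG.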

Templating

Airflow passes in an additional set of keyword arguments: one for each of the Jinja template variables and a templates_dict argument.

The templates_dict argument is templated, so each value in the dictionary is evaluated as a Jinja template.
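
As an illustration of the templating described above, here is a minimal sketch. It assumes Airflow 2.x and the built-in ds template variable; report_partition, build_report_task, and the file path are hypothetical names chosen for this example.

```python
def report_partition(templates_dict=None, **context):
    """Hypothetical callable that reads a value Airflow rendered before the call."""
    rendered = templates_dict or {}
    path = rendered.get("partition_path")
    print(f"Processing {path}")
    return path


def build_report_task(dag):
    """Attach the task to an existing DAG (hypothetical helper)."""
    # Imported lazily so the callable above can be tried without Airflow installed.
    from airflow.operators.python import PythonOperator  # Airflow 2.x import path

    return PythonOperator(
        task_id="report_partition",
        python_callable=report_partition,
        # Each value is rendered as a Jinja template before the callable runs.
        templates_dict={"partition_path": "/data/{{ ds }}/events.csv"},
        dag=dag,
    )
```

By the time the callable runs, it should receive the rendered string, for example a path containing the run's date in place of {{ ds }}, rather than the raw template.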

PythonVirtualenvOperator

Use the PythonVirtualenvOperator to execute Python callables inside a new Python virtual environment.

airflow/example_dags/example_python_operator.py

from airflow.operators.python import PythonVirtualenvOperator  # Airflow 2.x import path


def callable_virtualenv():
    """
    Example function that will be performed in a virtual environment.

    Importing inside the function (rather than at the module level) ensures that
    it will not attempt to import the library before it is installed.
    """
    from time import sleep

    from colorama import Back, Fore, Style

    print(Fore.RED + 'some red text')
    print(Back.GREEN + 'and with a green background')
    print(Style.DIM + 'and in dim text')
    print(Style.RESET_ALL)
    for _ in range(10):
        print(Style.DIM + 'Please wait...', flush=True)
        sleep(10)
    print('Finished')

virtualenv_task = PythonVirtualenvOperator(
    task_id="virtualenv_python",
    python_callable=callable_virtualenv,
    requirements=["colorama==0.4.0"],
    system_site_packages=False,
)

Passing in arguments

You can use the op_args and op_kwargs arguments the same way you use them in the PythonOperator. However, serializing var and ti / task_instance is currently not supported due to incompatibilities with the underlying library. To use Airflow context variables, make sure the virtual environment has access to Airflow, either by setting system_site_packages to True or by adding apache-airflow to the requirements argument. Otherwise, most Airflow context variables will not be available in op_kwargs. If you need datetime-related context such as data_interval_start, add pendulum and lazy_object_proxy to the requirements.
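
The following is a sketch of passing arguments and a datetime-related context variable into a virtualenv task, under the assumptions stated in the paragraph above. describe_interval and build_virtualenv_task are hypothetical names, the import path assumes Airflow 2.x, and whether data_interval_start is available depends on your Airflow version.

```python
def describe_interval(label, data_interval_start=None):
    """Hypothetical callable executed inside the virtual environment."""
    # Keep any third-party imports inside the function body, since the
    # function runs in a separate environment.
    summary = f"{label}: {data_interval_start}"
    print(summary)
    return summary


def build_virtualenv_task(dag):
    """Attach the task to an existing DAG (hypothetical helper)."""
    # Imported lazily so the callable above can be tried without Airflow installed.
    from airflow.operators.python import PythonVirtualenvOperator  # Airflow 2.x

    return PythonVirtualenvOperator(
        task_id="describe_interval",
        python_callable=describe_interval,
        op_kwargs={"label": "demo"},
        # Per the text above, these packages are needed for datetime-related
        # context such as data_interval_start.
        requirements=["pendulum", "lazy_object_proxy"],
        system_site_packages=False,
        dag=dag,
    )
```

Because var and ti / task_instance cannot be serialized, the callable here only asks for a plain argument and a datetime value.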

Templating

You can use Jinja templating the same way you use it in the PythonOperator.
