Operators

An Operator is conceptually a template for a predefined Task that you can define declaratively inside your DAG:

with DAG("my-dag") as dag:
    ping = SimpleHttpOperator(task_id="ping", endpoint="http://example.com/update/")
    email = EmailOperator(
        task_id="email",
        to="admin@example.com",
        subject="Update complete",
        html_content="The update has finished.",
    )

    ping >> email

Airflow has an extensive set of operators available, some built into the core and others shipped with pre-installed providers. Popular core operators include BashOperator, PythonOperator, and EmailOperator.

For a list of all core operators, see: Core Operators and Hooks Reference.

If the operator you need isn’t installed with Airflow by default, you can probably find it in our large set of community provider packages. SimpleHttpOperator, for example, comes from the HTTP provider.

There are many more; you can see the full list of all community-managed operators, hooks, sensors and transfers in our providers packages documentation.

Note

Inside Airflow’s code, we often mix the concepts of Tasks and Operators, and they are mostly interchangeable. However, when we talk about a Task, we mean the generic “unit of execution” of a DAG; when we talk about an Operator, we mean a reusable, pre-made Task template whose logic is all done for you and that just needs some arguments.

Jinja Templating

Airflow uses Jinja templating, which is especially powerful in combination with macros.

For example, say you want to pass the start of the data interval as an environment variable to a Bash script using the BashOperator:

# The start of the data interval as YYYY-MM-DD
date = "{{ ds }}"
t = BashOperator(
    task_id="test_env",
    # The trailing space keeps Airflow from loading the .sh path as a Jinja template file
    bash_command="/tmp/test.sh ",
    dag=dag,
    env={"DATA_INTERVAL_START": date},
)

Here, {{ ds }} is a templated variable, and because the env parameter of the BashOperator is templated with Jinja, the data interval’s start date will be available as an environment variable named DATA_INTERVAL_START in your Bash script.
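
As a rough illustration of what the substitution does, the sketch below renders the env dictionary with the jinja2 library directly. It is not how Airflow invokes Jinja internally: the context dictionary and its date value are made up for the example, whereas Airflow builds the real context for each task run.

```python
from jinja2 import Template

# The same templated value that is passed to BashOperator's env parameter
env = {"DATA_INTERVAL_START": "{{ ds }}"}

# Hypothetical context; Airflow supplies the real one for each task run
context = {"ds": "2021-01-01"}

# Render every value in the dictionary against the context
rendered_env = {key: Template(value).render(**context) for key, value in env.items()}

print(rendered_env)  # {'DATA_INTERVAL_START': '2021-01-01'}
```

The Bash script then sees the rendered date, not the literal {{ ds }} text.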

You can use Jinja templating with every parameter that is marked as “templated” in the documentation. Template substitution occurs just before the pre_execute function of your operator is called.

You can also use Jinja templating with nested fields, as long as these nested fields are marked as templated in the structure they belong to. Fields registered in the template_fields property are subject to template substitution, like the path field in the example below:

class MyDataReader:
    template_fields = ["path"]

    def __init__(self, my_path):
        self.path = my_path

    # [additional code here...]


t = PythonOperator(
    task_id="transform_data",
    python_callable=transform_data,
    op_args=[MyDataReader("/tmp/{{ ds }}/my_file")],
    dag=dag,
)

Note

The template_fields property can equally be a class variable or an instance variable.

Deeply nested fields can also be substituted, as long as all intermediate fields are marked as template fields:

class MyDataTransformer:
    template_fields = ["reader"]

    def __init__(self, my_reader):
        self.reader = my_reader

    # [additional code here...]


class MyDataReader:
    template_fields = ["path"]

    def __init__(self, my_path):
        self.path = my_path

    # [additional code here...]


t = PythonOperator(
    task_id="transform_data",
    python_callable=transform_data,
    op_args=[MyDataTransformer(MyDataReader("/tmp/{{ ds }}/my_file"))],
    dag=dag,
)
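
To see why every intermediate field has to be marked, here is a dependency-free sketch of a recursive renderer. It is not Airflow's implementation: render_string is a plain string replacement standing in for Jinja, and render_nested, UnmarkedTransformer and the context value are hypothetical names introduced only for this illustration.

```python
def render_string(value, context):
    """Stand-in for Jinja: replace each '{{ name }}' with its context value."""
    for name, replacement in context.items():
        value = value.replace("{{ " + name + " }}", str(replacement))
    return value


def render_nested(obj, context):
    """Render strings; recurse into containers and declared template_fields."""
    if isinstance(obj, str):
        return render_string(obj, context)
    if isinstance(obj, (list, tuple)):
        return type(obj)(render_nested(item, context) for item in obj)
    if isinstance(obj, dict):
        return {key: render_nested(val, context) for key, val in obj.items()}
    # Only attributes listed in template_fields are visited
    for field in getattr(obj, "template_fields", ()):
        setattr(obj, field, render_nested(getattr(obj, field), context))
    return obj


class MyDataReader:
    template_fields = ["path"]

    def __init__(self, my_path):
        self.path = my_path


class MyDataTransformer:
    template_fields = ["reader"]

    def __init__(self, my_reader):
        self.reader = my_reader


class UnmarkedTransformer:
    # No template_fields, so the nested reader is never reached
    def __init__(self, my_reader):
        self.reader = my_reader


context = {"ds": "2021-01-01"}  # hypothetical run date
marked = render_nested(MyDataTransformer(MyDataReader("/tmp/{{ ds }}/my_file")), context)
unmarked = render_nested(UnmarkedTransformer(MyDataReader("/tmp/{{ ds }}/my_file")), context)

print(marked.reader.path)    # /tmp/2021-01-01/my_file
print(unmarked.reader.path)  # /tmp/{{ ds }}/my_file
```

In this sketch, the walk stops at the first object that does not declare the field leading to the next level, so the template inside it is left unrendered.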

You can pass custom options to the Jinja Environment when creating your DAG. One common use is to prevent Jinja from dropping a trailing newline from a template string:

my_dag = DAG(
    dag_id="my-dag",
    jinja_environment_kwargs={
        "keep_trailing_newline": True,
        # some other jinja2 Environment options here
    },
)

See the Jinja documentation to find all available options.
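
The effect of keep_trailing_newline can be checked with the jinja2 library on its own. The sketch below works outside Airflow and uses a made-up template string; it only shows the behavior of the option that Airflow forwards to the Jinja Environment.

```python
from jinja2 import Environment

template_source = "echo done\n"  # hypothetical template ending in a newline

default_env = Environment()
keeping_env = Environment(keep_trailing_newline=True)

default_output = default_env.from_string(template_source).render()
kept_output = keeping_env.from_string(template_source).render()

print(repr(default_output))  # 'echo done'
print(repr(kept_output))     # 'echo done\n'
```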

Rendering Fields as Native Python Objects

By default, all the template_fields are rendered as strings.

For example, suppose the extract task pushes a dictionary ({"1001": 301.27, "1002": 433.21, "1003": 502.22}) to the XCom table. When the following task runs, the order_data argument is passed a string: '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'.

transform = PythonOperator(
    task_id="transform",
    op_kwargs={"order_data": "{{ti.xcom_pull('extract')}}"},
    python_callable=transform,
)

If you instead want the rendered template field to return a native Python object (a dict in our example), pass render_template_as_native_obj=True to the DAG as follows:

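import json

import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator

dag = DAG(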
    dag_id="example_template_as_python_object",
    schedule_interval=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    render_template_as_native_obj=True,
)


def extract():
    data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
    return json.loads(data_string)


def transform(order_data):
    print(type(order_data))
    total_order_value = 0  # initialize the accumulator before summing
    for value in order_data.values():
        total_order_value += value
    return {"total_order_value": total_order_value}


extract_task = PythonOperator(task_id="extract", python_callable=extract, dag=dag)

transform_task = PythonOperator(
    task_id="transform",
    op_kwargs={"order_data": "{{ti.xcom_pull('extract')}}"},
    python_callable=transform,
    dag=dag,
)

extract_task >> transform_task

In this case, the order_data argument is passed a dict: {"1001": 301.27, "1002": 433.21, "1003": 502.22}.

Airflow uses Jinja’s NativeEnvironment when render_template_as_native_obj is set to True. With NativeEnvironment, rendering a template produces a native Python type.
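
The difference between the two environments can be seen with the jinja2 library alone. This is a standalone sketch, not Airflow code: it passes the dictionary in directly as a template variable instead of pulling it from XCom.

```python
from jinja2 import Environment
from jinja2.nativetypes import NativeEnvironment

order_data = {"1001": 301.27, "1002": 433.21, "1003": 502.22}
template_source = "{{ order_data }}"

# The default environment always renders to a string
as_string = Environment().from_string(template_source).render(order_data=order_data)

# NativeEnvironment returns a native Python object where it can
as_native = NativeEnvironment().from_string(template_source).render(order_data=order_data)

print(type(as_string).__name__)  # str
print(type(as_native).__name__)  # dict
```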

Reserved params keyword

In Apache Airflow 2.2.0, the params variable is used during DAG serialization. Do not use that name in third-party operators. If you upgrade your environment and get the following error:

AttributeError: 'str' object has no attribute '__module__'

change the name from params to something else in your operators.
