Operators¶
An Operator is conceptually a template for a predefined Task, that you can just define declaratively inside your DAG:
with DAG("my-dag") as dag:
ping = SimpleHttpOperator(endpoint="http://example.com/update/")
email = EmailOperator(to="admin@example.com", subject="Update complete")
ping >> email
Airflow has a very extensive set of operators available, with some built-in to the core or pre-installed providers. Some popular operators from core include:
BashOperator
- executes a bash commandPythonOperator
- calls an arbitrary Python functionEmailOperator
- sends an email
For a list of all core operators, see: Core Operators and Hooks Reference.
If the operator you need isn’t installed with Airflow by default, you can probably find it as part of our huge set of community provider packages. Some popular operators from here include:
S3FileTransformOperator
But there are many, many more - you can see the full list of all community-managed operators, hooks, sensors and transfers in our providers packages documentation.
Note
Inside Airflow’s code, we often mix the concepts of Tasks and Operators, and they are mostly interchangeable. However, when we talk about a Task, we mean the generic “unit of execution” of a DAG; when we talk about an Operator, we mean a reusable, pre-made Task template whose logic is all done for you and that just needs some arguments.
Jinja Templating¶
Airflow leverages the power of Jinja Templating and this can be a powerful tool to use in combination with macros.
For example, say you want to pass the start of the data interval as an environment variable to a Bash script using the BashOperator
:
# The start of the data interval as YYYY-MM-DD
date = "{{ ds }}"
t = BashOperator(
task_id="test_env",
bash_command="/tmp/test.sh ",
dag=dag,
env={"DATA_INTERVAL_START": date},
)
Here, {{ ds }}
is a templated variable, and because the env
parameter of the BashOperator
is templated with Jinja, the data interval’s start date will be available as an environment variable named DATA_INTERVAL_START
in your Bash script.
You can use Jinja templating with every parameter that is marked as “templated” in the documentation. Template substitution occurs just before the pre_execute
function of your operator is called.
You can also use Jinja templating with nested fields, as long as these nested fields are marked as templated in the structure they belong to: fields registered in template_fields
property will be submitted to template substitution, like the path
field in the example below:
class MyDataReader:
template_fields = ["path"]
def __init__(self, my_path):
self.path = my_path
# [additional code here...]
t = PythonOperator(
task_id="transform_data",
python_callable=transform_data,
op_args=[MyDataReader("/tmp/{{ ds }}/my_file")],
dag=dag,
)
Note
The template_fields
property can equally be a class variable or an instance variable.
Deep nested fields can also be substituted, as long as all intermediate fields are marked as template fields:
class MyDataTransformer:
template_fields = ["reader"]
def __init__(self, my_reader):
self.reader = my_reader
# [additional code here...]
class MyDataReader:
template_fields = ["path"]
def __init__(self, my_path):
self.path = my_path
# [additional code here...]
t = PythonOperator(
task_id="transform_data",
python_callable=transform_data,
op_args=[MyDataTransformer(MyDataReader("/tmp/{{ ds }}/my_file"))],
dag=dag,
)
You can pass custom options to the Jinja Environment
when creating your DAG. One common usage is to avoid Jinja from dropping a trailing newline from a template string:
my_dag = DAG(
dag_id="my-dag",
jinja_environment_kwargs={
"keep_trailing_newline": True,
# some other jinja2 Environment options here
},
)
See the Jinja documentation to find all available options.
Rendering Fields as Native Python Objects¶
By default, all the template_fields
are rendered as strings.
Example, let’s say extract
task pushes a dictionary
(Example: {"1001": 301.27, "1002": 433.21, "1003": 502.22}
) to XCom table.
Now, when the following task is run, order_data
argument is passed a string, example:
'{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
.
transform = PythonOperator(
task_id="transform",
op_kwargs={"order_data": "{{ti.xcom_pull('extract')}}"},
python_callable=transform,
)
If you instead want the rendered template field to return a Native Python object (dict
in our example),
you can pass render_template_as_native_obj=True
to the DAG as follows:
dag = DAG(
dag_id="example_template_as_python_object",
schedule_interval=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
render_template_as_native_obj=True,
)
def extract():
data_string = '{"1001": 301.27, "1002": 433.21, "1003": 502.22}'
return json.loads(data_string)
def transform(order_data):
print(type(order_data))
for value in order_data.values():
total_order_value += value
return {"total_order_value": total_order_value}
extract_task = PythonOperator(task_id="extract", python_callable=extract)
transform_task = PythonOperator(
task_id="transform",
op_kwargs={"order_data": "{{ti.xcom_pull('extract')}}"},
python_callable=transform,
)
extract_task >> transform_task
In this case, order_data
argument is passed: {"1001": 301.27, "1002": 433.21, "1003": 502.22}
.
Airflow uses Jinja’s NativeEnvironment
when render_template_as_native_obj
is set to True
.
With NativeEnvironment
, rendering a template produces a native Python type.
Reserved params keyword¶
In Apache Airflow 2.2.0 params
variable is used during DAG serialization. Please do not use that name in third party operators.
If you upgrade your environment and get the following error:
AttributeError: 'str' object has no attribute '__module__'
change name from params
in your operators.