Creating a custom Operator

Airflow allows you to create new operators to suit the requirements of you or your team. The extensibility is one of the many reasons which makes Apache Airflow powerful.

You can create any operator you want by extending the airflow.models.baseoperator.BaseOperator

There are two methods that you need to override in a derived class:

  • Constructor - Define the parameters required for the operator. You only need to specify the arguments specific to your operator. Use @apply_defaults decorator function to fill unspecified arguments with default_args. You can specify the default_args in the dag file. See Default args for more details.

  • Execute - The code to execute when the runner calls the operator. The method contains the airflow context as a parameter that can be used to read config values.

Let’s implement an example HelloOperator in a new file hello_operator.py:

from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults

class HelloOperator(BaseOperator):

    @apply_defaults
    def __init__(
            self,
            name: str,
            *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.name = name

    def execute(self, context):
        message = "Hello {}".format(self.name)
        print(message)
        return message

Note

For imports to work, you should place the file in a directory that is present in the PYTHONPATH env. Airflow adds dags/, plugins/, and config/ directories in the Airflow home to PYTHONPATH by default. e.g., In our example, the file is placed in the custom_operator directory.

You can now use the derived custom operator as follows:

from custom_operator.hello_operator import HelloOperator

with dag:
    hello_task = HelloOperator(task_id='sample-task', name='foo_bar')

Hooks

Hooks act as an interface to communicate with the external shared resources in a DAG. For example, multiple tasks in a DAG can require access to a MySQL database. Instead of creating a connection per task, you can retrieve a connection from the hook and utilize it. Hook also helps to avoid storing connection auth parameters in a DAG. See Managing Connections for how to create and manage connections.

Let’s extend our previous example to fetch name from MySQL:

class HelloDBOperator(BaseOperator):

        @apply_defaults
        def __init__(
                self,
                name: str,
                mysql_conn_id: str,
                database: str,
                *args, **kwargs) -> None:
            super().__init__(*args, **kwargs)
            self.name = name
            self.mysql_conn_id = mysql_conn_id
            self.database = database

        def execute(self, context):
            hook = MySqlHook(mysql_conn_id=self.mysql_conn_id,
                     schema=self.database)
            sql = "select name from user"
            result = hook.get_first(sql)
            message = "Hello {}".format(result['name'])
            print(message)
            return message

When the operator invokes the query on the hook object, a new connection gets created if it doesn’t exist. The hook retrieves the auth parameters such as username and password from Airflow backend and passes the params to the airflow.hooks.base_hook.BaseHook.get_connection(). You should create hook only in the execute method or any method which is called from execute. The constructor gets called whenever Airflow parses a DAG which happens frequently. The execute gets called only during a DAG run.

User interface

Airflow also allows the developer to control how the operator shows up in the DAG UI. Override ui_color to change the background color of the operator in UI. Override ui_fgcolor to change the color of the label.

class HelloOperator(BaseOperator):
    ui_color = '#ff0000'
    ui_fgcolor = '#000000'
    ....

Templating

You can use Jinja templates to parameterize your operator. Airflow considers the field names present in template_fields for templating while rendering the operator.

class HelloOperator(BaseOperator):

    template_fields = ['name']

    @apply_defaults
    def __init__(
            self,
            name: str,
            *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.name = name

    def execute(self, context):
        message = "Hello from {}".format(self.name)
        print(message)
        return message

You can use the template as follows:

with dag:
    hello_task = HelloOperator(task_id='task_id_1', dag=dag, name='{{ task_instance.task_id }}')

In this example, Jinja looks for the name parameter and substitutes {{ task_instance.task_id }} with task_id_1.

The parameter can also contain a file name, for example, a bash script or a SQL file. You need to add the extension of your file in template_ext. If a template_field contains a string ending with the extension mentioned in template_ext, Jinja reads the content of the file and replace the templates with actual value. Note that Jinja substitutes the operator attributes and not the args.

class HelloOperator(BaseOperator):

    template_fields = ['guest_name']

    @apply_defaults
    def __init__(
            self,
            name: str,
            *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.guest_name = name

In the example, the template_fields should be ['guest_name'] and not ['name']

Was this entry helpful?