Creating Custom @task
Decorators¶
As of Airflow 2.2 it is possible add custom decorators to the TaskFlow interface from within a provider
package and have those decorators appear natively as part of the @task.____
design.
For an example. Let’s say you were trying to create an easier mechanism to run python functions as “foo”
tasks. The steps to create and register @task.foo
are:
Create a
FooDecoratedOperator
In this case, we are assuming that you have an existing
FooOperator
that takes a python function as an argument. By creating aFooDecoratedOperator
that inherits fromFooOperator
andairflow.decorators.base.DecoratedOperator
, Airflow will supply much of the needed functionality required to treat your new class as a taskflow native class.You should also override the
custom_operator_name
attribute to provide a custom name for the task. For example,_DockerDecoratedOperator
in theapache-airflow-providers-docker
provider sets this to@task.docker
to indicate the decorator name it implements.Create a
foo_task
functionOnce you have your decorated class, create a function like this, to convert the new
FooDecoratedOperator
into a TaskFlow function decorator!from typing import TYPE_CHECKING from airflow.decorators.base import task_decorator_factory if TYPE_CHECKING: from airflow.decorators.base import TaskDecorator def foo_task( python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs, ) -> "TaskDecorator": return task_decorator_factory( python_callable=python_callable, multiple_outputs=multiple_outputs, decorated_operator_class=FooDecoratedOperator, **kwargs, )
Register your new decorator in get_provider_info of your provider
Finally, add a key-value
task-decorators
to the dict returned from the provider entrypoint. This should be a list with each item containingname
andclass-name
keys. When Airflow starts, theProviderManager
class will automatically import this value andtask.foo
will work as a new decorator!def get_provider_info(): return { "package-name": "foo-provider-airflow", "name": "Foo", "task-decorators": [ { "name": "foo", # "Import path" and function name of the `foo_task` "class-name": ["name.of.python.package.foo_task"], } ], # ... }
Please note that the
name
must be a valid python identifier.
(Optional) Adding IDE auto-completion support¶
Note
This section mostly applies to the apache-airflow managed providers. We have not decided if we will allow third-party providers to register auto-completion in this way.
For better or worse, Python IDEs can not auto-complete dynamically generated methods (see JetBrain’s write up on the subject).
To hack around this problem, a type stub airflow/decorators/__init__.pyi
is provided to statically declare
the type signature of each task decorator. A newly added task decorator should declare its signature stub
like this:
airflow/decorators/__init__.pyi
def docker(
self,
*,
multiple_outputs: Optional[bool] = None,
use_dill: bool = False, # Added by _DockerDecoratedOperator.
python_command: str = "python3",
# 'command', 'retrieve_output', and 'retrieve_output_path' are filled by
# _DockerDecoratedOperator.
image: str,
api_version: Optional[str] = None,
container_name: Optional[str] = None,
cpus: float = 1.0,
docker_url: str = "unix://var/run/docker.sock",
environment: Optional[Dict[str, str]] = None,
private_environment: Optional[Dict[str, str]] = None,
force_pull: bool = False,
mem_limit: Optional[Union[float, str]] = None,
host_tmp_dir: Optional[str] = None,
network_mode: Optional[str] = None,
tls_ca_cert: Optional[str] = None,
tls_client_cert: Optional[str] = None,
tls_client_key: Optional[str] = None,
tls_hostname: Optional[Union[str, bool]] = None,
tls_ssl_version: Optional[str] = None,
tmp_dir: str = "/tmp/airflow",
user: Optional[Union[str, int]] = None,
mounts: Optional[List[str]] = None,
working_dir: Optional[str] = None,
xcom_all: bool = False,
docker_conn_id: Optional[str] = None,
dns: Optional[List[str]] = None,
dns_search: Optional[List[str]] = None,
auto_remove: bool = False,
shm_size: Optional[int] = None,
tty: bool = False,
privileged: bool = False,
cap_add: Optional[Iterable[str]] = None,
extra_hosts: Optional[Dict[str, str]] = None,
**kwargs,
) -> TaskDecorator:
"""Create a decorator to convert the decorated callable to a Docker task.
:param multiple_outputs: If set, function return value will be unrolled to multiple XCom values.
Dict will unroll to XCom values with keys as XCom keys. Defaults to False.
:param use_dill: Whether to use dill or pickle for serialization
:param python_command: Python command for executing functions, Default: python3
:param image: Docker image from which to create the container.
If image tag is omitted, "latest" will be used.
:param api_version: Remote API version. Set to ``auto`` to automatically
detect the server's version.
:param container_name: Name of the container. Optional (templated)
:param cpus: Number of CPUs to assign to the container. This value gets multiplied with 1024.
:param docker_url: URL of the host running the docker daemon.
Default is unix://var/run/docker.sock
:param environment: Environment variables to set in the container. (templated)
:param private_environment: Private environment variables to set in the container.
These are not templated, and hidden from the website.
:param force_pull: Pull the docker image on every run. Default is False.
:param mem_limit: Maximum amount of memory the container can use.
Either a float value, which represents the limit in bytes,
or a string like ``128m`` or ``1g``.
:param host_tmp_dir: Specify the location of the temporary directory on the host which will
be mapped to tmp_dir. If not provided defaults to using the standard system temp directory.
:param network_mode: Network mode for the container.
It can be one of the following:
bridge - Create new network stack for the container with default docker bridge network
None - No networking for this container
container:<name|id> - Use the network stack of another container specified via <name|id>
host - Use the host network stack. Incompatible with `port_bindings`
'<network-name>|<network-id>' - Connects the container to user created network(using `docker network create` command)
:param tls_ca_cert: Path to a PEM-encoded certificate authority
to secure the docker connection.
:param tls_client_cert: Path to the PEM-encoded certificate
used to authenticate docker client.
:param tls_client_key: Path to the PEM-encoded key used to authenticate docker client.
:param tls_hostname: Hostname to match against
the docker server certificate or False to disable the check.
:param tls_ssl_version: Version of SSL to use when communicating with docker daemon.
:param tmp_dir: Mount point inside the container to
a temporary directory created on the host by the operator.
The path is also made available via the environment variable
``AIRFLOW_TMP_DIR`` inside the container.
:param user: Default user inside the docker container.
:param mounts: List of mounts to mount into the container, e.g.
``['/host/path:/container/path', '/host/path2:/container/path2:ro']``.
:param working_dir: Working directory to
set on the container (equivalent to the -w switch the docker client)
:param xcom_all: Push all the stdout or just the last line.
The default is False (last line).
:param docker_conn_id: ID of the Airflow connection to use
:param dns: Docker custom DNS servers
:param dns_search: Docker custom DNS search domain
:param auto_remove: Auto-removal of the container on daemon side when the
container's process exits.
The default is False.
:param shm_size: Size of ``/dev/shm`` in bytes. The size must be
greater than 0. If omitted uses system default.
:param tty: Allocate pseudo-TTY to the container
This needs to be set see logs of the Docker container.
:param privileged: Give extended privileges to this container.
:param cap_add: Include container capabilities
"""
The signature should allow only keyword-only arguments, including one named multiple_outputs
that’s
automatically provided by default. All other arguments should be copied directly from the real FooOperator,
and we recommend adding a comment to explain what arguments are filled automatically by FooDecoratedOperator
and thus not included.
If the new decorator can be used without arguments (e.g. @task.python
instead of @task.python()
),
You should also add an overload that takes a single callable immediately after the “real” definition so mypy
can recognize the function as a “bare decorator”:
airflow/decorators/__init__.pyi
@overload
def python(self, python_callable: Callable[FParams, FReturn]) -> Task[FParams, FReturn]: ...
Once the change is merged and the next Airflow (minor or patch) release comes out, users will be able to see your decorator in IDE auto-complete. This auto-complete will change based on the version of the provider that the user has installed.
Please note that this step is not required to create a working decorator, but does create a better experience for users of the provider.