# Source code for airflow.providers.docker.decorators.docker

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import base64
import inspect
import os
import pickle
from tempfile import TemporaryDirectory
from textwrap import dedent
from typing import TYPE_CHECKING, Callable, Dict, Iterable, List, Optional, Sequence, TypeVar, Union

import dill

from airflow.decorators.base import DecoratedOperator, task_decorator_factory
from airflow.providers.docker.operators.docker import DockerOperator
from airflow.utils.python_virtualenv import remove_task_decorator, write_python_script

if TYPE_CHECKING:
    from airflow.utils.context import Context


def _generate_decode_command(env_var, file):
    """
    Build a ``python -c`` one-liner that base64-decodes an environment variable into a file.

    :param env_var: name of the environment variable holding the base64 payload
    :param file: path the decoded bytes are written to
    :return: the command line as a single string
    """
    decode_stmt = rf'x = base64.b64decode(os.environ[\"{env_var}\"]);'
    # No explicit ``f.close()``: the interpreter exits right after the write anyway.
    write_stmt = rf'f = open(\"{file}\", \"wb\"); f.write(x);'
    return ''.join(('python -c "', 'import base64, os;', decode_stmt, write_stmt, '"'))


def _b64_encode_file(filename):
    """
    Read ``filename`` in binary mode and return its contents base64-encoded.

    :param filename: path of the file to encode
    :return: the base64 representation as ``bytes``
    """
    with open(filename, "rb") as source:
        raw_content = source.read()
    return base64.b64encode(raw_content)


class _DockerDecoratedOperator(DecoratedOperator, DockerOperator):
    """
    Wraps a Python callable and captures args/kwargs when called for execution.

    At execution time the callable's source is written into a standalone script. That script and
    the pickled arguments are base64-encoded, passed to the container through environment
    variables, decoded back into files by the container command, and then run with ``python``.

    :param use_dill: Whether to use dill instead of pickle to serialize the arguments.
        Defaults to False.
    :type use_dill: bool
    :param python_callable: A reference to an object that is callable
    :type python_callable: python callable
    :param op_kwargs: a dictionary of keyword arguments that will get unpacked
        in your function (templated)
    :type op_kwargs: dict
    :param op_args: a list of positional arguments that will get unpacked when
        calling your callable (templated)
    :type op_args: list
    :param multiple_outputs: if set, function return value will be
        unrolled to multiple XCom values. Dict will unroll to xcom values with keys as keys.
        Defaults to False.
    :type multiple_outputs: bool
    """

    # Extend (rather than replace) the fields DockerOperator declares as templated, so arguments
    # such as ``environment`` keep being rendered alongside the callable's own arguments.
    template_fields: Sequence[str] = (*DockerOperator.template_fields, 'op_args', 'op_kwargs')

    # since we won't mutate the arguments, we should just do the shallow copy
    # there are some cases we can't deepcopy the objects (e.g protobuf).
    shallow_copy_attrs: Sequence[str] = ('python_callable',)

    def __init__(
        self,
        use_dill=False,
        **kwargs,
    ) -> None:
        # Placeholder only: the actual command is assembled in execute().
        command = "dummy command"
        self.pickling_library = dill if use_dill else pickle
        # /tmp/script.out is the path handed to the generated script as its output file
        # (see the command built in execute()).
        super().__init__(
            command=command, retrieve_output=True, retrieve_output_path="/tmp/script.out", **kwargs
        )

    def execute(self, context: 'Context'):
        """
        Generate the script and its input, ship both via the environment and run the container.

        :param context: the task execution context, forwarded to ``DockerOperator.execute``
        :return: whatever ``DockerOperator.execute`` returns
        """
        has_input = bool(self.op_args or self.op_kwargs)
        with TemporaryDirectory(prefix='venv') as tmp_dir:
            input_filename = os.path.join(tmp_dir, 'script.in')
            script_filename = os.path.join(tmp_dir, 'script.py')

            # Only serialize arguments when there are any; otherwise an empty input is sent below.
            if has_input:
                with open(input_filename, 'wb') as input_file:
                    self.pickling_library.dump({'args': self.op_args, 'kwargs': self.op_kwargs}, input_file)
            py_source = self._get_python_source()
            write_python_script(
                jinja_context=dict(
                    op_args=self.op_args,
                    op_kwargs=self.op_kwargs,
                    pickling_library=self.pickling_library.__name__,
                    python_callable=self.python_callable.__name__,
                    python_callable_source=py_source,
                    string_args_global=False,
                ),
                filename=script_filename,
            )

            # Pass the python script to be executed, and the input args, via environment variables. This is
            # more than slightly hacky, but it means it can work when Airflow itself is in the same Docker
            # engine where this task is going to run (unlike say trying to mount a file in)
            self.environment["__PYTHON_SCRIPT"] = _b64_encode_file(script_filename)
            self.environment["__PYTHON_INPUT"] = _b64_encode_file(input_filename) if has_input else ""

            # Inside the container: decode both payloads back into files, then run the script with
            # the input and output paths as arguments.
            self.command = (
                f"""bash -cx  '{_generate_decode_command("__PYTHON_SCRIPT", "/tmp/script.py")} &&"""
                f'{_generate_decode_command("__PYTHON_INPUT", "/tmp/script.in")} &&'
                f'python /tmp/script.py /tmp/script.in /tmp/script.out\''
            )
            return super().execute(context)

    def _get_python_source(self):
        """Return the dedented source of the callable with the ``@task.docker`` decorator removed."""
        raw_source = inspect.getsource(self.python_callable)
        res = dedent(raw_source)
        res = remove_task_decorator(res, "@task.docker")
        return res


# Type variable constrained to callables.
T = TypeVar("T", bound=Callable)


def docker_task(
    python_callable: Optional[Callable] = None, multiple_outputs: Optional[bool] = None, **kwargs
):
    """
    Python operator decorator. Wraps a function into an Airflow operator.

    Also accepts any argument that DockerOperator will via ``kwargs``. Can be reused in a single DAG.

    :param python_callable: Function to decorate
    :type python_callable: Optional[Callable]
    :param multiple_outputs: if set, function return value will be
        unrolled to multiple XCom values. List/Tuples will unroll to xcom values
        with index as key. Dict will unroll to xcom values with keys as XCom keys.
        Defaults to False.
    :type multiple_outputs: bool
    :return: the result of ``task_decorator_factory`` for ``_DockerDecoratedOperator``
    """
    return task_decorator_factory(
        python_callable=python_callable,
        multiple_outputs=multiple_outputs,
        decorated_operator_class=_DockerDecoratedOperator,
        **kwargs,
    )


# [START decoratormixin]
[docs]class DockerDecoratorMixin: """ Helper class for inheritance. This class is only used during type checking or auto-completion :meta private: """
[docs] def docker( self, multiple_outputs: Optional[bool] = None, use_dill: bool = False, image: str = "", api_version: Optional[str] = None, container_name: Optional[str] = None, cpus: float = 1.0, docker_url: str = 'unix://var/run/docker.sock', environment: Optional[Dict] = None, private_environment: Optional[Dict] = None, force_pull: bool = False, mem_limit: Optional[Union[float, str]] = None, host_tmp_dir: Optional[str] = None, network_mode: Optional[str] = None, tls_ca_cert: Optional[str] = None, tls_client_cert: Optional[str] = None, tls_client_key: Optional[str] = None, tls_hostname: Optional[Union[str, bool]] = None, tls_ssl_version: Optional[str] = None, tmp_dir: str = '/tmp/airflow', user: Optional[Union[str, int]] = None, mounts: Optional[List[str]] = None, working_dir: Optional[str] = None, xcom_all: bool = False, docker_conn_id: Optional[str] = None, dns: Optional[List[str]] = None, dns_search: Optional[List[str]] = None, auto_remove: bool = False, shm_size: Optional[int] = None, tty: bool = False, privileged: bool = False, cap_add: Optional[Iterable[str]] = None, extra_hosts: Optional[Dict[str, str]] = None, **kwargs, ): """ :param python_callable: A python function with no references to outside variables, defined with def, which will be run in a virtualenv :type python_callable: function :param multiple_outputs: if set, function return value will be unrolled to multiple XCom values. List/Tuples will unroll to xcom values with index as key. Dict will unroll to xcom values with keys as XCom keys. Defaults to False. :type multiple_outputs: bool :param use_dill: Whether to use dill or pickle for serialization :type use_dill: bool :param image: Docker image from which to create the container. If image tag is omitted, "latest" will be used. :type image: str :param api_version: Remote API version. Set to ``auto`` to automatically detect the server's version. :type api_version: str :param container_name: Name of the container. 
Optional (templated) :type container_name: str or None :param cpus: Number of CPUs to assign to the container. This value gets multiplied with 1024. See https://docs.docker.com/engine/reference/run/#cpu-share-constraint :type cpus: float :param docker_url: URL of the host running the docker daemon. Default is unix://var/run/docker.sock :type docker_url: str :param environment: Environment variables to set in the container. (templated) :type environment: dict :param private_environment: Private environment variables to set in the container. These are not templated, and hidden from the website. :type private_environment: dict :param force_pull: Pull the docker image on every run. Default is False. :type force_pull: bool :param mem_limit: Maximum amount of memory the container can use. Either a float value, which represents the limit in bytes, or a string like ``128m`` or ``1g``. :type mem_limit: float or str :param host_tmp_dir: Specify the location of the temporary directory on the host which will be mapped to tmp_dir. If not provided defaults to using the standard system temp directory. :type host_tmp_dir: str :param network_mode: Network mode for the container. :type network_mode: str :param tls_ca_cert: Path to a PEM-encoded certificate authority to secure the docker connection. :type tls_ca_cert: str :param tls_client_cert: Path to the PEM-encoded certificate used to authenticate docker client. :type tls_client_cert: str :param tls_client_key: Path to the PEM-encoded key used to authenticate docker client. :type tls_client_key: str :param tls_hostname: Hostname to match against the docker server certificate or False to disable the check. :type tls_hostname: str or bool :param tls_ssl_version: Version of SSL to use when communicating with docker daemon. :type tls_ssl_version: str :param tmp_dir: Mount point inside the container to a temporary directory created on the host by the operator. 
The path is also made available via the environment variable ``AIRFLOW_TMP_DIR`` inside the container. :type tmp_dir: str :param user: Default user inside the docker container. :type user: int or str :param mounts: List of mounts to mount into the container, e.g. ``['/host/path:/container/path', '/host/path2:/container/path2:ro']``. :type mounts: list :param working_dir: Working directory to set on the container (equivalent to the -w switch the docker client) :type working_dir: str :param xcom_all: Push all the stdout or just the last line. The default is False (last line). :type xcom_all: bool :param docker_conn_id: ID of the Airflow connection to use :type docker_conn_id: str :param dns: Docker custom DNS servers :type dns: list[str] :param dns_search: Docker custom DNS search domain :type dns_search: list[str] :param auto_remove: Auto-removal of the container on daemon side when the container's process exits. The default is False. :type auto_remove: bool :param shm_size: Size of ``/dev/shm`` in bytes. The size must be greater than 0. If omitted uses system default. :type shm_size: int :param tty: Allocate pseudo-TTY to the container This needs to be set see logs of the Docker container. :type tty: bool :param privileged: Give extended privileges to this container. :type privileged: bool :param cap_add: Include container capabilities :type cap_add: list[str] """ ...
# [END decoratormixin]

Was this entry helpful?