Source code for airflow.hooks.subprocess

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import contextlib
import os
import signal
from collections import namedtuple
from subprocess import PIPE, STDOUT, Popen
from tempfile import TemporaryDirectory, gettempdir

from airflow.hooks.base import BaseHook

# Result of a finished subprocess: ``exit_code`` is the process return code and
# ``output`` is the last line the process wrote to its combined stdout/stderr.
SubprocessResult = namedtuple("SubprocessResult", ["exit_code", "output"])
class SubprocessHook(BaseHook):
    """Hook for running processes with the ``subprocess`` module."""

    def __init__(self, **kwargs) -> None:
        # Handle of the most recently started child process; stays ``None``
        # until ``run_command`` has been called.
        self.sub_process: Popen[bytes] | None = None
        super().__init__(**kwargs)

    def run_command(
        self,
        command: list[str],
        env: dict[str, str] | None = None,
        output_encoding: str = "utf-8",
        cwd: str | None = None,
    ) -> SubprocessResult:
        """
        Execute the command and stream its output to the hook's log.

        If ``cwd`` is None, execute the command in a temporary directory which will be cleaned
        afterwards. If ``env`` is not supplied, ``os.environ`` is passed.

        :param command: the command to run
        :param env: Optional dict containing environment variables to be made available to the shell
            environment in which ``command`` will be executed. If omitted, ``os.environ`` will be
            used; an explicitly supplied empty dict is passed through unchanged.
            Note, that in case you have Sentry configured, original variables from the environment
            will also be passed to the subprocess with ``SUBPROCESS_`` prefix. See
            :doc:`/administration-and-deployment/logging-monitoring/errors` for details.
        :param output_encoding: encoding to use for decoding stdout
        :param cwd: Working directory to run the command in.
            If None (default), the command is run in a temporary directory.
        :return: :class:`namedtuple` containing ``exit_code`` and ``output``, the last line from
            stderr or stdout (decoded and right-stripped; an empty string if nothing was printed)
        """
        self.log.info("Tmp dir root location: %s", gettempdir())
        with contextlib.ExitStack() as stack:
            if cwd is None:
                # The temporary directory is removed when the ExitStack unwinds,
                # i.e. only after the child has been waited for below.
                cwd = stack.enter_context(TemporaryDirectory(prefix="airflowtmp"))

            def pre_exec():
                # Restore default signal disposition and invoke setsid
                for sig in ("SIGPIPE", "SIGXFZ", "SIGXFSZ"):
                    if hasattr(signal, sig):
                        signal.signal(getattr(signal, sig), signal.SIG_DFL)
                os.setsid()

            self.log.info("Running command: %s", command)

            process = Popen(
                command,
                stdout=PIPE,
                # Merge stderr into stdout so both streams are logged in order.
                stderr=STDOUT,
                cwd=cwd,
                env=os.environ if env is None else env,
                preexec_fn=pre_exec,
            )
            # Publish the handle so ``send_sigterm`` can reach the running child.
            self.sub_process = process

            self.log.info("Output:")
            line = ""
            if process.stdout is not None:
                # ``readline`` returns b"" at EOF, which terminates the iterator.
                for raw_line in iter(process.stdout.readline, b""):
                    line = raw_line.decode(output_encoding, errors="backslashreplace").rstrip()
                    self.log.info("%s", line)
                # EOF means every writer has closed its end, so releasing the read
                # end here cannot affect the child; without this the descriptor
                # stays open for as long as the hook keeps ``self.sub_process``.
                process.stdout.close()

            process.wait()

            self.log.info("Command exited with return code %s", process.returncode)
            return_code: int = process.returncode

        return SubprocessResult(exit_code=return_code, output=line)

    def send_sigterm(self):
        """
        Send SIGTERM signal to ``self.sub_process`` if one exists.

        The signal is delivered to the child's whole process group. Nothing is sent
        when the child has already been reaped, because its PID may have been
        recycled by an unrelated process.
        """
        self.log.info("Sending SIGTERM signal to process group")
        process = self.sub_process
        if not process or not hasattr(process, "pid"):
            return
        if process.returncode is not None:
            # Already waited for: the PID no longer belongs to our child.
            return
        try:
            os.killpg(os.getpgid(process.pid), signal.SIGTERM)
        except ProcessLookupError:
            # The child exited between the check above and the signal call.
            self.log.info("Process group no longer exists; SIGTERM not sent")

Was this entry helpful?