Source code for airflow.sensors.bash

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import os
from subprocess import PIPE, STDOUT, Popen
from tempfile import NamedTemporaryFile, TemporaryDirectory, gettempdir
from typing import Sequence

from airflow.exceptions import AirflowFailException
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.context import Context


class BashSensor(BaseSensorOperator):
    """
    Executes a bash command/script.

    Return True if and only if the return code is 0.

    :param bash_command: The command, set of commands or reference to a
        bash script (must be '.sh') to be executed.
    :param env: If env is not None, it must be a mapping that defines the
        environment variables for the new process; these are used instead
        of inheriting the current process environment, which is the default
        behavior. (templated)
    :param output_encoding: output encoding of bash command.
    :param retry_exit_code: If task exits with this code, treat the sensor
        as not-yet-complete and retry the check later according to the
        usual retry/timeout settings. Any other non-zero return code will
        be treated as an error, and cause the sensor to fail. If set to
        ``None`` (the default), any non-zero exit code will cause a retry
        and the task will never raise an error except on time-out.

    .. seealso::
        For more information on how to use this sensor, take a look at the guide:
        :ref:`howto/operator:BashSensor`
    """

    template_fields: Sequence[str] = ("bash_command", "env")

    def __init__(
        self, *, bash_command, env=None, output_encoding="utf-8", retry_exit_code: int | None = None, **kwargs
    ):
        super().__init__(**kwargs)
        self.bash_command = bash_command
        self.env = env
        self.output_encoding = output_encoding
        self.retry_exit_code = retry_exit_code

    def poke(self, context: Context) -> bool:
        """
        Execute the bash command in a temporary directory.

        The command text is written to a temporary script file, which is
        then run with ``bash``; its combined stdout/stderr is logged line
        by line.

        :param context: The task context (not used by this method).
        :return: ``True`` when the command exits with code 0; ``False`` when
            the sensor should be poked again later.
        :raises AirflowFailException: if ``retry_exit_code`` is set and the
            command exits with a non-zero code other than ``retry_exit_code``.
        """
        bash_command = self.bash_command
        self.log.info("Tmp dir root location: %s", gettempdir())
        with TemporaryDirectory(prefix="airflowtmp") as tmp_dir, NamedTemporaryFile(
            dir=tmp_dir, prefix=self.task_id
        ) as f:
            f.write(bytes(bash_command, "utf_8"))
            # Flush so that bash sees the full script when it opens the file.
            f.flush()
            # f.name is already the full path of the script inside tmp_dir
            # (the file was created with dir=tmp_dir), so it is logged as-is
            # instead of being prefixed with tmp_dir a second time.
            script_location = f.name
            self.log.info("Temporary script location: %s", script_location)
            self.log.info("Running command: %s", bash_command)

            with Popen(
                ["bash", script_location],
                stdout=PIPE,
                stderr=STDOUT,
                close_fds=True,
                cwd=tmp_dir,
                env=self.env,
                preexec_fn=os.setsid,
            ) as resp:
                if resp.stdout:
                    self.log.info("Output:")
                    for line in iter(resp.stdout.readline, b""):
                        self.log.info(line.decode(self.output_encoding).strip())
                resp.wait()
                return_code = resp.returncode
                self.log.info("Command exited with return code %s", return_code)

                # zero code means success, the sensor can go green
                if return_code == 0:
                    return True

                # backwards compatibility: sensor retries no matter the error code
                if self.retry_exit_code is None:
                    self.log.info("Non-zero return code and no retry code set, will retry later")
                    return False

                # we have a retry exit code, sensor retries if return code matches, otherwise error
                if return_code == self.retry_exit_code:
                    self.log.info("Return code matches retry code, will retry later")
                    return False

                raise AirflowFailException(f"Command exited with return code {return_code}")

Was this entry helpful?