# Source code for airflow.sensors.bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import os
from subprocess import PIPE, STDOUT, Popen
from tempfile import NamedTemporaryFile, TemporaryDirectory, gettempdir
from typing import Sequence
from airflow.exceptions import AirflowFailException
from airflow.sensors.base import BaseSensorOperator
from airflow.utils.context import Context
class BashSensor(BaseSensorOperator):
    """
    Executes a bash command/script.

    Return True if and only if the return code is 0.

    :param bash_command: The command, set of commands or reference to a
        bash script (must be '.sh') to be executed.
    :param env: If env is not None, it must be a mapping that defines the
        environment variables for the new process; these are used instead
        of inheriting the current process environment, which is the default
        behavior. (templated)
    :param output_encoding: output encoding of bash command.
    :param retry_exit_code: If task exits with this code, treat the sensor
        as not-yet-complete and retry the check later according to the
        usual retry/timeout settings. Any other non-zero return code will
        be treated as an error, and cause the sensor to fail. If set to
        ``None`` (the default), any non-zero exit code will cause a retry
        and the task will never raise an error except on time-out.

    .. seealso::
        For more information on how to use this sensor, take a look at the guide:
        :ref:`howto/operator:BashSensor`
    """

    template_fields: Sequence[str] = ("bash_command", "env")

    def __init__(
        self,
        *,
        bash_command: str,
        env: dict[str, str] | None = None,
        output_encoding: str = "utf-8",
        retry_exit_code: int | None = None,
        **kwargs,
    ) -> None:
        super().__init__(**kwargs)
        self.bash_command = bash_command
        self.env = env
        self.output_encoding = output_encoding
        self.retry_exit_code = retry_exit_code

    def poke(self, context: Context) -> bool:
        """
        Execute the bash command in a temporary directory.

        The command text is written to a temporary file inside a fresh
        temporary directory and run with ``bash``, using that directory as
        the working directory. Its stderr is merged into stdout, and each
        output line is logged.

        :return: True if the command exited with code 0. False if it exited
            with a non-zero code and either ``retry_exit_code`` is None or
            the code equals ``retry_exit_code``.
        :raises AirflowFailException: if ``retry_exit_code`` is set and the
            command exited with a different non-zero code.
        """
        bash_command = self.bash_command
        self.log.info("Tmp dir root location: \n %s", gettempdir())
        with TemporaryDirectory(prefix="airflowtmp") as tmp_dir:
            with NamedTemporaryFile(dir=tmp_dir, prefix=self.task_id) as f:
                f.write(bytes(bash_command, "utf_8"))
                # Flush so bash sees the full script contents on disk.
                f.flush()
                # ``f.name`` is already the full path of the script inside tmp_dir;
                # joining it onto tmp_dir again would log a bogus doubled path.
                fname = f.name
                self.log.info("Temporary script location: %s", fname)
                self.log.info("Running command: %s", bash_command)
                # start_new_session=True calls setsid() in the child, the same
                # effect as preexec_fn=os.setsid, without preexec_fn's
                # documented thread-safety hazards.
                with Popen(
                    ["bash", fname],
                    stdout=PIPE,
                    stderr=STDOUT,
                    close_fds=True,
                    cwd=tmp_dir,
                    env=self.env,
                    start_new_session=True,
                ) as resp:
                    if resp.stdout:
                        self.log.info("Output:")
                        for line in iter(resp.stdout.readline, b""):
                            self.log.info(line.decode(self.output_encoding).strip())
                    resp.wait()
                    returncode = resp.returncode
                    self.log.info("Command exited with return code %s", returncode)

                    # Zero code means success: the sensor can go green.
                    if returncode == 0:
                        return True

                    # Backwards compatibility: with no retry code configured, the
                    # sensor retries no matter the (non-zero) error code.
                    if self.retry_exit_code is None:
                        self.log.info("Non-zero return code and no retry code set, will retry later")
                        return False

                    # A retry code is configured: only that code means "try again".
                    if returncode == self.retry_exit_code:
                        self.log.info("Return code matches retry code, will retry later")
                        return False

                    raise AirflowFailException(f"Command exited with return code {returncode}")