## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportosfromsubprocessimportPIPE,STDOUT,PopenfromtempfileimportNamedTemporaryFile,TemporaryDirectory,gettempdirfromtypingimportSequencefromairflow.sensors.baseimportBaseSensorOperatorfromairflow.utils.contextimportContext
class BashSensor(BaseSensorOperator):
    """
    Executes a bash command/script.

    Return True if and only if the return code is 0.

    :param bash_command: The command, set of commands or reference to a
        bash script (must be '.sh') to be executed.
    :param env: If env is not None, it must be a mapping that defines the
        environment variables for the new process; these are used instead
        of inheriting the current process environment, which is the default
        behavior. (templated)
    :param output_encoding: output encoding of bash command.

    .. seealso::
        For more information on how to use this sensor, take a look at the guide:
        :ref:`howto/operator:BashSensor`
    """

    # NOTE(review): poke() reads self.bash_command, self.env and
    # self.output_encoding; where they are assigned is not visible in this
    # class body -- confirm they are set by the constructor.

    def poke(self, context: Context) -> bool:
        """
        Execute the bash command in a temporary directory.

        The command text is written to a temporary script file, which is then
        run with ``bash`` using the temporary directory as working directory.
        Combined stdout/stderr of the child is logged line by line.

        :param context: task context supplied by the caller; not used here.
        :return: True when the command exits with return code 0, else False.
        """
        bash_command = self.bash_command
        self.log.info("Tmp dir root location: \n%s", gettempdir())

        with TemporaryDirectory(prefix="airflowtmp") as tmp_dir, NamedTemporaryFile(
            dir=tmp_dir, prefix=self.task_id
        ) as script_file:
            script_file.write(bash_command.encode("utf-8"))
            # Flush so that bash sees the full script when it opens the file.
            script_file.flush()

            # ``script_file.name`` already is the full path of the file inside
            # ``tmp_dir``; prepending ``tmp_dir`` again would log a bogus,
            # doubled path.
            fname = script_file.name
            script_location = os.path.abspath(fname)
            self.log.info("Temporary script location: %s", script_location)
            self.log.info("Running command: %s", bash_command)

            with Popen(
                ["bash", fname],
                stdout=PIPE,
                stderr=STDOUT,  # interleave stderr with stdout in the log
                close_fds=True,
                cwd=tmp_dir,
                env=self.env,
                preexec_fn=os.setsid,  # run the child in its own session
            ) as resp:
                if resp.stdout:
                    self.log.info("Output:")
                    # readline() returns b"" at EOF, which ends the iteration.
                    for line in iter(resp.stdout.readline, b""):
                        self.log.info(line.decode(self.output_encoding).strip())
                resp.wait()
                self.log.info("Command exited with return code %s", resp.returncode)

                # Return code 0 (success) maps to True, anything else to False.
                return not resp.returncode