# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
import contextlib
import os
import signal
from collections import namedtuple
from subprocess import PIPE, STDOUT, Popen
from tempfile import TemporaryDirectory, gettempdir
from typing import Dict, List, Optional

from airflow.hooks.base import BaseHook

class SubprocessHook(BaseHook):
    """
    Hook for running processes with the ``subprocess`` module.

    The hook keeps a handle to the most recently started child process in
    ``self.sub_process`` so that :meth:`send_sigterm` can terminate it.
    """

    def __init__(self) -> None:
        # Set by ``run_command``; stays ``None`` until a command has been started.
        self.sub_process: Optional[Popen[bytes]] = None
        super().__init__()

    def run_command(
        self,
        command: List[str],
        env: Optional[Dict[str, str]] = None,
        output_encoding: str = 'utf-8',
        cwd: Optional[str] = None,
    ) -> SubprocessResult:
        """
        Execute the command.

        If ``cwd`` is None, execute the command in a temporary directory which will be cleaned afterwards.
        If ``env`` is not supplied, ``os.environ`` is passed

        :param command: the command to run
        :param env: Optional dict containing environment variables to be made available to the shell
            environment in which ``command`` will be executed.  If omitted, ``os.environ`` will be used.
            An explicitly passed empty dict is honoured and results in an empty environment.
            Note, that in case you have Sentry configured, original variables from the environment
            will also be passed to the subprocess with ``SUBPROCESS_`` prefix. See
            :doc:`/logging-monitoring/errors` for details.
        :param output_encoding: encoding to use for decoding stdout
        :param cwd: Working directory to run the command in.
            If None (default), the command is run in a temporary directory.
        :return: :class:`namedtuple` containing ``exit_code`` and ``output``, the last line from stderr
            or stdout
        :raises RuntimeError: if the subprocess handle is unexpectedly ``None`` after creation
        """
        self.log.info('Tmp dir root location: \n%s', gettempdir())
        with contextlib.ExitStack() as stack:
            if cwd is None:
                # Registered on the stack so the directory is removed when the block exits.
                cwd = stack.enter_context(TemporaryDirectory(prefix='airflowtmp'))

            def pre_exec():
                # Restore default signal disposition and invoke setsid
                for sig in ('SIGPIPE', 'SIGXFZ', 'SIGXFSZ'):
                    if hasattr(signal, sig):
                        signal.signal(getattr(signal, sig), signal.SIG_DFL)
                # A new session gives the child its own process group, which is
                # what ``send_sigterm`` signals via ``os.killpg``.
                os.setsid()

            self.log.info('Running command: %s', command)

            self.sub_process = Popen(
                command,
                # stderr is merged into stdout so a single pipe carries all output.
                stdout=PIPE,
                stderr=STDOUT,
                cwd=cwd,
                # Only fall back to the parent environment when no mapping was given.
                env=env if env is not None else os.environ,
                preexec_fn=pre_exec,
            )

            self.log.info('Output:')
            line = ''
            if self.sub_process is None:
                raise RuntimeError("The subprocess should be created here and is None!")
            if self.sub_process.stdout is not None:
                # Close the pipe as soon as it is drained (or reading fails) instead
                # of leaving the file descriptor open until garbage collection.
                with self.sub_process.stdout as stdout:
                    for raw_line in iter(stdout.readline, b''):
                        line = raw_line.decode(output_encoding, errors='backslashreplace').rstrip()
                        self.log.info("%s", line)

            self.sub_process.wait()

            self.log.info('Command exited with return code %s', self.sub_process.returncode)
            return_code: int = self.sub_process.returncode

        # ``line`` holds the last line read from the merged output ('' if there was none).
        return SubprocessResult(exit_code=return_code, output=line)

    def send_sigterm(self):
        """
        Sends SIGTERM signal to ``self.sub_process`` if one exists.

        The signal is delivered to the whole process group of the child. Nothing is
        sent when no process was started or when the process has already been waited
        for, because its pid may by then refer to an unrelated process.
        """
        proc = self.sub_process
        if proc is None or not hasattr(proc, 'pid'):
            self.log.info('No subprocess to send SIGTERM to')
            return
        if proc.returncode is not None:
            # Already reaped: the pid is free for reuse, so signalling it is unsafe.
            self.log.info('Subprocess already exited with return code %s', proc.returncode)
            return

        self.log.info('Sending SIGTERM signal to process group')
        try:
            os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
        except ProcessLookupError:
            # The process went away between the check above and the signal.
            self.log.info('Process group no longer exists; nothing to terminate')