Source code for airflow.providers.ssh.operators.ssh
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsfrombase64importb64encodefromcollections.abcimportContainer,Sequencefromfunctoolsimportcached_propertyfromtypingimportTYPE_CHECKINGfromairflow.configurationimportconffromairflow.exceptionsimportAirflowException,AirflowSkipExceptionfromairflow.modelsimportBaseOperatorfromairflow.providers.ssh.hooks.sshimportSSHHookfromairflow.utils.typesimportNOTSET,ArgNotSetifTYPE_CHECKING:fromparamiko.clientimportSSHClient
class SSHOperator(BaseOperator):
    """
    SSHOperator to execute commands on given remote host using the ssh_hook.

    :param ssh_hook: predefined ssh_hook to use for remote execution.
        Either `ssh_hook` or `ssh_conn_id` needs to be provided.
    :param ssh_conn_id: :ref:`ssh connection id<howto/connection:ssh>`
        from airflow Connections. `ssh_conn_id` will be ignored if
        `ssh_hook` is provided.
    :param remote_host: remote host to connect (templated)
        Nullable. If provided, it will replace the `remote_host` which was
        defined in `ssh_hook` or predefined in the connection of `ssh_conn_id`.
    :param command: command to execute on remote host. (templated)
    :param conn_timeout: timeout (in seconds) for maintaining the connection.
        The default is 10 seconds.
        Nullable. If provided, it will replace the `conn_timeout` which was
        predefined in the connection of `ssh_conn_id`.
    :param cmd_timeout: timeout (in seconds) for executing the command.
        The default is 10 seconds.
        Nullable, `None` means no timeout. If provided, it will replace the
        `cmd_timeout` which was predefined in the connection of `ssh_conn_id`.
    :param environment: a dict of shell environment variables. Note that the
        server will reject them silently if `AcceptEnv` is not set in SSH config.
        (templated)
    :param get_pty: request a pseudo-terminal from the server. Set to ``True``
        to have the remote process killed upon task timeout.
        The default is ``False`` but note that `get_pty` is forced to ``True``
        when the `command` starts with ``sudo``.
    :param banner_timeout: timeout to wait for banner from the server in seconds
    :param skip_on_exit_code: If command exits with this exit code, leave the task
        in ``skipped`` state (default: None). If set to ``None``, any non-zero
        exit code will be treated as a failure.

    If *do_xcom_push* is *True*, the numeric exit code emitted by
    the ssh session is pushed to XCom under key ``ssh_exit``.
    """

    # NOTE(review): only ``ssh_hook`` and ``execute`` are visible in this
    # excerpt. ``execute`` relies on ``self.get_ssh_client`` and
    # ``self.run_ssh_client_command``, and both methods read attributes
    # (``ssh_conn_id``, ``command``, ``get_pty``, ...) that must be set
    # elsewhere -- presumably in a constructor that is not shown here.

    # NOTE(review): ``cached_property`` is imported at file level, but no
    # decorator is visible on this method in the excerpt -- confirm against
    # the upstream module whether it is meant to be a cached property.
    def ssh_hook(self) -> SSHHook:
        """
        Create SSHHook to run commands on remote host.

        The hook is built from ``ssh_conn_id`` together with the operator's
        connection, command and banner timeouts. When ``remote_host`` is set
        on the operator it overrides the host on the freshly created hook.

        :return: the newly created hook.
        :raises AirflowException: if ``ssh_conn_id`` is empty or unset.
        """
        # Guard clause: without a connection id there is nothing to build from.
        if not self.ssh_conn_id:
            raise AirflowException("Cannot operate without ssh_hook or ssh_conn_id.")

        self.log.info("ssh_hook is not provided or invalid. Trying ssh_conn_id to create SSHHook.")
        created_hook = SSHHook(
            ssh_conn_id=self.ssh_conn_id,
            conn_timeout=self.conn_timeout,
            cmd_timeout=self.cmd_timeout,
            banner_timeout=self.banner_timeout,
        )

        # An explicit remote_host on the operator wins over the hook's own.
        if self.remote_host is not None:
            self.log.info(
                "remote_host is provided explicitly. "
                "It will replace the remote_host which was defined "
                "in ssh_hook or predefined in connection of ssh_conn_id."
            )
            created_hook.remote_host = self.remote_host

        return created_hook

    def execute(self, context=None) -> bytes | str:
        """
        Run ``self.command`` through an SSH client and return its output.

        :param context: passed through unchanged to
            ``run_ssh_client_command``.
        :return: the command result as returned by
            ``run_ssh_client_command`` when the ``core.enable_xcom_pickling``
            option is true; otherwise that result base64-encoded and decoded
            to a UTF-8 ``str``.
        :raises AirflowException: if ``self.command`` is ``None``.
        """
        if self.command is None:
            raise AirflowException("SSH operator error: SSH command not specified. Aborting.")

        # Forcing get_pty to True if the command begins with "sudo".
        self.get_pty = True if self.command.startswith("sudo") else self.get_pty

        with self.get_ssh_client() as ssh_client:
            output: bytes | str = self.run_ssh_client_command(ssh_client, self.command, context=context)

        # TODO: Remove this after minimum Airflow version is 3.0
        if conf.getboolean("core", "enable_xcom_pickling", fallback=False):
            # Pickling is enabled, so the raw result is returned as-is.
            return output

        # Pickling is disabled: hand back a base64 text form of the result.
        return b64encode(output).decode("utf-8")