Source code for airflow.providers.ssh.operators.ssh
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import warnings
from base64 import b64encode
from typing import TYPE_CHECKING, Sequence

from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning
from airflow.models import BaseOperator
from airflow.utils.types import NOTSET, ArgNotSet

if TYPE_CHECKING:
    from paramiko.client import SSHClient

    from airflow.providers.ssh.hooks.ssh import SSHHook
class SSHOperator(BaseOperator):
    """
    SSHOperator to execute commands on given remote host using the ssh_hook.

    :param ssh_hook: predefined ssh_hook to use for remote execution.
        Either `ssh_hook` or `ssh_conn_id` needs to be provided.
    :param ssh_conn_id: :ref:`ssh connection id<howto/connection:ssh>`
        from airflow Connections. `ssh_conn_id` will be ignored if
        `ssh_hook` is provided.
    :param remote_host: remote host to connect (templated)
        Nullable. If provided, it will replace the `remote_host` which was
        defined in `ssh_hook` or predefined in the connection of `ssh_conn_id`.
    :param command: command to execute on remote host. (templated)
    :param conn_timeout: timeout (in seconds) for maintaining the connection.
        The default is 10 seconds.
        Nullable. If provided, it will replace the `conn_timeout` which was
        predefined in the connection of `ssh_conn_id`.
    :param cmd_timeout: timeout (in seconds) for executing the command.
        The default is 10 seconds.
        Nullable, `None` means no timeout. If provided, it will replace the
        `cmd_timeout` which was predefined in the connection of `ssh_conn_id`.
    :param environment: a dict of shell environment variables. Note that the
        server will reject them silently if `AcceptEnv` is not set in SSH config. (templated)
    :param get_pty: request a pseudo-terminal from the server. Set to ``True``
        to have the remote process killed upon task timeout.
        The default is ``False`` but note that `get_pty` is forced to ``True``
        when the `command` starts with ``sudo``.
    :param banner_timeout: timeout to wait for banner from the server in seconds

    If *do_xcom_push* is *True*, the numeric exit code emitted by
    the ssh session is pushed to XCom under key ``ssh_exit``.
    """

    def get_hook(self) -> SSHHook:
        """
        Return the SSH hook this operator should use, creating it if needed.

        When ``ssh_conn_id`` is set and ``ssh_hook`` is missing or is not an
        ``SSHHook`` instance, a new hook is built from ``ssh_conn_id`` and the
        operator's timeouts and stored on ``self.ssh_hook``. When
        ``remote_host`` is not ``None`` it overrides the hook's ``remote_host``.

        :return: the hook stored on ``self.ssh_hook``.
        :raises AirflowException: if no hook is available after resolution.
        """
        # Imported at call time; the module-level import is TYPE_CHECKING-only.
        from airflow.providers.ssh.hooks.ssh import SSHHook

        if self.ssh_conn_id:
            if self.ssh_hook and isinstance(self.ssh_hook, SSHHook):
                self.log.info("ssh_conn_id is ignored when ssh_hook is provided.")
            else:
                self.log.info("ssh_hook is not provided or invalid. Trying ssh_conn_id to create SSHHook.")
                self.ssh_hook = SSHHook(
                    ssh_conn_id=self.ssh_conn_id,
                    conn_timeout=self.conn_timeout,
                    cmd_timeout=self.cmd_timeout,
                    banner_timeout=self.banner_timeout,
                )

        if not self.ssh_hook:
            raise AirflowException("Cannot operate without ssh_hook or ssh_conn_id.")

        if self.remote_host is not None:
            self.log.info(
                "remote_host is provided explicitly. "
                "It will replace the remote_host which was defined "
                "in ssh_hook or predefined in connection of ssh_conn_id."
            )
            self.ssh_hook.remote_host = self.remote_host

        return self.ssh_hook

    def get_ssh_client(self) -> SSHClient:
        """
        Return the connection obtained from the resolved hook's ``get_conn()``.

        Remember to use a context manager or call ``.close()`` on the returned
        client when done.
        """
        self.log.info("Creating ssh_client")
        return self.get_hook().get_conn()

    def exec_ssh_client_command(self, ssh_client: SSHClient, command: str):
        """
        Run ``command`` through the hook (deprecated).

        Emits ``AirflowProviderDeprecationWarning`` and delegates to
        ``ssh_hook.exec_ssh_client_command`` using this operator's
        ``cmd_timeout``, ``environment`` and ``get_pty``.

        :param ssh_client: client on which the command is executed.
        :param command: command to execute.
        :return: whatever ``ssh_hook.exec_ssh_client_command`` returns.
        """
        warnings.warn(
            "exec_ssh_client_command method on SSHOperator is deprecated, call "
            "`ssh_hook.exec_ssh_client_command` instead",
            AirflowProviderDeprecationWarning,
            # Attribute the warning to the caller rather than to this module.
            stacklevel=2,
        )
        # NOTE(review): presumably expects get_hook() to have populated
        # ssh_hook already; the assert is skipped under ``python -O``.
        assert self.ssh_hook
        return self.ssh_hook.exec_ssh_client_command(
            ssh_client, command, timeout=self.cmd_timeout, environment=self.environment, get_pty=self.get_pty
        )

    def raise_for_status(self, exit_status: int, stderr: bytes, context=None) -> None:
        """
        Push the exit status to XCom when enabled and fail on non-zero status.

        :param exit_status: exit code of the remote command.
        :param stderr: accepted for interface compatibility; not used here.
        :param context: task context; when truthy and ``do_xcom_push`` is set,
            ``exit_status`` is pushed under key ``ssh_exit`` via the context's
            ``task_instance``.
        :raises AirflowException: if ``exit_status`` is not ``0``.
        """
        if context and self.do_xcom_push:
            ti = context.get("task_instance")
            ti.xcom_push(key="ssh_exit", value=exit_status)
        if exit_status != 0:
            raise AirflowException(f"SSH operator error: exit status = {exit_status}")

    def execute(self, context=None) -> bytes | str:
        """
        Execute ``self.command`` over SSH and return its result.

        :param context: task context, forwarded to ``run_ssh_client_command``.
        :return: the raw result when ``core.enable_xcom_pickling`` is true,
            otherwise the result base64-encoded and decoded as a UTF-8 string.
        :raises AirflowException: if ``self.command`` is ``None``.
        """
        result: bytes | str
        if self.command is None:
            raise AirflowException("SSH operator error: SSH command not specified. Aborting.")

        # Forcing get_pty to True if the command begins with "sudo".
        self.get_pty = self.command.startswith("sudo") or self.get_pty

        with self.get_ssh_client() as ssh_client:
            result = self.run_ssh_client_command(ssh_client, self.command, context=context)

        enable_pickling = conf.getboolean("core", "enable_xcom_pickling")
        if not enable_pickling:
            result = b64encode(result).decode("utf-8")
        return result