Source code for airflow.providers.ssh.operators.ssh
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.importwarningsfrombase64importb64encodefromtypingimportTYPE_CHECKING,Optional,Sequence,Unionfromairflow.configurationimportconffromairflow.exceptionsimportAirflowExceptionfromairflow.modelsimportBaseOperatorifTYPE_CHECKING:fromparamiko.clientimportSSHClientfromairflow.providers.ssh.hooks.sshimportSSHHook
class SSHOperator(BaseOperator):
    """
    SSHOperator to execute commands on given remote host using the ssh_hook.

    :param ssh_hook: predefined ssh_hook to use for remote execution.
        Either `ssh_hook` or `ssh_conn_id` needs to be provided.
    :param ssh_conn_id: :ref:`ssh connection id<howto/connection:ssh>`
        from airflow Connections. `ssh_conn_id` will be ignored if
        `ssh_hook` is provided.
    :param remote_host: remote host to connect (templated)
        Nullable. If provided, it will replace the `remote_host` which was
        defined in `ssh_hook` or predefined in the connection of `ssh_conn_id`.
    :param command: command to execute on remote host. (templated)
    :param conn_timeout: timeout (in seconds) for maintaining the connection. The default is 10 seconds.
        Nullable. If provided, it will replace the `conn_timeout` which was
        predefined in the connection of `ssh_conn_id`.
    :param cmd_timeout: timeout (in seconds) for executing the command. The default is 10 seconds.
    :param timeout: (deprecated) timeout (in seconds) for executing the command. The default is 10 seconds.
        Use conn_timeout and cmd_timeout parameters instead.
    :param environment: a dict of shell environment variables. Note that the
        server will reject them silently if `AcceptEnv` is not set in SSH config.
    :param get_pty: request a pseudo-terminal from the server. Set to ``True``
        to have the remote process killed upon task timeout.
        The default is ``False`` but note that `get_pty` is forced to ``True``
        when the `command` starts with ``sudo``.
    :param banner_timeout: timeout to wait for banner from the server in seconds
    """

    def __init__(
        self,
        *,
        ssh_hook: Optional["SSHHook"] = None,
        ssh_conn_id: Optional[str] = None,
        remote_host: Optional[str] = None,
        command: Optional[str] = None,
        timeout: Optional[int] = None,
        conn_timeout: Optional[int] = None,
        cmd_timeout: Optional[int] = None,
        environment: Optional[dict] = None,
        get_pty: bool = False,
        banner_timeout: float = 30.0,
        **kwargs,
    ) -> None:
        """
        Store the connection and command settings and resolve the timeouts.

        The deprecated ``timeout`` is used as a fallback for ``conn_timeout``
        and ``cmd_timeout`` when those are not given; a ``DeprecationWarning``
        is emitted whenever a truthy ``timeout`` is passed.
        """
        super().__init__(**kwargs)
        self.ssh_hook = ssh_hook
        self.ssh_conn_id = ssh_conn_id
        self.remote_host = remote_host
        self.command = command
        self.timeout = timeout
        self.conn_timeout = conn_timeout
        self.cmd_timeout = cmd_timeout
        # The legacy `timeout` only fills in values the caller left unset.
        if self.conn_timeout is None and self.timeout:
            self.conn_timeout = self.timeout
        if self.cmd_timeout is None:
            # NOTE(review): CMD_TIMEOUT is not defined in the visible part of this
            # file; presumably a module-level default -- confirm it is in scope.
            self.cmd_timeout = self.timeout if self.timeout else CMD_TIMEOUT
        self.environment = environment
        self.get_pty = get_pty
        self.banner_timeout = banner_timeout

        if self.timeout:
            # Each fragment ends with a space so the implicitly concatenated
            # sentences do not run together in the emitted message.
            warnings.warn(
                'Parameter `timeout` is deprecated. '
                'Please use `conn_timeout` and `cmd_timeout` instead. '
                'The old option `timeout` will be removed in a future version.',
                DeprecationWarning,
                stacklevel=2,
            )

    def get_hook(self) -> "SSHHook":
        """
        Return the SSH hook to use, creating it from ``ssh_conn_id`` if needed.

        A valid ``ssh_hook`` takes precedence over ``ssh_conn_id``. When
        ``remote_host`` is set on the operator it overrides the hook's value.

        :raises AirflowException: if neither a hook nor a connection id is available.
        """
        # Imported here rather than at module level (the module-level import is
        # only performed under TYPE_CHECKING).
        from airflow.providers.ssh.hooks.ssh import SSHHook

        if self.ssh_conn_id:
            if self.ssh_hook and isinstance(self.ssh_hook, SSHHook):
                self.log.info("ssh_conn_id is ignored when ssh_hook is provided.")
            else:
                self.log.info("ssh_hook is not provided or invalid. Trying ssh_conn_id to create SSHHook.")
                self.ssh_hook = SSHHook(
                    ssh_conn_id=self.ssh_conn_id,
                    conn_timeout=self.conn_timeout,
                    banner_timeout=self.banner_timeout,
                )

        if not self.ssh_hook:
            raise AirflowException("Cannot operate without ssh_hook or ssh_conn_id.")

        if self.remote_host is not None:
            self.log.info(
                "remote_host is provided explicitly. "
                "It will replace the remote_host which was defined "
                "in ssh_hook or predefined in connection of ssh_conn_id."
            )
            self.ssh_hook.remote_host = self.remote_host

        return self.ssh_hook

    def get_ssh_client(self) -> "SSHClient":
        """Return the connection obtained from the hook's ``get_conn()``."""
        # Remember to use context manager or call .close() on this when done
        self.log.info('Creating ssh_client')
        return self.get_hook().get_conn()

    def exec_ssh_client_command(self, ssh_client: "SSHClient", command: str):
        """
        Run ``command`` through the hook (deprecated).

        Delegates to ``ssh_hook.exec_ssh_client_command`` and returns whatever
        it returns. Note that this passes the deprecated ``timeout`` attribute
        (which may be ``None``), not ``cmd_timeout``.

        :param ssh_client: client to run the command with.
        :param command: command to execute.
        :raises AirflowException: if no ``ssh_hook`` is set on the operator.
        """
        warnings.warn(
            'exec_ssh_client_command method on SSHOperator is deprecated, call '
            '`ssh_hook.exec_ssh_client_command` instead',
            DeprecationWarning,
            stacklevel=2,
        )
        # Explicit check instead of `assert`, which is stripped under `python -O`.
        if not self.ssh_hook:
            raise AirflowException("Cannot operate without ssh_hook or ssh_conn_id.")
        return self.ssh_hook.exec_ssh_client_command(
            ssh_client,
            command,
            timeout=self.timeout,
            environment=self.environment,
            get_pty=self.get_pty,
        )

    def execute(self, context=None) -> Union[bytes, str]:
        """
        Run ``command`` on the remote host and return its result.

        :param context: task context; not used by this method.
        :return: the command result as returned by ``run_ssh_client_command``,
            or its base64-encoded text form when the ``core.enable_xcom_pickling``
            option is disabled.
        :raises AirflowException: if no command is set, or if creating the
            client or running the command fails (the original error is chained
            as the cause).
        """
        result: Union[bytes, str]
        if self.command is None:
            raise AirflowException("SSH operator error: SSH command not specified. Aborting.")

        # Forcing get_pty to True if the command begins with "sudo".
        self.get_pty = self.command.startswith('sudo') or self.get_pty

        try:
            with self.get_ssh_client() as ssh_client:
                # NOTE(review): run_ssh_client_command is not defined in the visible
                # part of this class -- confirm it exists elsewhere in the class.
                result = self.run_ssh_client_command(ssh_client, self.command)
        except Exception as e:
            raise AirflowException(f"SSH operator error: {str(e)}") from e

        enable_pickling = conf.getboolean('core', 'enable_xcom_pickling')
        if not enable_pickling:
            # Encode to base64 text when pickling is disabled.
            result = b64encode(result).decode('utf-8')
        return result