Source code for airflow.providers.microsoft.psrp.hooks.psrp
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsfromcollections.abcimportGeneratorfromcontextlibimportcontextmanagerfromcopyimportcopyfromloggingimportDEBUG,ERROR,INFO,WARNINGfromtypingimportTYPE_CHECKING,Any,CallablefromweakrefimportWeakKeyDictionaryfrompypsrp.hostimportPSHostfrompypsrp.messagesimportMessageTypefrompypsrp.powershellimportPowerShell,PSInvocationState,RunspacePoolfrompypsrp.wsmanimportWSManfromairflow.exceptionsimportAirflowExceptionfromairflow.hooks.baseimportBaseHook
[docs]classPsrpHook(BaseHook):""" Hook for PowerShell Remoting Protocol execution. When used as a context manager, the runspace pool is reused between shell sessions. :param psrp_conn_id: Required. The name of the PSRP connection. :param logging_level: Logging level for message streams which are received during remote execution. The default is to include all messages in the task log. :param operation_timeout: Override the default WSMan timeout when polling the pipeline. :param runspace_options: Optional dictionary which is passed when creating the runspace pool. See :py:class:`~pypsrp.powershell.RunspacePool` for a description of the available options. :param wsman_options: Optional dictionary which is passed when creating the `WSMan` client. See :py:class:`~pypsrp.wsman.WSMan` for a description of the available options. :param on_output_callback: Optional callback function to be called whenever an output response item is received during job status polling. :param exchange_keys: If true (default), automatically initiate a session key exchange when the hook is used as a context manager. :param host: Optional PowerShell host instance. If this is not set, the default implementation will be used. You can provide an alternative `configuration_name` using either `runspace_options` or by setting this key as the extra fields of your connection. """
[docs]defget_conn(self)->RunspacePool:""" Return a runspace pool. The returned object must be used as a context manager. """conn=self.get_connection(self.conn_id)self.log.info("Establishing WinRM connection %s to host: %s",self.conn_id,conn.host)extra=conn.extra_dejson.copy()defapply_extra(d,keys):d=d.copy()forkeyinkeys:value=extra.pop(key,None)ifvalueisnotNone:d[key]=valuereturndwsman_options=apply_extra(self._wsman_options,("auth","cert_validation","connection_timeout","locale","read_timeout","reconnection_retries","reconnection_backoff","ssl",),)wsman=WSMan(conn.host,username=conn.login,password=conn.password,**wsman_options)runspace_options=apply_extra(self._runspace_options,("configuration_name",))ifextra:raiseAirflowException(f"Unexpected extra configuration keys: {', '.join(sorted(extra))}")pool=RunspacePool(wsman,host=self._host,**runspace_options)self._wsman_ref[pool]=wsmanreturnpool
@contextmanager
[docs]definvoke(self)->Generator[PowerShell,None,None]:""" Yield a PowerShell object to which commands can be added. Upon exit, the commands will be invoked. """logger=copy(self.log)logger.setLevel(self._logging_level)local_context=self._connisNoneiflocal_context:self.__enter__()try:ifTYPE_CHECKING:assertself._connisnotNoneps=PowerShell(self._conn)yieldpsps.begin_invoke()streams=[ps.output,ps.streams.debug,ps.streams.error,ps.streams.information,ps.streams.progress,ps.streams.verbose,ps.streams.warning,]offsets=[0for_instreams]# We're using polling to make sure output and streams are# handled while the process is running.whileps.state==PSInvocationState.RUNNING:ps.poll_invoke(timeout=self._operation_timeout)fori,streaminenumerate(streams):offset=offsets[i]whilelen(stream)>offset:record=stream[offset]# Records received on the output stream during job# status polling are handled via an optional callback,# while the other streams are simply logged.ifstreamisps.output:ifself._on_output_callbackisnotNone:self._on_output_callback(record)else:self._log_record(logger.log,record)offset+=1offsets[i]=offset# For good measure, we'll make sure the process has# stopped running in any case.ps.end_invoke()self.log.info("Invocation state: %s",str(PSInvocationState(ps.state)))ifps.streams.error:raiseAirflowException("Process had one or more errors")finally:iflocal_context:self.__exit__(None,None,None)
[docs]definvoke_cmdlet(self,name:str,use_local_scope:bool|None=None,arguments:list[str]|None=None,parameters:dict[str,str]|None=None,)->PowerShell:"""Invoke a PowerShell cmdlet and return session."""withself.invoke()asps:ps.add_cmdlet(name,use_local_scope=use_local_scope)forargumentinargumentsor():ps.add_argument(argument)ifparameters:ps.add_parameters(parameters)returnps
[docs]definvoke_powershell(self,script:str)->PowerShell:"""Invoke a PowerShell script and return session."""withself.invoke()asps:ps.add_script(script)returnps
def_log_record(self,log,record):message_type=record.MESSAGE_TYPEifmessage_type==MessageType.ERROR_RECORD:log(INFO,"%s: %s",record.reason,record)ifrecord.script_stacktrace:fortraceinrecord.script_stacktrace.splitlines():log(INFO,trace)level=INFORMATIONAL_RECORD_LEVEL_MAP.get(message_type)iflevelisnotNone:try:message=str(record.message)exceptBaseExceptionasexc:# See https://github.com/jborean93/pypsrp/pull/130message=str(exc)# Sometimes a message will have a trailing \r\n sequence such as# the tracing output of the Set-PSDebug cmdlet.message=message.rstrip()ifrecord.command_nameisNone:log(level,"%s",message)else:log(level,"%s: %s",record.command_name,message)elifmessage_type==MessageType.INFORMATION_RECORD:log(INFO,"%s (%s): %s",record.computer,record.user,record.message_data)elifmessage_type==MessageType.PROGRESS_RECORD:log(INFO,"Progress: %s (%s)",record.activity,record.description)else:log(WARNING,"Unsupported message type: %s",message_type)