Source code for

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from logging import DEBUG
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Sequence

from jinja2.nativetypes import NativeEnvironment
from pypsrp.powershell import Command
from pypsrp.serializer import TaggedValue

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from import PsrpHook
from airflow.settings import json

# TODO: Replace with airflow.utils.helpers.exactly_one in Airflow 2.3.
[docs]def exactly_one(*args): return len(set(filter(None, args))) == 1
if TYPE_CHECKING: from airflow.utils.context import Context
[docs]class PsrpOperator(BaseOperator): """PowerShell Remoting Protocol operator. Use one of the 'command', 'cmdlet', or 'powershell' arguments. The 'securestring' template filter can be used to tag a value for serialization into a `System.Security.SecureString` (applicable only for DAGs which have `render_template_as_native_obj=True`). When using the `cmdlet` or `powershell` arguments and when `do_xcom_push` is enabled, the command output is converted to JSON by PowerShell using the `ConvertTo-Json < module/microsoft.powershell.utility/convertto-json>`__ cmdlet such that the operator return value is serializable to an XCom value. :param psrp_conn_id: connection id :param command: command to execute on remote host. (templated) :param powershell: powershell to execute on remote host. (templated) :param cmdlet: cmdlet to execute on remote host (templated). Also used as the default value for `task_id`. :param parameters: When using the `cmdlet` or `powershell` arguments, use this parameter to provide parameters (templated). Note that a parameter with a value of `None` becomes an *argument* (i.e., switch). :param logging_level: Logging level for message streams which are received during remote execution. The default is to include all messages in the task log. :param runspace_options: optional dictionary which is passed when creating the runspace pool. See :py:class:`~pypsrp.powershell.RunspacePool` for a description of the available options. :param wsman_options: optional dictionary which is passed when creating the `WSMan` client. See :py:class:`~pypsrp.wsman.WSMan` for a description of the available options. :param psrp_session_init: Optional command which will be added to the pipeline when a new PowerShell session has been established, prior to invoking the action specified using the `cmdlet`, `command`, or `powershell` parameters. """
[docs] template_fields: Sequence[str] = ( "cmdlet", "command", "parameters", "powershell",
[docs] template_fields_renderers = {"command": "powershell", "powershell": "powershell"}
[docs] ui_color = "#c2e2ff"
def __init__( self, *, psrp_conn_id: str, command: Optional[str] = None, powershell: Optional[str] = None, cmdlet: Optional[str] = None, parameters: Optional[Dict[str, str]] = None, logging_level: int = DEBUG, runspace_options: Optional[Dict[str, Any]] = None, wsman_options: Optional[Dict[str, Any]] = None, psrp_session_init: Optional[Command] = None, **kwargs, ) -> None: args = {command, powershell, cmdlet} if not exactly_one(*args): raise ValueError("Must provide exactly one of 'command', 'powershell', or 'cmdlet'") if parameters and not cmdlet: raise ValueError("Parameters only allowed with 'cmdlet'") if cmdlet: kwargs.setdefault('task_id', cmdlet) super().__init__(**kwargs) self.conn_id = psrp_conn_id self.command = command self.powershell = powershell self.cmdlet = cmdlet self.parameters = parameters self.logging_level = logging_level self.runspace_options = runspace_options self.wsman_options = wsman_options self.psrp_session_init = psrp_session_init
[docs] def execute(self, context: "Context") -> Optional[List[Any]]: with PsrpHook( self.conn_id, logging_level=self.logging_level, runspace_options=self.runspace_options, wsman_options=self.wsman_options, if not self.do_xcom_push else None, ) as hook, hook.invoke() as ps: if self.psrp_session_init is not None: ps.add_command(self.psrp_session_init) if self.command: ps.add_script(f"cmd.exe /c @'\n{self.command}\n'@") else: if self.cmdlet: ps.add_cmdlet(self.cmdlet) else: ps.add_script(self.powershell) if self.parameters: ps.add_parameters(self.parameters) if self.do_xcom_push: ps.add_cmdlet("ConvertTo-Json") if ps.had_errors: raise AirflowException("Process failed") rc = if rc: raise AirflowException(f"Process exited with non-zero status code: {rc}") if not self.do_xcom_push: return None return [json.loads(output) for output in ps.output]
[docs] def get_template_env(self): # Create a template environment overlay in order to leave the underlying # environment unchanged. env = super().get_template_env().overlay() native = isinstance(env, NativeEnvironment) def securestring(value: str): if not native: raise AirflowException( "Filter 'securestring' not applicable to non-native templating environment" ) return TaggedValue("SS", value) env.filters["securestring"] = securestring return env

Was this entry helpful?