Source code for airflow.providers.papermill.hooks.kernel
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimporttypingfromjupyter_clientimportAsyncKernelManagerfrompapermill.clientwrapimportPapermillNotebookClientfrompapermill.enginesimportNBClientEnginefrompapermill.utilsimportmerge_kwargs,remove_argsfromtraitletsimportUnicodefromairflow.hooks.baseimportBaseHook
class KernelHook(BaseHook):
    """
    Hook for interacting with a remote Jupyter kernel.

    The kernel host/IP is taken from the connection, while the Jupyter kernel
    ports and ``session_key`` are read from the connection's ``extra`` field.

    :param kernel_conn_id: connection that has kernel host/ip
    """
def client(self, **kwargs: typing.Any):
    """
    Return a kernel client configured to connect to our kernel.

    The client is created by the parent implementation and is then handed an
    explicit connection-info mapping built from this object's attributes.

    :param kwargs: keyword arguments passed through to the parent ``client()``
    :return: the kernel client, after the connection info has been loaded
    """
    remote_client = super().client(**kwargs)

    # Load the connection info explicitly so that the session key is set on
    # the client.
    connection_info: dict[str, int | str | bytes] = {
        "ip": self.ip,
        "shell_port": self.shell_port,
        "iopub_port": self.iopub_port,
        "stdin_port": self.stdin_port,
        "control_port": self.control_port,
        "hb_port": self.hb_port,
        "key": self.session_key,
        "transport": "tcp",
        "signature_scheme": "hmac-sha256",
    }
    remote_client.load_connection_info(connection_info)
    return remote_client
class RemoteKernelEngine(NBClientEngine):
    """Papermill engine to use ``RemoteKernelManager`` to connect to remote kernel and execute notebook."""

    @classmethod
    def execute_managed_notebook(
        cls,
        nb_man,
        kernel_name,
        log_output=False,
        stdout_file=None,
        stderr_file=None,
        start_timeout=60,
        execution_timeout=None,
        **kwargs,
    ):
        """
        Perform the actual execution of the parameterized notebook on a remote kernel.

        A ``RemoteKernelManager`` is populated from the ``kernel_*`` entries of
        ``kwargs`` and handed to ``PapermillNotebookClient``, which runs the
        notebook.

        :param nb_man: notebook manager passed to ``PapermillNotebookClient``
        :param kernel_name: accepted for signature compatibility; not used here
        :param log_output: forwarded to the notebook client as ``log_output``
        :param stdout_file: forwarded to the notebook client as ``stdout_file``
        :param stderr_file: forwarded to the notebook client as ``stderr_file``
        :param start_timeout: forwarded to the notebook client as ``startup_timeout``
        :param execution_timeout: forwarded as ``timeout``; when falsy, the
            ``timeout`` entry of ``kwargs`` (if any) is used instead
        :param kwargs: must contain ``kernel_ip``, ``kernel_shell_port``,
            ``kernel_iopub_port``, ``kernel_stdin_port``, ``kernel_control_port``,
            ``kernel_hb_port`` and ``kernel_session_key``; everything except
            ``timeout`` and ``startup_timeout`` is also passed on to the client
        :return: the result of ``PapermillNotebookClient.execute()``
        :raises KeyError: if one of the required ``kernel_*`` entries is missing
        """
        km = RemoteKernelManager()
        km.ip = kwargs["kernel_ip"]
        km.shell_port = kwargs["kernel_shell_port"]
        km.iopub_port = kwargs["kernel_iopub_port"]
        km.stdin_port = kwargs["kernel_stdin_port"]
        km.control_port = kwargs["kernel_control_port"]
        km.hb_port = kwargs["kernel_hb_port"]
        km.session_key = kwargs["kernel_session_key"]

        # Exclude parameters that are named differently downstream
        safe_kwargs = remove_args(["timeout", "startup_timeout"], **kwargs)

        # Combine the caller's remaining kwargs with the values chosen here.
        # ``log_output`` is passed through rather than forced to False so the
        # caller's setting is honoured.
        final_kwargs = merge_kwargs(
            safe_kwargs,
            timeout=execution_timeout or kwargs.get("timeout"),
            startup_timeout=start_timeout,
            log_output=log_output,
            stdout_file=stdout_file,
            stderr_file=stderr_file,
        )

        return PapermillNotebookClient(nb_man, km=km, **final_kwargs).execute()