Source code for airflow.providers.apache.spark.hooks.spark_connect
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsfromtypingimportAnyfromurllib.parseimportquote,urlparse,urlunparsefromairflow.hooks.baseimportBaseHookfromairflow.utils.log.logging_mixinimportLoggingMixin
[docs]classSparkConnectHook(BaseHook,LoggingMixin):"""Hook for Spark Connect."""# from pyspark's ChannelBuilder
@classmethod
def get_ui_field_behaviour(cls) -> dict[str, Any]:
    """
    Return custom UI field behaviour for Spark Connect connection.

    The returned mapping lists ``schema`` under ``hidden_fields`` and relabels
    ``password`` as "Token" and ``login`` as "User ID".

    :return: Dict with the ``hidden_fields`` list and the ``relabeling`` mapping.
    """
    # Declared as a classmethod, like ``get_connection_form_widgets``: the
    # method receives ``cls`` and reads no instance state.
    return {
        "hidden_fields": ["schema"],
        "relabeling": {"password": "Token", "login": "User ID"},
    }
@classmethod
def get_connection_form_widgets(cls) -> dict[str, Any]:
    """
    Return connection widgets to add to Spark Connect connection form.

    :return: Dict mapping ``SparkConnectHook.PARAM_USE_SSL`` to a "Use SSL"
        boolean field whose default is ``False``.
    """
    # The web-form libraries are imported inside the method rather than at
    # module level, as in the original implementation.
    from flask_babel import lazy_gettext
    from wtforms import BooleanField

    field_name = SparkConnectHook.PARAM_USE_SSL
    use_ssl_widget = BooleanField(lazy_gettext("Use SSL"), default=False)
    return {field_name: use_ssl_widget}
def get_connection_url(self) -> str:
    """
    Build the Spark Connect URL for the connection referenced by ``self._conn_id``.

    The host is prefixed with ``sc://`` when it carries no scheme, and the port
    is appended when one is set. The connection login, password and the
    ``PARAM_USE_SSL`` extra are URL-quoted and emitted as ``;``-separated
    parameters keyed by ``PARAM_USER_ID``, ``PARAM_TOKEN`` and
    ``PARAM_USE_SSL``. The resulting URL always uses the ``sc`` scheme and the
    path ``/``.

    :return: The assembled Spark Connect connection URL.
    :raises ValueError: If the connection host contains a path component.
    """
    conn = self.get_connection(self._conn_id)

    host = conn.host
    if conn.host.find("://") == -1:
        host = f"sc://{conn.host}"
    if conn.port:
        # Append the port to the (possibly prefixed) host. Rebuilding from
        # ``conn.host`` would discard the ``sc://`` scheme added above, making
        # ``urlparse`` read the host name as the scheme and the port as a path.
        host = f"{host}:{conn.port}"

    url = urlparse(host)
    if url.path:
        # f-string so the offending path actually appears in the message.
        raise ValueError(f"Path {url.path} is not supported in Spark Connect connection URL")

    params = []
    if conn.login:
        params.append(f"{SparkConnectHook.PARAM_USER_ID}={quote(conn.login)}")
    if conn.password:
        params.append(f"{SparkConnectHook.PARAM_TOKEN}={quote(conn.password)}")

    # ``None`` means the extra is absent; an explicit ``False`` is still emitted.
    use_ssl = conn.extra_dejson.get(SparkConnectHook.PARAM_USE_SSL)
    if use_ssl is not None:
        params.append(f"{SparkConnectHook.PARAM_USE_SSL}={quote(str(use_ssl))}")

    return urlunparse(
        (
            "sc",
            url.netloc,
            "/",
            ";".join(params),  # params
            "",
            url.fragment,
        )
    )