#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Hook for additional Package Indexes (Python)."""

from __future__ import annotations

import subprocess
from typing import Any
from urllib.parse import quote, urlparse

from airflow.hooks.base import BaseHook
class PackageIndexHook(BaseHook):
    """Specify package indexes/Python package sources using Airflow connections.

    The connection's ``host`` field holds the package index URL; the optional
    ``login`` and ``password`` fields are embedded into that URL as basic-auth
    credentials.

    NOTE(review): the methods below read ``self.pi_conn_id``, which is not
    assigned anywhere in the code shown here -- presumably a constructor sets
    it; confirm before relying on this class in isolation.
    """

    @staticmethod
    def get_ui_field_behaviour() -> dict[str, Any]:
        """Return custom field behaviour.

        Declared static because it takes no instance argument; without the
        decorator, calling it on an instance would raise ``TypeError``.

        :return: mapping with the hidden fields, the relabelled fields and the
            placeholder texts for the connection form.
        """
        return {
            "hidden_fields": ["schema", "port", "extra"],
            "relabeling": {"host": "Package Index URL"},
            "placeholders": {
                "host": "Example: https://my-package-mirror.net/pypi/repo-name/simple",
                "login": "Username for package index",
                "password": "Password for package index (will be masked)",
            },
        }

    @staticmethod
    def _get_basic_auth_conn_url(index_url: str, user: str | None, password: str | None) -> str:
        """Return a connection URL with basic auth credentials based on connection config.

        Credentials already embedded in ``index_url`` are always discarded. When
        ``user`` is empty, no credentials are added and ``password`` is ignored.

        :param index_url: URL of the package index.
        :param user: username to embed, or ``None``/empty for no credentials.
        :param password: password to embed; only used together with ``user``.
        :return: ``index_url`` with its netloc replaced by ``[user[:password]@]host``.
        """
        url = urlparse(index_url)
        # Keep only what follows the last "@", dropping any existing userinfo.
        host = url.netloc.split("@")[-1]
        if user:
            # safe="" is required: quote() leaves "/" unescaped by default, and a
            # raw "/" inside the credentials would terminate the netloc early.
            credentials = quote(user, safe="")
            if password:
                credentials = f"{credentials}:{quote(password, safe='')}"
            host = f"{credentials}@{host}"
        return url._replace(netloc=host).geturl()

    def get_conn(self) -> Any:
        """Return connection for the hook.

        For this hook the "connection" is the index URL with embedded
        credentials, i.e. the result of :meth:`get_connection_url`.
        """
        return self.get_connection_url()

    def get_connection_url(self) -> str:
        """Return a connection URL with embedded credentials.

        :return: the connection's host URL with login/password embedded.
        :raises ValueError: if the connection has no host (index URL) set.
        """
        conn = self.get_connection(self.pi_conn_id)
        index_url = conn.host
        if not index_url:
            raise ValueError("Please provide an index URL.")
        return self._get_basic_auth_conn_url(index_url, conn.login, conn.password)

    def test_connection(self) -> tuple[bool, str]:
        """Test connection to package index url.

        Runs ``pip search`` for a package name that is not expected to exist
        against the configured index and judges the result by pip's exit code.

        :return: ``(True, message)`` on success, ``(False, message)`` with pip's
            stderr output otherwise.
        """
        conn_url = self.get_connection_url()
        proc = subprocess.run(
            ["pip", "search", "not-existing-test-package", "--no-input", "--index", conn_url],
            check=False,
            capture_output=True,
        )
        conn = self.get_connection(self.pi_conn_id)
        if proc.returncode not in [
            0,  # executed successfully, found package
            23,  # executed successfully, didn't find any packages
            # (but we do not expect it to find 'not-existing-test-package')
        ]:
            # stderr is captured as bytes; decode it so the message contains
            # readable text instead of a b'...' repr.
            error = proc.stderr.decode(errors="replace")
            return False, f"Connection test to {conn.host} failed. Error: {error}"
        return True, f"Connection to {conn.host} tested successfully!"