Source code for airflow.providers.apache.hdfs.hooks.webhdfs
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Hook for Web HDFS."""
from __future__ import annotations

import logging
import socket
from typing import Any

import requests
from hdfs import HdfsError, InsecureClient

from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow.hooks.base import BaseHook
log = logging.getLogger(__name__)

_kerberos_security_mode = conf.get("core", "security") == "kerberos"
if _kerberos_security_mode:
    try:
        from hdfs.ext.kerberos import KerberosClient
    except ImportError:
        log.error("Could not load the Kerberos extension for the WebHDFSHook.")
        raise
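# Example (illustrative, not part of the upstream module): Kerberos support is
# toggled by Airflow's core configuration, which the block above reads via
# ``conf.get("core", "security")``. For instance, in airflow.cfg:
#
#     [core]
#     security = kerberos
#
# or via the equivalent environment variable ``AIRFLOW__CORE__SECURITY=kerberos``.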
class AirflowWebHDFSHookException(AirflowException):
    """Exception specific for WebHDFS hook."""
class WebHDFSHook(BaseHook):
    """
    Interact with HDFS. This class is a wrapper around the hdfscli library.

    :param webhdfs_conn_id: The connection id for the webhdfs client to connect to.
    :param proxy_user: The user used to authenticate.
    """

    conn_name_attr = "webhdfs_conn_id"
    default_conn_name = "webhdfs_default"
    conn_type = "webhdfs"
    hook_name = "Apache WebHDFS"

    def __init__(self, webhdfs_conn_id: str = default_conn_name, proxy_user: str | None = None):
        super().__init__()
        self.webhdfs_conn_id = webhdfs_conn_id
        self.proxy_user = proxy_user
    def get_conn(self) -> Any:
        """
        Establish a connection depending on the security mode set via config or environment variable.

        :return: a hdfscli InsecureClient or KerberosClient object.
        """
        connection = self._find_valid_server()
        if connection is None:
            raise AirflowWebHDFSHookException("Failed to locate the valid server.")
        return connection
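# Example (illustrative, not part of the upstream module): ``get_conn`` returns
# the raw hdfscli client, so any hdfscli API can be used directly. The
# connection id below is the assumed default; adjust it to your deployment.
#
#     hook = WebHDFSHook(webhdfs_conn_id="webhdfs_default")
#     client = hook.get_conn()
#     client.makedirs("/tmp/airflow_example")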
    def _find_valid_server(self) -> Any:
        connection = self.get_connection(self.webhdfs_conn_id)
        namenodes = connection.host.split(",")
        for namenode in namenodes:
            host_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.log.info("Trying to connect to %s:%s", namenode, connection.port)
            try:
                conn_check = host_socket.connect_ex((namenode, connection.port))
                if conn_check == 0:
                    self.log.info("Trying namenode %s", namenode)
                    client = self._get_client(
                        namenode,
                        connection.port,
                        connection.login,
                        connection.get_password(),
                        connection.schema,
                        connection.extra_dejson,
                    )
                    client.status("/")
                    self.log.info("Using namenode %s for hook", namenode)
                    host_socket.close()
                    return client
                else:
                    self.log.warning("Could not connect to %s:%s", namenode, connection.port)
            except HdfsError as hdfs_error:
                self.log.info("Read operation on namenode %s failed with error: %s", namenode, hdfs_error)
        return None

    def _get_client(
        self, namenode: str, port: int, login: str, password: str | None, schema: str, extra_dejson: dict
    ) -> Any:
        """
        Get WebHDFS client.

        Additional options via ``extra``:

        - use_ssl: enable SSL connection (default: False)
        - verify: CA certificate path or boolean for SSL verification (default: False)
        - cert: client certificate path for mTLS, can be combined cert or used with ``key``
        - key: client key path for mTLS with ``cert``
        """
        connection_str = f"http://{namenode}"
        session = requests.Session()
        if password is not None:
            session.auth = (login, password)
        if extra_dejson.get("use_ssl", "False") == "True" or extra_dejson.get("use_ssl", False):
            connection_str = f"https://{namenode}"
            session.verify = extra_dejson.get("verify", False)

        # Handle mTLS certificates
        cert = extra_dejson.get("cert")
        key = extra_dejson.get("key")
        if cert:
            if key:
                session.cert = (cert, key)
            else:
                session.cert = cert

        if port is not None:
            connection_str += f":{port}"
        if schema is not None:
            connection_str += f"/{schema}"

        if _kerberos_security_mode:
            return KerberosClient(connection_str, session=session)

        proxy_user = self.proxy_user or login
        return InsecureClient(connection_str, user=proxy_user, session=session)
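# Example (illustrative, not part of the upstream module): enabling SSL and
# mutual TLS through the connection's ``extra`` field, using the options
# documented in ``_get_client`` above. The connection id, host names, port,
# and certificate paths are placeholders.
#
#     import json
#     from airflow.models.connection import Connection
#
#     conn = Connection(
#         conn_id="webhdfs_ssl",
#         conn_type="webhdfs",
#         host="namenode1,namenode2",   # comma-separated namenodes are tried in order
#         port=9871,
#         login="hdfs_user",
#         extra=json.dumps(
#             {
#                 "use_ssl": "True",
#                 "verify": "/etc/ssl/certs/ca.pem",
#                 "cert": "/etc/ssl/certs/client.pem",
#                 "key": "/etc/ssl/private/client.key",
#             }
#         ),
#     )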
    def check_for_path(self, hdfs_path: str) -> bool:
        """
        Check for the existence of a path in HDFS by querying FileStatus.

        :param hdfs_path: The path to check.
        :return: True if the path exists and False if not.
        """
        conn = self.get_conn()
        status = conn.status(hdfs_path, strict=False)
        return bool(status)
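# Example (illustrative, not part of the upstream module): guarding downstream
# logic on path existence. The HDFS path is a placeholder.
#
#     hook = WebHDFSHook()
#     if not hook.check_for_path("/data/landing/2024-01-01"):
#         raise FileNotFoundError("Partition not yet available in HDFS")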
    def load_file(
        self, source: str, destination: str, overwrite: bool = True, parallelism: int = 1, **kwargs: Any
    ) -> None:
        """
        Upload a file to HDFS.

        :param source: Local path to file or folder.
            If it's a folder, all the files inside it will be uploaded.

            .. note:: This implies that folders empty of files will not be created remotely.

        :param destination: Target HDFS path. If it already exists and is
            a directory, files will be uploaded inside.
        :param overwrite: Overwrite any existing file or directory.
        :param parallelism: Number of threads to use for parallelization.
            A value of `0` (or negative) uses as many threads as there are files.
        :param kwargs: Keyword arguments forwarded to :meth:`hdfs.client.Client.upload`.
        """
        conn = self.get_conn()

        conn.upload(
            hdfs_path=destination,
            local_path=source,
            overwrite=overwrite,
            n_threads=parallelism,
            **kwargs,
        )
        self.log.debug("Uploaded file %s to %s", source, destination)
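# Example (illustrative, not part of the upstream module): uploading a local
# folder with several worker threads; both paths are placeholders.
#
#     hook = WebHDFSHook()
#     hook.load_file(
#         source="/tmp/exports/",          # local file or folder to upload
#         destination="/data/exports/",    # target HDFS path
#         overwrite=True,
#         parallelism=4,                   # forwarded to hdfscli as n_threads
#     )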
    def read_file(self, filename: str) -> bytes:
        """
        Read a file from HDFS.

        :param filename: The path of the file to read.
        :return: File content as raw bytes.
        """
        conn = self.get_conn()

        with conn.read(filename) as reader:
            content = reader.read()
        return content
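# Example (illustrative, not part of the upstream module): ``read_file``
# returns raw bytes, so decode them when the file is text; the path is a
# placeholder.
#
#     hook = WebHDFSHook()
#     raw = hook.read_file("/data/config/settings.json")
#     settings_text = raw.decode("utf-8")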