Source code for airflow.hooks.webhdfs_hook

# -*- coding: utf-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

from hdfs import InsecureClient, HdfsError

from airflow import configuration
from airflow.exceptions import AirflowException
from airflow.hooks.base_hook import BaseHook
from airflow.utils.log.logging_mixin import LoggingMixin

_kerberos_security_mode = configuration.conf.get("core", "security") == "kerberos"
if _kerberos_security_mode:
        from hdfs.ext.kerberos import KerberosClient
    except ImportError:
        log = LoggingMixin().log
        log.error("Could not load the Kerberos extension for the WebHDFSHook.")

class AirflowWebHDFSHookException(AirflowException):

[docs]class WebHDFSHook(BaseHook): """ Interact with HDFS. This class is a wrapper around the hdfscli library. """ def __init__(self, webhdfs_conn_id='webhdfs_default', proxy_user=None): self.webhdfs_conn_id = webhdfs_conn_id self.proxy_user = proxy_user
[docs] def get_conn(self): """ Returns a hdfscli InsecureClient object. """ nn_connections = self.get_connections(self.webhdfs_conn_id) for nn in nn_connections: try: self.log.debug('Trying namenode %s', connection_str = 'http://{}:{nn.port}'.format(nn=nn) if _kerberos_security_mode: client = KerberosClient(connection_str) else: proxy_user = self.proxy_user or nn.login client = InsecureClient(connection_str, user=proxy_user) client.status('/') self.log.debug('Using namenode %s for hook', return client except HdfsError as e: self.log.debug( "Read operation on namenode {} " "failed with error: {e}".format(**locals()) ) nn_hosts = [ for c in nn_connections] no_nn_error = "Read operations failed " \ "on the namenodes below:\n{}".format("\n".join(nn_hosts)) raise AirflowWebHDFSHookException(no_nn_error)
[docs] def check_for_path(self, hdfs_path): """ Check for the existence of a path in HDFS by querying FileStatus. """ c = self.get_conn() return bool(c.status(hdfs_path, strict=False))
[docs] def load_file(self, source, destination, overwrite=True, parallelism=1, **kwargs): """ Uploads a file to HDFS :param source: Local path to file or folder. If a folder, all the files inside of it will be uploaded (note that this implies that folders empty of files will not be created remotely). :type source: str :param destination: PTarget HDFS path. If it already exists and is a directory, files will be uploaded inside. :type destination: str :param overwrite: Overwrite any existing file or directory. :type overwrite: bool :param parallelism: Number of threads to use for parallelization. A value of `0` (or negative) uses as many threads as there are files. :type parallelism: int :param \*\*kwargs: Keyword arguments forwarded to :meth:`upload`. """ c = self.get_conn() c.upload(hdfs_path=destination, local_path=source, overwrite=overwrite, n_threads=parallelism, **kwargs) self.log.debug("Uploaded file %s to %s", source, destination)