Source code for airflow.providers.apache.hdfs.log.hdfs_task_handler
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import os
import pathlib
import shutil
from functools import cached_property
from urllib.parse import urlsplit

from airflow.configuration import conf
from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.logging_mixin import LoggingMixin


class HdfsTaskHandler(FileTaskHandler, LoggingMixin):
    """Logging handler to upload and read from HDFS."""

    def __init__(self, base_log_folder: str, hdfs_log_folder: str, **kwargs):
        super().__init__(base_log_folder)
        self.remote_base = urlsplit(hdfs_log_folder).path
        self.log_relative_path = ""
        self.closed = False
        self.upload_on_close = True
        self.delete_local_copy = kwargs.get(
            "delete_local_copy", conf.getboolean("logging", "delete_local_logs")
        )

    @cached_property
    def hook(self):
        """Return the WebHDFSHook used to upload and read log files."""
        return WebHDFSHook(webhdfs_conn_id=conf.get("logging", "REMOTE_LOG_CONN_ID"))
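
    # For illustration: urlsplit keeps only the path component of the remote
    # folder, so a hypothetical hdfs_log_folder of
    # "hdfs://namenode:8020/airflow/logs" yields a remote_base of
    # "/airflow/logs" (standard-library behaviour, shown doctest-style):
    #
    #   >>> from urllib.parse import urlsplit
    #   >>> urlsplit("hdfs://namenode:8020/airflow/logs").path
    #   '/airflow/logs'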

    def set_context(self, ti):
        super().set_context(ti)
        # The local and remote locations are needed to open and upload the
        # local log file to HDFS storage.
        full_path = self.handler.baseFilename
        self.log_relative_path = pathlib.Path(full_path).relative_to(self.local_base).as_posix()
        is_trigger_log_context = getattr(ti, "is_trigger_log_context", False)
        self.upload_on_close = is_trigger_log_context or not ti.raw

        # Clear the file first so that duplicate data is not uploaded
        # when reusing the same path (e.g. with rescheduled sensors).
        if self.upload_on_close:
            with open(self.handler.baseFilename, "w"):
                pass
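
    # A minimal sketch of the path arithmetic above, assuming a hypothetical
    # local_base of "/opt/airflow/logs" and a rendered log file beneath it:
    #
    #   >>> import pathlib
    #   >>> full = "/opt/airflow/logs/dag_id=d/run_id=r/task_id=t/attempt=1.log"
    #   >>> pathlib.Path(full).relative_to("/opt/airflow/logs").as_posix()
    #   'dag_id=d/run_id=r/task_id=t/attempt=1.log'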

    def close(self):
        """Close and upload local log file to HDFS."""
        # When the application exits, the system shuts down all handlers by
        # calling their close method. Check whether the logger is already
        # closed to prevent uploading the log to remote storage multiple
        # times when `logging.shutdown` is called.
        if self.closed:
            return

        super().close()

        if not self.upload_on_close:
            return

        local_loc = os.path.join(self.local_base, self.log_relative_path)
        remote_loc = os.path.join(self.remote_base, self.log_relative_path)
        if os.path.exists(local_loc) and os.path.isfile(local_loc):
            self.hook.load_file(local_loc, remote_loc)
            if self.delete_local_copy:
                shutil.rmtree(os.path.dirname(local_loc))

        # Mark closed so we don't double write if close is called twice.
        self.closed = True
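
    # The `closed` guard above matters because logging.shutdown() closes every
    # live handler, and a handler may then be closed again explicitly; a
    # throwaway sketch of that double-close sequence:
    #
    #   >>> import logging
    #   >>> h = logging.NullHandler()
    #   >>> logging.shutdown()  # calls h.close() among others
    #   >>> h.close()           # second close: without the flag, the upload
    #   >>>                     # in this method would run a second time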

    def _read_remote_logs(self, ti, try_number, metadata=None):
        # Explicitly rendering the log relative path is necessary because the
        # given task instance might differ from the one passed to set_context.
        worker_log_rel_path = self._render_filename(ti, try_number)

        logs = []
        messages = []
        file_path = os.path.join(self.remote_base, worker_log_rel_path)
        if self.hook.check_for_path(file_path):
            logs.append(self.hook.read_file(file_path).decode("utf-8"))
        else:
            messages.append(f"No logs found on hdfs for ti={ti}")

        return messages, logs
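
# A minimal usage sketch (hypothetical paths; assumes a WebHDFS connection is
# configured under the conn id returned by conf.get("logging", "REMOTE_LOG_CONN_ID")):
#
#   >>> handler = HdfsTaskHandler(
#   ...     base_log_folder="/opt/airflow/logs",
#   ...     hdfs_log_folder="hdfs://namenode:8020/airflow/logs",
#   ... )
#   >>> # Airflow calls set_context(ti) with a TaskInstance before logging;
#   >>> # close() then uploads the local log file and, when delete_local_copy
#   >>> # is enabled, removes its local directory.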