Source code for airflow.providers.apache.hdfs.log.hdfs_task_handler
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimportloggingimportosimportshutilfromfunctoolsimportcached_propertyfrompathlibimportPathfromtypingimportTYPE_CHECKINGfromurllib.parseimporturlsplitimportattrsfromairflow.configurationimportconffromairflow.providers.apache.hdfs.hooks.webhdfsimportWebHDFSHookfromairflow.utils.log.file_task_handlerimportFileTaskHandlerfromairflow.utils.log.logging_mixinimportLoggingMixinifTYPE_CHECKING:fromairflow.models.taskinstanceimportTaskInstancefromairflow.sdk.typesimportRuntimeTaskInstanceProtocolasRuntimeTIfromairflow.utils.log.file_task_handlerimportLogMessages,LogSourceInfo@attrs.define(kw_only=True)
def upload(self, path: os.PathLike | str, ti: RuntimeTI):
    """
    Upload the given log path to the remote storage.

    :param path: Local log file location, either absolute (it must then lie
        under ``base_log_folder``) or relative to ``base_log_folder``.
    :param ti: Runtime task instance the log belongs to; not used here.
    """
    source = Path(path)
    # The remote key mirrors the path relative to the local base folder, so an
    # absolute path is made relative first and a relative one is anchored locally.
    if not source.is_absolute():
        relative = source
        source = self.base_log_folder.joinpath(relative)
    else:
        relative = source.relative_to(self.base_log_folder)
    target = os.path.join(self.remote_base, relative)

    # Silently skip when there is no regular file to upload.
    if not source.is_file():
        return
    self.hook.load_file(source, target)
    if self.delete_local_copy:
        # Note: this removes the whole directory containing the log file.
        shutil.rmtree(os.path.dirname(source))
def read(self, relative_path: str, ti: RuntimeTI) -> tuple[LogSourceInfo, LogMessages]:
    """
    Fetch a task log file from HDFS.

    :param relative_path: Log path relative to ``remote_base``.
    :param ti: Task instance; only interpolated into the "not found" message.
    :return: ``(messages, logs)``. If the remote file exists, ``messages`` is
        empty and ``logs`` holds its content decoded as UTF-8; otherwise
        ``messages`` holds one "not found" notice and ``logs`` is empty.
    """
    remote_file = os.path.join(self.remote_base, relative_path)
    if not self.hook.check_for_path(remote_file):
        return [f"No logs found on hdfs for ti={ti}"], []
    content = self.hook.read_file(remote_file).decode("utf-8")
    return [], [content]
class HdfsTaskHandler(FileTaskHandler, LoggingMixin):
    """
    HdfsTaskHandler is a Python logging handler that handles and reads task instance logs.

    It extends airflow FileTaskHandler and uploads to and reads from HDFS.

    :param base_log_folder: Local directory that task log files are written to.
    :param hdfs_log_folder: Remote HDFS location for logs. Only the path
        component (``urlsplit(...).path``) is used.
    :param kwargs: ``delete_local_copy`` may be passed to override the
        ``[logging] delete_local_logs`` setting.
    """

    def __init__(self, base_log_folder: str, hdfs_log_folder: str, **kwargs):
        super().__init__(base_log_folder)
        # Any scheme/authority in hdfs_log_folder is dropped; only the path is kept.
        self.remote_base = urlsplit(hdfs_log_folder).path
        # Log path relative to the local base folder; filled in by set_context().
        self.log_relative_path = ""
        # Set once close() has finished so repeated close() calls (for example
        # from logging.shutdown()) do not upload the log again.
        self.closed = False
        # Recomputed in set_context(); close() additionally requires ``self.ti``.
        self.upload_on_close = True
        delete_local_copy = kwargs.get("delete_local_copy")
        if delete_local_copy is None:
            delete_local_copy = conf.getboolean("logging", "delete_local_logs")
        self.delete_local_copy = delete_local_copy
        # Remote I/O helper used by close() for uploads and _read_remote_logs() for reads.
        # NOTE(review): assumes the attrs-defined remote IO class declared earlier in
        # this module is named ``HdfsRemoteLogIO`` - confirm against the full file.
        self.io = HdfsRemoteLogIO(
            remote_base=self.remote_base,
            base_log_folder=Path(base_log_folder),
            delete_local_copy=self.delete_local_copy,
        )

    def set_context(self, ti: TaskInstance, *, identifier: str | None = None) -> None:
        """
        Point the handler at the local log file for ``ti`` and decide whether to upload on close.

        :param ti: Task instance whose log is being written.
        :param identifier: Optional identifier forwarded to ``FileTaskHandler.set_context``
            (previously accepted here but silently dropped).
        """
        super().set_context(ti, identifier=identifier)
        # Local location and remote location are needed to open and
        # upload the local log file to HDFS storage.
        if TYPE_CHECKING:
            assert self.handler is not None

        full_path = self.handler.baseFilename
        self.log_relative_path = Path(full_path).relative_to(self.local_base).as_posix()
        is_trigger_log_context = getattr(ti, "is_trigger_log_context", False)
        # Upload on close for trigger log contexts, or when ``ti.raw`` is false.
        self.upload_on_close = is_trigger_log_context or not ti.raw
        self.ti = ti
        # Clear the file first so that duplicate data is not uploaded
        # when reusing the same path (e.g. with rescheduled sensors)
        if self.upload_on_close:
            with open(self.handler.baseFilename, "w"):
                pass

    def close(self):
        """Close and upload local log file to HDFS."""
        # When application exit, system shuts down all handlers by
        # calling close method. Here we check if logger is already
        # closed to prevent uploading the log to remote storage multiple
        # times when `logging.shutdown` is called.
        if self.closed:
            return

        super().close()

        if self.upload_on_close and hasattr(self, "ti"):
            self.io.upload(self.log_relative_path, self.ti)

        # Mark closed (also when no upload was wanted) so that a second call
        # neither closes the parent handler again nor writes twice. If the
        # upload raises, this line is not reached and a later close() retries.
        self.closed = True

    def _read_remote_logs(self, ti, try_number, metadata=None) -> tuple[LogSourceInfo, LogMessages]:
        """
        Read the remote log of ``ti`` for ``try_number`` through the remote IO helper.

        :return: ``(messages, logs)`` exactly as returned by ``self.io.read``.
        """
        # Explicitly getting log relative path is necessary as the given
        # task instance might be different from task instance passed
        # in set_context method.
        worker_log_rel_path = self._render_filename(ti, try_number)
        return self.io.read(worker_log_rel_path, ti)