Source code for airflow.providers.microsoft.azure.log.wasb_task_handler
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

import os
import shutil
from pathlib import Path
from typing import TYPE_CHECKING, Any

from azure.core.exceptions import HttpResponseError
from packaging.version import Version

from airflow.compat.functools import cached_property
from airflow.configuration import conf
from airflow.utils.log.file_task_handler import FileTaskHandler
from airflow.utils.log.logging_mixin import LoggingMixin
[docs]def get_default_delete_local_copy():
    """Load the delete_local_logs conf if Airflow version >= 2.6; return False otherwise.

    TODO: delete this function when min airflow version >= 2.6
    """
    from airflow.version import version

    if Version(version) < Version("2.6"):
        return False
    return conf.getboolean("logging", "delete_local_logs")
[docs]class WasbTaskHandler(FileTaskHandler, LoggingMixin):
    """
    WasbTaskHandler is a python log handler that handles and reads task instance logs.

    It extends airflow FileTaskHandler and uploads to and reads from Wasb remote storage.
    """
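    # NOTE: the extracted listing omits the handler's constructor, but the methods below rely on
    # attributes set there (wasb_container, remote_base, log_relative_path, closed,
    # upload_on_close, delete_local_copy). The sketch below is a reconstruction, not the verbatim
    # upstream code; the argument names and defaults are assumptions.
    def __init__(
        self,
        base_log_folder: str,
        wasb_log_folder: str,
        wasb_container: str,
        **kwargs,
    ) -> None:
        super().__init__(base_log_folder)
        self.handler = None
        self.wasb_container = wasb_container
        self.remote_base = wasb_log_folder
        self.log_relative_path = ""
        self.closed = False
        self.upload_on_close = True
        # Fall back to the module-level helper so the local copy is only deleted when
        # [logging] delete_local_logs is enabled on Airflow >= 2.6.
        self.delete_local_copy = kwargs.get("delete_local_copy", get_default_delete_local_copy())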
[docs]    @cached_property
    def hook(self):
        """Returns WasbHook."""
        remote_conn_id = conf.get("logging", "REMOTE_LOG_CONN_ID")
        try:
            from airflow.providers.microsoft.azure.hooks.wasb import WasbHook

            return WasbHook(remote_conn_id)
        except Exception:
            self.log.exception(
                "Could not create a WasbHook with connection id '%s'. "
                "Do you have apache-airflow[azure] installed? "
                "Does the connection exist, and is it configured properly?",
                remote_conn_id,
            )
            return None
[docs]    def set_context(self, ti) -> None:
        super().set_context(ti)
        # Local location and remote location is needed to open and
        # upload local log file to Wasb remote storage.
        if TYPE_CHECKING:
            assert self.handler is not None

        full_path = self.handler.baseFilename
        self.log_relative_path = Path(full_path).relative_to(self.local_base).as_posix()
        is_trigger_log_context = getattr(ti, "is_trigger_log_context", False)
        self.upload_on_close = is_trigger_log_context or not ti.raw
[docs]    def close(self) -> None:
        """Close and upload local log file to remote storage Wasb."""
        # When the application exits, the system shuts down all handlers by
        # calling their close method. Here we check if the logger is already
        # closed to prevent uploading the log to remote storage multiple
        # times when `logging.shutdown` is called.
        if self.closed:
            return

        super().close()

        if not self.upload_on_close:
            return

        local_loc = os.path.join(self.local_base, self.log_relative_path)
        remote_loc = os.path.join(self.remote_base, self.log_relative_path)
        if os.path.exists(local_loc):
            # read log and remove old logs to get just the latest additions
            with open(local_loc) as logfile:
                log = logfile.read()
            wasb_write = self.wasb_write(log, remote_loc, append=True)

            if wasb_write and self.delete_local_copy:
                shutil.rmtree(os.path.dirname(local_loc))
        # Mark closed so we don't double write if close is called twice
        self.closed = True
    def _read_remote_logs(self, ti, try_number, metadata=None):
        messages = []
        logs = []
        worker_log_relative_path = self._render_filename(ti, try_number)
        # todo: fix this
        # for some reason this handler was designed such that (1) container name is not configurable
        # (i.e. it's hardcoded in airflow_local_settings.py) and (2) the "relative path" is actually...
        # whatever you put in REMOTE_BASE_LOG_FOLDER i.e. it includes the "wasb://" in the blob
        # name. it's very screwed up but to change it we have to be careful not to break backcompat.
        prefix = os.path.join(self.remote_base, worker_log_relative_path)
        blob_names = []
        try:
            blob_names = self.hook.get_blobs_list(container_name=self.wasb_container, prefix=prefix)
        except HttpResponseError as e:
            messages.append(f"tried listing blobs with prefix={prefix} and container={self.wasb_container}")
            messages.append("could not list blobs " + str(e))
            self.log.exception("can't list blobs")

        if blob_names:
            uris = [f"wasb://{self.wasb_container}/{b}" for b in blob_names]
            messages.extend(["Found remote logs:", *[f"  * {x}" for x in sorted(uris)]])
        else:
            messages.append(f"No logs found in WASB; ti={ti}")

        for name in sorted(blob_names):
            remote_log = ""
            try:
                remote_log = self.hook.read_file(self.wasb_container, name)
                if remote_log:
                    logs.append(remote_log)
            except Exception as e:
                messages.append(
                    f"Unable to read remote blob '{name}' in container '{self.wasb_container}'\n{e}"
                )
                self.log.exception("Could not read blob")
        return messages, logs

    def _read(
        self, ti, try_number: int, metadata: dict[str, Any] | None = None
    ) -> tuple[str, dict[str, bool]]:
        """
        Read logs of given task instance and try_number from Wasb remote storage.

        If failed, read the log from task instance host machine.
        todo: when min airflow version >= 2.6, remove this method

        :param ti: task instance object
        :param try_number: task instance try_number to read logs from
        :param metadata: log metadata, can be used for streaming log reading and auto-tailing.
        """
        if hasattr(super(), "_read_remote_logs"):
            # from Airflow 2.6, we don't implement the `_read` method.
            # if parent has _read_remote_logs, we're >= 2.6
            return super()._read(ti, try_number, metadata)

        # below is backcompat, for airflow < 2.6
        messages, logs = self._read_remote_logs(ti, try_number, metadata)
        if not logs:
            return super()._read(ti, try_number, metadata)
        return "".join([f"*** {x}\n" for x in messages]) + "\n".join(logs), {"end_of_log": True}
[docs]    def wasb_log_exists(self, remote_log_location: str) -> bool:
        """
        Check if remote_log_location exists in remote storage.

        :param remote_log_location: log's location in remote storage
        :return: True if location exists else False
        """
        try:
            return self.hook.check_for_blob(self.wasb_container, remote_log_location)
        except Exception as e:
            self.log.debug('Exception when trying to check remote location: "%s"', e)
        return False
[docs]    def wasb_read(self, remote_log_location: str, return_error: bool = False):
        """
        Return the log found at the remote_log_location. Returns '' if no logs are found or there is an error.

        :param remote_log_location: the log's location in remote storage
        :param return_error: if True, returns a string error message if an error occurs.
            Otherwise returns '' when an error occurs.
        """
        try:
            return self.hook.read_file(self.wasb_container, remote_log_location)
        except Exception:
            msg = f"Could not read logs from {remote_log_location}"
            self.log.exception(msg)
            # return error if needed
            if return_error:
                return msg
            return ""
[docs]    def wasb_write(self, log: str, remote_log_location: str, append: bool = True) -> bool:
        """
        Write the log to the remote_log_location; fails silently if no hook was created.

        :param log: the log to write to the remote_log_location
        :param remote_log_location: the log's location in remote storage
        :param append: if False, any existing log file is overwritten. If True, the new log
            is appended to any existing logs.
        """
        if append and self.wasb_log_exists(remote_log_location):
            old_log = self.wasb_read(remote_log_location)
            log = "\n".join([old_log, log]) if old_log else log
        try:
            self.hook.load_string(log, self.wasb_container, remote_log_location, overwrite=True)
        except Exception:
            self.log.exception("Could not write logs to %s", remote_log_location)
            return False
        return True
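This handler is normally wired in through Airflow's remote-logging configuration rather than instantiated directly: the connection id is read from ``[logging] remote_log_conn_id`` (see ``hook`` above), and the container name is hardcoded in ``airflow_local_settings.py`` as the inline comment in ``_read_remote_logs`` notes. The snippet below is a minimal sketch of a custom logging config that swaps the task handler for ``WasbTaskHandler``; the keyword names mirror the constructor sketch above and the ``airflow-logs`` container name is an assumption, not a documented default.

import os
from copy import deepcopy

from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG

# Start from the stock logging config and replace only the task handler.
LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)
LOGGING_CONFIG["handlers"]["task"] = {
    "class": "airflow.providers.microsoft.azure.log.wasb_task_handler.WasbTaskHandler",
    "formatter": "airflow",
    "base_log_folder": os.path.expanduser("~/airflow/logs"),  # local staging dir (self.local_base)
    "wasb_log_folder": "wasb://airflow-logs",                 # becomes self.remote_base
    "wasb_container": "airflow-logs",                         # hypothetical container name
    "delete_local_copy": True,                                # remove local files after a successful upload
}

Point ``[logging] logging_config_class`` at the module that defines ``LOGGING_CONFIG`` and set ``remote_log_conn_id`` to a wasb connection so that ``hook`` can authenticate.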