Source code for airflow.providers.amazon.aws.log.cloudwatch_task_handler
## Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsfromdatetimeimportdatetimefromfunctoolsimportcached_propertyimportwatchtowerfromairflow.configurationimportconffromairflow.providers.amazon.aws.hooks.logsimportAwsLogsHookfromairflow.utils.log.file_task_handlerimportFileTaskHandlerfromairflow.utils.log.logging_mixinimportLoggingMixin
class CloudwatchTaskHandler(FileTaskHandler, LoggingMixin):
    """
    CloudwatchTaskHandler is a python log handler that handles and reads task instance logs.

    It extends airflow FileTaskHandler and uploads to and reads from Cloudwatch.

    :param base_log_folder: base folder to store logs locally
    :param log_group_arn: ARN of the Cloudwatch log group for remote log storage
        with format ``arn:aws:logs:{region name}:{account id}:log-group:{group name}``
    :param filename_template: template for file name (local storage) or log stream name (remote)
    """

    # NOTE(review): ``handler``, ``closed``, ``log_group``, ``hook`` and ``_event_to_str``
    # are used below but are not defined in this listing; presumably they are set up
    # by ``__init__`` / ``set_context`` / other members elsewhere — confirm.

    def close(self):
        """
        Close the handler responsible for the upload of the local log file to Cloudwatch.

        Once a call completes, ``self.closed`` is True and any further call returns
        immediately without touching ``self.handler``.
        """
        # When the application exits, ``logging.shutdown`` closes every handler by
        # calling this method. Check whether we are already closed so the log is not
        # uploaded to remote storage multiple times.
        if self.closed:
            return

        if self.handler is not None:
            self.handler.close()
        # Mark closed so we don't double write if close is called twice
        self.closed = True

    def _read(self, task_instance, try_number, metadata=None):
        """
        Read the log of one try of a task instance, preferring Cloudwatch.

        :param task_instance: task instance whose log is requested
        :param try_number: try number, used to render the log stream name
        :param metadata: log-reader metadata; only forwarded to the local fallback
        :return: tuple ``(log_text, metadata)``. On success the text is the Cloudwatch
            stream prefixed with a header line, and metadata is ``{"end_of_log": True}``.
            If fetching from Cloudwatch raises, the error is logged, and an error header
            is prepended to the text returned by ``FileTaskHandler._read``. The metadata
            returned by that call is passed back unchanged.
        """
        # The rendered filename is used as the Cloudwatch log stream name.
        stream_name = self._render_filename(task_instance, try_number)
        try:
            # Keep only the remote fetch inside the try. Any failure here triggers
            # the fallback to the locally stored log.
            remote_log = self.get_cloudwatch_logs(stream_name=stream_name)
        except Exception as e:
            error_header = (
                f"*** Unable to read remote logs from Cloudwatch (log_group: {self.log_group}, log_stream: "
                f"{stream_name})\n*** {e}\n\n"
            )
            self.log.error(error_header)
            local_log, metadata = super()._read(task_instance, try_number, metadata)
            return error_header + local_log, metadata
        else:
            return (
                f"*** Reading remote log from Cloudwatch log_group: {self.log_group} "
                f"log_stream: {stream_name}.\n{remote_log}\n",
                {"end_of_log": True},
            )

    def get_cloudwatch_logs(self, stream_name: str) -> str:
        """
        Return all logs from the given log stream.

        :param stream_name: name of the Cloudwatch log stream to get all logs from
        :return: string of all logs from the given log stream. Each event is rendered
            by ``_event_to_str`` and the results are joined with ``"\\n"``.
        """
        # start_from_head=True asks the hook to read from the head of the stream.
        events = self.hook.get_log_events(
            log_group=self.log_group,
            log_stream_name=stream_name,
            start_from_head=True,
        )
        return "\n".join(self._event_to_str(event) for event in events)