Source code for airflow.providers.amazon.aws.utils.task_log_fetcher
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimporttimefromdatetimeimportdatetime,timedeltafromloggingimportLoggerfromthreadingimportEvent,ThreadfromtypingimportGeneratorfrombotocore.exceptionsimportClientError,ConnectionClosedErrorfromairflow.providers.amazon.aws.hooks.logsimportAwsLogsHook
class AwsTaskLogFetcher(Thread):
    """
    Fetch Cloudwatch log events with specific interval and send the log events to the logger.info.

    :param log_group: Name of the Cloudwatch log group to read from.
    :param log_stream_name: Name of the log stream inside ``log_group``.
    :param fetch_interval: Interval between two fetches of log events.
    :param logger: Logger that receives the warnings and informational
        messages emitted while fetching.
    :param aws_conn_id: Connection id forwarded to ``AwsLogsHook``.
    :param region_name: AWS region forwarded to ``AwsLogsHook``.
    """

    def __init__(
        self,
        *,
        log_group: str,
        log_stream_name: str,
        fetch_interval: timedelta,
        logger: Logger,
        aws_conn_id: str | None = "aws_default",
        region_name: str | None = None,
    ):
        super().__init__()
        # NOTE(review): presumably the stop signal for the polling loop --
        # confirm against the methods that set/read it.
        self._event = Event()

        self.fetch_interval = fetch_interval
        self.logger = logger
        self.log_group = log_group
        self.log_stream_name = log_stream_name

        self.hook = AwsLogsHook(aws_conn_id=aws_conn_id, region_name=region_name)

    def _get_log_events(self, skip_token: AwsLogsHook.ContinuationToken | None = None) -> Generator:
        """
        Yield the log events of the configured stream.

        This is a generator, so the hook is only queried (and errors are only
        handled) once the result is iterated.

        ``ClientError`` and ``ConnectionClosedError`` raised while fetching are
        logged and end the generator instead of propagating to the caller; any
        events yielded before the error have already been delivered.

        :param skip_token: Continuation token handed to
            ``AwsLogsHook.get_log_events``; a fresh
            ``AwsLogsHook.ContinuationToken`` is created when omitted.
        :return: Generator over the events produced by the hook.
        """
        if skip_token is None:
            skip_token = AwsLogsHook.ContinuationToken()

        try:
            yield from self.hook.get_log_events(
                self.log_group, self.log_stream_name, continuation_token=skip_token
            )
        except ClientError as error:
            if error.response["Error"]["Code"] != "ResourceNotFoundException":
                # The error is a %-format argument, so the message needs a
                # placeholder; without one the logging module fails to format
                # the record and the warning is lost.
                self.logger.warning("Error on retrieving Cloudwatch log events: %s", error)
            else:
                self.logger.info(
                    "Cannot find log stream yet, it can take a couple of seconds to show up. "
                    "If this error persists, check that the log group and stream are correct: "
                    "group: %s\tstream: %s",
                    self.log_group,
                    self.log_stream_name,
                )
            # Nothing more to yield: the generator simply ends here.
        except ConnectionClosedError as error:
            self.logger.warning("ConnectionClosedError on retrieving Cloudwatch log events: %s", error)

    # Must be a regular instance method: it reads ``self.hook``,
    # ``self.log_group`` and ``self.log_stream_name``.
    def get_last_log_messages(self, number_messages: int) -> list:
        """
        Get the last log messages in one single request.

        NOTE: some restrictions apply:
         - if logs are too old, the response will be empty
         - the max number of messages we can retrieve is constrained by cloudwatch limits (10,000).

        :param number_messages: Passed as ``limit`` to the underlying
            ``get_log_events`` call.
        :return: The ``"message"`` field of every event in the response, in
            the order the response lists them.
        """
        response = self.hook.conn.get_log_events(
            logGroupName=self.log_group,
            logStreamName=self.log_stream_name,
            # Read from the end of the stream rather than from its head.
            startFromHead=False,
            limit=number_messages,
        )
        return [log["message"] for log in response["events"]]