Source code for airflow.providers.amazon.aws.utils.task_log_fetcher
# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License.from__future__importannotationsimporttimefromcollectionsimportdequefromdatetimeimportdatetime,timedeltafromloggingimportLoggerfromthreadingimportEvent,ThreadfromtypingimportGeneratorfrombotocore.exceptionsimportClientError,ConnectionClosedErrorfromairflow.providers.amazon.aws.hooks.logsimportAwsLogsHook
class AwsTaskLogFetcher(Thread):
    """
    Fetches Cloudwatch log events with specific interval as a thread
    and sends the log events to the info channel of the provided logger.
    """

    def __init__(
        self,
        *,
        log_group: str,
        log_stream_name: str,
        fetch_interval: timedelta,
        logger: Logger,
        aws_conn_id: str | None = "aws_default",
        region_name: str | None = None,
    ):
        """
        Store the fetch configuration and build the CloudWatch Logs hook.

        :param log_group: CloudWatch log group to read events from.
        :param log_stream_name: Log stream within ``log_group`` to read events from.
        :param fetch_interval: Interval between fetches, kept on ``self.fetch_interval``.
        :param logger: Logger that receives diagnostics (and, presumably, the
            fetched events — the consuming loop is not visible here).
        :param aws_conn_id: Airflow connection id forwarded to ``AwsLogsHook``.
        :param region_name: AWS region forwarded to ``AwsLogsHook``.
        """
        super().__init__()
        # NOTE(review): presumably the stop signal for the fetch loop — the code
        # that sets/reads it is outside this view.
        self._event = Event()
        self.fetch_interval = fetch_interval
        self.logger = logger
        self.log_group = log_group
        self.log_stream_name = log_stream_name
        self.hook = AwsLogsHook(aws_conn_id=aws_conn_id, region_name=region_name)

    def _get_log_events(
        self, skip_token: AwsLogsHook.ContinuationToken | None = None
    ) -> Generator[dict, None, None]:
        """
        Yield log events for the configured log group and stream.

        Failures are logged, not raised. Events already yielded before a failure
        stay delivered, and the generator then simply ends:

        * ``ResourceNotFoundException`` is logged at INFO level, because the stream
          may not exist yet.
        * Any other ``ClientError`` is logged at WARNING level, with the error.
        * ``ConnectionClosedError`` is logged at WARNING level, with the error.

        :param skip_token: Continuation token forwarded to
            ``AwsLogsHook.get_log_events``. A fresh token is created when ``None``.
            NOTE(review): presumably the hook advances this token in place, so a
            caller reusing it resumes where it left off — confirm against AwsLogsHook.
        """
        if skip_token is None:
            skip_token = AwsLogsHook.ContinuationToken()
        try:
            yield from self.hook.get_log_events(
                self.log_group,
                self.log_stream_name,
                continuation_token=skip_token,
            )
        except ClientError as error:
            if error.response["Error"]["Code"] != "ResourceNotFoundException":
                # The message needs a placeholder for ``error``. Without one,
                # logging raises "not all arguments converted" at emit time and
                # the warning is lost.
                self.logger.warning("Error on retrieving Cloudwatch log events: %s", error)
            else:
                self.logger.info(
                    "Cannot find log stream yet, it can take a couple of seconds to show up. "
                    "If this error persists, check that the log group and stream are correct: "
                    "group: %s\tstream: %s",
                    self.log_group,
                    self.log_stream_name,
                )
        except ConnectionClosedError as error:
            self.logger.warning("ConnectionClosedError on retrieving Cloudwatch log events: %s", error)

    def _event_to_str(self, event: dict) -> str:
        """
        Render one event as ``"[YYYY-MM-DD HH:MM:SS,mmm] <message>"``.

        :param event: Mapping with ``"timestamp"`` (epoch milliseconds; the code
            divides by 1000 to get seconds) and ``"message"``.
        :return: The formatted line. The timestamp is rendered in UTC, truncated
            to milliseconds.
        """
        # Local import: the module header only brings ``datetime``/``timedelta``
        # into scope. ``fromtimestamp(tz=utc)`` replaces the deprecated
        # ``utcfromtimestamp`` and gives the same wall-clock fields.
        from datetime import timezone

        event_dt = datetime.fromtimestamp(event["timestamp"] / 1000.0, tz=timezone.utc)
        # ``%f`` gives microseconds; dropping the last 3 digits leaves milliseconds.
        formatted_event_dt = event_dt.strftime("%Y-%m-%d %H:%M:%S,%f")[:-3]
        message = event["message"]
        return f"[{formatted_event_dt}] {message}"