airflow.providers.elasticsearch.log.es_task_handler

Module Contents

Classes

ElasticsearchTaskHandler

ElasticsearchTaskHandler is a Python log handler that reads logs from Elasticsearch.

Functions

getattr_nested(obj, item, default)

Get item from obj, or return default if not found.

Attributes

LOG_LINE_DEFAULTS

EsLogMsgType

USE_PER_RUN_LOG_ID

airflow.providers.elasticsearch.log.es_task_handler.LOG_LINE_DEFAULTS
airflow.providers.elasticsearch.log.es_task_handler.EsLogMsgType
airflow.providers.elasticsearch.log.es_task_handler.USE_PER_RUN_LOG_ID
class airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler(base_log_folder, end_of_log_mark, write_stdout, json_format, json_fields, host_field='host', offset_field='offset', host='localhost:9200', frontend='localhost:5601', index_patterns=conf.get('elasticsearch', 'index_patterns', fallback='_all'), es_kwargs=conf.getsection('elasticsearch_configs'), *, filename_template=None, log_id_template=None)

Bases: airflow.utils.log.file_task_handler.FileTaskHandler, airflow.utils.log.logging_mixin.ExternalLoggingMixin, airflow.utils.log.logging_mixin.LoggingMixin

ElasticsearchTaskHandler is a Python log handler that reads logs from Elasticsearch. Note that Airflow does not handle the indexing of logs into Elasticsearch. Instead, Airflow flushes logs into local files, and additional software, such as Filebeat and Logstash, must be set up to index those logs into Elasticsearch.

To query and sort Elasticsearch results efficiently, this handler assumes each log message has a log_id field composed of the task instance's primary keys:

log_id = {dag_id}-{task_id}-{execution_date}-{try_number}

Log messages with a specific log_id are sorted by offset, a unique integer that indicates each message's position in the log. Timestamps are unreliable for this purpose because multiple log messages might share the same timestamp.
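The log_id and offset scheme above can be illustrated with a minimal sketch. It assumes a hypothetical TaskKey record standing in for the task instance primary keys and plain dicts standing in for indexed Elasticsearch documents; it is not the handler's own code, only a demonstration of rendering the template and ordering by offset rather than by timestamp.

```python
from dataclasses import dataclass


# Hypothetical stand-in for the task instance primary keys named above.
@dataclass
class TaskKey:
    dag_id: str
    task_id: str
    execution_date: str
    try_number: int


LOG_ID_TEMPLATE = "{dag_id}-{task_id}-{execution_date}-{try_number}"


def render_log_id(key: TaskKey) -> str:
    """Fill the log_id template from the task instance primary keys."""
    return LOG_ID_TEMPLATE.format(
        dag_id=key.dag_id,
        task_id=key.task_id,
        execution_date=key.execution_date,
        try_number=key.try_number,
    )


def order_messages(docs: list, log_id: str) -> list:
    """Keep documents for one log_id and order them by offset, not timestamp."""
    matching = [d for d in docs if d["log_id"] == log_id]
    return [d["message"] for d in sorted(matching, key=lambda d: d["offset"])]


key = TaskKey("example_dag", "extract", "2023-01-01T00:00:00+00:00", 1)
log_id = render_log_id(key)
print(log_id)  # example_dag-extract-2023-01-01T00:00:00+00:00-1

# Two messages share a timestamp, so only the offset gives a stable order.
docs = [
    {"log_id": log_id, "offset": 2, "timestamp": "t0", "message": "second"},
    {"log_id": log_id, "offset": 1, "timestamp": "t0", "message": "first"},
    {"log_id": "other", "offset": 0, "timestamp": "t0", "message": "ignored"},
]
print(order_messages(docs, log_id))  # ['first', 'second']
```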

property log_name: str

The log name.

property supports_external_link: bool

Whether we can support external links.

PAGE = 0
MAX_LINE_PER_PAGE = 1000
LOG_NAME = 'Elasticsearch'
trigger_should_wrap = True
es_read(log_id, offset, metadata)

Return the logs matching log_id in Elasticsearch and the next offset. Returns '' if no log is found or if an error occurred.

Parameters
  • log_id (str) – the log_id of the log to read.

  • offset (str) – the offset from which to start reading the log.

  • metadata (dict) – log metadata, used for streaming log download.
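As a rough illustration of what such a read involves, the sketch below builds an Elasticsearch query body that matches one log_id, keeps only documents past the given offset, sorts by offset, and pages with the MAX_LINE_PER_PAGE value listed above. The function name build_es_read_query and the exact query shape are hypothetical; the handler's real query may differ. Only standard Query DSL clauses (bool, match_phrase, range, sort, from, size) are used.

```python
# Same value as the handler's MAX_LINE_PER_PAGE class attribute.
MAX_LINE_PER_PAGE = 1000


def build_es_read_query(log_id: str, offset: str, page: int = 0) -> dict:
    """Hypothetical query body: one log_id, entries after offset, ordered by offset."""
    return {
        "query": {
            "bool": {
                "filter": [
                    # Exact phrase match on the log_id field.
                    {"match_phrase": {"log_id": log_id}},
                    # offset arrives as a string, so convert it for the range filter.
                    {"range": {"offset": {"gt": int(offset)}}},
                ]
            }
        },
        # Offsets, not timestamps, define the order of log lines.
        "sort": [{"offset": {"order": "asc"}}],
        "from": page * MAX_LINE_PER_PAGE,
        "size": MAX_LINE_PER_PAGE,
    }


query = build_es_read_query("example_dag-extract-2023-01-01T00:00:00+00:00-1", "0")
```

Returning the largest offset seen in the response as the next offset lets a caller resume reading from where the previous request stopped.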

emit(record)

Emit the specified logging record.

The base logging.Handler.emit is meant to be implemented by subclasses and raises NotImplementedError; this handler overrides it with its own implementation.

set_context(ti)

Provide task instance context to the Airflow task handler.

Parameters

ti (airflow.models.taskinstance.TaskInstance) – task instance object

close()

Tidy up any resources used by the handler.

The base logging.Handler implementation removes the handler from an internal map of handlers, _handlers, which is used for handler lookup by name. Subclasses should ensure that it gets called from overridden close() methods.

get_external_log_url(task_instance, try_number)

Creates an address for an external log collecting service.

Parameters
  • task_instance (airflow.models.taskinstance.TaskInstance) – task instance object

  • try_number (int) – task instance try_number to read logs from.

Returns

URL to the external log collection service

Return type

str
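One plausible way to build such a URL is to treat the configured frontend (default 'localhost:5601') as a template with a {log_id} placeholder and URL-encode the rendered log_id. The sketch below assumes that convention, plus the rule of prepending https:// when the frontend has no scheme; the function name external_log_url is hypothetical and not part of the provider's API.

```python
from urllib.parse import quote


def external_log_url(frontend: str, log_id: str) -> str:
    """Hypothetical: fill a '{log_id}' placeholder in the frontend template."""
    # Assumption: prepend a scheme only when the frontend does not already have one.
    scheme = "" if "://" in frontend else "https://"
    # quote() percent-encodes characters such as ':' and '+' found in timestamps.
    return scheme + frontend.format(log_id=quote(log_id))


url = external_log_url(
    "kibana.local/discover?query={log_id}",
    "dag-task-2023-01-01T00:00:00+00:00-1",
)
print(url)  # https://kibana.local/discover?query=dag-task-2023-01-01T00%3A00%3A00%2B00%3A00-1
```

A frontend with no placeholder, such as the default 'localhost:5601', is returned unchanged apart from the added scheme.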

airflow.providers.elasticsearch.log.es_task_handler.getattr_nested(obj, item, default)

Get item from obj, or return default if not found.

For example, getattr_nested(a, 'b.c', 'NA') returns a.b.c if such a value exists, and 'NA' otherwise.
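The behavior described above can be reproduced with operator.attrgetter, which already resolves dotted paths. This is a minimal sketch of one possible implementation, not necessarily the provider's own code; SimpleNamespace is used only to build a nested test object.

```python
from operator import attrgetter
from types import SimpleNamespace
from typing import Any


def getattr_nested(obj: Any, item: str, default: Any) -> Any:
    """Resolve a dotted attribute path such as 'b.c', falling back to default."""
    try:
        # attrgetter("b.c")(a) evaluates a.b.c
        return attrgetter(item)(obj)
    except AttributeError:
        return default


a = SimpleNamespace(b=SimpleNamespace(c=42))
print(getattr_nested(a, "b.c", "NA"))        # 42
print(getattr_nested(a, "b.missing", "NA"))  # NA
```

Catching only AttributeError keeps other errors raised by property getters visible instead of silently replacing them with the default.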
