airflow.providers.elasticsearch.log.es_task_handler

Module Contents

airflow.providers.elasticsearch.log.es_task_handler.EsLogMsgType[source]
class airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler(base_log_folder: str, filename_template: str, log_id_template: str, end_of_log_mark: str, write_stdout: bool, json_format: bool, json_fields: str, host: str = 'localhost:9200', frontend: str = 'localhost:5601', es_kwargs: Optional[dict] = conf.getsection('elasticsearch_configs'))[source]

Bases: airflow.utils.log.file_task_handler.FileTaskHandler, airflow.utils.log.logging_mixin.LoggingMixin

ElasticsearchTaskHandler is a Python log handler that reads logs from Elasticsearch. Note that logs are not indexed into Elasticsearch directly; instead, the handler flushes logs into local files, and additional software such as Filebeat and Logstash is required to ship them into Elasticsearch. To query and sort Elasticsearch results efficiently, we assume each log message has a log_id field composed of the task instance's primary keys: log_id = {dag_id}-{task_id}-{execution_date}-{try_number}. Log messages with a given log_id are sorted by offset, a unique integer that indicates each message's order. Timestamps are unreliable for ordering because multiple log messages may share the same timestamp.
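As a small illustration of the scheme above (the actual field names come from the configurable log_id_template; this helper is hypothetical), the default log_id could be rendered like this:

```python
# Sketch: render a log_id from a task instance's primary keys, mirroring
# the default template {dag_id}-{task_id}-{execution_date}-{try_number}.
# The real handler renders the configurable ``log_id_template`` instead.
def render_log_id(dag_id: str, task_id: str, execution_date: str, try_number: int) -> str:
    return f"{dag_id}-{task_id}-{execution_date}-{try_number}"

log_id = render_log_id("example_dag", "print_date", "2021-01-01T00:00:00+00:00", 1)
# -> 'example_dag-print_date-2021-01-01T00:00:00+00:00-1'
```

Because every message for one task attempt shares this log_id, a single term query retrieves the whole attempt's log, and the offset field provides a total order within it.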

PAGE = 0[source]
MAX_LINE_PER_PAGE = 1000[source]
LOG_NAME = Elasticsearch[source]
log_name[source]

The log name

_render_log_id(self, ti: TaskInstance, try_number: int)[source]
static _clean_execution_date(execution_date: datetime)[source]

Clean up an execution date so that it is safe to query in Elasticsearch by removing reserved characters. See https://www.elastic.co/guide/en/elasticsearch/reference/current/query-dsl-query-string-query.html#_reserved_characters

Parameters

execution_date -- execution date of the dag run.
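One plausible way to do this (a sketch, not necessarily the exact implementation) is to render the datetime with underscores in place of separators, so reserved characters such as ':' and '+' never appear in the query string:

```python
from datetime import datetime

def clean_execution_date(execution_date: datetime) -> str:
    # Replace ':' (reserved in Elasticsearch query strings) and other
    # separators with underscores so the date is safe to embed in a query.
    return execution_date.strftime("%Y_%m_%dT%H_%M_%S_%f")

clean_execution_date(datetime(2021, 1, 1, 12, 30, 45))
# -> '2021_01_01T12_30_45_000000'
```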

static _group_logs_by_host(logs)[source]
_read_grouped_logs(self)[source]
_read(self, ti: TaskInstance, try_number: int, metadata: Optional[dict] = None)[source]

Endpoint for streaming logs.

Parameters
  • ti -- task instance object

  • try_number -- try_number of the task instance

  • metadata -- log metadata, can be used for streaming log reading and auto-tailing.

Returns

a list of (host, log documents) tuples, together with the metadata.

_format_msg(self, log_line)[source]

Format an ES record to match settings.LOG_FORMAT when used with json_format.
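To make the re-formatting concrete, here is a minimal sketch. The format string matches Airflow's default LOG_FORMAT, but the record fields shown are illustrative; the real handler pulls them from the configured json_fields:

```python
# Sketch: apply an Airflow-style LOG_FORMAT to a JSON-indexed ES record.
# Assumes the record dict carries the standard logging attributes.
LOG_FORMAT = "[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"

def format_msg(record: dict) -> str:
    # %-style interpolation mirrors how logging.Formatter applies LOG_FORMAT.
    return LOG_FORMAT % record

line = format_msg({
    "asctime": "2021-01-01 00:00:00,000",
    "filename": "example.py",
    "lineno": 42,
    "levelname": "INFO",
    "message": "task started",
})
# -> '[2021-01-01 00:00:00,000] {example.py:42} INFO - task started'
```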

es_read(self, log_id: str, offset: str, metadata: dict)[source]

Return the logs matching log_id in Elasticsearch, together with the next offset. Returns '' if no log is found or an error occurs.

Parameters
  • log_id (str) -- the log_id of the log to read.

  • offset (str) -- the offset start to read log from.

  • metadata (dict) -- log metadata, used for streaming log download.
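Conceptually, es_read issues a query of roughly this shape: filter on log_id, skip documents at or below the last offset already delivered, and sort ascending by offset so the client can tail the log. The exact DSL the handler builds may differ; this dict is a hand-written sketch:

```python
# Sketch of an Elasticsearch query body for incremental log reading:
# everything for one log_id past a given offset, in offset order.
def build_log_query(log_id: str, offset: int) -> dict:
    return {
        "query": {
            "bool": {
                "filter": [
                    {"term": {"log_id": log_id}},          # one task attempt
                    {"range": {"offset": {"gt": offset}}}, # only new messages
                ]
            }
        },
        "sort": [{"offset": {"order": "asc"}}],  # stable order, unlike timestamps
    }

query = build_log_query("example_dag-print_date-2021-01-01-1", 0)
```

Sorting on offset rather than timestamp is the point of the scheme described above: offsets are unique per message, while timestamps can collide.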

set_context(self, ti: TaskInstance)[source]

Provide task_instance context to airflow task handler.

Parameters

ti -- task instance object

close(self)[source]
get_external_log_url(self, task_instance: TaskInstance, try_number: int)[source]

Creates an address for an external log collecting service.

Parameters
  • task_instance -- task instance object

  • try_number (Optional[int]) -- task instance try_number to read logs from.

Type

task_instance: TaskInstance

Returns

URL to the external log collection service

Return type

str
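A hedged sketch of how such a URL might be assembled from the configured frontend host and a rendered log_id. The '{log_id}' placeholder convention and the Kibana path shown are assumptions for illustration, not the documented contract:

```python
from urllib.parse import quote

def external_log_url(frontend: str, log_id: str) -> str:
    # Assumption: the frontend setting contains a '{log_id}' placeholder;
    # the log_id is percent-encoded so it is safe inside a URL.
    return "https://" + frontend.format(log_id=quote(log_id))

url = external_log_url(
    "localhost:5601/app/kibana#/discover?_a=(query:'{log_id}')",
    "example_dag-print_date-2021-01-01-1",
)
```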

class airflow.providers.elasticsearch.log.es_task_handler._ESJsonLogFmt(**kwargs)[source]

Helper class to read ES Logs and re-format it to match settings.LOG_FORMAT
