airflow.providers.opensearch.log.os_task_handler

Attributes

OsLogMsgType

USE_PER_RUN_LOG_ID

LOG_LINE_DEFAULTS

TASK_LOG_FIELDS

Classes

OpensearchTaskHandler

OpensearchTaskHandler is a Python log handler that reads logs from OpenSearch.

OpensearchRemoteLogIO

Reads and writes task logs to OpenSearch for the remote logging path.

Functions

getattr_nested(obj, item, default)

Get item from obj but return default if not found.

get_os_kwargs_from_config()

Module Contents

airflow.providers.opensearch.log.os_task_handler.OsLogMsgType[source]
airflow.providers.opensearch.log.os_task_handler.USE_PER_RUN_LOG_ID = True[source]
airflow.providers.opensearch.log.os_task_handler.LOG_LINE_DEFAULTS[source]
airflow.providers.opensearch.log.os_task_handler.TASK_LOG_FIELDS = ['timestamp', 'event', 'level', 'chan', 'logger', 'error_detail', 'message', 'levelname'][source]
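TASK_LOG_FIELDS reads as an allow-list of the keys kept in a structured task-log document. A minimal sketch of filtering an event dict down to those fields (illustrative only; `filter_task_log_fields` is a hypothetical helper, not part of the provider):

```python
# The allow-list of structured-log keys, copied from TASK_LOG_FIELDS above.
TASK_LOG_FIELDS = [
    "timestamp", "event", "level", "chan", "logger",
    "error_detail", "message", "levelname",
]


def filter_task_log_fields(event: dict) -> dict:
    """Keep only the keys that belong in a task log document."""
    return {k: v for k, v in event.items() if k in TASK_LOG_FIELDS}


record = {"timestamp": "2024-01-01T00:00:00", "event": "hello", "internal_id": 7}
filtered = filter_task_log_fields(record)  # internal_id is dropped
```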
airflow.providers.opensearch.log.os_task_handler.getattr_nested(obj, item, default)[source]

Get item from obj but return default if not found.

E.g. calling getattr_nested(a, 'b.c', 'NA') will return a.b.c if such a value exists, and 'NA' otherwise.
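The dotted-attribute lookup described above can be sketched as follows (a minimal re-implementation for illustration, not the provider's actual code):

```python
from functools import reduce


def getattr_nested(obj, item, default):
    """Return obj.<item> for a dotted path like 'b.c', or default if any
    attribute along the path is missing."""
    try:
        # Fold getattr over the path segments: a, 'b.c' -> getattr(getattr(a, 'b'), 'c')
        return reduce(getattr, item.split("."), obj)
    except AttributeError:
        return default
```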

airflow.providers.opensearch.log.os_task_handler.get_os_kwargs_from_config()[source]
class airflow.providers.opensearch.log.os_task_handler.OpensearchTaskHandler(base_log_folder, end_of_log_mark, write_stdout, json_format, json_fields, host, port, username, password, write_to_opensearch=False, target_index='airflow-logs', host_field='host', offset_field='offset', index_patterns=conf.get('opensearch', 'index_patterns', fallback='_all'), index_patterns_callable=conf.get('opensearch', 'index_patterns_callable', fallback=''), log_id_template=conf.get('opensearch', 'log_id_template', fallback='') or '{dag_id}-{task_id}-{run_id}-{map_index}-{try_number}', os_kwargs='default_os_kwargs', max_bytes=0, backup_count=0, delay=False, **kwargs)[source]

Bases: airflow.utils.log.file_task_handler.FileTaskHandler, airflow.utils.log.logging_mixin.ExternalLoggingMixin, airflow.utils.log.logging_mixin.LoggingMixin

OpensearchTaskHandler is a Python log handler that reads logs from OpenSearch.

Airflow flushes task logs to local files. Additional software setup can then ship those logs to OpenSearch. On Airflow 3, this task handler also registers a matching OpensearchRemoteLogIO so the new remote logging path can read from OpenSearch too. Airflow can also be configured to write task logs to OpenSearch directly. To enable this feature, set json_format and write_to_opensearch to True.

To efficiently query and sort OpenSearch results, this handler assumes each log message has a log_id field composed of the task instance's primary keys: log_id = {dag_id}-{task_id}-{logical_date}-{try_number}. Log messages with a given log_id are sorted by offset, a unique integer that indicates each message's order. Timestamps are unreliable for ordering because multiple log messages may share the same timestamp.
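The retrieval strategy above (filter by log_id, order by offset) can be illustrated by the search body such a query would use. This is a hedged sketch: the log_id and offset field names come from the description above, while `build_log_query` and the exact body shape are illustrative:

```python
def build_log_query(log_id: str, last_offset: int, offset_field: str = "offset") -> dict:
    """Build an OpenSearch search body that fetches the log lines for one
    task try, ordered by the monotonically increasing offset (not timestamp)."""
    return {
        "query": {
            "bool": {
                "filter": [
                    # All lines for this task try share one log_id.
                    {"term": {"log_id": log_id}},
                    # Only fetch lines beyond the last offset already read.
                    {"range": {offset_field: {"gt": last_offset}}},
                ]
            }
        },
        "sort": [{offset_field: {"order": "asc"}}],
    }


body = build_log_query("my_dag-my_task-2024-01-01-1", last_offset=0)
```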

Parameters:
  • base_log_folder (str) – Base folder to store logs locally.

  • end_of_log_mark (str) – A marker string to signify the end of logs.

  • write_stdout (bool) – Whether to also write logs to stdout.

  • json_format (bool) – Whether to format logs as JSON.

  • json_fields (str) – Comma-separated list of fields to include in the JSON log output.

  • host (str) – OpenSearch host name.

  • port (int | None) – OpenSearch port.

  • username (str) – Username for OpenSearch authentication.

  • password (str) – Password for OpenSearch authentication.

  • write_to_opensearch (bool) – Whether to write logs directly to OpenSearch.

  • target_index (str) – Name of the index to write to when direct OpenSearch writes are enabled.

  • host_field (str) – The field name for the host in the logs (default is “host”).

  • offset_field (str) – The field name for the log offset (default is “offset”).

  • index_patterns (str) – Index pattern or template for storing logs.

  • index_patterns_callable (str) – Callable that dynamically generates index patterns based on context.

  • os_kwargs (dict | None | Literal['default_os_kwargs']) – Additional OpenSearch client options. This can be set to “default_os_kwargs” to load the default configuration from Airflow’s settings.
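Several of the defaults above are read from the [opensearch] section of airflow.cfg. A sketch of that section (index_patterns, index_patterns_callable, and log_id_template are the keys shown in the signature defaults above; verify any other keys against the provider's configuration reference):

```ini
[opensearch]
# Index pattern(s) the handler searches when reading task logs back.
index_patterns = _all
# Optional dotted path to a callable that generates index patterns per task.
index_patterns_callable =
# Template used to build the per-try log_id field on each log document.
log_id_template = {dag_id}-{task_id}-{run_id}-{map_index}-{try_number}
```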

PAGE = 0[source]
MAX_LINE_PER_PAGE = 1000[source]
LOG_NAME = 'Opensearch'[source]
trigger_should_wrap = True[source]
closed = False[source]
mark_end_on_close = True[source]
end_of_log_mark[source]
write_stdout[source]
write_to_opensearch = False[source]
json_format[source]
json_fields[source]
host[source]
host_field = 'host'[source]
offset_field = 'offset'[source]
target_index = 'airflow-logs'[source]
index_patterns[source]
index_patterns_callable[source]
context_set = False[source]
client[source]
delete_local_copy[source]
log_id_template[source]
formatter: logging.Formatter[source]
handler: logging.FileHandler | logging.StreamHandler | None = None[source]
io[source]
set_context(ti, *, identifier=None)[source]

Provide task_instance context to airflow task handler.

Parameters:
  • ti (airflow.models.taskinstance.TaskInstance) – task instance object

  • identifier (str | None) – if set, identifies the Airflow component which is relaying logs from exceptional scenarios related to the task instance

emit(record)[source]

Do whatever it takes to actually log the specified logging record.

close()[source]

Tidy up any resources used by the handler.

This version removes the handler from an internal map of handlers, _handlers, which is used for handler lookup by name. Subclasses should ensure that this gets called from overridden close() methods.

concat_logs(hits)[source]

Concatenate the log messages from the given search hits.

get_external_log_url(task_instance, try_number)[source]

Create an address for an external log collecting service.

TODO: It should support the frontend just like ElasticsearchTaskHandler.

property log_name: str[source]

The log name.

static format_url(host)[source]
class airflow.providers.opensearch.log.os_task_handler.OpensearchRemoteLogIO(context=None)[source]

Bases: airflow.utils.log.logging_mixin.LoggingMixin

Reads and writes task logs to OpenSearch for the remote logging path.

json_format: bool = False[source]
write_stdout: bool = False[source]
write_to_opensearch: bool = False[source]
delete_local_copy: bool = False[source]
host: str = 'localhost'[source]
port: int | None = 9200[source]
username: str = ''[source]
password: str = ''[source]
host_field: str = 'host'[source]
target_index: str = 'airflow-logs'[source]
offset_field: str = 'offset'[source]
base_log_folder: pathlib.Path[source]
log_id_template: str[source]
processors = ()[source]
__attrs_post_init__()[source]
upload(path, ti)[source]

Emit structured task logs to stdout and/or write them directly to OpenSearch.

read(_relative_path, ti)[source]
