airflow.providers.openlineage.utils.utils

Attributes

AnyOperator

log

Classes

InfoJsonEncodable

Airflow objects might not be JSON-encodable as a whole.

DagInfo

Defines how a DAG object is encoded to JSON.

DagRunInfo

Defines how a DagRun object is encoded to JSON.

TaskInstanceInfo

Defines how a TaskInstance object is encoded to JSON.

AssetInfo

Defines how an Airflow Asset object is encoded to JSON.

TaskInfo

Defines how a BaseOperator/AbstractOperator object is encoded to JSON.

TaskInfoComplete

Defines how a BaseOperator/AbstractOperator object is encoded to JSON when the user enables full task info.

TaskGroupInfo

Defines how a TaskGroup object is encoded to JSON.

OpenLineageRedactor

This class redacts sensitive data, similarly to how SecretsMasker redacts Airflow logs.

Functions

try_import_from_string(string)

get_operator_class(task)

get_operator_provider_version(operator)

Get the provider package version for the given operator.

get_job_name(task)

get_task_parent_run_facet(parent_run_id, parent_job_name)

Retrieve the parent run facet.

get_root_information_from_dagrun_conf(dr_conf)

Extract root parent run and job information from a DAG run config.

get_parent_information_from_dagrun_conf(dr_conf)

Extract parent run and job information from a DAG run config.

get_dag_parent_run_facet(dr_conf)

Build the OpenLineage parent run facet from a DAG run configuration.

get_task_documentation(operator)

Get the task documentation and its MIME type, if present, with the documentation truncated to at most _MAX_DOC_BYTES bytes.

get_dag_documentation(dag)

Get the DAG documentation and its MIME type, if present, with the documentation truncated to at most _MAX_DOC_BYTES bytes.

get_airflow_mapped_task_facet(task_instance)

get_user_provided_run_facets(ti, ti_state)

get_fully_qualified_class_name(operator)

is_operator_disabled(operator)

is_selective_lineage_enabled(obj)

If selective enable is active, check whether the DAG or task is enabled to emit events.

is_ti_rescheduled_already(ti[, session])

get_airflow_dag_run_facet(dag_run)

get_processing_engine_facet()

get_airflow_debug_facet()

get_airflow_run_facet(dag_run, dag, task_instance, ...)

get_airflow_job_facet(dag_run)

get_airflow_state_run_facet(dag_id, run_id, task_ids, ...)

is_dag_run_asset_triggered(dag_run)

Return whether the given DAG run was triggered by an asset.

build_task_instance_ol_run_id(dag_id, task_id, ...)

Generate a deterministic OpenLineage run ID for a task instance.

is_valid_uuid(uuid_string)

Validate that a string is a valid UUID format.

build_dag_run_ol_run_id(dag_id, logical_date, clear_number)

Generate a deterministic OpenLineage run ID for a DAG run.

get_dag_job_dependency_facet(dag_id, dag_run_id)

Safely retrieve the asset-triggered job dependency facet for a DagRun.

get_unknown_source_attribute_run_facet(task[, name])

is_json_serializable(item)

print_warning(log)

get_filtered_unknown_operator_keys(operator)

should_use_external_connection(hook)

translate_airflow_asset(asset, lineage_context)

Convert an Asset with an AIP-60 compliant URI to an OpenLineageDataset.

Module Contents

type airflow.providers.openlineage.utils.utils.AnyOperator = BaseOperator | MappedOperator | SerializedBaseOperator | SerializedMappedOperator
airflow.providers.openlineage.utils.utils.log
airflow.providers.openlineage.utils.utils.try_import_from_string(string)
airflow.providers.openlineage.utils.utils.get_operator_class(task)
airflow.providers.openlineage.utils.utils.get_operator_provider_version(operator)

Get the provider package version for the given operator.

airflow.providers.openlineage.utils.utils.get_job_name(task)
airflow.providers.openlineage.utils.utils.get_task_parent_run_facet(parent_run_id, parent_job_name, parent_job_namespace=conf.namespace(), root_parent_run_id=None, root_parent_job_name=None, root_parent_job_namespace=None, dr_conf=None)

Retrieve the parent run facet.

airflow.providers.openlineage.utils.utils.get_root_information_from_dagrun_conf(dr_conf)

Extract root parent run and job information from a DAG run config.

airflow.providers.openlineage.utils.utils.get_parent_information_from_dagrun_conf(dr_conf)

Extract parent run and job information from a DAG run config.

airflow.providers.openlineage.utils.utils.get_dag_parent_run_facet(dr_conf)

Build the OpenLineage parent run facet from a DAG run configuration.

This function extracts the parent run identifiers (run ID, job namespace, and job name) from the DAG run configuration to construct an OpenLineage ParentRunFacet. It requires the complete set of parent identifiers: if any of them is missing, or if the run ID is invalid, the function returns an empty dictionary.

When valid parent identifiers are found, it also attempts to retrieve corresponding root identifiers using get_root_information_from_dagrun_conf, which may fall back to the parent identifiers if no explicit root data is available. The resulting facet links both the immediate parent and root lineage information for the run.

Args:

dr_conf: The DAG run configuration dictionary.

Returns:

A dictionary containing a single entry mapping the facet name to the constructed ParentRunFacet. Returns an empty dictionary if the configuration does not contain a complete or valid set of parent identifiers.
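The validation and fallback rules described above can be sketched in plain Python. This is a minimal illustration, not the provider's implementation: the flat configuration keys (parentRunId, parentJobNamespace, parentJobName, rootParentRunId, rootParentJobNamespace, rootParentJobName), the helper names, the assumption that a "valid" run ID means a parseable UUID, and the use of plain dictionaries instead of the OpenLineage client's facet classes are all assumptions. The output mirrors the run/job/root shape of the OpenLineage ParentRunFacet.

```python
import uuid
from typing import Any, Optional

# Hypothetical flat keys; the provider may read differently named or nested keys.
PARENT_KEYS = ("parentRunId", "parentJobNamespace", "parentJobName")
ROOT_KEYS = ("rootParentRunId", "rootParentJobNamespace", "rootParentJobName")


def _is_uuid(value: Any) -> bool:
    try:
        uuid.UUID(str(value))
    except ValueError:
        return False
    return True


def sketch_dag_parent_run_facet(dr_conf: Optional[dict[str, Any]]) -> dict[str, Any]:
    conf = dr_conf or {}
    parent = [conf.get(key) for key in PARENT_KEYS]
    # All three parent identifiers are required, and the run ID must parse as a UUID.
    if not all(parent) or not _is_uuid(parent[0]):
        return {}
    root = [conf.get(key) for key in ROOT_KEYS]
    # Without a complete, valid root, fall back to the parent identifiers.
    if not all(root) or not _is_uuid(root[0]):
        root = parent
    return {
        "parent": {
            "run": {"runId": parent[0]},
            "job": {"namespace": parent[1], "name": parent[2]},
            "root": {
                "run": {"runId": root[0]},
                "job": {"namespace": root[1], "name": root[2]},
            },
        }
    }


conf = {
    "parentRunId": "3bb703d1-09c1-4a42-8da5-35a0b3216072",
    "parentJobNamespace": "upstream-ns",
    "parentJobName": "upstream_dag",
}
facet = sketch_dag_parent_run_facet(conf)
```

Because no root keys are present in the example configuration, the sketch reuses the parent identifiers as the root, matching the fallback behavior described above.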

airflow.providers.openlineage.utils.utils.get_task_documentation(operator)

Get the task documentation and its MIME type, if present, with the documentation truncated to at most _MAX_DOC_BYTES bytes.

airflow.providers.openlineage.utils.utils.get_dag_documentation(dag)

Get the DAG documentation and its MIME type, if present, with the documentation truncated to at most _MAX_DOC_BYTES bytes.
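Truncating to a byte budget rather than a character count matters because a UTF-8 character can span several bytes. A minimal sketch of one way to do it, assuming a hypothetical limit and helper name (the provider's actual _MAX_DOC_BYTES value and truncation code may differ), cuts the encoded bytes and drops any character the cut splits:

```python
# Hypothetical limit; the provider defines its own _MAX_DOC_BYTES constant.
MAX_DOC_BYTES = 64 * 1024


def truncate_to_bytes(text: str, max_bytes: int = MAX_DOC_BYTES) -> str:
    """Return text cut to at most max_bytes UTF-8 bytes, never splitting a character."""
    encoded = text.encode("utf-8")
    if len(encoded) <= max_bytes:
        return text
    # Slicing may cut a multi-byte character in half; errors="ignore" drops that
    # incomplete trailing sequence instead of raising UnicodeDecodeError.
    return encoded[:max_bytes].decode("utf-8", errors="ignore")


print(truncate_to_bytes("héllo wörld", 3))
```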

airflow.providers.openlineage.utils.utils.get_airflow_mapped_task_facet(task_instance)
airflow.providers.openlineage.utils.utils.get_user_provided_run_facets(ti, ti_state)
airflow.providers.openlineage.utils.utils.get_fully_qualified_class_name(operator)
airflow.providers.openlineage.utils.utils.is_operator_disabled(operator)
airflow.providers.openlineage.utils.utils.is_selective_lineage_enabled(obj)

If selective enable is active, check whether the DAG or task is enabled to emit events.
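A minimal sketch of the selective-enable check, under the assumption that each DAG and task carries its own boolean flag. FakeDag, FakeTask, lineage_enabled, and the selective_enable argument are hypothetical stand-ins; the provider reads the setting from its configuration and stores the flag on the Airflow objects in its own way. Raising TypeError for unsupported objects is this sketch's choice.

```python
from dataclasses import dataclass


@dataclass
class FakeDag:  # hypothetical stand-in for an Airflow DAG
    dag_id: str
    lineage_enabled: bool = False


@dataclass
class FakeTask:  # hypothetical stand-in for an Airflow task
    task_id: str
    lineage_enabled: bool = False


def sketch_is_selective_lineage_enabled(obj: object, selective_enable: bool) -> bool:
    if not selective_enable:
        # Selective mode is off: every DAG and task emits events.
        return True
    if isinstance(obj, (FakeDag, FakeTask)):
        # Selective mode is on: only explicitly enabled objects emit events.
        return obj.lineage_enabled
    raise TypeError(f"Unsupported object: {obj!r}")
```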

airflow.providers.openlineage.utils.utils.is_ti_rescheduled_already(ti, session=NEW_SESSION)
class airflow.providers.openlineage.utils.utils.InfoJsonEncodable(obj)

Bases: dict

Airflow objects might not be JSON-encodable as a whole.

The class provides additional attributes to control what is encoded and how:

  • renames: a dictionary of attribute name changes

  • casts: a dictionary mapping attribute names to methods that produce the encoded value

  • includes: list of attributes to be included in encoding

  • excludes: list of attributes to be excluded from encoding

Don’t use both includes and excludes.

renames: dict[str, str]
casts: dict[str, Any]
includes: list[str] = []
excludes: list[str] = []
obj
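The renames, casts, includes, and excludes attributes described above can be illustrated with a simplified, hypothetical re-creation. MiniInfoEncodable, Job, and JobInfo are not part of the provider: in this sketch each cast is a callable that receives the whole object and produces a new key, includes filters the object's instance attributes, and renames applies only to those attributes.

```python
import json
from typing import Any, Callable


class MiniInfoEncodable(dict):
    """Hypothetical, simplified version of the renames/casts/includes/excludes idea."""

    renames: dict[str, str] = {}
    casts: dict[str, Callable[[Any], Any]] = {}
    includes: list[str] = []
    excludes: list[str] = []

    def __init__(self, obj: Any) -> None:
        if self.includes and self.excludes:
            raise ValueError("Don't use both includes and excludes.")
        # Casts compute new keys from the whole object.
        encoded: dict[str, Any] = {key: fn(obj) for key, fn in self.casts.items()}
        for attr, value in vars(obj).items():
            if self.includes and attr not in self.includes:
                continue  # not in the allow-list
            if attr in self.excludes or attr in encoded:
                continue  # excluded, or already produced by a cast
            encoded[self.renames.get(attr, attr)] = value
        super().__init__(encoded)


class Job:
    def __init__(self) -> None:
        self.job_id = "etl"
        self.secret = "hunter2"
        self.retries = 3


class JobInfo(MiniInfoEncodable):
    includes = ["job_id", "retries"]
    renames = {"job_id": "id"}
    casts = {"max_attempts": lambda job: job.retries + 1}


info = JobInfo(Job())
print(json.dumps(info))  # a dict subclass, so json.dumps handles it directly
```

Subclassing dict is what lets the encoded result be passed straight to json.dumps, while the class attributes keep each subclass declarative.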
class airflow.providers.openlineage.utils.utils.DagInfo(obj)

Bases: InfoJsonEncodable

Defines how a DAG object is encoded to JSON.

includes = ['dag_id', 'description', 'fileloc', 'owner', 'owner_links', 'schedule_interval', 'start_date', 'tags']
casts
renames
classmethod timetable_summary(dag)

Extract the summary from the DAG's timetable if the DAG is missing a timetable_summary property.

classmethod serialize_timetable(dag)
class airflow.providers.openlineage.utils.utils.DagRunInfo(obj)

Bases: InfoJsonEncodable

Defines how a DagRun object is encoded to JSON.

includes = ['clear_number', 'conf', 'dag_id', 'data_interval_end', 'data_interval_start', 'end_date',...
casts
classmethod duration(dagrun)
classmethod deadlines(dagrun)

Extract deadline state and alert definitions from a DagRun (on scheduler).

Returns a dict (not a list) so _cast_basic_types passes it through.

classmethod dag_version_info(dagrun, key)

Extract DAG version info for the given key, sourced from the DagRun (on scheduler).

class airflow.providers.openlineage.utils.utils.TaskInstanceInfo(obj)

Bases: InfoJsonEncodable

Defines how a TaskInstance object is encoded to JSON.

includes = ['duration', 'try_number', 'pool', 'queued_dttm', 'log_url']
casts
class airflow.providers.openlineage.utils.utils.AssetInfo(obj)

Bases: InfoJsonEncodable

Defines how an Airflow Asset object is encoded to JSON.

includes = ['uri', 'extra']
class airflow.providers.openlineage.utils.utils.TaskInfo(obj)

Bases: InfoJsonEncodable

Defines how a BaseOperator/AbstractOperator object is encoded to JSON.

renames
includes = ['deferrable', 'depends_on_past', 'downstream_task_ids', 'execution_timeout', 'executor_config',...
casts
class airflow.providers.openlineage.utils.utils.TaskInfoComplete(obj)

Bases: TaskInfo

Defines how a BaseOperator/AbstractOperator object is encoded to JSON when the user enables full task info.

includes = []
excludes = ['_BaseOperator__instantiated', '_dag', '_hook', '_log', '_outlets', '_inlets',...
class airflow.providers.openlineage.utils.utils.TaskGroupInfo(obj)

Bases: InfoJsonEncodable

Defines how a TaskGroup object is encoded to JSON.

renames
includes = ['downstream_group_ids', 'downstream_task_ids', 'prefix_group_id', 'tooltip',...
airflow.providers.openlineage.utils.utils.get_airflow_dag_run_facet(dag_run)
airflow.providers.openlineage.utils.utils.get_processing_engine_facet()
airflow.providers.openlineage.utils.utils.get_airflow_debug_facet()
airflow.providers.openlineage.utils.utils.get_airflow_run_facet(dag_run, dag, task_instance, task, task_uuid)
airflow.providers.openlineage.utils.utils.get_airflow_job_facet(dag_run)
airflow.providers.openlineage.utils.utils.get_airflow_state_run_facet(dag_id, run_id, task_ids, dag_run_state)
airflow.providers.openlineage.utils.utils.is_dag_run_asset_triggered(dag_run)

Return whether the given DAG run was triggered by an asset.

airflow.providers.openlineage.utils.utils.build_task_instance_ol_run_id(dag_id, task_id, try_number, logical_date, map_index)

Generate a deterministic OpenLineage run ID for a task instance.

Args:

dag_id: The DAG identifier.
task_id: The task identifier.
try_number: The task try number.
logical_date: The logical execution date from the DAG run.
map_index: The task map index.

Returns:

A deterministic OpenLineage run ID for the task instance.
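Determinism here means that the same inputs always yield the same run ID, so separate processes can compute it independently. The sketch below illustrates the idea with name-based uuid.uuid5 from the standard library. This is a swapped-in technique for illustration only; the provider may use a different scheme (for example, a time-ordered UUID derived from the logical date), and the namespace argument and key layout are assumptions.

```python
import uuid
from datetime import datetime, timezone


def sketch_task_instance_run_id(
    dag_id: str,
    task_id: str,
    try_number: int,
    logical_date: datetime,
    map_index: int,
    namespace: str = "default",  # hypothetical OpenLineage namespace
) -> str:
    # Every input that distinguishes one task-instance attempt goes into the key.
    key = f"{namespace}.{dag_id}.{task_id}.{try_number}.{map_index}.{logical_date.isoformat()}"
    # uuid5 hashes the key (SHA-1), so identical inputs always give the same UUID.
    return str(uuid.uuid5(uuid.NAMESPACE_URL, key))


when = datetime(2024, 1, 1, tzinfo=timezone.utc)
first = sketch_task_instance_run_id("etl", "load", 1, when, -1)
retry = sketch_task_instance_run_id("etl", "load", 2, when, -1)
```

Because try_number is part of the key, each retry gets its own run ID while repeated computations for the same attempt agree.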

airflow.providers.openlineage.utils.utils.is_valid_uuid(uuid_string)

Validate that a string is a valid UUID format.
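One way to validate the format with the standard library, sketched under the assumption that only the canonical hyphenated form should be accepted. On its own, uuid.UUID also accepts braces, a urn:uuid: prefix, and bare hex digits, so the sketch compares against the canonical string. The function name is hypothetical and the provider's check may be more or less strict.

```python
import uuid


def sketch_is_valid_uuid(uuid_string: object) -> bool:
    """Accept only the canonical 8-4-4-4-12 hexadecimal form (case-insensitive)."""
    if not isinstance(uuid_string, str):
        return False
    try:
        parsed = uuid.UUID(uuid_string)
    except ValueError:
        return False
    # uuid.UUID also accepts braces, a urn:uuid: prefix, and bare hex digits;
    # comparing against the canonical string rejects those variants.
    return str(parsed) == uuid_string.lower()
```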

airflow.providers.openlineage.utils.utils.build_dag_run_ol_run_id(dag_id, logical_date, clear_number)

Generate a deterministic OpenLineage run ID for a DAG run.

Args:

dag_id: The DAG identifier.
logical_date: The logical execution date.
clear_number: The DAG run clear number.

Returns:

A deterministic OpenLineage run ID for the DAG run.
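Including clear_number in the key means that, assuming clear_number increases each time the run is cleared, clearing and re-running a DAG run produces a new run ID, while repeated calls for the same run agree. A sketch using a hypothetical uuid5-based scheme (not necessarily the provider's algorithm; the namespace argument and key layout are assumptions):

```python
import uuid
from datetime import datetime, timezone


def sketch_dag_run_run_id(
    dag_id: str,
    logical_date: datetime,
    clear_number: int,
    namespace: str = "default",  # hypothetical OpenLineage namespace
) -> str:
    # A different clear_number yields a different key, and therefore a fresh ID.
    key = f"{namespace}.{dag_id}.{clear_number}.{logical_date.isoformat()}"
    return str(uuid.uuid5(uuid.NAMESPACE_URL, key))


when = datetime(2024, 1, 1, tzinfo=timezone.utc)
original = sketch_dag_run_run_id("etl", when, clear_number=0)
after_clear = sketch_dag_run_run_id("etl", when, clear_number=1)
```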

airflow.providers.openlineage.utils.utils.get_dag_job_dependency_facet(dag_id, dag_run_id)

Safely retrieve the asset-triggered job dependency facet for a DagRun.

This function collects information about the asset events that triggered the specified DagRun, including details about the originating DAG runs and task instances. If the DagRun was not triggered by assets, the function returns an empty dictionary; if any error occurs during lookup or processing, it logs the error and also returns an empty dictionary. This guarantees that facet generation never raises exceptions and does not interfere with event emission.

Args:

dag_id: The DAG identifier.
dag_run_id: The DagRun identifier.

Returns:

A dictionary with JobDependenciesRunFacet, or an empty dictionary if the DagRun was not asset-triggered or if an error occurs.
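The never-raise guarantee follows a common pattern: wrap facet construction so that any exception is logged and converted into an empty dictionary. In the sketch below, the never_raise decorator, the lookup callable, the event dictionary keys, and the jobDependencies facet key are all hypothetical; only the error-handling shape reflects the behavior described above.

```python
import functools
import logging
from typing import Any, Callable

log = logging.getLogger(__name__)


def never_raise(fn: Callable[..., dict[str, Any]]) -> Callable[..., dict[str, Any]]:
    """Hypothetical decorator: log any exception and return an empty facet dict."""

    @functools.wraps(fn)
    def wrapper(*args: Any, **kwargs: Any) -> dict[str, Any]:
        try:
            return fn(*args, **kwargs)
        except Exception:
            log.warning("Failed to build facet in %s", fn.__name__, exc_info=True)
            return {}

    return wrapper


@never_raise
def sketch_job_dependency_facet(
    dag_id: str, dag_run_id: str, lookup: Callable[[str, str], list[dict[str, str]]]
) -> dict[str, Any]:
    events = lookup(dag_id, dag_run_id)  # hypothetical asset-event lookup
    if not events:
        return {}  # not asset-triggered
    upstream = [{"dag_id": e["source_dag_id"], "run_id": e["source_run_id"]} for e in events]
    return {"jobDependencies": {"upstream": upstream}}
```

Catching Exception (rather than BaseException) keeps interpreter-level signals such as KeyboardInterrupt propagating normally.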

airflow.providers.openlineage.utils.utils.get_unknown_source_attribute_run_facet(task, name=None)
class airflow.providers.openlineage.utils.utils.OpenLineageRedactor

Bases: airflow.sdk.execution_time.secrets_masker.SecretsMasker

This class redacts sensitive data, similarly to how SecretsMasker redacts Airflow logs.

The difference is that its default maximum recursion depth is much higher, because OL events are deeply nested structures. Additionally, data structures can mark fields that should not be redacted by deriving from RedactMixin and listing those fields in a _skip_redact list.

classmethod from_masker(other)
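The interaction between recursion depth and _skip_redact can be sketched with a small standalone redactor. It does not use SecretsMasker or the real RedactMixin; RedactMixinSketch, redact, OutputDataset, and the default max_depth of 20 are hypothetical. Like a depth-limited masker, it returns anything nested deeper than max_depth unchanged, which is why deeply nested events need a higher limit.

```python
import copy
from typing import Any

REDACTED = "***"


class RedactMixinSketch:
    """Hypothetical stand-in for RedactMixin: lists attributes that must not be redacted."""

    _skip_redact: list[str] = []


def redact(item: Any, secrets: set[str], depth: int = 0, max_depth: int = 20) -> Any:
    if depth > max_depth:
        return item  # too deep: returned unchanged, so secrets down here survive
    if isinstance(item, str):
        for secret in secrets:
            item = item.replace(secret, REDACTED)
        return item
    if isinstance(item, dict):
        return {k: redact(v, secrets, depth + 1, max_depth) for k, v in item.items()}
    if isinstance(item, (list, tuple)):
        return type(item)(redact(v, secrets, depth + 1, max_depth) for v in item)
    if isinstance(item, RedactMixinSketch):
        clone = copy.copy(item)  # shallow copy so the original object is not mutated
        for name, value in vars(item).items():
            if name not in item._skip_redact:
                setattr(clone, name, redact(value, secrets, depth + 1, max_depth))
        return clone
    return item


class OutputDataset(RedactMixinSketch):
    _skip_redact = ["name"]

    def __init__(self, name: str, uri: str) -> None:
        self.name = name
        self.uri = uri


event = {"outputs": [OutputDataset(name="s3cret-bucket", uri="s3://user:s3cret@host/data")]}
clean = redact(event, {"s3cret"})
```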
airflow.providers.openlineage.utils.utils.is_json_serializable(item)
airflow.providers.openlineage.utils.utils.print_warning(log)
airflow.providers.openlineage.utils.utils.get_filtered_unknown_operator_keys(operator)
airflow.providers.openlineage.utils.utils.should_use_external_connection(hook)
airflow.providers.openlineage.utils.utils.translate_airflow_asset(asset, lineage_context)

Convert an Asset with an AIP-60 compliant URI to an OpenLineageDataset.

This function returns None if no URI normalizer is defined for the asset, if no asset converter is found, or if an ImportError is raised because some required core Airflow changes are missing.
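The None-returning fallbacks can be illustrated with a registry lookup keyed by URI scheme. The NORMALIZERS and CONVERTERS registries, the s3 helpers, and the namespace/name output are hypothetical stand-ins for the provider's AIP-60 normalizers and asset converters, not their actual API.

```python
from typing import Callable, Optional
from urllib.parse import urlsplit, urlunsplit


def _normalize_s3(uri: str) -> str:
    # Hypothetical normalizer: lower-case the bucket and drop query/fragment.
    parts = urlsplit(uri)
    return urlunsplit((parts.scheme, parts.netloc.lower(), parts.path, "", ""))


def _convert_s3(uri: str) -> dict[str, str]:
    # Hypothetical converter: OpenLineage-style namespace/name pair.
    parts = urlsplit(uri)
    return {"namespace": f"s3://{parts.netloc}", "name": parts.path.lstrip("/")}


NORMALIZERS: dict[str, Callable[[str], str]] = {"s3": _normalize_s3}
CONVERTERS: dict[str, Callable[[str], dict[str, str]]] = {"s3": _convert_s3}


def sketch_translate_asset(uri: str) -> Optional[dict[str, str]]:
    scheme = urlsplit(uri).scheme
    normalizer = NORMALIZERS.get(scheme)
    if normalizer is None:
        return None  # no URI normalizer defined for this scheme
    converter = CONVERTERS.get(scheme)
    if converter is None:
        return None  # no asset converter found
    try:
        return converter(normalizer(uri))
    except ImportError:
        return None  # e.g. code the converter depends on is unavailable
```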
