airflow.providers.openlineage.api.sql
Public helpers for emitting OpenLineage events describing SQL query executions.
Functions
| emit_query_lineage(*[, query_id, ...]) | Emit a START + COMPLETE/FAIL OpenLineage event pair describing a single SQL query execution. |
Module Contents
- airflow.providers.openlineage.api.sql.emit_query_lineage(*, query_id=None, query_source_namespace=None, query_text=None, inputs=None, outputs=None, start_time=None, end_time=None, is_successful=True, error_message=None, default_database=None, default_schema=None, job_name=None, task_instance=None, additional_run_facets=None, additional_job_facets=None, raise_on_error=False)
Emit a START + COMPLETE/FAIL OpenLineage event pair describing a single SQL query execution.
The emitted events carry a parent run facet pointing at the currently executing Airflow task run. Any OpenLineage root information present on the task instance is propagated to the emitted events, so the entire run hierarchy stays connected. This helper can be called multiple times within a single task; each call produces a distinct query event pair identified by a sequential job name suffix (<dag_id>.<task_id>.query.<n>).
- Parameters:
query_id (str | None) – Unique identifier of the query in the given query_source_namespace. When both query_id and query_source_namespace are provided, an externalQuery run facet is attached to the emitted events.
query_source_namespace (str | None) – OpenLineage namespace of the system that ran the query, e.g. "snowflake://org-acct", "databricks://adb-<id>.azuredatabricks.net", "bigquery".
query_text (str | None) – Raw SQL query text. When provided, it is attached via a sql job facet, and any explicitly supplied inputs/outputs are enriched with the datasets obtained by parsing the query.
inputs (list[openlineage.client.event_v2.Dataset] | None) – Additional input datasets.
outputs (list[openlineage.client.event_v2.Dataset] | None) – Additional output datasets.
start_time (datetime.datetime | None) – Event time of the START event. Defaults to the current UTC time.
end_time (datetime.datetime | None) – Event time of the COMPLETE/FAIL event. Defaults to the current UTC time.
is_successful (bool) – Whether the query completed successfully (COMPLETE) or failed (FAIL).
error_message (str | None) – Optional error message attached as an errorMessage run facet.
default_database (str | None) – Default database for resolving unqualified tables in query_text.
default_schema (str | None) – Default schema for resolving unqualified tables in query_text.
job_name (str | None) – Job name to use in both events. Defaults to <ti_job_name>.manual_query.<counter>.
task_instance (airflow.models.taskinstance.TaskInstance | airflow.sdk.execution_time.task_runner.RuntimeTaskInstance | None) – The Airflow task instance to attribute the query to. Defaults to the currently executing task instance obtained from the execution context.
additional_run_facets (dict[str, openlineage.client.facet_v2.RunFacet] | None) – Extra run facets to merge into the emitted events.
additional_job_facets (dict[str, openlineage.client.facet_v2.JobFacet] | None) – Extra job facets to merge into the emitted events.
raise_on_error (bool) – When False (default), any exception raised while building or emitting the events is logged at WARNING level and the function returns silently, so a broken lineage helper never breaks a user's task. Set to True to opt into normal exception propagation.
- Raises:
RuntimeError – If task_instance is not provided and cannot be resolved from the current execution context. Raised only when raise_on_error=True.
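The raise_on_error contract described above (log at WARNING and return by default, propagate when raise_on_error=True) is the usual best-effort pattern for side effects that must not fail a task. The following is a minimal, self-contained sketch of that contract, not the provider's code: best_effort and build_events are hypothetical names, and build_events only mimics the documented RuntimeError for a missing task instance.

```python
import logging
from typing import Any, Callable

log = logging.getLogger(__name__)


def best_effort(emit: Callable[[], Any], *, raise_on_error: bool = False) -> Any:
    """Hypothetical: run ``emit``; on failure log a warning and return None unless raise_on_error."""
    try:
        return emit()
    except Exception:
        if raise_on_error:
            raise  # opt-in: propagate the original exception unchanged
        # exc_info=True keeps the traceback in the log without failing the caller
        log.warning("Failed to emit OpenLineage query events", exc_info=True)
        return None


def build_events(task_instance: Any = None) -> list[str]:
    # Toy stand-in for event construction; execution-context lookup is omitted here.
    if task_instance is None:
        raise RuntimeError("task_instance not provided and could not be resolved")
    return ["START", "COMPLETE"]


print(best_effort(lambda: build_events("ti")))  # ['START', 'COMPLETE']
print(best_effort(lambda: build_events()))      # None (a warning is logged)
```

Keeping the swallow-or-raise decision in one wrapper means the lineage code itself can raise freely, while the caller chooses the failure policy.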
Example:
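```python
from airflow.providers.openlineage.api import emit_query_lineage
from airflow.sdk import task  # Airflow 3; on Airflow 2 use: from airflow.decorators import task


@task
def my_task():
    # Attribute a query that ran in Databricks to this task's lineage.
    emit_query_lineage(
        query_id="acde070d-8c4c-4f0d-9d8a-162843c10333",
        query_source_namespace="databricks://adb-498971240325220.10.azuredatabricks.net",
        query_text="SELECT * FROM analytics.public.users",
    )
```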
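The example above relies on the default start_time and end_time, which are documented as the current UTC time. To describe the query's real execution window and outcome, those values can be captured around the query itself. run_and_describe below is a hypothetical helper, not part of the provider; run_query stands in for whatever client actually executes the SQL, and the returned dictionary is intended to be passed as emit_query_lineage(**kwargs).

```python
from datetime import datetime, timezone
from typing import Any, Callable, Optional


def run_and_describe(
    run_query: Callable[[str], Any], query_text: str
) -> tuple[Any, dict[str, Any], Optional[Exception]]:
    """Hypothetical helper: run a query and collect lineage kwargs describing it."""
    start_time = datetime.now(timezone.utc)
    result: Any = None
    error: Optional[Exception] = None
    try:
        result = run_query(query_text)
    except Exception as exc:
        # Keep the failure so the lineage call can report FAIL; the caller re-raises.
        error = exc
    end_time = datetime.now(timezone.utc)

    kwargs: dict[str, Any] = {
        "query_text": query_text,
        "start_time": start_time,
        "end_time": end_time,
        "is_successful": error is None,
    }
    if error is not None:
        kwargs["error_message"] = str(error)
    return result, kwargs, error
```

Inside a task, one would call emit_query_lineage(**kwargs) after the helper returns and then re-raise error when it is not None, so a failed query still fails the task while its FAIL event is emitted.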
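Because each call within one task gets its own sequentially numbered job name, the numbering can be pictured as one counter per task. The sketch below is a hypothetical illustration (QueryJobNamer is not a provider class): it uses the <dag_id>.<task_id>.query.<n> pattern from the function description, while the starting index of 1 and the in-memory counter are assumptions.

```python
from collections import defaultdict
from itertools import count


class QueryJobNamer:
    """Hypothetical: hand out sequential job names, one counter per task."""

    def __init__(self) -> None:
        # Each (dag_id, task_id) key lazily gets its own counter starting at 1 (assumed).
        self._counters = defaultdict(lambda: count(1))

    def next_name(self, dag_id: str, task_id: str) -> str:
        n = next(self._counters[(dag_id, task_id)])
        return f"{dag_id}.{task_id}.query.{n}"


namer = QueryJobNamer()
print(namer.next_name("etl", "load"))       # etl.load.query.1
print(namer.next_name("etl", "load"))       # etl.load.query.2
print(namer.next_name("etl", "transform"))  # etl.transform.query.1
```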